Commit 27d83a93 authored by Cedric CAFFY's avatar Cedric CAFFY Committed by Eric Cano
Browse files

Support of reclaim a vid

parent 55709a25
......@@ -11302,6 +11302,216 @@ TEST_P(cta_catalogue_CatalogueTest, reclaimTape_full_lastFSeq_1_one_tape_file) {
ASSERT_THROW(m_catalogue->reclaimTape(m_admin, vid1), exception::UserError);
}
TEST_P(cta_catalogue_CatalogueTest, reclaimTape_full_lastFSeq_1_one_tape_file_superseded) {
using namespace cta;
const std::string diskInstanceName1 = "disk_instance_1";
ASSERT_TRUE(m_catalogue->getTapes().empty());
const std::string vid1 = "VID123";
const std::string vid2 = "VID234";
const std::string mediaType = "media_type";
const std::string vendor = "vendor";
const std::string logicalLibraryName = "logical_library_name";
const std::string tapePoolName = "tape_pool_name";
const std::string vo = "vo";
const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
const bool disabledValue = true;
const bool fullValue = false;
const std::string createTapeComment = "Create tape";
m_catalogue->createLogicalLibrary(m_admin, logicalLibraryName, "Create logical library");
m_catalogue->createTapePool(m_admin, tapePoolName, vo, 2, true, "Create tape pool");
m_catalogue->createTape(m_admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes,
disabledValue, fullValue, createTapeComment);
m_catalogue->createTape(m_admin, vid2, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes,
disabledValue, fullValue, createTapeComment);
{
const std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes();
const std::map<std::string, common::dataStructures::Tape> vidToTape = tapeListToMap(tapes);
ASSERT_EQ(2, vidToTape.size());
auto it = vidToTape.find(vid1);
const common::dataStructures::Tape &tape = it->second;
ASSERT_EQ(vid1, tape.vid);
ASSERT_EQ(mediaType, tape.mediaType);
ASSERT_EQ(vendor, tape.vendor);
ASSERT_EQ(0, tape.dataOnTapeInBytes);
ASSERT_EQ(0, tape.lastFSeq);
ASSERT_EQ(logicalLibraryName, tape.logicalLibraryName);
ASSERT_EQ(tapePoolName, tape.tapePoolName);
ASSERT_EQ(vo, tape.vo);
ASSERT_EQ(capacityInBytes, tape.capacityInBytes);
ASSERT_TRUE(disabledValue == tape.disabled);
ASSERT_TRUE(fullValue == tape.full);
ASSERT_EQ(createTapeComment, tape.comment);
ASSERT_FALSE(tape.labelLog);
ASSERT_FALSE(tape.lastReadLog);
ASSERT_FALSE(tape.lastWriteLog);
const common::dataStructures::EntryLog creationLog = tape.creationLog;
ASSERT_EQ(m_admin.username, creationLog.username);
ASSERT_EQ(m_admin.host, creationLog.host);
const common::dataStructures::EntryLog lastModificationLog =
tape.lastModificationLog;
ASSERT_EQ(creationLog, lastModificationLog);
}
// Record initial tape file
const uint64_t archiveFileId = 1234;
ASSERT_FALSE(m_catalogue->getArchiveFilesItor().hasMore());
ASSERT_THROW(m_catalogue->getArchiveFileById(archiveFileId), exception::Exception);
common::dataStructures::StorageClass storageClass;
storageClass.diskInstance = diskInstanceName1;
storageClass.name = "storage_class";
storageClass.nbCopies = 1;
storageClass.comment = "Create storage class";
m_catalogue->createStorageClass(m_admin, storageClass);
const uint64_t archiveFileSize = 1;
const std::string tapeDrive = "tape_drive";
const std::string checksumType = "checksum_type";
const std::string checksumValue = "checksum_value";
auto file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
auto & file1Written = *file1WrittenUP;
std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenSet;
file1WrittenSet.insert(file1WrittenUP.release());
file1Written.archiveFileId = archiveFileId;
file1Written.diskInstance = storageClass.diskInstance;
file1Written.diskFileId = "5678";
file1Written.diskFilePath = "/public_dir/public_file";
file1Written.diskFileUser = "public_disk_user";
file1Written.diskFileGroup = "public_disk_group";
file1Written.size = archiveFileSize;
file1Written.checksumType = checksumType;
file1Written.checksumValue = checksumValue;
file1Written.storageClassName = storageClass.name;
file1Written.vid = vid1;
file1Written.fSeq = 1;
file1Written.blockId = 4321;
file1Written.compressedSize = 1;
file1Written.copyNb = 1;
file1Written.tapeDrive = tapeDrive;
m_catalogue->filesWrittenToTape(file1WrittenSet);
{
const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId);
ASSERT_EQ(file1Written.archiveFileId, archiveFile.archiveFileID);
ASSERT_EQ(file1Written.diskFileId, archiveFile.diskFileId);
ASSERT_EQ(file1Written.size, archiveFile.fileSize);
ASSERT_EQ(file1Written.checksumType, archiveFile.checksumType);
ASSERT_EQ(file1Written.checksumValue, archiveFile.checksumValue);
ASSERT_EQ(file1Written.storageClassName, archiveFile.storageClass);
ASSERT_EQ(file1Written.diskInstance, archiveFile.diskInstance);
ASSERT_EQ(file1Written.diskFilePath, archiveFile.diskFileInfo.path);
ASSERT_EQ(file1Written.diskFileUser, archiveFile.diskFileInfo.owner);
ASSERT_EQ(file1Written.diskFileGroup, archiveFile.diskFileInfo.group);
ASSERT_EQ(1, archiveFile.tapeFiles.size());
auto copyNbToTapeFile1Itor = archiveFile.tapeFiles.find(1);
ASSERT_FALSE(copyNbToTapeFile1Itor == archiveFile.tapeFiles.end());
const common::dataStructures::TapeFile &tapeFile1 = copyNbToTapeFile1Itor->second;
ASSERT_EQ(file1Written.vid, tapeFile1.vid);
ASSERT_EQ(file1Written.fSeq, tapeFile1.fSeq);
ASSERT_EQ(file1Written.blockId, tapeFile1.blockId);
ASSERT_EQ(file1Written.compressedSize, tapeFile1.compressedSize);
ASSERT_EQ(file1Written.checksumType, tapeFile1.checksumType);
ASSERT_EQ(file1Written.checksumValue, tapeFile1.checksumValue);
ASSERT_EQ(file1Written.copyNb, tapeFile1.copyNb);
}
{
const std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes();
const std::map<std::string, common::dataStructures::Tape> vidToTape = tapeListToMap(tapes);
ASSERT_EQ(2, vidToTape.size());
auto it = vidToTape.find(vid1);
const common::dataStructures::Tape &tape = it->second;
ASSERT_EQ(vid1, tape.vid);
ASSERT_EQ(mediaType, tape.mediaType);
ASSERT_EQ(vendor, tape.vendor);
ASSERT_EQ(file1Written.size, tape.dataOnTapeInBytes);
ASSERT_EQ(1, tape.lastFSeq);
ASSERT_EQ(logicalLibraryName, tape.logicalLibraryName);
ASSERT_EQ(tapePoolName, tape.tapePoolName);
ASSERT_EQ(vo, tape.vo);
ASSERT_EQ(capacityInBytes, tape.capacityInBytes);
ASSERT_TRUE(disabledValue == tape.disabled);
ASSERT_TRUE(fullValue == tape.full);
ASSERT_EQ(createTapeComment, tape.comment);
ASSERT_FALSE(tape.labelLog);
ASSERT_FALSE(tape.lastReadLog);
ASSERT_TRUE((bool)tape.lastWriteLog);
ASSERT_EQ(tapeDrive, tape.lastWriteLog.value().drive);
const common::dataStructures::EntryLog creationLog = tape.creationLog;
ASSERT_EQ(m_admin.username, creationLog.username);
ASSERT_EQ(m_admin.host, creationLog.host);
const common::dataStructures::EntryLog lastModificationLog =
tape.lastModificationLog;
ASSERT_EQ(creationLog, lastModificationLog);
}
m_catalogue->setTapeFull(m_admin, vid1, true);
// Record superseding tape file
file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
auto & file1WrittenAgain = *file1WrittenUP;
std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenAgainSet;
file1WrittenAgainSet.insert(file1WrittenUP.release());
file1WrittenAgain.archiveFileId = archiveFileId;
file1WrittenAgain.diskInstance = storageClass.diskInstance;
file1WrittenAgain.diskFileId = "5678";
file1WrittenAgain.diskFilePath = "/public_dir/public_file";
file1WrittenAgain.diskFileUser = "public_disk_user";
file1WrittenAgain.diskFileGroup = "public_disk_group";
file1WrittenAgain.size = archiveFileSize;
file1WrittenAgain.checksumType = checksumType;
file1WrittenAgain.checksumValue = checksumValue;
file1WrittenAgain.storageClassName = storageClass.name;
file1WrittenAgain.vid = vid2;
file1WrittenAgain.fSeq = 1;
file1WrittenAgain.blockId = 4321;
file1WrittenAgain.compressedSize = 1;
file1WrittenAgain.copyNb = 1;
file1WrittenAgain.tapeDrive = tapeDrive;
m_catalogue->filesWrittenToTape(file1WrittenAgainSet);
ASSERT_NO_THROW(m_catalogue->reclaimTape(m_admin, vid1));
{
//Test that the tape with vid1 is reclaimed
common::dataStructures::Tape tape = m_catalogue->getTapes().front();
ASSERT_EQ(vid1, tape.vid);
ASSERT_EQ(mediaType, tape.mediaType);
ASSERT_EQ(vendor, tape.vendor);
ASSERT_EQ(0, tape.dataOnTapeInBytes);
ASSERT_EQ(0, tape.lastFSeq);
ASSERT_EQ(logicalLibraryName, tape.logicalLibraryName);
ASSERT_EQ(tapePoolName, tape.tapePoolName);
ASSERT_EQ(vo, tape.vo);
ASSERT_EQ(capacityInBytes, tape.capacityInBytes);
ASSERT_TRUE(tape.disabled);
ASSERT_FALSE(tape.full);
ASSERT_EQ(createTapeComment, tape.comment);
ASSERT_FALSE(tape.labelLog);
ASSERT_FALSE(tape.lastReadLog);
ASSERT_TRUE((bool)tape.lastWriteLog);
ASSERT_EQ(tapeDrive, tape.lastWriteLog.value().drive);
}
}
TEST_P(cta_catalogue_CatalogueTest, ping) {
using namespace cta;
......
......@@ -2221,7 +2221,9 @@ void RdbmsCatalogue::reclaimTape(const common::dataStructures::SecurityIdentity
"WHERE "
"VID = :UPDATE_VID AND "
"IS_FULL != '0' AND "
"NOT EXISTS (SELECT VID FROM TAPE_FILE WHERE VID = :SELECT_VID)";
"NOT EXISTS (SELECT VID FROM TAPE_FILE WHERE VID = :SELECT_VID "
" AND SUPERSEDED_BY_VID IS NULL "
" AND SUPERSEDED_BY_FSEQ IS NULL)";
auto conn = m_connPool.getConn();
auto stmt = conn.createStmt(sql);
stmt.bindString(":LAST_UPDATE_USER_NAME", admin.username);
......@@ -4826,36 +4828,76 @@ void RdbmsCatalogue::insertTapeFile(
rdbms::Conn &conn,
const common::dataStructures::TapeFile &tapeFile,
const uint64_t archiveFileId) {
try {
const time_t now = time(nullptr);
const char *const sql =
"INSERT INTO TAPE_FILE("
"VID,"
"FSEQ,"
"BLOCK_ID,"
"COMPRESSED_SIZE_IN_BYTES,"
"COPY_NB,"
"CREATION_TIME,"
"ARCHIVE_FILE_ID)"
"VALUES("
":VID,"
":FSEQ,"
":BLOCK_ID,"
":COMPRESSED_SIZE_IN_BYTES,"
":COPY_NB,"
":CREATION_TIME,"
":ARCHIVE_FILE_ID)";
auto stmt = conn.createStmt(sql);
stmt.bindString(":VID", tapeFile.vid);
stmt.bindUint64(":FSEQ", tapeFile.fSeq);
stmt.bindUint64(":BLOCK_ID", tapeFile.blockId);
stmt.bindUint64(":COMPRESSED_SIZE_IN_BYTES", tapeFile.compressedSize);
stmt.bindUint64(":COPY_NB", tapeFile.copyNb);
stmt.bindUint64(":CREATION_TIME", now);
stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
stmt.executeNonQuery();
rdbms::AutoRollback autoRollback(conn);
bool updateSupersededNeeded = false;
try{
{
const char *const sql =
"SELECT VID FROM TAPE_FILE "
"WHERE "
" TAPE_FILE.ARCHIVE_FILE_ID=:ARCHIVE_FILE_ID AND"
" TAPE_FILE.COPY_NB=:COPY_NB";
auto stmt = conn.createStmt(sql);
stmt.bindUint64(":ARCHIVE_FILE_ID",archiveFileId);
stmt.bindUint64(":COPY_NB",tapeFile.copyNb);
auto result = stmt.executeQuery();
if(result.next()){
updateSupersededNeeded = true;
}
}
{
const time_t now = time(nullptr);
const char *const sql =
"INSERT INTO TAPE_FILE("
"VID,"
"FSEQ,"
"BLOCK_ID,"
"COMPRESSED_SIZE_IN_BYTES,"
"COPY_NB,"
"CREATION_TIME,"
"ARCHIVE_FILE_ID)"
"VALUES("
":VID,"
":FSEQ,"
":BLOCK_ID,"
":COMPRESSED_SIZE_IN_BYTES,"
":COPY_NB,"
":CREATION_TIME,"
":ARCHIVE_FILE_ID)";
auto stmt = conn.createStmt(sql);
stmt.bindString(":VID", tapeFile.vid);
stmt.bindUint64(":FSEQ", tapeFile.fSeq);
stmt.bindUint64(":BLOCK_ID", tapeFile.blockId);
stmt.bindUint64(":COMPRESSED_SIZE_IN_BYTES", tapeFile.compressedSize);
stmt.bindUint64(":COPY_NB", tapeFile.copyNb);
stmt.bindUint64(":CREATION_TIME", now);
stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
stmt.executeNonQuery();
if(!updateSupersededNeeded){
conn.commit();
}
}
{
if(updateSupersededNeeded){
const char *const sql =
"UPDATE TAPE_FILE SET "
"SUPERSEDED_BY_VID=:NEW_VID, " //VID of the new file
"SUPERSEDED_BY_FSEQ=:NEW_FSEQ " //FSEQ of the new file
"WHERE"
" TAPE_FILE.ARCHIVE_FILE_ID=:ARCHIVE_FILE_ID AND"
" TAPE_FILE.COPY_NB=:COPY_NB";
auto stmt = conn.createStmt(sql);
stmt.bindString(":NEW_VID",tapeFile.vid);
stmt.bindUint64(":NEW_FSEQ",tapeFile.fSeq);
stmt.bindUint64(":ARCHIVE_FILE_ID",archiveFileId);
stmt.bindUint64(":COPY_NB",tapeFile.copyNb);
stmt.executeNonQuery();
conn.commit();
}
}
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......
......@@ -177,13 +177,17 @@ CREATE TABLE TAPE_FILE(
COPY_NB NUMERIC(20, 0) CONSTRAINT TAPE_FILE_CN_NN NOT NULL,
CREATION_TIME NUMERIC(20, 0) CONSTRAINT TAPE_FILE_CT_NN NOT NULL,
ARCHIVE_FILE_ID NUMERIC(20, 0) CONSTRAINT TAPE_FILE_AFI_NN NOT NULL,
SUPERSEDED_BY_VID VARCHAR(100),
SUPERSEDED_BY_FSEQ INTEGER,
CONSTRAINT TAPE_FILE_PK PRIMARY KEY(VID, FSEQ),
CONSTRAINT TAPE_FILE_TAPE_FK FOREIGN KEY(VID)
REFERENCES TAPE(VID),
CONSTRAINT TAPE_FILE_ARCHIVE_FILE_FK FOREIGN KEY(ARCHIVE_FILE_ID)
REFERENCES ARCHIVE_FILE(ARCHIVE_FILE_ID),
CONSTRAINT TAPE_FILE_VID_BLOCK_ID_UN UNIQUE(VID, BLOCK_ID),
CONSTRAINT TAPE_FILE_COPY_NB_GT_ZERO CHECK(COPY_NB > 0)
CONSTRAINT TAPE_FILE_COPY_NB_GT_ZERO CHECK(COPY_NB > 0),
CONSTRAINT TAPE_FILE_SS_VID_FSEQ FOREIGN KEY(SUPERSEDED_BY_VID, SUPERSEDED_BY_FSEQ)
REFERENCES TAPE_FILE(VID, FSEQ)
);
CREATE INDEX TAPE_FILE_VID_IDX ON TAPE_FILE(VID);
CREATE INDEX TAPE_FILE_ARCHIVE_FILE_ID_IDX ON TAPE_FILE(ARCHIVE_FILE_ID);
......
......@@ -41,7 +41,7 @@ kubectl -n ${NAMESPACE} exec client -- bash /root/simple_client_ar.sh || exit 1
kubectl -n ${NAMESPACE} cp grep_xrdlog_mgm_for_error.sh ctaeos:/root/grep_xrdlog_mgm_for_error.sh
kubectl -n ${NAMESPACE} exec ctaeos -- bash /root/grep_xrdlog_mgm_for_error.sh || exit 1
NB_FILES=10000
NB_FILES=20
FILE_SIZE_KB=15
echo
......@@ -50,7 +50,7 @@ echo " Archiving ${NB_FILES} files of ${FILE_SIZE_KB}kB each"
echo " Archiving files: xrdcp as user1"
echo " Retrieving them as poweruser1"
kubectl -n ${NAMESPACE} cp client_ar.sh client:/root/client_ar.sh
kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -r || exit 1
kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v || exit 1
kubectl -n ${NAMESPACE} exec ctaeos -- bash /root/grep_xrdlog_mgm_for_error.sh || exit 1
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment