diff --git a/catalogue/CMakeLists.txt b/catalogue/CMakeLists.txt index b576c91ea9152157906d5355357b6a4d392a956c..d43302c42586fd30d5ed1f0e85f8f9329259a5ac 100644 --- a/catalogue/CMakeLists.txt +++ b/catalogue/CMakeLists.txt @@ -71,6 +71,9 @@ set (CATALOGUE_LIB_SRC_FILES UserSpecifiedAnEmptyStringVendor.cpp UserSpecifiedAnEmptyStringVid.cpp UserSpecifiedAnEmptyStringVo.cpp + UserSpecifiedAnEmptyTapePool.cpp + UserSpecifiedStorageClassUsedByArchiveFiles.cpp + UserSpecifiedStorageClassUsedByArchiveRoutes.cpp UserSpecifiedAZeroCapacity.cpp UserSpecifiedAZeroCopyNb.cpp) @@ -159,7 +162,8 @@ add_custom_command(OUTPUT PostgresCatalogueSchema.cpp set(IN_MEMORY_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES CatalogueTest.cpp InMemoryCatalogueTest.cpp - InMemoryVersionOfCatalogueTest.cpp) + InMemoryVersionOfCatalogueTest.cpp + TapeItemWrittenPointerTest.cpp) add_library (ctainmemorycatalogueunittests SHARED ${IN_MEMORY_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES}) diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp index 9becc11bc1b95a2e2b8036bc0228ce2aaf859835..9fcf23f773b1c6e9ed4fcef9a3829e439aadaada 100644 --- a/catalogue/CatalogueTest.cpp +++ b/catalogue/CatalogueTest.cpp @@ -33,8 +33,11 @@ #include "catalogue/UserSpecifiedAnEmptyStringVendor.hpp" #include "catalogue/UserSpecifiedAnEmptyStringVid.hpp" #include "catalogue/UserSpecifiedAnEmptyStringVo.hpp" +#include "catalogue/UserSpecifiedAnEmptyTapePool.hpp" #include "catalogue/UserSpecifiedAZeroCapacity.hpp" #include "catalogue/UserSpecifiedAZeroCopyNb.hpp" +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp" +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp" #include "common/exception/Exception.hpp" #include "common/exception/UserError.hpp" #include "common/make_unique.hpp" @@ -1025,6 +1028,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteTapePool_notEmpty) { ASSERT_EQ(0, pool.nbPhysicalFiles); } + ASSERT_THROW(m_catalogue->deleteTapePool(tapePoolName), catalogue::UserSpecifiedAnEmptyTapePool); ASSERT_THROW(m_catalogue->deleteTapePool(tapePoolName), exception::UserError); } @@ -2044,7 +2048,11 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_deleteStorageClass) { route.lastModificationLog; ASSERT_EQ(creationLog, lastModificationLog); - ASSERT_THROW(m_catalogue->deleteStorageClass(storageClass.diskInstance, storageClass.name), exception::Exception); + ASSERT_THROW(m_catalogue->deleteStorageClass(storageClass.diskInstance, storageClass.name), + catalogue::UserSpecifiedStorageClassUsedByArchiveRoutes); + + ASSERT_THROW(m_catalogue->deleteStorageClass(storageClass.diskInstance, storageClass.name), + exception::UserError); } TEST_P(cta_catalogue_CatalogueTest, modifyArchiveRouteTapePoolName) { @@ -10212,6 +10220,276 @@ TEST_P(cta_catalogue_CatalogueTest, DISABLED_concurrent_filesWrittenToTape_many_ } } +TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_1_archive_file_1_tape_copy) { + using namespace cta; + + const std::string vid1 = "VID123"; + const std::string mediaType = "media_type"; + const std::string vendor = "vendor"; + const std::string logicalLibraryName = "logical_library_name"; + const bool logicalLibraryIsDisabled= false; + const std::string tapePoolName = "tape_pool_name"; + const std::string vo = "vo"; + const uint64_t nbPartialTapes = 2; + const bool isEncrypted = true; + const cta::optional<std::string> supply("value for the supply pool mechanism"); + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = true; + const bool fullValue = false; + const bool readOnlyValue = true; + const std::string comment = "Create tape"; + + m_catalogue->createLogicalLibrary(m_admin, logicalLibraryName, logicalLibraryIsDisabled, "Create logical library"); + m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, supply, "Create tape pool"); + m_catalogue->createTape(m_admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes, + disabledValue, fullValue, readOnlyValue, comment); + + { + const std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes(); + + ASSERT_EQ(1, tapes.size()); + + const std::map<std::string, common::dataStructures::Tape> vidToTape = tapeListToMap(tapes); + { + auto it = vidToTape.find(vid1); + const common::dataStructures::Tape &tape = it->second; + ASSERT_EQ(vid1, tape.vid); + ASSERT_EQ(mediaType, tape.mediaType); + ASSERT_EQ(vendor, tape.vendor); + ASSERT_EQ(logicalLibraryName, tape.logicalLibraryName); + ASSERT_EQ(tapePoolName, tape.tapePoolName); + ASSERT_EQ(vo, tape.vo); + ASSERT_EQ(capacityInBytes, tape.capacityInBytes); + ASSERT_TRUE(disabledValue == tape.disabled); + ASSERT_TRUE(fullValue == tape.full); + ASSERT_TRUE(readOnlyValue == tape.readOnly); + ASSERT_FALSE(tape.isFromCastor); + ASSERT_EQ(0, tape.readMountCount); + ASSERT_EQ(0, tape.writeMountCount); + ASSERT_EQ(comment, tape.comment); + ASSERT_FALSE(tape.labelLog); + ASSERT_FALSE(tape.lastReadLog); + ASSERT_FALSE(tape.lastWriteLog); + + const common::dataStructures::EntryLog creationLog = tape.creationLog; + ASSERT_EQ(m_admin.username, creationLog.username); + ASSERT_EQ(m_admin.host, creationLog.host); + + const common::dataStructures::EntryLog lastModificationLog = + tape.lastModificationLog; + ASSERT_EQ(creationLog, lastModificationLog); + } + } + + const uint64_t archiveFileId = 1234; + + ASSERT_FALSE(m_catalogue->getArchiveFilesItor().hasMore()); + ASSERT_THROW(m_catalogue->getArchiveFileById(archiveFileId), exception::Exception); + + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = "disk_instance"; + storageClass.name = "storage_class"; + storageClass.nbCopies = 1; + storageClass.comment = "Create storage class"; + m_catalogue->createStorageClass(m_admin, storageClass); + + const uint64_t archiveFileSize = 1; + const std::string tapeDrive = "tape_drive"; + + auto file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & file1Written = *file1WrittenUP; + std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenSet; + file1WrittenSet.insert(file1WrittenUP.release()); + file1Written.archiveFileId = archiveFileId; + file1Written.diskInstance = storageClass.diskInstance; + file1Written.diskFileId = "5678"; + file1Written.diskFilePath = "/public_dir/public_file"; + file1Written.diskFileOwnerUid = PUBLIC_DISK_USER; + file1Written.diskFileGid = PUBLIC_DISK_GROUP; + file1Written.size = archiveFileSize; + file1Written.checksumBlob.insert(checksum::ADLER32, "1234"); + file1Written.storageClassName = storageClass.name; + file1Written.vid = vid1; + file1Written.fSeq = 1; + file1Written.blockId = 4321; + file1Written.copyNb = 1; + file1Written.tapeDrive = tapeDrive; + m_catalogue->filesWrittenToTape(file1WrittenSet); + + { + catalogue::TapeSearchCriteria searchCriteria; + searchCriteria.vid = file1Written.vid; + std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes(searchCriteria); + ASSERT_EQ(1, tapes.size()); + const common::dataStructures::Tape &tape = tapes.front(); + ASSERT_EQ(1, tape.lastFSeq); + } + + { + const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); + + ASSERT_EQ(file1Written.archiveFileId, archiveFile.archiveFileID); + ASSERT_EQ(file1Written.diskFileId, archiveFile.diskFileId); + ASSERT_EQ(file1Written.size, archiveFile.fileSize); + ASSERT_EQ(file1Written.checksumBlob, archiveFile.checksumBlob); + ASSERT_EQ(file1Written.storageClassName, archiveFile.storageClass); + + ASSERT_EQ(file1Written.diskInstance, archiveFile.diskInstance); + ASSERT_EQ(file1Written.diskFilePath, archiveFile.diskFileInfo.path); + ASSERT_EQ(file1Written.diskFileOwnerUid, archiveFile.diskFileInfo.owner_uid); + ASSERT_EQ(file1Written.diskFileGid, archiveFile.diskFileInfo.gid); + + ASSERT_EQ(1, archiveFile.tapeFiles.size()); + auto copyNbToTapeFile1Itor = archiveFile.tapeFiles.find(1); + ASSERT_FALSE(copyNbToTapeFile1Itor == archiveFile.tapeFiles.end()); + const common::dataStructures::TapeFile &tapeFile1 = *copyNbToTapeFile1Itor; + ASSERT_EQ(file1Written.vid, tapeFile1.vid); + ASSERT_EQ(file1Written.fSeq, tapeFile1.fSeq); + ASSERT_EQ(file1Written.blockId, tapeFile1.blockId); + ASSERT_EQ(file1Written.checksumBlob, tapeFile1.checksumBlob); + ASSERT_EQ(file1Written.copyNb, tapeFile1.copyNb); + } +} + +TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_1_archive_file_1_tape_copy_deleteStorageClass) { + using namespace cta; + + const std::string vid1 = "VID123"; + const std::string mediaType = "media_type"; + const std::string vendor = "vendor"; + const std::string logicalLibraryName = "logical_library_name"; + const bool logicalLibraryIsDisabled= false; + const std::string tapePoolName = "tape_pool_name"; + const std::string vo = "vo"; + const uint64_t nbPartialTapes = 2; + const bool isEncrypted = true; + const cta::optional<std::string> supply("value for the supply pool mechanism"); + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = true; + const bool fullValue = false; + const bool readOnlyValue = true; + const std::string comment = "Create tape"; + + m_catalogue->createLogicalLibrary(m_admin, logicalLibraryName, logicalLibraryIsDisabled, "Create logical library"); + m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, supply, "Create tape pool"); + m_catalogue->createTape(m_admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes, + disabledValue, fullValue, readOnlyValue, comment); + + { + const std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes(); + + ASSERT_EQ(1, tapes.size()); + + const std::map<std::string, common::dataStructures::Tape> vidToTape = tapeListToMap(tapes); + { + auto it = vidToTape.find(vid1); + const common::dataStructures::Tape &tape = it->second; + ASSERT_EQ(vid1, tape.vid); + ASSERT_EQ(mediaType, tape.mediaType); + ASSERT_EQ(vendor, tape.vendor); + ASSERT_EQ(logicalLibraryName, tape.logicalLibraryName); + ASSERT_EQ(tapePoolName, tape.tapePoolName); + ASSERT_EQ(vo, tape.vo); + ASSERT_EQ(capacityInBytes, tape.capacityInBytes); + ASSERT_TRUE(disabledValue == tape.disabled); + ASSERT_TRUE(fullValue == tape.full); + ASSERT_TRUE(readOnlyValue == tape.readOnly); + ASSERT_FALSE(tape.isFromCastor); + ASSERT_EQ(0, tape.readMountCount); + ASSERT_EQ(0, tape.writeMountCount); + ASSERT_EQ(comment, tape.comment); + ASSERT_FALSE(tape.labelLog); + ASSERT_FALSE(tape.lastReadLog); + ASSERT_FALSE(tape.lastWriteLog); + + const common::dataStructures::EntryLog creationLog = tape.creationLog; + ASSERT_EQ(m_admin.username, creationLog.username); + ASSERT_EQ(m_admin.host, creationLog.host); + + const common::dataStructures::EntryLog lastModificationLog = + tape.lastModificationLog; + ASSERT_EQ(creationLog, lastModificationLog); + } + } + + const uint64_t archiveFileId = 1234; + + ASSERT_FALSE(m_catalogue->getArchiveFilesItor().hasMore()); + ASSERT_THROW(m_catalogue->getArchiveFileById(archiveFileId), exception::Exception); + + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = "disk_instance"; + storageClass.name = "storage_class"; + storageClass.nbCopies = 1; + storageClass.comment = "Create storage class"; + m_catalogue->createStorageClass(m_admin, storageClass); + + const uint64_t archiveFileSize = 1; + const std::string tapeDrive = "tape_drive"; + + auto file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & file1Written = *file1WrittenUP; + std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenSet; + file1WrittenSet.insert(file1WrittenUP.release()); + file1Written.archiveFileId = archiveFileId; + file1Written.diskInstance = storageClass.diskInstance; + file1Written.diskFileId = "5678"; + file1Written.diskFilePath = "/public_dir/public_file"; + file1Written.diskFileOwnerUid = PUBLIC_DISK_USER; + file1Written.diskFileGid = PUBLIC_DISK_GROUP; + file1Written.size = archiveFileSize; + file1Written.checksumBlob.insert(checksum::ADLER32, "1234"); + file1Written.storageClassName = storageClass.name; + file1Written.vid = vid1; + file1Written.fSeq = 1; + file1Written.blockId = 4321; + file1Written.copyNb = 1; + file1Written.tapeDrive = tapeDrive; + m_catalogue->filesWrittenToTape(file1WrittenSet); + + { + catalogue::TapeSearchCriteria searchCriteria; + searchCriteria.vid = file1Written.vid; + std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes(searchCriteria); + ASSERT_EQ(1, tapes.size()); + const common::dataStructures::Tape &tape = tapes.front(); + ASSERT_EQ(1, tape.lastFSeq); + } + + { + const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); + + ASSERT_EQ(file1Written.archiveFileId, archiveFile.archiveFileID); + ASSERT_EQ(file1Written.diskFileId, archiveFile.diskFileId); + ASSERT_EQ(file1Written.size, archiveFile.fileSize); + ASSERT_EQ(file1Written.checksumBlob, archiveFile.checksumBlob); + ASSERT_EQ(file1Written.storageClassName, archiveFile.storageClass); + + ASSERT_EQ(file1Written.diskInstance, archiveFile.diskInstance); + ASSERT_EQ(file1Written.diskFilePath, archiveFile.diskFileInfo.path); + ASSERT_EQ(file1Written.diskFileOwnerUid, archiveFile.diskFileInfo.owner_uid); + ASSERT_EQ(file1Written.diskFileGid, archiveFile.diskFileInfo.gid); + + ASSERT_EQ(1, archiveFile.tapeFiles.size()); + auto copyNbToTapeFile1Itor = archiveFile.tapeFiles.find(1); + ASSERT_FALSE(copyNbToTapeFile1Itor == archiveFile.tapeFiles.end()); + const common::dataStructures::TapeFile &tapeFile1 = *copyNbToTapeFile1Itor; + ASSERT_EQ(file1Written.vid, tapeFile1.vid); + ASSERT_EQ(file1Written.fSeq, tapeFile1.fSeq); + ASSERT_EQ(file1Written.blockId, tapeFile1.blockId); + ASSERT_EQ(file1Written.checksumBlob, tapeFile1.checksumBlob); + ASSERT_EQ(file1Written.copyNb, tapeFile1.copyNb); + } + + ASSERT_TRUE(m_catalogue->getArchiveRoutes().empty()); + + ASSERT_THROW(m_catalogue->deleteStorageClass(storageClass.diskInstance, storageClass.name), + catalogue::UserSpecifiedStorageClassUsedByArchiveFiles); + + ASSERT_THROW(m_catalogue->deleteStorageClass(storageClass.diskInstance, storageClass.name), + exception::UserError); +} + TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_1_archive_file_2_tape_copies) { using namespace cta; diff --git a/catalogue/OracleCatalogue.cpp b/catalogue/OracleCatalogue.cpp index 14e108f7b514a351aaa6ca1f86abd1b71b215d99..98b28965e2523285084a40695671d5aff17a5328 100644 --- a/catalogue/OracleCatalogue.cpp +++ b/catalogue/OracleCatalogue.cpp @@ -485,26 +485,46 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer> fileSizeAndChecksum.checksumBlob.validate(event.checksumBlob); } - const char *const sql = - "BEGIN" "\n" - "INSERT INTO TAPE_FILE (VID, FSEQ, BLOCK_ID, LOGICAL_SIZE_IN_BYTES," "\n" - "COPY_NB, CREATION_TIME, ARCHIVE_FILE_ID)" "\n" - "SELECT VID, FSEQ, BLOCK_ID, LOGICAL_SIZE_IN_BYTES," "\n" - "COPY_NB, CREATION_TIME, ARCHIVE_FILE_ID FROM TEMP_TAPE_FILE_INSERTION_BATCH;" "\n" - "FOR TF IN (SELECT * FROM TEMP_TAPE_FILE_INSERTION_BATCH)" "\n" - "LOOP" "\n" - "UPDATE TAPE_FILE SET" "\n" - "SUPERSEDED_BY_VID=TF.VID," /*VID of the new file*/ "\n" - "SUPERSEDED_BY_FSEQ=TF.FSEQ" /*FSEQ of the new file*/ "\n" - "WHERE" "\n" - "TAPE_FILE.ARCHIVE_FILE_ID= TF.ARCHIVE_FILE_ID AND" "\n" - "TAPE_FILE.COPY_NB= TF.COPY_NB AND" "\n" - "(TAPE_FILE.VID <> TF.VID OR TAPE_FILE.FSEQ <> TF.FSEQ);" "\n" - "END LOOP;" "\n" - "END;"; - conn.setAutocommitMode(rdbms::AutocommitMode::AUTOCOMMIT_ON); - auto stmt = conn.createStmt(sql); - stmt.executeNonQuery(); + { + const char *const sql = + "INSERT INTO TAPE_FILE (VID, FSEQ, BLOCK_ID, LOGICAL_SIZE_IN_BYTES," "\n" + "COPY_NB, CREATION_TIME, ARCHIVE_FILE_ID)" "\n" + "SELECT VID, FSEQ, BLOCK_ID, LOGICAL_SIZE_IN_BYTES," "\n" + "COPY_NB, CREATION_TIME, ARCHIVE_FILE_ID FROM TEMP_TAPE_FILE_INSERTION_BATCH"; + auto stmt = conn.createStmt(sql); + stmt.executeNonQuery(); + } + + { + const char *const sql = + "MERGE INTO" "\n" + "TAPE_FILE" "\n" + "USING(" "\n" + "SELECT" "\n" + "ARCHIVE_FILE_ID," "\n" + "COPY_NB," "\n" + "VID," "\n" + // Using MAX(FSEQ) to cover the same tape copy being written more than + // once. The last one written supersedes the previous ones. + "MAX(FSEQ) AS MAX_FSEQ" "\n" + "FROM" "\n" + "TEMP_TAPE_FILE_INSERTION_BATCH" "\n" + "GROUP BY" "\n" + "ARCHIVE_FILE_ID, COPY_NB, VID" "\n" + ") TEMP" "\n" + "ON(" "\n" + "TAPE_FILE.ARCHIVE_FILE_ID = TEMP.ARCHIVE_FILE_ID AND" "\n" + "TAPE_FILE.COPY_NB = TEMP.COPY_NB)" "\n" + "WHEN MATCHED THEN" "\n" + "UPDATE SET" "\n" + "TAPE_FILE.SUPERSEDED_BY_VID = TEMP.VID," "\n" + "TAPE_FILE.SUPERSEDED_BY_FSEQ = TEMP.MAX_FSEQ" "\n" + "WHERE" "\n" + "NOT(TAPE_FILE.VID = TEMP.VID AND TAPE_FILE.FSEQ = TEMP.MAX_FSEQ)"; + conn.setAutocommitMode(rdbms::AutocommitMode::AUTOCOMMIT_ON); + auto stmt = conn.createStmt(sql); + stmt.executeNonQuery(); + } } catch(exception::UserError &) { throw; } catch(exception::Exception &ex) { diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index a5c2e975bbe29b679acc9c6cfc47b44b6f5c0ad0..c9df4e26af75300f2b1a90f607629d516f40a083 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -37,8 +37,11 @@ #include "catalogue/UserSpecifiedAnEmptyStringVendor.hpp" #include "catalogue/UserSpecifiedAnEmptyStringVid.hpp" #include "catalogue/UserSpecifiedAnEmptyStringVo.hpp" +#include "catalogue/UserSpecifiedAnEmptyTapePool.hpp" #include "catalogue/UserSpecifiedAZeroCapacity.hpp" #include "catalogue/UserSpecifiedAZeroCopyNb.hpp" +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp" +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp" #include "common/dataStructures/TapeFile.hpp" #include "common/exception/Exception.hpp" #include "common/exception/UserError.hpp" @@ -407,13 +410,24 @@ bool RdbmsCatalogue::storageClassExists(rdbms::Conn &conn, const std::string &di //------------------------------------------------------------------------------ void RdbmsCatalogue::deleteStorageClass(const std::string &diskInstanceName, const std::string &storageClassName) { try { + auto conn = m_connPool.getConn(); + + if(storageClassIsUsedByArchiveRoutes(conn, storageClassName)) { + throw UserSpecifiedStorageClassUsedByArchiveRoutes(std::string("The ") + storageClassName + + " storage class is being used by one or more archive routes"); + } + + if(storageClassIsUsedByArchiveFiles(conn, storageClassName)) { + throw UserSpecifiedStorageClassUsedByArchiveFiles(std::string("The ") + storageClassName + + " storage class is being used by one or more archive files"); + } + const char *const sql = "DELETE FROM " "STORAGE_CLASS " "WHERE " "DISK_INSTANCE_NAME = :DISK_INSTANCE_NAME AND " "STORAGE_CLASS_NAME = :STORAGE_CLASS_NAME"; - auto conn = m_connPool.getConn(); auto stmt = conn.createStmt(sql); stmt.bindString(":DISK_INSTANCE_NAME", diskInstanceName); @@ -432,6 +446,62 @@ void RdbmsCatalogue::deleteStorageClass(const std::string &diskInstanceName, con } } +//------------------------------------------------------------------------------ +// storageClassIsUsedByArchiveRoutes +//------------------------------------------------------------------------------ +bool RdbmsCatalogue::storageClassIsUsedByArchiveRoutes(rdbms::Conn &conn, const std::string &storageClassName) const { + try { + const char *const sql = + "SELECT " + "STORAGE_CLASS.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME " + "FROM " + "ARCHIVE_ROUTE " + "INNER JOIN " + "STORAGE_CLASS " + "ON " + "ARCHIVE_ROUTE.STORAGE_CLASS_ID = STORAGE_CLASS.STORAGE_CLASS_ID " + "WHERE " + "STORAGE_CLASS_NAME = :STORAGE_CLASS_NAME"; + auto stmt = conn.createStmt(sql); + stmt.bindString(":STORAGE_CLASS_NAME", storageClassName); + auto rset = stmt.executeQuery(); + return rset.next(); + } catch(exception::UserError &) { + throw; + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); + throw; + } +} + +//------------------------------------------------------------------------------ +// storageClassIsUsedByARchiveFiles +//------------------------------------------------------------------------------ +bool RdbmsCatalogue::storageClassIsUsedByArchiveFiles(rdbms::Conn &conn, const std::string &storageClassName) const { + try { + const char *const sql = + "SELECT " + "STORAGE_CLASS.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME " + "FROM " + "ARCHIVE_FILE " + "INNER JOIN " + "STORAGE_CLASS " + "ON " + "ARCHIVE_FILE.STORAGE_CLASS_ID = STORAGE_CLASS.STORAGE_CLASS_ID " + "WHERE " + "STORAGE_CLASS_NAME = :STORAGE_CLASS_NAME"; + auto stmt = conn.createStmt(sql); + stmt.bindString(":STORAGE_CLASS_NAME", storageClassName); + auto rset = stmt.executeQuery(); + return rset.next(); + } catch(exception::UserError &) { + throw; + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); + throw; + } +} + //------------------------------------------------------------------------------ // getStorageClasses //------------------------------------------------------------------------------ @@ -878,7 +948,7 @@ void RdbmsCatalogue::deleteTapePool(const std::string &name) { throw exception::UserError(std::string("Cannot delete tape-pool ") + name + " because it does not exist"); } } else { - throw exception::UserError(std::string("Cannot delete tape-pool ") + name + " because it is not empty"); + throw UserSpecifiedAnEmptyTapePool(std::string("Cannot delete tape-pool ") + name + " because it is not empty"); } } catch(exception::UserError &) { throw; diff --git a/catalogue/RdbmsCatalogue.hpp b/catalogue/RdbmsCatalogue.hpp index bf4846626f0debacc2958736c9349bb72a9ea445..4e5a9515b5624061341ec897a6d98e8d14f87445 100644 --- a/catalogue/RdbmsCatalogue.hpp +++ b/catalogue/RdbmsCatalogue.hpp @@ -1340,6 +1340,24 @@ protected: */ bool isSetAndEmpty(const optional<std::string> &optionalStr) const; + /** + * Returns true if the specified storage class is currently being used by one + * or more archive routes. + * + * @param conn The database connection. + * @param storageClassName The name of the storage class. + */ + bool storageClassIsUsedByArchiveRoutes(rdbms::Conn &conn, const std::string &storageClassName) const; + + /** + * Returns true if the specified storage class is currently being used by one + * or more archive files. + * + * @param conn The database connection. + * @param storageClassName The name of the storage class. + */ + bool storageClassIsUsedByArchiveFiles(rdbms::Conn &conn, const std::string &storageClassName) const; + /** * Cached versions of tape copy to tape tape pool mappings for specific * storage classes. diff --git a/catalogue/TapeItemWrittenPointerTest.cpp b/catalogue/TapeItemWrittenPointerTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..84b4f9b2357f5eeb37f66328e31d26ce2aa83d4e --- /dev/null +++ b/catalogue/TapeItemWrittenPointerTest.cpp @@ -0,0 +1,146 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "catalogue/TapeFileWritten.hpp" +#include "catalogue/TapeItemWrittenPointer.hpp" +#include "common/make_unique.hpp" + +#include <gtest/gtest.h> +#include <set> + +namespace unitTests { + +class cta_catalogue_TapeItemWrittenPointerTest : public ::testing::Test { +protected: + + virtual void SetUp() { + } + + virtual void TearDown() { + } +}; + +TEST_F(cta_catalogue_TapeItemWrittenPointerTest, check_set_order_after_set_fseq_using_unique_ptr) { + using namespace cta::catalogue; + + std::set<TapeItemWrittenPointer> filesWrittenSet; + + auto file1WrittenUP = cta::make_unique<TapeFileWritten>(); + auto file2WrittenUP = cta::make_unique<TapeFileWritten>(); + + file1WrittenUP->fSeq = 1; + filesWrittenSet.insert(file1WrittenUP.release()); + + file2WrittenUP->fSeq = 2; + filesWrittenSet.insert(file2WrittenUP.release()); + + ASSERT_EQ(2, filesWrittenSet.size()); + + uint64_t expectedFSeq = 1; + for(const auto &event: filesWrittenSet) { + ASSERT_EQ(expectedFSeq, event->fSeq); + expectedFSeq++; + } +} + +TEST_F(cta_catalogue_TapeItemWrittenPointerTest, DISABLED_check_set_order_after_set_fseq_using_reference) { + using namespace cta::catalogue; + + std::set<TapeItemWrittenPointer> filesWrittenSet; + + auto file1WrittenUP = cta::make_unique<TapeFileWritten>(); + auto file2WrittenUP = cta::make_unique<TapeFileWritten>(); + + auto file1WrittenPtr = file1WrittenUP.get(); + auto file2WrittenPtr = file2WrittenUP.get(); + + auto & file1Written = *file1WrittenUP; + filesWrittenSet.insert(file1WrittenUP.release()); + file1Written.fSeq = 1; + + auto & file2Written = *file2WrittenUP; + filesWrittenSet.insert(file2WrittenUP.release()); + file2Written.fSeq = 2; + + ASSERT_LT(file1Written, file2Written); + + ASSERT_EQ(2, filesWrittenSet.size()); + + // Check the set contains the original objects + for(const auto &event: filesWrittenSet) { + ASSERT_TRUE(event.get() == file1WrittenPtr || event.get() == file2WrittenPtr); + + if(event.get() == file1WrittenPtr) { + ASSERT_EQ(1, event->fSeq); + } else { + ASSERT_EQ(2, event->fSeq); + } + } + + // Check the order of the set + uint64_t expectedFSeq = 1; + for(const auto &event: filesWrittenSet) { + ASSERT_EQ(expectedFSeq, event->fSeq); + expectedFSeq++; + } +} + +TEST_F(cta_catalogue_TapeItemWrittenPointerTest, check_set_order_after_set_fseq_using_reference_delayed_insert) { + using namespace cta::catalogue; + + std::set<TapeItemWrittenPointer> filesWrittenSet; + + auto file1WrittenUP = cta::make_unique<TapeFileWritten>(); + auto file2WrittenUP = cta::make_unique<TapeFileWritten>(); + + auto file1WrittenPtr = file1WrittenUP.get(); + auto file2WrittenPtr = file2WrittenUP.get(); + + auto & file1Written = *file1WrittenUP; + file1Written.fSeq = 1; + + auto & file2Written = *file2WrittenUP; + file2Written.fSeq = 2; + + filesWrittenSet.insert(file1WrittenUP.release()); + filesWrittenSet.insert(file2WrittenUP.release()); + + ASSERT_LT(file1Written, file2Written); + + ASSERT_EQ(2, filesWrittenSet.size()); + + // Check the set contains the original objects + for(const auto &event: filesWrittenSet) { + ASSERT_TRUE(event.get() == file1WrittenPtr || event.get() == file2WrittenPtr); + + if(event.get() == file1WrittenPtr) { + ASSERT_EQ(1, event->fSeq); + } else { + ASSERT_EQ(2, event->fSeq); + } + } + + // Check the order of the set + uint64_t expectedFSeq = 1; + for(const auto &event: filesWrittenSet) { + ASSERT_EQ(expectedFSeq, event->fSeq); + expectedFSeq++; + } +} + +} // namespace unitTests diff --git a/catalogue/UserSpecifiedAnEmptyTapePool.cpp b/catalogue/UserSpecifiedAnEmptyTapePool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8882480a86f9573b52449bcff6064b0c66168cdc --- /dev/null +++ b/catalogue/UserSpecifiedAnEmptyTapePool.cpp @@ -0,0 +1,32 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "UserSpecifiedAnEmptyTapePool.hpp" + +namespace cta { +namespace catalogue { + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +UserSpecifiedAnEmptyTapePool::UserSpecifiedAnEmptyTapePool(const std::string &context, const bool embedBacktrace): + UserError(context, embedBacktrace) { +} + +} // namespace catalogue +} // namespace cta diff --git a/catalogue/UserSpecifiedAnEmptyTapePool.hpp b/catalogue/UserSpecifiedAnEmptyTapePool.hpp new file mode 100644 index 0000000000000000000000000000000000000000..0b76caef8b0001eac36833f6969e0a66568dc796 --- /dev/null +++ b/catalogue/UserSpecifiedAnEmptyTapePool.hpp @@ -0,0 +1,46 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "common/exception/UserError.hpp" + +namespace cta { +namespace catalogue { + +/** + * User error thrown when a tape pool they specified is not empty when it should + * be. + */ +class UserSpecifiedAnEmptyTapePool: public exception::UserError { +public: + + /** + * Constructor. + * + * @param context optional context string added to the message + * at initialisation time. + * @param embedBacktrace whether to embed a backtrace of where the + * exception was throw in the message + */ + UserSpecifiedAnEmptyTapePool(const std::string &context = "", const bool embedBacktrace = true); + +}; // class UserSpecifiedAnEmptyTapePool + +} // namespace catalogue +} // namespace cta diff --git a/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.cpp b/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f96b4f7f4a010c3667fd4fe844a155865b18ce3e --- /dev/null +++ b/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.cpp @@ -0,0 +1,39 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp" + +namespace cta { +namespace catalogue { + + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +UserSpecifiedStorageClassUsedByArchiveFiles::UserSpecifiedStorageClassUsedByArchiveFiles(const std::string &context, + const bool embedBacktrace): cta::exception::UserError(context, embedBacktrace) { +} + +//------------------------------------------------------------------------------ +// destructor +//------------------------------------------------------------------------------ +UserSpecifiedStorageClassUsedByArchiveFiles::~UserSpecifiedStorageClassUsedByArchiveFiles() { +} + +} // namespace catalogue +} // namespace cta diff --git a/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp b/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp new file mode 100644 index 0000000000000000000000000000000000000000..892340d0f9604efb7ea019bbe2c0ff8dc90cbac8 --- /dev/null +++ b/catalogue/UserSpecifiedStorageClassUsedByArchiveFiles.hpp @@ -0,0 +1,49 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "common/exception/UserError.hpp" + +namespace cta { +namespace catalogue { + +/** + * User specified a storage class which is currently being used by one or more + * archive files. + */ +class UserSpecifiedStorageClassUsedByArchiveFiles: public exception::UserError { +public: + /** + * Constructor. + * + * @param context optional context string added to the message + * at initialisation time. + * @param embedBacktrace whether to embed a backtrace of where the + * exception was throw in the message + */ + UserSpecifiedStorageClassUsedByArchiveFiles(const std::string &context = "", const bool embedBacktrace = true); + + /** + * Destructor. + */ + ~UserSpecifiedStorageClassUsedByArchiveFiles() override; +}; // class UserSpecifiedStorageClassUsedByArchiveFiles + +} // namespace catalogue +} // namespace cta diff --git a/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.cpp b/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fe4cc771ad55c2553505306babda8a655759a747 --- /dev/null +++ b/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.cpp @@ -0,0 +1,38 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp" + +namespace cta { +namespace catalogue { + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +UserSpecifiedStorageClassUsedByArchiveRoutes::UserSpecifiedStorageClassUsedByArchiveRoutes(const std::string &context, + const bool embedBacktrace): cta::exception::UserError(context, embedBacktrace) { +} + +//------------------------------------------------------------------------------ +// destructor +//------------------------------------------------------------------------------ +UserSpecifiedStorageClassUsedByArchiveRoutes::~UserSpecifiedStorageClassUsedByArchiveRoutes() { +} + +} // namespace catalogue +} // namespace cta diff --git a/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp b/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ddf1fc49984d6ef0636aeb4ef9b72bcc489e9e5e --- /dev/null +++ b/catalogue/UserSpecifiedStorageClassUsedByArchiveRoutes.hpp @@ -0,0 +1,49 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "common/exception/UserError.hpp" + +namespace cta { +namespace catalogue { + +/** + * User specified a storage class which is currently being used by one or more + * archive routes. + */ +class UserSpecifiedStorageClassUsedByArchiveRoutes: public exception::UserError { +public: + /** + * Constructor. + * + * @param context optional context string added to the message + * at initialisation time. + * @param embedBacktrace whether to embed a backtrace of where the + * exception was throw in the message + */ + UserSpecifiedStorageClassUsedByArchiveRoutes(const std::string &context = "", const bool embedBacktrace = true); + + /** + * Destructor. + */ + ~UserSpecifiedStorageClassUsedByArchiveRoutes() override; +}; // class UserSpecifiedStorageClassUsedByArchiveRoutes + +} // namespace catalogue +} // namespace cta diff --git a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list index 4654466fe19b1333a2a3b387baf67fd05473b25e..24d08f856f42928e84cac8b4f4288c09a2350c4c 100644 --- a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list +++ b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list @@ -1,17 +1,17 @@ -0:eos-archive-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-cleanup-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-client-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-debuginfo-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fuse-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fuse-core-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fuse-sysv-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fusex-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fusex-core-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-fusex-selinux-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-server-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-srm-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-test-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 -0:eos-testkeytab-4.5.3-20190730172748gitea30da3.el7.cern.x86_64 +0:eos-archive-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-cleanup-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-client-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-debuginfo-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fuse-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fuse-core-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fuse-sysv-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fusex-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fusex-core-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-fusex-selinux-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-server-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-srm-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-test-4.5.6-20190829163023gite72c345.el7.cern.x86_64 +0:eos-testkeytab-4.5.6-20190829163023gite72c345.el7.cern.x86_64 1:python2-xrootd-4.10.0-1.el7.* 1:python3-xrootd-4.10.0-1.el7.* 1:xrootd-4.10.0-1.el7.* diff --git a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh index 6917d8e37e6fb79917dab17dc59a053680dca832..cef628e2ccdbd2ede0385c5952efa3586518eb58 100755 --- a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh +++ b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh @@ -180,7 +180,7 @@ ACTUAL_NB_STARTED_CTA_FST_GCD=0 if test -f /var/log/eos/fst/cta-fst-gcd.log; then ACTUAL_NB_STARTED_CTA_FST_GCD=`grep "cta-fst-gcd started" /var/log/eos/fst/cta-fst-gcd.log | wc -l` else - echo "/usr/bin/cta-fst-gcd DOES NOT EXIST" + echo "/var/log/eos/fst/cta-fst-gcd.log DOES NOT EXIST" exit 1 fi if test ${EXPECTED_NB_STARTED_CTA_FST_GCD} = ${ACTUAL_NB_STARTED_CTA_FST_GCD}; then diff --git a/continuousintegration/orchestration/tests/client_ar.sh b/continuousintegration/orchestration/tests/client_ar.sh index ae13afd26a0e92c1830ee5d1c40e5c7dc7da06c1..39f4ddde7dc176e79f36f30e75ffefc4829f94b2 100644 --- a/continuousintegration/orchestration/tests/client_ar.sh +++ b/continuousintegration/orchestration/tests/client_ar.sh @@ -7,6 +7,7 @@ DATA_SOURCE=/dev/urandom ARCHIVEONLY=0 # Only archive files or do the full test? DONOTARCHIVE=0 # files were already archived in a previous run NEED TARGETDIR TARGETDIR='' +LOGDIR='/var/log' COMMENT='' # id of the test so that we can track it @@ -134,6 +135,9 @@ if [[ "x${TARGETDIR}" = "x" ]]; then else EOS_DIR="${EOS_BASEDIR}/${TARGETDIR}" fi +LOGDIR="${LOGDIR}/$(basename ${EOS_DIR})" +mkdir -p ${LOGDIR} || die "Cannot create directory LOGDIR: ${LOGDIR}" +mkdir -p ${LOGDIR}/xrd_errors || die "Cannot create directory LOGDIR/xrd_errors: ${LOGDIR}/xrd_errors" STATUS_FILE=$(mktemp) ERROR_FILE=$(mktemp) @@ -188,6 +192,12 @@ done | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump # done | xargs -n ${BATCH_SIZE} --max-procs=${NB_BATCH_PROCS} ./batch_xrdcp /tmp/testfile root://${EOSINSTANCE}/${EOS_DIR}/${subdir} echo Done. done +if [ "0" != "$(ls ${ERROR_DIR} 2> /dev/null | wc -l)" ]; then + # there were some xrdcp errors + echo "Several xrdcp errors occured during archival!" + echo "Please check client pod logs in artifacts" + mv ${ERROR_DIR}/* ${LOGDIR}/xrd_errors/ +fi COPIED=0 COPIED_EMPTY=0 @@ -264,10 +274,16 @@ done # CAREFULL HERE: ${STATUS_FILE} contains lines like: 99/test9900001 for ((subdir=0; subdir < ${NB_DIRS}; subdir++)); do echo -n "Recalling files to ${EOS_DIR}/${subdir} using ${NB_PROCS} processes..." - cat ${STATUS_FILE} | grep ^${subdir}/ | cut -d/ -f2 | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} prepare -s ${EOS_DIR}/${subdir}/TEST_FILE_NAME?activity=T0Reprocess 2>${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME && rm ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME || echo ERROR with xrootd transfer for file TEST_FILE_NAME, full logs in ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME" > /dev/null + cat ${STATUS_FILE} | grep ^${subdir}/ | cut -d/ -f2 | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} prepare -s ${EOS_DIR}/${subdir}/TEST_FILE_NAME?activity=T0Reprocess 2>${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME && rm ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME || echo ERROR with xrootd transfer for file TEST_FILE_NAME, full logs in ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME" | tee ${LOGDIR}/prepare_${subdir}.log | grep ^ERROR echo Done. + cat ${STATUS_FILE} | grep ^${subdir}/ | cut -d/ -f2 | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} query opaquefile ${EOS_DIR}/${subdir}/TEST_FILE_NAME?mgm.pcmd=xattr\&mgm.subcmd=get\&mgm.xattrname=sys.retrieve.req_id 2>${ERROR_DIR}/XATTRGET_TEST_FILE_NAME && rm ${ERROR_DIR}/XATTRGET_TEST_FILE_NAME || echo ERROR with xrootd xattr get for file TEST_FILE_NAME, full logs in ${ERROR_DIR}/XATTRGET_TEST_FILE_NAME" | tee ${LOGDIR}/prepare_sys.retrieve.req_id_${subdir}.log | grep ^ERROR done - +if [ "0" != "$(ls ${ERROR_DIR} 2> /dev/null | wc -l)" ]; then + # there were some prepare errors + echo "Several prepare errors occured during retrieval!" + echo "Please check client pod logs in artifacts" + mv ${ERROR_DIR}/* ${LOGDIR}/xrd_errors/ +fi ARCHIVED=$(cat ${STATUS_FILE} | wc -l) TO_BE_RETRIEVED=$(( ${ARCHIVED} - $(ls ${ERROR_DIR}/RETRIEVE_* 2>/dev/null | wc -l) )) @@ -434,9 +450,23 @@ test -z ${COMMENT} || annotate "test ${TESTID} FINISHED" "Summary:</br>NB_FILES: # stop tail test -z $TAILPID || kill ${TAILPID} &> /dev/null -test ${LASTCOUNT} -eq $((${NB_FILES} * ${NB_DIRS})) && exit 0 +RC=0 +if [ ${LASTCOUNT} -ne $((${NB_FILES} * ${NB_DIRS})) ]; then + ((RC++)) + echo "ERROR there were some lost files during the archive/retrieve test with ${NB_FILES} files (first 10):" + grep -v retrieved ${STATUS_FILE} | sed -e "s;^;${EOS_DIR}/;" | head -10 +fi -echo "ERROR there were some lost files during the archive/retrieve test with ${NB_FILES} files (first 10):" -grep -v retrieved ${STATUS_FILE} | sed -e "s;^;${EOS_DIR}/;" | head -10 +if [ $(cat ${LOGDIR}/prepare_sys.retrieve.req_id_*.log | grep -v value= | wc -l) -ne 0 ]; then + # THIS IS NOT YET AN ERROR: UNCOMMENT THE FOLLOWING LINE WHEN https://gitlab.cern.ch/cta/CTA/issues/606 is fixed + # ((RC++)) + echo "ERROR $(cat ${LOGDIR}/prepare_sys.retrieve.req_id_*.log | grep -v value= | wc -l) files out of $(cat ${LOGDIR}/prepare_sys.retrieve.req_id_*.log | wc -l) prepared files have no sys.retrieve.req_id extended attribute set" +fi -exit 1 +if [ $(ls ${LOGDIR}/xrd_errors | wc -l) -ne 0 ]; then + ((RC++)) + echo "ERROR several xrootd failures occured during this run, please check client dumps in ${LOGDIR}/xrd_errors." +fi + + +exit ${RC} diff --git a/continuousintegration/orchestration/tests/repack_generate_report.sh b/continuousintegration/orchestration/tests/repack_generate_report.sh index 8182ca5c79ec8233d1cf0cacb832149d8c1eaa99..345048a7445ce7b03943a8ba8724712354a1ea7b 100755 --- a/continuousintegration/orchestration/tests/repack_generate_report.sh +++ b/continuousintegration/orchestration/tests/repack_generate_report.sh @@ -1,6 +1,7 @@ #!/bin/bash REPORT_DIRECTORY=/var/log +ADD_COPIES_ONLY=0 die() { echo "$@" 1>&2 @@ -9,8 +10,9 @@ die() { } usage() { cat <<EOF 1>&2 -Usage: $0 -v <vid> [-r <report_directory>] +Usage: $0 -v <vid> [-r <report_directory>] [-a] Default report_directory = ${REPORT_DIRECTORY} +-a: Specify if the repack is add copies only or not EOF exit 1 } @@ -20,7 +22,7 @@ then usage fi; -while getopts "v:r:" o; do +while getopts "v:r:a" o; do case "${o}" in v) VID=${OPTARG} @@ -28,6 +30,9 @@ while getopts "v:r:" o; do r) REPORT_DIRECTORY=${OPTARG} ;; + a) + ADD_COPIES_ONLY=1 + ;; *) usage ;; @@ -42,41 +47,69 @@ shift $((OPTIND-1)) admin_kinit admin_klist > /dev/null 2>&1 || die "Cannot get kerberos credentials for user ${USER}" -echo "Generation of a repack report" +repackRequest=`admin_cta --json repack ls --vid ${VID}` + +echo $repackRequest + +if [ "$repackRequest" == "" ]; +then + die "No repack request for this VID." +fi; + +echo "Generation of a repack report of the vid ${VID}" DATE=`date +%d-%m-%y-%H:%M:%S` -ARCHIVE_FILE_LS_RESULT_PATH=${REPORT_DIRECTORY}/${VID}_af_ls_${DATE}.json +ARCHIVE_FILE_LS_RESULT_PATH=${REPORT_DIRECTORY}/af_ls_${DATE}.json +ARCHIVE_FILE_LS_VID_RESULT_PATH=${REPORT_DIRECTORY}/${VID}_af_ls_${DATE}.json NOT_REPACKED_JSON_PATH=${REPORT_DIRECTORY}/${VID}_report_not_repacked_${DATE}.json SELF_REPACKED_JSON_PATH=${REPORT_DIRECTORY}/${VID}_report_self_repacked_${DATE}.json -REPACKED_JSON_PATH=${REPORT_DIRECTORY}/${VID}_report_repacked_${DATE}.json +REPACKED_MOVE_JSON_PATH=${REPORT_DIRECTORY}/${VID}_report_repacked_move_${DATE}.json +REPACK_ADD_COPIES_JSON_PATH=${REPORT_DIRECTORY}/${VID}_report_repack_add_copies_${DATE}.json echo "1. Generate archive file ls result into ${ARCHIVE_FILE_LS_RESULT_PATH} file..." -admin_cta --json archivefile ls --vid ${VID} > ${ARCHIVE_FILE_LS_RESULT_PATH} +admin_cta --json archivefile ls --all > ${ARCHIVE_FILE_LS_RESULT_PATH} echo "OK" -echo "2. Generate the non-repacked files report into ${NOT_REPACKED_JSON_PATH} file..." -jq -r '[.[] | select(.tf.supersededByVid == "")]' ${ARCHIVE_FILE_LS_RESULT_PATH} > ${NOT_REPACKED_JSON_PATH} +echo "2. Generate all the archive files that are on vid ${VID} into ${ARCHIVE_FILE_LS_VID_RESULT_PATH} file..." +jq -r "[.[] | select (.tf.vid == \"${VID}\")]" ${ARCHIVE_FILE_LS_RESULT_PATH} > ${ARCHIVE_FILE_LS_VID_RESULT_PATH} echo "OK" -echo "3. Generating the self-repacked files report into ${SELF_REPACKED_JSON_PATH} file..." -jq -r '[.[] | select((.tf.supersededByVid == .tf.vid) and (.tf.fSeq < .tf.supersededByFSeq))]' ${ARCHIVE_FILE_LS_RESULT_PATH} > ${SELF_REPACKED_JSON_PATH} +echo "3. Generate the non-repacked files report into ${NOT_REPACKED_JSON_PATH} file..." +jq -r "[.[] | select (.tf.supersededByVid == \"\" and .tf.vid == \"${VID}\")]" ${ARCHIVE_FILE_LS_VID_RESULT_PATH} > ${NOT_REPACKED_JSON_PATH} echo "OK" -echo "4. Generate the repacked files report into ${REPACKED_JSON_PATH} file..." -jq -r '[.[] | select((.tf.supersededByVid != "") and (.tf.supersededByVid != .tf.vid))]' ${ARCHIVE_FILE_LS_RESULT_PATH} > ${REPACKED_JSON_PATH} +echo "4. Generating the self-repacked files report into ${SELF_REPACKED_JSON_PATH} file..." +jq -r '[.[] | select((.tf.supersededByVid == .tf.vid) and (.tf.fSeq < .tf.supersededByFSeq))]' ${ARCHIVE_FILE_LS_VID_RESULT_PATH} > ${SELF_REPACKED_JSON_PATH} echo "OK" -echo "5. Report of the repacked tape" -echo -NB_NON_REPACKED_FILES=$(jq '[.[]] | length' ${NOT_REPACKED_JSON_PATH} || 0) -echo "Number of non repacked files : ${NB_NON_REPACKED_FILES}" -if [ ${NB_NON_REPACKED_FILES} -ne 0 ] +echo "5. Generate the repacked (moved) files report into ${REPACKED_MOVE_JSON_PATH} file..." +jq -r '[.[] | select((.tf.supersededByVid != "") and (.tf.supersededByVid != .tf.vid))]' ${ARCHIVE_FILE_LS_VID_RESULT_PATH} > ${REPACKED_MOVE_JSON_PATH} +echo "OK" + +echo "6. Generate the repack \"just add copies\" report into ${REPACK_ADD_COPIES_JSON_PATH} file..." +storageClass=`jq ".[0] | .af.storageClass" ${ARCHIVE_FILE_LS_VID_RESULT_PATH}` +copyNbToExclude=`jq -r ".[0] | .copyNb" ${ARCHIVE_FILE_LS_VID_RESULT_PATH}` +copyNbs=`admin_cta --json archiveroute ls | jq -r ".[] | select(.storageClass == $storageClass and .copyNumber != $copyNbToExclude) | .copyNumber"` + +jq -r "[.[] | select(.copyNb == (\"$copyNbs\" | split(\"\n\")[]))]" ${ARCHIVE_FILE_LS_RESULT_PATH} > ${REPACK_ADD_COPIES_JSON_PATH} +echo "OK" + +echo "7. Report of the repacked tape" + +if [ ${ADD_COPIES_ONLY} -eq 0 ]; then - header="ArchiveID\tFSeq\tSize" - { echo -e $header; jq -r '.[] | [.af.archiveId,.tf.fSeq, .af.size] | @tsv' ${NOT_REPACKED_JSON_PATH}; } | column -t -fi; + echo + NB_NON_REPACKED_FILES=$(jq '[.[]] | length' ${NOT_REPACKED_JSON_PATH} || 0) + echo "Number of non repacked files : ${NB_NON_REPACKED_FILES}" + if [ ${NB_NON_REPACKED_FILES} -ne 0 ] + then + header="ArchiveID\tFSeq\tSize" + { echo -e $header; jq -r '.[] | [.af.archiveId,.tf.fSeq, .af.size] | @tsv' ${NOT_REPACKED_JSON_PATH}; } | column -t + fi; +fi; + echo NB_SELF_REPACKED_FILES=$(jq '[.[]] | length' ${SELF_REPACKED_JSON_PATH} || 0) echo "Number of self-repacked files : ${NB_SELF_REPACKED_FILES}" @@ -86,12 +119,22 @@ then { echo -e $header; jq -r '.[] | [.af.archiveId, .tf.fSeq, .af.size] | @tsv' ${SELF_REPACKED_JSON_PATH}; } | column -t fi; echo -NB_REPACKED_FILES=$(jq '[.[]] | length' ${REPACKED_JSON_PATH} || 0) -echo "Number of repacked files : ${NB_REPACKED_FILES}" -if [ ${NB_REPACKED_FILES} -ne 0 ] + +NB_REPACKED_MOVE_FILES=$(jq '[.[]] | length' ${REPACKED_MOVE_JSON_PATH} || 0) +echo "Number of repacked (moved) files : ${NB_REPACKED_MOVE_FILES}" +if [ ${NB_REPACKED_MOVE_FILES} -ne 0 ] +then + header="DestinationVID\tNbFiles\ttotalSize\n" + { echo -e $header; jq -r 'group_by(.tf.supersededByVid)[] | [(.[0].tf.supersededByVid),([.[] | .tf.supersededByFSeq] | length),(reduce [.[] | .af.size | tonumber][] as $currentSize (0; . + $currentSize))] | @tsv' ${REPACKED_MOVE_JSON_PATH}; } | column -t +fi; +echo + +NB_COPIED_FILES=$(jq '[.[]] | length' ${REPACK_ADD_COPIES_JSON_PATH} || 0) +echo "Number of copied files : $NB_COPIED_FILES" +if [ ${NB_COPIED_FILES} -ne 0 ] then header="DestinationVID\tNbFiles\ttotalSize\n" - { echo -e $header; jq -r 'group_by(.tf.supersededByVid)[] | [(.[0].tf.supersededByVid),([.[] | .tf.supersededByFSeq] | length),(reduce [.[] | .af.size | tonumber][] as $currentSize (0; . + $currentSize))] | @tsv' ${REPACKED_JSON_PATH}; } | column -t + { echo -e $header; jq -r 'group_by(.tf.vid)[] | [(.[0].tf.vid),([.[] | .tf.fSeq] | length),(reduce [.[] | .af.size | tonumber][] as $currentSize (0; . + $currentSize))] | @tsv' ${REPACK_ADD_COPIES_JSON_PATH}; } | column -t fi; echo "End of the repack report" \ No newline at end of file diff --git a/continuousintegration/orchestration/tests/repack_systemtest.sh b/continuousintegration/orchestration/tests/repack_systemtest.sh index f79f83e3018a5a33be953d45cbbf05fea31a54ac..22652c3288e7ea2b9d0dd5ba28167504e71462a5 100755 --- a/continuousintegration/orchestration/tests/repack_systemtest.sh +++ b/continuousintegration/orchestration/tests/repack_systemtest.sh @@ -12,10 +12,12 @@ die() { } usage() { cat <<EOF 1>&2 -Usage: $0 -v <vid> -b <bufferURL> [-e <eosinstance>] [-t <timeout>] +Usage: $0 -v <vid> -b <bufferURL> [-e <eosinstance>] [-t <timeout>] [-a] [m] (bufferURL example : /eos/ctaeos/repack) eosinstance : the name of the ctaeos instance to be used (default ctaeos) timeout : the timeout in seconds to wait for the repack to be done +-a : Launch a repack just add copies workflow +-m : Launch a repack just move workflow EOF exit 1 } @@ -38,7 +40,7 @@ then usage fi; -while getopts "v:e:b:t:" o; do +while getopts "v:e:b:t:amg" o; do case "${o}" in v) VID_TO_REPACK=${OPTARG} @@ -52,6 +54,15 @@ while getopts "v:e:b:t:" o; do t) WAIT_FOR_REPACK_TIMEOUT=${OPTARG} ;; + a) + ADD_COPIES_ONLY="-a" + ;; + m) + MOVE_ONLY="-m" + ;; + g) + GENERATE_REPORT=0 + ;; *) usage ;; @@ -69,6 +80,14 @@ if [ "x${VID_TO_REPACK}" = "x" ]; then die "No vid to repack provided." fi +REPACK_OPTION="" + +if [ "x${ADD_COPIES_ONLY}" != "x" ] && [ "x${MOVE_ONLY}" != "x" ]; then + die "-a and -m options are mutually exclusive" +fi + +[[ "x${ADD_COPIES_ONLY}" == "x" ]] && REPACK_OPTION=${MOVE_ONLY} || REPACK_OPTION=${ADD_COPIES_ONLY} + # get some common useful helpers for krb5 . /root/client_helper.sh @@ -86,7 +105,7 @@ echo "Marking the tape ${VID_TO_REPACK} as full before Repacking it" admin_cta tape ch --vid ${VID_TO_REPACK} --full true echo "Launching repack request for VID ${VID_TO_REPACK}, bufferURL = ${FULL_REPACK_BUFFER_URL}" -admin_cta re add --vid ${VID_TO_REPACK} --justmove --bufferurl ${FULL_REPACK_BUFFER_URL} +admin_cta re add --vid ${VID_TO_REPACK} ${REPACK_OPTION} --bufferurl ${FULL_REPACK_BUFFER_URL} SECONDS_PASSED=0 while test 0 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '.[0] | select(.status == "Complete" or .status == "Failed")' | wc -l`; do @@ -104,4 +123,6 @@ if test 1 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '.[0] | s exit 1 fi -exec /root/repack_generate_report.sh -v ${VID_TO_REPACK} \ No newline at end of file +echo "Repack request on VID ${VID_TO_REPACK} succeeded." + +exec /root/repack_generate_report.sh -v ${VID_TO_REPACK} ${ADD_COPIES_ONLY} || exit 0 \ No newline at end of file diff --git a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh index 3526d24b0b479dde53b1002cbdf5300e45f023a7..ea5a7c60fcff325c6be1e90bfbcb6e5d0ac7a49a 100755 --- a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh +++ b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh @@ -52,14 +52,14 @@ kubectl -n ${NAMESPACE} cp repack_systemtest.sh client:/root/repack_systemtest.s kubectl -n ${NAMESPACE} cp repack_generate_report.sh client:/root/repack_generate_report.sh echo -echo "Launching a round trip repack request" +echo "Launching a round trip repack \"just move\" request" VID_TO_REPACK=$(getFirstVidContainingFiles) if [ "$VID_TO_REPACK" != "null" ] then echo - echo "Launching the repack test on VID ${VID_TO_REPACK}" - kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} || exit 1 + echo "Launching the repack \"just move\" test on VID ${VID_TO_REPACK}" + kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m || exit 1 else echo "No vid found to repack" exit 1 @@ -72,8 +72,8 @@ VID_TO_REPACK=$(getFirstVidContainingFiles) if [ "$VID_TO_REPACK" != "null" ] then echo - echo "Launching the repack test on VID ${VID_TO_REPACK}" - kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} || exit 1 + echo "Launching the repack \"just move\" test on VID ${VID_TO_REPACK}" + kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m || exit 1 else echo "No vid found to repack" exit 1 @@ -97,16 +97,105 @@ else exit 1 fi; -NB_FILES=1153 + +NB_FILES=1152 kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1 VID_TO_REPACK=$(getFirstVidContainingFiles) if [ "$VID_TO_REPACK" != "null" ] then echo - echo "Launching the repack test on VID ${VID_TO_REPACK}" - kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} || exit 1 + echo "Launching the repack test \"just move\" on VID ${VID_TO_REPACK}" + kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m || exit 1 +else + echo "No vid found to repack" + exit 1 +fi + +echo "Reclaiming tape ${VID_TO_REPACK}" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REPACK} + +VID_TO_REPACK=$(getFirstVidContainingFiles) +if [ "$VID_TO_REPACK" != "null" ] +then + echo "Launching the repack \"just add copies\" test on VID ${VID_TO_REPACK} with all copies already on CTA" + kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -a -g || exit 1 else echo "No vid found to repack" exit 1 fi + +repackJustAddCopiesResult=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json re ls | jq -r ". [] | select (.vid == \"${VID_TO_REPACK}\")"` + +nbRetrievedFiles=`echo ${repackJustAddCopiesResult} | jq -r ".retrievedFiles"` +nbArchivedFiles=`echo ${repackJustAddCopiesResult} | jq -r ".archivedFiles"` + +if [ $nbArchivedFiles == 0 ] && [ $nbRetrievedFiles == 0 ] +then + echo "Nb retrieved files = 0 and nb archived files = 0. Test OK" +else + echo "Repack \"just add copies\" on VID ${VID_TO_REPACK} failed : nbRetrievedFiles = $nbRetrievedFiles, nbArchivedFiles = $nbArchivedFiles" + exit 1 +fi + +tapepoolDestination1="ctasystest2" +tapepoolDestination2="ctasystest3" + +echo "Creating two destination tapepool : $tapepoolDestination1 and $tapepoolDestination2" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tapepool add --name $tapepoolDestination1 --vo vo --partialtapesnumber 2 --encrypted false --comment "$tapepoolDestination1 tapepool" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tapepool add --name $tapepoolDestination2 --vo vo --partialtapesnumber 2 --encrypted false --comment "$tapepoolDestination2 tapepool" +echo "OK" + +echo "Creating archive routes for adding two copies of the file" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin archiveroute add --instance ctaeos --storageclass ctaStorageClass --copynb 2 --tapepool $tapepoolDestination1 --comment "ArchiveRoute2" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin archiveroute add --instance ctaeos --storageclass ctaStorageClass --copynb 3 --tapepool $tapepoolDestination2 --comment "ArchiveRoute3" +echo "OK" + +echo "Will change the tapepool of the tapes" + +allVID=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json tape ls --all | jq -r ". [] | .vid"` +read -a allVIDTable <<< $allVID + +nbVid=${#allVIDTable[@]} + +allTapepool=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json tapepool ls | jq -r ". [] .name"` + +read -a allTapepoolTable <<< $allTapepool + +nbTapepool=${#allTapepoolTable[@]} + +nbTapePerTapepool=$(($nbVid / $nbTapepool)) + +allTapepool=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json tapepool ls | jq -r ". [] .name"` +read -a allTapepoolTable <<< $allTapepool + + +countChanging=0 +tapepoolIndice=1 #We only change the vid of the remaining other tapes + +for ((i=$(($nbTapePerTapepool+$(($nbVid%$nbTapepool)))); i<$nbVid; i++)); +do + echo "kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape ch --vid ${allVIDTable[$i]} --tapepool ${allTapepoolTable[$tapepoolIndice]}" + kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape ch --vid ${allVIDTable[$i]} --tapepool ${allTapepoolTable[$tapepoolIndice]} + countChanging=$((countChanging + 1)) + if [ $countChanging != 0 ] && [ $((countChanging % nbTapePerTapepool)) == 0 ] + then + tapepoolIndice=$((tapepoolIndice + 1)) + fi +done + +echo "OK" + +storageClassName=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json storageclass ls | jq -r ". [0] | .name"` +instanceName=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json storageclass ls | jq -r ". [0] | .diskInstance"` + +echo "Changing the storage class $storageClassName nb copies" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin storageclass ch --instance $instanceName --name $storageClassName --copynb 3 +echo "OK" + +echo "Putting all drives up" +kubectl -n ${NAMESPACE} exec ctacli -- cta-admin dr up VD.* +echo "OK" + +echo "Launching the repack \"Move and add copies\" test on VID ${VID_TO_REPACK}" +kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -t 500 || exit 1 diff --git a/disk/DiskFile.cpp b/disk/DiskFile.cpp index 18667be23094059e3ae3f8897a1ca488bb44979a..32a31aa32cdb2587498b23504f0b97150e1f2de0 100644 --- a/disk/DiskFile.cpp +++ b/disk/DiskFile.cpp @@ -774,11 +774,7 @@ bool XRootdDirectory::exist() { if(statStatus.errNo == XErrorCode::kXR_NotFound){ return false; } - cta::exception::XrootCl::throwOnError(statStatus,"In XrootdDirectory::exist(): fail to determine if directory exists."); - if(statInfo->GetSize() != 0){ - return true; - } - return false; + return true; } std::set<std::string> XRootdDirectory::getFilesName(){ diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index c4a72d0dd1fb313c958639c90923a1989ea98922..3d42dd084c322636e8384dc92caf87ca90c8bac1 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -65,6 +65,20 @@ void ArchiveRequest::initialize() { m_payloadInterpreted = true; } +void ArchiveRequest::commit(){ + checkPayloadWritable(); + checkPayloadReadable(); + for(auto & job: m_payload.jobs()){ + int nbTapepool = std::count_if(m_payload.jobs().begin(),m_payload.jobs().end(),[&job](const cta::objectstore::serializers::ArchiveJob & archiveJob){ + return archiveJob.tapepool() == job.tapepool(); + }); + if(nbTapepool != 1){ + throw cta::exception::Exception("In ArchiveRequest::commit(), cannot insert an ArchiveRequest containing archive jobs with the same destination tapepool"); + } + } + ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>::commit(); +} + //------------------------------------------------------------------------------ // ArchiveRequest::addJob() //------------------------------------------------------------------------------ @@ -139,24 +153,24 @@ auto ArchiveRequest::addTransferFailure(uint32_t copyNumber, } j.set_totalretries(j.totalretries() + 1); * j.mutable_failurelogs()->Add() = failureReason; - } - if (j.totalretries() >= j.maxtotalretries()) { - // We have to determine if this was the last copy to fail/succeed. - return determineNextStep(copyNumber, JobEvent::TransferFailed, lc); - } else { - EnqueueingNextStep ret; - bool isRepack = m_payload.isrepack(); - ret.nextStatus = isRepack ? serializers::ArchiveJobStatus::AJS_ToTransferForRepack : serializers::ArchiveJobStatus::AJS_ToTransferForUser; - // Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter - // case, the job will remain owned by this session and get garbage collected. - if (j.retrieswithinmount() >= j.maxretrieswithinmount()) - ret.nextStep = EnqueueingNextStep::NextStep::Nothing; - else - ret.nextStep = isRepack ? EnqueueingNextStep::NextStep::EnqueueForTransferForRepack : EnqueueingNextStep::NextStep::EnqueueForTransferForUser; - return ret; + if (j.totalretries() >= j.maxtotalretries()) { + // We have to determine if this was the last copy to fail/succeed. + return determineNextStep(copyNumber, JobEvent::TransferFailed, lc); + } else { + EnqueueingNextStep ret; + bool isRepack = m_payload.isrepack(); + ret.nextStatus = isRepack ? serializers::ArchiveJobStatus::AJS_ToTransferForRepack : serializers::ArchiveJobStatus::AJS_ToTransferForUser; + // Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter + // case, the job will remain owned by this session and get garbage collected. + if (j.retrieswithinmount() >= j.maxretrieswithinmount()) + ret.nextStep = EnqueueingNextStep::NextStep::Nothing; + else + ret.nextStep = isRepack ? EnqueueingNextStep::NextStep::EnqueueForTransferForRepack : EnqueueingNextStep::NextStep::EnqueueForTransferForUser; + return ret; + } } } - throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job"); + throw NoSuchJob ("In ArchiveRequest::addTransferFailure(): could not find job"); } //------------------------------------------------------------------------------ @@ -822,7 +836,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE for (auto &j:jl) { if (j.copynb() == copyNumberUpdated) currentStatus = j.status(); } if (!currentStatus) { std::stringstream err; - err << "In ArchiveRequest::updateJobStatus(): copynb not found : " << copyNumberUpdated + err << "In ArchiveRequest::determineNextStep(): copynb not found : " << copyNumberUpdated << "existing ones: "; for (auto &j: jl) err << j.copynb() << " "; throw cta::exception::Exception(err.str()); @@ -830,13 +844,13 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE // Check status compatibility with event. switch (jobEvent) { case JobEvent::TransferFailed: - if (*currentStatus != ArchiveJobStatus::AJS_ToTransferForUser) { + if (*currentStatus != ArchiveJobStatus::AJS_ToTransferForUser && *currentStatus != ArchiveJobStatus::AJS_ToTransferForRepack) { // Wrong status, but the context leaves no ambiguity. Just warn. log::ScopedParamContainer params(lc); params.add("event", eventToString(jobEvent)) .add("status", statusToString(*currentStatus)) .add("fileId", m_payload.archivefileid()); - lc.log(log::WARNING, "In ArchiveRequest::updateJobStatus(): unexpected status. Assuming ToTransfer."); + lc.log(log::WARNING, "In ArchiveRequest::determineNextStep(): unexpected status. Assuming ToTransfer."); } break; case JobEvent::ReportFailed: @@ -846,7 +860,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE params.add("event", eventToString(jobEvent)) .add("status", statusToString(*currentStatus)) .add("fileId", m_payload.archivefileid()); - lc.log(log::WARNING, "In ArchiveRequest::updateJobStatus(): unexpected status. Failing the job."); + lc.log(log::WARNING, "In ArchiveRequest::determineNextStep(): unexpected status. Failing the job."); } } // We are in the normal cases now. @@ -854,18 +868,14 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE switch (jobEvent) { case JobEvent::TransferFailed: { + bool isRepack = m_payload.isrepack(); if (!m_payload.reportdecided()) { m_payload.set_reportdecided(true); - if(!m_payload.isrepack()){ - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForUser; - ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure; - } else { - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForRepack; - ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure; - } + ret.nextStep = isRepack ? EnqueueingNextStep::NextStep::EnqueueForReportForRepack : EnqueueingNextStep::NextStep::EnqueueForReportForUser; + ret.nextStatus = isRepack ? serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure : serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure; } else { - ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer; - ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed; + ret.nextStep = isRepack ? EnqueueingNextStep::NextStep::EnqueueForReportForRepack : EnqueueingNextStep::NextStep::StoreInFailedJobsContainer; + ret.nextStatus = isRepack ? serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure : serializers::ArchiveJobStatus::AJS_Failed; } } break; diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 6936f967a1c55b695804566d571279b364bfc9e8..8d925e8150ec27b0a8633a900bc913bf6a0ec149 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -44,6 +44,7 @@ public: ArchiveRequest(Backend & os); ArchiveRequest(GenericObject & go); void initialize(); + void commit(); // Ownership of archive requests is managed per job. Object level owner has no meaning. std::string getOwner() = delete; void setOwner(const std::string &) = delete; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 8ab0fc970ff0834a66263f5c2c81827f3189ec85..672c5f5bb32066488f950ffd8ab27db34c6f33ee 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -489,6 +489,14 @@ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStruct ArchiveFileSerDeser(criteria.archiveFile).serialize(*m_payload.mutable_archivefile()); for (auto &tf: criteria.archiveFile.tapeFiles) { MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy()); + /* + * Explaination about these hardcoded retries : + * The hardcoded RetriesWithinMount will ensure that we will try to retrieve the file 3 times + * in the same mount. + * The hardcoded TotalRetries ensure that we will never try more than 6 times to retrieve a file. + * As totalretries = 6 and retrieswithinmount = 3, this will ensure that the file will be retried by maximum 2 mounts. + * (2 mounts * 3 retrieswithinmount = 6 totalretries) + */ const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; const uint32_t hardcodedReportRetries = 2; diff --git a/rdbms/Rset.cpp b/rdbms/Rset.cpp index 7950e08c95560c5c804cd4b667dc7f3095efe783..4c36b22cd6f15ceaef636320e708a253e5c99cf7 100644 --- a/rdbms/Rset.cpp +++ b/rdbms/Rset.cpp @@ -62,10 +62,15 @@ Rset &Rset::operator=(Rset &&rhs) { // columnString //------------------------------------------------------------------------------ std::string Rset::columnBlob(const std::string &colName) const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->columnBlob(colName); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->columnBlob(colName); } //------------------------------------------------------------------------------ @@ -74,7 +79,7 @@ std::string Rset::columnBlob(const std::string &colName) const { std::string Rset::columnString(const std::string &colName) const { try { if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + throw InvalidResultSet("This result set is invalid"); } const optional<std::string> col = columnOptionalString(colName); @@ -84,7 +89,8 @@ std::string Rset::columnString(const std::string &colName) const { throw NullDbValue(std::string("Database column ") + colName + " contains a null value"); } } catch(exception::Exception &ex) { - throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -94,7 +100,7 @@ std::string Rset::columnString(const std::string &colName) const { uint64_t Rset::columnUint64(const std::string &colName) const { try { if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + throw InvalidResultSet("This result set is invalid"); } const optional<uint64_t> col = columnOptionalUint64(colName); @@ -104,7 +110,8 @@ uint64_t Rset::columnUint64(const std::string &colName) const { throw NullDbValue(std::string("Database column ") + colName + " contains a null value"); } } catch(exception::Exception &ex) { - throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -114,7 +121,7 @@ uint64_t Rset::columnUint64(const std::string &colName) const { bool Rset::columnBool(const std::string &colName) const { try { if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + throw InvalidResultSet("This result set is invalid"); } const optional<bool> col = columnOptionalBool(colName); @@ -124,7 +131,8 @@ bool Rset::columnBool(const std::string &colName) const { throw NullDbValue(std::string("Database column ") + colName + " contains a null value"); } } catch(exception::Exception &ex) { - throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -134,7 +142,7 @@ bool Rset::columnBool(const std::string &colName) const { optional<bool> Rset::columnOptionalBool(const std::string &colName) const { try { if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + throw InvalidResultSet("This result set is invalid"); } const auto column = columnOptionalUint64(colName); @@ -144,7 +152,8 @@ optional<bool> Rset::columnOptionalBool(const std::string &colName) const { return nullopt; } } catch(exception::Exception &ex) { - throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -152,28 +161,38 @@ optional<bool> Rset::columnOptionalBool(const std::string &colName) const { // getSql //------------------------------------------------------------------------------ const std::string &Rset::getSql() const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->getSql(); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->getSql(); } //------------------------------------------------------------------------------ // next //------------------------------------------------------------------------------ bool Rset::next() { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); - } + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } - const bool aRowHasBeenRetrieved = m_impl->next(); + const bool aRowHasBeenRetrieved = m_impl->next(); - // Release resources of result set when its end has been reached - if(!aRowHasBeenRetrieved) { - m_impl.reset(nullptr); - } + // Release resources of result set when its end has been reached + if(!aRowHasBeenRetrieved) { + m_impl.reset(nullptr); + } - return aRowHasBeenRetrieved; + return aRowHasBeenRetrieved; + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; + } } //------------------------------------------------------------------------------ @@ -188,30 +207,45 @@ bool Rset::isEmpty() const // columnIsNull //------------------------------------------------------------------------------ bool Rset::columnIsNull(const std::string &colName) const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->columnIsNull(colName); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->columnIsNull(colName); } //------------------------------------------------------------------------------ // columnOptionalString //------------------------------------------------------------------------------ optional<std::string> Rset::columnOptionalString(const std::string &colName) const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->columnOptionalString(colName); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->columnOptionalString(colName); } //------------------------------------------------------------------------------ // columnOptionalUint64 //------------------------------------------------------------------------------ optional<uint64_t> Rset::columnOptionalUint64(const std::string &colName) const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->columnOptionalUint64(colName); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->columnOptionalUint64(colName); } //------------------------------------------------------------------------------ @@ -220,7 +254,7 @@ optional<uint64_t> Rset::columnOptionalUint64(const std::string &colName) const double Rset::columnDouble(const std::string &colName) const { try { if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + throw InvalidResultSet("This result set is invalid"); } const optional<double> col = columnOptionalDouble(colName); @@ -230,7 +264,8 @@ double Rset::columnDouble(const std::string &colName) const { throw NullDbValue(std::string("Database column ") + colName + " contains a null value"); } } catch(exception::Exception &ex) { - throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -238,10 +273,15 @@ double Rset::columnDouble(const std::string &colName) const { // columnOptionalDouble //------------------------------------------------------------------------------ optional<double> Rset::columnOptionalDouble(const std::string &colName) const { - if(nullptr == m_impl) { - throw InvalidResultSet(std::string(__FUNCTION__) + " failed: This result set is invalid"); + try { + if(nullptr == m_impl) { + throw InvalidResultSet("This result set is invalid"); + } + return m_impl->columnOptionalDouble(colName); + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } - return m_impl->columnOptionalDouble(colName); } } // namespace rdbms diff --git a/rdbms/Stmt.cpp b/rdbms/Stmt.cpp index c465d85f6f5fd7742d9b919e45d24b85baea6bc9..e0bb8d295bed6b4dbedb713c98680e318ff56911 100644 --- a/rdbms/Stmt.cpp +++ b/rdbms/Stmt.cpp @@ -88,10 +88,15 @@ Stmt &Stmt::operator=(Stmt &&rhs) { // getSql //----------------------------------------------------------------------------- const std::string &Stmt::getSql() const { - if(nullptr != m_stmt) { - return m_stmt->getSql(); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->getSql(); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -99,10 +104,15 @@ const std::string &Stmt::getSql() const { // getParamIdx //----------------------------------------------------------------------------- uint32_t Stmt::getParamIdx(const std::string ¶mName) const { - if(nullptr != m_stmt) { - return m_stmt->getParamIdx(paramName); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->getParamIdx(paramName); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -110,10 +120,15 @@ uint32_t Stmt::getParamIdx(const std::string ¶mName) const { // bindUint64 //----------------------------------------------------------------------------- void Stmt::bindUint64(const std::string ¶mName, const uint64_t paramValue) { - if(nullptr != m_stmt) { - return m_stmt->bindUint64(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindUint64(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -121,10 +136,15 @@ void Stmt::bindUint64(const std::string ¶mName, const uint64_t paramValue) { // bindOptionalUint64 //----------------------------------------------------------------------------- void Stmt::bindOptionalUint64(const std::string ¶mName, const optional<uint64_t> ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindOptionalUint64(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindOptionalUint64(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -132,10 +152,15 @@ void Stmt::bindOptionalUint64(const std::string ¶mName, const optional<uint6 // bindDouble //----------------------------------------------------------------------------- void Stmt::bindDouble(const std::string ¶mName, const double paramValue) { - if(nullptr != m_stmt) { - return m_stmt->bindDouble(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindDouble(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -143,10 +168,15 @@ void Stmt::bindDouble(const std::string ¶mName, const double paramValue) { // bindOptionalDouble //----------------------------------------------------------------------------- void Stmt::bindOptionalDouble(const std::string ¶mName, const optional<double> ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindOptionalDouble(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindOptionalDouble(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -154,10 +184,15 @@ void Stmt::bindOptionalDouble(const std::string ¶mName, const optional<doubl // bindBool //----------------------------------------------------------------------------- void Stmt::bindBool(const std::string ¶mName, const bool paramValue) { - if(nullptr != m_stmt) { - return m_stmt->bindBool(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindBool(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -165,10 +200,15 @@ void Stmt::bindBool(const std::string ¶mName, const bool paramValue) { // bindOptionalBool //----------------------------------------------------------------------------- void Stmt::bindOptionalBool(const std::string ¶mName, const optional<bool> ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindOptionalBool(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindOptionalBool(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -176,10 +216,15 @@ void Stmt::bindOptionalBool(const std::string ¶mName, const optional<bool> & // bindString //----------------------------------------------------------------------------- void Stmt::bindBlob(const std::string ¶mName, const std::string ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindBlob(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindBlob(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -187,10 +232,15 @@ void Stmt::bindBlob(const std::string ¶mName, const std::string ¶mValue) // bindString //----------------------------------------------------------------------------- void Stmt::bindString(const std::string ¶mName, const std::string ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindString(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindString(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -198,10 +248,15 @@ void Stmt::bindString(const std::string ¶mName, const std::string ¶mValu // bindOptionalString //----------------------------------------------------------------------------- void Stmt::bindOptionalString(const std::string ¶mName, const optional<std::string> ¶mValue) { - if(nullptr != m_stmt) { - return m_stmt->bindOptionalString(paramName, paramValue); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->bindOptionalString(paramName, paramValue); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -209,10 +264,15 @@ void Stmt::bindOptionalString(const std::string ¶mName, const optional<std:: // executeQuery //----------------------------------------------------------------------------- Rset Stmt::executeQuery() { - if(nullptr != m_stmt) { - return Rset(m_stmt->executeQuery()); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return Rset(m_stmt->executeQuery()); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -220,10 +280,15 @@ Rset Stmt::executeQuery() { // executeNonQuery //----------------------------------------------------------------------------- void Stmt::executeNonQuery() { - if(nullptr != m_stmt) { - return m_stmt->executeNonQuery(); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->executeNonQuery(); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -231,10 +296,15 @@ void Stmt::executeNonQuery() { // getNbAffectedRows //----------------------------------------------------------------------------- uint64_t Stmt::getNbAffectedRows() const { - if(nullptr != m_stmt) { - return m_stmt->getNbAffectedRows(); - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return m_stmt->getNbAffectedRows(); + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } @@ -242,10 +312,15 @@ uint64_t Stmt::getNbAffectedRows() const { // getStmt //----------------------------------------------------------------------------- wrapper::StmtWrapper &Stmt::getStmt() { - if(nullptr != m_stmt) { - return *m_stmt; - } else { - throw exception::Exception(std::string(__FUNCTION__) + " failed: Stmt does not contain a cached statement"); + try { + if(nullptr != m_stmt) { + return *m_stmt; + } else { + throw exception::Exception("Stmt does not contain a cached statement"); + } + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + throw; } } diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index fbfe60f96f809b8b48d790412c6f28a77bc64a87..42d5cf2bce0461ddc5eac4d36c6d9d297bc2d5d6 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -154,7 +154,7 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch( // reportJobsBatchWritten //------------------------------------------------------------------------------ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, - std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) { + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::ArchiveJob>>& failedToReportArchiveJobs,cta::log::LogContext& logContext) { std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; std::unique_ptr<cta::ArchiveJob> job; @@ -224,6 +224,9 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct } const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; logContext.log(cta::log::ERR, msg_error); + for(auto &aj: validatedSuccessfulArchiveJobs){ + failedToReportArchiveJobs.push(std::move(aj)); + } throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); } catch(const std::exception& e){ cta::log::ScopedParamContainer params(logContext); @@ -236,6 +239,9 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct } const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception"; logContext.log(cta::log::ERR, msg_error); + for(auto &aj: validatedSuccessfulArchiveJobs){ + failedToReportArchiveJobs.push(std::move(aj)); + } throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); } } diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 06bf04077baacaa8076075e71a3a5a0c1f9f5737..df62d10ae41afc3ad4a38ab3105728c4bf485595 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -149,7 +149,7 @@ namespace cta { * @param logContext */ virtual void reportJobsBatchTransferred (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, - std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext &logContext); + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::ArchiveJob>>& failedToReportArchiveJobs, cta::log::LogContext &logContext); /** * Returns the tape pool of the tape to be mounted. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 83998e8c72d6a9ddbd6e9e8549ed27b4f34e4c12..b1b057272dcf8507be206dbb7d3f7789e67d6ca3 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1804,7 +1804,7 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfu // As we are popping from a single report queue, all requests should concern only one repack request. if (repackRequestAddresses.size() != 1) { std::stringstream err; - err << "In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): reports for several repack requests in the same queue. "; + err << "In OStoreDB::getNextSuccessfulArchiveRepackReportBatch(): reports for several repack requests in the same queue. "; for (auto & rr: repackRequestAddresses) { err << rr << " "; } throw exception::Exception(err.str()); } @@ -2240,6 +2240,15 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> for (auto & ar: archiveRoutesMap.at(std::make_tuple(rsr.archiveFile.diskInstance, rsr.archiveFile.storageClass))) { rRRepackInfo.archiveRouteMap[ar.second.copyNb] = ar.second.tapePoolName; } + //Check that we do not have the same destination tapepool for two different copyNb + for(auto & currentCopyNbTapePool: rRRepackInfo.archiveRouteMap){ + int nbTapepool = std::count_if(rRRepackInfo.archiveRouteMap.begin(),rRRepackInfo.archiveRouteMap.end(),[¤tCopyNbTapePool](const std::pair<uint64_t,std::string> & copyNbTapepool){ + return copyNbTapepool.second == currentCopyNbTapePool.second; + }); + if(nbTapepool != 1){ + throw cta::ExpandRepackRequestException("In OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(), found the same destination tapepool for different copyNb."); + } + } } catch (std::out_of_range &) { notCreatedSubrequests.emplace_back(rsr); failedCreationStats.files++; @@ -4012,7 +4021,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L .add("totalRetries", retryStatus.totalRetries) .add("maxTotalRetries", retryStatus.maxTotalRetries); lc.log(log::INFO, - "In ArchiveJob::failTransfer(): requeued job for (potentially in-mount) retry."); + "In ArchiveJob::failTransfer(): requeued job for (potentially in-mount) retry (repack)."); return; } case NextStep::StoreInFailedJobsContainer: { @@ -4233,12 +4242,13 @@ objectstore::RepackRequest::SubrequestStatistics::List OStoreDB::RepackArchiveRe ssl.back().fSeq = sri.repackInfo.fSeq; ssl.back().copyNb = sri.archivedCopyNb; for(auto &j: sri.archiveJobsStatusMap){ - if(j.first != sri.archivedCopyNb && - (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) && - (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){ - break; - } else { - ssl.back().subrequestDeleted = true; + if(j.first != sri.archivedCopyNb){ + if((j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) && (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){ + break; + } else { + ssl.back().subrequestDeleted = true; + break; + } } } } @@ -4279,21 +4289,35 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){ for (auto &sri: m_subrequestList) { bufferURL = sri.repackInfo.fileBufferURL; bool moreJobsToDo = false; + //Check if the ArchiveRequest contains other jobs that are not finished for (auto &j: sri.archiveJobsStatusMap) { - if ((j.first != sri.archivedCopyNb) && - (j.second != serializers::ArchiveJobStatus::AJS_Complete) && - (j.second != serializers::ArchiveJobStatus::AJS_Failed)) { - moreJobsToDo = true; - break; + //Getting the siblings jobs (ie copy nb != current one) + if (j.first != sri.archivedCopyNb) { + //Sibling job not finished mean its status is nor AJS_Complete nor AJS_Failed + if ((j.second != serializers::ArchiveJobStatus::AJS_Complete) && + (j.second != serializers::ArchiveJobStatus::AJS_Failed)) { + //The sibling job is not finished, but maybe it is planned to change its status, checking the jobOwnerUpdaterList that is the list containing the jobs + //we want to change its status to AJS_Complete + bool copyNbStatusUpdating = (std::find_if(jobOwnerUpdatersList.begin(), jobOwnerUpdatersList.end(), [&j,&sri](JobOwnerUpdaters &jou){ + return ((jou.subrequestInfo.archiveFile.archiveFileID == sri.archiveFile.archiveFileID) && (jou.subrequestInfo.archivedCopyNb == j.first)); + }) != jobOwnerUpdatersList.end()); + if(!copyNbStatusUpdating){ + //The sibling job is not in the jobOwnerUpdaterList, it means that it is not finished yet, there is more jobs to do + moreJobsToDo = true; + break; + } + } } } objectstore::ArchiveRequest & ar = *sri.subrequest; if (moreJobsToDo) { try { - jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> ( - ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(), - newStatus)), - sri}); + if(ar.exists()){ + jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> ( + ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(), + newStatus)), + sri}); + } } catch (cta::exception::Exception & ex) { // Log the error log::ScopedParamContainer params(lc); @@ -4400,7 +4424,7 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){ params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID) .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet()) .add("exceptionMsg", ex.getMessageValue()); - lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): async job update."); + lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): async job update failed."); } } timingList.insertAndReset("asyncUpdateOrDeleteCompletionTime", t); diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp index 62a615b5f2c1189cda4f81845d0f18879da19a25..7f7146793d46699bcea90dc380e284149840d772 100644 --- a/scheduler/RepackRequestManager.cpp +++ b/scheduler/RepackRequestManager.cpp @@ -41,7 +41,10 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { //We have a RepackRequest that has the status ToExpand, expand it try{ m_scheduler.expandRepackRequest(repackRequest,timingList,t,lc); - } catch(const cta::exception::Exception &e){ + } catch (const ExpandRepackRequestException& ex){ + lc.log(log::ERR,ex.what()); + repackRequest->fail(); + } catch (const cta::exception::Exception &e){ lc.log(log::ERR,e.what()); repackRequest->fail(); throw(e); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index b2d4050de72a2bf3ddffbc639c05aae65c41493b..62a65d0e70160cc305efa7112de240449e8872cd 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -450,13 +450,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques auto repackInfo = repackRequest->getRepackInfo(); typedef cta::common::dataStructures::RepackInfo::Type RepackType; - if (repackInfo.type != RepackType::MoveOnly) { - log::ScopedParamContainer params(lc); - params.add("tapeVid", repackInfo.vid); - lc.log(log::ERR, "In Scheduler::expandRepackRequest(): failing repack request with unsupported (yet) type."); - repackRequest->fail(); - return; - } + //We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the //tape where the file is is located std::list<common::dataStructures::ArchiveRoute> routes = m_catalogue.getArchiveRoutes(); @@ -478,10 +472,10 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques std::stringstream dirBufferURL; dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/"; std::set<std::string> filesInDirectory; + std::unique_ptr<cta::disk::Directory> dir; if(archiveFilesForCatalogue.hasMore()){ //We only create the folder if there are some files to Repack cta::disk::DirectoryFactory dirFactory; - std::unique_ptr<cta::disk::Directory> dir; dir.reset(dirFactory.createDirectory(dirBufferURL.str())); if(dir->exist()){ filesInDirectory = dir->getFilesName(); @@ -491,6 +485,11 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques } double elapsedTime = 0; bool stopExpansion = false; + + std::list<common::dataStructures::StorageClass> storageClasses; + if(repackInfo.type == RepackType::AddCopiesOnly || repackInfo.type == RepackType::MoveAndAddCopies) + storageClasses = m_catalogue.getStorageClasses(); + repackRequest->m_dbReq->setExpandStartedAndChangeStatus(); while(archiveFilesForCatalogue.hasMore() && !stopExpansion) { size_t filesCount = 0; @@ -526,6 +525,61 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques //retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq); } } + + if(repackInfo.type == RepackType::AddCopiesOnly || repackInfo.type == RepackType::MoveAndAddCopies){ + //We are in the case where we possibly need to create new copies (if the number of copies the storage class of the current ArchiveFile + //is greater than the number of tape files we have in the current ArchiveFile) + auto archiveFileRoutes = archiveRoutesMap[std::make_pair(archiveFile.diskInstance,archiveFile.storageClass)]; + auto storageClassItor = std::find_if(storageClasses.begin(),storageClasses.end(),[&archiveFile](const common::dataStructures::StorageClass& sc){ + return sc.name == archiveFile.storageClass; + }); + if(storageClassItor != storageClasses.end()){ + common::dataStructures::StorageClass sc = *storageClassItor; + uint64_t nbFilesAlreadyArchived = getNbFilesAlreadyArchived(archiveFile); + uint64_t nbCopiesInStorageClass = sc.nbCopies; + uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived; + if(filesToArchive > 0){ + totalStatsFile.totalFilesToArchive += filesToArchive; + totalStatsFile.totalBytesToArchive += (filesToArchive * archiveFile.fileSize); + std::set<uint64_t> copyNbsAlreadyInCTA; + for (auto & tc: archiveFile.tapeFiles) { + copyNbsAlreadyInCTA.insert(tc.copyNb); + if (tc.vid == repackInfo.vid) { + // We make the (reasonable) assumption that the archive file only has one copy on this tape. + // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape. + // This will prevent double subrequest creation (we already have such a mechanism in case of crash and + // restart of expansion. + //We found the copy of the file we want to retrieve and archive + //retrieveSubRequest.fSeq = tc.fSeq; + if(repackInfo.type == RepackType::AddCopiesOnly) + retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq); + } + } + for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){ + if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){ + //We need to archive the missing copy + retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first); + } + } + if(retrieveSubRequest.copyNbsToRearchive.size() < filesToArchive){ + deleteRepackBuffer(std::move(dir)); + throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files"); + } + } else { + if(repackInfo.type == RepackType::AddCopiesOnly){ + //Nothing to Archive so nothing to Retrieve as well + retrieveSubrequests.pop_back(); + continue; + } + } + } else { + //No storage class have been found for the current tapefile throw an exception + deleteRepackBuffer(std::move(dir)); + throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): No storage class have been found for the file to add copies"); + } + } + + std::stringstream fileName; fileName << std::setw(9) << std::setfill('0') << retrieveSubRequest.fSeq; bool createArchiveSubrequest = false; @@ -542,16 +596,13 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques if(!createArchiveSubrequest){ totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize; totalStatsFile.totalFilesToRetrieve += 1; - if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::AddCopiesOnly) { - // We should not get here are the type is filtered at the beginning of the function. - // TODO: add support for expand. - throw cta::exception::Exception("In Scheduler::expandRepackRequest(): expand not yet supported."); - } - if ((retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) || retrieveSubRequest.copyNbsToRearchive.empty()) { + if (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) { log::ScopedParamContainer params(lc); params.add("fileId", retrieveSubRequest.archiveFile.archiveFileID) .add("repackVid", repackInfo.vid); lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape."); + totalStatsFile.totalBytesToRetrieve -= retrieveSubRequest.archiveFile.fileSize; + totalStatsFile.totalFilesToRetrieve -= 1; retrieveSubrequests.pop_back(); } else { // We found some copies to rearchive. We still have to decide which file path we are going to use. @@ -564,12 +615,17 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques } } stopExpansion = (elapsedTime >= m_repackRequestExpansionTimeLimit); - } + } // Note: the highest fSeq will be recorded internally in the following call. // We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted). // We pass this information to the db for recording in the repack request. This will allow restarting from the right // value in case of crash. - repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc); + try{ + repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc); + } catch(const cta::ExpandRepackRequestException& e){ + deleteRepackBuffer(std::move(dir)); + throw e; + } timingList.insertAndReset("addSubrequestsAndUpdateStatsTime",t); { if(!stopExpansion && archiveFilesForCatalogue.hasMore()){ @@ -589,6 +645,10 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques lc.log(log::INFO,"Expansion time reached, Repack Request requeued in ToExpand queue."); } } else { + if(totalStatsFile.totalFilesToRetrieve == 0){ + //If no files have been retrieve, the repack buffer will have to be deleted + deleteRepackBuffer(std::move(dir)); + } repackRequest->m_dbReq->expandDone(); lc.log(log::INFO,"In Scheduler::expandRepackRequest(), repack request expanded"); } @@ -1020,6 +1080,25 @@ cta::optional<common::dataStructures::LogicalLibrary> Scheduler::getLogicalLibra return ret; } +void Scheduler::deleteRepackBuffer(std::unique_ptr<cta::disk::Directory> repackBuffer) { + if(repackBuffer != nullptr && repackBuffer->exist()){ + repackBuffer->rmdir(); + } +} + +uint64_t Scheduler::getNbFilesAlreadyArchived(const common::dataStructures::ArchiveFile& archiveFile) { + uint64_t nbFilesAlreadyArchived = 0; + for(auto &tf: archiveFile.tapeFiles){ + if(tf.supersededByVid == ""){ + //We only want the "active" copies of the archive file + nbFilesAlreadyArchived++; + } + } + return nbFilesAlreadyArchived; +} + + + //------------------------------------------------------------------------------ // getNextMountDryRun //------------------------------------------------------------------------------ diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 5d9dc60306e09048b5e592d0a22fd6893a35d00d..d378ce5a398ff468ae7bd013d9ff9ea5754245d5 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -52,6 +52,7 @@ #include "tapeserver/daemon/TapedConfiguration.hpp" +#include "disk/DiskFile.hpp" #include "disk/DiskReporter.hpp" #include "disk/DiskReporterFactory.hpp" @@ -74,6 +75,8 @@ class RetrieveJob; * The scheduler is the unique entry point to the central storage for taped. It is * */ +CTA_GENERATE_EXCEPTION_CLASS(ExpandRepackRequestException); + class Scheduler { public: @@ -310,6 +313,10 @@ private: cta::optional<common::dataStructures::LogicalLibrary> getLogicalLibrary(const std::string &libraryName, double &getLogicalLibraryTime); + void deleteRepackBuffer(std::unique_ptr<cta::disk::Directory> repackBuffer); + + uint64_t getNbFilesAlreadyArchived(const common::dataStructures::ArchiveFile& archiveFile); + public: /** * Run the mount decision logic lock free, so we have no contention in the diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index f93a7f4d04734fb6c34ff81ea93216636020d77a..3f7773055426c865cc7b74f8e6d15ad03abaf2f8 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -589,19 +589,26 @@ public: if (activityNameAndWeightedMountCount.value().weightedMountCount < other.activityNameAndWeightedMountCount.value().weightedMountCount) return false; } - if(minRequestAge < other.minRequestAge) + //The smaller the oldest job start time is, the bigger the age is, hence the inverted comparison + if(oldestJobStartTime > other.oldestJobStartTime) return true; - if(minRequestAge > other.minRequestAge) + if(oldestJobStartTime < other.oldestJobStartTime) return false; /** * For the tests, we try to have the priority by - * alphabetical order : vid1 should be treated before vid2, + * alphabetical order : vid1 / tapepool1 should be treated before vid2/tapepool2, * so if this->vid < other.vid : then this > other.vid, so return false */ if(vid < other.vid) return false; if(vid > other.vid) return true; + + if(tapePool < other.tapePool) + return false; + if(tapePool > other.tapePool) + return true; + return false; } }; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 496495a7e0c47bceb092ac5930343286ccfe8940..d7af0f0dfe54cfa26d5a90615aef7cdae56529c8 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -248,7 +248,7 @@ protected: const std::string s_diskInstance = "disk_instance"; const std::string s_storageClassName = "TestStorageClass"; const cta::common::dataStructures::SecurityIdentity s_adminOnAdminHost = { "admin1", "host1" }; - const std::string s_tapePoolName = "TestTapePool"; + const std::string s_tapePoolName = "TapePool"; const std::string s_libraryName = "TestLogicalLibrary"; const std::string s_vid = "TestVid"; const std::string s_mediaType = "TestMediaType"; @@ -476,7 +476,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { auto & osdb=getSchedulerDB(); auto mi=osdb.getMountInfo(lc); ASSERT_EQ(1, mi->existingOrNextMounts.size()); - ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool); + ASSERT_EQ("TapePool", mi->existingOrNextMounts.front().tapePool); ASSERT_EQ("TestVid", mi->existingOrNextMounts.front().vid); std::unique_ptr<cta::ArchiveMount> archiveMount; archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); @@ -492,8 +492,9 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { archiveJob->validate(); std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; std::queue<cta::catalogue::TapeItemWritten> sTapeItems; + std::queue<std::unique_ptr <cta::ArchiveJob >> failedToReportArchiveJobs; sDBarchiveJobBatch.emplace(std::move(archiveJob)); - archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, lc); + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); @@ -675,7 +676,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { auto & osdb=getSchedulerDB(); auto mi=osdb.getMountInfo(lc); ASSERT_EQ(1, mi->existingOrNextMounts.size()); - ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool); + ASSERT_EQ("TapePool", mi->existingOrNextMounts.front().tapePool); ASSERT_EQ("TestVid", mi->existingOrNextMounts.front().vid); std::unique_ptr<cta::ArchiveMount> archiveMount; archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); @@ -691,8 +692,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { archiveJob->validate(); std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; std::queue<cta::catalogue::TapeItemWritten> sTapeItems; + std::queue<std::unique_ptr <cta::ArchiveJob >> failedToReportArchiveJobs; sDBarchiveJobBatch.emplace(std::move(archiveJob)); - archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, lc); + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); @@ -925,7 +927,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { auto & osdb=getSchedulerDB(); auto mi=osdb.getMountInfo(lc); ASSERT_EQ(1, mi->existingOrNextMounts.size()); - ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool); + ASSERT_EQ("TapePool", mi->existingOrNextMounts.front().tapePool); ASSERT_EQ("TestVid", mi->existingOrNextMounts.front().vid); std::unique_ptr<cta::ArchiveMount> archiveMount; archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); @@ -942,7 +944,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; std::queue<cta::catalogue::TapeItemWritten> sTapeItems; sDBarchiveJobBatch.emplace(std::move(archiveJob)); - archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, lc); + std::queue<std::unique_ptr<cta::ArchiveJob>> failedToReportArchiveJobs; + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); @@ -2832,7 +2835,7 @@ TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) { auto & osdb=getSchedulerDB(); auto mi=osdb.getMountInfo(lc); ASSERT_EQ(1, mi->existingOrNextMounts.size()); - ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool); + ASSERT_EQ("TapePool", mi->existingOrNextMounts.front().tapePool); std::unique_ptr<cta::ArchiveMount> archiveMount; archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); ASSERT_NE(nullptr, archiveMount.get()); @@ -2848,8 +2851,9 @@ TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) { archiveJob->validate(); std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; std::queue<cta::catalogue::TapeItemWritten> sTapeItems; + std::queue<std::unique_ptr <cta::ArchiveJob >> failedToReportArchiveJobs; sDBarchiveJobBatch.emplace(std::move(archiveJob)); - archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, lc); + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, failedToReportArchiveJobs, lc); // Mark the tape full so we get one file per tape. archiveMount->setTapeFull(); archiveMount->complete(); @@ -2971,6 +2975,511 @@ TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) { } } +TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { + using namespace cta; + using namespace cta::objectstore; + unitTests::TempDirectory tempDirectory; + auto &catalogue = getCatalogue(); + auto &scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = false; + const bool fullValue = true; + const bool readOnlyValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + + //Create a logical library in the catalogue + const bool logicalLibraryIsDisabled = false; + catalogue.createLogicalLibrary(admin, s_libraryName, logicalLibraryIsDisabled, "Create logical library"); + + //Create the source tape + std::string vid = "vidSource"; + catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, fullValue, readOnlyValue, comment); + + //Create two different destination tapepool + std::string tapepool2Name = "tapepool2"; + const cta::optional<std::string> supply; + catalogue.createTapePool(admin,tapepool2Name,"vo",1,false,supply,"comment"); + + std::string tapepool3Name = "tapepool3"; + catalogue.createTapePool(admin,tapepool3Name,"vo",1,false,supply,"comment"); + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = s_diskInstance; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 3; + storageClass.comment = "Create storage class"; + catalogue.modifyStorageClassNbCopies(admin,storageClass.diskInstance,storageClass.name,storageClass.nbCopies); + + //Create the two archive routes for the new copies + catalogue.createArchiveRoute(admin,storageClass.diskInstance,storageClass.name,2,tapepool2Name,"ArchiveRoute2"); + catalogue.createArchiveRoute(admin,storageClass.diskInstance,storageClass.name,3,tapepool3Name,"ArchiveRoute3"); + + //Create two other destinationTape + std::string vidDestination1 = "vidDestination1"; + catalogue.createTape(s_adminOnAdminHost,vidDestination1, s_mediaType, s_vendor, s_libraryName, tapepool2Name, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + std::string vidDestination2 = "vidDestination2"; + catalogue.createTape(s_adminOnAdminHost,vidDestination2, s_mediaType, s_vendor, s_libraryName, tapepool3Name, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000; + + //Simulate the writing of 10 files the source tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = storageClass.diskInstance; + fileWritten.diskFileId = diskFileId.str(); + fileWritten.diskFilePath = diskFilePath.str(); + fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID; + fileWritten.diskFileGid = PUBLIC_GID; + fileWritten.size = archiveFileSize; + fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234"); + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.size = archiveFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the expanding requeue the Repack after the creation of + //one retrieve request + scheduler.waitSchedulerDbSubthreadsComplete(); + { + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, s_defaultRepackDisabledTapeFlag,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + //Get the address of the Repack Request + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + + std::string repackQueueAddress = re.getRepackQueueAddress(RepackQueueType::Pending); + + cta::objectstore::RepackQueuePending repackQueuePending(repackQueueAddress,backend); + repackQueuePending.fetchNoLock(); + + std::string repackRequestAddress = repackQueuePending.getCandidateList(1,{}).candidates.front().address; + + log::TimingList tl; + utils::Timer t; + + scheduler.promoteRepackRequestsToToExpand(lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + //scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + ASSERT_EQ(vid,repackRequestToExpand->getRepackInfo().vid); + + scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + + { + cta::objectstore::RepackRequest rr(repackRequestAddress,backend); + rr.fetchNoLock(); + //As storage class nbcopies = 3 and as the 10 files already archived have 1 copy in CTA, + //The repack request should have 20 files to archive + ASSERT_EQ(20,rr.getTotalStatsFile().totalFilesToArchive); + ASSERT_EQ(20*archiveFileSize, rr.getTotalStatsFile().totalBytesToArchive); + //The number of files to Retrieve remains the same + ASSERT_EQ(10,rr.getTotalStatsFile().totalFilesToRetrieve); + ASSERT_EQ(10*archiveFileSize,rr.getTotalStatsFile().totalBytesToRetrieve); + } + } + + { + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j) + { + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + executedJobs.push_back(std::move(retrieveJob)); + } + //Now, report the retrieve jobs to be completed + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + //Report all jobs as succeeded + for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) + { + rrp.reportCompletedJob(std::move(*it)); + } + + rrp.setDiskDone(); + rrp.setTapeDone(); + + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + + ASSERT_TRUE(rrp.allThreadsDone()); + } + { + //Do the reporting of RetrieveJobs, will transform the Retrieve request in Archive requests + while (true) { + auto rep = schedulerDB.getNextRepackReportBatch(lc); + if (nullptr == rep) break; + rep->report(lc); + } + } + //All retrieve have been successfully executed, let's see if there are 2 mount for different vids with 10 files + //per batch + { + scheduler.waitSchedulerDbSubthreadsComplete(); + { + //The first mount given by the scheduler should be the vidDestination1 that belongs to the tapepool1 + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + + { + auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); + ASSERT_EQ(10,jobBatch.size()); + ASSERT_EQ(vidDestination1,archiveMount->getVid()); + } + } + + { + //Second mount should be the vidDestination2 that belongs to the tapepool2 + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + + { + auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); + ASSERT_EQ(10,jobBatch.size()); + ASSERT_EQ(vidDestination2,archiveMount->getVid()); + } + } + } +} + +TEST_P(SchedulerTest, expandRepackRequestMoveAndAddCopies){ + using namespace cta; + using namespace cta::objectstore; + unitTests::TempDirectory tempDirectory; + auto &catalogue = getCatalogue(); + auto &scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(100); + agent.insertAndRegisterSelf(lc); + + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = false; + const bool fullValue = true; + const bool readOnlyValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + + //Create a logical library in the catalogue + const bool logicalLibraryIsDisabled = false; + catalogue.createLogicalLibrary(admin, s_libraryName, logicalLibraryIsDisabled, "Create logical library"); + + //Create the source tape + std::string vid = "vidSource"; + catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, fullValue, readOnlyValue, comment); + + //Create two different destination tapepool + std::string tapepool2Name = "tapepool2"; + const cta::optional<std::string> supply; + catalogue.createTapePool(admin,tapepool2Name,"vo",1,false,supply,"comment"); + + std::string tapepool3Name = "tapepool3"; + catalogue.createTapePool(admin,tapepool3Name,"vo",1,false,supply,"comment"); + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = s_diskInstance; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 3; + storageClass.comment = "Create storage class"; + catalogue.modifyStorageClassNbCopies(admin,storageClass.diskInstance,storageClass.name,storageClass.nbCopies); + + //Create the two archive routes for the new copies + catalogue.createArchiveRoute(admin,storageClass.diskInstance,storageClass.name,2,tapepool2Name,"ArchiveRoute2"); + catalogue.createArchiveRoute(admin,storageClass.diskInstance,storageClass.name,3,tapepool3Name,"ArchiveRoute3"); + + //Create two other destinationTape and one for the move workflow + std::string vidDestination1 = "vidDestination1"; + catalogue.createTape(s_adminOnAdminHost,vidDestination1, s_mediaType, s_vendor, s_libraryName, tapepool2Name, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + std::string vidDestination2 = "vidDestination2"; + catalogue.createTape(s_adminOnAdminHost,vidDestination2, s_mediaType, s_vendor, s_libraryName, tapepool3Name, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + std::string vidMove = "vidMove"; + catalogue.createTape(s_adminOnAdminHost,vidMove, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000; + + //Simulate the writing of 10 files the source tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = storageClass.diskInstance; + fileWritten.diskFileId = diskFileId.str(); + fileWritten.diskFilePath = diskFilePath.str(); + fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID; + fileWritten.diskFileGid = PUBLIC_GID; + fileWritten.size = archiveFileSize; + fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234"); + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.size = archiveFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the expanding requeue the Repack after the creation of + //one retrieve request + scheduler.waitSchedulerDbSubthreadsComplete(); + { + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveAndAddCopies,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,s_defaultRepackDisabledTapeFlag, lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + //Get the address of the Repack Request + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + + std::string repackQueueAddress = re.getRepackQueueAddress(RepackQueueType::Pending); + + cta::objectstore::RepackQueuePending repackQueuePending(repackQueueAddress,backend); + repackQueuePending.fetchNoLock(); + + std::string repackRequestAddress = repackQueuePending.getCandidateList(1,{}).candidates.front().address; + + log::TimingList tl; + utils::Timer t; + + scheduler.promoteRepackRequestsToToExpand(lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + + scheduler.waitSchedulerDbSubthreadsComplete(); + + ASSERT_EQ(vid,repackRequestToExpand->getRepackInfo().vid); + + scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + + { + cta::objectstore::RepackRequest rr(repackRequestAddress,backend); + rr.fetchNoLock(); + //As storage class nbcopies = 3 and as the 10 files already archived have 1 copy in CTA, + //The repack request should have 20 files to archive + ASSERT_EQ(30,rr.getTotalStatsFile().totalFilesToArchive); + ASSERT_EQ(30*archiveFileSize, rr.getTotalStatsFile().totalBytesToArchive); + //The number of files to Retrieve remains the same + ASSERT_EQ(10,rr.getTotalStatsFile().totalFilesToRetrieve); + ASSERT_EQ(10*archiveFileSize,rr.getTotalStatsFile().totalBytesToRetrieve); + } + } + + { + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j) + { + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + executedJobs.push_back(std::move(retrieveJob)); + } + //Now, report the retrieve jobs to be completed + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + //Report all jobs as succeeded + for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) + { + rrp.reportCompletedJob(std::move(*it)); + } + + rrp.setDiskDone(); + rrp.setTapeDone(); + + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + + ASSERT_TRUE(rrp.allThreadsDone()); + } + { + //Do the reporting of RetrieveJobs, will transform the Retrieve request in Archive requests + while (true) { + auto rep = schedulerDB.getNextRepackReportBatch(lc); + if (nullptr == rep) break; + rep->report(lc); + } + } + //All retrieve have been successfully executed, let's see if there are 2 mount for different vids with 10 files + //per batch + { + scheduler.waitSchedulerDbSubthreadsComplete(); + { + //The first mount given by the scheduler should be the vidMove that belongs to the TapePool tapepool + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + + { + auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); + ASSERT_EQ(10,jobBatch.size()); + ASSERT_EQ(vidMove,archiveMount->getVid()); + } + } + + { + //Second mount should be the vidDestination1 that belongs to the tapepool + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + + { + auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); + ASSERT_EQ(10,jobBatch.size()); + ASSERT_EQ(vidDestination1,archiveMount->getVid()); + } + } + + { + //Third mount should be the vidDestination2 that belongs to the same tapepool as the repacked tape + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + + { + auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); + ASSERT_EQ(10,jobBatch.size()); + ASSERT_EQ(vidDestination2,archiveMount->getVid()); + } + } + } +} #undef TEST_MOCK_DB #ifdef TEST_MOCK_DB diff --git a/scheduler/testingMocks/MockArchiveMount.hpp b/scheduler/testingMocks/MockArchiveMount.hpp index 48593ff7cd003e3bfa76dcabc18bf149c0407ced..fac5e6a11cd7abfcc36c9e2cb288bd530f7efe54 100644 --- a/scheduler/testingMocks/MockArchiveMount.hpp +++ b/scheduler/testingMocks/MockArchiveMount.hpp @@ -50,7 +50,7 @@ namespace cta { } void reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, - std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) override { + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::ArchiveJob>>& failedToReportArchiveJobs, cta::log::LogContext& logContext) override { try { std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index 6f37b52595d4a0fabc5e1e1f286eb44db50c1137..27fe5b7629843bdd4949130398df4374536255cc 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -245,8 +245,18 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing."); return; } - reportPacker.m_archiveMount->reportJobsBatchTransferred(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles, + std::queue<std::unique_ptr<cta::ArchiveJob>> failedToReportArchiveJobs; + try{ + reportPacker.m_archiveMount->reportJobsBatchTransferred(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles, failedToReportArchiveJobs, reportPacker.m_lc); + } catch(const cta::ArchiveMount::FailedMigrationRecallResult &ex){ + while(!failedToReportArchiveJobs.empty()){ + auto archiveJob = std::move(failedToReportArchiveJobs.front()); + archiveJob->transferFailed(ex.getMessageValue(),reportPacker.m_lc); + failedToReportArchiveJobs.pop(); + } + throw ex; + } } else { // This is an abnormal situation: we should never flush after an error! reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");