diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp index 844d7d44245c3c8bad88f264efbd4ee110571bde..d7d692b6ff7b8109212aa9983f7a83fb0101e0d6 100644 --- a/catalogue/Catalogue.hpp +++ b/catalogue/Catalogue.hpp @@ -251,7 +251,7 @@ public: const common::dataStructures::SecurityIdentity &admin, const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb, + const uint32_t copyNb, const std::string &tapePoolName, const std::string &comment) = 0; @@ -267,11 +267,11 @@ public: virtual void deleteArchiveRoute( const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb) = 0; + const uint32_t copyNb) = 0; virtual std::list<common::dataStructures::ArchiveRoute> getArchiveRoutes() const = 0; - virtual void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &tapePoolName) = 0; - virtual void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &comment) = 0; + virtual void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &tapePoolName) = 0; + virtual void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &comment) = 0; virtual void createLogicalLibrary(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const std::string &comment) = 0; virtual void deleteLogicalLibrary(const std::string &name) = 0; diff --git a/catalogue/CatalogueRetryWrapper.hpp b/catalogue/CatalogueRetryWrapper.hpp index 1d87f366cef24e92d86d495a520fd984de29fe83..b88f30c1719b7885ca8f0064d5ac59c9e18de490 100644 --- a/catalogue/CatalogueRetryWrapper.hpp +++ b/catalogue/CatalogueRetryWrapper.hpp @@ -165,11 +165,11 @@ public: return retryOnLostConnection(m_log, [&]{return m_catalogue->setTapePoolEncryption(admin, name, encryptionValue);}, m_maxTriesToConnect); } - void createArchiveRoute(const common::dataStructures::SecurityIdentity &admin, const std::string &diskInstanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &tapePoolName, const std::string &comment) override { + void createArchiveRoute(const common::dataStructures::SecurityIdentity &admin, const std::string &diskInstanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &tapePoolName, const std::string &comment) override { return retryOnLostConnection(m_log, [&]{return m_catalogue->createArchiveRoute(admin, diskInstanceName, storageClassName, copyNb, tapePoolName, comment);}, m_maxTriesToConnect); } - void deleteArchiveRoute(const std::string &diskInstanceName, const std::string &storageClassName, const uint64_t copyNb) override { + void deleteArchiveRoute(const std::string &diskInstanceName, const std::string &storageClassName, const uint32_t copyNb) override { return retryOnLostConnection(m_log, [&]{return m_catalogue->deleteArchiveRoute(diskInstanceName, storageClassName, copyNb);}, m_maxTriesToConnect); } @@ -177,11 +177,11 @@ public: return retryOnLostConnection(m_log, [&]{return m_catalogue->getArchiveRoutes();}, m_maxTriesToConnect); } - void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &tapePoolName) override { + void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &tapePoolName) override { return retryOnLostConnection(m_log, [&]{return m_catalogue->modifyArchiveRouteTapePoolName(admin, instanceName, storageClassName, copyNb, tapePoolName);}, m_maxTriesToConnect); } - void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &comment) override { + void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &comment) override { return retryOnLostConnection(m_log, [&]{return m_catalogue->modifyArchiveRouteComment(admin, instanceName, storageClassName, copyNb, comment);}, m_maxTriesToConnect); } diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp index e60775a4d7ba0f01c9cc9ea3ce777163d243a0ee..cb2c4b478e03356e76be6d1255778227d3d4c7dc 100644 --- a/catalogue/CatalogueTest.cpp +++ b/catalogue/CatalogueTest.cpp @@ -1341,7 +1341,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); @@ -1385,7 +1385,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_emptyStringDiskInstanceNa m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); const std::string diskInstanceName = ""; - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, diskInstanceName, storageClass.name, copyNb, tapePoolName, comment), catalogue::UserSpecifiedAnEmptyStringDiskInstanceName); @@ -1412,7 +1412,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_emptyStringStorageClassNa m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); const std::string storageClassName = ""; - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClassName, copyNb, tapePoolName, comment), catalogue::UserSpecifiedAnEmptyStringStorageClassName); @@ -1438,7 +1438,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_zeroCopyNb) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 0; + const uint32_t copyNb = 0; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment), catalogue::UserSpecifiedAZeroCopyNb); @@ -1459,7 +1459,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_emptyStringTapePoolName) m_catalogue->createStorageClass(m_admin, storageClass); const std::string tapePoolName = ""; - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment), catalogue::UserSpecifiedAnEmptyStringTapePoolName); @@ -1485,7 +1485,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_emptyStringComment) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = ""; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment), catalogue::UserSpecifiedAnEmptyStringComment); @@ -1507,7 +1507,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_non_existent_storage_clas const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, diskInstanceName, storageClassName, copyNb, tapePoolName, comment), exception::UserError); @@ -1529,7 +1529,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_non_existent_tape_pool) { const std::string tapePoolName = "tape_pool"; - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, @@ -1564,7 +1564,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_same_name_different_disk_ const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass1DiskInstance1.diskInstance, storageClass1DiskInstance1.name, copyNb, tapePoolName, comment); @@ -1616,7 +1616,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_same_twice) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); ASSERT_THROW(m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, @@ -1643,7 +1643,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveRoute) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); @@ -1696,7 +1696,7 @@ TEST_P(cta_catalogue_CatalogueTest, createArchiveRoute_deleteStorageClass) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); @@ -1745,7 +1745,7 @@ TEST_P(cta_catalogue_CatalogueTest, modifyArchiveRouteTapePoolName) { const std::string anotherTapePoolName = "another_tape_pool"; m_catalogue->createTapePool(m_admin, anotherTapePoolName, vo, nbPartialTapes, isEncrypted, "Create another tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); @@ -1810,7 +1810,7 @@ TEST_P(cta_catalogue_CatalogueTest, modifyArchiveRouteTapePoolName_nonExistentAr const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; ASSERT_THROW(m_catalogue->modifyArchiveRouteTapePoolName(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName), exception::UserError); } @@ -1835,7 +1835,7 @@ TEST_P(cta_catalogue_CatalogueTest, modifyArchiveRouteComment) { const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, comment); @@ -1902,7 +1902,7 @@ TEST_P(cta_catalogue_CatalogueTest, modifyArchiveRouteComment_nonExistentArchive const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string comment = "Comment"; ASSERT_THROW(m_catalogue->modifyArchiveRouteComment(m_admin, storageClass.diskInstance, storageClass.name, copyNb, comment), exception::UserError); @@ -5701,7 +5701,7 @@ TEST_P(cta_catalogue_CatalogueTest, checkAndGetNextArchiveFileId_no_mount_rules) const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -5786,7 +5786,7 @@ TEST_P(cta_catalogue_CatalogueTest, checkAndGetNextArchiveFileId_requester_mount const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -5876,7 +5876,7 @@ TEST_P(cta_catalogue_CatalogueTest, checkAndGetNextArchiveFileId_requester_group const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -5985,7 +5985,7 @@ TEST_P(cta_catalogue_CatalogueTest, checkAndGetNextArchiveFileId_requester_mount const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -6134,7 +6134,7 @@ TEST_P(cta_catalogue_CatalogueTest, getArchiveFileQueueCriteria_requester_mount_ const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -6216,7 +6216,7 @@ TEST_P(cta_catalogue_CatalogueTest, getArchiveFileQueueCriteria_requester_group_ const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -6317,7 +6317,7 @@ TEST_P(cta_catalogue_CatalogueTest, getArchiveFileQueueCriteria_requester_mount_ const bool isEncrypted = true; m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, "Create tape pool"); - const uint64_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Create archive route"; m_catalogue->createArchiveRoute(m_admin, storageClass.diskInstance, storageClass.name, copyNb, tapePoolName, archiveRouteComment); @@ -7578,7 +7578,7 @@ TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_many_archive_files) { } } - for(uint64_t copyNb = 1; copyNb <= 2; copyNb++) { + for(uint32_t copyNb = 1; copyNb <= 2; copyNb++) { const std::string vid = copyNb == 1 ? vid1 : vid2; const uint64_t startFseq = 1; const uint64_t maxNbFiles = nbArchiveFiles; @@ -7644,7 +7644,7 @@ TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_many_archive_files) { } } - for(uint64_t copyNb = 1; copyNb <= 2; copyNb++) { + for(uint32_t copyNb = 1; copyNb <= 2; copyNb++) { const std::string vid = copyNb == 1 ? vid1 : vid2; const uint64_t startFseq = 1; const uint64_t maxNbFiles = nbArchiveFiles / 2; @@ -7710,7 +7710,7 @@ TEST_P(cta_catalogue_CatalogueTest, filesWrittenToTape_many_archive_files) { } } - for(uint64_t copyNb = 1; copyNb <= 2; copyNb++) { + for(uint32_t copyNb = 1; copyNb <= 2; copyNb++) { const std::string vid = copyNb == 1 ? vid1 : vid2; const uint64_t startFseq = nbArchiveFiles / 2 + 1; const uint64_t maxNbFiles = nbArchiveFiles / 2; diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp index 8c2277c3aea3b63189e02172e37ec325db4a6ed9..6bf982b5f5d56f1a5905c7c37d22bf990b37ca38 100644 --- a/catalogue/DummyCatalogue.hpp +++ b/catalogue/DummyCatalogue.hpp @@ -35,7 +35,7 @@ public: virtual ~DummyCatalogue() { } void createAdminUser(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - void createArchiveRoute(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createArchiveRoute(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& tapePoolName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void createLogicalLibrary(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void createMountPolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority, const uint64_t minArchiveRequestAge, const uint64_t retrievePriority, const uint64_t minRetrieveRequestAge, const uint64_t maxDrivesAllowed, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void createRequesterGroupMountRule(const common::dataStructures::SecurityIdentity& admin, const std::string& mountPolicyName, const std::string& diskInstanceName, const std::string& requesterGroupName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } @@ -45,7 +45,7 @@ public: void createTapePool(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string & vo, const uint64_t nbPartialTapes, const bool encryptionValue, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void deleteAdminUser(const std::string& username) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void deleteArchiveFile(const std::string& instanceName, const uint64_t archiveFileId, log::LogContext &lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - void deleteArchiveRoute(const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteArchiveRoute(const std::string& diskInstanceName, const std::string& storageClassName, const uint32_t copyNb) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void deleteLogicalLibrary(const std::string& name) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void deleteMountPolicy(const std::string& name) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void deleteRequesterGroupMountRule(const std::string& diskInstanceName, const std::string& requesterGroupName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } @@ -73,8 +73,8 @@ public: std::list<TapeForWriting> getTapesForWriting(const std::string& logicalLibraryName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } bool isAdmin(const common::dataStructures::SecurityIdentity& admin) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void modifyAdminUserComment(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& tapePoolName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void modifyLogicalLibraryComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void modifyMountPolicyArchiveMinRequestAge(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t minArchiveRequestAge) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void modifyMountPolicyArchivePriority(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index 5dbdde05af4c1d62fc181293726269878d1f97e0..4d2eccc1af63b5fee7e5a0922cd996f93b6b7ff8 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -825,7 +825,7 @@ bool RdbmsCatalogue::diskFileGroupExists(rdbms::Conn &conn, const std::string &d // archiveRouteExists //------------------------------------------------------------------------------ bool RdbmsCatalogue::archiveRouteExists(rdbms::Conn &conn, const std::string &diskInstanceName, - const std::string &storageClassName, const uint64_t copyNb) const { + const std::string &storageClassName, const uint32_t copyNb) const { try { const char *const sql = "SELECT " @@ -1125,7 +1125,7 @@ void RdbmsCatalogue::createArchiveRoute( const common::dataStructures::SecurityIdentity &admin, const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb, + const uint32_t copyNb, const std::string &tapePoolName, const std::string &comment) { try { @@ -1234,7 +1234,7 @@ void RdbmsCatalogue::createArchiveRoute( // deleteArchiveRoute //------------------------------------------------------------------------------ void RdbmsCatalogue::deleteArchiveRoute(const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb) { + const uint32_t copyNb) { try { const char *const sql = "DELETE FROM " @@ -1332,7 +1332,7 @@ std::list<common::dataStructures::ArchiveRoute> RdbmsCatalogue::getArchiveRoutes // modifyArchiveRouteTapePoolName //------------------------------------------------------------------------------ void RdbmsCatalogue::modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, - const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, + const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &tapePoolName) { try { const time_t now = time(nullptr); @@ -1381,7 +1381,7 @@ void RdbmsCatalogue::modifyArchiveRouteTapePoolName(const common::dataStructures // modifyArchiveRouteComment //------------------------------------------------------------------------------ void RdbmsCatalogue::modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, - const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, + const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &comment) { try { const time_t now = time(nullptr); @@ -4459,7 +4459,7 @@ common::dataStructures::TapeCopyToPoolMap RdbmsCatalogue::getTapeCopyToPoolMap(r stmt.bindString(":STORAGE_CLASS_NAME", storageClass.storageClassName); auto rset = stmt.executeQuery(); while (rset.next()) { - const uint64_t copyNb = rset.columnUint64("COPY_NB"); + const uint32_t copyNb = rset.columnUint64("COPY_NB"); const std::string tapePoolName = rset.columnString("TAPE_POOL_NAME"); copyToPoolMap[copyNb] = tapePoolName; } diff --git a/catalogue/RdbmsCatalogue.hpp b/catalogue/RdbmsCatalogue.hpp index a3036b80b7577403c075f71eb2edb54c1ccc5fc3..04854b7c47d3b29c65ca05703150295153268600 100644 --- a/catalogue/RdbmsCatalogue.hpp +++ b/catalogue/RdbmsCatalogue.hpp @@ -246,7 +246,7 @@ public: const common::dataStructures::SecurityIdentity &admin, const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb, + const uint32_t copyNb, const std::string &tapePoolName, const std::string &comment) override; @@ -263,11 +263,11 @@ public: void deleteArchiveRoute( const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb) override; + const uint32_t copyNb) override; std::list<common::dataStructures::ArchiveRoute> getArchiveRoutes() const override; - void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &tapePoolName) override; - void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint64_t copyNb, const std::string &comment) override; + void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &tapePoolName) override; + void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity &admin, const std::string &instanceName, const std::string &storageClassName, const uint32_t copyNb, const std::string &comment) override; void createLogicalLibrary(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const std::string &comment) override; void deleteLogicalLibrary(const std::string &name) override; @@ -696,7 +696,7 @@ protected: * @return True if the archive route exists. */ bool archiveRouteExists(rdbms::Conn &conn, const std::string &diskInstanceName, const std::string &storageClassName, - const uint64_t copyNb) const; + const uint32_t copyNb) const; /** * Returns true if the specified tape exists. diff --git a/catalogue/TapeFileWritten.hpp b/catalogue/TapeFileWritten.hpp index 093b1556b2a2e1e4c3389cb92a176c6e397fbd65..09856cec75cc6d81d90f5dea562fbe3462f09c82 100644 --- a/catalogue/TapeFileWritten.hpp +++ b/catalogue/TapeFileWritten.hpp @@ -120,7 +120,7 @@ struct TapeFileWritten: public TapeItemWritten { /** * The copy number of the tape file. */ - uint64_t copyNb; + uint32_t copyNb; }; // struct TapeFileWritten diff --git a/common/dataStructures/ArchiveFile.hpp b/common/dataStructures/ArchiveFile.hpp index 287bdab2d3c461b4ab4087bb247d942aae3e2fee..725b83aa7c7aaa9a566cecdbcc02951242399187 100644 --- a/common/dataStructures/ArchiveFile.hpp +++ b/common/dataStructures/ArchiveFile.hpp @@ -69,7 +69,7 @@ struct ArchiveFile { * for a single tape, the map will contain only one element. */ std::map<uint64_t,TapeFile> tapeFiles; - time_t creationTime; + time_t creationTime; time_t reconciliationTime; }; // struct ArchiveFile diff --git a/common/dataStructures/ArchiveJob.hpp b/common/dataStructures/ArchiveJob.hpp index 09eb0cf64245a86245552dd99d5c12eceaeb559e..5073d6c975a5c480e394c2cc288c2b6e5ee952ec 100644 --- a/common/dataStructures/ArchiveJob.hpp +++ b/common/dataStructures/ArchiveJob.hpp @@ -43,7 +43,7 @@ struct ArchiveJob { ArchiveRequest request; std::string tapePool; std::string instanceName; - uint64_t copyNumber; + uint32_t copyNumber; uint64_t archiveFileID; std::list<std::string> failurelogs; diff --git a/common/dataStructures/ArchiveRoute.hpp b/common/dataStructures/ArchiveRoute.hpp index cf9573082a33363bf15596c9c89ab09288e379f2..6f668cede55dee8eeedc81c57c5be95743ee6be8 100644 --- a/common/dataStructures/ArchiveRoute.hpp +++ b/common/dataStructures/ArchiveRoute.hpp @@ -55,7 +55,7 @@ struct ArchiveRoute { /** * The cipy number of the tape file. */ - uint64_t copyNb; + uint32_t copyNb; std::string tapePoolName; EntryLog creationLog; diff --git a/common/dataStructures/TapeFile.hpp b/common/dataStructures/TapeFile.hpp index a37771ea6645000bc291e6d4c3b8040737655c7e..7121343503a86866cde31faab55a5cc62b075e06 100644 --- a/common/dataStructures/TapeFile.hpp +++ b/common/dataStructures/TapeFile.hpp @@ -64,7 +64,7 @@ struct TapeFile { * The copy number of the file. Copy numbers start from 1. Copy number 0 * is an invalid copy number. */ - uint16_t copyNb; + uint32_t copyNb; /** * The time the file recorded in the catalogue. */ diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index ce016de144a341b63d4a7843f1a8493b8471bd0d..6bcf624e4626fa55ccb5f90def5ad04df67c2626 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -106,7 +106,7 @@ public: struct JobDump { uint64_t size; std::string address; - uint16_t copyNb; + uint32_t copyNb; }; std::list<JobDump> dumpJobs(); struct CandidateJobList { diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 682baedde0c2137c3e38dcbc1b95b28bb5d317b0..33bdb27b2ca92b156ba2851ab157114b25f85439 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -37,7 +37,7 @@ struct ContainerTraits<ArchiveQueue,C> struct InsertedElement { ArchiveRequest* archiveRequest; - uint16_t copyNb; + uint32_t copyNb; cta::common::dataStructures::ArchiveFile archiveFile; cta::optional<cta::common::dataStructures::MountPolicy> mountPolicy; cta::optional<serializers::ArchiveJobStatus> newStatus; @@ -48,7 +48,7 @@ struct ContainerTraits<ArchiveQueue,C> struct PoppedElement { std::unique_ptr<ArchiveRequest> archiveRequest; - uint16_t copyNb; + uint32_t copyNb; uint64_t bytes; common::dataStructures::ArchiveFile archiveFile; std::string srcURL; diff --git a/objectstore/ArchiveQueueShard.hpp b/objectstore/ArchiveQueueShard.hpp index 1a2074305da88d48db600bc8e0a110ecfb66b280..8443779a606e8337e8e2b75e008ceca5ad74a2db 100644 --- a/objectstore/ArchiveQueueShard.hpp +++ b/objectstore/ArchiveQueueShard.hpp @@ -47,7 +47,7 @@ public: struct JobInfo { uint64_t size; std::string address; - uint16_t copyNb; + uint32_t copyNb; uint64_t priority; uint64_t minArchiveRequestAge; uint64_t maxDrivesAllowed; diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 986998bb66ea694b0e69e8d1218dac3e3366f083..709358b1cc47047b5738bb6677340e1a5b81018a 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -67,7 +67,7 @@ void ArchiveRequest::initialize() { //------------------------------------------------------------------------------ // ArchiveRequest::addJob() //------------------------------------------------------------------------------ -void ArchiveRequest::addJob(uint16_t copyNumber, +void ArchiveRequest::addJob(uint32_t copyNumber, const std::string& tapepool, const std::string& initialOwner, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries) { checkPayloadWritable(); @@ -89,7 +89,7 @@ void ArchiveRequest::addJob(uint16_t copyNumber, //------------------------------------------------------------------------------ // ArchiveRequest::getJobQueueType() //------------------------------------------------------------------------------ -JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) { +JobQueueType ArchiveRequest::getJobQueueType(uint32_t copyNumber) { checkPayloadReadable(); for (auto &j: m_payload.jobs()) { if (j.copynb() == copyNumber) { @@ -119,7 +119,7 @@ JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) { //------------------------------------------------------------------------------ // ArchiveRequest::addTransferFailure() //------------------------------------------------------------------------------ -auto ArchiveRequest::addTransferFailure(uint16_t copyNumber, +auto ArchiveRequest::addTransferFailure(uint32_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc) -> EnqueueingNextStep { checkPayloadWritable(); // Find the job and update the number of failures @@ -156,7 +156,7 @@ auto ArchiveRequest::addTransferFailure(uint16_t copyNumber, //------------------------------------------------------------------------------ // ArchiveRequest::addReportFailure() //------------------------------------------------------------------------------ -auto ArchiveRequest::addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string& failureReason, +auto ArchiveRequest::addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string& failureReason, log::LogContext& lc) -> EnqueueingNextStep { checkPayloadWritable(); // Find the job and update the number of failures @@ -184,7 +184,7 @@ auto ArchiveRequest::addReportFailure(uint16_t copyNumber, uint64_t sessionId, c //------------------------------------------------------------------------------ // ArchiveRequest::getRetryStatus() //------------------------------------------------------------------------------ -ArchiveRequest::RetryStatus ArchiveRequest::getRetryStatus(const uint16_t copyNumber) { +ArchiveRequest::RetryStatus ArchiveRequest::getRetryStatus(const uint32_t copyNumber) { checkPayloadReadable(); for (auto &j: m_payload.jobs()) { if (copyNumber == j.copynb()) { @@ -478,7 +478,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer // ArchiveRequest::setJobOwner() //------------------------------------------------------------------------------ void ArchiveRequest::setJobOwner( - uint16_t copyNumber, const std::string& owner) { + uint32_t copyNumber, const std::string& owner) { checkPayloadWritable(); // Find the right job auto mutJobs = m_payload.mutable_jobs(); @@ -494,7 +494,7 @@ void ArchiveRequest::setJobOwner( //------------------------------------------------------------------------------ // ArchiveRequest::asyncUpdateJobOwner() //------------------------------------------------------------------------------ -ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16_t copyNumber, +ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string& owner, const std::string& previousOwner, const cta::optional<serializers::ArchiveJobStatus>& newStatus) { std::unique_ptr<AsyncJobOwnerUpdater> ret(new AsyncJobOwnerUpdater); // The unique pointer will be std::moved so we need to work with its content (bare pointer or here ref to content). @@ -624,7 +624,7 @@ objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater: //------------------------------------------------------------------------------ // ArchiveRequest::asyncUpdateTransferSuccessful() //------------------------------------------------------------------------------ -ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint16_t copyNumber ) { +ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint32_t copyNumber ) { std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater); // The unique pointer will be std::moved so we need to work with its content (bare pointer or here ref to content). auto & retRef = *ret; @@ -698,7 +698,7 @@ void ArchiveRequest::AsyncRequestDeleter::wait() { //------------------------------------------------------------------------------ // ArchiveRequest::getJobOwner() //------------------------------------------------------------------------------ -std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) { +std::string ArchiveRequest::getJobOwner(uint32_t copyNumber) { checkPayloadReadable(); auto jl = m_payload.jobs(); auto j=std::find_if(jl.begin(), jl.end(), [&](decltype(*jl.begin())& j2){ return j2.copynb() == copyNumber; }); @@ -764,7 +764,7 @@ std::string ArchiveRequest::eventToString(JobEvent jobEvent) { //------------------------------------------------------------------------------ // ArchiveRequest::determineNextStep() //------------------------------------------------------------------------------ -auto ArchiveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent jobEvent, +auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobEvent, log::LogContext& lc) -> EnqueueingNextStep { checkPayloadWritable(); // We have to determine which next step should be taken. @@ -865,7 +865,7 @@ std::list<std::string> ArchiveRequest::getFailures() { //------------------------------------------------------------------------------ // ArchiveRequest::setJobStatus() //------------------------------------------------------------------------------ -void ArchiveRequest::setJobStatus(uint16_t copyNumber, const serializers::ArchiveJobStatus& status) { +void ArchiveRequest::setJobStatus(uint32_t copyNumber, const serializers::ArchiveJobStatus& status) { checkPayloadWritable(); for (auto j=m_payload.mutable_jobs()->begin(); j!=m_payload.mutable_jobs()->end(); j++) { if (j->copynb() == copyNumber) { @@ -879,7 +879,7 @@ void ArchiveRequest::setJobStatus(uint16_t copyNumber, const serializers::Archiv //------------------------------------------------------------------------------ // ArchiveRequest::getTapePoolForJob() //------------------------------------------------------------------------------ -std::string ArchiveRequest::getTapePoolForJob(uint16_t copyNumber) { +std::string ArchiveRequest::getTapePoolForJob(uint32_t copyNumber) { checkPayloadReadable(); for (auto j:m_payload.jobs()) if (j.copynb() == copyNumber) return j.tapepool(); throw exception::Exception("In ArchiveRequest::getTapePoolForJob(): job not found."); diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 64915ddd4558d4cbdb83aa6386269c0e05f3fb9e..3c0fb439ca8721d3c0c9cc942edfc7fdbccb5db0 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -48,7 +48,7 @@ public: std::string getOwner() = delete; void setOwner(const std::string &) = delete; // Job management ============================================================ - void addJob(uint16_t copyNumber, const std::string & tapepool, + void addJob(uint32_t copyNumber, const std::string & tapepool, const std::string & initialOwner, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries); struct RetryStatus { uint64_t retriesWithinMount = 0; @@ -58,11 +58,11 @@ public: uint64_t reportRetries = 0; uint64_t maxReportRetries = 0; }; - RetryStatus getRetryStatus(uint16_t copyNumber); + RetryStatus getRetryStatus(uint32_t copyNumber); std::list<std::string> getFailures(); - serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber); - void setJobStatus(uint16_t copyNumber, const serializers::ArchiveJobStatus & status); - std::string getTapePoolForJob(uint16_t copyNumber); + serializers::ArchiveJobStatus getJobStatus(uint32_t copyNumber); + void setJobStatus(uint32_t copyNumber, const serializers::ArchiveJobStatus & status); + std::string getTapePoolForJob(uint32_t copyNumber); std::string statusToString(const serializers::ArchiveJobStatus & status); enum class JobEvent { TransferFailed, @@ -91,17 +91,17 @@ private: * @param lc * @return The next step to be taken by the caller (OStoreDB), which is in charge of the queueing and status setting. */ - EnqueueingNextStep determineNextStep(uint16_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext & lc); + EnqueueingNextStep determineNextStep(uint32_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext & lc); public: - EnqueueingNextStep addTransferFailure(uint16_t copyNumber, uint64_t sessionId, const std::string & failureReason, + EnqueueingNextStep addTransferFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason, log::LogContext &lc); //< returns next step to take with the job - EnqueueingNextStep addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string & failureReason, + EnqueueingNextStep addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason, log::LogContext &lc); //< returns next step to take with the job CTA_GENERATE_EXCEPTION_CLASS(JobNotQueueable); - JobQueueType getJobQueueType(uint16_t copyNumber); + JobQueueType getJobQueueType(uint32_t copyNumber); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); // Set a job ownership - void setJobOwner(uint16_t copyNumber, const std::string & owner); + void setJobOwner(uint32_t copyNumber, const std::string & owner); // An asynchronous job ownership updating class. class AsyncJobOwnerUpdater { friend class ArchiveRequest; @@ -134,7 +134,7 @@ public: }; // An job owner updater factory. The owner MUST be previousOwner for the update to be executed. If the owner is already the targeted // one, the request will do nothing and not fail. - AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint16_t copyNumber, const std::string & owner, const std::string &previousOwner, + AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint32_t copyNumber, const std::string & owner, const std::string &previousOwner, const cta::optional<serializers::ArchiveJobStatus>& newStatus); // An asynchronous job updating class for transfer success. @@ -147,7 +147,7 @@ public: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; }; - AsyncTransferSuccessfulUpdater * asyncUpdateTransferSuccessful(uint16_t copyNumber); + AsyncTransferSuccessfulUpdater * asyncUpdateTransferSuccessful(uint32_t copyNumber); // An asynchronous request deleter class after report of success. class AsyncRequestDeleter { @@ -159,7 +159,7 @@ public: }; AsyncRequestDeleter * asyncDeleteRequest(); // Get a job owner - std::string getJobOwner(uint16_t copyNumber); + std::string getJobOwner(uint32_t copyNumber); // Utility to convert status to queue type static JobQueueType getQueueType(const serializers::ArchiveJobStatus &status); @@ -189,7 +189,7 @@ public: class JobDump { public: - uint16_t copyNb; + uint32_t copyNb; std::string tapePool; std::string owner; serializers::ArchiveJobStatus status; diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 7537cf9ec7d2bb63430de40b12a45627b9f7a831..6cb4069bdcb02f12f9c300e132f95363954cb700 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -573,7 +573,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& struct RRUpdatedParams { std::unique_ptr<RetrieveRequest::AsyncJobOwnerUpdater> updater; std::shared_ptr<RetrieveRequest> retrieveRequest; - uint16_t copyNb; + uint32_t copyNb; }; { std::list<RRUpdatedParams> rrUpdatersParams; @@ -600,7 +600,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& .add("copyNb", rrup.copyNb) .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) .add("tapeVid", vid) - .add("retreveQueueObject", rq.getAddressIfSet()) + .add("retrieveQueueObject", rq.getAddressIfSet()) .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): requeued retrieve job."); } catch (cta::exception::Exception & e) { diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 3c4243b572f2acd5d0726d4bdab5b0132eb0cf4b..c8513904facec03ee1e07411dfac7265bfdb2ab8 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -59,7 +59,7 @@ public: // Retrieve jobs management ================================================== struct JobToAdd { - uint64_t copyNb; + uint32_t copyNb; uint64_t fSeq; std::string retrieveRequestAddress; uint64_t fileSize; @@ -86,7 +86,7 @@ public: JobsSummary getJobsSummary(); struct JobDump { std::string address; - uint16_t copyNb; + uint32_t copyNb; uint64_t size; }; std::list<JobDump> dumpJobs(); diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 7999cc198b5b6fc3da44bfa6df2c808721abad44..341142515aa36dae36e4e72d8f35ef459ed752d5 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -29,7 +29,7 @@ struct ContainerTraits<RetrieveQueue,C> { struct ContainerSummary : public RetrieveQueue::JobsSummary { ContainerSummary() : RetrieveQueue::JobsSummary() {} - ContainerSummary(const RetrieveQueue::JobsSummary &c) : RetrieveQueue::JobsSummary() {} + ContainerSummary(const RetrieveQueue::JobsSummary &c) : RetrieveQueue::JobsSummary({c.jobs,c.bytes,c.oldestJobStartTime,c.priority,c.minRetrieveRequestAge,c.maxDrivesAllowed}) {} void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&) const; }; @@ -37,7 +37,7 @@ struct ContainerTraits<RetrieveQueue,C> struct InsertedElement { RetrieveRequest *retrieveRequest; - uint16_t copyNb; + uint32_t copyNb; uint64_t fSeq; uint64_t filesize; cta::common::dataStructures::MountPolicy policy; @@ -49,7 +49,7 @@ struct ContainerTraits<RetrieveQueue,C> struct PoppedElement { std::unique_ptr<RetrieveRequest> retrieveRequest; - uint16_t copyNb; + uint32_t copyNb; uint64_t bytes; common::dataStructures::ArchiveFile archiveFile; common::dataStructures::RetrieveRequest rr; diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp index 81549e76c715deb8c595acd0ed51c08145c7176c..01c231740890084da9cfd7a2c0f6256c83b24f0f 100644 --- a/objectstore/RetrieveQueueShard.hpp +++ b/objectstore/RetrieveQueueShard.hpp @@ -47,7 +47,7 @@ public: struct JobInfo { uint64_t size; std::string address; - uint16_t copyNb; + uint32_t copyNb; uint64_t priority; uint64_t minRetrieveRequestAge; uint64_t maxDrivesAllowed; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index f4c70dd8d3d64ef961bb63fcce0b88299bbad416..74e2f758ec7088a11a18deae2c44cee6d36fb1ac 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -271,7 +271,7 @@ queueForTransfer:; //------------------------------------------------------------------------------ // RetrieveRequest::addJob() //------------------------------------------------------------------------------ -void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, +void RetrieveRequest::addJob(uint32_t copyNb, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries) { checkPayloadWritable(); @@ -290,7 +290,7 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetriesWithinMount, ui //------------------------------------------------------------------------------ // addTransferFailure() //------------------------------------------------------------------------------ -auto RetrieveRequest::addTransferFailure(uint16_t copyNumber, uint64_t mountId, const std::string &failureReason, +auto RetrieveRequest::addTransferFailure(uint32_t copyNumber, uint64_t mountId, const std::string &failureReason, log::LogContext &lc) -> EnqueueingNextStep { checkPayloadWritable(); @@ -331,7 +331,7 @@ auto RetrieveRequest::addTransferFailure(uint16_t copyNumber, uint64_t mountId, //------------------------------------------------------------------------------ // addReportFailure() //------------------------------------------------------------------------------ -auto RetrieveRequest::addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string &failureReason, +auto RetrieveRequest::addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc) -> EnqueueingNextStep { checkPayloadWritable(); @@ -454,7 +454,7 @@ auto RetrieveRequest::dumpJobs() -> std::list<JobDump> { //------------------------------------------------------------------------------ // RetrieveRequest::getJob() //------------------------------------------------------------------------------ -auto RetrieveRequest::getJob(uint16_t copyNb) -> JobDump { +auto RetrieveRequest::getJob(uint32_t copyNb) -> JobDump { checkPayloadReadable(); // find the job for (auto & j: m_payload.jobs()) { @@ -485,7 +485,7 @@ auto RetrieveRequest::getJobs() -> std::list<JobDump> { //------------------------------------------------------------------------------ // RetrieveRequest::addJobFailure() //------------------------------------------------------------------------------ -bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, +bool RetrieveRequest::addJobFailure(uint32_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc) { checkPayloadWritable(); // Find the job and update the number of failures @@ -519,7 +519,7 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, //------------------------------------------------------------------------------ // RetrieveRequest::getRetryStatus() //------------------------------------------------------------------------------ -RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint16_t copyNumber) { +RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint32_t copyNumber) { checkPayloadReadable(); for (auto &j: m_payload.jobs()) { if (copyNumber == j.copynb()) { @@ -562,6 +562,27 @@ JobQueueType RetrieveRequest::getQueueType() { return JobQueueType::FailedJobs; } +JobQueueType RetrieveRequest::getQueueType(uint32_t copyNb){ + checkPayloadReadable(); + for(auto &j: m_payload.jobs()){ + if(j.copynb() == copyNb){ + switch(j.status()){ + case serializers::RetrieveJobStatus::RJS_ToTransfer: + return JobQueueType::JobsToTransfer; + case serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess: + return JobQueueType::JobsToReportToRepackForSuccess; + case serializers::RetrieveJobStatus::RJS_ToReportForFailure: + return JobQueueType::JobsToReportToUser; + case serializers::RetrieveJobStatus::RJS_Failed: + return JobQueueType::FailedJobs; + default: + return JobQueueType::FailedJobs; + } + } + } + throw cta::exception::Exception("In RetrieveRequest::getJobQueueType(): Copy number not found."); +} + //------------------------------------------------------------------------------ // RetrieveRequest::statusToString() //------------------------------------------------------------------------------ @@ -591,7 +612,7 @@ std::string RetrieveRequest::eventToString(JobEvent jobEvent) { //------------------------------------------------------------------------------ // RetrieveRequest::determineNextStep() //------------------------------------------------------------------------------ -auto RetrieveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent jobEvent, +auto RetrieveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobEvent, log::LogContext& lc) -> EnqueueingNextStep { checkPayloadWritable(); @@ -653,7 +674,7 @@ auto RetrieveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent job //------------------------------------------------------------------------------ // RetrieveRequest::getJobStatus() //------------------------------------------------------------------------------ -serializers::RetrieveJobStatus RetrieveRequest::getJobStatus(uint16_t copyNumber) { +serializers::RetrieveJobStatus RetrieveRequest::getJobStatus(uint32_t copyNumber) { checkPayloadReadable(); for (auto & j: m_payload.jobs()) if (j.copynb() == copyNumber) @@ -666,7 +687,7 @@ serializers::RetrieveJobStatus RetrieveRequest::getJobStatus(uint16_t copyNumber //------------------------------------------------------------------------------ // RetrieveRequest::asyncUpdateJobOwner() //------------------------------------------------------------------------------ -auto RetrieveRequest::asyncUpdateJobOwner(uint16_t copyNumber, const std::string &owner, +auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string &owner, const std::string &previousOwner) -> AsyncJobOwnerUpdater* { std::unique_ptr<AsyncJobOwnerUpdater> ret(new AsyncJobOwnerUpdater); @@ -829,7 +850,7 @@ void RetrieveRequest::AsyncJobDeleter::wait() { //------------------------------------------------------------------------------ // RetrieveRequest::asyncReportSucceedForRepack() //------------------------------------------------------------------------------ -RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint64_t copyNb) +RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint32_t copyNb) { std::unique_ptr<AsyncJobSucceedForRepackReporter> ret(new AsyncJobSucceedForRepackReporter); ret->m_updaterCallback = [copyNb](const std::string &in)->std::string{ @@ -1004,7 +1025,7 @@ void RetrieveRequest::setFailureReason(const std::string& reason) { //------------------------------------------------------------------------------ // RetrieveRequest::setJobStatus() //------------------------------------------------------------------------------ -void RetrieveRequest::setJobStatus(uint64_t copyNumber, const serializers::RetrieveJobStatus& status) { +void RetrieveRequest::setJobStatus(uint32_t copyNumber, const serializers::RetrieveJobStatus& status) { checkPayloadWritable(); for (auto j = m_payload.mutable_jobs()->begin(); j != m_payload.mutable_jobs()->end(); j++) { if (j->copynb() == copyNumber) { diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 9a4970d0bc449d26f0b36478f600fca384c8230f..3a55e8706e30c32f135255abf328e31ae622e153 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -48,12 +48,12 @@ public: void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; // Job management ============================================================ - void addJob(uint64_t copyNumber, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries); + void addJob(uint32_t copyNumber, uint16_t maxRetriesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries); std::string getLastActiveVid(); void setFailureReason(const std::string & reason); class JobDump { public: - uint64_t copyNb; + uint32_t copyNb; serializers::RetrieveJobStatus status; }; // An asynchronous job ownership updating class. @@ -104,7 +104,7 @@ public: * @return the class that is Responsible to save the updater callback * and the backend async updater (responsible for executing asynchronously the updater callback */ - AsyncJobSucceedForRepackReporter * asyncReportSucceedForRepack(uint64_t copyNb); + AsyncJobSucceedForRepackReporter * asyncReportSucceedForRepack(uint32_t copyNb); /** * Asynchronously transform the current RetrieveRequest into an ArchiveRequest @@ -113,9 +113,9 @@ public: */ AsyncRetrieveToArchiveTransformer * asyncTransformToArchiveRequest(AgentReference& processAgent); - JobDump getJob(uint16_t copyNb); + JobDump getJob(uint32_t copyNb); std::list<JobDump> getJobs(); - bool addJobFailure(uint16_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc); + bool addJobFailure(uint32_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc); /**< Returns true is the request is completely failed (telling wheather we should requeue or not). */ struct RetryStatus { @@ -126,7 +126,7 @@ public: uint64_t totalReportRetries = 0; uint64_t maxReportRetries = 0; }; - RetryStatus getRetryStatus(uint16_t copyNumber); + RetryStatus getRetryStatus(uint32_t copyNumber); enum class JobEvent { TransferFailed, ReportFailed @@ -162,18 +162,20 @@ private: * @returns The next step to be taken by the caller (OStoreDB), which is in charge of the queueing * and status setting */ - EnqueueingNextStep determineNextStep(uint16_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext &lc); + EnqueueingNextStep determineNextStep(uint32_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext &lc); public: //! Returns next step to take with the job - EnqueueingNextStep addTransferFailure(uint16_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc); + EnqueueingNextStep addTransferFailure(uint32_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc); //! Returns next step to take with the job - EnqueueingNextStep addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc); + EnqueueingNextStep addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc); //! Returns queue type depending on the compound statuses of all retrieve requests JobQueueType getQueueType(); + CTA_GENERATE_EXCEPTION_CLASS(JobNotQueueable); + JobQueueType getQueueType(uint32_t copyNumber); std::list<std::string> getFailures(); std::string statusToString(const serializers::RetrieveJobStatus & status); - serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber); - void setJobStatus(uint64_t copyNumber, const serializers::RetrieveJobStatus &status); + serializers::RetrieveJobStatus getJobStatus(uint32_t copyNumber); + void setJobStatus(uint32_t copyNumber, const serializers::RetrieveJobStatus &status); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); // An asynchronous job ownership updating class. class AsyncJobOwnerUpdater { @@ -191,7 +193,7 @@ public: serializers::RetrieveJobStatus m_jobStatus; }; // An owner updater factory. The owner MUST be previousOwner for the update to be executed. - AsyncJobOwnerUpdater *asyncUpdateJobOwner(uint16_t copyNumber, const std::string &owner, const std::string &previousOwner); + AsyncJobOwnerUpdater *asyncUpdateJobOwner(uint32_t copyNumber, const std::string &owner, const std::string &previousOwner); // =========================================================================== void setSchedulerRequest(const cta::common::dataStructures::RetrieveRequest & retrieveRequest); cta::common::dataStructures::RetrieveRequest getSchedulerRequest(); diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index 5359ae600f946d9885961adc8485ce5cbfa4e8b1..663c1471de73e38f4df15dce2fecb7bedfa26b34 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -27,6 +27,8 @@ Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue:: Sorter::~Sorter() { } +/* Archive related algorithms */ + template <typename SpecificQueue> void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext& lc){ typedef ContainerAlgorithms<ArchiveQueue,SpecificQueue> Algo; @@ -37,7 +39,6 @@ void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& qu for(auto& jobToAdd: jobs){ Sorter::ArchiveJob job = std::get<0>(jobToAdd->jobToQueue); succeededJobs[job.jobDump.copyNb] = jobToAdd; - //TODO, change the ownership by passing the previousOwner to this sorter previousOwner = job.previousOwner->getAgentAddress(); jobsToAdd.push_back({ jobToAdd->archiveRequest.get(),job.jobDump.copyNb,job.archiveFile, job.mountPolicy,cta::nullopt }); } @@ -48,7 +49,7 @@ void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& qu try{ std::rethrow_exception(failedAR.failure); } catch(cta::exception::Exception &e){ - uint16_t copyNb = failedAR.element->copyNb; + uint32_t copyNb = failedAR.element->copyNb; std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception()); succeededJobs.erase(copyNb); } @@ -70,43 +71,181 @@ void Sorter::dispatchArchiveAlgorithm(const std::string tapePool, const JobQueue } } +void Sorter::insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc){ + for(auto& job: archiveRequest->dumpJobs()){ + insertArchiveJob(archiveRequest,previousOwner,job,lc); + } +} + void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){ auto ajqi = std::make_shared<ArchiveJobQueueInfo>(); ajqi->archiveRequest = archiveRequest; Sorter::ArchiveJob jobToAdd; jobToAdd.archiveRequest = archiveRequest; jobToAdd.archiveFile = archiveRequest->getArchiveFile(); - jobToAdd.archiveFileId = archiveRequest->getArchiveFile().archiveFileID; jobToAdd.jobDump.copyNb = jobToInsert.copyNb; - jobToAdd.fileSize = archiveRequest->getArchiveFile().fileSize; jobToAdd.mountPolicy = archiveRequest->getMountPolicy(); jobToAdd.previousOwner = &previousOwner; - jobToAdd.startTime = archiveRequest->getEntryLog().time; jobToAdd.jobDump.tapePool = jobToInsert.tapePool; ajqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + try{ + threading::MutexLocker mapLocker(m_mutex); + m_archiveQueuesAndRequests[std::make_tuple(jobToInsert.tapePool, archiveRequest->getJobQueueType(jobToInsert.copyNb))].emplace_back(ajqi); + } catch (cta::exception::Exception &ex){ + log::ScopedParamContainer params(lc); + params.add("fileId", archiveRequest->getArchiveFile().archiveFileID) + .add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR,"In Sorter::insertArchiveJob() Failed to determine destination queue for Archive Job."); + } +} + +bool Sorter::flushOneArchive(log::LogContext &lc) { + threading::MutexLocker locker(m_mutex); + for(auto & kv: m_archiveQueuesAndRequests){ + if(!kv.second.empty()){ + queueArchiveRequests(std::get<0>(kv.first),std::get<1>(kv.first),kv.second,lc); + m_archiveQueuesAndRequests.erase(kv.first); + return true; + } + } + return false; +} + +Sorter::MapArchive Sorter::getAllArchive(){ threading::MutexLocker mapLocker(m_mutex); - m_archiveQueuesAndRequests[std::make_tuple(jobToInsert.tapePool, archiveRequest->getJobQueueType(jobToInsert.copyNb))].emplace_back(ajqi); + return m_archiveQueuesAndRequests; +} + +void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobsInfos, log::LogContext &lc){ + std::string queueAddress; + this->dispatchArchiveAlgorithm(tapePool,jobQueueType,queueAddress,archiveJobsInfos,lc); +} + +/* End of archive related algorithms */ + +/* Retrieve related algorithms */ + +template <typename SpecificQueue> +void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& jobs, log::LogContext& lc){ + typedef ContainerAlgorithms<RetrieveQueue,SpecificQueue> Algo; + Algo algo(m_objectstore,m_agentReference); + typename Algo::InsertedElement::list jobsToAdd; + std::map<uint64_t, std::shared_ptr<RetrieveJobQueueInfo>> succeededJobs; + std::string previousOwner; + for(auto& jobToAdd: jobs){ + Sorter::RetrieveJob job = std::get<0>(jobToAdd->jobToQueue); + succeededJobs[job.jobDump.copyNb] = jobToAdd; + previousOwner = job.previousOwner->getAgentAddress(); + jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status}); + } + try{ + algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc); + } catch(typename Algo::OwnershipSwitchFailure &failure){ + for(auto& failedRR: failure.failedElements){ + try{ + std::rethrow_exception(failedRR.failure); + } catch (cta::exception::Exception){ + uint32_t copyNb = failedRR.element->copyNb; + std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception()); + succeededJobs.erase(copyNb); + } + } + } + for(auto &succeededJob: succeededJobs){ + std::get<1>(succeededJob.second->jobToQueue).set_value(); + } +} + +void Sorter::dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo> >& jobs, log::LogContext& lc){ + switch(jobQueueType){ + case JobQueueType::JobsToReportToUser: + this->executeRetrieveAlgorithm<RetrieveQueueToReport>(vid,queueAddress,jobs,lc); + break; + case JobQueueType::JobsToTransfer: + this->executeRetrieveAlgorithm<RetrieveQueueToTransfer>(vid,queueAddress,jobs,lc); + break; + case JobQueueType::JobsToReportToRepackForSuccess: + this->executeRetrieveAlgorithm<RetrieveQueueToReportToRepackForSuccess>(vid,queueAddress,jobs,lc); + break; + default: + throw cta::exception::Exception("In Sorter::dispatchRetrieveAlgorithm(), unknown JobQueueType"); + break; + } +} + +Sorter::RetrieveJob Sorter::createRetrieveJob(std::shared_ptr<RetrieveRequest> retrieveRequest, const cta::common::dataStructures::ArchiveFile archiveFile, + const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner){ + + Sorter::RetrieveJob jobToAdd; + jobToAdd.jobDump.copyNb = copyNb; + jobToAdd.fSeq = fSeq; + jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy; + jobToAdd.retrieveRequest = retrieveRequest; + jobToAdd.previousOwner = previousOwner; + jobToAdd.jobDump.status = retrieveRequest->getJobStatus(jobToAdd.jobDump.copyNb); + jobToAdd.fileSize = archiveFile.fileSize; + return jobToAdd; } -void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc){ - std::set<std::string> candidateVids = getCandidateVids(*retrieveRequest); - //We need to select the best VID to queue the RetrieveJob in the best queue - if(candidateVids.empty()){ +void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc){ + if(copyNb == cta::nullopt){ + //The job to queue will be a ToTransfer + std::set<std::string> candidateVidsToTransfer = getCandidateVidsToTransfer(*retrieveRequest); + + if(!candidateVidsToTransfer.empty()){ + + std::string bestVid = getBestVidForQueueingRetrieveRequest(*retrieveRequest, candidateVidsToTransfer ,lc); + + for (auto & tf: retrieveRequest->getArchiveFile().tapeFiles) { + if (tf.second.vid == bestVid) { + goto vidFound; + } + } + { + std::stringstream err; + err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. archiveId=" << retrieveRequest->getArchiveFile().archiveFileID + << " vid=" << bestVid; + throw RetrieveRequestHasNoCopies(err.str()); + } + vidFound: + std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo()); + rjqi->retrieveRequest = retrieveRequest; + log::ScopedParamContainer params(lc); + size_t copyNb = std::numeric_limits<size_t>::max(); + uint64_t fSeq = std::numeric_limits<uint64_t>::max(); + for (auto & tc: retrieveRequest->getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } } + cta::common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile(); + Sorter::RetrieveJob jobToAdd = createRetrieveJob(retrieveRequest,archiveFile,copyNb,fSeq,&previousOwner); + //We are sure that we want to queue a ToTransfer Job + rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + threading::MutexLocker mapLocker(m_mutex); + m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransfer)].emplace_back(rjqi); + params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) + .add("copyNb", copyNb) + .add("tapeVid", bestVid) + .add("fSeq", fSeq); + lc.log(log::INFO, "Selected vid to be queued for retrieve request."); + return; + } else { + throw cta::exception::Exception("In Sorter::insertRetrieveRequest(), there is no ToTransfer jobs in the RetrieveRequest. Please provide the copyNb of the job you want to queue."); + } + } else { + //We want to queue a specific job identified by its copyNb + log::ScopedParamContainer params(lc); auto rjqi = std::make_shared<RetrieveJobQueueInfo>(); rjqi->retrieveRequest = retrieveRequest; - //The first copy of the ArchiveFile will be queued - cta::common::dataStructures::TapeFile jobTapeFile = retrieveRequest->getArchiveFile().tapeFiles.begin()->second; - Sorter::RetrieveJob jobToAdd; - jobToAdd.jobDump.copyNb = jobTapeFile.copyNb; - jobToAdd.fSeq = jobTapeFile.fSeq; - jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize; - jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy; - jobToAdd.retrieveRequest = retrieveRequest; - jobToAdd.startTime = retrieveRequest->getEntryLog().time; + cta::common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile(); + cta::common::dataStructures::TapeFile jobTapeFile = archiveFile.tapeFiles[copyNb.value()]; + Sorter::RetrieveJob jobToAdd = createRetrieveJob(retrieveRequest,archiveFile,jobTapeFile.copyNb,jobTapeFile.fSeq,&previousOwner); rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); try{ threading::MutexLocker mapLocker(m_mutex); - m_retrieveQueuesAndRequests[std::make_tuple(retrieveRequest->getArchiveFile().tapeFiles.begin()->second.vid, retrieveRequest->getQueueType())].emplace_back(rjqi); + m_retrieveQueuesAndRequests[std::make_tuple(jobTapeFile.vid, retrieveRequest->getQueueType(copyNb.value()))].emplace_back(rjqi); + params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) + .add("copyNb", copyNb.value()) + .add("tapeVid", jobTapeFile.vid) + .add("fSeq", jobTapeFile.fSeq); + lc.log(log::INFO, "Selected the vid of the job to be queued for retrieve request."); } catch (cta::exception::Exception &ex){ log::ScopedParamContainer params(lc); params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) @@ -115,46 +254,9 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ throw ex; } } - std::string bestVid = getBestVidForQueueingRetrieveRequest(*retrieveRequest, candidateVids ,lc); - - for (auto & tf: retrieveRequest->getArchiveFile().tapeFiles) { - if (tf.second.vid == bestVid) { - goto vidFound; - } - } - { - std::stringstream err; - err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. archiveId=" << retrieveRequest->getArchiveFile().archiveFileID - << " vid=" << bestVid; - throw RetrieveRequestHasNoCopies(err.str()); - } - vidFound: - std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo()); - rjqi->retrieveRequest = retrieveRequest; - log::ScopedParamContainer params(lc); - size_t copyNb = std::numeric_limits<size_t>::max(); - uint64_t fSeq = std::numeric_limits<uint64_t>::max(); - for (auto & tc: retrieveRequest->getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } } - Sorter::RetrieveJob jobToAdd; - jobToAdd.jobDump.copyNb = copyNb; - jobToAdd.fSeq = fSeq; - jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize; - jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy; - jobToAdd.retrieveRequest = retrieveRequest; - jobToAdd.startTime = retrieveRequest->getEntryLog().time; - - threading::MutexLocker mapLocker(m_mutex); - //We are sure that we want to transfer jobs - rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); - m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransfer)].emplace_back(rjqi); - params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) - .add("copyNb", copyNb) - .add("tapeVid", bestVid) - .add("fSeq", fSeq); - lc.log(log::INFO, "Selected vid to be queued for retrieve request."); } -std::set<std::string> Sorter::getCandidateVids(RetrieveRequest &request){ +std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest &request){ using serializers::RetrieveJobStatus; std::set<std::string> candidateVids; for (auto & j: request.dumpJobs()) { @@ -178,175 +280,28 @@ std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrie return vid; } -bool Sorter::flushOneArchive(log::LogContext &lc) { +bool Sorter::flushOneRetrieve(log::LogContext &lc){ threading::MutexLocker locker(m_mutex); - for(auto & kv: m_archiveQueuesAndRequests){ + for(auto & kv: m_retrieveQueuesAndRequests){ if(!kv.second.empty()){ - queueArchiveRequests(std::get<0>(kv.first),std::get<1>(kv.first),kv.second,lc); + queueRetrieveRequests(std::get<0>(kv.first),std::get<1>(kv.first),kv.second,lc); + m_retrieveQueuesAndRequests.erase(kv.first); return true; } } return false; } -bool Sorter::flushOneRetrieve(log::LogContext &lc){ - return true; -} - -Sorter::MapArchive Sorter::getAllArchive(){ - return m_archiveQueuesAndRequests; -} - Sorter::MapRetrieve Sorter::getAllRetrieve(){ + threading::MutexLocker mapLocker(m_mutex); return m_retrieveQueuesAndRequests; } -void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc){ - - /* double queueLockFetchTime=0; - double queueProcessAndCommitTime=0; - double requestsUpdatePreparationTime=0; - double requestsUpdatingTime = 0; - utils::Timer t;*/ - /*uint64_t filesBefore=0; - uint64_t bytesBefore=0;*/ - - /*ArchiveQueue aq(m_objectstore); - ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq,rql, m_agentReference, tapePool, jobQueueType, lc); - queueLockFetchTime = t.secs(utils::Timer::resetCounter); - auto jobsSummary=aq.getJobsSummary(); - filesBefore=jobsSummary.jobs; - bytesBefore=jobsSummary.bytes; - // Prepare the list of requests to add to the queue (if needed). - std::list<ArchiveQueue::JobToAdd> jta; - // We have the queue. We will loop on the requests, add them to the list. We will launch their updates - // after committing the queue. - Sorter::ArchiveJob jobToQueue = std::get<0>(archiveJobInfo->jobToQueue); - std::promise<void>& jobPromise = std::get<1>(archiveJobInfo->jobToQueue); - - jta.push_back({jobToQueue.jobDump,jobToQueue.archiveRequest->getAddressIfSet(),jobToQueue.archiveFileId,jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime}); - auto addedJobs = aq.addJobsIfNecessaryAndCommit(jta,m_agentReference,lc); - queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - if(!addedJobs.files){ - try{ - throw cta::exception::Exception("In Sorter::queueArchiveRequests, no job have been added with addJobsIfNecessaryAndCommit()."); - } catch (cta::exception::Exception &e){ - jobPromise.set_exception(std::current_exception()); - continue; - } - } - // We will keep individual references for the job update we launch so that we make - // our life easier downstream. - struct ARUpdatedParams { - std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater; - std::shared_ptr<ArchiveRequest> archiveRequest; - uint16_t copyNb; - }; - - ARUpdatedParams arUpdaterParams; - arUpdaterParams.archiveRequest = archiveJobInfo->archiveRequest; - arUpdaterParams.copyNb = jobToQueue.jobDump.copyNb; - //Here the new owner is the agentReference of the process that runs the sorter, ArchiveRequest has no owner, the jobs have - arUpdaterParams.updater.reset(archiveJobInfo->archiveRequest->asyncUpdateJobOwner(jobToQueue.jobDump.copyNb,m_agentReference.getAgentAddress(),jobToQueue.jobDump.owner,cta::nullopt)); - - requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); - try{ - arUpdaterParams.updater->wait(); - //No problem, the job has been inserted into the queue, log it. - jobPromise.set_value(); - log::ScopedParamContainer params(lc); - params.add("archiveRequestObject", archiveJobInfo->archiveRequest->getAddressIfSet()) - .add("copyNb", arUpdaterParams.copyNb) - .add("fileId",arUpdaterParams.updater->getArchiveFile().archiveFileID) - .add("tapePool",tapePool) - .add("archiveQueueObject",aq.getAddressIfSet()) - .add("previousOwner",jobToQueue.jobDump.owner); - lc.log(log::INFO, "In Sorter::queueArchiveRequests(): queued archive job."); - } catch (cta::exception::Exception &e){ - jobPromise.set_exception(std::current_exception()); - continue; - } - */ - std::string queueAddress; - this->dispatchArchiveAlgorithm(tapePool,jobQueueType,queueAddress,archiveJobInfos,lc); - archiveJobInfos.clear(); - /*requestsUpdatingTime = t.secs(utils::Timer::resetCounter); - { - log::ScopedParamContainer params(lc); - ArchiveQueue aq(queueAddress,m_objectstore); - ScopedExclusiveLock aql(aq); - aq.fetch(); - auto jobsSummary = aq.getJobsSummary(); - params.add("tapePool", tapePool) - .add("archiveQueueObject", aq.getAddressIfSet()) - .add("filesAdded", filesQueued) - .add("bytesAdded", bytesQueued) - .add("filesAddedInitially", filesQueued) - .add("bytesAddedInitially", bytesQueued) - .add("filesDequeuedAfterErrors", filesDequeued) - .add("bytesDequeuedAfterErrors", bytesDequeued) - .add("filesBefore", filesBefore) - .add("bytesBefore", bytesBefore) - .add("filesAfter", jobsSummary.jobs) - .add("bytesAfter", jobsSummary.bytes) - .add("queueLockFetchTime", queueLockFetchTime) - .add("queuePreparationTime", queueProcessAndCommitTime) - .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) - .add("requestsUpdatingTime", requestsUpdatingTime); - //.add("queueRecommitTime", queueRecommitTime); - lc.log(log::INFO, "In Sorter::queueArchiveRequests(): " - "Queued an archiveRequest"); - }*/ +void Sorter::queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfos, log::LogContext &lc){ + std::string queueAddress; + this->dispatchRetrieveAlgorithm(vid,jobQueueType,queueAddress,retrieveJobsInfos,lc); } -void Sorter::queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfo, log::LogContext &lc){ - /*for(auto& retrieveJobInfo: retrieveJobsInfo){ - double queueLockFetchTime=0; - double queueProcessAndCommitTime=0; - //double requestsUpdatePreparationTime=0; - //double requestsUpdatingTime = 0; - utils::Timer t; - uint64_t filesBefore=0; - uint64_t bytesBefore=0; - - RetrieveQueue rq(m_objectstore); - ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql,m_agentReference,vid,jobQueueType,lc); - queueLockFetchTime = t.secs(utils::Timer::resetCounter); - auto jobsSummary=rq.getJobsSummary(); - filesBefore=jobsSummary.jobs; - bytesBefore=jobsSummary.bytes; - - Sorter::RetrieveJob jobToQueue = std::get<0>(retrieveJobInfo->jobToQueue); - std::promise<void>& jobPromise = std::get<1>(retrieveJobInfo->jobToQueue); - - std::list<RetrieveQueue::JobToAdd> jta; - jta.push_back({jobToQueue.jobDump.copyNb,jobToQueue.fSeq,jobToQueue.retrieveRequest->getAddressIfSet(),jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime}); - - auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, m_agentReference, lc); - queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - - if(!addedJobs.files){ - throw cta::exception::Exception("In Sorter::queueRetrieveRequests(), failed of adding a job to the retrieve queue through addJobsIfNecessaryAndCommit()"); - } - - // We will keep individual references for each job update we launch so that we make - // our life easier downstream. - struct RRUpdatedParams { - std::unique_ptr<RetrieveRequest::AsyncJobOwnerUpdater> updater; - std::shared_ptr<RetrieveRequest> retrieveRequest; - uint64_t copyNb; - }; - - { - std::list<RRUpdatedParams> rrUpdatersParams; - - } - - if(queueLockFetchTime && filesBefore && bytesBefore &&queueProcessAndCommitTime){} - - }*/ -} +/* End of Retrieve related algorithms */ }} diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp index 51f8904dc802518b3f348783d8c0c9c8d092edf7..12cf302435465d5253f391431a52e4eec173bb0a 100644 --- a/objectstore/Sorter.hpp +++ b/objectstore/Sorter.hpp @@ -35,53 +35,53 @@ #include "ArchiveQueue.hpp" #include "Algorithms.hpp" #include "ArchiveQueueAlgorithms.hpp" +#include "RetrieveQueueAlgorithms.hpp" namespace cta { namespace objectstore { +//forward declarations struct ArchiveJobQueueInfo; struct RetrieveJobQueueInfo; class Sorter { public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); + Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue); virtual ~Sorter(); - void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc); - /** - * - * @param retrieveRequest - * @param lc - * @throws TODO : explain what is the exception thrown by this method - */ - void insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc); + typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<ArchiveJobQueueInfo>>> MapArchive; typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<RetrieveJobQueueInfo>>> MapRetrieve; - bool flushOneRetrieve(log::LogContext &lc); - bool flushOneArchive(log::LogContext &lc); - MapArchive getAllArchive(); - MapRetrieve getAllRetrieve(); struct ArchiveJob{ std::shared_ptr<ArchiveRequest> archiveRequest; ArchiveRequest::JobDump jobDump; common::dataStructures::ArchiveFile archiveFile; AgentReferenceInterface * previousOwner; - uint64_t archiveFileId; - time_t startTime; - uint64_t fileSize; common::dataStructures::MountPolicy mountPolicy; }; + /* Archive-related methods */ + void insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc); + bool flushOneArchive(log::LogContext &lc); + MapArchive getAllArchive(); + /* End of Archive-related methods */ + struct RetrieveJob{ std::shared_ptr<RetrieveRequest> retrieveRequest; RetrieveRequest::JobDump jobDump; - uint64_t archiveFileId; - time_t startTime; + AgentReferenceInterface * previousOwner; uint64_t fileSize; uint64_t fSeq; common::dataStructures::MountPolicy mountPolicy; }; + /* Retrieve-related methods */ + void insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc); + bool flushOneRetrieve(log::LogContext &lc); + MapRetrieve getAllRetrieve(); + /* End of Retrieve-related methods */ + private: AgentReference &m_agentReference; Backend &m_objectstore; @@ -90,13 +90,26 @@ private: MapRetrieve m_retrieveQueuesAndRequests; threading::Mutex m_mutex; const unsigned int c_maxBatchSize = 500; - std::set<std::string> getCandidateVids(RetrieveRequest &request); + + /* Retrieve-related methods */ + std::set<std::string> getCandidateVidsToTransfer(RetrieveRequest &request); std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc); - void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc); void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc); - void dispatchArchiveAlgorithm(const std::string tapePool, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc); + void dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfos, log::LogContext &lc); + Sorter::RetrieveJob createRetrieveJob(std::shared_ptr<RetrieveRequest> retrieveRequest, const cta::common::dataStructures::ArchiveFile archiveFile, const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface *previousOwner); + + template<typename SpecificQueue> + void executeRetrieveAlgorithm(const std::string vid, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& jobs, log::LogContext& lc); + /* End of Retrieve-related methods */ + + /* Archive-related methods */ + void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc); + void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc); + void dispatchArchiveAlgorithm(const std::string tapePool, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobsInfos, log::LogContext &lc); + template<typename SpecificQueue> void executeArchiveAlgorithm(const std::string tapePool, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext& lc); + /* End of Archive-related methods */ }; struct ArchiveJobQueueInfo{ diff --git a/objectstore/SorterTest.cpp b/objectstore/SorterTest.cpp index 48d56f37f0e27f413d5ee0b435a8d83fdb67085c..7780327bb0957db1183782753e384b9a852ab249 100644 --- a/objectstore/SorterTest.cpp +++ b/objectstore/SorterTest.cpp @@ -40,155 +40,353 @@ namespace unitTests { - TEST(ObjectStore,SorterInsertArchiveRequest){ - cta::log::StdoutLogger dl("dummy", "unitTest"); - //cta::log::DummyLogger dl("dummy", "unitTest"); - cta::log::LogContext lc(dl); - // We need a dummy catalogue - cta::catalogue::DummyCatalogue catalogue; - // Here we check that can successfully call ArchiveRequests's garbage collector - cta::objectstore::BackendVFS be; - // Create the root entry - cta::objectstore::RootEntry re(be); - re.initialize(); - re.insert(); - // Create the agent register - cta::objectstore::EntryLogSerDeser el("user0", - "unittesthost", time(NULL)); - cta::objectstore::ScopedExclusiveLock rel(re); - // Create the agent for objects creation - cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); - // Finish root creation. - re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); - rel.release(); - // continue agent creation. - cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); - agent.initialize(); - agent.setTimeout_us(0); - agent.insertAndRegisterSelf(lc); - - //Create the agent of the Sorter - cta::objectstore::AgentReference agentRefSorter("agentRefSorter", dl); - cta::objectstore::Agent agentSorter(agentRefSorter.getAgentAddress(), be); - agentSorter.initialize(); - agentSorter.setTimeout_us(0); - agentSorter.insertAndRegisterSelf(lc); - - std::string archiveRequestID = agentRef.nextId("ArchiveRequest"); - agentRef.addToOwnership(archiveRequestID,be); - cta::objectstore::ArchiveRequest ar(archiveRequestID,be); - ar.initialize(); - cta::common::dataStructures::ArchiveFile aFile; - aFile.archiveFileID = 123456789L; - aFile.diskFileId = "eos://diskFile"; - aFile.checksumType = "checksumType"; - aFile.checksumValue = "checksumValue"; - aFile.creationTime = 0; - aFile.reconciliationTime = 0; - aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); - aFile.diskInstance = "eoseos"; - aFile.fileSize = 667; - aFile.storageClass = "sc"; - ar.setArchiveFile(aFile); - ar.addJob(1, "TapePool0", agentRef.getAgentAddress(), 1, 1, 1); - ar.addJob(2, "TapePool1", agentRef.getAgentAddress(), 1, 1, 1); - ar.addJob(3,"TapePool0",agentRef.getAgentAddress(),1,1,1); - ar.setJobStatus(1,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer); - ar.setJobStatus(3,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer); - cta::common::dataStructures::MountPolicy mp; - ar.setMountPolicy(mp); - ar.setArchiveReportURL(""); - ar.setArchiveErrorReportURL(""); - ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0")); - ar.setSrcURL("root://eoseos/myFile"); - ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr))); - ar.insert(); - cta::objectstore::ScopedExclusiveLock atfrl(ar); - ar.fetch(); - auto jobs = ar.dumpJobs(); - cta::objectstore::Sorter sorter(agentRefSorter,be,catalogue); - std::shared_ptr<cta::objectstore::ArchiveRequest> arPtr = std::make_shared<cta::objectstore::ArchiveRequest>(ar); - for(auto& j: jobs){ - sorter.insertArchiveJob(arPtr,agentRef,j,lc); +TEST(ObjectStore,SorterInsertArchiveRequest){ + cta::log::StdoutLogger dl("dummy", "unitTest"); + //cta::log::DummyLogger dl("dummy", "unitTest"); + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call ArchiveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + //Create the agent of the Sorter + cta::objectstore::AgentReference agentRefSorter("agentRefSorter", dl); + cta::objectstore::Agent agentSorter(agentRefSorter.getAgentAddress(), be); + agentSorter.initialize(); + agentSorter.setTimeout_us(0); + agentSorter.insertAndRegisterSelf(lc); + + std::string archiveRequestID = agentRef.nextId("ArchiveRequest"); + agentRef.addToOwnership(archiveRequestID,be); + cta::objectstore::ArchiveRequest ar(archiveRequestID,be); + ar.initialize(); + cta::common::dataStructures::ArchiveFile aFile; + aFile.archiveFileID = 123456789L; + aFile.diskFileId = "eos://diskFile"; + aFile.checksumType = "checksumType"; + aFile.checksumValue = "checksumValue"; + aFile.creationTime = 0; + aFile.reconciliationTime = 0; + aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); + aFile.diskInstance = "eoseos"; + aFile.fileSize = 667; + aFile.storageClass = "sc"; + ar.setArchiveFile(aFile); + ar.addJob(1, "TapePool0", agentRef.getAgentAddress(), 1, 1, 1); + ar.addJob(2, "TapePool1", agentRef.getAgentAddress(), 1, 1, 1); + ar.addJob(3,"TapePool0",agentRef.getAgentAddress(),1,1,1); + ar.setJobStatus(1,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer); + ar.setJobStatus(3,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer); + cta::common::dataStructures::MountPolicy mp; + ar.setMountPolicy(mp); + ar.setArchiveReportURL(""); + ar.setArchiveErrorReportURL(""); + ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0")); + ar.setSrcURL("root://eoseos/myFile"); + ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr))); + ar.insert(); + cta::objectstore::ScopedExclusiveLock atfrl(ar); + ar.fetch(); + auto jobs = ar.dumpJobs(); + cta::objectstore::Sorter sorter(agentRefSorter,be,catalogue); + std::shared_ptr<cta::objectstore::ArchiveRequest> arPtr = std::make_shared<cta::objectstore::ArchiveRequest>(ar); + sorter.insertArchiveRequest(arPtr,agentRef,lc); + atfrl.release(); + //Get the future + cta::objectstore::Sorter::MapArchive allArchiveJobs = sorter.getAllArchive(); + std::list<std::tuple<cta::objectstore::Sorter::ArchiveJob,std::future<void>>> allFutures; + + for(auto& kv: allArchiveJobs){ + for(auto& job: kv.second){ + allFutures.emplace_back(std::make_tuple(std::get<0>(job->jobToQueue),std::get<1>(job->jobToQueue).get_future())); + } + } + sorter.flushOneArchive(lc); + sorter.flushOneArchive(lc); + for(auto& future: allFutures){ + ASSERT_NO_THROW(std::get<1>(future).get()); + } + + cta::objectstore::ScopedExclusiveLock sel(re); + re.fetch(); + + { + //Get the archiveQueueToReport + std::string archiveQueueToReport = re.getArchiveQueueAddress("TapePool0",cta::objectstore::JobQueueType::JobsToReportToUser); + cta::objectstore::ArchiveQueue aq(archiveQueueToReport,be); + + //Fetch the queue so that we can get the archiveRequests from it + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + ASSERT_EQ(aq.dumpJobs().size(),2); + for(auto &job: aq.dumpJobs()){ + ASSERT_TRUE(job.copyNb == 1 || job.copyNb == 3); + ASSERT_EQ(job.size,667); + cta::objectstore::ArchiveRequest archiveRequest(job.address,be); + archiveRequest.fetchNoLock(); + cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile(); + + ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID); + + ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId); + ASSERT_EQ(archiveFile.checksumType,aFile.checksumType); + ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue); + ASSERT_EQ(archiveFile.creationTime,aFile.creationTime); + ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime); + ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo); + ASSERT_EQ(archiveFile.fileSize,aFile.fileSize); + ASSERT_EQ(archiveFile.storageClass,aFile.storageClass); + } + } + + { + //Get the archiveQueueToTransfer + std::string archiveQueueToTransfer = re.getArchiveQueueAddress("TapePool1",cta::objectstore::JobQueueType::JobsToTransfer); + cta::objectstore::ArchiveQueue aq(archiveQueueToTransfer,be); + + //Fetch the queue so that we can get the archiveRequests from it + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + ASSERT_EQ(aq.dumpJobs().size(),1); + ASSERT_EQ(aq.getTapePool(),"TapePool1"); + for(auto &job: aq.dumpJobs()){ + ASSERT_EQ(job.copyNb,2); + ASSERT_EQ(job.size,667); + cta::objectstore::ArchiveRequest archiveRequest(job.address,be); + archiveRequest.fetchNoLock(); + cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile(); + + ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID); + + ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId); + ASSERT_EQ(archiveFile.checksumType,aFile.checksumType); + ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue); + ASSERT_EQ(archiveFile.creationTime,aFile.creationTime); + ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime); + ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo); + ASSERT_EQ(archiveFile.fileSize,aFile.fileSize); + ASSERT_EQ(archiveFile.storageClass,aFile.storageClass); } - atfrl.release(); - //Get the future - cta::objectstore::Sorter::MapArchive allArchiveJobs = sorter.getAllArchive(); - std::list<std::tuple<cta::objectstore::Sorter::ArchiveJob,std::future<void>>> allFutures; - for(auto& kv: allArchiveJobs){ + } + + ASSERT_EQ(sorter.getAllArchive().size(),0); +} + +TEST(ObjectStore,SorterInsertRetrieveRequest){ + + using namespace cta::objectstore; + + cta::log::StdoutLogger dl("dummy", "unitTest"); + //cta::log::DummyLogger dl("dummy", "unitTest"); + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call ArchiveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + //Create the agent of the Sorter + cta::objectstore::AgentReference agentRefSorter("agentRefSorter", dl); + cta::objectstore::Agent agentSorter(agentRefSorter.getAgentAddress(), be); + agentSorter.initialize(); + agentSorter.setTimeout_us(0); + agentSorter.insertAndRegisterSelf(lc); + + std::string retrieveRequestID = agentRef.nextId("RetrieveRequest"); + agentRef.addToOwnership(retrieveRequestID,be); + cta::objectstore::RetrieveRequest rr(retrieveRequestID,be); + rr.initialize(); + cta::common::dataStructures::RetrieveFileQueueCriteria rqc; + rqc.archiveFile.archiveFileID = 123456789L; + rqc.archiveFile.diskFileId = "eos://diskFile"; + rqc.archiveFile.checksumType = ""; + rqc.archiveFile.checksumValue = ""; + rqc.archiveFile.creationTime = 0; + rqc.archiveFile.reconciliationTime = 0; + rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); + rqc.archiveFile.diskInstance = "eoseos"; + rqc.archiveFile.fileSize = 1000; + rqc.archiveFile.storageClass = "sc"; + rqc.archiveFile.tapeFiles[1].blockId=0; + rqc.archiveFile.tapeFiles[1].compressedSize=1; + rqc.archiveFile.tapeFiles[1].compressedSize=1; + rqc.archiveFile.tapeFiles[1].copyNb=1; + rqc.archiveFile.tapeFiles[1].creationTime=time(nullptr); + rqc.archiveFile.tapeFiles[1].fSeq=2; + rqc.archiveFile.tapeFiles[1].vid="Tape0"; + rqc.archiveFile.tapeFiles[2].blockId=0; + rqc.archiveFile.tapeFiles[2].compressedSize=2; + rqc.archiveFile.tapeFiles[2].compressedSize=2; + rqc.archiveFile.tapeFiles[2].copyNb=2; + rqc.archiveFile.tapeFiles[2].creationTime=time(nullptr); + rqc.archiveFile.tapeFiles[2].fSeq=2; + rqc.archiveFile.tapeFiles[2].vid="Tape1"; + rqc.mountPolicy.archiveMinRequestAge = 1; + rqc.mountPolicy.archivePriority = 1; + rqc.mountPolicy.creationLog.time = time(nullptr); + rqc.mountPolicy.lastModificationLog.time = time(nullptr); + rqc.mountPolicy.maxDrivesAllowed = 1; + rqc.mountPolicy.retrieveMinRequestAge = 1; + rqc.mountPolicy.retrievePriority = 1; + rr.setRetrieveFileQueueCriteria(rqc); + + rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); + + cta::common::dataStructures::RetrieveRequest sReq; + sReq.archiveFileID = rqc.archiveFile.archiveFileID; + sReq.creationLog.time=time(nullptr); + rr.setSchedulerRequest(sReq); + + rr.setOwner(agent.getAddressIfSet()); + rr.setActiveCopyNumber(0); + rr.insert(); + + cta::objectstore::Sorter sorter(agentRefSorter,be,catalogue); + std::shared_ptr<cta::objectstore::RetrieveRequest> retrieveRequest = std::make_shared<cta::objectstore::RetrieveRequest>(rr); + + { + cta::objectstore::ScopedExclusiveLock rrl(*retrieveRequest); + retrieveRequest->fetch(); + + sorter.insertRetrieveRequest(retrieveRequest,agentRef,cta::nullopt,lc); + rrl.release(); + cta::objectstore::Sorter::MapRetrieve allRetrieveJobs = sorter.getAllRetrieve(); + std::list<std::tuple<cta::objectstore::Sorter::RetrieveJob,std::future<void>>> allFutures; + for(auto& kv: allRetrieveJobs){ for(auto& job: kv.second){ allFutures.emplace_back(std::make_tuple(std::get<0>(job->jobToQueue),std::get<1>(job->jobToQueue).get_future())); } } - sorter.flushOneArchive(lc); - sorter.flushOneArchive(lc); + sorter.flushOneRetrieve(lc); + sorter.flushOneRetrieve(lc); for(auto& future: allFutures){ ASSERT_NO_THROW(std::get<1>(future).get()); } - - cta::objectstore::ScopedExclusiveLock sel(re); - re.fetch(); - - { - //Get the archiveQueueToReport - std::string archiveQueueToReport = re.getArchiveQueueAddress("TapePool0",cta::objectstore::JobQueueType::JobsToReportToUser); - cta::objectstore::ArchiveQueue aq(archiveQueueToReport,be); - - //Fetch the queue so that we can get the archiveRequests from it - cta::objectstore::ScopedExclusiveLock aql(aq); - aq.fetch(); - ASSERT_EQ(aq.dumpJobs().size(),2); - for(auto &job: aq.dumpJobs()){ - ASSERT_TRUE(job.copyNb == 1 || job.copyNb == 3); - ASSERT_EQ(job.size,667); - cta::objectstore::ArchiveRequest archiveRequest(job.address,be); - archiveRequest.fetchNoLock(); - cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile(); - - ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID); - - ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId); - ASSERT_EQ(archiveFile.checksumType,aFile.checksumType); - ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue); - ASSERT_EQ(archiveFile.creationTime,aFile.creationTime); - ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime); - ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo); - ASSERT_EQ(archiveFile.fileSize,aFile.fileSize); - ASSERT_EQ(archiveFile.storageClass,aFile.storageClass); + + allFutures.clear(); + + typedef ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> Algo; + Algo algo(be,agentRef); + + typename Algo::PopCriteria criteria; + criteria.files = 1; + criteria.bytes = 1000; + typename Algo::PoppedElementsBatch elements = algo.popNextBatch("Tape0",criteria,lc); + ASSERT_EQ(elements.elements.size(),1); + + auto& elt = elements.elements.front(); + + cta::common::dataStructures::ArchiveFile aFile = elt.archiveFile; + ASSERT_EQ(aFile.archiveFileID,rqc.archiveFile.archiveFileID); + ASSERT_EQ(aFile.diskFileId,rqc.archiveFile.diskFileId); + ASSERT_EQ(aFile.checksumType,rqc.archiveFile.checksumType); + ASSERT_EQ(aFile.checksumValue,rqc.archiveFile.checksumValue); + ASSERT_EQ(aFile.creationTime,rqc.archiveFile.creationTime); + ASSERT_EQ(aFile.reconciliationTime,rqc.archiveFile.reconciliationTime); + ASSERT_EQ(aFile.diskFileInfo,rqc.archiveFile.diskFileInfo); + ASSERT_EQ(aFile.fileSize,rqc.archiveFile.fileSize); + ASSERT_EQ(aFile.storageClass,rqc.archiveFile.storageClass); + ASSERT_EQ(elt.copyNb,1); + ASSERT_EQ(elt.bytes,1000); + ASSERT_EQ(elt.reportType,cta::SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired); + ASSERT_EQ(elt.rr.archiveFileID,aFile.archiveFileID); + } + { + ScopedExclusiveLock sel(*retrieveRequest); + retrieveRequest->fetch(); + + sorter.insertRetrieveRequest(retrieveRequest,agentRef,cta::optional<uint32_t>(2),lc); + + sel.release(); + + cta::objectstore::Sorter::MapRetrieve allRetrieveJobs = sorter.getAllRetrieve(); + std::list<std::tuple<cta::objectstore::Sorter::RetrieveJob,std::future<void>>> allFutures; + ASSERT_EQ(allRetrieveJobs.size(),1); + for(auto& kv: allRetrieveJobs){ + for(auto& job: kv.second){ + allFutures.emplace_back(std::make_tuple(std::get<0>(job->jobToQueue),std::get<1>(job->jobToQueue).get_future())); } } - - { - //Get the archiveQueueToTransfer - std::string archiveQueueToTransfer = re.getArchiveQueueAddress("TapePool1",cta::objectstore::JobQueueType::JobsToTransfer); - cta::objectstore::ArchiveQueue aq(archiveQueueToTransfer,be); - - //Fetch the queue so that we can get the archiveRequests from it - cta::objectstore::ScopedExclusiveLock aql(aq); - aq.fetch(); - ASSERT_EQ(aq.dumpJobs().size(),1); - ASSERT_EQ(aq.getTapePool(),"TapePool1"); - for(auto &job: aq.dumpJobs()){ - ASSERT_EQ(job.copyNb,2); - ASSERT_EQ(job.size,667); - cta::objectstore::ArchiveRequest archiveRequest(job.address,be); - archiveRequest.fetchNoLock(); - cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile(); - - ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID); - - ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId); - ASSERT_EQ(archiveFile.checksumType,aFile.checksumType); - ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue); - ASSERT_EQ(archiveFile.creationTime,aFile.creationTime); - ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime); - ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo); - ASSERT_EQ(archiveFile.fileSize,aFile.fileSize); - ASSERT_EQ(archiveFile.storageClass,aFile.storageClass); - } + sorter.flushOneRetrieve(lc); + sorter.flushOneRetrieve(lc); + for(auto& future: allFutures){ + ASSERT_NO_THROW(std::get<1>(future).get()); } + + ASSERT_EQ(sorter.getAllRetrieve().size(),0); + typedef ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> Algo; + Algo algo(be,agentRef); + + typename Algo::PopCriteria criteria; + criteria.files = 1; + typename Algo::PoppedElementsBatch elements = algo.popNextBatch("Tape1",criteria,lc); + ASSERT_EQ(elements.elements.size(),1); + auto& elt = elements.elements.front(); + + cta::common::dataStructures::ArchiveFile aFile = elt.archiveFile; + ASSERT_EQ(aFile.archiveFileID,rqc.archiveFile.archiveFileID); + ASSERT_EQ(aFile.diskFileId,rqc.archiveFile.diskFileId); + ASSERT_EQ(aFile.checksumType,rqc.archiveFile.checksumType); + ASSERT_EQ(aFile.checksumValue,rqc.archiveFile.checksumValue); + ASSERT_EQ(aFile.creationTime,rqc.archiveFile.creationTime); + ASSERT_EQ(aFile.reconciliationTime,rqc.archiveFile.reconciliationTime); + ASSERT_EQ(aFile.diskFileInfo,rqc.archiveFile.diskFileInfo); + ASSERT_EQ(aFile.fileSize,rqc.archiveFile.fileSize); + ASSERT_EQ(aFile.storageClass,rqc.archiveFile.storageClass); + ASSERT_EQ(elt.copyNb,2); + ASSERT_EQ(elt.archiveFile.tapeFiles[2].compressedSize,2); + ASSERT_EQ(elt.bytes,1000); + ASSERT_EQ(elt.reportType,cta::SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired); + ASSERT_EQ(elt.rr.archiveFileID,aFile.archiveFileID); } - + { + ScopedExclusiveLock sel(*retrieveRequest); + retrieveRequest->fetch(); + + ASSERT_THROW(sorter.insertRetrieveRequest(retrieveRequest,agentRef,cta::optional<uint32_t>(4),lc),cta::exception::Exception); + + retrieveRequest->setJobStatus(1,serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); + + ASSERT_THROW(sorter.insertRetrieveRequest(retrieveRequest,agentRef,cta::nullopt,lc),cta::exception::Exception); + + sel.release(); + } +} } diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 1cc97d906259ad79010881a2f1e7ae479fa4f4f3..e9249f6ce90f3d22d1db5fafd18b9c8357cf45b5 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -173,7 +173,7 @@ message TapeFile { required uint64 fseq = 9121; required uint64 blockid = 9122; required uint64 compressedsize = 9123; - required uint64 copynb = 9124; + required uint32 copynb = 9124; required uint64 creationtime = 9125; required string checksumtype = 9126; required string checksumvalue = 9127; @@ -347,7 +347,7 @@ message SchedulerRetrieveRequest { } message RetrieveJob { - required uint64 copynb = 9200; + required uint32 copynb = 9200; required uint32 maxtotalretries = 9201; required uint32 maxretrieswithinmount = 9202; required uint32 retrieswithinmount = 9203; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 9aa4e3296a84c5b218e9cd6810a0970d6adf03cd..2c6831d5a255745eca20a8d6df062a68dc9e0ffd 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -211,7 +211,7 @@ public: const std::string vo = "vo"; const bool tapePoolEncryption = false; catalogue.createTapePool(s_adminOnAdminHost, s_tapePoolName, vo, nbPartialTapes, tapePoolEncryption, tapePoolComment); - const uint16_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Archive-route comment"; catalogue.createArchiveRoute(s_adminOnAdminHost, s_diskInstance, s_storageClassName, copyNb, s_tapePoolName, archiveRouteComment); @@ -1559,7 +1559,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Create the retrieve request from the address of the job and the current backend cta::objectstore::RetrieveRequest retrieveRequest(job.address,schedulerDB.getBackend()); retrieveRequest.fetchNoLock(); - uint64_t copyNb = job.copyNb; + uint32_t copyNb = job.copyNb; common::dataStructures::TapeFile tapeFile = retrieveRequest.getArchiveFile().tapeFiles[copyNb]; common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest.getSchedulerRequest(); common::dataStructures::ArchiveFile archiveFile = retrieveRequest.getArchiveFile(); @@ -1615,7 +1615,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { retrieveRequest->fetch(); //initialize variable for the test - uint64_t copyNb = retrieveRequest->getActiveCopyNumber(); + uint32_t copyNb = retrieveRequest->getActiveCopyNumber(); common::dataStructures::ArchiveFile archiveFile = retrieveRequest->getArchiveFile(); common::dataStructures::TapeFile tapeFile = archiveFile.tapeFiles[copyNb]; std::string currentVid = allVid.at(i-1); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 2f0a88718aff0cdc09e52c5ff7beaa083fd93823..570b550d08ac50c325536088446ca2cb554484ae 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -297,7 +297,7 @@ public: const bool tapePoolEncryption = false; ASSERT_NO_THROW(catalogue.createTapePool(s_adminOnAdminHost, s_tapePoolName, vo, nbPartialTapes, tapePoolEncryption, tapePoolComment)); - const uint16_t copyNb = 1; + const uint32_t copyNb = 1; const std::string archiveRouteComment = "Archive-route comment"; catalogue.createArchiveRoute(s_adminOnAdminHost, s_diskInstance, s_storageClassName, copyNb, s_tapePoolName, archiveRouteComment);