diff --git a/ReleaseNotes.md b/ReleaseNotes.md index a7b0292a82343a83bcc71dd8c5171545d51f2ced..72f69f78099bb11c506e533d33b54aa0b670ac74 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,10 +1,12 @@ -# v4.NEXT +# v4.7.1-1 ## Summary ### Upgrade Instructions ### Features - cta/CTA#1179 - Use std::optional instead of cta::optional - cta/CTA#1190 - Use std::make_unique instead of cta::make_unique +- cta/CTA#1198 - Use hardcoded mount policy for verification requests +- propagate labelFormat from TAPE catalog to VolumeInfo ### Bug fixes ### Building and Packaging diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp index 6ad859d838d0f9946f16991c6d84b78e3e2a2016..36b0ce9ea471d5239047f3742540926fb22aae41 100644 --- a/catalogue/Catalogue.hpp +++ b/catalogue/Catalogue.hpp @@ -255,7 +255,8 @@ public: const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity &user, const std::optional<std::string> & activity, - log::LogContext &lc) = 0; + log::LogContext &lc, + const std::optional<std::string> &mountPolicyName = std::nullopt) = 0; /** * Notifies the CTA catalogue that the specified tape has been mounted in @@ -771,6 +772,14 @@ public: */ virtual std::list<common::dataStructures::MountPolicy> getMountPolicies() const = 0; + /** + * Returns the mount policy with the specified name. + * + * @return the specified mount policy + */ + virtual std::optional<common::dataStructures::MountPolicy> getMountPolicy(const std::string &mountPolicyName) const = 0; + + /** * Returns the cached list of all existing mount policies. * diff --git a/catalogue/CatalogueRetryWrapper.hpp b/catalogue/CatalogueRetryWrapper.hpp index b8d43166b474eccfc3e99fa47905045f20000cc4..540c6764d4cf548731bc01bace533baace65b20c 100644 --- a/catalogue/CatalogueRetryWrapper.hpp +++ b/catalogue/CatalogueRetryWrapper.hpp @@ -87,8 +87,8 @@ public: return retryOnLostConnection(m_log, [&]{return m_catalogue->tapeMountedForArchive(vid, drive);}, m_maxTriesToConnect); } - common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity& user, const std::optional<std::string>& activity, log::LogContext& lc) override { - return retryOnLostConnection(m_log, [&]{return m_catalogue->prepareToRetrieveFile(diskInstanceName, archiveFileId, user, activity, lc);}, m_maxTriesToConnect); + common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity& user, const std::optional<std::string>& activity, log::LogContext& lc, const std::optional<std::string> &mountPolicyName) override { + return retryOnLostConnection(m_log, [&]{return m_catalogue->prepareToRetrieveFile(diskInstanceName, archiveFileId, user, activity, lc, mountPolicyName);}, m_maxTriesToConnect); } void tapeMountedForRetrieve(const std::string &vid, const std::string &drive) override { @@ -455,6 +455,11 @@ public: return retryOnLostConnection(m_log, [&]{return m_catalogue->getMountPolicies();}, m_maxTriesToConnect); } + std::optional<common::dataStructures::MountPolicy> getMountPolicy(const std::string &mountPolicyName) const override { + return retryOnLostConnection(m_log, [&]{return m_catalogue->getMountPolicy(mountPolicyName);}, m_maxTriesToConnect); + } + + std::list<common::dataStructures::MountPolicy> getCachedMountPolicies() const override { return retryOnLostConnection(m_log, [&]{return m_catalogue->getCachedMountPolicies();}, m_maxTriesToConnect); } diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp index ca9a23cbc63222b2e891a4295bd2bfb458a00a57..00fc65373210c4092a403ac3059562c803ee9ed2 100644 --- a/catalogue/CatalogueTest.cpp +++ b/catalogue/CatalogueTest.cpp @@ -7500,6 +7500,50 @@ TEST_P(cta_catalogue_CatalogueTest, deleteMountPolicy_non_existent) { ASSERT_THROW(m_catalogue->deleteMountPolicy("non_existent_mount_policy"), exception::UserError); } +TEST_P(cta_catalogue_CatalogueTest, getMountPolicyByName) { + using namespace cta; + + ASSERT_TRUE(m_catalogue->getMountPolicies().empty()); + + catalogue::CreateMountPolicyAttributes mountPolicyToAdd = getMountPolicy1(); + std::string mountPolicyName = mountPolicyToAdd.name; + m_catalogue->createMountPolicy(m_admin, mountPolicyToAdd); + { + const std::optional<common::dataStructures::MountPolicy> mountPolicyOpt = + m_catalogue->getMountPolicy(mountPolicyName); + + ASSERT_TRUE(static_cast<bool>(mountPolicyOpt)); + + const common::dataStructures::MountPolicy mountPolicy = *mountPolicyOpt; + + ASSERT_EQ(mountPolicyName, mountPolicy.name); + + ASSERT_EQ(mountPolicyToAdd.archivePriority, mountPolicy.archivePriority); + ASSERT_EQ(mountPolicyToAdd.minArchiveRequestAge, mountPolicy.archiveMinRequestAge); + + ASSERT_EQ(mountPolicyToAdd.retrievePriority, mountPolicy.retrievePriority); + ASSERT_EQ(mountPolicyToAdd.minRetrieveRequestAge, mountPolicy.retrieveMinRequestAge); + + ASSERT_EQ(mountPolicyToAdd.comment, mountPolicy.comment); + + const common::dataStructures::EntryLog creationLog = mountPolicy.creationLog; + ASSERT_EQ(m_admin.username, creationLog.username); + ASSERT_EQ(m_admin.host, creationLog.host); + + const common::dataStructures::EntryLog lastModificationLog = + mountPolicy.lastModificationLog; + ASSERT_EQ(creationLog, lastModificationLog); + } + + { + //non existant name + const std::optional<common::dataStructures::MountPolicy> mountPolicyOpt = + m_catalogue->getMountPolicy("non existant mount policy"); + ASSERT_FALSE(static_cast<bool>(mountPolicyOpt)); + } +} + + TEST_P(cta_catalogue_CatalogueTest, modifyMountPolicyArchivePriority) { using namespace cta; diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp index 29febf6bc1ae5ea0e9a9f20a4619882a725c08fc..cb57cba49a46552af6a8e7c4a3522ee70eb406fa 100644 --- a/catalogue/DummyCatalogue.hpp +++ b/catalogue/DummyCatalogue.hpp @@ -116,6 +116,7 @@ public: void deleteTapeFileCopy(common::dataStructures::ArchiveFile &file, const std::string &reason) override {throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented");} std::list<TapePool> getTapePools(const TapePoolSearchCriteria &searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } std::optional<TapePool> getTapePool(const std::string &tapePoolName) const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::optional<common::dataStructures::MountPolicy> getMountPolicy(const std::string &mountPolicyName) const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } std::list<common::dataStructures::Tape> getTapes(const TapeSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } // getTapesByVid is implemented below (and works). std::map<std::string, std::string> getVidToLogicalLibrary(const std::set<std::string> &vids) const override { throw exception::Exception(std::string("In ") + __PRETTY_FUNCTION__ + ": not implemented"); } @@ -174,7 +175,7 @@ public: uint64_t checkAndGetNextArchiveFileId(const std::string &diskInstanceName, const std::string &storageClassName, const common::dataStructures::RequesterIdentity &user) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } common::dataStructures::ArchiveFileQueueCriteria getArchiveFileQueueCriteria(const std::string &diskInstanceName, const std::string &storageClassName, const common::dataStructures::RequesterIdentity &user) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity& user, const std::optional<std::string>& activity, log::LogContext& lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity& user, const std::optional<std::string>& activity, log::LogContext& lc, const std::optional<std::string> &mountPolicyName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void reclaimTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, cta::log::LogContext & lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void checkTapeForLabel(const std::string& vid) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } uint64_t getNbFilesOnTape(const std::string& vid) const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index 99f6f3ea051c5254532493727f3f3bb95a30b622..7c9ed9410f0052904ded850e10be4c2b04be7f64 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -6709,6 +6709,85 @@ try { } } +//------------------------------------------------------------------------------ +// getMountPolicy +//------------------------------------------------------------------------------ +std::optional<common::dataStructures::MountPolicy> RdbmsCatalogue::getMountPolicy(const std::string &mountPolicyName) const { + try { + auto conn = m_connPool.getConn(); + return getMountPolicy(conn, mountPolicyName); + } catch(exception::UserError &) { + throw; + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); + throw; + } +} + +//------------------------------------------------------------------------------ +// getMountPolicy +//------------------------------------------------------------------------------ +std::optional<common::dataStructures::MountPolicy> RdbmsCatalogue::getMountPolicy(rdbms::Conn &conn, const std::string &mountPolicyName) const { + try { + const char *const sql = + "SELECT " + "MOUNT_POLICY_NAME AS MOUNT_POLICY_NAME," + + "ARCHIVE_PRIORITY AS ARCHIVE_PRIORITY," + "ARCHIVE_MIN_REQUEST_AGE AS ARCHIVE_MIN_REQUEST_AGE," + + "RETRIEVE_PRIORITY AS RETRIEVE_PRIORITY," + "RETRIEVE_MIN_REQUEST_AGE AS RETRIEVE_MIN_REQUEST_AGE," + + "USER_COMMENT AS USER_COMMENT," + + "CREATION_LOG_USER_NAME AS CREATION_LOG_USER_NAME," + "CREATION_LOG_HOST_NAME AS CREATION_LOG_HOST_NAME," + "CREATION_LOG_TIME AS CREATION_LOG_TIME," + + "LAST_UPDATE_USER_NAME AS LAST_UPDATE_USER_NAME," + "LAST_UPDATE_HOST_NAME AS LAST_UPDATE_HOST_NAME," + "LAST_UPDATE_TIME AS LAST_UPDATE_TIME " + "FROM " + "MOUNT_POLICY " + "WHERE " + "MOUNT_POLICY_NAME = :MOUNT_POLICY_NAME"; + auto stmt = conn.createStmt(sql); + stmt.bindString(":MOUNT_POLICY_NAME", mountPolicyName); + auto rset = stmt.executeQuery(); + if (rset.next()) { + common::dataStructures::MountPolicy policy; + + policy.name = rset.columnString("MOUNT_POLICY_NAME"); + + policy.archivePriority = rset.columnUint64("ARCHIVE_PRIORITY"); + policy.archiveMinRequestAge = rset.columnUint64("ARCHIVE_MIN_REQUEST_AGE"); + + policy.retrievePriority = rset.columnUint64("RETRIEVE_PRIORITY"); + policy.retrieveMinRequestAge = rset.columnUint64("RETRIEVE_MIN_REQUEST_AGE"); + + policy.comment = rset.columnString("USER_COMMENT"); + + policy.creationLog.username = rset.columnString("CREATION_LOG_USER_NAME"); + policy.creationLog.host = rset.columnString("CREATION_LOG_HOST_NAME"); + policy.creationLog.time = rset.columnUint64("CREATION_LOG_TIME"); + + policy.lastModificationLog.username = rset.columnString("LAST_UPDATE_USER_NAME"); + policy.lastModificationLog.host = rset.columnString("LAST_UPDATE_HOST_NAME"); + policy.lastModificationLog.time = rset.columnUint64("LAST_UPDATE_TIME"); + + return policy; + } + return std::nullopt; + + } catch(exception::UserError &) { + throw; + } catch(exception::Exception &ex) { + ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); + throw; + } +} + //------------------------------------------------------------------------------ // getCachedMountPolicies //------------------------------------------------------------------------------ @@ -8821,7 +8900,8 @@ common::dataStructures::RetrieveFileQueueCriteria RdbmsCatalogue::prepareToRetri const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity &user, const std::optional<std::string>& activity, - log::LogContext &lc) { + log::LogContext &lc, + const std::optional<std::string> &mountPolicyName) { try { cta::utils::Timer t; common::dataStructures::RetrieveFileQueueCriteria criteria; @@ -8851,6 +8931,19 @@ common::dataStructures::RetrieveFileQueueCriteria RdbmsCatalogue::prepareToRetri " exits in CTA namespace but is permanently unavailable on " << brokenState.second << " tape " << brokenState.first; throw ex; } + if (mountPolicyName) { + std::optional<common::dataStructures::MountPolicy> mountPolicy = getMountPolicy(conn, mountPolicyName.value()); + if (mountPolicy) { + criteria.archiveFile = *archiveFile; + criteria.mountPolicy = mountPolicy.value(); + return criteria; + } else { + log::ScopedParamContainer spc(lc); + spc.add("mountPolicyName", mountPolicyName.value()) + .add("archiveFileId", archiveFileId); + lc.log(log::WARNING, "Catalogue::prepareToRetrieve Could not find specified mount policy, falling back to querying mount rules"); + } + } if(diskInstanceName != archiveFile->diskInstance) { exception::UserError ue; diff --git a/catalogue/RdbmsCatalogue.hpp b/catalogue/RdbmsCatalogue.hpp index a6977bdfa4ea7f27fcacbb2f810021a869103533..34a4804357eced8943174c7e431523678330e82c 100644 --- a/catalogue/RdbmsCatalogue.hpp +++ b/catalogue/RdbmsCatalogue.hpp @@ -189,7 +189,8 @@ public: const uint64_t archiveFileId, const common::dataStructures::RequesterIdentity &user, const std::optional<std::string> & activity, - log::LogContext &lc) override; + log::LogContext &lc, + const std::optional<std::string> &mountPolicyName = std::nullopt) override; /** * Notifies the CTA catalogue that the specified tape has been mounted in @@ -705,6 +706,20 @@ public: */ std::list<common::dataStructures::MountPolicy> getMountPolicies() const override; + /** + * Returns the mount policy with the specified name. + * + * @return the specified mount policy + */ + std::optional<common::dataStructures::MountPolicy> getMountPolicy(const std::string &mountPolicyName) const override; + + /** + * Returns the mount policy with the specified name. + * + * @return the specified mount policy + */ + std::optional<common::dataStructures::MountPolicy> getMountPolicy(rdbms::Conn &conn, const std::string &mountPolicyName) const; + /** * Returns the list of all existing mount policies. * diff --git a/common/dataStructures/RetrieveRequest.hpp b/common/dataStructures/RetrieveRequest.hpp index d1d3dab4ef0ac8e454ddf35b048f29b62837f8d5..3fc68f295043756740bb1c8591ef49d2d0a17dfb 100644 --- a/common/dataStructures/RetrieveRequest.hpp +++ b/common/dataStructures/RetrieveRequest.hpp @@ -59,7 +59,7 @@ struct RetrieveRequest { EntryLog creationLog; bool isVerifyOnly; // request to retrieve file from tape but do not write a disk copy std::optional<std::string> vid; // limit retrieve requests to the specified vid (in the case of dual-copy files) - + std::optional<std::string> mountPolicy; // limit retrieve requests to a specified mount policy (only used for verification requests) LifecycleTimings lifecycleTimings; std::optional<std::string> activity; diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 1b4d91d743799476dc34721498bb3a803ad23125..bbac0f37aecfec4c59e07e4d1a8a6997ffc9be9a 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -192,7 +192,9 @@ std::string Scheduler::queueRetrieve( utils::Timer t; // Get the queue criteria common::dataStructures::RetrieveFileQueueCriteria queueCriteria; - queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc); + + queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc, request.mountPolicy); + queueCriteria.archiveFile.diskFileInfo = request.diskFileInfo; // The following block of code is a temporary fix for the following CTA issue: diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index eb01b28a9c92edc04f35d3559d0f6d9cda154272..175a7611a8649511e5a3fe43328c8eff054e7834 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -646,6 +646,225 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { } } +TEST_P(SchedulerTest, archive_report_and_retrieve_new_file_with_specific_mount_policy) { + using namespace cta; + + Scheduler &scheduler = getScheduler(); + auto &catalogue = getCatalogue(); + + setupDefaultCatalogue(); + +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + uint64_t archiveFileId; + { + // Queue an archive request. + cta::common::dataStructures::EntryLog creationLog; + creationLog.host="host2"; + creationLog.time=0; + creationLog.username="admin1"; + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.gid=GROUP_2; + diskFileInfo.owner_uid=CMS_USER; + diskFileInfo.path="path/to/file"; + cta::common::dataStructures::ArchiveRequest request; + request.checksumBlob.insert(cta::checksum::ADLER32, 0x1234abcd); + request.creationLog=creationLog; + request.diskFileInfo=diskFileInfo; + request.diskFileID="diskFileID"; + request.fileSize=100*1000*1000; + cta::common::dataStructures::RequesterIdentity requester; + requester.name = s_userName; + requester.group = "userGroup"; + request.requester = requester; + request.srcURL="srcURL"; + request.storageClass=s_storageClassName; + archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc); + scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc); + } + scheduler.waitSchedulerDbSubthreadsComplete(); + + // Check that we have the file in the queues + // TODO: for this to work all the time, we need an index of all requests + // (otherwise we miss the selected ones). + // Could also be limited to querying by ID (global index needed) + bool found=false; + for (auto & tp: scheduler.getPendingArchiveJobs(lc)) { + for (auto & req: tp.second) { + if (req.archiveFileID == archiveFileId) + found = true; + } + } + ASSERT_TRUE(found); + + // Create the environment for the migration to happen (library + tape) + const std::string libraryComment = "Library comment"; + const bool libraryIsDisabled = true; + catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, + libraryIsDisabled, libraryComment); + { + auto libraries = catalogue.getLogicalLibraries(); + ASSERT_EQ(1, libraries.size()); + ASSERT_EQ(s_libraryName, libraries.front().name); + ASSERT_EQ(libraryComment, libraries.front().comment); + } + + { + auto tape = getDefaultTape(); + catalogue.createTape(s_adminOnAdminHost, tape); + } + + const std::string driveName = "tape_drive"; + + catalogue.tapeLabelled(s_vid, "tape_drive"); + + { + // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) + std::unique_ptr<cta::TapeMount> mount; + // This first initialization is normally done by the dataSession function. + cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + //Test that no mount is available when a logical library is disabled + ASSERT_EQ(nullptr, mount.get()); + catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false); + //continue our test + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); + auto & osdb=getSchedulerDB(); + auto mi=osdb.getMountInfo(lc); + ASSERT_EQ(1, mi->existingOrNextMounts.size()); + ASSERT_EQ("TapePool", mi->existingOrNextMounts.front().tapePool); + ASSERT_EQ("TestVid", mi->existingOrNextMounts.front().vid); + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); + ASSERT_NE(nullptr, archiveJobBatch.front().get()); + std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front()); + archiveJob->tapeFile.blockId = 1; + archiveJob->tapeFile.fSeq = 1; + archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, 0x1234abcd); + archiveJob->tapeFile.fileSize = archiveJob->archiveFile.fileSize; + archiveJob->tapeFile.copyNb = 1; + archiveJob->validate(); + std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; + std::queue<cta::catalogue::TapeItemWritten> sTapeItems; + std::queue<std::unique_ptr <cta::SchedulerDatabase::ArchiveJob >> failedToReportArchiveJobs; + sDBarchiveJobBatch.emplace(std::move(archiveJob)); + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc); + archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(0, archiveJobBatch.size()); + archiveMount->complete(); + } + + { + // Emulate the the reporter process reporting successful transfer to tape to the disk system + auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); + ASSERT_NE(0, jobsToReport.size()); + disk::DiskReporterFactory factory; + log::TimingList timings; + utils::Timer t; + scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc); + ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size()); + } + + { + //create custom mount policy for retrieve + catalogue::CreateMountPolicyAttributes mountPolicy; + mountPolicy.name = "custom_mount_policy"; + mountPolicy.archivePriority = s_archivePriority; + mountPolicy.minArchiveRequestAge = s_minArchiveRequestAge; + mountPolicy.retrievePriority = s_retrievePriority; + mountPolicy.minRetrieveRequestAge = s_minRetrieveRequestAge; + mountPolicy.comment = "custom mount policy"; + + catalogue.createMountPolicy(s_adminOnAdminHost, mountPolicy); + } + + { + //queue retrieve + cta::common::dataStructures::EntryLog creationLog; + creationLog.host="host2"; + creationLog.time=0; + creationLog.username="admin1"; + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.gid=GROUP_2; + diskFileInfo.owner_uid=CMS_USER; + diskFileInfo.path="path/to/file"; + cta::common::dataStructures::RetrieveRequest request; + request.archiveFileID = archiveFileId; + request.creationLog = creationLog; + request.diskFileInfo = diskFileInfo; + request.dstURL = "dstURL"; + request.requester.name = s_userName; + request.requester.group = "userGroup"; + request.mountPolicy = "custom_mount_policy"; + scheduler.queueRetrieve("disk_instance", request, lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + } + + // Check that the retrieve request is queued + { + auto rqsts = scheduler.getPendingRetrieveJobs(lc); + // We expect 1 tape with queued jobs + ASSERT_EQ(1, rqsts.size()); + // We expect the queue to contain 1 job + ASSERT_EQ(1, rqsts.cbegin()->second.size()); + // We expect the job to be single copy + auto & job = rqsts.cbegin()->second.back(); + ASSERT_EQ(1, job.tapeCopies.size()); + // We expect the copy to be on the provided tape. + ASSERT_TRUE(s_vid == job.tapeCopies.cbegin()->first); + // Check the remote target + ASSERT_EQ("dstURL", job.request.dstURL); + // Check the archive file ID + ASSERT_EQ(archiveFileId, job.request.archiveFileID); + + // Check that we can retrieve jobs by VID + + // Get the vid from the above job and submit a separate request for the same vid + auto vid = rqsts.begin()->second.back().tapeCopies.begin()->first; + auto rqsts_vid = scheduler.getPendingRetrieveJobs(vid, lc); + // same tests as above + ASSERT_EQ(1, rqsts_vid.size()); + auto &job_vid = rqsts_vid.back(); + ASSERT_EQ(1, job_vid.tapeCopies.size()); + ASSERT_TRUE(s_vid == job_vid.tapeCopies.cbegin()->first); + ASSERT_EQ("dstURL", job_vid.request.dstURL); + ASSERT_EQ(archiveFileId, job_vid.request.archiveFileID); + } + + { + // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(1, jobBatch.size()); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + retrieveJob->asyncSetSuccessful(); + std::queue<std::unique_ptr<cta::RetrieveJob> > jobQueue; + jobQueue.push(std::move(retrieveJob)); + retrieveMount->flushAsyncSuccessReports(jobQueue, lc); + jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(0, jobBatch.size()); + } +} + TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) { using namespace cta; diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index bfa51e048b817da26eee318f84eb5f1a070d7f82..0f57b4680d4b0a7c5fbe24eb9b00339d1797eeeb 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -631,6 +631,9 @@ void RequestMessage::processPREPARE(const cta::eos::Notification ¬ification, request.creationLog.username = m_cliIdentity.username; request.creationLog.time = time(nullptr); request.isVerifyOnly = notification.wf().verify_only(); + if (request.isVerifyOnly) { + request.mountPolicy = m_verificationMountPolicy; + } // Vid is for tape verification use case (for dual-copy files) so normally is not specified if(!notification.wf().vid().empty()) { diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index e39efc70c0d9883314acad8281630d40eebc9f50..4e180e4c815302398c8f6a16c87c459c1b887713 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -39,6 +39,7 @@ public: m_scheduler(service->getScheduler()), m_archiveFileMaxSize(service->getArchiveFileMaxSize()), m_repackBufferURL(service->getRepackBufferURL()), + m_verificationMountPolicy(service->getVerificationMountPolicy()), m_namespaceMap(service->getNamespaceMap()), m_lc(service->getLogContext()), m_catalogue_conn_string(service->getCatalogueConnectionString()) @@ -305,24 +306,25 @@ private: // Member variables - Protocol m_protocol; //!< The protocol the client used to connect - cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host - const XrdSsiCtaServiceProvider &m_service; //!< Const reference to the XRootD SSI Service - cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue - cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler - uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests - std::optional<std::string> m_repackBufferURL; //!< Repack buffer URL - NamespaceMap_t m_namespaceMap; //!< Identifiers for namespace queries - cta::log::LogContext m_lc; //!< CTA Log Context - std::map<cta::admin::OptionBoolean::Key, bool> m_option_bool; //!< Boolean options - std::map<cta::admin::OptionUInt64::Key, uint64_t> m_option_uint64; //!< UInt64 options - std::map<cta::admin::OptionString::Key, std::string> m_option_str; //!< String options + Protocol m_protocol; //!< The protocol the client used to connect + cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host + const XrdSsiCtaServiceProvider &m_service; //!< Const reference to the XRootD SSI Service + cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue + cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler + uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests + std::optional<std::string> m_repackBufferURL; //!< Repack buffer URL + std::optional<std::string> m_verificationMountPolicy; //!< Repack buffer URL + NamespaceMap_t m_namespaceMap; //!< Identifiers for namespace queries + cta::log::LogContext m_lc; //!< CTA Log Context + std::map<cta::admin::OptionBoolean::Key, bool> m_option_bool; //!< Boolean options + std::map<cta::admin::OptionUInt64::Key, uint64_t> m_option_uint64; //!< UInt64 options + std::map<cta::admin::OptionString::Key, std::string> m_option_str; //!< String options std::map<cta::admin::OptionStrList::Key, - std::vector<std::string>> m_option_str_list; //!< String List options - Versions m_client_versions; //!< Client CTA and xrootd-ssi-proto version(tag) - std::string m_client_cta_version; //!< Client CTA Version - std::string m_client_xrd_ssi_proto_int_version; //!< Client xrootd-ssi-protobuf-interface version (tag) - std::string m_catalogue_conn_string; //!< Server catalogue connection string + std::vector<std::string>> m_option_str_list; //!< String List options + Versions m_client_versions; //!< Client CTA and xrootd-ssi-proto version(tag) + std::string m_client_cta_version; //!< Client CTA Version + std::string m_client_xrd_ssi_proto_int_version; //!< Client xrootd-ssi-protobuf-interface version (tag) + std::string m_catalogue_conn_string; //!< Server catalogue connection string }; }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.cpp b/xroot_plugins/XrdSsiCtaServiceProvider.cpp index 289010f71459500934df13942c985332ff89e9d6..26ccc01e6228f0f32ecff9f04d61190ea3db9e82 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.cpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.cpp @@ -241,6 +241,12 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC m_repackBufferURL = repackBufferURLConf.second; } + // Get the verification mount policy + const auto verificationMountPolicy = config.getOptionValueStr("cta.verification.mount_policy"); + if(verificationMountPolicy.first){ + m_verificationMountPolicy = verificationMountPolicy.second; + } + { // Log cta.repack.repack_buffer_url if(repackBufferURLConf.first){ @@ -273,6 +279,8 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC } } + // Get the mount policy name for verification requests + // All done log(log::INFO, std::string("cta-frontend started"), params); } diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.hpp b/xroot_plugins/XrdSsiCtaServiceProvider.hpp index edce055b4316168ecb831fcdde0e223db546c8e2..9e38319f34f27131ef71ecbbff5fcda6e9497aec 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.hpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.hpp @@ -107,6 +107,12 @@ public: */ std::optional<std::string> getRepackBufferURL() const { return m_repackBufferURL; } + /*! + * Get the verification mount policy + */ + std::optional<std::string> getVerificationMountPolicy() const { return m_verificationMountPolicy; } + + /*! * Populate the namespace endpoint configuration from a keytab file */ @@ -136,6 +142,7 @@ public: uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests std::optional<std::string> m_repackBufferURL; //!< The repack buffer URL + std::optional<std::string> m_verificationMountPolicy; //!< The mount policy for verification requests cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries std::string m_catalogue_conn_string; //!< The catalogue connection string (without the password) diff --git a/xroot_plugins/cta-frontend-xrootd.conf b/xroot_plugins/cta-frontend-xrootd.conf index ea7b9e7d43f3e9d867cbcbe779bde54268345c0f..eac3b76818698fd3c61eab40998502b29f6c27b0 100644 --- a/xroot_plugins/cta-frontend-xrootd.conf +++ b/xroot_plugins/cta-frontend-xrootd.conf @@ -29,6 +29,9 @@ cta.log.ssi warning # CTA Repack buffer URL # cta.repack.repack_buffer_url root://ctaeos//eos/ctaeos/repack +# CTA Verification Mount Policy +cta.verification.mount_policy verification + # Keytab containing gRPC endpoints and tokens for each disk instance #cta.ns.config /etc/cta/eos.grpc.keytab