From 853ff48c8ec23e3cdb9aea11f74c4af9d798b2e1 Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Thu, 25 Jul 2019 15:22:28 +0200 Subject: [PATCH] Changed the flush policy of successful Retrieve Requests (every 2000 files or every 3 minutes) Added mount policy to the Repack Request --- cmdline/CtaAdminCmdParse.hpp | 2 +- objectstore/GarbageCollectorTest.cpp | 5 ++++ objectstore/RepackRequest.cpp | 14 +++++++++++ objectstore/RepackRequest.hpp | 3 +++ objectstore/RetrieveRequest.cpp | 3 ++- objectstore/RetrieveRequest.hpp | 2 ++ objectstore/cta.proto | 1 + scheduler/OStoreDB/OStoreDB.cpp | 15 ++++++++---- scheduler/OStoreDB/OStoreDB.hpp | 2 +- scheduler/OStoreDB/OStoreDBFactory.hpp | 4 ++-- scheduler/Scheduler.cpp | 4 ++-- scheduler/Scheduler.hpp | 2 +- scheduler/SchedulerDatabase.hpp | 2 +- scheduler/SchedulerTest.cpp | 22 ++++++++--------- .../daemon/DataTransferSessionTest.cpp | 2 +- .../tapeserver/daemon/RecallReportPacker.hpp | 2 +- xroot_plugins/XrdSsiCtaRequestMessage.cpp | 24 ++++++++++++++++++- 17 files changed, 81 insertions(+), 28 deletions(-) diff --git a/cmdline/CtaAdminCmdParse.hpp b/cmdline/CtaAdminCmdParse.hpp index ef521e133b..8ec43721f8 100644 --- a/cmdline/CtaAdminCmdParse.hpp +++ b/cmdline/CtaAdminCmdParse.hpp @@ -488,7 +488,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = { {{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { }}, /*----------------------------------------------------------------------------------------------------*/ {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD }, - { opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional() }}, + { opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional(), opt_mountpolicy.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_vid.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }}, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 4fbaeb5226..c80cb6ca70 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -720,6 +720,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestPending) { repackRequest.setVid("VIDTest"); repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -800,6 +801,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestToExpand) { repackRequest.setVid("VID2Test"); repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -880,6 +882,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandNotFinished) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(false); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -961,6 +964,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandFinished) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(true); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); @@ -1059,6 +1063,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestStarting) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(true); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index fa481405e9..b7bb2a6d9f 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -21,6 +21,7 @@ #include "AgentReference.hpp" #include "RepackQueueAlgorithms.hpp" #include "Algorithms.hpp" +#include "MountPolicySerDeser.hpp" #include <google/protobuf/util/json_util.h> #include <iostream> @@ -177,6 +178,19 @@ void RepackRequest::setTotalStats(const cta::SchedulerDatabase::RepackRequest::T setTotalBytesToRetrieve(totalStatsFiles.totalBytesToRetrieve); } +void RepackRequest::setMountPolicy(const common::dataStructures::MountPolicy& mp){ + checkPayloadWritable(); + MountPolicySerDeser mpSerDeser(mp); + mpSerDeser.serialize(*m_payload.mutable_mount_policy()); +} + +common::dataStructures::MountPolicy RepackRequest::getMountPolicy(){ + checkPayloadReadable(); + MountPolicySerDeser mpSerDeser; + mpSerDeser.deserialize(m_payload.mount_policy()); + return mpSerDeser; +} + void RepackRequest::setStatus(){ checkPayloadWritable(); checkPayloadReadable(); diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index f1ae27b6ae..70dac1f871 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -48,6 +48,9 @@ public: void setExpandStarted(const bool expandStarted); void setTotalStats(const cta::SchedulerDatabase::RepackRequest::TotalStatsFiles& totalStatsFiles); cta::SchedulerDatabase::RepackRequest::TotalStatsFiles getTotalStatsFile(); + void setMountPolicy(const common::dataStructures::MountPolicy &mp); + common::dataStructures::MountPolicy getMountPolicy(); + /** * Automatically set the new status of the Repack Request * regarding multiple parameters diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 223e84a606..ee799a1592 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -1052,7 +1052,7 @@ void RetrieveRequest::AsyncJobDeleter::wait() { RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint32_t copyNb) { std::unique_ptr<AsyncJobSucceedForRepackReporter> ret(new AsyncJobSucceedForRepackReporter); - ret->m_updaterCallback = [copyNb](const std::string &in)->std::string{ + ret->m_updaterCallback = [&ret,copyNb](const std::string &in)->std::string{ // We have a locked and fetched object, so we just need to work on its representation. cta::objectstore::serializers::ObjectHeader oh; if (!oh.ParseFromString(in)) { @@ -1085,6 +1085,7 @@ RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReport return oh.SerializeAsString(); } } + ret->m_MountPolicy.deserialize(payload.mountpolicy()); throw cta::exception::Exception("In RetrieveRequest::asyncReportSucceedForRepack::lambda(): copyNb not found"); }; ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(),ret->m_updaterCallback)); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index d01720113c..62a7808f89 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -33,6 +33,7 @@ #include "common/dataStructures/LifecycleTimings.hpp" #include "AgentReference.hpp" #include "SorterArchiveJob.hpp" +#include "MountPolicySerDeser.hpp" namespace cta { namespace objectstore { @@ -77,6 +78,7 @@ public: * Wait for the end of the execution of the updater callback */ void wait(); + MountPolicySerDeser m_MountPolicy; private: //Hold the AsyncUpdater that will run asynchronously the m_updaterCallback std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 8669b7f1bc..55a327cac4 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -576,6 +576,7 @@ message RepackRequest { //the expansion of the RepackRequest is done or not required bool is_expand_finished = 11561; required bool is_expand_started = 11562; + required MountPolicy mount_policy = 11563; repeated RepackSubRequestPointer subrequests = 11570; } diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 10364dcb10..412ddcc7da 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1401,7 +1401,7 @@ OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string // OStoreDB::queueRepack() //------------------------------------------------------------------------------ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) { + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy& mountPolicy, log::LogContext & lc) { // Prepare the repack request object in memory. assertAgentAddressSet(); cta::utils::Timer t; @@ -1412,6 +1412,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, rr->setVid(vid); rr->setType(repackType); rr->setBufferURL(bufferURL); + rr->setMountPolicy(mountPolicy); // Try to reference the object in the index (will fail if there is already a request with this VID. try { Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc); @@ -1863,7 +1864,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { // As usual there are many opportunities for failure. utils::Timer t; log::TimingList timingList; - + cta::common::dataStructures::MountPolicy mountPolicy; // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock // before the next step. { @@ -1880,6 +1881,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { objectstore::ScopedExclusiveLock rrl(m_repackRequest); timingList.insertAndReset("successStatsLockTime", t); m_repackRequest.fetch(); + mountPolicy = m_repackRequest.getMountPolicy(); timingList.insertAndReset("successStatsFetchTime", t); m_repackRequest.reportRetriveSuccesses(ssl); timingList.insertAndReset("successStatsUpdateTime", t); @@ -1956,7 +1958,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { sorterArchiveJob.jobDump.copyNb = copyNbToArchive; sorterArchiveJob.jobDump.tapePool = atar.subrequestInfo.repackInfo.archiveRouteMap[copyNbToArchive]; sorterArchiveJob.jobQueueType = cta::objectstore::JobQueueType::JobsToTransferForRepack; - sorterArchiveJob.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; + sorterArchiveJob.mountPolicy = mountPolicy; sorterArchiveJob.previousOwner = atar.subrequestInfo.owner; sorterArchiveRequest.archiveJobs.push_back(sorterArchiveJob); } @@ -2187,6 +2189,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> m_repackRequest.setTotalStats(totalStatsFiles); uint64_t fSeq = std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1); m_repackRequest.setLastExpandedFSeq(fSeq); + common::dataStructures::MountPolicy mountPolicy = m_repackRequest.getMountPolicy(); // We make sure the references to subrequests exist persistently before creating them. m_repackRequest.commit(); // We keep holding the repack request lock: we need to ensure de deleted boolean of each subrequest does @@ -2258,7 +2261,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> // Set the queueing parameters common::dataStructures::RetrieveFileQueueCriteria fileQueueCriteria; fileQueueCriteria.archiveFile = rsr.archiveFile; - fileQueueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; + fileQueueCriteria.mountPolicy = mountPolicy; rr->setRetrieveFileQueueCriteria(fileQueueCriteria); // Decide of which vid we are going to retrieve from. Here, if we can retrieve from the repack VID, we // will set the initial recall on it. Retries will we requeue to best VID as usual if needed. @@ -3579,11 +3582,13 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD // We will wait on the asynchronously started reports of jobs, queue the retrieve jobs // for report and remove them from ownership. // 1) Check the async update result. + common::dataStructures::MountPolicy mountPolicy; for (auto & sDBJob: jobsBatch) { auto osdbJob = castFromSchedDBJob(sDBJob); if (osdbJob->isRepack) { try { osdbJob->m_jobSucceedForRepackReporter->wait(); + mountPolicy = osdbJob->m_jobSucceedForRepackReporter->m_MountPolicy; jobsToRequeueForRepackMap[osdbJob->m_repackInfo.repackRequestAddress].emplace_back(osdbJob); } catch (cta::exception::Exception & ex) { log::ScopedParamContainer params(lc); @@ -3630,7 +3635,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD for (auto & req: repackRequestQueue.second) { insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize, - cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, + mountPolicy, serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription}); requestToJobMap[&req->m_retrieveRequest] = req; } diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 2ea2e547e5..e93bd70342 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -331,7 +331,7 @@ public: /* === Repack requests handling =========================================== */ void queueRepack(const std::string& vid, const std::string& bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext &logContext) override; + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext &logContext) override; std::list<common::dataStructures::RepackInfo> getRepackInfo() override; CTA_GENERATE_EXCEPTION_CLASS(NoSuchRepackRequest); diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index d8f732ec2a..858d48c43c 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -224,8 +224,8 @@ public: } - void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, log::LogContext& lc) override { - m_OStoreDB.queueRepack(vid, bufferURL, repackType, lc); + void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext& lc) override { + m_OStoreDB.queueRepack(vid, bufferURL, repackType, mountPolicy, lc); } std::list<common::dataStructures::RepackInfo> getRepackInfo() override { diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index f354089d93..b1118555ba 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -337,13 +337,13 @@ void Scheduler::checkTapeFullBeforeRepack(std::string vid){ // repack //------------------------------------------------------------------------------ void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, - const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) { + const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) { // Check request sanity if (vid.empty()) throw exception::UserError("Empty VID name."); if (bufferURL.empty()) throw exception::UserError("Empty buffer URL."); utils::Timer t; checkTapeFullBeforeRepack(vid); - m_db.queueRepack(vid, bufferURL, repackType, lc); + m_db.queueRepack(vid, bufferURL, repackType, mountPolicy, lc); log::TimingList tl; tl.insertAndReset("schedulerDbTime", t); log::ScopedParamContainer params(lc); diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 9ec0464e64..8a4a3ab192 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -200,7 +200,7 @@ public: const bool force); void queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, - const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc); + const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc); void cancelRepack(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc); std::list<cta::common::dataStructures::RepackInfo> getRepacks(); cta::common::dataStructures::RepackInfo getRepack(const std::string &vid); diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 23dde264af..09f67419e4 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -388,7 +388,7 @@ public: /*============ Repack management: user side ================================*/ virtual void queueRepack(const std::string & vid, const std::string & bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) = 0; + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) = 0; virtual std::list<common::dataStructures::RepackInfo> getRepackInfo() = 0; virtual common::dataStructures::RepackInfo getRepackInfo(const std::string & vid) = 0; virtual void cancelRepack(const std::string & vid, log::LogContext & lc) = 0; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 89d617c3ff..3b1b664464 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1275,14 +1275,14 @@ TEST_P(SchedulerTest, repack) { catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,false, notReadOnly, "Comment"); //The queueing of a repack request should fail if the tape to repack is not full - ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError); + ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,lc),cta::exception::UserError); //The queueing of a repack request in a vid that does not exist should throw an exception - ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError); + ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc),cta::exception::UserError); catalogue.setTapeFull(cliId,tape1,true); // Create and then cancel repack - scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); { auto repacks = scheduler.getRepacks(); ASSERT_EQ(1, repacks.size()); @@ -1294,7 +1294,7 @@ TEST_P(SchedulerTest, repack) { // Recreate a repack and get it moved to ToExpand std::string tape2 = "Tape2"; catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); - scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); { auto repacks = scheduler.getRepacks(); ASSERT_EQ(1, repacks.size()); @@ -1335,13 +1335,13 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) { catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); //Queue the first repack request - scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); std::string tape2 = "Tape2"; catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); //Queue the second repack request - scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,lc); + scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); //Test the repack request queued has status Pending ASSERT_EQ(scheduler.getRepack(tape1).status,common::dataStructures::RepackInfo::Status::Pending); @@ -1472,7 +1472,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { scheduler.waitSchedulerDbSubthreadsComplete(); { for(uint64_t i = 0; i < nbTapesToRepack ; ++i) { - scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); } scheduler.waitSchedulerDbSubthreadsComplete(); //scheduler.waitSchedulerDbSubthreadsComplete(); @@ -1789,7 +1789,7 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; @@ -2028,7 +2028,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); //scheduler.waitSchedulerDbSubthreadsComplete(); @@ -2274,7 +2274,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; @@ -2568,7 +2568,7 @@ TEST_P(SchedulerTest, expandRepackRequestExpansionTimeLimitReached) { //one retrieve request scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 8865377c0f..83184c131c 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -1805,7 +1805,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) { // We need to create the drive in the registry before being able to put it up. scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); - + // Create the data transfer session DataTransferConfig castorConf; castorConf.bufsz = 1024*1024; // 1 MB memory buffers diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp index 0a22fdc290..057d310377 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp @@ -238,7 +238,7 @@ private: /* * The limit for successful reports to trigger flush. */ - const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 500; + const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 2000; /* * The time limit for successful reports to trigger flush. diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 75f4d2ac42..af9c5a4d7c 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1083,6 +1083,28 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response) throw cta::exception::UserError("Must specify the buffer URL using --bufferurl option or using the frontend configuration file."); } } + + typedef common::dataStructures::MountPolicy MountPolicy; + MountPolicy mountPolicy = MountPolicy::s_defaultMountPolicyForRepack; + + auto mountPolicyProvidedByUserOpt = getOptional(OptionString::MOUNT_POLICY); + if(mountPolicyProvidedByUserOpt){ + //The user specified a mount policy name for this repack request + std::string mountPolicyProvidedByUser = mountPolicyProvidedByUserOpt.value(); + //Get the mountpolicy from the catalogue + typedef std::list<common::dataStructures::MountPolicy> MountPolicyList; + MountPolicyList mountPolicies = m_catalogue.getMountPolicies(); + MountPolicyList::const_iterator repackMountPolicyItor = std::find_if(mountPolicies.begin(),mountPolicies.end(),[mountPolicyProvidedByUser](const common::dataStructures::MountPolicy & mp){ + return mp.name == mountPolicyProvidedByUser; + }); + if(repackMountPolicyItor != mountPolicies.end()){ + //The mount policy exists + mountPolicy = *repackMountPolicyItor; + } else { + //The mount policy does not exist, throw a user error + throw cta::exception::UserError("The mount policy name provided does not match any existing mount policy."); + } + } // Expand, repack, or both ? cta::common::dataStructures::RepackInfo::Type type; @@ -1099,7 +1121,7 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response) // Process each item in the list for(auto it = vid_list.begin(); it != vid_list.end(); ++it) { - m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, m_lc); + m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, mountPolicy , m_lc); } response.set_type(cta::xrd::Response::RSP_SUCCESS); -- GitLab