diff --git a/cmdline/CtaAdminCmdParse.hpp b/cmdline/CtaAdminCmdParse.hpp index ef521e133b1d7e91a56a5cc22614314cf26797db..8ec43721f875bb2cbffe48691e575be34be5945e 100644 --- a/cmdline/CtaAdminCmdParse.hpp +++ b/cmdline/CtaAdminCmdParse.hpp @@ -488,7 +488,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = { {{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { }}, /*----------------------------------------------------------------------------------------------------*/ {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD }, - { opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional() }}, + { opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl.optional(), opt_justmove.optional(), opt_justaddcopies.optional(), opt_mountpolicy.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_vid.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }}, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 4fbaeb5226797a136d899f1f6b82e06b547a0961..c80cb6ca70dd52960042484a21b37bca12bc2482 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -720,6 +720,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestPending) { repackRequest.setVid("VIDTest"); repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -800,6 +801,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestToExpand) { repackRequest.setVid("VID2Test"); repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -880,6 +882,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandNotFinished) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(false); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } { @@ -961,6 +964,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandFinished) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(true); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); @@ -1059,6 +1063,7 @@ TEST(ObjectStore, GarbageCollectorRepackRequestStarting) { repackRequest.setBufferURL("test/buffer/url"); repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); repackRequest.setExpandFinished(true); + repackRequest.setMountPolicy(cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); repackRequest.insert(); } cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index fa481405e9745f80a6954fa79bfabe126d456d00..b7bb2a6d9fbb39717037fe1fc14018f66a832dea 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -21,6 +21,7 @@ #include "AgentReference.hpp" #include "RepackQueueAlgorithms.hpp" #include "Algorithms.hpp" +#include "MountPolicySerDeser.hpp" #include <google/protobuf/util/json_util.h> #include <iostream> @@ -177,6 +178,19 @@ void RepackRequest::setTotalStats(const cta::SchedulerDatabase::RepackRequest::T setTotalBytesToRetrieve(totalStatsFiles.totalBytesToRetrieve); } +void RepackRequest::setMountPolicy(const common::dataStructures::MountPolicy& mp){ + checkPayloadWritable(); + MountPolicySerDeser mpSerDeser(mp); + mpSerDeser.serialize(*m_payload.mutable_mount_policy()); +} + +common::dataStructures::MountPolicy RepackRequest::getMountPolicy(){ + checkPayloadReadable(); + MountPolicySerDeser mpSerDeser; + mpSerDeser.deserialize(m_payload.mount_policy()); + return mpSerDeser; +} + void RepackRequest::setStatus(){ checkPayloadWritable(); checkPayloadReadable(); diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index f1ae27b6aea5a4ebd98316fe9e0d813f1f549c2f..70dac1f871a13ccb9aef43b0f470ea8d24093ba7 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -48,6 +48,9 @@ public: void setExpandStarted(const bool expandStarted); void setTotalStats(const cta::SchedulerDatabase::RepackRequest::TotalStatsFiles& totalStatsFiles); cta::SchedulerDatabase::RepackRequest::TotalStatsFiles getTotalStatsFile(); + void setMountPolicy(const common::dataStructures::MountPolicy &mp); + common::dataStructures::MountPolicy getMountPolicy(); + /** * Automatically set the new status of the Repack Request * regarding multiple parameters diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 223e84a606e168134b2792a808d023c9eb42053e..ee799a1592f350bb314acf1b264df21452d5cf91 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -1052,7 +1052,7 @@ void RetrieveRequest::AsyncJobDeleter::wait() { RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReportSucceedForRepack(uint32_t copyNb) { std::unique_ptr<AsyncJobSucceedForRepackReporter> ret(new AsyncJobSucceedForRepackReporter); - ret->m_updaterCallback = [copyNb](const std::string &in)->std::string{ + ret->m_updaterCallback = [&ret,copyNb](const std::string &in)->std::string{ // We have a locked and fetched object, so we just need to work on its representation. cta::objectstore::serializers::ObjectHeader oh; if (!oh.ParseFromString(in)) { @@ -1085,6 +1085,7 @@ RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReport return oh.SerializeAsString(); } } + ret->m_MountPolicy.deserialize(payload.mountpolicy()); throw cta::exception::Exception("In RetrieveRequest::asyncReportSucceedForRepack::lambda(): copyNb not found"); }; ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(),ret->m_updaterCallback)); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index d01720113c47658e814ea2e6fa75c42fafd1b46d..62a7808f89c563e2bc81c1b7a00c78b88eed946b 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -33,6 +33,7 @@ #include "common/dataStructures/LifecycleTimings.hpp" #include "AgentReference.hpp" #include "SorterArchiveJob.hpp" +#include "MountPolicySerDeser.hpp" namespace cta { namespace objectstore { @@ -77,6 +78,7 @@ public: * Wait for the end of the execution of the updater callback */ void wait(); + MountPolicySerDeser m_MountPolicy; private: //Hold the AsyncUpdater that will run asynchronously the m_updaterCallback std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 8669b7f1bca23a30e956c3b8ae0c69bbf8979f70..55a327cac44482c5126eb8c448f86b1ce226e06b 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -576,6 +576,7 @@ message RepackRequest { //the expansion of the RepackRequest is done or not required bool is_expand_finished = 11561; required bool is_expand_started = 11562; + required MountPolicy mount_policy = 11563; repeated RepackSubRequestPointer subrequests = 11570; } diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 10364dcb100933a5d688bfada64da5f9e9974086..412ddcc7da788d2fe007e4949f3d40a2201c3ba9 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1401,7 +1401,7 @@ OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string // OStoreDB::queueRepack() //------------------------------------------------------------------------------ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) { + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy& mountPolicy, log::LogContext & lc) { // Prepare the repack request object in memory. assertAgentAddressSet(); cta::utils::Timer t; @@ -1412,6 +1412,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, rr->setVid(vid); rr->setType(repackType); rr->setBufferURL(bufferURL); + rr->setMountPolicy(mountPolicy); // Try to reference the object in the index (will fail if there is already a request with this VID. try { Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc); @@ -1863,7 +1864,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { // As usual there are many opportunities for failure. utils::Timer t; log::TimingList timingList; - + cta::common::dataStructures::MountPolicy mountPolicy; // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock // before the next step. { @@ -1880,6 +1881,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { objectstore::ScopedExclusiveLock rrl(m_repackRequest); timingList.insertAndReset("successStatsLockTime", t); m_repackRequest.fetch(); + mountPolicy = m_repackRequest.getMountPolicy(); timingList.insertAndReset("successStatsFetchTime", t); m_repackRequest.reportRetriveSuccesses(ssl); timingList.insertAndReset("successStatsUpdateTime", t); @@ -1956,7 +1958,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { sorterArchiveJob.jobDump.copyNb = copyNbToArchive; sorterArchiveJob.jobDump.tapePool = atar.subrequestInfo.repackInfo.archiveRouteMap[copyNbToArchive]; sorterArchiveJob.jobQueueType = cta::objectstore::JobQueueType::JobsToTransferForRepack; - sorterArchiveJob.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; + sorterArchiveJob.mountPolicy = mountPolicy; sorterArchiveJob.previousOwner = atar.subrequestInfo.owner; sorterArchiveRequest.archiveJobs.push_back(sorterArchiveJob); } @@ -2187,6 +2189,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> m_repackRequest.setTotalStats(totalStatsFiles); uint64_t fSeq = std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1); m_repackRequest.setLastExpandedFSeq(fSeq); + common::dataStructures::MountPolicy mountPolicy = m_repackRequest.getMountPolicy(); // We make sure the references to subrequests exist persistently before creating them. m_repackRequest.commit(); // We keep holding the repack request lock: we need to ensure de deleted boolean of each subrequest does @@ -2258,7 +2261,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> // Set the queueing parameters common::dataStructures::RetrieveFileQueueCriteria fileQueueCriteria; fileQueueCriteria.archiveFile = rsr.archiveFile; - fileQueueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; + fileQueueCriteria.mountPolicy = mountPolicy; rr->setRetrieveFileQueueCriteria(fileQueueCriteria); // Decide of which vid we are going to retrieve from. Here, if we can retrieve from the repack VID, we // will set the initial recall on it. Retries will we requeue to best VID as usual if needed. @@ -3579,11 +3582,13 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD // We will wait on the asynchronously started reports of jobs, queue the retrieve jobs // for report and remove them from ownership. // 1) Check the async update result. + common::dataStructures::MountPolicy mountPolicy; for (auto & sDBJob: jobsBatch) { auto osdbJob = castFromSchedDBJob(sDBJob); if (osdbJob->isRepack) { try { osdbJob->m_jobSucceedForRepackReporter->wait(); + mountPolicy = osdbJob->m_jobSucceedForRepackReporter->m_MountPolicy; jobsToRequeueForRepackMap[osdbJob->m_repackInfo.repackRequestAddress].emplace_back(osdbJob); } catch (cta::exception::Exception & ex) { log::ScopedParamContainer params(lc); @@ -3630,7 +3635,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD for (auto & req: repackRequestQueue.second) { insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize, - cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, + mountPolicy, serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription}); requestToJobMap[&req->m_retrieveRequest] = req; } diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 2ea2e547e57d1e8ca604af22848a0fdb2c2c780f..e93bd703427d3b7387103046483f6c7f6ee705a2 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -331,7 +331,7 @@ public: /* === Repack requests handling =========================================== */ void queueRepack(const std::string& vid, const std::string& bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext &logContext) override; + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext &logContext) override; std::list<common::dataStructures::RepackInfo> getRepackInfo() override; CTA_GENERATE_EXCEPTION_CLASS(NoSuchRepackRequest); diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index d8f732ec2a96040037581b210ef8f8c7a12f03b5..858d48c43cbb0a6327dc2c0433f0df42b9127223 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -224,8 +224,8 @@ public: } - void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, log::LogContext& lc) override { - m_OStoreDB.queueRepack(vid, bufferURL, repackType, lc); + void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext& lc) override { + m_OStoreDB.queueRepack(vid, bufferURL, repackType, mountPolicy, lc); } std::list<common::dataStructures::RepackInfo> getRepackInfo() override { diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index f354089d93fcf4485f13350138b3796920171e8b..b1118555ba36f8ea3ef5e38909f154eb8bbe33c7 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -337,13 +337,13 @@ void Scheduler::checkTapeFullBeforeRepack(std::string vid){ // repack //------------------------------------------------------------------------------ void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, - const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) { + const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) { // Check request sanity if (vid.empty()) throw exception::UserError("Empty VID name."); if (bufferURL.empty()) throw exception::UserError("Empty buffer URL."); utils::Timer t; checkTapeFullBeforeRepack(vid); - m_db.queueRepack(vid, bufferURL, repackType, lc); + m_db.queueRepack(vid, bufferURL, repackType, mountPolicy, lc); log::TimingList tl; tl.insertAndReset("schedulerDbTime", t); log::ScopedParamContainer params(lc); diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 9ec0464e648e36606eec7886931cf2efa4d7819d..8a4a3ab192b16d2865954667be23a793f2d7dd4e 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -200,7 +200,7 @@ public: const bool force); void queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, - const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc); + const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc); void cancelRepack(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc); std::list<cta::common::dataStructures::RepackInfo> getRepacks(); cta::common::dataStructures::RepackInfo getRepack(const std::string &vid); diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 23dde264afe4ed7c9af32d5d81b84fae51e9f860..09f67419e4113ae0019ee8f0cd33a81c64f52a4a 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -388,7 +388,7 @@ public: /*============ Repack management: user side ================================*/ virtual void queueRepack(const std::string & vid, const std::string & bufferURL, - common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) = 0; + common::dataStructures::RepackInfo::Type repackType, const common::dataStructures::MountPolicy &mountPolicy, log::LogContext & lc) = 0; virtual std::list<common::dataStructures::RepackInfo> getRepackInfo() = 0; virtual common::dataStructures::RepackInfo getRepackInfo(const std::string & vid) = 0; virtual void cancelRepack(const std::string & vid, log::LogContext & lc) = 0; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 89d617c3ff7940b9ca2e953450c99b4c4bb62696..3b1b664464a5751b89ee9eb65d6a770a78edd0d2 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1275,14 +1275,14 @@ TEST_P(SchedulerTest, repack) { catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,false, notReadOnly, "Comment"); //The queueing of a repack request should fail if the tape to repack is not full - ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError); + ASSERT_THROW(scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,lc),cta::exception::UserError); //The queueing of a repack request in a vid that does not exist should throw an exception - ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc),cta::exception::UserError); + ASSERT_THROW(scheduler.queueRepack(cliId, "NOT_EXIST", "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc),cta::exception::UserError); catalogue.setTapeFull(cliId,tape1,true); // Create and then cancel repack - scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); { auto repacks = scheduler.getRepacks(); ASSERT_EQ(1, repacks.size()); @@ -1294,7 +1294,7 @@ TEST_P(SchedulerTest, repack) { // Recreate a repack and get it moved to ToExpand std::string tape2 = "Tape2"; catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); - scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape2, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); { auto repacks = scheduler.getRepacks(); ASSERT_EQ(1, repacks.size()); @@ -1335,13 +1335,13 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) { catalogue.createTape(cliId,tape1,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); //Queue the first repack request - scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly, lc); + scheduler.queueRepack(cliId, tape1, "file://"+tempDirectory.path(), common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); std::string tape2 = "Tape2"; catalogue.createTape(cliId,tape2,"mediaType","vendor",s_libraryName,s_tapePoolName,500,false,true, notReadOnly, "Comment"); //Queue the second repack request - scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,lc); + scheduler.queueRepack(cliId,tape2,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::AddCopiesOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); //Test the repack request queued has status Pending ASSERT_EQ(scheduler.getRepack(tape1).status,common::dataStructures::RepackInfo::Status::Pending); @@ -1472,7 +1472,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { scheduler.waitSchedulerDbSubthreadsComplete(); { for(uint64_t i = 0; i < nbTapesToRepack ; ++i) { - scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,allVid.at(i),"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); } scheduler.waitSchedulerDbSubthreadsComplete(); //scheduler.waitSchedulerDbSubthreadsComplete(); @@ -1789,7 +1789,7 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; @@ -2028,7 +2028,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); //scheduler.waitSchedulerDbSubthreadsComplete(); @@ -2274,7 +2274,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; @@ -2568,7 +2568,7 @@ TEST_P(SchedulerTest, expandRepackRequestExpansionTimeLimitReached) { //one retrieve request scheduler.waitSchedulerDbSubthreadsComplete(); { - scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, lc); scheduler.waitSchedulerDbSubthreadsComplete(); log::TimingList tl; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 8865377c0ff04afa2f304bfb63fa6c364b9573ba..83184c131c203ff385d655058ee73abe637830e1 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -1805,7 +1805,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) { // We need to create the drive in the registry before being able to put it up. scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, logContext); scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false, logContext); - + // Create the data transfer session DataTransferConfig castorConf; castorConf.bufsz = 1024*1024; // 1 MB memory buffers diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp index 0a22fdc2902c8e9589267e64dfd6579fe5fa4971..057d310377a98ec45f180b985205deee392ad081 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp @@ -238,7 +238,7 @@ private: /* * The limit for successful reports to trigger flush. */ - const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 500; + const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 2000; /* * The time limit for successful reports to trigger flush. diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 75f4d2ac421290190dfc4cc5ce1a8e81538817c2..af9c5a4d7c53f666ef40601d968d6e81ef2d3e18 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1083,6 +1083,28 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response) throw cta::exception::UserError("Must specify the buffer URL using --bufferurl option or using the frontend configuration file."); } } + + typedef common::dataStructures::MountPolicy MountPolicy; + MountPolicy mountPolicy = MountPolicy::s_defaultMountPolicyForRepack; + + auto mountPolicyProvidedByUserOpt = getOptional(OptionString::MOUNT_POLICY); + if(mountPolicyProvidedByUserOpt){ + //The user specified a mount policy name for this repack request + std::string mountPolicyProvidedByUser = mountPolicyProvidedByUserOpt.value(); + //Get the mountpolicy from the catalogue + typedef std::list<common::dataStructures::MountPolicy> MountPolicyList; + MountPolicyList mountPolicies = m_catalogue.getMountPolicies(); + MountPolicyList::const_iterator repackMountPolicyItor = std::find_if(mountPolicies.begin(),mountPolicies.end(),[mountPolicyProvidedByUser](const common::dataStructures::MountPolicy & mp){ + return mp.name == mountPolicyProvidedByUser; + }); + if(repackMountPolicyItor != mountPolicies.end()){ + //The mount policy exists + mountPolicy = *repackMountPolicyItor; + } else { + //The mount policy does not exist, throw a user error + throw cta::exception::UserError("The mount policy name provided does not match any existing mount policy."); + } + } // Expand, repack, or both ? cta::common::dataStructures::RepackInfo::Type type; @@ -1099,7 +1121,7 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response) // Process each item in the list for(auto it = vid_list.begin(); it != vid_list.end(); ++it) { - m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, m_lc); + m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, mountPolicy , m_lc); } response.set_type(cta::xrd::Response::RSP_SUCCESS);