diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 8eced3a140e60fcdde65026ac1da358daa43b1c7..4f77bf42cadcc39993b117175275ef55ea36d1ef 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -78,7 +78,7 @@ void fillRetrieveRequests( rqc.mountPolicy.retrievePriority = 1; requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be)); requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{ - requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt + requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt, cta::nullopt }); auto &rr = *requests.back().retrieveRequest; rr.initialize(); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index d30eb6aa7f687da85aebd42283ea7f34644a7115..508be71b85885032673cecd3c6a26a240a5ec082 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -75,12 +75,12 @@ add_library (ctaobjectstore SHARED ArchiveQueueToTransferForRepackAlgorithms.cpp RetrieveQueue.cpp RetrieveQueueShard.cpp - RetrieveQueueToTransferAlgorithms.cpp + RetrieveQueueToTransferForUserAlgorithms.cpp + RetrieveQueueToTransferForRepackAlgorithms.cpp RetrieveQueueToReportAlgorithms.cpp RetrieveQueueFailedAlgorithms.cpp RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp RetrieveQueueToReportToRepackForFailureAlgorithms.cpp - RetrieveQueueToTransferForRepackAlgorithms.cpp JobQueueType.cpp Sorter.cpp ArchiveRequest.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 6ff68c09a4772e5e8803ce99735dd61d315efc73..548637d6998801de0933d9229fc9fb6e510a07e2 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -559,7 +559,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& for (auto &tf: rr->getArchiveFile().tapeFiles) 
{ if (tf.vid == vid) { jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, - rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()}); + rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity(), rr->getDiskSystemName()}); } } } diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 249b1f99ce40875ddf88c426298ded9f2c4995f7..b6a97711c72e45240fceb60e9c0caea6a6b86cd1 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -608,7 +608,8 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; - jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time, cta::nullopt}); + jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, + sReq.creationLog.time, cta::nullopt, cta::nullopt}); rq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 755e48550338480eb412a9098909741e884246d1..6536126d60bbd223149e0e458869e32dc631a5d5 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -531,7 +531,7 @@ auto RetrieveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd, } shardsDumps.emplace_back(std::list<JobDump>()); for (auto & j: s->dumpJobs()) { - shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size})); + shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName})); } nextShard: s++; @@ -606,7 +606,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> { goto nextShard; } for (auto & j: s->dumpJobs()) { - 
ret.emplace_back(JobDump{j.address, j.copyNb, j.size}); + ret.emplace_back(JobDump{j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName}); } nextShard: s++; sf++; diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index d9566754e42c0a23d05491452a6470d247d37674..f3337bc2d78e4293252fb60eeadd9bfd84f9010e 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -66,7 +66,8 @@ public: uint64_t fileSize; cta::common::dataStructures::MountPolicy policy; time_t startTime; - optional<RetrieveActivityDescription> activityDescription; + optional<RetrieveActivityDescription> activityDescription; + optional<std::string> diskSystemName; }; void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc); // This version will check for existence of the job in the queue before @@ -97,6 +98,12 @@ public: std::string address; uint32_t copyNb; uint64_t size; + struct ActivityDescription { + std::string diskInstanceName; + std::string activity; + }; + optional<ActivityDescription> activity; + optional<std::string> diskSystemName; }; std::list<JobDump> dumpJobs(); struct CandidateJobList { diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 45dd12db5e3914b59ba2ae0c082e95f00d9530f0..450108e59888b8ff103b024b33ae6ec55197a02e 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -45,6 +45,7 @@ struct ContainerTraits<RetrieveQueue,C> cta::common::dataStructures::MountPolicy policy; serializers::RetrieveJobStatus status; optional<RetrieveActivityDescription> activityDescription; + optional<std::string> diskSystemName; typedef std::list<InsertedElement> list; }; @@ -59,6 +60,8 @@ struct ContainerTraits<RetrieveQueue,C> std::string errorReportURL; SchedulerDatabase::RetrieveJob::ReportType reportType; RetrieveRequest::RepackInfo repackInfo; + 
optional<RetrieveQueue::JobDump::ActivityDescription> activity; + optional<std::string> diskSystemName; }; struct PoppedElementsSummary; struct PopCriteria { @@ -279,7 +282,7 @@ addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemC std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName}); } cont.addJobsAndCommit(jobsToAdd, agentRef, lc); } @@ -292,7 +295,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName}); } cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc); } @@ -387,6 +390,11 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container e.archiveFile = u.get()->getArchiveFile(); e.rr = u.get()->getRetrieveRequest(); e.repackInfo = u.get()->getRepackInfo(); + auto & rad = u.get()->getRetrieveActivityDescription(); + if (rad) { + e.activity = RetrieveQueue::JobDump::ActivityDescription{ rad.value().diskInstanceName, rad.value().activity }; + } + e.diskSystemName = u.get()->getDiskSystemName(); switch(u.get()->getJobStatus()) { case serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure: e.reportType = SchedulerDatabase::RetrieveJob::ReportType::FailureReport; @@ -492,7 +500,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, 
common::dataStructures::ArchiveFile(), common::dataStructures::RetrieveRequest(), "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, - RetrieveRequest::RepackInfo() + RetrieveRequest::RepackInfo(), cjfq.activity, cjfq.diskSystemName }); ret.summary.files++; } diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp index 76ebe1e5c36e03698b3c8f7894f8dbade013fe38..9c00e31696d4346e93a973c4b14a59944281a3ce 100644 --- a/objectstore/RetrieveQueueShard.cpp +++ b/objectstore/RetrieveQueueShard.cpp @@ -69,7 +69,16 @@ RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t ret.remainingFilesAfterCandidates = m_payload.retrievejobs_size(); for (auto & j: m_payload.retrievejobs()) { if (!retrieveRequestsToSkip.count(j.address())) { - ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size()}); + ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size(), nullopt, nullopt}); + if (j.has_activity()) { + RetrieveQueue::JobDump::ActivityDescription ad; + ad.activity = j.activity(); + ad.diskInstanceName = j.disk_instance_name(); + ret.candidates.back().activity = ad; + } + if (j.has_destination_disk_system_name()) { + ret.candidates.back().diskSystemName = j.destination_disk_system_name(); + } ret.candidateBytes += j.size(); ret.candidateFiles ++; } @@ -104,7 +113,9 @@ auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) ret.removedJobs.back().size = j.size(); ret.removedJobs.back().startTime = j.starttime(); if (j.has_activity()) - ret.removedJobs.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; + ret.removedJobs.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() }; + if (j.has_destination_disk_system_name()) + ret.removedJobs.back().diskSystemName = j.destination_disk_system_name(); ret.bytesRemoved += j.size(); totalSize -= j.size(); ret.jobsRemoved++; 
@@ -139,9 +150,12 @@ auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> { std::list<JobInfo> ret; for (auto &j: m_payload.retrievejobs()) { ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(), - j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt}); + j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt, nullopt}); if (j.has_activity()) { - ret.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; + ret.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() }; + } + if (j.has_destination_disk_system_name()) { + ret.back().diskSystemName = j.destination_disk_system_name(); } } return ret; diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp index 7da9d10d72144578e2975e25332770e85d0b489b..8eef6d0af623ed8038e1b97a21160a546d4d2920 100644 --- a/objectstore/RetrieveQueueShard.hpp +++ b/objectstore/RetrieveQueueShard.hpp @@ -53,11 +53,8 @@ public: uint64_t maxDrivesAllowed; time_t startTime; uint64_t fSeq; - struct ActivityDescription { - std::string diskInstanceName; - std::string activity; - }; - optional<ActivityDescription> activityDescription; + optional<RetrieveQueue::JobDump::ActivityDescription> activityDescription; + optional<std::string> diskSystemName; }; std::list<JobInfo> dumpJobs(); diff --git a/objectstore/RetrieveQueueToTransferAlgorithms.cpp b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp similarity index 96% rename from objectstore/RetrieveQueueToTransferAlgorithms.cpp rename to objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp index 2d80b46b33cf75d87886f1b2572b3f7739547096..90dcb21623eb00dfc456bde5dc00039d95cfa43d 100644 --- a/objectstore/RetrieveQueueToTransferAlgorithms.cpp +++ b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp @@ -56,7 +56,9 @@ getPoppingElementsCandidates(Container 
&cont, PopCriteria &unfulfilledCriteria, common::dataStructures::RetrieveRequest(), "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, - RetrieveRequest::RepackInfo() + RetrieveRequest::RepackInfo(), + cjfq.activity, + cjfq.diskSystemName }); ret.summary.bytes += cjfq.size; ret.summary.files++; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index f1a5be8b1fdf95f739e0d95a1aee476e5052877f..a1c75c17a7ecfb6deb9dddd11624f0f1ea6a87de 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -158,7 +158,7 @@ queueForFailure:; objectstore::MountPolicySerDeser mp; std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({activeCopyNb, activeFseq, getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt, nullopt}); if (m_payload.has_activity_weight()) { RetrieveActivityDescription activityDescription; activityDescription.priority = m_payload.activity_weight().priority(); @@ -226,7 +226,7 @@ queueForTransfer:; mp.deserialize(m_payload.mountpolicy()); std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), getActivity(), getDiskSystemName()}); if (m_payload.has_activity_weight()) { RetrieveActivityDescription activityDescription; activityDescription.priority = m_payload.activity_weight().priority(); @@ -484,6 +484,25 @@ optional<RetrieveActivityDescription> RetrieveRequest::getActivity() { return ret; } +//------------------------------------------------------------------------------ +// RetrieveRequest::setDiskSystemName() +//------------------------------------------------------------------------------ +void 
RetrieveRequest::setDiskSystemName(const std::string& diskSystemName) { + checkPayloadWritable(); + m_payload.set_disk_system_name(diskSystemName); +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::getDiskSystemName() +//------------------------------------------------------------------------------ +optional<std::string> RetrieveRequest::getDiskSystemName() { + checkPayloadReadable(); + optional<std::string> ret; + if (m_payload.has_disk_system_name()) + ret = m_payload.disk_system_name(); + return ret; +} + //------------------------------------------------------------------------------ // RetrieveRequest::dumpJobs() //------------------------------------------------------------------------------ @@ -852,6 +871,15 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string af.deserialize(payload.archivefile()); retRef.m_archiveFile = af; retRef.m_jobStatus = j.status(); + if (payload.has_activity_weight()) { + retRef.m_retrieveActivityDescription = RetrieveActivityDescription{ + payload.activity_weight().priority(), payload.activity_weight().disk_instance_name(), + payload.activity_weight().activity(), payload.activity_weight().creation_time(), + payload.activity_weight().weight(), 0 + }; + } + if (payload.has_disk_system_name()) + retRef.m_diskSystemName = payload.disk_system_name(); RetrieveRequest::updateLifecycleTiming(payload,j); LifecycleTimingsSerDeser lifeCycleSerDeser; lifeCycleSerDeser.deserialize(payload.lifecycle_timings()); @@ -902,6 +930,20 @@ const RetrieveRequest::RepackInfo& RetrieveRequest::AsyncJobOwnerUpdater::getRep return m_repackInfo; } +//------------------------------------------------------------------------------ +// RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription() +//------------------------------------------------------------------------------ +const optional<RetrieveActivityDescription>& 
RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription() { + return m_retrieveActivityDescription; +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName() +//------------------------------------------------------------------------------ +const optional<std::string>& RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName() { + return m_diskSystemName; +} + //------------------------------------------------------------------------------ // RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveRequest() //------------------------------------------------------------------------------ diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index ec82378e1b40156a0f251d10918729eb72976faa..0ad3cd77fae0a58c15a3d68d19a1e492bea11af2 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -220,6 +220,8 @@ public: const common::dataStructures::RetrieveRequest &getRetrieveRequest(); const common::dataStructures::ArchiveFile &getArchiveFile(); const RepackInfo &getRepackInfo(); + const optional<RetrieveActivityDescription> &getRetrieveActivityDescription(); + const optional<std::string> &getDiskSystemName(); private: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; @@ -227,6 +229,8 @@ public: common::dataStructures::ArchiveFile m_archiveFile; RepackInfo m_repackInfo; serializers::RetrieveJobStatus m_jobStatus; + optional<RetrieveActivityDescription> m_retrieveActivityDescription; + optional<std::string> m_diskSystemName; }; // An owner updater factory. The owner MUST be previousOwner for the update to be executed. 
AsyncJobOwnerUpdater *asyncUpdateJobOwner(uint32_t copyNumber, const std::string &owner, const std::string &previousOwner); @@ -238,6 +242,8 @@ public: void setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest & retrieveRequest, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); optional<RetrieveActivityDescription> getActivity(); + void setDiskSystemName(const std::string & diskSystemName); + optional<std::string> getDiskSystemName(); cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria(); cta::common::dataStructures::ArchiveFile getArchiveFile(); cta::common::dataStructures::EntryLog getEntryLog(); diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index c1159267d2cd8eff0946008edb01d5e491289520..4424bce68bb868c427e0039d987fb583c4545a59 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -159,7 +159,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA Sorter::RetrieveJob job = std::get<0>(jobToAdd->jobToQueue); succeededJobs[job.jobDump.copyNb] = jobToAdd; previousOwner = job.previousOwner->getAgentAddress(); - jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription}); + jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription,job.diskSystemName}); } try{ algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc); diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp index b68b1a8e4634587b584fff06335db6a18d63b847..21198193760cb46d20939750c1189f5902c3a32a 100644 --- a/objectstore/Sorter.hpp +++ b/objectstore/Sorter.hpp @@ -126,6 +126,7 @@ public: common::dataStructures::MountPolicy mountPolicy; cta::objectstore::JobQueueType jobQueueType; optional<RetrieveActivityDescription> activityDescription; + optional<std::string> 
diskSystemName; }; /** diff --git a/objectstore/cta.proto b/objectstore/cta.proto index daab50293501736f6dcc6ff96158ac8bf4cf9579..0b09a98070813db285b938fcf4476dbd341166e0 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -422,6 +422,7 @@ message RetrieveRequest { required bool isrepack = 9157; optional RetrieveRequestRepackInfo repack_info = 9158; optional LifecycleTimings lifecycle_timings = 9159; + optional string disk_system_name = 9161; } message ValueCountPair { @@ -476,6 +477,7 @@ message RetrieveJobPointer { // For activity (if present), we need disk instance and activity name (priority is always provided) optional string disk_instance_name = 3109; optional string activity = 3110; + optional string destination_disk_system_name = 3111; } message RetrieveQueueShardPointer { diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 7a1cafd3a8660db7dbc335a8894c1280838eb7b6..67c3172e1ae5d59710eb6687705b4483047cb17b 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -55,7 +55,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special if (j.copyNb == job.copyNb) { auto criteria = request.getRetrieveFileQueueCriteria(); jtal.push_back({j.copyNb, j.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize, - criteria.mountPolicy, request.getEntryLog().time, request.getActivity()}); + criteria.mountPolicy, request.getEntryLog().time, request.getActivity(), request.getDiskSystemName()}); request.setActiveCopyNumber(j.copyNb); request.setOwner(queueAddress); goto jobAdded; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 15f98fa6641e1c6e680120de14fd6e76bfbe0fb6..f843c7bd34840097b90ddcdcb557d571d6794ce6 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1103,7 +1103,7 @@ void OStoreDB::setRetrieveJobBatchReportedToUser(std::list<cta::SchedulerDatabas 
insertedElements.emplace_back(CaRQF::InsertedElement{ &j.job->m_retrieveRequest, tf_it->copyNb, tf_it->fSeq, tf_it->compressedSize, common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed, - j.job->m_activityDescription + j.job->m_activityDescription, j.job->m_diskSystemName }); } try { @@ -1129,8 +1129,10 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue //------------------------------------------------------------------------------ // OStoreDB::queueRetrieve() //------------------------------------------------------------------------------ + std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) { + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) { assertAgentAddressSet(); auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>(); cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind); @@ -1182,6 +1184,7 @@ std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest rReq->setRetrieveFileQueueCriteria(criteria); rReq->setActivityIfNeeded(rqst, criteria); rReq->setCreationTime(rqst.creationLog.time); + if (diskSystemName) rReq->setDiskSystemName(diskSystemName.value()); // Find the job corresponding to the vid (and check we indeed have one). 
auto jobs = rReq->getJobs(); objectstore::RetrieveRequest::JobDump job; @@ -3439,9 +3442,8 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::getNextJobBatch() //------------------------------------------------------------------------------ -std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount:: -getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext) -{ +std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested, + uint64_t bytesRequested, disk::DiskSystemList& diskSystemList, log::LogContext& logContext) { typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos; RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); @@ -3607,7 +3609,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize, cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, - serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription}); + serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription, req->m_diskSystemName}); requestToJobMap[&req->m_retrieveRequest] = req; } RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4487,7 +4489,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, 
rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName }); m_retrieveRequest.commit(); rel.release(); @@ -4554,7 +4556,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToTransferForUser, - m_activityDescription + m_activityDescription, m_diskSystemName }); CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4618,7 +4620,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription + serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription, m_diskSystemName }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); @@ -4637,7 +4639,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index de93c966c41a97b7ac19cf97f262e5b439630e0d..e7a9889c7f0566a2066a7b333d0f649a8bb676b0 100644 --- 
a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -217,7 +217,8 @@ public: OStoreDB & m_oStoreDB; public: const MountInfo & getMountInfo() override; - std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override; + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + disk::DiskSystemList& diskSystemList, log::LogContext& logContext) override; void complete(time_t completionTime) override; void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; @@ -258,6 +259,7 @@ public: std::unique_ptr<objectstore::RetrieveRequest::AsyncJobSucceedForRepackReporter> m_jobSucceedForRepackReporter; objectstore::RetrieveRequest::RepackInfo m_repackInfo; optional<objectstore::RetrieveActivityDescription> m_activityDescription; + optional<std::string> m_diskSystemName; }; static RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job); @@ -296,8 +298,9 @@ public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange); - std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override; + std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) override; std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index 
bb9cb26357f5e2c09419bcc3c783c59daec5a918..c407eaf74d06e72d58f27be5a66f73a696b161a2 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -202,9 +202,11 @@ public: return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider); } - std::string queueRetrieve(common::dataStructures::RetrieveRequest& rqst, - const common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override { - return m_OStoreDB.queueRetrieve(rqst, criteria, logContext); + + std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) override { + return m_OStoreDB.queueRetrieve(rqst, criteria, diskSystemName, logContext); } diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index a629077989c32cf0e3087a949469fdfc801426c5..4cff8c76c8eaaf9cb7b717f890af1ed094cb4b50 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -110,7 +110,7 @@ std::string cta::RetrieveMount::getMediaType() const } //------------------------------------------------------------------------------ -// getVo() +// getVendor() //------------------------------------------------------------------------------ std::string cta::RetrieveMount::getVendor() const { @@ -121,6 +121,9 @@ std::string cta::RetrieveMount::getVendor() const return sVendor.str(); } +//------------------------------------------------------------------------------ +// getCapacityInBytes() +//------------------------------------------------------------------------------ uint64_t cta::RetrieveMount::getCapacityInBytes() const { if(!m_dbMount.get()) throw exception::Exception("In cta::RetrieveMount::getVendor(): got NULL dbMount"); @@ -134,9 +137,12 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc log::LogContext& logContext) { if 
(!m_sessionRunning) throw SessionNotRunning("In RetrieveMount::getNextJobBatch(): trying to get job from complete/not started session"); + // Get the current file systems list from the catalogue + disk::DiskSystemList diskSystemList; + if (m_catalogue) diskSystemList = m_catalogue->getDiskSystems(); // Try and get a new job from the DB std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested, - bytesRequested, logContext)); + bytesRequested, diskSystemList, logContext)); std::list<std::unique_ptr<RetrieveJob>> ret; // We prepare the response for (auto & sdrj: dbJobBatch) { @@ -227,6 +233,17 @@ cta::disk::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL return m_reporterFactory.createDiskReporter(URL); } +//------------------------------------------------------------------------------ +// setCatalogue() +//------------------------------------------------------------------------------ +void cta::RetrieveMount::setCatalogue(catalogue::Catalogue* catalogue) { + if (m_catalogue) + throw exception::Exception("In RetrieveMount::setCatalogue(): catalogue already set."); + if (!catalogue) + throw exception::Exception("In RetrieveMount::setCatalogue(): trying to set a null catalogue."); + m_catalogue = catalogue; +} + //------------------------------------------------------------------------------ // tapeComplete() //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index fff81a1bea77c8350a4dfa8686049477bd3ee4dd..eb81a9ef8313212e447b2d971e9a83ccaad83c35 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -24,6 +24,7 @@ #include "scheduler/SchedulerDatabase.hpp" #include "scheduler/TapeMount.hpp" #include "disk/DiskReporterFactory.hpp" +#include "catalogue/Catalogue.hpp" #include <memory> #include <queue> @@ -191,6 +192,12 @@ namespace cta { */ disk::DiskReporter * 
createDiskReporter(std::string & URL); + /** + * Passes a pointer to the catalogue, used for disk space back pressure + * @param catalogue pointer to the catalogue; must not be null and can only be set once + */ + void setCatalogue(catalogue::Catalogue *catalogue); + /** * Destructor. */ @@ -203,6 +210,11 @@ namespace cta { */ std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> m_dbMount; + /** + * A pointer to the catalogue + */ + catalogue::Catalogue * m_catalogue = nullptr; + /** * Internal tracking of the session completion */ diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 4a356161ae67de90c45c2f629a8bceabd11cd576..60d61aeb137922d0792ceaf68916c56e9985b6b4 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -211,8 +211,15 @@ void Scheduler::queueRetrieve( // Get the queue criteria common::dataStructures::RetrieveFileQueueCriteria queueCriteria; queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc); + // Get DiskSystem list. + auto diskSystemList = m_catalogue.getDiskSystems(); auto catalogueTime = t.secs(cta::utils::Timer::resetCounter); - std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc); + // Determine disk system for this request, if any. 
+ optional<std::string> diskSystemName; + try { + diskSystemName = diskSystemList.getFSNAme(request.dstURL); + } catch (std::out_of_range&) {} + std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, diskSystemName, lc); auto schedulerDbTime = t.secs(); log::ScopedParamContainer spc(lc); spc.add("fileId", request.archiveFileID) diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 414b3ee460c356479f818b284c999e04d47ca690..4e08c982726abaf54637196ea7f6c60d872ce4a9 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -38,6 +38,7 @@ #include "common/log/LogContext.hpp" #include "catalogue/TapeForWriting.hpp" #include "scheduler/TapeMount.hpp" +#include "disk/DiskSystem.hpp" #include <list> #include <limits> @@ -267,11 +268,13 @@ public: * @param rqst The request. * @param criteria The criteria retrieved from the CTA catalogue to be used to * decide how to queue the request. + * @param diskSystemName optional disk system name if the destination matches a declared one. 
* @param logContext context allowing logging db operation * @return the selected vid (mostly for logging) */ virtual std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest &rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) = 0; + const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, const optional<std::string> diskSystemName, + log::LogContext &logContext) = 0; /** * Returns all of the existing retrieve jobs grouped by tape and then @@ -355,7 +358,7 @@ public: } mountInfo; virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, log::LogContext& logContext) = 0; + uint64_t bytesRequested, disk::DiskSystemList & diskSystemList, log::LogContext& logContext) = 0; virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index d04d6967984f2ab66b3d02a7d6fb6da765502916..a6b34e00e6b3c2ed8fe913680b65ff1182788e90 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -42,7 +42,8 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + 
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index 2fff5e769f016d9118971c56a1cad2ebcd4f947d..cdbbbdabce68fba288568a06ea336302730f0679 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -36,7 +36,7 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not 
implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index 4a331e51d2c971ee1be25443cbe905e5f9702b72..912ab584c30af45c6747203059924fffa8a768a0 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -131,7 +131,8 @@ namespace unitTests class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }