From 0fff5c40c1143194ce1d50829f6d935144c80816 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Mon, 1 Jul 2019 10:40:45 +0200 Subject: [PATCH] #533: Added support for disk system in queues and object store. --- objectstore/AlgorithmsTest.cpp | 2 +- objectstore/CMakeLists.txt | 4 +- objectstore/GarbageCollector.cpp | 2 +- objectstore/GarbageCollectorTest.cpp | 3 +- objectstore/RetrieveQueue.cpp | 4 +- objectstore/RetrieveQueue.hpp | 9 +++- objectstore/RetrieveQueueAlgorithms.hpp | 14 ++++-- objectstore/RetrieveQueueShard.cpp | 22 +++++++-- objectstore/RetrieveQueueShard.hpp | 7 +-- ...rieveQueueToTransferForUserAlgorithms.cpp} | 4 +- objectstore/RetrieveRequest.cpp | 46 ++++++++++++++++++- objectstore/RetrieveRequest.hpp | 6 +++ objectstore/Sorter.cpp | 2 +- objectstore/Sorter.hpp | 1 + objectstore/cta.proto | 2 + scheduler/OStoreDB/MemQueues.cpp | 2 +- scheduler/OStoreDB/OStoreDB.cpp | 22 +++++---- scheduler/OStoreDB/OStoreDB.hpp | 9 ++-- scheduler/OStoreDB/OStoreDBFactory.hpp | 8 ++-- scheduler/RetrieveMount.cpp | 21 ++++++++- scheduler/RetrieveMount.hpp | 12 +++++ scheduler/Scheduler.cpp | 9 +++- scheduler/SchedulerDatabase.hpp | 7 ++- .../tapeserver/daemon/DiskWriteTaskTest.cpp | 3 +- .../daemon/DiskWriteThreadPoolTest.cpp | 2 +- .../daemon/RecallTaskInjectorTest.cpp | 3 +- 26 files changed, 177 insertions(+), 49 deletions(-) rename objectstore/{RetrieveQueueToTransferAlgorithms.cpp => RetrieveQueueToTransferForUserAlgorithms.cpp} (96%) diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 8eced3a140..4f77bf42ca 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -78,7 +78,7 @@ void fillRetrieveRequests( rqc.mountPolicy.retrievePriority = 1; requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be)); requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{ - requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt + requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt, cta::nullopt }); auto &rr = *requests.back().retrieveRequest; rr.initialize(); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index d30eb6aa7f..508be71b85 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -75,12 +75,12 @@ add_library (ctaobjectstore SHARED ArchiveQueueToTransferForRepackAlgorithms.cpp RetrieveQueue.cpp RetrieveQueueShard.cpp - RetrieveQueueToTransferAlgorithms.cpp + RetrieveQueueToTransferForUserAlgorithms.cpp + RetrieveQueueToTransferForRepackAlgorithms.cpp RetrieveQueueToReportAlgorithms.cpp RetrieveQueueFailedAlgorithms.cpp RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp RetrieveQueueToReportToRepackForFailureAlgorithms.cpp - RetrieveQueueToTransferForRepackAlgorithms.cpp JobQueueType.cpp Sorter.cpp ArchiveRequest.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 6ff68c09a4..548637d699 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -559,7 +559,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& for (auto &tf: rr->getArchiveFile().tapeFiles) { if (tf.vid == vid) { jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, - rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()}); + rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity(), rr->getDiskSystemName()}); } } } diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 249b1f99ce..b6a97711c7 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -608,7 +608,8 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; - jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time, cta::nullopt}); + jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, + sReq.creationLog.time, cta::nullopt, cta::nullopt}); rq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 755e485503..6536126d60 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -531,7 +531,7 @@ auto RetrieveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd, } shardsDumps.emplace_back(std::list<JobDump>()); for (auto & j: s->dumpJobs()) { - shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size})); + shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName})); } nextShard: s++; @@ -606,7 +606,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> { goto nextShard; } for (auto & j: s->dumpJobs()) { - ret.emplace_back(JobDump{j.address, j.copyNb, j.size}); + ret.emplace_back(JobDump{j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName}); } nextShard: s++; sf++; diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index d9566754e4..f3337bc2d7 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -66,7 +66,8 @@ public: uint64_t fileSize; cta::common::dataStructures::MountPolicy policy; time_t startTime; - optional<RetrieveActivityDescription> activityDescription; + optional<RetrieveActivityDescription> activityDescription; + optional<std::string> diskSystemName; }; void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc); // This version will check for existence of the job in the queue before @@ -97,6 +98,12 @@ public: std::string address; uint32_t copyNb; uint64_t size; + struct ActivityDescription { + std::string diskInstanceName; + std::string activity; + }; + optional<ActivityDescription> activity; + optional<std::string> diskSystemName; }; std::list<JobDump> dumpJobs(); struct CandidateJobList { diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 45dd12db5e..450108e598 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -45,6 +45,7 @@ struct ContainerTraits<RetrieveQueue,C> cta::common::dataStructures::MountPolicy policy; serializers::RetrieveJobStatus status; optional<RetrieveActivityDescription> activityDescription; + optional<std::string> diskSystemName; typedef std::list<InsertedElement> list; }; @@ -59,6 +60,8 @@ struct ContainerTraits<RetrieveQueue,C> std::string errorReportURL; SchedulerDatabase::RetrieveJob::ReportType reportType; RetrieveRequest::RepackInfo repackInfo; + optional<RetrieveQueue::JobDump::ActivityDescription> activity; + optional<std::string> diskSystemName; }; struct PoppedElementsSummary; struct PopCriteria { @@ -279,7 +282,7 @@ addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemC std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName}); } cont.addJobsAndCommit(jobsToAdd, agentRef, lc); } @@ -292,7 +295,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName}); } cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc); } @@ -387,6 +390,11 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container e.archiveFile = u.get()->getArchiveFile(); e.rr = u.get()->getRetrieveRequest(); e.repackInfo = u.get()->getRepackInfo(); + auto & rad = u.get()->getRetrieveActivityDescription(); + if (rad) { + e.activity = RetrieveQueue::JobDump::ActivityDescription{ rad.value().diskInstanceName, rad.value().activity }; + } + e.diskSystemName = u.get()->getDiskSystemName(); switch(u.get()->getJobStatus()) { case serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure: e.reportType = SchedulerDatabase::RetrieveJob::ReportType::FailureReport; @@ -492,7 +500,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, common::dataStructures::ArchiveFile(), common::dataStructures::RetrieveRequest(), "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, - RetrieveRequest::RepackInfo() + RetrieveRequest::RepackInfo(), cjfq.activity, cjfq.diskSystemName }); ret.summary.files++; } diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp index 76ebe1e5c3..9c00e31696 100644 --- a/objectstore/RetrieveQueueShard.cpp +++ b/objectstore/RetrieveQueueShard.cpp @@ -69,7 +69,16 @@ RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t ret.remainingFilesAfterCandidates = m_payload.retrievejobs_size(); for (auto & j: m_payload.retrievejobs()) { if (!retrieveRequestsToSkip.count(j.address())) { - ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size()}); + ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size(), nullopt, nullopt}); + if (j.has_activity()) { + RetrieveQueue::JobDump::ActivityDescription ad; + ad.activity = j.activity(); + ad.diskInstanceName = j.disk_instance_name(); + ret.candidates.back().activity = ad; + } + if (j.has_destination_disk_system_name()) { + ret.candidates.back().diskSystemName = j.destination_disk_system_name(); + } ret.candidateBytes += j.size(); ret.candidateFiles ++; } @@ -104,7 +113,9 @@ auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) ret.removedJobs.back().size = j.size(); ret.removedJobs.back().startTime = j.starttime(); if (j.has_activity()) - ret.removedJobs.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; + ret.removedJobs.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() }; + if (j.has_destination_disk_system_name()) + ret.removedJobs.back().diskSystemName = j.destination_disk_system_name(); ret.bytesRemoved += j.size(); totalSize -= j.size(); ret.jobsRemoved++; @@ -139,9 +150,12 @@ auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> { std::list<JobInfo> ret; for (auto &j: m_payload.retrievejobs()) { ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(), - j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt}); + j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt, nullopt}); if (j.has_activity()) { - ret.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; + ret.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() }; + } + if (j.has_destination_disk_system_name()) { + ret.back().diskSystemName = j.destination_disk_system_name(); } } return ret; diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp index 7da9d10d72..8eef6d0af6 100644 --- a/objectstore/RetrieveQueueShard.hpp +++ b/objectstore/RetrieveQueueShard.hpp @@ -53,11 +53,8 @@ public: uint64_t maxDrivesAllowed; time_t startTime; uint64_t fSeq; - struct ActivityDescription { - std::string diskInstanceName; - std::string activity; - }; - optional<ActivityDescription> activityDescription; + optional<RetrieveQueue::JobDump::ActivityDescription> activityDescription; + optional<std::string> diskSystemName; }; std::list<JobInfo> dumpJobs(); diff --git a/objectstore/RetrieveQueueToTransferAlgorithms.cpp b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp similarity index 96% rename from objectstore/RetrieveQueueToTransferAlgorithms.cpp rename to objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp index 2d80b46b33..90dcb21623 100644 --- a/objectstore/RetrieveQueueToTransferAlgorithms.cpp +++ b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp @@ -56,7 +56,9 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, common::dataStructures::RetrieveRequest(), "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, - RetrieveRequest::RepackInfo() + RetrieveRequest::RepackInfo(), + cjfq.activity, + cjfq.diskSystemName }); ret.summary.bytes += cjfq.size; ret.summary.files++; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index f1a5be8b1f..a1c75c17a7 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -158,7 +158,7 @@ queueForFailure:; objectstore::MountPolicySerDeser mp; std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({activeCopyNb, activeFseq, getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt, nullopt}); if (m_payload.has_activity_weight()) { RetrieveActivityDescription activityDescription; activityDescription.priority = m_payload.activity_weight().priority(); @@ -226,7 +226,7 @@ queueForTransfer:; mp.deserialize(m_payload.mountpolicy()); std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), getActivity(), getDiskSystemName()}); if (m_payload.has_activity_weight()) { RetrieveActivityDescription activityDescription; activityDescription.priority = m_payload.activity_weight().priority(); @@ -484,6 +484,25 @@ optional<RetrieveActivityDescription> RetrieveRequest::getActivity() { return ret; } +//------------------------------------------------------------------------------ +// RetrieveRequest::setDiskSystemName() +//------------------------------------------------------------------------------ +void RetrieveRequest::setDiskSystemName(const std::string& diskSystemName) { + checkPayloadWritable(); + m_payload.set_disk_system_name(diskSystemName); +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::getDiskSystemName() +//------------------------------------------------------------------------------ +optional<std::string> RetrieveRequest::getDiskSystemName() { + checkPayloadReadable(); + optional<std::string> ret; + if (m_payload.has_disk_system_name()) + ret = m_payload.disk_system_name(); + return ret; +} + //------------------------------------------------------------------------------ // RetrieveRequest::dumpJobs() //------------------------------------------------------------------------------ @@ -852,6 +871,15 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string af.deserialize(payload.archivefile()); retRef.m_archiveFile = af; retRef.m_jobStatus = j.status(); + if (payload.has_activity_weight()) { + retRef.m_retrieveActivityDescription = RetrieveActivityDescription{ + payload.activity_weight().priority(), payload.activity_weight().disk_instance_name(), + payload.activity_weight().activity(), payload.activity_weight().creation_time(), + payload.activity_weight().weight(), 0 + }; + } + if (payload.has_disk_system_name()) + retRef.m_diskSystemName = payload.disk_system_name(); RetrieveRequest::updateLifecycleTiming(payload,j); LifecycleTimingsSerDeser lifeCycleSerDeser; lifeCycleSerDeser.deserialize(payload.lifecycle_timings()); @@ -902,6 +930,20 @@ const RetrieveRequest::RepackInfo& RetrieveRequest::AsyncJobOwnerUpdater::getRep return m_repackInfo; } +//------------------------------------------------------------------------------ +// RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription() +//------------------------------------------------------------------------------ +const optional<RetrieveActivityDescription>& RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription() { + return m_retrieveActivityDescription; +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName() +//------------------------------------------------------------------------------ +const optional<std::string>& RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName() { + return m_diskSystemName; +} + //------------------------------------------------------------------------------ // RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveRequest() //------------------------------------------------------------------------------ diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index ec82378e1b..0ad3cd77fa 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -220,6 +220,8 @@ public: const common::dataStructures::RetrieveRequest &getRetrieveRequest(); const common::dataStructures::ArchiveFile &getArchiveFile(); const RepackInfo &getRepackInfo(); + const optional<RetrieveActivityDescription> &getRetrieveActivityDescription(); + const optional<std::string> &getDiskSystemName(); private: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; @@ -227,6 +229,8 @@ public: common::dataStructures::ArchiveFile m_archiveFile; RepackInfo m_repackInfo; serializers::RetrieveJobStatus m_jobStatus; + optional<RetrieveActivityDescription> m_retrieveActivityDescription; + optional<std::string> m_diskSystemName; }; // An owner updater factory. The owner MUST be previousOwner for the update to be executed. AsyncJobOwnerUpdater *asyncUpdateJobOwner(uint32_t copyNumber, const std::string &owner, const std::string &previousOwner); @@ -238,6 +242,8 @@ public: void setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest & retrieveRequest, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); optional<RetrieveActivityDescription> getActivity(); + void setDiskSystemName(const std::string & diskSystemName); + optional<std::string> getDiskSystemName(); cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria(); cta::common::dataStructures::ArchiveFile getArchiveFile(); cta::common::dataStructures::EntryLog getEntryLog(); diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index c1159267d2..4424bce68b 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -159,7 +159,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA Sorter::RetrieveJob job = std::get<0>(jobToAdd->jobToQueue); succeededJobs[job.jobDump.copyNb] = jobToAdd; previousOwner = job.previousOwner->getAgentAddress(); - jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription}); + jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription,job.diskSystemName}); } try{ algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc); diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp index b68b1a8e46..2119819376 100644 --- a/objectstore/Sorter.hpp +++ b/objectstore/Sorter.hpp @@ -126,6 +126,7 @@ public: common::dataStructures::MountPolicy mountPolicy; cta::objectstore::JobQueueType jobQueueType; optional<RetrieveActivityDescription> activityDescription; + optional<std::string> diskSystemName; }; /** diff --git a/objectstore/cta.proto b/objectstore/cta.proto index daab502935..0b09a98070 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -422,6 +422,7 @@ message RetrieveRequest { required bool isrepack = 9157; optional RetrieveRequestRepackInfo repack_info = 9158; optional LifecycleTimings lifecycle_timings = 9159; + optional string disk_system_name = 9161; } message ValueCountPair { @@ -476,6 +477,7 @@ message RetrieveJobPointer { // For activity (if present), we need disk instance and activity name (priority is always provided) optional string disk_instance_name = 3109; optional string activity = 3110; + optional string destination_disk_system_name = 3111; } message RetrieveQueueShardPointer { diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 7a1cafd3a8..67c3172e1a 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -55,7 +55,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special if (j.copyNb == job.copyNb) { auto criteria = request.getRetrieveFileQueueCriteria(); jtal.push_back({j.copyNb, j.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize, - criteria.mountPolicy, request.getEntryLog().time, request.getActivity()}); + criteria.mountPolicy, request.getEntryLog().time, request.getActivity(), request.getDiskSystemName()}); request.setActiveCopyNumber(j.copyNb); request.setOwner(queueAddress); goto jobAdded; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 15f98fa664..f843c7bd34 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1103,7 +1103,7 @@ void OStoreDB::setRetrieveJobBatchReportedToUser(std::list<cta::SchedulerDatabas insertedElements.emplace_back(CaRQF::InsertedElement{ &j.job->m_retrieveRequest, tf_it->copyNb, tf_it->fSeq, tf_it->compressedSize, common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed, - j.job->m_activityDescription + j.job->m_activityDescription, j.job->m_diskSystemName }); } try { @@ -1129,8 +1129,10 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue //------------------------------------------------------------------------------ // OStoreDB::queueRetrieve() //------------------------------------------------------------------------------ + std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) { + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) { assertAgentAddressSet(); auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>(); cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind); @@ -1182,6 +1184,7 @@ std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest rReq->setRetrieveFileQueueCriteria(criteria); rReq->setActivityIfNeeded(rqst, criteria); rReq->setCreationTime(rqst.creationLog.time); + if (diskSystemName) rReq->setDiskSystemName(diskSystemName.value()); // Find the job corresponding to the vid (and check we indeed have one). auto jobs = rReq->getJobs(); objectstore::RetrieveRequest::JobDump job; @@ -3439,9 +3442,8 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::getNextJobBatch() //------------------------------------------------------------------------------ -std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount:: -getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext) -{ +std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested, + uint64_t bytesRequested, disk::DiskSystemList& diskSystemList, log::LogContext& logContext) { typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos; RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); @@ -3607,7 +3609,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize, cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, - serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription}); + serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription, req->m_diskSystemName}); requestToJobMap[&req->m_retrieveRequest] = req; } RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4487,7 +4489,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName }); m_retrieveRequest.commit(); rel.release(); @@ -4554,7 +4556,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToTransferForUser, - m_activityDescription + m_activityDescription, m_diskSystemName }); CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4618,7 +4620,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription + serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription, m_diskSystemName }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); @@ -4637,7 +4639,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, - serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index de93c966c4..e7a9889c7f 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -217,7 +217,8 @@ public: OStoreDB & m_oStoreDB; public: const MountInfo & getMountInfo() override; - std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override; + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + disk::DiskSystemList& diskSystemList, log::LogContext& logContext) override; void complete(time_t completionTime) override; void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; @@ -258,6 +259,7 @@ public: std::unique_ptr<objectstore::RetrieveRequest::AsyncJobSucceedForRepackReporter> m_jobSucceedForRepackReporter; objectstore::RetrieveRequest::RepackInfo m_repackInfo; optional<objectstore::RetrieveActivityDescription> m_activityDescription; + optional<std::string> m_diskSystemName; }; static RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job); @@ -296,8 +298,9 @@ public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange); - std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override; + std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) override; std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index bb9cb26357..c407eaf74d 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -202,9 +202,11 @@ public: return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider); } - std::string queueRetrieve(common::dataStructures::RetrieveRequest& rqst, - const common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override { - return m_OStoreDB.queueRetrieve(rqst, criteria, logContext); + + std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const optional<std::string> diskSystemName, log::LogContext& logContext) override { + return m_OStoreDB.queueRetrieve(rqst, criteria, diskSystemName, logContext); } diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index a629077989..4cff8c76c8 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -110,7 +110,7 @@ std::string cta::RetrieveMount::getMediaType() const } //------------------------------------------------------------------------------ -// getVo() +// getVendor() //------------------------------------------------------------------------------ std::string cta::RetrieveMount::getVendor() const { @@ -121,6 +121,9 @@ std::string cta::RetrieveMount::getVendor() const return sVendor.str(); } +//------------------------------------------------------------------------------ +// getCapacityInBytes() +//------------------------------------------------------------------------------ uint64_t cta::RetrieveMount::getCapacityInBytes() const { if(!m_dbMount.get()) throw exception::Exception("In cta::RetrieveMount::getVendor(): got NULL dbMount"); @@ -134,9 +137,12 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc log::LogContext& logContext) { if (!m_sessionRunning) throw SessionNotRunning("In RetrieveMount::getNextJobBatch(): trying to get job from complete/not started session"); + // Get the current file systems list from the catalogue + disk::DiskSystemList diskSystemList; + if (m_catalogue) diskSystemList = m_catalogue->getDiskSystems(); // Try and get a new job from the DB std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested, - bytesRequested, logContext)); + bytesRequested, diskSystemList, logContext)); std::list<std::unique_ptr<RetrieveJob>> ret; // We prepare the response for (auto & sdrj: dbJobBatch) { @@ -227,6 +233,17 @@ cta::disk::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL return m_reporterFactory.createDiskReporter(URL); } +//------------------------------------------------------------------------------ +// setCatalogue() +//------------------------------------------------------------------------------ +void cta::RetrieveMount::setCatalogue(catalogue::Catalogue* catalogue) { + if (m_catalogue) + throw exception::Exception("In RetrieveMount::setCatalogue(): catalogue already set."); + if (!catalogue) + throw exception::Exception("In RetrieveMount::setCatalogue(): trying to set a null catalogue."); + m_catalogue = catalogue; +} + //------------------------------------------------------------------------------ // tapeComplete() //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index fff81a1bea..eb81a9ef83 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -24,6 +24,7 @@ #include "scheduler/SchedulerDatabase.hpp" #include "scheduler/TapeMount.hpp" #include "disk/DiskReporterFactory.hpp" +#include "catalogue/Catalogue.hpp" #include <memory> #include <queue> @@ -191,6 +192,12 @@ namespace cta { */ disk::DiskReporter * createDiskReporter(std::string & URL); + /** + * Passes a reference to the catalogue, used for disk space back pressure + * @param catalogue + */ + void setCatalogue(catalogue::Catalogue *catalogue); + /** * Destructor. */ @@ -203,6 +210,11 @@ namespace cta { */ std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> m_dbMount; + /** + * A reference to the catalogue + */ + catalogue::Catalogue * m_catalogue = nullptr; + /** * Internal tracking of the session completion */ diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 4a356161ae..60d61aeb13 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -211,8 +211,15 @@ void Scheduler::queueRetrieve( // Get the queue criteria common::dataStructures::RetrieveFileQueueCriteria queueCriteria; queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc); + // Get DiskSystem list. + auto diskSystemList = m_catalogue.getDiskSystems(); auto catalogueTime = t.secs(cta::utils::Timer::resetCounter); - std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc); + // Determine disk system for this request, if any. + optional<std::string> diskSystemName; + try { + diskSystemName = diskSystemList.getFSNAme(request.dstURL); + } catch (std::out_of_range&) {} + std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, diskSystemName, lc); auto schedulerDbTime = t.secs(); log::ScopedParamContainer spc(lc); spc.add("fileId", request.archiveFileID) diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 414b3ee460..4e08c98272 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -38,6 +38,7 @@ #include "common/log/LogContext.hpp" #include "catalogue/TapeForWriting.hpp" #include "scheduler/TapeMount.hpp" +#include "disk/DiskSystem.hpp" #include <list> #include <limits> @@ -267,11 +268,13 @@ public: * @param rqst The request. * @param criteria The criteria retrieved from the CTA catalogue to be used to * decide how to quue the request. + * @param diskSystemName optional disk system name if the destination matches a declared one. * @param logContext context allowing logging db operation * @return the selected vid (mostly for logging) */ virtual std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest &rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) = 0; + const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, const optional<std::string> diskSystemName, + log::LogContext &logContext) = 0; /** * Returns all of the existing retrieve jobs grouped by tape and then @@ -355,7 +358,7 @@ public: } mountInfo; virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, log::LogContext& logContext) = 0; + uint64_t bytesRequested, disk::DiskSystemList & diskSystemList, log::LogContext& logContext) = 0; virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index d04d696798..a6b34e00e6 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -42,7 +42,8 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index 2fff5e769f..cdbbbdabce 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -36,7 +36,7 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index 4a331e51d2..912ab584c3 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -131,7 +131,8 @@ namespace unitTests class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } -- GitLab