From 2ce7d5d6045bb3c8f0ff42bd3628a8df03ff2493 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Thu, 20 Jul 2017 11:48:30 +0200 Subject: [PATCH] Moved catalogue access of Scheduler::queueRetrieve() into OStoreDB::queueRetrieve() Changed the default behavior of DummyCatalogue so unit tests keep passing. Adapted SchedulerDatabase API. --- catalogue/DummyCatalogue.hpp | 4 +-- objectstore/GarbageCollectorTest.cpp | 3 +++ objectstore/Helpers.cpp | 2 +- scheduler/OStoreDB/OStoreDB.cpp | 16 ++++++----- scheduler/OStoreDB/OStoreDB.hpp | 5 ++-- scheduler/OStoreDB/OStoreDBFactory.hpp | 7 +++-- scheduler/Scheduler.cpp | 37 +------------------------- scheduler/SchedulerDatabase.hpp | 10 +++---- 8 files changed, 27 insertions(+), 57 deletions(-) diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp index d41e119999..b3eb001562 100644 --- a/catalogue/DummyCatalogue.hpp +++ b/catalogue/DummyCatalogue.hpp @@ -122,14 +122,14 @@ public: common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const { // Minimal implementation of VidToMap for retrieve request unit tests. We just support // disabled status for the tapes. - // If the tape is not listed, it is listed as disabled in the return value. + // If the tape is not listed, it is listed as enabled in the return value. threading::MutexLocker lm(m_tapeEnablingMutex); common::dataStructures::VidToTapeMap ret; for (const auto & v: vids) { try { ret[v].disabled = !m_tapeEnabling.at(v); } catch (std::out_of_range &) { - ret[v].disabled = true; + ret[v].disabled = false; } } return ret; diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 5d533778cc..507955e0cf 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -1,3 +1,4 @@ + /* * The CERN Tape Archive (CTA) project * Copyright (C) 2015 CERN @@ -597,6 +598,8 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { } // Mark the tape as enabled catalogue.addEnabledTape("Tape0"); + // Mark the other tape as disabled + catalogue.addDisabledTape("Tape1"); // Create the garbage collector and run it twice. cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 4da82bba89..83bf4eaac9 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -242,7 +242,7 @@ std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retri cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex; //------------------------------------------------------------------------------ -// Helpers::getLockedAndFetchedArchiveQueue() +// Helpers::getRetrieveQueueStatistics() //------------------------------------------------------------------------------ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics( const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider, diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 25febcb40d..be4d31ad94 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -767,14 +767,17 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue //------------------------------------------------------------------------------ // OStoreDB::queueRetrieve() //------------------------------------------------------------------------------ -void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, - const std::string &vid) { +std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) { assertAgentAddressSet(); + // Get the best vid from the cache + std::set<std::string> candidateVids; + for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.second.vid); + std::string bestVid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore); // Check that the requested retrieve job (for the provided vid) exists. if (!std::count_if(criteria.archiveFile.tapeFiles.cbegin(), criteria.archiveFile.tapeFiles.end(), - [vid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == vid; })) + [bestVid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == bestVid; })) throw RetrieveRequestHasNoCopies("In OStoreDB::queueRetrieve(): no tape file for requested vid."); // In order to post the job, construct it first in memory. objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveRequest"), m_objectStore); @@ -792,7 +795,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& RootEntry re(m_objectStore); ScopedExclusiveLock rel(re); re.fetch(); - auto rqAddr=re.addOrGetRetrieveQueueAndCommit(vid, *m_agentReference); + auto rqAddr=re.addOrGetRetrieveQueueAndCommit(bestVid, *m_agentReference); // Create the request. rel.release(); RetrieveQueue rq(rqAddr, m_objectStore); @@ -800,7 +803,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rq.fetch(); // We need to find the job corresponding to the vid for (auto & j: rReq.getArchiveFile().tapeFiles) { - if (j.second.vid == vid) { + if (j.second.vid == bestVid) { rq.addJob(j.second.copyNb, j.second.fSeq, rReq.getAddressIfSet(), criteria.archiveFile.fileSize, criteria.mountPolicy, rReq.getEntryLog().time); rReq.setActiveCopyNumber(j.second.copyNb); @@ -819,6 +822,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rrl.release(); // And relinquish ownership form agent m_agentReference->removeFromOwnership(rReq.getAddressIfSet(), m_objectStore); + return bestVid; } //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index e3b9bebd88..3146b3cc3b 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -231,9 +231,8 @@ public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange); - void queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, - const std::string &vid) override; + std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria) override; std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index 0afddd3d6a..5a6e39dc78 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -139,10 +139,9 @@ public: return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider); } - void queueRetrieve(const common::dataStructures::RetrieveRequest& rqst, - const common::dataStructures::RetrieveFileQueueCriteria &criteria, - const std::string &vid) override { - m_OStoreDB.queueRetrieve(rqst, criteria, vid); + std::string queueRetrieve(const common::dataStructures::RetrieveRequest& rqst, + const common::dataStructures::RetrieveFileQueueCriteria &criteria) override { + return m_OStoreDB.queueRetrieve(rqst, criteria); } std::list<cta::common::dataStructures::DriveState> getDriveStates() const override { diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 07b561aed8..a8da1fa006 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -130,41 +130,7 @@ void Scheduler::queueRetrieve( // Get the queue criteria const common::dataStructures::RetrieveFileQueueCriteria queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester); - // Get the statuses of the tapes on which we have files. - std::set<std::string> vids; - for (auto & tf: queueCriteria.archiveFile.tapeFiles) { - vids.insert(tf.second.vid); - } - auto tapeStatuses = m_catalogue.getTapesByVid(vids); - // Filter out the tapes with are disabled - for (auto & t: tapeStatuses) { - if (t.second.disabled) - vids.erase(t.second.vid); - } - if (vids.empty()) - throw exception::NonRetryableError("In Scheduler::queueRetrieve(): all copies are on disabled tapes"); - auto catalogueTime = t.secs(utils::Timer::resetCounter); - // Get the statistics for the potential tapes on which we will retrieve. - auto stats=m_db.getRetrieveQueueStatistics(queueCriteria, vids); - // Sort the potential queues. - stats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight); - // If there are several equivalent entries, choose randomly among them. - // First element will always be selected. - std::set<std::string> candidateVids; - for (auto & s: stats) { - if (!(s<stats.front()) && !(s>stats.front())) - candidateVids.insert(s.vid); - } - if (candidateVids.empty()) - throw exception::Exception("In Scheduler::queueRetrieve(): failed to sort and select candidate VIDs"); - // We need to get a random number [0, candidateVids.size() -1] - std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count()); - std::uniform_int_distribution<size_t> distribution(0, candidateVids.size() -1); - size_t index=distribution(dre); - auto it=candidateVids.cbegin(); - std::advance(it, index); - std::string selectedVid=*it; - m_db.queueRetrieve(request, queueCriteria, selectedVid); + std::string selectedVid = m_db.queueRetrieve(request, queueCriteria); auto schedulerDbTime = t.secs(); log::ScopedParamContainer spc(lc); spc.add("fileId", request.archiveFileID) @@ -201,7 +167,6 @@ void Scheduler::queueRetrieve( .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed) .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge) .add("policyPriority", queueCriteria.mountPolicy.retrievePriority) - .add("catalogueTime", catalogueTime) .add("schedulerDbTime", schedulerDbTime); lc.log(log::INFO, "Queued retrieve request"); } diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index b4685aba82..35be19c52e 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -243,16 +243,16 @@ public: const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, const std::set<std::string> & vidsToConsider) = 0; /** - * Queues the specified request. + * Queues the specified request. As the object store has access to the catalogue, + * the best queue (most likely to go, and not disabled can be chosen directly there). * * @param rqst The request. * @param criteria The criteria retrieved from the CTA catalogue to be used to * decide how to quue the request. - * @param vid: the vid of the retrieve queue on which we will queue the request. + * @return the selected vid (mostly for logging) */ - virtual void queueRetrieve(const cta::common::dataStructures::RetrieveRequest &rqst, - const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, - const std::string &vid) = 0; + virtual std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest &rqst, + const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria) = 0; /** * Returns all of the existing retrieve jobs grouped by tape and then -- GitLab