diff --git a/ReleaseNotes.md b/ReleaseNotes.md index bf46e09f63b4cda421ca06f7de0d54ce1dd0b2e2..81c4ed1ab0c19b0025dec1004ea224a2e3aff152 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -4,6 +4,7 @@ This release reduces the number of DB queries issued to the CTA catalogue. ### Bug Fixes - cta/CTA#275 - Avoid DB queries via getTapesByVid in OStoreDB::fetchMountInfo +- cta/CTA#274 - Remove unnecessary catalogue DB queries from QueueCleanupRunner # v4.8.3-1 diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp index ca66d8cb69e5a00005784cce591e7152034a4ab6..5e1cd13cb2c3502e575fca6a8819ef3ddce4217b 100644 --- a/catalogue/Catalogue.hpp +++ b/catalogue/Catalogue.hpp @@ -658,6 +658,18 @@ public: */ virtual common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids) const = 0; + /** + * Returns the tapes with the specified volume identifiers. + * + * This method will throw an exception if it cannot find ALL of the specified + * tapes. + * + * @param vids The tape volume identifiers (VIDs). + * @param ignoreMissingVids Allow non-existing VIDs to be ignored. + * @return Map from tape volume identifier to tape. + */ + virtual common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids, bool ignoreMissingVids) const = 0; + /** * Returns map from VID to logical library name for specified set of VIDs. * diff --git a/catalogue/CatalogueRetryWrapper.cpp b/catalogue/CatalogueRetryWrapper.cpp index a609c73e7c30473b501db055c923fd88984394ec..ca51a3b59f327ad3fcd5469845ca25f80a70542a 100644 --- a/catalogue/CatalogueRetryWrapper.cpp +++ b/catalogue/CatalogueRetryWrapper.cpp @@ -349,6 +349,10 @@ common::dataStructures::VidToTapeMap CatalogueRetryWrapper::getTapesByVid(const return retryOnLostConnection(m_log, [&]{return m_catalogue->getTapesByVid(vids);}, m_maxTriesToConnect); } +common::dataStructures::VidToTapeMap CatalogueRetryWrapper::getTapesByVid(const std::set<std::string> &vids, bool ignoreMissingVids) const { + return retryOnLostConnection(m_log, [&]{return m_catalogue->getTapesByVid(vids, ignoreMissingVids);}, m_maxTriesToConnect); +} + std::map<std::string, std::string> CatalogueRetryWrapper::getVidToLogicalLibrary(const std::set<std::string> &vids) const { return retryOnLostConnection(m_log, [&]{return m_catalogue->getVidToLogicalLibrary(vids);}, m_maxTriesToConnect); } diff --git a/catalogue/CatalogueRetryWrapper.hpp b/catalogue/CatalogueRetryWrapper.hpp index 55702751fb139b1040bb55ed91faca2106e36f35..cab499d1d4b14575f081ba2fc311e20329b68a97 100644 --- a/catalogue/CatalogueRetryWrapper.hpp +++ b/catalogue/CatalogueRetryWrapper.hpp @@ -199,6 +199,8 @@ public: common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids) const override; + common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids, bool ignoreMissingVids) const override; + std::map<std::string, std::string> getVidToLogicalLibrary(const std::set<std::string> &vids) const override; void reclaimTape(const common::dataStructures::SecurityIdentity &admin, const std::string &vid, cta::log::LogContext & lc) override; diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp index c1ee3ac8f4fed26b6fe505fc61cb4a8bea782bd8..6004daedbdc3c4eaf7c19d6561b5296ae14d740f 100644 --- a/catalogue/CatalogueTest.cpp +++ b/catalogue/CatalogueTest.cpp @@ -14430,13 +14430,20 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile_by_archive_file_id_non_exi m_catalogue->DO_NOT_USE_deleteArchiveFile_DO_NOT_USE("disk_instance", 12345678, dummyLc); } -TEST_P(cta_catalogue_CatalogueTest, getTapesByVid_non_existent_tape) { +TEST_P(cta_catalogue_CatalogueTest, getTapesByVid_non_existent_tape_set) { using namespace cta; std::set<std::string> vids = {{"non_existent_tape"}}; ASSERT_THROW(m_catalogue->getTapesByVid(vids), exception::Exception); } +TEST_P(cta_catalogue_CatalogueTest, getTapesByVid_non_existent_tape_set_ignore_missing) { + using namespace cta; + + std::set<std::string> vids = {{"non_existent_tape"}}; + ASSERT_NO_THROW(m_catalogue->getTapesByVid(vids, true)); +} + TEST_P(cta_catalogue_CatalogueTest, getTapesByVid_no_vids) { using namespace cta; diff --git a/catalogue/DummyCatalogue.cpp b/catalogue/DummyCatalogue.cpp index 12d93142a82ac725657e819c6b01fcbaf13137d5..8fa521c501b351541995245930b1d2fd158a9d58 100644 --- a/catalogue/DummyCatalogue.cpp +++ b/catalogue/DummyCatalogue.cpp @@ -229,6 +229,9 @@ common::dataStructures::VidToTapeMap DummyCatalogue::getTapesByVid(const std::st return getTapesByVid(vids); } common::dataStructures::VidToTapeMap DummyCatalogue::getTapesByVid(const std::set<std::string>& vids) const { + return getTapesByVid(vids, false); +} +common::dataStructures::VidToTapeMap DummyCatalogue::getTapesByVid(const std::set<std::string>& vids, bool ignoreMissingVids) const { // Minimal implementation of VidToMap for retrieve request unit tests. We just support // disabled status for the tapes. // If the tape is not listed, it is listed as enabled in the return value. diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp index 1ffaff73bdcc3f849db9f77d4b99e644cf8d1c03..f3fe0750d75f0843a10b48bf63d73c4b2d4a0abf 100644 --- a/catalogue/DummyCatalogue.hpp +++ b/catalogue/DummyCatalogue.hpp @@ -208,6 +208,8 @@ public: common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const override; + common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids, bool ignoreMissingVids) const override; + std::list<common::dataStructures::MountPolicy> getMountPolicies() const override; std::list<common::dataStructures::MountPolicy> getCachedMountPolicies() const override; diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index 327e1dae6a231990b275bd502812329f36c57656..102a55b0942463cef42350915053d610fc078a86 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -4391,6 +4391,13 @@ common::dataStructures::VidToTapeMap RdbmsCatalogue::getTapesByVid(const std::st // getTapesByVid (set of VIDs) //------------------------------------------------------------------------------ common::dataStructures::VidToTapeMap RdbmsCatalogue::getTapesByVid(const std::set<std::string> &vids) const { + return getTapesByVid(vids, false); +} + +//------------------------------------------------------------------------------ +// getTapesByVid (set of VIDs) +//------------------------------------------------------------------------------ +common::dataStructures::VidToTapeMap RdbmsCatalogue::getTapesByVid(const std::set<std::string> &vids, bool ignoreMissingVids) const { try { common::dataStructures::VidToTapeMap vidToTapeMap; @@ -4439,7 +4446,7 @@ common::dataStructures::VidToTapeMap RdbmsCatalogue::getTapesByVid(const std::se executeGetTapesByVidStmtAndCollectResults(stmt, vidToTapeMap); } - if(vids.size() != vidToTapeMap.size()) { + if(!ignoreMissingVids && (vids.size() != vidToTapeMap.size())) { exception::Exception ex; ex.getMessage() << "Not all tapes were found: expected=" << vids.size() << " actual=" << vidToTapeMap.size(); throw ex; diff --git a/catalogue/RdbmsCatalogue.hpp b/catalogue/RdbmsCatalogue.hpp index 79f2dbb1ab03d05ab4e072d8e5f07b6fc90dd049..05c6bd2b10abfd7894f48ffa613586f7de159952 100644 --- a/catalogue/RdbmsCatalogue.hpp +++ b/catalogue/RdbmsCatalogue.hpp @@ -563,6 +563,18 @@ public: */ common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids) const override; + /** + * Returns the tapes with the specified volume identifiers. + * + * This method will throw an exception if it cannot find ALL of the specified + * tapes. + * + * @param vids The tape volume identifiers (VIDs). + * @param ignoreMissingVids Allow non-existing VIDs to be ignored. + * @return Map from tape volume identifier to tape. + */ + common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string> &vids, bool ignoreMissingVids) const override; + /** * Returns map from VID to logical library name for specified set of VIDs. * diff --git a/objectstore/QueueCleanupRunner.cpp b/objectstore/QueueCleanupRunner.cpp index 0b8d61c86e2d02a9b18cf2ca74a3231373fb7547..b4ef3695b328a33379180d0dc17a4ec77629f167 100644 --- a/objectstore/QueueCleanupRunner.cpp +++ b/objectstore/QueueCleanupRunner.cpp @@ -32,34 +32,18 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { admin.username = "Queue cleanup runner"; admin.host = cta::utils::getShortHostname(); - std::list<QueueCleanupInfo> queuesForCleanupInfo; + auto queueVidSet = std::set<std::string>(); auto queuesForCleanup = m_db.getRetrieveQueuesCleanupInfo(logContext); - // Check which queues need and can be cleaned up - - for (auto queue: queuesForCleanup) { - - cta::common::dataStructures::Tape tapeToCheck; - - try { - m_catalogue.countGetTapesByVid(cta::catalogue::countGetTapesByVid::QCR1); - auto vidToTapesMap = m_catalogue.getTapesByVid(queue.vid); //throws an exception if the vid is not found on the database - tapeToCheck = vidToTapesMap.at(queue.vid); - } catch (const exception::UserError &ex) { - log::ScopedParamContainer params(logContext); - params.add("tapeVid", queue.vid) - .add("cleanupFlag", queue.doCleanup) - .add("exceptionMessage", ex.getMessageValue()); - logContext.log(log::WARNING, "WARNING: In QueueCleanupRunner::runOnePass(): failed to find a tape in the database. Skipping it."); - continue; // Ignore queue - } + // Check, one-by-one, queues need to be cleaned up + for (const auto &queue: queuesForCleanup) { + // Do not clean a queue that does not have the cleanup flag set true if (!queue.doCleanup) { - // Do not clean a queue that does not have the cleanup flag set true continue; // Ignore queue } - // Check heartbeat of other agents + // Check heartbeat of other queues being cleaned up if (queue.assignedAgent.has_value()) { bool newEntry = false; @@ -87,41 +71,62 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { } } } + queueVidSet.insert(queue.vid); + } + + common::dataStructures::VidToTapeMap vidToTapesMap; + + if (!queueVidSet.empty()){ + try { + m_catalogue.countGetTapesByVid(cta::catalogue::countGetTapesByVid::QCR1); + vidToTapesMap = m_catalogue.getTapesByVid(queueVidSet, true); + } catch (const exception::UserError &ex) { + log::ScopedParamContainer params(logContext); + params.add("exceptionMessage", ex.getMessageValue()); + logContext.log(log::ERR, + "ERROR: In QueueCleanupRunner::runOnePass(): failed to read set of tapes from the database. Aborting cleanup."); + return; // Unable to proceed from here... + } + + for (const auto &vid: queueVidSet) { + if (vidToTapesMap.count(vid) == 0) { + log::ScopedParamContainer params(logContext); + params.add("tapeVid", vid); + logContext.log(log::ERR, + "ERROR: In QueueCleanupRunner::runOnePass(): failed to find the tape " + vid + " in the database. Skipping it."); + } + } + } else { + logContext.log(log::DEBUG, + "DEBUG: In QueueCleanupRunner::runOnePass(): no queues requested a cleanup."); + return; + } + + for (const auto &[queueVid, tapeData]: vidToTapesMap) { - if (tapeToCheck.state != common::dataStructures::Tape::REPACKING_PENDING - && tapeToCheck.state != common::dataStructures::Tape::BROKEN_PENDING - && tapeToCheck.state != common::dataStructures::Tape::EXPORTED_PENDING) { + // Check if tape state is the expected one (PENDING) + if (tapeData.state != common::dataStructures::Tape::REPACKING_PENDING + && tapeData.state != common::dataStructures::Tape::BROKEN_PENDING + && tapeData.state != common::dataStructures::Tape::EXPORTED_PENDING) { // Do not cleanup a tape that is not in a X_PENDING state log::ScopedParamContainer params(logContext); - params.add("tapeVid", queue.vid) - .add("cleanupFlag", queue.doCleanup) - .add("tapeState", common::dataStructures::Tape::stateToString(tapeToCheck.state)); + params.add("tapeVid", queueVid) + .add("tapeState", common::dataStructures::Tape::stateToString(tapeData.state)); logContext.log( log::WARNING, "In QueueCleanupRunner::runOnePass(): Queue is has cleanup flag enabled but is not in the expected PENDING state. Skipping it."); continue; } - queuesForCleanupInfo.push_back(QueueCleanupInfo()); - queuesForCleanupInfo.back().vid = queue.vid; - queuesForCleanupInfo.back().tapeState = tapeToCheck.state; - } - - // Cleanup queues one by one - - for (auto qForCleanup: queuesForCleanupInfo) { - - utils::Timer t; - log::ScopedParamContainer loopParams(logContext); - loopParams.add("tapeVid", qForCleanup.vid) - .add("tapeState", common::dataStructures::Tape::stateToString(qForCleanup.tapeState)); + loopParams.add("tapeVid", queueVid) + .add("tapeState", common::dataStructures::Tape::stateToString(tapeData.state)); try { - bool prevHeartbeatExists = (m_heartbeatCheck.find(qForCleanup.vid) != m_heartbeatCheck.end()); + bool prevHeartbeatExists = (m_heartbeatCheck.find(queueVid) != m_heartbeatCheck.end()); m_db.reserveRetrieveQueueForCleanup( - qForCleanup.vid, - prevHeartbeatExists ? std::optional(m_heartbeatCheck[qForCleanup.vid].heartbeat) : std::nullopt); + queueVid, + prevHeartbeatExists ? std::optional(m_heartbeatCheck[queueVid].heartbeat) : std::nullopt); } catch (OStoreDB::RetrieveQueueNotFound & ex) { log::ScopedParamContainer paramsExcMsg(logContext); paramsExcMsg.add("exceptionMessage", ex.getMessageValue()); @@ -142,14 +147,13 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { continue; } - // Transfer all the jobs to a different queue, or report to the user if no replicas exist - + // Transfer all the jobs to a different queue (if there are replicas) or report the error back to the user while (true) { utils::Timer tLoop; log::ScopedParamContainer paramsLoopMsg(logContext); - auto dbRet = m_db.getNextRetrieveJobsToTransferBatch(qForCleanup.vid, m_batchSize, logContext); + auto dbRet = m_db.getNextRetrieveJobsToTransferBatch(queueVid, m_batchSize, logContext); if (dbRet.empty()) break; std::list<cta::SchedulerDatabase::RetrieveJob *> jobPtList; for (auto &j: dbRet) { @@ -161,12 +165,12 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { paramsLoopMsg.add("numberOfJobsMoved", dbRet.size()) .add("jobMovingTime", jobMovingTime) - .add("tapeVid", qForCleanup.vid); + .add("tapeVid", queueVid); logContext.log(cta::log::INFO,"In DiskReportRunner::runOnePass(): Queue jobs moved."); // Tick heartbeat try { - m_db.tickRetrieveQueueCleanupHeartbeat(qForCleanup.vid); + m_db.tickRetrieveQueueCleanupHeartbeat(queueVid); } catch (OStoreDB::RetrieveQueueNotFound & ex) { break; // Queue was already deleted, probably after all the requests have been removed } catch (OStoreDB::RetrieveQueueNotReservedForCleanup & ex) { @@ -184,18 +188,17 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { } } - // Finally, update the tape state - + // Finally, update the tape state out of PENDING { - cta::common::dataStructures::Tape tapeToModify; + cta::common::dataStructures::Tape tapeDataRefreshed; try { m_catalogue.countGetTapesByVid(cta::catalogue::countGetTapesByVid::QCR2); - auto vidToTapesMap = m_catalogue.getTapesByVid(qForCleanup.vid); //throws an exception if the vid is not found on the database - tapeToModify = vidToTapesMap.at(qForCleanup.vid); + auto vidToTapesMapRefreshed = m_catalogue.getTapesByVid(queueVid); //throws an exception if the vid is not found on the database + tapeDataRefreshed = vidToTapesMapRefreshed.at(queueVid); } catch (const exception::UserError &ex) { log::ScopedParamContainer params(logContext); - params.add("tapeVid", qForCleanup.vid) + params.add("tapeVid", queueVid) .add("exceptionMessage", ex.getMessageValue()); logContext.log(log::WARNING, "WARNING: In QueueCleanupRunner::runOnePass(): Failed to find a tape in the database. Unable to update tape state."); continue; // Ignore queue @@ -203,25 +206,25 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { // Finally, modify tape state to REPACKING or BROKEN // The STATE_REASON set by operator will be preserved, with just an extra message prepended. - std::optional<std::string> prevReason = tapeToModify.stateReason; - switch (tapeToModify.state) { + std::optional<std::string> prevReason = tapeDataRefreshed.stateReason; + switch (tapeDataRefreshed.state) { case common::dataStructures::Tape::REPACKING_PENDING: - m_catalogue.modifyTapeState(admin, qForCleanup.vid, common::dataStructures::Tape::REPACKING, common::dataStructures::Tape::REPACKING_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to REPACKING")); - m_db.clearRetrieveQueueStatisticsCache(qForCleanup.vid); + m_catalogue.modifyTapeState(admin, queueVid, common::dataStructures::Tape::REPACKING, common::dataStructures::Tape::REPACKING_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to REPACKING")); + m_db.clearRetrieveQueueStatisticsCache(queueVid); break; case common::dataStructures::Tape::BROKEN_PENDING: - m_catalogue.modifyTapeState(admin, qForCleanup.vid, common::dataStructures::Tape::BROKEN, common::dataStructures::Tape::BROKEN_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to BROKEN")); - m_db.clearRetrieveQueueStatisticsCache(qForCleanup.vid); + m_catalogue.modifyTapeState(admin, queueVid, common::dataStructures::Tape::BROKEN, common::dataStructures::Tape::BROKEN_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to BROKEN")); + m_db.clearRetrieveQueueStatisticsCache(queueVid); break; case common::dataStructures::Tape::EXPORTED_PENDING: - m_catalogue.modifyTapeState(admin, qForCleanup.vid, common::dataStructures::Tape::EXPORTED, common::dataStructures::Tape::EXPORTED_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to EXPORTED")); - m_db.clearRetrieveQueueStatisticsCache(qForCleanup.vid); + m_catalogue.modifyTapeState(admin, queueVid, common::dataStructures::Tape::EXPORTED, common::dataStructures::Tape::EXPORTED_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to EXPORTED")); + m_db.clearRetrieveQueueStatisticsCache(queueVid); break; default: log::ScopedParamContainer paramsWarnMsg(logContext); - paramsWarnMsg.add("tapeVid", qForCleanup.vid) - .add("expectedPrevState", common::dataStructures::Tape::stateToString(qForCleanup.tapeState)) - .add("actualPrevState", common::dataStructures::Tape::stateToString(tapeToModify.state)); + paramsWarnMsg.add("tapeVid", queueVid) + .add("expectedPrevState", common::dataStructures::Tape::stateToString(tapeData.state)) + .add("actualPrevState", common::dataStructures::Tape::stateToString(tapeDataRefreshed.state)); logContext.log(log::WARNING, "WARNING: In QueueCleanupRunner::runOnePass(): Cleaned up tape is not in a PENDING state. Unable to change it to its corresponding final state."); break; } diff --git a/objectstore/QueueCleanupRunner.hpp b/objectstore/QueueCleanupRunner.hpp index dc4281ba7e400fed49ae677bfdd9697008d6d07b..ca0f32d35c8cdd290e6478cd8d79f29730f69e6a 100644 --- a/objectstore/QueueCleanupRunner.hpp +++ b/objectstore/QueueCleanupRunner.hpp @@ -51,11 +51,6 @@ public: private: - struct QueueCleanupInfo { - std::string vid; - cta::common::dataStructures::Tape::State tapeState; - }; - struct HeartbeatStatus { std::string agent; uint64_t heartbeat; diff --git a/objectstore/QueueCleanupRunnerConcurrentTest.cpp b/objectstore/QueueCleanupRunnerConcurrentTest.cpp index f37ba9844333cd1d12119610b78396910325a42f..c3c95b121471c97e06b8efa7470c6b9b15bab7a5 100644 --- a/objectstore/QueueCleanupRunnerConcurrentTest.cpp +++ b/objectstore/QueueCleanupRunnerConcurrentTest.cpp @@ -205,7 +205,7 @@ public: using OStoreDBWithAgent::OStoreDBWithAgent; std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToTransferBatch( - std::string & vid, uint64_t filesRequested, cta::log::LogContext &logContext) override { + const std::string & vid, uint64_t filesRequested, cta::log::LogContext &logContext) override { throw TriggeredException(); } }; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 864e8db589c86b603e5012f89160b88298685844..67574a0332857ad6c9cf598a5cf20fab88f03a59 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1976,7 +1976,7 @@ void OStoreDB::requeueRetrieveRequestJobs(std::list<cta::SchedulerDatabase::Retr //------------------------------------------------------------------------------ // OStoreDB::reserveRetrieveQueueForCleanup() //------------------------------------------------------------------------------ -void OStoreDB::reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) { +void OStoreDB::reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) { RootEntry re(m_objectStore); RetrieveQueue rq(m_objectStore); @@ -2016,7 +2016,7 @@ void OStoreDB::reserveRetrieveQueueForCleanup(std::string & vid, std::optional<u //------------------------------------------------------------------------------ // OStoreDB::tickRetrieveQueueCleanupHeartbeat() //------------------------------------------------------------------------------ -void OStoreDB::tickRetrieveQueueCleanupHeartbeat(std::string & vid) { +void OStoreDB::tickRetrieveQueueCleanupHeartbeat(const std::string & vid) { RootEntry re(m_objectStore); RetrieveQueue rq(m_objectStore); @@ -2171,7 +2171,7 @@ auto OStoreDB::getRepackStatisticsNoLock() -> std::unique_ptr<SchedulerDatabase: // OStoreDB::getNextRetrieveJobsToTransferBatch() //------------------------------------------------------------------------------ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::getNextRetrieveJobsToTransferBatch( - std::string & vid, uint64_t filesRequested, log::LogContext &logContext) { + const std::string & vid, uint64_t filesRequested, log::LogContext &logContext) { using RQTTAlgo = objectstore::ContainerAlgorithms<RetrieveQueue, RetrieveQueueToTransfer>; RQTTAlgo rqttAlgo(m_objectStore, *m_agentReference); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 89b7aadc208a181dae2184c57853eee0157b4c1c..0c3fc6be5333ce3cdd103aa92992e941c43edd8e 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -472,10 +472,10 @@ class OStoreDB: public SchedulerDatabase { // common::dataStructures::JobQueueType queueType = common::dataStructures::JobQueueType::JobsToTransferForUser) const; std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToTransferBatch( - std::string & vid, uint64_t filesRequested, log::LogContext &logContext) override; + const std::string & vid, uint64_t filesRequested, log::LogContext &logContext) override; void requeueRetrieveRequestJobs(std::list<cta::SchedulerDatabase::RetrieveJob *> &jobs, log::LogContext& logContext) override; - void reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override; - void tickRetrieveQueueCleanupHeartbeat(std::string & vid) override; + void reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override; + void tickRetrieveQueueCleanupHeartbeat(const std::string & vid) override; CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotReservedForCleanup); CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotFound); diff --git a/scheduler/PostgresSchedDB/PostgresSchedDB.cpp b/scheduler/PostgresSchedDB/PostgresSchedDB.cpp index d6a01f9cff2f2400f621c0ec2e752c2eafea66a4..ae058a2b9d0ff86a8021fec0efafafae7093fe0f 100644 --- a/scheduler/PostgresSchedDB/PostgresSchedDB.cpp +++ b/scheduler/PostgresSchedDB/PostgresSchedDB.cpp @@ -75,7 +75,7 @@ SchedulerDatabase::JobsFailedSummary PostgresSchedDB::getArchiveJobsFailedSummar throw cta::exception::Exception("Not implemented"); } -std::list<std::unique_ptr<RetrieveJob>> PostgresSchedDB::getNextRetrieveJobsToTransferBatch(std::string & vid, uint64_t filesRequested, log::LogContext &lc) +std::list<std::unique_ptr<RetrieveJob>> PostgresSchedDB::getNextRetrieveJobsToTransferBatch(const std::string & vid, uint64_t filesRequested, log::LogContext &lc) { throw cta::exception::Exception("Not implemented"); } @@ -85,12 +85,12 @@ void PostgresSchedDB::requeueRetrieveRequestJobs(std::list<cta::SchedulerDatabas throw cta::exception::Exception("Not implemented"); } -void PostgresSchedDB::reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) +void PostgresSchedDB::reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) { throw cta::exception::Exception("Not implemented"); } -void PostgresSchedDB::tickRetrieveQueueCleanupHeartbeat(std::string & vid) +void PostgresSchedDB::tickRetrieveQueueCleanupHeartbeat(const std::string & vid) { throw cta::exception::Exception("Not implemented"); } diff --git a/scheduler/PostgresSchedDB/PostgresSchedDB.hpp b/scheduler/PostgresSchedDB/PostgresSchedDB.hpp index a19e91673522f4c572f8a1c8efd826e4d1369030..e6d9695d8e21b35dcc6395d45aa3c6c7b18d16c9 100644 --- a/scheduler/PostgresSchedDB/PostgresSchedDB.hpp +++ b/scheduler/PostgresSchedDB/PostgresSchedDB.hpp @@ -82,13 +82,13 @@ class PostgresSchedDB: public SchedulerDatabase { JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) override; - std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToTransferBatch(std::string & vid, uint64_t filesRequested, log::LogContext &lc) override; + std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToTransferBatch(const std::string & vid, uint64_t filesRequested, log::LogContext &lc) override; void requeueRetrieveRequestJobs(std::list<cta::SchedulerDatabase::RetrieveJob *> &jobs, log::LogContext &lc) override; - void reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override; + void reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override; - void tickRetrieveQueueCleanupHeartbeat(std::string & vid) override; + void tickRetrieveQueueCleanupHeartbeat(const std::string & vid) override; void setArchiveJobBatchReported(std::list<SchedulerDatabase::ArchiveJob*> & jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override; diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 0e9a2f50a617ec72923638d14dc23da92d4cbb0a..d6ba539aded9fecb825ae4bfa4eadf6e70407bb1 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -606,10 +606,10 @@ class SchedulerDatabase { /***/ virtual std::unique_ptr<RepackRequest> getNextRepackJobToExpand() = 0; - virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToTransferBatch(std::string & vid, uint64_t filesRequested, log::LogContext &logContext) = 0; + virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToTransferBatch(const std::string & vid, uint64_t filesRequested, log::LogContext &logContext) = 0; virtual void requeueRetrieveRequestJobs(std::list<cta::SchedulerDatabase::RetrieveJob *> &jobs, log::LogContext& logContext) = 0; - virtual void reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) = 0; - virtual void tickRetrieveQueueCleanupHeartbeat(std::string & vid) = 0; + virtual void reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) = 0; + virtual void tickRetrieveQueueCleanupHeartbeat(const std::string & vid) = 0; /*============ Repack management: maintenance process side =========================*/ diff --git a/scheduler/SchedulerDatabaseFactory.hpp b/scheduler/SchedulerDatabaseFactory.hpp index 2556c04aaa584de1c5f9cb49995161064306f766..fc127d3a7ebd86cf70fac13bab47b04d8ab0e913 100644 --- a/scheduler/SchedulerDatabaseFactory.hpp +++ b/scheduler/SchedulerDatabaseFactory.hpp @@ -115,7 +115,7 @@ public: return m_SchedDB->getArchiveJobsFailedSummary(lc); } - std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToTransferBatch(std::string & vid, uint64_t filesRequested, log::LogContext &lc) override { + std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToTransferBatch(const std::string & vid, uint64_t filesRequested, log::LogContext &lc) override { return m_SchedDB->getNextRetrieveJobsToTransferBatch(vid, filesRequested, lc); } @@ -123,11 +123,11 @@ public: m_SchedDB->requeueRetrieveRequestJobs(jobs, lc); } - void reserveRetrieveQueueForCleanup(std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override { + void reserveRetrieveQueueForCleanup(const std::string & vid, std::optional<uint64_t> cleanupHeartBeatValue) override { m_SchedDB->reserveRetrieveQueueForCleanup(vid, cleanupHeartBeatValue); } - void tickRetrieveQueueCleanupHeartbeat(std::string & vid) override { + void tickRetrieveQueueCleanupHeartbeat(const std::string & vid) override { m_SchedDB->tickRetrieveQueueCleanupHeartbeat(vid); }