diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index afd8bdb177e988f27f25953ceeed9769f398ff30..ff1ee159110bafa618db9cf2548ed228e8b40472 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -587,7 +587,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon } requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); // Now collect the results. - bool rqUpdated=false; + std::list<std::string> requestsToDequeue; for (auto & rrup: rrUpdatersParams) { try { rrup.updater->wait(); @@ -630,16 +630,15 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon // In all cases, the object did NOT make it to the queue. filesDequeued ++; bytesDequeued += rrup.retrieveRequest->getArchiveFile().fileSize; - rq.removeJob(rrup.retrieveRequest->getAddressIfSet()); - rqUpdated=true; + requestsToDequeue.push_back(rrup.retrieveRequest->getAddressIfSet()); } } requestsUpdatingTime = t.secs(utils::Timer::resetCounter); - if (rqUpdated) { - rq.commit(); + if (requestsToDequeue.size()) { + rq.removeJobsAndCommit(requestsToDequeue); log::ScopedParamContainer params(lc); params.add("retreveQueueObject", rq.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): RE-committed retrieve queue after error handling."); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed retrieve queue after error handling."); queueRecommitTime = t.secs(utils::Timer::resetCounter); } } diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index bfd40ea3b237dd9fda67998d3b32ed39b0d4d7b1..44d48285cd19cd7ff8e0d0b83a247235c47b2042 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -638,10 +638,11 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid), be); cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); + std::list<std::string> jtrl; for (auto &j: rq.dumpJobs()) { - rq.removeJob(j.address); + jtrl.push_back(j.address); } - rq.commit(); + rq.removeJobsAndCommit(jtrl); rql.release(); // Remove queues from root re.removeRetrieveQueueAndCommit(vid, lc); diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index c1c0aae9da50679dff1319f88ce4846dbeeb7f00..576b1aa1ff62bcc6a2291d42e67afbd4c406f89c 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -244,34 +244,39 @@ auto cta::objectstore::RetrieveQueue::dumpJobs() -> std::list<JobDump> { return ret; } -void cta::objectstore::RetrieveQueue::removeJob(const std::string& retrieveToFileAddress) { +void cta::objectstore::RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToRemove) { checkPayloadWritable(); auto * jl = m_payload.mutable_retrievejobs(); - bool found=false; - do { - found=false; - // Push the found entry all the way to the end. - for (size_t i=0; i<(size_t)jl->size(); i++) { - if (jl->Get(i).address() == retrieveToFileAddress) { - found = true; - // Keep track of the mounting criteria - ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed()); - ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - priorityMap.decCount(jl->Get(i).priority()); - ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); - minRetrieveRequestAgeMap.decCount(jl->Get(i).minretrieverequestage()); - while (i+1 < (size_t)jl->size()) { - jl->SwapElements(i, i+1); - i++; + bool jobRemoved=false; + for (auto &rrt: requestsToRemove) { + bool found=false; + do { + found=false; + // Push the found entry all the way to the end. + for (size_t i=0; i<(size_t)jl->size(); i++) { + if (jl->Get(i).address() == rrt) { + found = true; + // Keep track of the mounting criteria + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed()); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.decCount(jl->Get(i).priority()); + ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + minRetrieveRequestAgeMap.decCount(jl->Get(i).minretrieverequestage()); + while (i+1 < (size_t)jl->size()) { + jl->SwapElements(i, i+1); + i++; + } + break; } - break; } - } - // and remove it - if (found) - jl->RemoveLast(); - } while (found); + // and remove it + if (found) + jl->RemoveLast(); + jobRemoved |= found; + } while (found); + } + if (jobRemoved) commit(); } void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index e61e7c3f1edcc940fda399991262d4338b72ce04..033ff1269f274e0030a40a327ff3712665e10fcb 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -78,7 +78,7 @@ public: }; std::list<JobDump> dumpJobs(); - void removeJob(const std::string & retrieveToFileAddress); + void removeJobsAndCommit(const std::list<std::string> & requestsToRemove); // -- Generic parameters std::string getVid(); }; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 9ad8140309a3f95fee93e3ba290399221e34e92d..fe79b114718d262284f7c4243b85d93129fd137a 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2346,10 +2346,8 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo j=candidateJobs.erase(j); } // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue. - for (const auto &j: jobsToDequeue) rq.removeJob(j); + rq.removeJobsAndCommit(jobsToDequeue); if (jobsToForget.size()) m_oStoreDB.m_agentReference->removeBatchFromOwnership(jobsToForget, m_oStoreDB.m_objectStore); - // (Possibly intermediate) commit of the queue. We keep the lock for the moment. - rq.commit(); // We can now add the validated jobs to the return value. auto vj = validatedJobs.begin(); while (vj != validatedJobs.end()) {