From 69861709e822894eeb6df4161d648212204c3273 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Thu, 22 Mar 2018 16:31:37 +0100 Subject: [PATCH] Limited number of parallel requeued jobs in garbage collection. Also cut the main garbage collection function into smaller bits. We requeue jobs in batches of 500. --- objectstore/GarbageCollector.cpp | 654 ++++++++++++++++--------------- objectstore/GarbageCollector.hpp | 10 + 2 files changed, 353 insertions(+), 311 deletions(-) diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 14bbdf913e..60bb50cca3 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -25,6 +25,7 @@ #include "Helpers.hpp" #include "RootEntry.hpp" #include <algorithm> +#include <unistd.h> namespace cta { namespace objectstore { @@ -176,12 +177,21 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon arl.release(); lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent."); // Return all objects owned by the agent to their respective backup owners - + + OwnedObjectSorter ownedObjectSorter; + std::list<std::shared_ptr<GenericObject>> fetchedObjects; + ownedObjectSorter.fetchOwnedObjects(agent, fetchedObjects, m_objectStore, lc); + ownedObjectSorter.sortFetchedObjects(agent, fetchedObjects, m_objectStore, m_catalogue, lc); + ownedObjectSorter.lockFetchAndUpdateArchiveJobs(agent, m_ourAgentReference, m_objectStore, lc); + ownedObjectSorter.lockFetchAndUpdateRetrieveJobs(agent, m_ourAgentReference, m_objectStore, lc); + ownedObjectSorter.lockFetchAndUpdateOtherObjects(agent, m_ourAgentReference, m_objectStore, m_catalogue, lc); +} + +void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, Backend & objectStore, log::LogContext & lc) { const auto ownedObjectAddresses = agent.getOwnershipList(); // Parallel fetch (lock free) all the objects to assess their status (check ownership, // type and decide to which queue they will go. std::list<std::shared_ptr<GenericObject>> ownedObjects; - std::list<std::shared_ptr<GenericObject>> fetchedObjects; std::map<GenericObject *, std::unique_ptr<GenericObject::AsyncLockfreeFetcher>> ownedObjectsFetchers; // This will be the list of objects we failed to garbage collect. This means the garbage collection // will be partial (looping?). @@ -192,7 +202,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon // 1 launch the async fetch of all the objects. for (auto & obj : ownedObjectAddresses) { // Fetch generic objects - ownedObjects.emplace_back(new GenericObject(obj, m_objectStore)); + ownedObjects.emplace_back(new GenericObject(obj, objectStore)); try { ownedObjectsFetchers[ownedObjects.back().get()].reset(ownedObjects.back()->asyncLockfreeFetch()); } catch (cta::exception::Exception & ex) { @@ -210,23 +220,9 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to asyncLockfreeFetch(): skipping object. 
Garbage collection will be incomplete."); } } -// if (ownedObjects.back()->exists()) { -// ownedObjectsFetchers[ownedObjects.back().get()].reset(ownedObjects.back()->asyncLockfreeFetch()); -// } else { -// agent.removeFromOwnership(ownedObjects.back()->getAddressIfSet()); -// agent.commit(); -// ownedObjects.pop_back(); -// lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object."); -// } -// } // 2 find out the result of the fetches - OwnedObjectSorter ownedObjectSorter; bool ownershipUdated=false; - using serializers::ArchiveJobStatus; - std::set<ArchiveJobStatus> inactiveArchiveJobStatuses({ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed}); - using serializers::RetrieveJobStatus; - std::set<RetrieveJobStatus> inactiveRetrieveJobStatuses({RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed}); for (auto & obj : ownedObjects) { log::ScopedParamContainer params2(lc); params2.add("objectAddress", obj->getAddressIfSet()); @@ -253,7 +249,16 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon } // The generic objects we are interested in are now also stored in fetchedObjects. ownedObjects.clear(); + if (ownershipUdated) agent.commit(); +} + +void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { // 3 Now decide the fate of each fetched and owned object. + bool ownershipUdated=false; + using serializers::ArchiveJobStatus; + std::set<ArchiveJobStatus> inactiveArchiveJobStatuses({ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed}); + using serializers::RetrieveJobStatus; + std::set<RetrieveJobStatus> inactiveRetrieveJobStatuses({RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed}); for (auto & obj: fetchedObjects) { log::ScopedParamContainer params2(lc); params2.add("objectAddress", obj->getAddressIfSet()); @@ -286,7 +291,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon for (auto &j: ar->dumpJobs()) { if ((j.owner == agent.getAddressIfSet() || ar->getOwner() == agent.getAddressIfSet()) && !inactiveArchiveJobStatuses.count(j.status)) { - ownedObjectSorter.archiveQueuesAndRequests[j.tapePool].emplace_back(ar); + archiveQueuesAndRequests[j.tapePool].emplace_back(ar); log::ScopedParamContainer params3(lc); params3.add("tapepool", j.tapePool) .add("copynb", j.copyNb) @@ -318,20 +323,20 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon log::ScopedParamContainer params3(lc); params3.add("fileId", rr->getArchiveFile().archiveFileID); lc.log(log::INFO, "No active retrieve job to requeue found. Marking request for normal GC (and probably deletion)."); - ownedObjectSorter.otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), m_objectStore)); + otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); break; } std::string vid; try { - vid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore); + vid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, objectStore); } catch (Helpers::NoTapeAvailableForRetrieve & ex) { log::ScopedParamContainer params3(lc); params3.add("fileId", rr->getArchiveFile().archiveFileID); lc.log(log::INFO, "No available tape found. 
Marking request for normal GC (and probably deletion)."); - ownedObjectSorter.otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), m_objectStore)); + otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); break; } - ownedObjectSorter.retrieveQueuesAndRequests[vid].emplace_back(rr); + retrieveQueuesAndRequests[vid].emplace_back(rr); log::ScopedParamContainer params3(lc); // Find copyNb for logging size_t copyNb = std::numeric_limits<size_t>::max(); @@ -347,7 +352,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon default: // For other objects, we will not implement any optimization and simply call // their individual garbageCollect method. - ownedObjectSorter.otherObjects.emplace_back(obj); + otherObjects.emplace_back(obj); obj.reset(); break; } @@ -356,334 +361,361 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon // We are now done with the next container. if (ownershipUdated) agent.commit(); fetchedObjects.clear(); - +} + +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, log::LogContext & lc) { // We can now start updating the objects efficiently. We still need to re-fetch them locked // and validate ownership. // // 1) Get the archive requests done. - for (auto & tapepool: ownedObjectSorter.archiveQueuesAndRequests) { - double queueLockFetchTime=0; - double queueProcessAndCommitTime=0; - double requestsUpdatePreparationTime=0; - double requestsUpdatingTime=0; - double queueRecommitTime=0; - uint64_t filesQueued=0; - uint64_t filesDequeued=0; - uint64_t bytesQueued=0; - uint64_t bytesDequeued=0; - uint64_t filesBefore=0; - uint64_t bytesBefore=0; - utils::Timer t; - // Get the archive queue and add references to the jobs in it. - ArchiveQueue aq(m_objectStore); - ScopedExclusiveLock aql; - Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, m_ourAgentReference, tapepool.first, lc); - queueLockFetchTime = t.secs(utils::Timer::resetCounter); - auto jobsSummary=aq.getJobsSummary(); - filesBefore=jobsSummary.jobs; - bytesBefore=jobsSummary.bytes; - // We have the queue. We will loop on the requests, add them to the queue. We will launch their updates - // after committing the queue. - std::list<ArchiveQueue::JobToAdd> jtal; - for (auto & ar: tapepool.second) { - // Determine the copy number and feed the queue with it. - for (auto &j: ar->dumpJobs()) { - if (j.tapePool == tapepool.first) { - jtal.push_back({j, ar->getAddressIfSet(), ar->getArchiveFile().archiveFileID, - ar->getArchiveFile().fileSize, ar->getMountPolicy(), ar->getEntryLog().time}); - } + for (auto & tapepool: archiveQueuesAndRequests) { + // The number of objects to requeue could be very high. In order to limit the time taken by the + // individual requeue operations, we limit the number of concurrently requeued objects to an + // arbitrary 500. + while (tapepool.second.size()) { + decltype (tapepool.second) currentJobBatch; + while (tapepool.second.size() && currentJobBatch.size() <= 500) { + currentJobBatch.emplace_back(std::move(tapepool.second.front())); + tapepool.second.pop_front(); } - } - auto addedJobs = aq.addJobsIfNecessaryAndCommit(jtal, m_ourAgentReference, lc); - queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, - // we will NOT remove the object from agent's ownership. 
This variable is declared a bit ahead so - // the goto will not cross its initialization. - std::set<std::string> jobsIndividuallyGCed; - if (!addedJobs.files) { - goto agentCleanupForArchive; - } - // We will keep individual references for each job update we launch so that we make - // no assumption (several jobs could be queued to the same pool, even if not expected - // at the high level). - struct ARUpdatersParams { - std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater; - std::shared_ptr<ArchiveRequest> archiveRequest; - uint16_t copyNb; - }; - { - std::list<ARUpdatersParams> arUpdatersParams; - for (auto & ar: tapepool.second) { - for (auto & j: ar->dumpJobs()) { + double queueLockFetchTime=0; + double queueProcessAndCommitTime=0; + double requestsUpdatePreparationTime=0; + double requestsUpdatingTime=0; + double queueRecommitTime=0; + uint64_t filesQueued=0; + uint64_t filesDequeued=0; + uint64_t bytesQueued=0; + uint64_t bytesDequeued=0; + uint64_t filesBefore=0; + uint64_t bytesBefore=0; + utils::Timer t; + // Get the archive queue and add references to the jobs in it. + ArchiveQueue aq(objectStore); + ScopedExclusiveLock aql; + Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, tapepool.first, lc); + queueLockFetchTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary=aq.getJobsSummary(); + filesBefore=jobsSummary.jobs; + bytesBefore=jobsSummary.bytes; + // We have the queue. We will loop on the requests, add them to the queue. We will launch their updates + // after committing the queue. + std::list<ArchiveQueue::JobToAdd> jtal; + for (auto & ar: currentJobBatch) { + // Determine the copy number and feed the queue with it. + for (auto &j: ar->dumpJobs()) { if (j.tapePool == tapepool.first) { - arUpdatersParams.emplace_back(); - arUpdatersParams.back().archiveRequest = ar; - arUpdatersParams.back().copyNb = j.copyNb; - arUpdatersParams.back().updater.reset( - ar->asyncUpdateJobOwner(j.copyNb, aq.getAddressIfSet(), address)); + jtal.push_back({j, ar->getAddressIfSet(), ar->getArchiveFile().archiveFileID, + ar->getArchiveFile().fileSize, ar->getMountPolicy(), ar->getEntryLog().time}); } } } - requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); - // Now collect the results. - std::list<std::string> requestsToDequeue; - for (auto & arup: arUpdatersParams) { - try { - arup.updater->wait(); - // OK, the job made it to the queue - log::ScopedParamContainer params(lc); - params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) - .add("copyNb", arup.copyNb) - .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) - .add("tapepool", tapepool.first) - .add("archiveQueueObject", aq.getAddressIfSet()) - .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued archive job."); - } catch (cta::exception::Exception & e) { - // Update did not go through. It could be benign - std::string debugType=typeid(e).name(); - if (typeid(e) == typeid(Backend::NoSuchObject)) { - // The object was not present or not owned during update, so we skip it. - // This is nevertheless unexpected (from previous fetch, so this is an error). 
- log::ScopedParamContainer params(lc); - params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) - .add("copyNb", arup.copyNb) - .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) - .add("exceptionType", debugType); - lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned archive job. Removing from queue."); - } else { - // We have an unexpected error. We will handle this with the request-by-request garbage collection. + auto addedJobs = aq.addJobsIfNecessaryAndCommit(jtal, agentReference, lc); + queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); + // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, + // we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so + // the goto will not cross its initialization. + std::set<std::string> jobsIndividuallyGCed; + if (!addedJobs.files) { + goto agentCleanupForArchive; + } + // We will keep individual references for each job update we launch so that we make + // no assumption (several jobs could be queued to the same pool, even if not expected + // at the high level). + struct ARUpdatersParams { + std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater; + std::shared_ptr<ArchiveRequest> archiveRequest; + uint16_t copyNb; + }; + { + std::list<ARUpdatersParams> arUpdatersParams; + for (auto & ar: currentJobBatch) { + for (auto & j: ar->dumpJobs()) { + if (j.tapePool == tapepool.first) { + arUpdatersParams.emplace_back(); + arUpdatersParams.back().archiveRequest = ar; + arUpdatersParams.back().copyNb = j.copyNb; + arUpdatersParams.back().updater.reset( + ar->asyncUpdateJobOwner(j.copyNb, aq.getAddressIfSet(), agent.getAddressIfSet())); + } + } + } + requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); + // Now collect the results. + std::list<std::string> requestsToDequeue; + for (auto & arup: arUpdatersParams) { + try { + arup.updater->wait(); + // OK, the job made it to the queue log::ScopedParamContainer params(lc); params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) .add("copyNb", arup.copyNb) .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) - .add("exceptionType", debugType) - .add("exceptionMessage", e.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue archive job with unexpected error. Removing from queue and will re-run individual garbage collection."); - // We will re-run the individual GC for this one. - jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet()); - ownedObjectSorter.otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), m_objectStore)); + .add("tapepool", tapepool.first) + .add("archiveQueueObject", aq.getAddressIfSet()) + .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued archive job."); + } catch (cta::exception::Exception & e) { + // Update did not go through. It could be benign + std::string debugType=typeid(e).name(); + if (typeid(e) == typeid(Backend::NoSuchObject)) { + // The object was not present or not owned during update, so we skip it. + // This is nevertheless unexpected (from previous fetch, so this is an error). 
+ log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) + .add("copyNb", arup.copyNb) + .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) + .add("exceptionType", debugType); + lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned archive job. Removing from queue."); + } else { + // We have an unexpected error. We will handle this with the request-by-request garbage collection. + log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) + .add("copyNb", arup.copyNb) + .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) + .add("exceptionType", debugType) + .add("exceptionMessage", e.getMessageValue()); + lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue archive job with unexpected error. Removing from queue and will re-run individual garbage collection."); + // We will re-run the individual GC for this one. + jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet()); + otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), objectStore)); + } + // In all cases, the object did NOT make it to the queue. + filesDequeued ++; + bytesDequeued += arup.archiveRequest->getArchiveFile().fileSize; + requestsToDequeue.push_back(arup.archiveRequest->getAddressIfSet()); } - // In all cases, the object did NOT make it to the queue. - filesDequeued ++; - bytesDequeued += arup.archiveRequest->getArchiveFile().fileSize; - requestsToDequeue.push_back(arup.archiveRequest->getAddressIfSet()); + } + requestsUpdatingTime = t.secs(utils::Timer::resetCounter); + if (requestsToDequeue.size()) { + aq.removeJobsAndCommit(requestsToDequeue); + log::ScopedParamContainer params(lc); + params.add("archiveQueueObject", aq.getAddressIfSet()); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed archive queue after error handling."); + queueRecommitTime = t.secs(utils::Timer::resetCounter); } } - requestsUpdatingTime = t.secs(utils::Timer::resetCounter); - if (requestsToDequeue.size()) { - aq.removeJobsAndCommit(requestsToDequeue); + { log::ScopedParamContainer params(lc); - params.add("archiveQueueObject", aq.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed archive queue after error handling."); - queueRecommitTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary = aq.getJobsSummary(); + params.add("tapepool", tapepool.first) + .add("archiveQueueObject", aq.getAddressIfSet()) + .add("filesAdded", filesQueued - filesDequeued) + .add("bytesAdded", bytesQueued - bytesDequeued) + .add("filesAddedInitially", filesQueued) + .add("bytesAddedInitially", bytesQueued) + .add("filesDequeuedAfterErrors", filesDequeued) + .add("bytesDequeuedAfterErrors", bytesDequeued) + .add("filesBefore", filesBefore) + .add("bytesBefore", bytesBefore) + .add("filesAfter", jobsSummary.jobs) + .add("bytesAfter", jobsSummary.bytes) + .add("queueLockFetchTime", queueLockFetchTime) + .add("queueProcessAndCommitTime", queueProcessAndCommitTime) + .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) + .add("requestsUpdatingTime", requestsUpdatingTime) + .add("queueRecommitTime", queueRecommitTime); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of archive requests."); } - } - { - log::ScopedParamContainer params(lc); - auto jobsSummary = 
aq.getJobsSummary(); - params.add("tapepool", tapepool.first) - .add("archiveQueueObject", aq.getAddressIfSet()) - .add("filesAdded", filesQueued - filesDequeued) - .add("bytesAdded", bytesQueued - bytesDequeued) - .add("filesAddedInitially", filesQueued) - .add("bytesAddedInitially", bytesQueued) - .add("filesDequeuedAfterErrors", filesDequeued) - .add("bytesDequeuedAfterErrors", bytesDequeued) - .add("filesBefore", filesBefore) - .add("bytesBefore", bytesBefore) - .add("filesAfter", jobsSummary.jobs) - .add("bytesAfter", jobsSummary.bytes) - .add("queueLockFetchTime", queueLockFetchTime) - .add("queueProcessAndCommitTime", queueProcessAndCommitTime) - .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) - .add("requestsUpdatingTime", requestsUpdatingTime) - .add("queueRecommitTime", queueRecommitTime); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of archive requests."); - } - // We can now forget pool level list. But before that, we can remove the objects - // from agent ownership if this was the last reference to it. - // The usage of use_count() is safe here because we are in a single threaded environment. - // In a multi threaded environment, its usage would not be appropriate. - // See for example http://en.cppreference.com/w/cpp/memory/shared_ptr/use_count - agentCleanupForArchive: - bool ownershipUpdated=false; - for (auto &ar: tapepool.second) { - if (ar.use_count() == 1 && !jobsIndividuallyGCed.count(ar->getAddressIfSet())) { - // This tapepool is the last users of this archive request. We will remove is from ownership. - agent.removeFromOwnership(ar->getAddressIfSet()); - ownershipUpdated=true; + // We can now forget pool level list. But before that, we can remove the objects + // from agent ownership if this was the last reference to it. + // The usage of use_count() is safe here because we are in a single threaded environment. + // In a multi threaded environment, its usage would not be appropriate. + // See for example http://en.cppreference.com/w/cpp/memory/shared_ptr/use_count + agentCleanupForArchive: + bool ownershipUpdated=false; + for (auto &ar: currentJobBatch) { + if (ar.use_count() == 1 && !jobsIndividuallyGCed.count(ar->getAddressIfSet())) { + // This tapepool is the last users of this archive request. We will remove is from ownership. + agent.removeFromOwnership(ar->getAddressIfSet()); + ownershipUpdated=true; + } } + if (ownershipUpdated) agent.commit(); + currentJobBatch.clear(); + // Sleep a bit if we have oher rounds to go not to hog the queue + if (tapepool.second.size()) sleep (5); } - if (ownershipUpdated) agent.commit(); - tapepool.second.clear(); } - +} + +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, log::LogContext & lc) { // 2) Get the retrieve requests done. They are simpler as retrieve requests are fully owned. // Then should hence not have changes since we pre-fetched them. - for (auto & tape: ownedObjectSorter.retrieveQueuesAndRequests) { - double queueLockFetchTime=0; - double queueProcessAndCommitTime=0; - double requestsUpdatePreparationTime=0; - double requestsUpdatingTime=0; - double queueRecommitTime=0; - uint64_t filesQueued=0; - uint64_t filesDequeued=0; - uint64_t bytesQueued=0; - uint64_t bytesDequeued=0; - uint64_t filesBefore=0; - uint64_t bytesBefore=0; - utils::Timer t; - // Get the retrieve queue and add references to the jobs to it. 
- RetrieveQueue rq(m_objectStore); - ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, m_ourAgentReference, tape.first, lc); - queueLockFetchTime = t.secs(utils::Timer::resetCounter); - auto jobsSummary=rq.getJobsSummary(); - filesBefore=jobsSummary.files; - bytesBefore=jobsSummary.bytes; - // Prepare the list of requests to add to the queue (if needed). - std::list<RetrieveQueue::JobToAdd> jta; - // We have the queue. We will loop on the requests, add them to the list. We will launch their updates - // after committing the queue. - for (auto & rr: tape.second) { - // Determine the copy number and feed the queue with it. - for (auto &tf: rr->getArchiveFile().tapeFiles) { - if (tf.second.vid == tape.first) { - jta.push_back({tf.second.copyNb, tf.second.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, - rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time}); - } + for (auto & tape: retrieveQueuesAndRequests) { + while (tape.second.size()) { + decltype (tape.second) currentJobBatch; + while (tape.second.size() && currentJobBatch.size() <= 500) { + currentJobBatch.emplace_back(std::move(tape.second.front())); + tape.second.pop_front(); } - } - auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, m_ourAgentReference, lc); - queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, - // we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so - // the goto will not cross its initialization. - std::set<std::string> jobsIndividuallyGCed; - if (!addedJobs.files) { - goto agentCleanupForRetrieve; - } - // We will keep individual references for each job update we launch so that we make - // our life easier downstream. - struct RRUpdatedParams { - std::unique_ptr<RetrieveRequest::AsyncOwnerUpdater> updater; - std::shared_ptr<RetrieveRequest> retrieveRequest; - uint16_t copyNb; - }; - { - std::list<RRUpdatedParams> rrUpdatersParams; - for (auto & rr: tape.second) { - for (auto & tf: rr->getArchiveFile().tapeFiles) { + double queueLockFetchTime=0; + double queueProcessAndCommitTime=0; + double requestsUpdatePreparationTime=0; + double requestsUpdatingTime=0; + double queueRecommitTime=0; + uint64_t filesQueued=0; + uint64_t filesDequeued=0; + uint64_t bytesQueued=0; + uint64_t bytesDequeued=0; + uint64_t filesBefore=0; + uint64_t bytesBefore=0; + utils::Timer t; + // Get the retrieve queue and add references to the jobs to it. + RetrieveQueue rq(objectStore); + ScopedExclusiveLock rql; + Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, tape.first, lc); + queueLockFetchTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary=rq.getJobsSummary(); + filesBefore=jobsSummary.files; + bytesBefore=jobsSummary.bytes; + // Prepare the list of requests to add to the queue (if needed). + std::list<RetrieveQueue::JobToAdd> jta; + // We have the queue. We will loop on the requests, add them to the list. We will launch their updates + // after committing the queue. + for (auto & rr: currentJobBatch) { + // Determine the copy number and feed the queue with it. 
+ for (auto &tf: rr->getArchiveFile().tapeFiles) { if (tf.second.vid == tape.first) { - rrUpdatersParams.emplace_back(); - rrUpdatersParams.back().retrieveRequest = rr; - rrUpdatersParams.back().copyNb = tf.second.copyNb; - rrUpdatersParams.back().updater.reset(rr->asyncUpdateOwner(tf.second.copyNb, - rq.getAddressIfSet(), agent.getAddressIfSet())); + jta.push_back({tf.second.copyNb, tf.second.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, + rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time}); } } } - requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); - // Now collect the results. - std::list<std::string> requestsToDequeue; - for (auto & rrup: rrUpdatersParams) { - try { - rrup.updater->wait(); - // OK, the job made it to the queue - log::ScopedParamContainer params(lc); - params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) - .add("copyNb", rrup.copyNb) - .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) - .add("vid", tape.first) - .add("retreveQueueObject", rq.getAddressIfSet()) - .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued retrieve job."); - } catch (cta::exception::Exception & e) { - // Update did not go through. It could be benign - std::string debugType=typeid(e).name(); - if (typeid(e) == typeid(Backend::NoSuchObject) || - typeid(e) == typeid(objectstore::ArchiveRequest::WrongPreviousOwner)) { - // The object was not present or not owned during update, so we skip it. - // This is nevertheless unexpected (from previous fetch, so this is an error). - log::ScopedParamContainer params(lc); - params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) - .add("copyNb", rrup.copyNb) - .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) - .add("exceptionType", debugType); - lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned retrieve job. Removing from queue."); - } else { - // We have an unexpected error. Log it, and remove form queue. Not much we can - // do at this point. + auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); + queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); + // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, + // we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so + // the goto will not cross its initialization. + std::set<std::string> jobsIndividuallyGCed; + if (!addedJobs.files) { + goto agentCleanupForRetrieve; + } + // We will keep individual references for each job update we launch so that we make + // our life easier downstream. + struct RRUpdatedParams { + std::unique_ptr<RetrieveRequest::AsyncOwnerUpdater> updater; + std::shared_ptr<RetrieveRequest> retrieveRequest; + uint16_t copyNb; + }; + { + std::list<RRUpdatedParams> rrUpdatersParams; + for (auto & rr: currentJobBatch) { + for (auto & tf: rr->getArchiveFile().tapeFiles) { + if (tf.second.vid == tape.first) { + rrUpdatersParams.emplace_back(); + rrUpdatersParams.back().retrieveRequest = rr; + rrUpdatersParams.back().copyNb = tf.second.copyNb; + rrUpdatersParams.back().updater.reset(rr->asyncUpdateOwner(tf.second.copyNb, + rq.getAddressIfSet(), agent.getAddressIfSet())); + } + } + } + requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); + // Now collect the results. 
+ std::list<std::string> requestsToDequeue; + for (auto & rrup: rrUpdatersParams) { + try { + rrup.updater->wait(); + // OK, the job made it to the queue log::ScopedParamContainer params(lc); params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) .add("copyNb", rrup.copyNb) .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) - .add("exceptionType", debugType) - .add("exceptionMessage", e.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection."); - // We will re-run the individual GC for this one. - jobsIndividuallyGCed.insert(rrup.retrieveRequest->getAddressIfSet()); - ownedObjectSorter.otherObjects.emplace_back(new GenericObject(rrup.retrieveRequest->getAddressIfSet(), m_objectStore)); + .add("vid", tape.first) + .add("retreveQueueObject", rq.getAddressIfSet()) + .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued retrieve job."); + } catch (cta::exception::Exception & e) { + // Update did not go through. It could be benign + std::string debugType=typeid(e).name(); + if (typeid(e) == typeid(Backend::NoSuchObject) || + typeid(e) == typeid(objectstore::ArchiveRequest::WrongPreviousOwner)) { + // The object was not present or not owned during update, so we skip it. + // This is nevertheless unexpected (from previous fetch, so this is an error). + log::ScopedParamContainer params(lc); + params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) + .add("copyNb", rrup.copyNb) + .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) + .add("exceptionType", debugType); + lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned retrieve job. Removing from queue."); + } else { + // We have an unexpected error. Log it, and remove form queue. Not much we can + // do at this point. + log::ScopedParamContainer params(lc); + params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) + .add("copyNb", rrup.copyNb) + .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) + .add("exceptionType", debugType) + .add("exceptionMessage", e.getMessageValue()); + lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection."); + // We will re-run the individual GC for this one. + jobsIndividuallyGCed.insert(rrup.retrieveRequest->getAddressIfSet()); + otherObjects.emplace_back(new GenericObject(rrup.retrieveRequest->getAddressIfSet(), objectStore)); + } + // In all cases, the object did NOT make it to the queue. + filesDequeued ++; + bytesDequeued += rrup.retrieveRequest->getArchiveFile().fileSize; + requestsToDequeue.push_back(rrup.retrieveRequest->getAddressIfSet()); } - // In all cases, the object did NOT make it to the queue. 
- filesDequeued ++; - bytesDequeued += rrup.retrieveRequest->getArchiveFile().fileSize; - requestsToDequeue.push_back(rrup.retrieveRequest->getAddressIfSet()); + } + requestsUpdatingTime = t.secs(utils::Timer::resetCounter); + if (requestsToDequeue.size()) { + rq.removeJobsAndCommit(requestsToDequeue); + log::ScopedParamContainer params(lc); + params.add("retreveQueueObject", rq.getAddressIfSet()); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed retrieve queue after error handling."); + queueRecommitTime = t.secs(utils::Timer::resetCounter); } } - requestsUpdatingTime = t.secs(utils::Timer::resetCounter); - if (requestsToDequeue.size()) { - rq.removeJobsAndCommit(requestsToDequeue); + { log::ScopedParamContainer params(lc); - params.add("retreveQueueObject", rq.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed retrieve queue after error handling."); - queueRecommitTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary = rq.getJobsSummary(); + params.add("vid", tape.first) + .add("retrieveQueueObject", rq.getAddressIfSet()) + .add("filesAdded", filesQueued - filesDequeued) + .add("bytesAdded", bytesQueued - bytesDequeued) + .add("filesAddedInitially", filesQueued) + .add("bytesAddedInitially", bytesQueued) + .add("filesDequeuedAfterErrors", filesDequeued) + .add("bytesDequeuedAfterErrors", bytesDequeued) + .add("filesBefore", filesBefore) + .add("bytesBefore", bytesBefore) + .add("filesAfter", jobsSummary.files) + .add("bytesAfter", jobsSummary.bytes) + .add("queueLockFetchTime", queueLockFetchTime) + .add("queuePreparationTime", queueProcessAndCommitTime) + .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) + .add("requestsUpdatingTime", requestsUpdatingTime) + .add("queueRecommitTime", queueRecommitTime); + lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of retrieve requests."); } - } - { - log::ScopedParamContainer params(lc); - auto jobsSummary = rq.getJobsSummary(); - params.add("vid", tape.first) - .add("archiveQueueObject", rq.getAddressIfSet()) - .add("filesAdded", filesQueued - filesDequeued) - .add("bytesAdded", bytesQueued - bytesDequeued) - .add("filesAddedInitially", filesQueued) - .add("bytesAddedInitially", bytesQueued) - .add("filesDequeuedAfterErrors", filesDequeued) - .add("bytesDequeuedAfterErrors", bytesDequeued) - .add("filesBefore", filesBefore) - .add("bytesBefore", bytesBefore) - .add("filesAfter", jobsSummary.files) - .add("bytesAfter", jobsSummary.bytes) - .add("queueLockFetchTime", queueLockFetchTime) - .add("queuePreparationTime", queueProcessAndCommitTime) - .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) - .add("requestsUpdatingTime", requestsUpdatingTime) - .add("queueRecommitTime", queueRecommitTime); - lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of retrieve requests."); - } - // We can now forget pool level list. But before that, we can remove the objects - // from agent ownership if this was the last reference to it. - // The usage of use_count() is safe here because we are in a single threaded environment. - // In a multi threaded environment, its usage would not be appropriate. 
- // See for example http://en.cppreference.com/w/cpp/memory/shared_ptr/use_count - agentCleanupForRetrieve: - bool ownershipUpdated=false; - for (auto &rr: tape.second) { - if (rr.use_count() == 1 && !jobsIndividuallyGCed.count(rr->getAddressIfSet())) { - // This tapepool is the last users of this archive request. We will remove is from ownership. - agent.removeFromOwnership(rr->getAddressIfSet()); - ownershipUpdated=true; + // We can now forget pool level list. But before that, we can remove the objects + // from agent ownership if this was the last reference to it. + // The usage of use_count() is safe here because we are in a single threaded environment. + // In a multi threaded environment, its usage would not be appropriate. + // See for example http://en.cppreference.com/w/cpp/memory/shared_ptr/use_count + agentCleanupForRetrieve: + bool ownershipUpdated=false; + for (auto &rr: currentJobBatch) { + if (rr.use_count() == 1 && !jobsIndividuallyGCed.count(rr->getAddressIfSet())) { + // This tapepool is the last users of this archive request. We will remove is from ownership. + agent.removeFromOwnership(rr->getAddressIfSet()); + ownershipUpdated=true; + } } + if (ownershipUpdated) agent.commit(); + currentJobBatch.clear(); + // Sleep a bit if we have oher rounds to go not to hog the queue + if (tape.second.size()) sleep (5); } - if (ownershipUpdated) agent.commit(); - tape.second.clear(); } - +} + +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(Agent& agent, AgentReference& agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { // 3) are done with the objects requiring mutualized queueing, and hence special treatement. // The rest will be garbage collected on a object-by-object basis. - for (auto & go : ownedObjectSorter.otherObjects) { + for (auto & go : otherObjects) { // Find the object log::ScopedParamContainer params2(lc); params2.add("objectAddress", go->getAddressIfSet()); @@ -693,7 +725,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon go->fetch(); // Call GenericOpbject's garbage collect method, which in turn will // delegate to the object type's garbage collector. - go->garbageCollectDispatcher(goLock, address, m_ourAgentReference, lc, m_catalogue); + go->garbageCollectDispatcher(goLock, agent.getAddressIfSet(), agentReference, lc, catalogue); lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object."); } else { lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object."); @@ -706,7 +738,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon agent.removeAndUnregisterSelf(lc); lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): agent entry removed."); // We can remove the agent from our own ownership. 
- m_ourAgentReference.removeFromOwnership(address, m_objectStore); + agentReference.removeFromOwnership(agent.getAddressIfSet(), objectStore); } }} diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index e5f2b77160..70f8c4deae 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -57,6 +57,16 @@ public: std::map<std::string, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; std::map<std::string, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; std::list<std::shared_ptr<GenericObject>> otherObjects; + /// Fill up the fetchedObjects with objects of interest. + void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, log::LogContext & lc); + /// Fill up the sorter with the fetched objects + void sortFetchedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc); + /// Lock, fetch and update archive jobs + void lockFetchAndUpdateArchiveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc); + /// Lock, fetch and update retrieve jobs + void lockFetchAndUpdateRetrieveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc); + // Lock, fetch and update other objects + void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc); }; private: Backend & m_objectStore; -- GitLab
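
The core of this change is the bounded-batch requeueing loop now shared by lockFetchAndUpdateArchiveJobs() and lockFetchAndUpdateRetrieveJobs(): drain each per-tapepool (or per-VID) list of requests in slices of at most 500, requeue each slice while holding the queue lock only for that slice, and pause between slices so other users of the queue are not starved. What follows is a minimal, self-contained C++ sketch of that control flow only. The types Job, requeueBatch and requeueAll are simplified stand-ins invented for the illustration, not CTA classes, and the sketch uses a strict "size < 500" bound, whereas the patch's "currentJobBatch.size() <= 500" guard actually admits up to 501 jobs per batch.

// Standalone sketch of the batched requeueing pattern introduced by this commit.
// All types here (Job, requeueBatch, requeueAll) are simplified stand-ins for the
// real CTA objectstore classes (ArchiveRequest, ArchiveQueue, Agent, ...): this
// illustrates the control flow only, not the production implementation.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <list>
#include <map>
#include <string>
#include <thread>

namespace sketch {

struct Job {
  std::string address;    // object store address of the request
  uint64_t fileSize = 0;  // used only for accounting/log summaries
};

// Stand-in for "lock and fetch the queue, add the batch, commit, release the lock".
// In the real code this is where getLockedAndFetchedQueue(), addJobsIfNecessaryAndCommit(),
// the asynchronous per-job ownership updates and the post-error re-commit happen.
void requeueBatch(const std::string & tapePool, const std::list<Job> & batch) {
  std::cout << "tapepool=" << tapePool << " requeued " << batch.size() << " job(s)\n";
}

// Drain each per-tapepool list in bounded batches so that no single queue lock is
// held for an unbounded amount of time, and yield between batches.
void requeueAll(std::map<std::string, std::list<Job>> & queuesAndRequests,
                size_t maxBatchSize = 500,
                std::chrono::seconds sleepBetweenBatches = std::chrono::seconds(5)) {
  for (auto & tapepool : queuesAndRequests) {
    while (!tapepool.second.empty()) {
      // Carve off the next batch (at most maxBatchSize jobs).
      std::list<Job> currentJobBatch;
      while (!tapepool.second.empty() && currentJobBatch.size() < maxBatchSize) {
        currentJobBatch.emplace_back(std::move(tapepool.second.front()));
        tapepool.second.pop_front();
      }
      requeueBatch(tapepool.first, currentJobBatch);
      // Sleep between rounds so other users of the queue get a chance to lock it.
      if (!tapepool.second.empty()) {
        std::this_thread::sleep_for(sleepBetweenBatches);
      }
    }
  }
}

} // namespace sketch

int main() {
  std::map<std::string, std::list<sketch::Job>> work;
  for (int i = 0; i < 1200; i++) {
    work["pool_A"].push_back({"request_" + std::to_string(i), 1024});
  }
  // Expected output: three batches of 500, 500 and 200 jobs for pool_A.
  sketch::requeueAll(work, 500, std::chrono::seconds(0));
  return 0;
}

The point of the pattern is that the queue lock hold time and the size of each queue commit are both bounded by the batch size, while the sleep between rounds keeps a garbage collector with a very large backlog from monopolising the queue against live tape sessions.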