diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index c17fbccbc67d20beb229cafa59f7d5ef96ea7104..aaf5111294e16d30ab9d053f136285cd34f614b9 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -59,61 +59,11 @@ public: void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; - /* class ScopedIntent { - public: - ScopedIntent(Agent & agent, std::string container, std::string name, serializers::ObjectType objectType): - m_agent(agent), m_container(container), m_name(name), m_objectType(objectType), m_present(false) { - m_agent.addToOwnership(m_name); - m_present = true; - } - void removeFromIntent() { - if(!m_present) return; - m_agent.removeFromOwnership(m_name); - m_present = false; - } - ~ScopedIntent() { - try { - removeFromIntent(); - } catch (std::exception &) { - } catch (...) {throw;} - } - private: - Agent & m_agent; - std::string m_container; - std::string m_name; - serializers::ObjectType m_objectType; - bool m_present; - };*/ - - /*class ScopedOwnership { - public: - ScopedOwnership(Agent & agent, std::string name): - m_agent(agent), m_name(name), m_present(false) { - m_agent.addToOwnership(m_name); - m_present = true; - } - void removeFromOwnership() { - if(!m_present) return; - m_agent.removeFromOwnership( m_name); - m_present = false; - } - ~ScopedOwnership() { - try { - removeFromOwnership(); - } catch (std::exception &) { - } catch (...) 
{throw;} - } - private: - Agent & m_agent; - std::string m_name; - bool m_present; - };*/ - private: void addToOwnership(std::string name); void removeFromOwnership(std::string name); - + public: std::list<std::string> getOwnershipList(); diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index de187213792ea93faa85fec8154caf91e43f7ba3..c2002bf6356560ec5dbf1d8a0b83332a0338b27e 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -91,6 +91,12 @@ public: }; typedef std::set<ElementAddress> ElementsToSkipSet; + class OwnershipSwitchFailure: public cta::exception::Exception { + public: + OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {}; + typename OpFailure<InsertedElement>::list failedElements; + }; + static void trimContainerIfNeeded(Container & cont); CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer); @@ -102,7 +108,10 @@ public: log::LogContext & lc); static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext & lc); - static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont); + static void addReferencesAndCommit(Container & cont, typename InsertedElement::list & elemMemCont, + AgentReference & agentRef, log::LogContext & lc); + static void addReferencesIfNecessaryAndCommit(Container & cont, typename InsertedElement::list & elemMemCont, + AgentReference & agentRef, log::LogContext & lc); void removeReferencesAndCommit(Container & cont, typename OpFailure<InsertedElement>::list & elementsOpFailures); void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList); static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress, @@ -121,12 +130,13 @@ public: typedef typename ContainerTraits<C>::InsertedElement InsertedElement; typedef typename ContainerTraits<C>::PopCriteria 
PopCriteria; + typedef typename ContainerTraits<C>::OwnershipSwitchFailure OwnershipSwitchFailure; - /** Reference objects in the container and then switch their ownership them. Objects - * are provided existing and owned by algorithm's agent. Returns a list of - * @returns list of elements for which the addition or ownership switch failed. - * @throws */ + /** Reference objects in the container and then switch their ownership. Objects + * are provided existing and owned by algorithm's agent. + */ void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, + const typename ContainerTraits<C>::ContainerIdentifyer & prevContId, typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { C cont(m_backend); ScopedExclusiveLock contLock; @@ -135,7 +145,7 @@ ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc); auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(), - m_agentReference.getAgentAddress(), timingList, t, lc); + prevContId, timingList, t, lc); // If ownership switching failed, remove failed object from queue to not leave stale pointers. if (failedOwnershipSwitchElements.size()) { ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements); @@ -167,6 +177,69 @@ public: } } + /** Reference objects in the container if needed and then switch their ownership (if needed). Objects + * are expected to be owned by agent, and not listed in the container but situations might vary. + * This function is typically used by the garbage collector. We do not take care of dereferencing + the object from the caller. 
+ */ + void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId, + typename ContainerTraits<C>::ContainerAddress & previousOwnerAddress, + typename ContainerTraits<C>::ContainerAddress & contAddress, + typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { + C cont(m_backend); + ScopedExclusiveLock contLock; + log::TimingList timingList; + utils::Timer t; + ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); + contAddress = cont.getAddressIfSet(); + auto contSummaryBefore = ContainerTraits<C>::getContainerSummary(cont); + timingList.insertAndReset("queueLockFetchTime", t); + ContainerTraits<C>::addReferencesIfNecessaryAndCommit(cont, elements, m_agentReference, lc); + timingList.insertAndReset("queueProcessAndCommitTime", t); + auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(), + previousOwnerAddress, timingList, t, lc); + timingList.insertAndReset("requestsUpdatingTime", t); + // If ownership switching failed, remove failed object from queue to not leave stale pointers. + if (failedOwnershipSwitchElements.size()) { + ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements); + timingList.insertAndReset("queueRecommitTime", t); + } + // We are now done with the container. + auto contSummaryAfter = ContainerTraits<C>::getContainerSummary(cont); + contLock.release(); + timingList.insertAndReset("queueUnlockTime", t); + log::ScopedParamContainer params(lc); + params.add("C", ContainerTraits<C>::c_containerTypeName) + .add(ContainerTraits<C>::c_identifyerType, contId) + .add("containerAddress", cont.getAddressIfSet()); + contSummaryAfter.addDeltaToLog(contSummaryBefore, params); + timingList.addToLog(params); + if (failedOwnershipSwitchElements.empty()) { + // That's it, we're done. 
+ lc.log(log::INFO, "In ContainerAlgorithms::referenceAndSwitchOwnershipIfNecessary(): Requeued a batch of elements."); + return; + } else { + // Bad case: just return the failure set to the caller. + typename ContainerTraits<C>::OwnershipSwitchFailure failureEx( + "In ContainerAlgorithms<>::referenceAndSwitchOwnershipIfNecessary(): failed to switch ownership of some elements"); + failureEx.failedElements = failedOwnershipSwitchElements; + params.add("errorCount", failedOwnershipSwitchElements.size()); + lc.log(log::WARNING, "In ContainerAlgorithms::referenceAndSwitchOwnershipIfNecessary(): " + "Encountered problems while requeuing a batch of elements"); + throw failureEx; + } + } + + /** + * Addition of jobs to container. Convenience overload for cases when current agent is the previous owner + * (most cases except garbage collection). + */ + void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, + typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { + referenceAndSwitchOwnership(contId, m_agentReference.getAgentAddress(), elements, lc); + } + + typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId, typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) { // Prepare the return value diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp index b194a5a64f8ac9f4080a219d43ed4331992dcd59..6d6844fd8e13831ee4053f4c70a7a9aa2758cf6d 100644 --- a/objectstore/ArchiveQueueAlgorithms.cpp +++ b/objectstore/ArchiveQueueAlgorithms.cpp @@ -46,6 +46,21 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse cont.addJobsAndCommit(jobsToAdd, agentRef, lc); } +void ContainerTraits<ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, InsertedElement::list& elemMemCont, + AgentReference& agentRef, log::LogContext& lc) { + 
std::list<ArchiveQueue::JobToAdd> jobsToAdd; + for (auto & e: elemMemCont) { + ElementDescriptor jd; + jd.copyNb = e.copyNb; + jd.tapePool = cont.getTapePool(); + jd.owner = cont.getAddressIfSet(); + ArchiveRequest & ar = *e.archiveRequest; + jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize, + e.mountPolicy, time(nullptr)}); + } + cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc); +} + void ContainerTraits<ArchiveQueue>::removeReferencesAndCommit(Container& cont, OpFailure<InsertedElement>::list& elementsOpFailures) { std::list<std::string> elementsToRemove; for (auto & eof: elementsOpFailures) { diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 2909ee319dc118fdc390f5e1797499ae6d9c9fe1..a06a019a85b934b75cbb21ba5178fa99e5a24a5a 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -32,7 +32,7 @@ public: static const std::string c_containerTypeName; //= "ArchiveQueue"; static const std::string c_identifyerType; // = "tapepool"; struct InsertedElement { - std::unique_ptr<ArchiveRequest> archiveRequest; + std::shared_ptr<ArchiveRequest> archiveRequest; uint16_t copyNb; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::MountPolicy mountPolicy; @@ -68,6 +68,9 @@ public: static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont, AgentReference & agentRef, log::LogContext & lc); + static void addReferencesIfNecessaryAndCommit(Container & cont, InsertedElement::list & elemMemCont, + AgentReference & agentRef, log::LogContext & lc); + static void removeReferencesAndCommit(Container & cont, OpFailure<InsertedElement>::list & elementsOpFailures); static void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList); diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 
232f8e3d5befc037655af2d5978d681ea6d39c3e..d5b3423848c9f47c977ea1acb1294c131bcce6e8 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -474,10 +474,13 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16 auto *jl=payload.mutable_jobs(); for (auto j=jl->begin(); j!=jl->end(); j++) { if (j->copynb() == copyNumber) { - if (j->owner() != previousOwner) { - throw Backend::WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned."); + // The owner might already be the right one (in garbage collection cases), in which case, it's job done. + if (j->owner() != owner) { + if (j->owner() != previousOwner) { + throw Backend::WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned."); + } + j->set_owner(owner); } - j->set_owner(owner); // We also need to gather all the job content for the user to get in-memory // representation. // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest. 
diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 45f63f7c16c093bf4b9fa50cf05b8391aea5d450..235b0df3ee589cbd1ccde48002942f2722e26ea0 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -20,7 +20,7 @@ #include "AgentReference.hpp" #include "ArchiveRequest.hpp" #include "RetrieveRequest.hpp" -#include "ArchiveQueue.hpp" +#include "ArchiveQueueAlgorithms.hpp" #include "RetrieveQueue.hpp" #include "Helpers.hpp" #include "RootEntry.hpp" @@ -191,7 +191,8 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, std::list<l ownedObjectSorter.lockFetchAndUpdateOtherObjects(agent, m_ourAgentReference, m_objectStore, m_catalogue, lc); } -void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, Backend & objectStore, log::LogContext & lc) { +void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, + Backend & objectStore, log::LogContext & lc) { const auto ownedObjectAddresses = agent.getOwnershipList(); // Parallel fetch (lock free) all the objects to assess their status (check ownership, // type and decide to which queue they will go. @@ -221,7 +222,8 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l log::ScopedParamContainer params(lc); params.add("objectAddress", obj) .add("exceptionMessage", ex.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): failed to asyncLockfreeFetch(): skipping object. Garbage collection will be incomplete."); + lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): " + "failed to asyncLockfreeFetch(): skipping object. Garbage collection will be incomplete."); } } @@ -243,7 +245,8 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l // Again, we have a problem. 
We will skip the object and have an incomplete GC. skippedObjects.push_back(obj->getAddressIfSet()); params2.add("exceptionMessage", ex.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): failed to AsyncLockfreeFetch::wait(): skipping object. Garbage collection will be incomplete."); + lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): " + "failed to AsyncLockfreeFetch::wait(): skipping object. Garbage collection will be incomplete."); ownedObjectsFetchers.erase(obj.get()); continue; } @@ -256,7 +259,8 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l if (ownershipUdated) agent.commit(); } -void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { +void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::list<std::shared_ptr<GenericObject> >& fetchedObjects, + Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { // 3 Now decide the fate of each fetched and owned object. bool ownershipUdated=false; using serializers::ArchiveJobStatus; @@ -367,7 +371,8 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: fetchedObjects.clear(); } -void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, log::LogContext & lc) { +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, + log::LogContext & lc) { // We can now start updating the objects efficiently. We still need to re-fetch them locked // and validate ownership. 
// @@ -382,87 +387,33 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a currentJobBatch.emplace_back(std::move(tapepool.second.front())); tapepool.second.pop_front(); } - double queueLockFetchTime=0; - double queueProcessAndCommitTime=0; - double requestsUpdatePreparationTime=0; - double requestsUpdatingTime=0; - double queueRecommitTime=0; - uint64_t filesQueued=0; - uint64_t filesDequeued=0; - uint64_t bytesQueued=0; - uint64_t bytesDequeued=0; - uint64_t filesBefore=0; - uint64_t bytesBefore=0; utils::Timer t; - // Get the archive queue and add references to the jobs in it. - ArchiveQueue aq(objectStore); - ScopedExclusiveLock aql; - Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, tapepool.first, QueueType::LiveJobs, lc); - queueLockFetchTime = t.secs(utils::Timer::resetCounter); - auto jobsSummary=aq.getJobsSummary(); - filesBefore=jobsSummary.jobs; - bytesBefore=jobsSummary.bytes; - // We have the queue. We will loop on the requests, add them to the queue. We will launch their updates - // after committing the queue. - std::list<ArchiveQueue::JobToAdd> jtal; + typedef ContainerAlgorithms<ArchiveQueue> AqAlgos; + AqAlgos aqcl(objectStore, agentReference); + decltype(aqcl)::InsertedElement::list jobsToAdd; for (auto & ar: currentJobBatch) { // Determine the copy number and feed the queue with it. for (auto &j: ar->dumpJobs()) { if (j.tapePool == tapepool.first) { - jtal.push_back({j, ar->getAddressIfSet(), ar->getArchiveFile().archiveFileID, - ar->getArchiveFile().fileSize, ar->getMountPolicy(), ar->getEntryLog().time}); + jobsToAdd.push_back({ar, j.copyNb, ar->getArchiveFile(), ar->getMountPolicy()}); } } } - auto addedJobs = aq.addJobsIfNecessaryAndCommit(jtal, agentReference, lc); - queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - // If we have an unexpected failure, we will re-run the individual garbage collection. 
Before that, - // we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so - // the goto will not cross its initialization. std::set<std::string> jobsIndividuallyGCed; - if (!addedJobs.files) { - goto agentCleanupForArchive; - } - // We will keep individual references for each job update we launch so that we make - // no assumption (several jobs could be queued to the same pool, even if not expected - // at the high level). - struct ARUpdatersParams { - std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater; - std::shared_ptr<ArchiveRequest> archiveRequest; - uint16_t copyNb; - }; - { - std::list<ARUpdatersParams> arUpdatersParams; - for (auto & ar: currentJobBatch) { - for (auto & j: ar->dumpJobs()) { - if (j.tapePool == tapepool.first) { - arUpdatersParams.emplace_back(); - arUpdatersParams.back().archiveRequest = ar; - arUpdatersParams.back().copyNb = j.copyNb; - arUpdatersParams.back().updater.reset( - ar->asyncUpdateJobOwner(j.copyNb, aq.getAddressIfSet(), agent.getAddressIfSet())); - } - } - } - requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); - // Now collect the results. 
- std::list<std::string> requestsToDequeue; - for (auto & arup: arUpdatersParams) { + std::set<std::string> jobsNotRequeued; + std::string queueAddress; + try { + aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool.first, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc); + } catch (AqAlgos::OwnershipSwitchFailure & failure) { + for (auto &failedAR: failure.failedElements) { try { - arup.updater->wait(); - // OK, the job made it to the queue - log::ScopedParamContainer params(lc); - params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) - .add("copyNb", arup.copyNb) - .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) - .add("tapepool", tapepool.first) - .add("archiveQueueObject", aq.getAddressIfSet()) - .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job."); + std::rethrow_exception(failedAR.failure); } catch (cta::exception::Exception & e) { // Update did not go through. It could be benign std::string debugType=typeid(e).name(); - if (typeid(e) == typeid(Backend::NoSuchObject)) { + auto & arup=*failedAR.element; + jobsNotRequeued.insert(arup.archiveRequest->getAddressIfSet()); + if (typeid(e) == typeid(Backend::NoSuchObject) || typeid(e) == typeid(Backend::WrongPreviousOwner)) { // The object was not present or not owned during update, so we skip it. // This is nevertheless unexpected (from previous fetch, so this is an error). log::ScopedParamContainer params(lc); @@ -470,7 +421,9 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a .add("copyNb", arup.copyNb) .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) .add("exceptionType", debugType); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): failed to requeue gone/not owned archive job. 
Removing from queue."); + lc.log(log::ERR, + "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): " + "failed to requeue gone/not owned archive job. Removed from queue."); } else { // We have an unexpected error. We will handle this with the request-by-request garbage collection. log::ScopedParamContainer params(lc); @@ -479,63 +432,58 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) .add("exceptionType", debugType) .add("exceptionMessage", e.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): failed to requeue archive job with unexpected error. Removing from queue and will re-run individual garbage collection."); + lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): " + "failed to requeue archive job with unexpected error. " + "Removing from queue and will re-run individual garbage collection."); // We will re-run the individual GC for this one. jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet()); otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), objectStore)); } - // In all cases, the object did NOT make it to the queue. 
- filesDequeued ++; - bytesDequeued += arup.archiveRequest->getArchiveFile().fileSize; - requestsToDequeue.push_back(arup.archiveRequest->getAddressIfSet()); } } - requestsUpdatingTime = t.secs(utils::Timer::resetCounter); - if (requestsToDequeue.size()) { - aq.removeJobsAndCommit(requestsToDequeue); - log::ScopedParamContainer params(lc); - params.add("archiveQueueObject", aq.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): Cleaned up and re-committed archive queue after error handling."); - queueRecommitTime = t.secs(utils::Timer::resetCounter); - } } - { - log::ScopedParamContainer params(lc); - auto jobsSummary = aq.getJobsSummary(); - params.add("tapepool", tapepool.first) - .add("archiveQueueObject", aq.getAddressIfSet()) - .add("filesAdded", filesQueued - filesDequeued) - .add("bytesAdded", bytesQueued - bytesDequeued) - .add("filesAddedInitially", filesQueued) - .add("bytesAddedInitially", bytesQueued) - .add("filesDequeuedAfterErrors", filesDequeued) - .add("bytesDequeuedAfterErrors", bytesDequeued) - .add("filesBefore", filesBefore) - .add("bytesBefore", bytesBefore) - .add("filesAfter", jobsSummary.jobs) - .add("bytesAfter", jobsSummary.bytes) - .add("queueLockFetchTime", queueLockFetchTime) - .add("queueProcessAndCommitTime", queueProcessAndCommitTime) - .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) - .add("requestsUpdatingTime", requestsUpdatingTime) - .add("queueRecommitTime", queueRecommitTime); - lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): Requeued a batch of archive requests."); + // We can now log individually requeued jobs. 
+ for (auto & arup: jobsToAdd) { + if (!jobsNotRequeued.count(arup.archiveRequest->getAddressIfSet())) { + // OK, the job made it to the queue + log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) + .add("copyNb", arup.copyNb) + .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) + .add("tapepool", tapepool.first) + .add("archiveQueueObject", queueAddress) + .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); + lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job."); + } } + jobsToAdd.clear(); // We can now forget pool level list. But before that, we can remove the objects // from agent ownership if this was the last reference to it. // The usage of use_count() is safe here because we are in a single threaded environment. // In a multi threaded environment, its usage would not be appropriate. // See for example http://en.cppreference.com/w/cpp/memory/shared_ptr/use_count - agentCleanupForArchive: bool ownershipUpdated=false; + auto agentOwnership=agent.getOwnershipSet(); for (auto &ar: currentJobBatch) { if (ar.use_count() == 1 && !jobsIndividuallyGCed.count(ar->getAddressIfSet())) { - // This tapepool is the last users of this archive request. We will remove is from ownership. - agent.removeFromOwnership(ar->getAddressIfSet()); + // This tapepool is the last users of this archive request. We will remove it from ownership. 
+ agentOwnership.erase(ar->getAddressIfSet()); ownershipUpdated=true; + log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", ar->getAddressIfSet()); + lc.log(log::DEBUG, "Removed AR from agent ownership."); + } else { + log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", ar->getAddressIfSet()) + .add("use_count", ar.use_count()) + .add("IndividuallyGCed", jobsIndividuallyGCed.count(ar->getAddressIfSet())); + lc.log(log::DEBUG, "Did not remove AR from agent ownership."); } } - if (ownershipUpdated) agent.commit(); + if (ownershipUpdated) { + agent.resetOwnership(agentOwnership); + agent.commit(); + } currentJobBatch.clear(); // Sleep a bit if we have oher rounds to go not to hog the queue if (tapepool.second.size()) sleep (5); @@ -543,7 +491,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a } } -void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, log::LogContext & lc) { +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& agent, AgentReference& agentReference, + Backend & objectStore, log::LogContext & lc) { // 2) Get the retrieve requests done. They are simpler as retrieve requests are fully owned. // Then should hence not have changes since we pre-fetched them. for (auto & tape: retrieveQueuesAndRequests) { @@ -642,7 +591,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& .add("copyNb", rrup.copyNb) .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) .add("exceptionType", debugType); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): failed to requeue gone/not owned retrieve job. Removing from queue."); + lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): " + "failed to requeue gone/not owned retrieve job. 
Removing from queue."); } else { // We have an unexpected error. Log it, and remove form queue. Not much we can // do at this point. @@ -652,7 +602,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) .add("exceptionType", debugType) .add("exceptionMessage", e.getMessageValue()); - lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection."); + lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): " + "failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection."); // We will re-run the individual GC for this one. jobsIndividuallyGCed.insert(rrup.retrieveRequest->getAddressIfSet()); otherObjects.emplace_back(new GenericObject(rrup.retrieveRequest->getAddressIfSet(), objectStore)); @@ -668,7 +619,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& rq.removeJobsAndCommit(requestsToDequeue); log::ScopedParamContainer params(lc); params.add("retreveQueueObject", rq.getAddressIfSet()); - lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): Cleaned up and re-committed retrieve queue after error handling."); + lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): " + "Cleaned up and re-committed retrieve queue after error handling."); queueRecommitTime = t.secs(utils::Timer::resetCounter); } } @@ -692,7 +644,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) .add("requestsUpdatingTime", requestsUpdatingTime) .add("queueRecommitTime", queueRecommitTime); - lc.log(log::INFO, "In 
GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): Requeued a batch of retrieve requests."); + lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): " + "Requeued a batch of retrieve requests."); } // We can now forget pool level list. But before that, we can remove the objects // from agent ownership if this was the last reference to it. @@ -716,7 +669,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& } } -void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(Agent& agent, AgentReference& agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { +void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(Agent& agent, AgentReference& agentReference, + Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc) { // 3) are done with the objects requiring mutualized queueing, and hence special treatement. // The rest will be garbage collected on a object-by-object basis. 
for (auto & go : otherObjects) { @@ -732,7 +686,8 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(Agent& go->garbageCollectDispatcher(goLock, agent.getAddressIfSet(), agentReference, lc, catalogue); lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(): garbage collected owned object."); } else { - lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(): skipping garbage collection of now gone object."); + lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(): " + "skipping garbage collection of now gone object."); } // In all cases, relinquish ownership for this object agent.removeFromOwnership(go->getAddressIfSet()); diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index 603d1d47fb224404511ecad51570523f486b1cc9..b01b9c3f36fef61f8161f467487ae58bddca0acc 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -58,15 +58,18 @@ public: std::map<std::string, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; std::list<std::shared_ptr<GenericObject>> otherObjects; /// Fill up the fetchedObjects with objects of interest. 
- void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, log::LogContext & lc); + void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, + log::LogContext & lc); /// Fill up the sorter with the fetched objects - void sortFetchedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc); + void sortFetchedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, + cta::catalogue::Catalogue & catalogue, log::LogContext & lc); /// Lock, fetch and update archive jobs void lockFetchAndUpdateArchiveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc); /// Lock, fetch and update retrieve jobs void lockFetchAndUpdateRetrieveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc); // Lock, fetch and update other objects - void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc); + void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore, + cta::catalogue::Catalogue & catalogue, log::LogContext & lc); }; private: Backend & m_objectStore;