diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index c2002bf6356560ec5dbf1d8a0b83332a0338b27e..2adcbcd3e69504fd6f63ab2385e8808a7f732232 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -104,10 +104,10 @@ public: template <class Element> static ElementAddress getElementAddress(const Element & e); - static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId, - log::LogContext & lc); - static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, - log::LogContext & lc); + static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, + const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc); + static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, + const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc); static void addReferencesAndCommit(Container & cont, typename InsertedElement::list & elemMemCont, AgentReference & agentRef, log::LogContext & lc); static void addReferencesIfNecessaryAndCommit(Container & cont, typename InsertedElement::list & elemMemCont, @@ -136,13 +136,13 @@ public: * are provided existing and owned by algorithm's agent. 
*/ void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, - const typename ContainerTraits<C>::ContainerIdentifyer & prevContId, + QueueType queueType, const typename ContainerTraits<C>::ContainerIdentifyer & prevContId, typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { C cont(m_backend); ScopedExclusiveLock contLock; log::TimingList timingList; utils::Timer t; - ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); + ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc); ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc); auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(), prevContId, timingList, t, lc); @@ -182,7 +182,7 @@ public: * This function is typically used by the garbage collector. We do noe take care of dereferencing * the object from the caller. */ - void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId, + void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType, typename ContainerTraits<C>::ContainerAddress & previousOwnerAddress, typename ContainerTraits<C>::ContainerAddress & contAddress, typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { @@ -190,7 +190,7 @@ public: ScopedExclusiveLock contLock; log::TimingList timingList; utils::Timer t; - ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); + ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc); contAddress = cont.getAddressIfSet(); auto contSummaryBefore = ContainerTraits<C>::getContainerSummary(cont); timingList.insertAndReset("queueLockFetchTime", t); @@ -234,14 +234,14 @@ public: * Addition of jobs to container. 
Convenience overload for cases when current agent is the previous owner * (most cases except garbage collection). */ - void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, + void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType, typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) { - referenceAndSwitchOwnership(contId, m_agentReference.getAgentAddress(), elements, lc); + referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc); } typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId, - typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) { + QueueType queueType, typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) { // Prepare the return value typename ContainerTraits<C>::PoppedElementsBatch ret; typename ContainerTraits<C>::PopCriteria unfulfilledCriteria = popCriteria; @@ -258,7 +258,7 @@ public: iterationCount++; ScopedExclusiveLock contLock; try { - ContainerTraits<C>::getLockedAndFetchedNoCreate(cont, contLock, contId, lc); + ContainerTraits<C>::getLockedAndFetchedNoCreate(cont, contLock, contId, queueType, lc); } catch (typename ContainerTraits<C>::NoSuchContainer &) { localTimingList.insertAndReset("findLockFetchQueueTime", t); timingList+=localTimingList; diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 7a50093d0a7f59753dad2849da494803671f0e79..f2daf790d1ec08f2dd22f1247b91d9ce293c5721 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -94,12 +94,12 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) { ar.insert(); } ContainerAlgorithms<ArchiveQueue> archiveAlgos(be, agentRef); - archiveAlgos.referenceAndSwitchOwnership("Tapepool", requests, lc); + archiveAlgos.referenceAndSwitchOwnership("Tapepool", 
QueueType::JobsToTransfer, requests, lc); // Now get the requests back ContainerTraits<ArchiveQueue>::PopCriteria popCriteria; popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max(); popCriteria.files = 100; - auto popedJobs = archiveAlgos.popNextBatch("Tapepool", popCriteria, lc); + auto popedJobs = archiveAlgos.popNextBatch("Tapepool", QueueType::JobsToTransfer, popCriteria, lc); ASSERT_EQ(popedJobs.summary.files, 10); } @@ -161,7 +161,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { rqc.mountPolicy.retrieveMinRequestAge = 1; rqc.mountPolicy.retrievePriority = 1; requests.emplace_back(ContainerAlgorithms<RetrieveQueue>::InsertedElement{cta::make_unique<RetrieveRequest>(rrAddr, be), 1, i, 667, mp, - serializers::RetrieveJobStatus::RJS_Pending}); + serializers::RetrieveJobStatus::RJS_ToTransfer}); auto & rr=*requests.back().retrieveRequest; rr.initialize(); rr.setRetrieveFileQueueCriteria(rqc); @@ -176,7 +176,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { } ContainerAlgorithms<RetrieveQueue> retrieveAlgos(be, agentRef); try { - retrieveAlgos.referenceAndSwitchOwnership("VID", requests, lc); + retrieveAlgos.referenceAndSwitchOwnership("VID", QueueType::JobsToTransfer, agentRef.getAgentAddress(), requests, lc); } catch (ContainerTraits<RetrieveQueue>::OwnershipSwitchFailure & ex) { for (auto & e: ex.failedElements) { try { diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 1d6962c1158fd123dc705c1c3bdcc9b35cb99a35..55d303d652566585845b780befaa8f171ec83071 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen RootEntry re(m_objectStore); ScopedSharedLock rel (re); re.fetch(); - auto tpd=re.dumpArchiveQueues(QueueType::LiveJobs); + auto tpd=re.dumpArchiveQueues(QueueType::JobsToTransfer); for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) { if (tp->address == getAddressIfSet()) { 
setOwner(re.getAddressIfSet()); diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp index 6d6844fd8e13831ee4053f4c70a7a9aa2758cf6d..066fdb4d12034cc5212e1510ef1e3089b481d380 100644 --- a/objectstore/ArchiveQueueAlgorithms.cpp +++ b/objectstore/ArchiveQueueAlgorithms.cpp @@ -27,8 +27,8 @@ const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQ const std::string ContainerTraits<ArchiveQueue>::c_identifyerType = "tapepool"; void ContainerTraits<ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef, - const ContainerIdentifyer& contId, log::LogContext& lc) { - Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc); + const ContainerIdentifyer& contId, QueueType queueType, log::LogContext& lc) { + Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc); } void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, InsertedElement::list& elemMemCont, @@ -125,14 +125,14 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::lis } void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, - const ContainerIdentifyer& cId, log::LogContext& lc) { + const ContainerIdentifyer& cId, QueueType queueType, log::LogContext& lc) { // Try and get access to a queue. 
size_t attemptCount = 0; retry: objectstore::RootEntry re(cont.m_objectStore); re.fetchNoLock(); std::string aqAddress; - auto aql = re.dumpArchiveQueues(QueueType::LiveJobs); + auto aql = re.dumpArchiveQueues(queueType); for (auto & aqp : aql) { if (aqp.tapePool == cId) aqAddress = aqp.address; @@ -153,7 +153,7 @@ void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc); + re.removeArchiveQueueAndCommit(cId, queueType, lc); log::ScopedParamContainer params(lc); params.add("tapepool", cId) .add("queueObject", cont.getAddressIfSet()); @@ -261,7 +261,7 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope RootEntry re(cont.m_objectStore); ScopedExclusiveLock rexl(re); re.fetch(); - re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc); + re.removeArchiveQueueAndCommit(cId, QueueType::JobsToTransfer, lc); log::ScopedParamContainer params(lc); params.add("tapepool", cId) .add("queueObject", cont.getAddressIfSet()); diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index a06a019a85b934b75cbb21ba5178fa99e5a24a5a..8ba0a6783d8fb7bc5dfe5e47e80416bc729fb4d1 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -59,11 +59,11 @@ public: template <class Element> static ElementAddress getElementAddress(const Element & e) { return e.archiveRequest->getAddressIfSet(); } - static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & contId, - log::LogContext & lc); + static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, + const ContainerIdentifyer & contId, QueueType queueType, log::LogContext & lc); - static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const 
ContainerIdentifyer & cId, - log::LogContext & lc); + static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, + const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc); static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont, AgentReference & agentRef, log::LogContext & lc); diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index d5b3423848c9f47c977ea1acb1294c131bcce6e8..9b6a6a93f6c41b4ceb32ccfb94dfe25410916f28 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -51,14 +51,14 @@ void cta::objectstore::ArchiveRequest::initialize() { } void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber, - const std::string& tapepool, const std::string& archivequeueaddress, + const std::string& tapepool, const std::string& initialOwner, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) { checkPayloadWritable(); auto *j = m_payload.add_jobs(); j->set_copynb(copyNumber); - j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue); + j->set_status(serializers::ArchiveJobStatus::AJS_ToTransfer); j->set_tapepool(tapepool); - j->set_owner(archivequeueaddress); + j->set_owner(initialOwner); j->set_archivequeueaddress(""); j->set_totalretries(0); j->set_retrieswithinmount(0); @@ -67,23 +67,32 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber, j->set_maxtotalretries(maxTotalRetries); } -bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) { - checkPayloadWritable(); - auto * jl = m_payload.mutable_jobs(); - for (auto j=jl->begin(); j!=jl->end(); j++) { - if (j->copynb() == copyNumber) { - j->set_status(serializers::ArchiveJobStatus::AJS_Complete); - for (auto j2=jl->begin(); j2!=jl->end(); j2++) { - if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete && - j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) - return false; +QueueType 
ArchiveRequest::getJobQueueType(uint16_t copyNumber) { + checkPayloadReadable(); + for (auto &j: m_payload.jobs()) { + if (j.copynb() == copyNumber) { + switch (j.status()) { + case serializers::ArchiveJobStatus::AJS_ToTransfer: + return QueueType::JobsToTransfer; + case serializers::ArchiveJobStatus::AJS_Complete: + throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion."); + case serializers::ArchiveJobStatus::AJS_ToReport: + // We should report a success... + return QueueType::JobsToReport; + case serializers::ArchiveJobStatus::AJS_FailedToReport: + // We should report a failure. The report queue can be shared. + return QueueType::JobsToReport; + case serializers::ArchiveJobStatus::AJS_Failed: + return QueueType::FailedJobs; + case serializers::ArchiveJobStatus::AJS_Abandoned: + throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Abandoned jobs are not queueable. They are finished and pend siblings completion."); } - return true; } } - throw NoSuchJob("In ArchiveRequest::setJobSuccessful(): job not found"); + throw exception::Exception("In ArchiveRequest::getJobQueueType(): Copy number not found."); } + bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc) { checkPayloadWritable(); @@ -106,8 +115,7 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber, if (!finishIfNecessary(lc)) commit(); return true; } else { - j.set_status(serializers::AJS_PendingMount); - commit(); + j.set_status(serializers::AJS_ToTransfer); return false; } } @@ -129,22 +137,6 @@ ArchiveRequest::RetryStatus ArchiveRequest::getRetryStatus(const uint16_t copyNu throw cta::exception::Exception("In ArchiveRequest::getRetryStatus(): job not found()"); } -void cta::objectstore::ArchiveRequest::setAllJobsLinkingToArchiveQueue() { - checkPayloadWritable(); - auto * 
jl=m_payload.mutable_jobs(); - for (auto j=jl->begin(); j!=jl->end(); j++) { - j->set_status(serializers::AJS_LinkingToArchiveQueue); - } -} - -void cta::objectstore::ArchiveRequest::setAllJobsFailed() { - checkPayloadWritable(); - auto * jl=m_payload.mutable_jobs(); - for (auto j=jl->begin(); j!=jl->end(); j++) { - j->set_status(serializers::AJS_Failed); - } -} - void ArchiveRequest::setArchiveFile(const cta::common::dataStructures::ArchiveFile& archiveFile) { checkPayloadWritable(); // TODO: factor out the archivefile structure from the flat ArchiveRequest. @@ -311,24 +303,18 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> { void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); - // The behavior here depends on which job the agent is supposed to own. - // We should first find this job (if any). This is for covering the case - // of a selected job. The Request could also still being connected to tape - // pools. In this case we will finish the connection to tape pools unconditionally. + // We need to find which job(s) we should actually work on. The job(s) will be + // requeued to the relevant queues depending on their statuses. 
auto * jl = m_payload.mutable_jobs(); bool anythingGarbageCollected=false; + using serializers::ArchiveJobStatus; + std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransfer, + ArchiveJobStatus::AJS_ToReport, ArchiveJobStatus::AJS_Failed}); for (auto j=jl->begin(); j!=jl->end(); j++) { auto owner=j->owner(); auto status=j->status(); - if (status==serializers::AJS_LinkingToArchiveQueue || - ( (status==serializers::AJS_Selected || status==serializers::AJS_PendingMount) - && owner==presumedOwner)) { - // If the job was being connected to the tape pool or was selected - // by the dead agent, then we have to ensure it is indeed connected to - // the tape pool and set its status to pending. - // (Re)connect the job to the tape pool and make it pending. - // If we fail to reconnect, we have to fail the job and potentially - // finish the request. + if ( statusesImplyingQueueing.count(status) && owner==presumedOwner) { + // The job is in a state which implies queuing. std::string queueObject="Not defined yet"; anythingGarbageCollected=true; try { @@ -337,7 +323,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer // recreated (this will be done by helper). 
ArchiveQueue aq(m_objectStore); ScopedExclusiveLock aql; - Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), QueueType::LiveJobs, lc); + Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc); queueObject=aq.getAddressIfSet(); ArchiveRequest::JobDump jd; jd.copyNb = j->copynb(); @@ -350,7 +336,6 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer aq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); auto queueUpdateTime = t.secs(utils::Timer::resetCounter); j->set_owner(aq.getAddressIfSet()); - j->set_status(serializers::AJS_PendingMount); commit(); aql.release(); auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter); @@ -405,21 +390,11 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer } catch (...) { params.add("exceptionType", "unknown"); } - // This could be the end of the request, with various consequences. - // This is handled here: - if (finishIfNecessary(lc)) { - std::string message="In ArchiveRequest::garbageCollect(): failed to requeue the job. Failed it and removed the request as a consequence."; - if (backtrace.size()) message += " Backtrace follows."; - lc.log(log::ERR, message); - if (backtrace.size()) lc.logBacktrace(log::ERR, backtrace); - return; - } else { commit(); - lc.log(log::ERR, "In ArchiveRequest::garbageCollect(): failed to requeue the job and failed it."); + lc.log(log::ERR, "In ArchiveRequest::garbageCollect(): failed to requeue the job and failed it. 
Internal error: the job is now orphaned."); } } } - } if (!anythingGarbageCollected) { log::ScopedParamContainer params(lc); params.add("jobObject", getAddressIfSet()) @@ -563,13 +538,18 @@ ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSucces if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete && j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) { retRef.m_isLastJob = false; + // The complete but not last job have now finished its + // lifecycle, and will get dereferenced. + j->set_owner(""); oh.set_payload(payload.SerializePartialAsString()); return oh.SerializeAsString(); } } retRef.m_isLastJob = true; + // If this is the last job, we indeed need to set the status to ToReport. + j->set_status(serializers::ArchiveJobStatus::AJS_ToReport); oh.set_payload(payload.SerializePartialAsString()); - throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString()); + return oh.SerializeAsString(); } } std::stringstream err; @@ -593,16 +573,32 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) { return j->owner(); } +QueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& status) { + using serializers::ArchiveJobStatus; + switch(status) { + case ArchiveJobStatus::AJS_ToTransfer: + return QueueType::JobsToTransfer; + case ArchiveJobStatus::AJS_ToReport: + return QueueType::JobsToReport; + case ArchiveJobStatus::AJS_Failed: + return QueueType::FailedJobs; + default: + throw cta::exception::Exception("In ArchiveRequest::getQueueType(): invalid status for queueing."); + } +} + std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& status) { switch(status) { + case serializers::ArchiveJobStatus::AJS_ToTransfer: + return "ToTransfer"; + case serializers::ArchiveJobStatus::AJS_ToReport: + return "ToReport"; case serializers::ArchiveJobStatus::AJS_Complete: return "Complete"; case serializers::ArchiveJobStatus::AJS_Failed: return "Failed"; - case 
serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue: - return "LinkingToArchiveQueue"; - case serializers::ArchiveJobStatus::AJS_PendingMount: - return "PendingMount"; + case serializers::ArchiveJobStatus::AJS_Abandoned: + return "Abandoned"; default: return std::string("Unknown (")+std::to_string((uint64_t) status)+")"; } diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 026618611166a5a25c5064bc5507f6cc36fd2570..4576e29b0abd8a3ee59c806af46fe733639c878d 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -24,6 +24,7 @@ #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/UserIdentity.hpp" #include "common/dataStructures/ArchiveFile.hpp" +#include "QueueType.hpp" #include "common/Timer.hpp" #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" @@ -42,9 +43,12 @@ public: ArchiveRequest(Backend & os); ArchiveRequest(GenericObject & go); void initialize(); + // Ownership of archive requests is managed per job. Object level owner has no meaning. + std::string getOwner() = delete; + void setOwner(const std::string &) = delete; // Job management ============================================================ void addJob(uint16_t copyNumber, const std::string & tapepool, - const std::string & archivequeueaddress, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); + const std::string & initialOwner, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); void setJobSelected(uint16_t copyNumber, const std::string & owner); void setJobPending(uint16_t copyNumber); bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job @@ -61,10 +65,8 @@ public: std::string statusToString(const serializers::ArchiveJobStatus & status); bool finishIfNecessary(log::LogContext & lc);/**< Handling of the consequences of a job status change for the entire request. * This function returns true if the request got finished. 
*/ - // Mark all jobs as pending mount (following their linking to a tape pool) - void setAllJobsLinkingToArchiveQueue(); - // Mark all the jobs as being deleted, in case of a cancellation - void setAllJobsFailed(); + CTA_GENERATE_EXCEPTION_CLASS(JobNotQueueable); + QueueType getJobQueueType(uint16_t copyNumber); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); // Set a job ownership void setJobOwner(uint16_t copyNumber, const std::string & owner); @@ -110,6 +112,10 @@ public: // Get a job owner std::string getJobOwner(uint16_t copyNumber); + + // Utility to convert status to queue type + static QueueType getQueueType(const serializers::ArchiveJobStatus &status); + // Request management ======================================================== void setSuccessful(); void setFailed(); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index b2ec2cc34cc1d5b39a281b62ca7ac0bf1109be16..ca01814a65a5f402e636b96648ed30c51c610f64 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -63,6 +63,7 @@ add_library (ctaobjectstore SHARED ArchiveQueueAlgorithms.cpp RetrieveQueue.cpp RetrieveQueueShard.cpp + QueueType.cpp ArchiveRequest.cpp RetrieveRequest.cpp DriveRegister.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 235b0df3ee589cbd1ccde48002942f2722e26ea0..6a9472498a7037e99b0a92595e4b1f30fe700220 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -264,9 +264,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: // 3 Now decide the fate of each fetched and owned object. 
bool ownershipUdated=false; using serializers::ArchiveJobStatus; - std::set<ArchiveJobStatus> inactiveArchiveJobStatuses({ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed}); using serializers::RetrieveJobStatus; - std::set<RetrieveJobStatus> inactiveRetrieveJobStatuses({RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed}); for (auto & obj: fetchedObjects) { log::ScopedParamContainer params2(lc); params2.add("objectAddress", obj->getAddressIfSet()); @@ -297,15 +295,16 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: obj.reset(); bool jobRequeued=false; for (auto &j: ar->dumpJobs()) { - if ((j.owner == agent.getAddressIfSet() || ar->getOwner() == agent.getAddressIfSet()) - && !inactiveArchiveJobStatuses.count(j.status)) { - archiveQueuesAndRequests[j.tapePool].emplace_back(ar); - log::ScopedParamContainer params3(lc); - params3.add("tapepool", j.tapePool) - .add("copynb", j.copyNb) - .add("fileId", ar->getArchiveFile().archiveFileID); - lc.log(log::INFO, "Selected archive request for requeueing to tape pool"); - jobRequeued=true; + if ((j.owner == agent.getAddressIfSet())) { + try { + archiveQueuesAndRequests[std::make_tuple(j.tapePool, ar->getJobQueueType(j.copyNb))].emplace_back(ar); + log::ScopedParamContainer params3(lc); + params3.add("tapepool", j.tapePool) + .add("copynb", j.copyNb) + .add("fileId", ar->getArchiveFile().archiveFileID); + lc.log(log::INFO, "Selected archive request for requeueing to tape pool"); + jobRequeued=true; + } catch (ArchiveRequest::JobNotQueueable &) {} } } if (!jobRequeued) { @@ -323,17 +322,25 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: // Get the list of vids for non failed tape files. 
std::set<std::string> candidateVids; for (auto & j: rr->dumpJobs()) { - if (!inactiveRetrieveJobStatuses.count(j.status)) { + if(j.status==RetrieveJobStatus::RJS_ToTransfer) { candidateVids.insert(rr->getArchiveFile().tapeFiles.at(j.copyNb).vid); } } + // Small parenthesis for non transfer cases. if (candidateVids.empty()) { - log::ScopedParamContainer params3(lc); - params3.add("fileId", rr->getArchiveFile().archiveFileID); - lc.log(log::INFO, "No active retrieve job to requeue found. Marking request for normal GC (and probably deletion)."); - otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); - break; + // The request might need to be added to the failed to report of failed queue/container. + try { + retrieveQueuesAndRequests[std::make_tuple(rr->getArchiveFile().tapeFiles.begin()->second.vid, rr->getQueueType())].emplace_back(rr); + } catch (cta::exception::Exception & ex) { + log::ScopedParamContainer params3(lc); + params3.add("fileId", rr->getArchiveFile().archiveFileID) + .add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, "Failed to determine destination queue for retrieve request. Marking request for normal GC (and probably deletion)."); + otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); + break; + } } + // Back to the transfer case. 
std::string vid; try { vid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, objectStore); @@ -344,7 +351,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); break; } - retrieveQueuesAndRequests[vid].emplace_back(rr); + retrieveQueuesAndRequests[std::make_tuple(vid, QueueType::JobsToTransfer)].emplace_back(rr); log::ScopedParamContainer params3(lc); // Find copyNb for logging size_t copyNb = std::numeric_limits<size_t>::max(); @@ -377,15 +384,19 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a // and validate ownership. // // 1) Get the archive requests done. - for (auto & tapepool: archiveQueuesAndRequests) { + for (auto & archiveQueueIdAndReqs: archiveQueuesAndRequests) { // The number of objects to requeue could be very high. In order to limit the time taken by the // individual requeue operations, we limit the number of concurrently requeued objects to an // arbitrary 500. - while (tapepool.second.size()) { - decltype (tapepool.second) currentJobBatch; - while (tapepool.second.size() && currentJobBatch.size() <= 500) { - currentJobBatch.emplace_back(std::move(tapepool.second.front())); - tapepool.second.pop_front(); + std::string tapepool; + QueueType queueType; + std::tie(tapepool, queueType) = archiveQueueIdAndReqs.first; + auto & requestsList = archiveQueueIdAndReqs.second; + while (requestsList.size()) { + decltype (archiveQueueIdAndReqs.second) currentJobBatch; + while (requestsList.size() && currentJobBatch.size() <= 500) { + currentJobBatch.emplace_back(std::move(requestsList.front())); + requestsList.pop_front(); } utils::Timer t; typedef ContainerAlgorithms<ArchiveQueue> AqAlgos; @@ -394,7 +405,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a for (auto & ar: currentJobBatch) { // Determine the copy number and feed the queue with it. 
for (auto &j: ar->dumpJobs()) { - if (j.tapePool == tapepool.first) { + if (j.tapePool == tapepool) { jobsToAdd.push_back({ar, j.copyNb, ar->getArchiveFile(), ar->getMountPolicy()}); } } @@ -403,7 +414,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a std::set<std::string> jobsNotRequeued; std::string queueAddress; try { - aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool.first, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc); + aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, queueType, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc); } catch (AqAlgos::OwnershipSwitchFailure & failure) { for (auto &failedAR: failure.failedElements) { try { @@ -450,7 +461,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet()) .add("copyNb", arup.copyNb) .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID) - .add("tapepool", tapepool.first) + .add("tapepool", tapepool) .add("archiveQueueObject", queueAddress) .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job."); @@ -486,7 +497,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a } currentJobBatch.clear(); // Sleep a bit if we have oher rounds to go not to hog the queue - if (tapepool.second.size()) sleep (5); + if (archiveQueueIdAndReqs.second.size()) sleep (5); } } } @@ -495,12 +506,16 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& Backend & objectStore, log::LogContext & lc) { // 2) Get the retrieve requests done. They are simpler as retrieve requests are fully owned. // Then should hence not have changes since we pre-fetched them. 
- for (auto & tape: retrieveQueuesAndRequests) { - while (tape.second.size()) { - decltype (tape.second) currentJobBatch; - while (tape.second.size() && currentJobBatch.size() <= 500) { - currentJobBatch.emplace_back(std::move(tape.second.front())); - tape.second.pop_front(); + for (auto & retriveQueueIdAndReqs: retrieveQueuesAndRequests) { + std::string vid; + QueueType queueType; + std::tie(vid, queueType) = retriveQueueIdAndReqs.first; + auto & requestsList = retriveQueueIdAndReqs.second; + while (requestsList.size()) { + decltype (retriveQueueIdAndReqs.second) currentJobBatch; + while (requestsList.size() && currentJobBatch.size() <= 500) { + currentJobBatch.emplace_back(std::move(requestsList.front())); + requestsList.pop_front(); } double queueLockFetchTime=0; double queueProcessAndCommitTime=0; @@ -517,7 +532,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& // Get the retrieve queue and add references to the jobs to it. RetrieveQueue rq(objectStore); ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, tape.first, QueueType::LiveJobs, lc); + Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, vid, queueType, lc); queueLockFetchTime = t.secs(utils::Timer::resetCounter); auto jobsSummary=rq.getJobsSummary(); filesBefore=jobsSummary.files; @@ -529,7 +544,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& for (auto & rr: currentJobBatch) { // Determine the copy number and feed the queue with it. 
for (auto &tf: rr->getArchiveFile().tapeFiles) { - if (tf.second.vid == tape.first) { + if (tf.second.vid == vid) { jta.push_back({tf.second.copyNb, tf.second.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time}); } @@ -555,7 +570,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& std::list<RRUpdatedParams> rrUpdatersParams; for (auto & rr: currentJobBatch) { for (auto & tf: rr->getArchiveFile().tapeFiles) { - if (tf.second.vid == tape.first) { + if (tf.second.vid == vid) { rrUpdatersParams.emplace_back(); rrUpdatersParams.back().retrieveRequest = rr; rrUpdatersParams.back().copyNb = tf.second.copyNb; @@ -575,7 +590,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& params.add("retrieveRequestObject", rrup.retrieveRequest->getAddressIfSet()) .add("copyNb", rrup.copyNb) .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID) - .add("vid", tape.first) + .add("vid", vid) .add("retreveQueueObject", rq.getAddressIfSet()) .add("garbageCollectedPreviousOwner", agent.getAddressIfSet()); lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): requeued retrieve job."); @@ -627,7 +642,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& { log::ScopedParamContainer params(lc); auto jobsSummary = rq.getJobsSummary(); - params.add("vid", tape.first) + params.add("vid", vid) .add("retrieveQueueObject", rq.getAddressIfSet()) .add("filesAdded", filesQueued - filesDequeued) .add("bytesAdded", bytesQueued - bytesDequeued) @@ -664,7 +679,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& if (ownershipUpdated) agent.commit(); currentJobBatch.clear(); // Sleep a bit if we have oher rounds to go not to hog the queue - if (tape.second.size()) sleep (5); + if (retriveQueueIdAndReqs.second.size()) sleep (5); } } } diff --git 
a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index b01b9c3f36fef61f8161f467487ae58bddca0acc..88900c473ae32fd52845453d5c13b2af765bb0b1 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -22,6 +22,7 @@ #include "Agent.hpp" #include "AgentWatchdog.hpp" #include "AgentRegister.hpp" +#include "QueueType.hpp" #include "common/log/LogContext.hpp" /** @@ -54,8 +55,8 @@ public: /** Structure allowing the sorting of owned objects, so they can be requeued in batches, * one batch per queue. */ struct OwnedObjectSorter { - std::map<std::string, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; - std::map<std::string, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; + std::map<std::tuple<std::string, QueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; + std::map<std::tuple<std::string, QueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; std::list<std::shared_ptr<GenericObject>> otherObjects; /// Fill up the fetchedObjects with objects of interest. 
void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 8079b3e18337ce9124e121474dc6598edad72116..95903a49bbe736c912af7d198b391cd6594c19ab 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -341,7 +341,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { re.fetch(); std::stringstream tapePoolName; tapePoolName << "TapePool" << i; - tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::QueueType::LiveJobs, lc); + tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::QueueType::JobsToTransfer, lc); cta::objectstore::ArchiveQueue aq(tpAddr[i], be); } // Create the various ATFR's, stopping one step further each time. @@ -369,8 +369,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { aFile.storageClass = "sc"; ar.setArchiveFile(aFile); ar.addJob(1, "TapePool0", tpAddr[0], 1, 1); - ar.addJob(2, "TapePool1", tpAddr[1], 1, 1); - ar.setOwner(agA.getAddressIfSet()); + ar.addJob(2, "TapePool1", tpAddr[1], 1, 1); cta::common::dataStructures::MountPolicy mp; ar.setMountPolicy(mp); ar.setArchiveReportURL(""); @@ -381,13 +380,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { ar.insert(); cta::objectstore::ScopedExclusiveLock atfrl(ar); if (pass < 2) { pass++; continue; } - // - Change the jobs statuses from PendingNSCreation to LinkingToArchiveQueue. - // They will be automatically connected to the tape pool by the garbage - // collector from that moment on. 
- { - ar.setAllJobsLinkingToArchiveQueue(); - ar.commit(); - } + // The step is now deprecated if (pass < 3) { pass++; continue; } // - Referenced in the first tape pool { @@ -426,11 +419,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { aq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } - // - Still marked a not owned but referenced in the agent - { - ar.setOwner(""); - ar.commit(); - } + // The step is now deprecated break; } // Create the garbage collector and run it twice. @@ -470,7 +459,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { std::list<std::string> tapePools = { "TapePool0", "TapePool1" }; for (auto & tp: tapePools) { // Empty queue - cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::QueueType::LiveJobs), be); + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::QueueType::JobsToTransfer), be); cta::objectstore::ScopedExclusiveLock aql(aq); aq.fetch(); std::list<std::string> ajtr; @@ -480,7 +469,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { aq.removeJobsAndCommit(ajtr); aql.release(); // Remove queues from root - re.removeArchiveQueueAndCommit(tp, cta::objectstore::QueueType::LiveJobs, lc); + re.removeArchiveQueueAndCommit(tp, cta::objectstore::QueueType::JobsToTransfer, lc); } ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc)); @@ -540,7 +529,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { re.fetch(); std::stringstream vid; vid << "Tape" << i; - tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::QueueType::LiveJobs, lc); + tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::QueueType::JobsToTransfer, lc); cta::objectstore::RetrieveQueue rq(tAddr[i], be); } // Create the various ATFR's, stopping one step further each time. 
@@ -653,7 +642,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { std::list<std::string> retrieveQueues = { "Tape0", "Tape1" }; for (auto & vid: retrieveQueues) { // Empty queue - cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid, cta::objectstore::QueueType::LiveJobs), be); + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid, cta::objectstore::QueueType::JobsToTransfer), be); cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); std::list<std::string> jtrl; @@ -663,7 +652,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { rq.removeJobsAndCommit(jtrl); rql.release(); // Remove queues from root - re.removeRetrieveQueueAndCommit(vid, cta::objectstore::QueueType::LiveJobs, lc); + re.removeRetrieveQueueAndCommit(vid, cta::objectstore::QueueType::JobsToTransfer, lc); } ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc)); diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 5ae488180d58f18ebba0624243335551cfa226d9..556070e9bf2149f7cf5c60782709d78b66635173 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -413,7 +413,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS continue; std::string rqAddr; try { - std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, QueueType::LiveJobs); + std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, QueueType::JobsToTransfer); } catch (cta::exception::Exception &) { ret.push_back(SchedulerDatabase::RetrieveQueueStatistics()); ret.back().vid=tf.second.vid; diff --git a/objectstore/QueueType.cpp b/objectstore/QueueType.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2b8af7d4ea50eb0ec4f58f1b61f8b0ead2af1046 --- /dev/null +++ b/objectstore/QueueType.cpp @@ -0,0 +1,37 @@ + +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as 
published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "QueueType.hpp" + +namespace cta { namespace objectstore { + +std::string toString(QueueType queueType) { + switch (queueType) { + case QueueType::FailedJobs: + return "failedJobs"; + case QueueType::JobsToReport: + return "jobsToReport"; + case QueueType::JobsToTransfer: + return "jobsToTranfer"; + default: + return "Unknown queue type."; + } +} + +}} //namespace cta::objectstore diff --git a/objectstore/QueueType.hpp b/objectstore/QueueType.hpp index 132ed6aa30f9ec53fede2af41fd45bbd8b2e8bf9..8180ccacbfcc0f17414531989e32ac9546a996d3 100644 --- a/objectstore/QueueType.hpp +++ b/objectstore/QueueType.hpp @@ -18,6 +18,9 @@ #pragma once +#include <string> + namespace cta { namespace objectstore { -enum class QueueType { LiveJobs, FailedJobs, JobsToReport }; +enum class QueueType { JobsToTransfer, FailedJobs, JobsToReport }; +std::string toString(QueueType queueType); }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 239d16ad35682b9e05483a7b6b081a24dae73f4a..baec47426e857fd7dae9db5261e7089bc3a528c3 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -51,9 +51,9 @@ public: template <class Element> static ElementAddress getElementAddress(const Element & e) { return e.retrieveRequest->getAddressIfSet(); } - static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, 
AgentReference & agRef, const ContainerIdentifyer & contId, - log::LogContext & lc) { - Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc); + static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, + const ContainerIdentifyer & contId, QueueType queueType, log::LogContext & lc) { + Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc); } static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont, diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index ca196c23620a90dccd1aa147c3e5735c4b382b4a..cde922c4a2a64b5534ebf30ad02ca16e07f79b56 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -25,6 +25,7 @@ #include "RetrieveQueue.hpp" #include "objectstore/cta.pb.h" #include "Helpers.hpp" +#include "common/utils/utils.hpp" #include <google/protobuf/util/json_util.h> #include <cmath> @@ -54,6 +55,9 @@ RetrieveRequest::RetrieveRequest(GenericObject& go): void RetrieveRequest::initialize() { // Setup underlying object ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>::initialize(); + m_payload.set_failurereported(false); + m_payload.set_failurereportlog(""); + m_payload.set_failurereporturl(""); // This object is good to go (to storage) m_payloadInterpreted = true; } @@ -73,13 +77,13 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe .add("owner", getOwner()); lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): no garbage collection needed."); } - // The owner is indeed the right one. We should requeue the request if possible. - // Find the vids for active jobs in the request (pending ones). + // The owner is indeed the right one. We should requeue the request either to + // the to tranfer queue for one vid, or to the to report (or failed) queue (for one arbitrary VID). 
+ // Find the vids for active jobs in the request (to transfer ones). using serializers::RetrieveJobStatus; - std::set<RetrieveJobStatus> validStates({RetrieveJobStatus::RJS_Pending, RetrieveJobStatus::RJS_Selected}); std::set<std::string> candidateVids; for (auto &j: m_payload.jobs()) { - if (validStates.count(j.status())) { + if (j.status() == RetrieveJobStatus::RJS_ToTransfer) { // Find the job details in tape file for (auto &tf: m_payload.archivefile().tapefiles()) { if (tf.copynb() == j.copynb()) { @@ -95,98 +99,173 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe found:; } } - // If there is no candidate, we cancel the job - // TODO: in the future, we might queue it for reporting to EOS. - if (candidateVids.empty()) { - remove(); - log::ScopedParamContainer params(lc); - params.add("jobObject", getAddressIfSet()); - lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): deleted job as no tape file is available for recall."); - return; - } - // If we have to fetch the status of the tapes and queued for the non-disabled vids. - auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore); - // Find the corresponding tape file, which will give the copynb, which will allow finding the retrieve job. 
- auto bestTapeFile=m_payload.archivefile().tapefiles().begin(); - while (bestTapeFile != m_payload.archivefile().tapefiles().end()) { - if (bestTapeFile->vid() == bestVid) - goto tapeFileFound; - bestTapeFile++; - } - { - std::stringstream err; - err << "In RetrieveRequest::garbageCollect(): could not find tapefile for vid " << bestVid; - throw exception::Exception(err.str()); - } -tapeFileFound:; - auto tapeSelectionTime = t.secs(utils::Timer::resetCounter); - auto bestJob=m_payload.mutable_jobs()->begin(); - while (bestJob!=m_payload.mutable_jobs()->end()) { - if (bestJob->copynb() == bestTapeFile->copynb()) - goto jobFound; - bestJob++; - } - { - std::stringstream err; - err << "In RetrieveRequest::garbageCollect(): could not find job for copynb " << bestTapeFile->copynb(); - throw exception::Exception(err.str()); - } -jobFound:; - // We now need to grab the queue a requeue the request. - RetrieveQueue rq(m_objectStore); - ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, QueueType::LiveJobs, lc); - // Enqueue add the job to the queue - objectstore::MountPolicySerDeser mp; - mp.deserialize(m_payload.mountpolicy()); - std::list<RetrieveQueue::JobToAdd> jta; - jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time()}); - rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); - auto jobsSummary=rq.getJobsSummary(); - auto queueUpdateTime = t.secs(utils::Timer::resetCounter); - // We can now make the transition official - bestJob->set_status(serializers::RetrieveJobStatus::RJS_Pending); - m_payload.set_activecopynb(bestJob->copynb()); - setOwner(rq.getAddressIfSet()); - commit(); - Helpers::updateRetrieveQueueStatisticsCache(bestVid, jobsSummary.files, jobsSummary.bytes, jobsSummary.priority); - rql.release(); - auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter); + 
std::string bestVid; + // If no tape file is a candidate, we just need to skip to queueing to the failed queue + if (candidateVids.empty()) goto queueForFailure; + // We have a chance to find an available tape. Let's compute best VID (this will + // filter on tape availability. + try { + // If we have to fetch the status of the tapes and queued for the non-disabled vids. + auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore); + goto queueForTransfer; + } catch (Helpers::NoTapeAvailableForRetrieve &) {} +queueForFailure:; { + // If there is no candidate, we fail the jobs that are not yet, and queue the request as failed (on any VID). + for (auto & j: *m_payload.mutable_jobs()) { + if (j.status() == RetrieveJobStatus::RJS_ToTransfer) { + j.set_status(RetrieveJobStatus::RJS_Failed); log::ScopedParamContainer params(lc); - params.add("jobObject", getAddressIfSet()) - .add("queueObject", rq.getAddressIfSet()) - .add("copynb", bestTapeFile->copynb()) - .add("vid", bestTapeFile->vid()) - .add("tapeSelectionTime", tapeSelectionTime) - .add("queueUpdateTime", queueUpdateTime) - .add("commitUnlockQueueTime", commitUnlockQueueTime); - lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): requeued the request."); + params.add("fileId", m_payload.archivefile().archivefileid()) + .add("copyNb", j.copynb()); + for (auto &tf: m_payload.archivefile().tapefiles()) { + if (tf.copynb() == j.copynb()) { + params.add("vid", tf.vid()) + .add("fSeq", tf.fseq()); + break; + } + } + // Generate the last failure for this job (tape unavailable). + *j.mutable_failurelogs()->Add() = utils::getCurrentLocalTime() + " " + + utils::getShortHostname() + " In RetrieveRequest::garbageCollect(): No VID avaiable to requeue the request. Failing it."; + lc.log(log::ERR, "In RetrieveRequest::garbageCollect(): No VID avaiable to requeue the request. Failing all jobs."); + } + } + // Ok, the request is ready to be queued. 
We will queue it to the VID corresponding + // to the latestes queued copy. + auto activeCopyNb = m_payload.activecopynb(); + std::string activeVid; + uint64_t activeFseq; + for (auto &tf: m_payload.archivefile().tapefiles()) { + if (tf.copynb() == activeCopyNb) { + activeVid = tf.vid(); + activeFseq = tf.fseq(); + goto failedVidFound; + } + } + { + std::stringstream err; + err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << activeCopyNb; + throw exception::Exception(err.str()); + } + failedVidFound:; + // We now need to grab the failed queue and queue the request. + RetrieveQueue rq(m_objectStore); + ScopedExclusiveLock rql; + // We need to know if this failure got reported yet. + QueueType queueType=isFailureReported()?QueueType::FailedJobs:QueueType::JobsToReport; + Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, queueType, lc); + // Enqueue the job + objectstore::MountPolicySerDeser mp; + std::list<RetrieveQueue::JobToAdd> jta; + jta.push_back({activeCopyNb, activeFseq, getAddressIfSet(), m_payload.archivefile().filesize(), + mp, (signed)m_payload.schedulerrequest().entrylog().time()}); + rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); + auto queueUpdateTime = t.secs(utils::Timer::resetCounter); + // We can now make the transition official. 
+ setOwner(rq.getAddressIfSet()); + commit(); + rql.release(); + auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter); + { + log::ScopedParamContainer params(lc); + params.add("jobObject", getAddressIfSet()) + .add("fileId", m_payload.archivefile().archivefileid()) + .add("queueObject", rq.getAddressIfSet()) + .add("copynb", activeCopyNb) + .add("vid", activeVid) + .add("queueUpdateTime", queueUpdateTime) + .add("commitUnlockQueueTime", commitUnlockQueueTime) + .add("queueType", toString(queueType)); + lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): queued the request to the failed queue."); + } } - timespec ts; - // We will sleep a bit to make sure other processes can also access the queue - // as we are very likely to be part of a tight loop. - // TODO: ideally, end of session requeueing and garbage collection should be - // done in parallel. - // We sleep half the time it took to queue to give a chance to other lockers. - double secSleep, fracSecSleep; - fracSecSleep = std::modf(queueUpdateTime / 2, &secSleep); - ts.tv_sec = secSleep; - ts.tv_nsec = std::round(fracSecSleep * 1000 * 1000 * 1000); - nanosleep(&ts, nullptr); - auto sleepTime = t.secs(); + + // Find the corresponding tape file, which will give the copynb, which will allow finding the retrieve job. 
+queueForTransfer:; { - log::ScopedParamContainer params(lc); - params.add("jobObject", getAddressIfSet()) - .add("queueObject", rq.getAddressIfSet()) - .add("copynb", bestTapeFile->copynb()) - .add("vid", bestTapeFile->vid()) - .add("tapeSelectionTime", tapeSelectionTime) - .add("queueUpdateTime", queueUpdateTime) - .add("commitUnlockQueueTime", commitUnlockQueueTime) - .add("sleepTime", sleepTime); - lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): slept some time to not sit on the queue after GC requeueing."); + auto bestTapeFile=m_payload.archivefile().tapefiles().begin(); + while (bestTapeFile != m_payload.archivefile().tapefiles().end()) { + if (bestTapeFile->vid() == bestVid) + goto tapeFileFound; + bestTapeFile++; + } + { + std::stringstream err; + err << "In RetrieveRequest::garbageCollect(): could not find tapefile for vid " << bestVid; + throw exception::Exception(err.str()); + } + tapeFileFound:; + auto tapeSelectionTime = t.secs(utils::Timer::resetCounter); + auto bestJob=m_payload.mutable_jobs()->begin(); + while (bestJob!=m_payload.mutable_jobs()->end()) { + if (bestJob->copynb() == bestTapeFile->copynb()) + goto jobFound; + bestJob++; + } + { + std::stringstream err; + err << "In RetrieveRequest::garbageCollect(): could not find job for copynb " << bestTapeFile->copynb(); + throw exception::Exception(err.str()); + } + jobFound:; + // We now need to grab the queue and requeue the request. 
+ RetrieveQueue rq(m_objectStore); + ScopedExclusiveLock rql; + Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, QueueType::JobsToTransfer, lc); + // Enqueue the job + objectstore::MountPolicySerDeser mp; + mp.deserialize(m_payload.mountpolicy()); + std::list<RetrieveQueue::JobToAdd> jta; + jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), + mp, (signed)m_payload.schedulerrequest().entrylog().time()}); + rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); + auto jobsSummary=rq.getJobsSummary(); + auto queueUpdateTime = t.secs(utils::Timer::resetCounter); + // We can now make the transition official. + m_payload.set_activecopynb(bestJob->copynb()); + setOwner(rq.getAddressIfSet()); + commit(); + Helpers::updateRetrieveQueueStatisticsCache(bestVid, jobsSummary.files, jobsSummary.bytes, jobsSummary.priority); + rql.release(); + auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter); + { + log::ScopedParamContainer params(lc); + params.add("jobObject", getAddressIfSet()) + .add("fileId", m_payload.archivefile().archivefileid()) + .add("queueObject", rq.getAddressIfSet()) + .add("copynb", bestTapeFile->copynb()) + .add("vid", bestTapeFile->vid()) + .add("tapeSelectionTime", tapeSelectionTime) + .add("queueUpdateTime", queueUpdateTime) + .add("commitUnlockQueueTime", commitUnlockQueueTime); + lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): requeued the request."); + } + timespec ts; + // We will sleep a bit to make sure other processes can also access the queue + // as we are very likely to be part of a tight loop. + // TODO: ideally, end of session requeueing and garbage collection should be + // done in parallel. + // We sleep half the time it took to queue to give a chance to other lockers. 
+ double secSleep, fracSecSleep; + fracSecSleep = std::modf(queueUpdateTime / 2, &secSleep); + ts.tv_sec = secSleep; + ts.tv_nsec = std::round(fracSecSleep * 1000 * 1000 * 1000); + nanosleep(&ts, nullptr); + auto sleepTime = t.secs(); + { + log::ScopedParamContainer params(lc); + params.add("jobObject", getAddressIfSet()) + .add("fileId", m_payload.archivefile().archivefileid()) + .add("queueObject", rq.getAddressIfSet()) + .add("copynb", bestTapeFile->copynb()) + .add("vid", bestTapeFile->vid()) + .add("tapeSelectionTime", tapeSelectionTime) + .add("queueUpdateTime", queueUpdateTime) + .add("commitUnlockQueueTime", commitUnlockQueueTime) + .add("sleepTime", sleepTime); + lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): slept some time to not sit on the queue after GC requeueing."); + } } } @@ -202,9 +281,23 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uin tf->set_maxtotalretries(maxTotalRetries); tf->set_retrieswithinmount(0); tf->set_totalretries(0); - tf->set_status(serializers::RetrieveJobStatus::RJS_Pending); + tf->set_status(serializers::RetrieveJobStatus::RJS_ToTransfer); +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::getLastActiveVid() +//------------------------------------------------------------------------------ +std::string RetrieveRequest::getLastActiveVid() { + checkPayloadReadable(); + auto activeCopyNb = m_payload.activecopynb(); + for (auto & tf: m_payload.archivefile().tapefiles()) { + if (tf.copynb() == activeCopyNb) + return tf.vid(); + } + return m_payload.archivefile().tapefiles(0).vid(); } + //------------------------------------------------------------------------------ // RetrieveRequest::setSchedulerRequest() //------------------------------------------------------------------------------ @@ -333,13 +426,12 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, * j.mutable_failurelogs()->Add() = failureReason; } 
if (j.totalretries() >= j.maxtotalretries()) { - j.set_status(serializers::RJS_Failed); - bool ret=finishIfNecessary(lc); - if (!ret) commit(); - return ret; + j.set_status(serializers::RetrieveJobStatus::RJS_FailedToReport); + for (auto & j2: m_payload.jobs()) + if (j2.status() == serializers::RetrieveJobStatus::RJS_ToTransfer) return false; + return true; } else { - j.set_status(serializers::RJS_Pending); - commit(); + j.set_status(serializers::RetrieveJobStatus::RJS_ToTransfer); return false; } } @@ -364,57 +456,43 @@ RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint16_t copy throw cta::exception::Exception("In RetrieveRequest::getRetryStatus(): job not found()"); } +//------------------------------------------------------------------------------ +// RetrieveRequest::getQueueType() +//------------------------------------------------------------------------------ +QueueType RetrieveRequest::getQueueType() { + checkPayloadReadable(); + bool hasToReport=false; + for (auto &j: m_payload.jobs()) { + // Any job is to be transfered => To transfer + switch(j.status()) { + case serializers::RetrieveJobStatus::RJS_ToTransfer: + return QueueType::JobsToTransfer; + break; + case serializers::RetrieveJobStatus::RJS_FailedToReport: + // Else any job to report => to report. 
+ hasToReport=true; + break; + default: break; + } + } + if (hasToReport) return QueueType::JobsToReport; + return QueueType::FailedJobs; +} //------------------------------------------------------------------------------ // RetrieveRequest::statusToString() //------------------------------------------------------------------------------ std::string RetrieveRequest::statusToString(const serializers::RetrieveJobStatus& status) { switch(status) { - case serializers::RetrieveJobStatus::RJS_Complete: - return "Complete"; + case serializers::RetrieveJobStatus::RJS_ToTransfer: + return "ToTransfer"; case serializers::RetrieveJobStatus::RJS_Failed: return "Failed"; - case serializers::RetrieveJobStatus::RJS_LinkingToTape: - return "LinkingToTape"; - case serializers::RetrieveJobStatus::RJS_Pending: - return "Pending"; - case serializers::RetrieveJobStatus::RJS_Selected: - return "Selected"; default: return std::string("Unknown (")+std::to_string((uint64_t) status)+")"; } } - -//------------------------------------------------------------------------------ -// RetrieveRequest::finishIfNecessary() -//------------------------------------------------------------------------------ -bool RetrieveRequest::finishIfNecessary(log::LogContext & lc) { - checkPayloadWritable(); - // This function is typically called after changing the status of one job - // in memory. If the request is complete, we will just remove it. - // If all the jobs are either complete or failed, we can remove the request. 
- auto & jl=m_payload.jobs(); - using serializers::RetrieveJobStatus; - std::set<serializers::RetrieveJobStatus> finishedStatuses( - {RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed}); - for (auto & j: jl) - if (!finishedStatuses.count(j.status())) - return false; - log::ScopedParamContainer params(lc); - size_t failureNumber = 0; - for (auto failure: getFailures()) { - params.add(std::string("failure")+std::to_string(failureNumber++), failure); - } - remove(); - params.add("retrieveRequestObject", getAddressIfSet()); - for (auto & j: jl) { - params.add(std::string("statusForCopyNb")+std::to_string(j.copynb()), statusToString(j.status())); - } - lc.log(log::INFO, "In RetrieveRequest::finishIfNecessary(): removed finished retrieve request."); - return true; -} - //------------------------------------------------------------------------------ // RetrieveRequest::getJobStatus() //------------------------------------------------------------------------------ @@ -600,6 +678,28 @@ std::list<std::string> RetrieveRequest::getFailures() { return ret; } +//------------------------------------------------------------------------------ +// RetrieveRequest::setFailureReason() +//------------------------------------------------------------------------------ +void RetrieveRequest::setFailureReason(const std::string& reason) { + checkPayloadWritable(); + m_payload.set_failurereportlog(reason); +} -}} // namespace cta::objectstore +//------------------------------------------------------------------------------ +// RetrieveRequest::isFailureReported() +//------------------------------------------------------------------------------ +bool RetrieveRequest::isFailureReported() { + checkPayloadReadable(); + return m_payload.failurereported(); +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::setFailureReported() +//------------------------------------------------------------------------------ +void 
RetrieveRequest::setFailureReported() { + checkPayloadWritable(); + m_payload.set_failurereported(true); +} +}} // namespace cta::objectstore diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index f8e1388bcfad4d47a3bc5a7b137a48f247d33eef..0390c5124f653124d197da4bcc38b74314ff0871 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -21,6 +21,7 @@ #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" #include "TapeFileSerDeser.hpp" +#include "QueueType.hpp" #include <list> #include "common/dataStructures/DiskFileInfo.hpp" #include "common/dataStructures/EntryLog.hpp" @@ -46,8 +47,10 @@ public: cta::catalogue::Catalogue & catalogue) override; // Job management ============================================================ void addJob(uint64_t copyNumber, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); - void setJobSelected(uint16_t copyNumber, const std::string & owner); - void setJobPending(uint16_t copyNumber); + std::string getLastActiveVid(); + void setFailureReason(const std::string & reason); + bool isFailureReported(); + void setFailureReported(); class JobDump { public: uint64_t copyNb; @@ -74,10 +77,10 @@ public: uint64_t maxTotalRetries = 0; }; RetryStatus getRetryStatus(uint16_t copyNumber); + /// Returns queue type depending on the compound statuses of all retrieve requests. + QueueType getQueueType(); std::list<std::string> getFailures(); std::string statusToString(const serializers::RetrieveJobStatus & status); - bool finishIfNecessary(log::LogContext & lc); /**< Handling of the consequences of a job status change for the entire request. - * This function returns true if the request got finished. 
*/ serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber); // Mark all jobs as pending mount (following their linking to a tape pool) void setAllJobsLinkingToTapePool(); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 80663acc764772c899fe746c8f15838023563859..11ecc4d5d62417bac00f60bc5c523baf07fd9c70 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -67,11 +67,11 @@ bool RootEntry::isEmpty() { if (m_payload.has_schedulerlockpointer() && m_payload.schedulerlockpointer().address().size()) return false; - for (auto &qt: {QueueType::LiveJobs, QueueType::FailedJobs, QueueType::JobsToReport}) { + for (auto &qt: {QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs}) { if (archiveQueuePointers(qt).size()) return false; } - for (auto &qt: {QueueType::LiveJobs, QueueType::FailedJobs}) { + for (auto &qt: {QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs}) { if (retrieveQueuePointers(qt).size()) return false; } @@ -101,7 +101,7 @@ void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>& RootEntry::archiveQueuePointers(QueueType queueType) { switch(queueType) { - case QueueType::LiveJobs: + case QueueType::JobsToTransfer: return m_payload.livearchivejobsqueuepointers(); case QueueType::JobsToReport: return m_payload.archivejobstoreportqueuepointers(); @@ -114,7 +114,7 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Arch ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>* RootEntry::mutableArchiveQueuePointers(QueueType queueType) { switch(queueType) { - case QueueType::LiveJobs: + case QueueType::JobsToTransfer: return m_payload.mutable_livearchivejobsqueuepointers(); case QueueType::JobsToReport: return m_payload.mutable_archivejobstoreportqueuepointers(); @@ -127,8 +127,10 @@ const 
::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Arch const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>& RootEntry::retrieveQueuePointers(QueueType queueType) { switch(queueType) { - case QueueType::LiveJobs: + case QueueType::JobsToTransfer: return m_payload.liveretrievejobsqueuepointers(); + case QueueType::JobsToReport: + return m_payload.retrievefailurestoreportqueuepointers(); case QueueType::FailedJobs: return m_payload.failedretrievejobsqueuepointers(); default: @@ -138,8 +140,10 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Retr ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>* RootEntry::mutableRetrieveQueuePointers(QueueType queueType) { switch(queueType) { - case QueueType::LiveJobs: + case QueueType::JobsToTransfer: return m_payload.mutable_liveretrievejobsqueuepointers(); + case QueueType::JobsToReport: + return m_payload.mutable_retrievefailurestoreportqueuepointers(); case QueueType::FailedJobs: return m_payload.mutable_failedretrievejobsqueuepointers(); default: @@ -161,8 +165,6 @@ namespace { } } - - std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef, QueueType queueType, log::LogContext & lc) { checkPayloadWritable(); @@ -170,29 +172,27 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool try { return serializers::findElement(archiveQueuePointers(queueType), tapePool).address(); } catch (serializers::NotFound &) {} - // Insert the archive queue, then its pointer, with agent intent log update - // First generate the intent. We expect the agent to be passed locked. 
- std::string archiveQueueAddress = agentRef.nextId("ArchiveQueue"); - agentRef.addToOwnership(archiveQueueAddress, m_objectStore); - // Then create the tape pool queue object - ArchiveQueue aq(archiveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); - aq.initialize(tapePool); - aq.setOwner(agentRef.getAgentAddress()); - aq.setBackupOwner("root"); - aq.insert(); - ScopedExclusiveLock tpl(aq); - // Now move the tape pool's ownership to the root entry + // Insert the archive queue pointer in the root entry, then the queue. + std::string archiveQueueNameHeader = "ArchiveQueue"; + switch(queueType) { + case QueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break; + case QueueType::JobsToReport: archiveQueueNameHeader+="ToReport"; break; + case QueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break; + default: break; + } + std::string archiveQueueAddress = agentRef.nextId(archiveQueueNameHeader+"-"+tapePool); + // Now create a reference to the archive queue in the root entry auto * tpp = mutableArchiveQueuePointers(queueType)->Add(); tpp->set_address(archiveQueueAddress); tpp->set_name(tapePool); // We must commit here to ensure the tape pool object is referenced. commit(); - // Now update the tape pool's ownership. + // Then insert the queue object + ArchiveQueue aq(archiveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); + aq.initialize(tapePool); aq.setOwner(getAddressIfSet()); aq.setBackupOwner(getAddressIfSet()); - aq.commit(); - // ... and clean up the agent - agentRef.removeFromOwnership(archiveQueueAddress, m_objectStore); + aq.insert(); return archiveQueueAddress; } @@ -303,27 +303,26 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag // Insert the retrieve queue, then its pointer, with agent intent log update // First generate the intent. We expect the agent to be passed locked. 
// The make of the vid in the object name will be handy. - std::string retrieveQueueAddress = agentRef.nextId(std::string("RetrieveQueue-")+vid); - agentRef.addToOwnership(retrieveQueueAddress, m_objectStore); - // Then create the tape pool queue object - RetrieveQueue rq(retrieveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); - rq.initialize(vid); - rq.setOwner(agentRef.getAgentAddress()); - rq.setBackupOwner("root"); - rq.insert(); - ScopedExclusiveLock tpl(rq); - // Now move the tape pool's ownership to the root entry + std::string retrieveQueueNameHeader = "RetrieveQueue"; + switch(queueType) { + case QueueType::JobsToTransfer: retrieveQueueNameHeader+="ToTransfer"; break; + case QueueType::JobsToReport: retrieveQueueNameHeader+="ToReport"; break; + case QueueType::FailedJobs: retrieveQueueNameHeader+="Failed"; break; + default: break; + } + std::string retrieveQueueAddress = agentRef.nextId(retrieveQueueNameHeader+"-"+vid); + // Reference the queue to the root entry before creation auto * rqp = mutableRetrieveQueuePointers(queueType)->Add(); rqp->set_address(retrieveQueueAddress); rqp->set_vid(vid); // We must commit here to ensure the tape pool object is referenced. commit(); - // Now update the tape pool's ownership. + // Then create the tape pool queue object + RetrieveQueue rq(retrieveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); + rq.initialize(vid); rq.setOwner(getAddressIfSet()); - rq.setBackupOwner(getAddressIfSet()); - rq.commit(); - // ... 
and clean up the agent - agentRef.removeFromOwnership(retrieveQueueAddress, m_objectStore); + rq.setBackupOwner("root"); + rq.insert(); return retrieveQueueAddress; } diff --git a/objectstore/RootEntryTest.cpp b/objectstore/RootEntryTest.cpp index ed0e56e5e7de4fe985180e68a449104f46fdacb9..3bd8cdcb4e442662cd7a0db9f81a22bb1dbe080c 100644 --- a/objectstore/RootEntryTest.cpp +++ b/objectstore/RootEntryTest.cpp @@ -97,9 +97,9 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - ASSERT_THROW(re.getArchiveQueueAddress("tapePool1", cta::objectstore::QueueType::LiveJobs), + ASSERT_THROW(re.getArchiveQueueAddress("tapePool1", cta::objectstore::QueueType::JobsToTransfer), cta::objectstore::RootEntry::NoSuchArchiveQueue); - tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, cta::objectstore::QueueType::LiveJobs, lc); + tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, cta::objectstore::QueueType::JobsToTransfer, lc); // Check that we car read it cta::objectstore::ArchiveQueue aq(tpAddr1, be); cta::objectstore::ScopedSharedLock aql(aq); @@ -110,7 +110,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, cta::objectstore::QueueType::LiveJobs, lc); + tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, cta::objectstore::QueueType::JobsToTransfer, lc); ASSERT_TRUE(be.exists(tpAddr2)); } { @@ -118,7 +118,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - re.removeArchiveQueueAndCommit("tapePool2", cta::objectstore::QueueType::LiveJobs, lc); + re.removeArchiveQueueAndCommit("tapePool2", cta::objectstore::QueueType::JobsToTransfer, lc); ASSERT_FALSE(be.exists(tpAddr2)); } // Unregister the agent @@ -129,7 +129,7 @@ 
TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); re.removeAgentRegisterAndCommit(lc); - re.removeArchiveQueueAndCommit("tapePool1", cta::objectstore::QueueType::LiveJobs, lc); + re.removeArchiveQueueAndCommit("tapePool1", cta::objectstore::QueueType::JobsToTransfer, lc); ASSERT_FALSE(be.exists(tpAddr1)); re.removeIfEmpty(lc); ASSERT_FALSE(re.exists()); diff --git a/objectstore/cta-objectstore-dereference-removed-queues.cpp b/objectstore/cta-objectstore-dereference-removed-queues.cpp index 51f60249528547fc5abadbdf3423c5481b6e62a8..d440b024bb558cea47802ba294686ae62f43bca4 100644 --- a/objectstore/cta-objectstore-dereference-removed-queues.cpp +++ b/objectstore/cta-objectstore-dereference-removed-queues.cpp @@ -58,14 +58,14 @@ int main(int argc, char ** argv) { cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); std::list<std::string> missingArchiveQueues, missingRetrieveQueues; - for (auto & aq: re.dumpArchiveQueues(cta::objectstore::QueueType::LiveJobs)) { + for (auto & aq: re.dumpArchiveQueues(cta::objectstore::QueueType::JobsToTransfer)) { if (!be->exists(aq.address)) { missingArchiveQueues.emplace_back(aq.tapePool); std::cout << "The archive queue for tape pool " << aq.tapePool << " at address " << aq.address << " is missing and will be dereferenced." 
<< std::endl; } } - for (auto & rq: re.dumpRetrieveQueues(cta::objectstore::QueueType::LiveJobs)) { + for (auto & rq: re.dumpRetrieveQueues(cta::objectstore::QueueType::JobsToTransfer)) { if (!be->exists(rq.address)) { missingRetrieveQueues.emplace_back(rq.vid); std::cout << "The retrieve queue for vid " << rq.vid << " at address " << rq.address @@ -74,11 +74,11 @@ int main(int argc, char ** argv) { } // Actually do the job for (auto & tp: missingArchiveQueues) { - re.removeMissingArchiveQueueReference(tp, cta::objectstore::QueueType::LiveJobs); + re.removeMissingArchiveQueueReference(tp, cta::objectstore::QueueType::JobsToTransfer); std::cout << "Archive queue for tape pool " << tp << "dereferenced." << std::endl; } for (auto & vid: missingRetrieveQueues) { - re.removeMissingRetrieveQueueReference(vid, cta::objectstore::QueueType::LiveJobs); + re.removeMissingRetrieveQueueReference(vid, cta::objectstore::QueueType::JobsToTransfer); std::cout << "Retrieve queue for vid " << vid << "dereferenced." 
<< std::endl; } if (missingArchiveQueues.size() || missingRetrieveQueues.size()) { diff --git a/objectstore/cta.proto b/objectstore/cta.proto index def6b690025673e06bdbdb1bd221e50799239e67..a3016849d479c4ffd1c6448db21a229f6bbd5b3c 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -102,7 +102,9 @@ message RetrieveQueuePointer { message RootEntry { repeated ArchiveQueuePointer livearchivejobsqueuepointers = 1050; repeated RetrieveQueuePointer liveretrievejobsqueuepointers = 1060; + repeated ArchiveQueuePointer archivefailurestoreportqueuepointers = 1061; repeated ArchiveQueuePointer failedarchivejobsqueuepointers = 1062; + repeated RetrieveQueuePointer retrievefailurestoreportqueuepointers = 1063; repeated RetrieveQueuePointer failedretrievejobsqueuepointers = 1065; repeated ArchiveQueuePointer archivejobstoreportqueuepointers = 1068; optional DriveRegisterPointer driveregisterpointer = 1070; @@ -181,58 +183,8 @@ message ArchiveFile { required string storageclass = 4361; } -// ------------- Archive Jobs -------------------------------------------------- - -// The status of the individual archive jobs. The jobs are materialised -// by table entries in the ArchiveRequest. -// This life cycle represented by the following enum - -enum ArchiveJobStatus { - AJS_LinkingToArchiveQueue = 1; - AJS_PendingMount = 2; - AJS_Selected = 3; - AJS_Complete = 4; - AJS_Failed = 999; -} - -message FilePathAndStatus { - required string path = 4450; - required uint64 mode = 4451; - required uint64 size = 4452; - required UserIdentity owner = 4453; -} - -// ------------- Retrieve Jobs ------------------------------------------------- - -// The status of the individual retrieve jobs. The jobs are materialised -// by table entries in the RetrieveRequest. 
-// This life cycle represented by the following enum - -enum RetrieveJobStatus { - RJS_LinkingToTape = 0; - RJS_Pending = 1; - RJS_Selected = 2; - RJS_Complete = 3; - RJS_Failed = 99; -} - -message RetrieveJobEntry { - required uint32 copynb = 4700; - required string tape = 4701; - required string tapeaddress = 4702; - required uint64 fseq = 4703; - required uint64 blockid = 4704; - required RetrieveJobStatus status = 4705; - required uint32 totalretries = 4706; - required uint32 retrieswithinmount = 4707; - required uint64 compressedsize = 4708; - required uint64 creationtime = 4709; -} - // ------------- Drives handling ---------------------------------------------- - - message DriveState { required string drivename = 5000; required string host = 5001; @@ -280,8 +232,6 @@ message SchedulerGlobalLock { required uint64 nextmountid = 8000; } -// ------------- New interface ------------------------------------------------ - message User { required string name = 8800; required string group = 8810; @@ -305,6 +255,21 @@ message MountPolicy { required string comment = 8988; } +// ------------- Archive Jobs -------------------------------------------------- + +// The status of the individual archive jobs. The jobs are materialised +// by table entries in the ArchiveRequest. +// This life cycle represented by the following enum + +enum ArchiveJobStatus { + AJS_ToTransfer = 1; + AJS_ToReport = 2; + AJS_Complete = 3; + AJS_FailedToReport = 997; + AJS_Failed = 998; + AJS_Abandoned = 999; +} + message ArchiveJob { required uint32 copynb = 4400; required string tapepool = 4401; @@ -339,6 +304,21 @@ message ArchiveRequest { repeated ArchiveJob jobs = 9092; } +// ------------- Retrieve Jobs ------------------------------------------------- + +// The status of the individual retrieve jobs. The jobs are materialised +// by table entries in the RetrieveRequest. 
+// This life cycle is represented by the following enum +// There is no complete state as the completion of one job implies the +// completion of the whole request, and leads to the immediate deletion +// of the request. + +enum RetrieveJobStatus { + RJS_ToTransfer = 1; + RJS_FailedToReport = 998; + RJS_Failed = 999; +} + message SchedulerRetrieveRequest { required UserIdentity requester = 9100; required uint64 ArchiveFileId = 9101; @@ -365,6 +345,9 @@ message RetrieveRequest { required ArchiveFile archivefile = 9152; required uint32 activecopynb = 9153; repeated RetrieveJob jobs = 9154; + required string failurereporturl = 9155; + required string failurereportlog = 9156; + required bool failurereported = 9157; } message ValueCountPair { diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index 29fa08d89a7b22dd2b970d210c5614a0403dbbc8..be267ea56d925bd1142934e42e83c8f83649bd6a 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -301,7 +301,7 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share auto & queue = *ret->m_queue; auto & aql = *ret->m_lock; objectstore::Helpers::getLockedAndFetchedQueue<Queue>(queue, aql, - *oStoreDB.m_agentReference, queueIndex, objectstore::QueueType::LiveJobs, logContext); + *oStoreDB.m_agentReference, queueIndex, objectstore::QueueType::JobsToTransfer, logContext); double getFetchedQueueTime = timer.secs(utils::Timer::resetCounter); size_t qJobsBefore=queue.dumpJobs().size(); uint64_t qBytesBefore=0; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index cd898d9b0b700066ed49f75858a92e8424431504..57c7aa2d02ca832983af3b017e018e0dd927a864 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -179,7 +179,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro log::LogContext & logContext) { utils::Timer t, t2; // Walk the archive queues for statistics - 
for (auto & aqp: re.dumpArchiveQueues(QueueType::LiveJobs)) { + for (auto & aqp: re.dumpArchiveQueues(QueueType::JobsToTransfer)) { objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore); // debug utility variable std::string __attribute__((__unused__)) poolName = aqp.tapePool; @@ -224,7 +224,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive queue."); } // Walk the retrieve queues for statistics - for (auto & rqp: re.dumpRetrieveQueues(QueueType::LiveJobs)) { + for (auto & rqp: re.dumpRetrieveQueues(QueueType::JobsToTransfer)) { RetrieveQueue rqueue(rqp.address, m_objectStore); // debug utility variable std::string __attribute__((__unused__)) vid = rqp.vid; @@ -389,40 +389,44 @@ void OStoreDB::trimEmptyQueues(log::LogContext& lc) { RootEntry re(m_objectStore); ScopedExclusiveLock rel(re); re.fetch(); - try { - auto archiveQueueList = re.dumpArchiveQueues(QueueType::LiveJobs); - for (auto & a: archiveQueueList) { - ArchiveQueue aq(a.address, m_objectStore); - ScopedSharedLock aql(aq); - aq.fetch(); - if (!aq.getJobsSummary().jobs) { - aql.release(); - re.removeArchiveQueueAndCommit(a.tapePool, QueueType::LiveJobs, lc); - log::ScopedParamContainer params(lc); - params.add("tapePool", a.tapePool) - .add("queueObject", a.address); - lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty archive queue."); + for (auto & queueType: { QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs} ) { + try { + auto archiveQueueList = re.dumpArchiveQueues(queueType); + for (auto & a: archiveQueueList) { + ArchiveQueue aq(a.address, m_objectStore); + ScopedSharedLock aql(aq); + aq.fetch(); + if (!aq.getJobsSummary().jobs) { + aql.release(); + re.removeArchiveQueueAndCommit(a.tapePool, queueType, lc); + log::ScopedParamContainer params(lc); + params.add("tapePool", a.tapePool) + .add("queueType", toString(queueType)) + 
.add("queueObject", a.address); + lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty archive queue."); + } } - } - auto retrieveQeueueList = re.dumpRetrieveQueues(QueueType::LiveJobs); - for (auto & r:retrieveQeueueList) { - RetrieveQueue rq(r.address, m_objectStore); - ScopedSharedLock rql(rq); - rq.fetch(); - if (!rq.getJobsSummary().files) { - rql.release(); - re.removeRetrieveQueueAndCommit(r.vid, QueueType::LiveJobs, lc); - log::ScopedParamContainer params(lc); - params.add("vid", r.vid) - .add("queueObject", r.address); - lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty retrieve queue."); + auto retrieveQeueueList = re.dumpRetrieveQueues(queueType); + for (auto & r:retrieveQeueueList) { + RetrieveQueue rq(r.address, m_objectStore); + ScopedSharedLock rql(rq); + rq.fetch(); + if (!rq.getJobsSummary().files) { + rql.release(); + re.removeRetrieveQueueAndCommit(r.vid, queueType, lc); + log::ScopedParamContainer params(lc); + params.add("vid", r.vid) + .add("queueType", toString(queueType)) + .add("queueObject", r.address); + lc.log(log::INFO, "In OStoreDB::trimEmptyQueues(): deleted empty retrieve queue."); + } } + } catch (cta::exception::Exception & ex) { + log::ScopedParamContainer params(lc); + params.add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::trimEmptyQueues(): got an exception. Stack trace follows."); + lc.logBacktrace(log::ERR, ex.backtrace()); } - } catch (cta::exception::Exception & ex) { - log::ScopedParamContainer params(lc); - params.add("exceptionMessage", ex.getMessageValue()); - lc.log(log::ERR, "In OStoreDB::trimEmptyQueues(): got an exception. 
Stack trace follows."); - lc.logBacktrace(log::ERR, ex.backtrace()); } } @@ -493,7 +497,6 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: // We create the object here m_agentReference->addToOwnership(aReq->getAddressIfSet(), m_objectStore); double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); - aReq->setOwner(m_agentReference->getAgentAddress()); aReq->insert(); double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); // The request is now safe in the object store. We can now return to the caller and fire (and forget) a thread @@ -576,11 +579,8 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: logContext.log(log::ERR, "In OStoreDB::queueArchive_bottomHalf(): failed to enqueue job"); return; } - // The request is now fully set. As it's multi-owned, we do not set the owner, - // just to disown it from the agent. - aReq->setOwner(""); + // The request is now fully set. auto archiveFile = aReq->getArchiveFile(); - aReq->commit(); double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); arl.release(); double arLockRelease = timer.secs(cta::utils::Timer::reset_t::resetCounter); @@ -1656,7 +1656,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun AQAlgos::PopCriteria popCrieteria; popCrieteria.files = filesRequested; popCrieteria.bytes= bytesRequested; - auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCrieteria, logContext); + auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, objectstore::QueueType::JobsToTransfer, popCrieteria, logContext); // We can construct the return value. 
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret; for (auto & j: jobs.elements) { @@ -1786,7 +1786,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo objectstore::RootEntry re(m_oStoreDB.m_objectStore); re.fetchNoLock(); std::string rqAddress; - auto rql = re.dumpRetrieveQueues(QueueType::LiveJobs); + auto rql = re.dumpRetrieveQueues(QueueType::JobsToTransfer); for (auto & rqp : rql) { if (rqp.vid == mountInfo.vid) rqAddress = rqp.address; @@ -1808,7 +1808,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::LiveJobs, logContext); + re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::JobsToTransfer, logContext); log::ScopedParamContainer params(logContext); params.add("vid", mountInfo.vid) .add("queueObject", rq.getAddressIfSet()); @@ -1983,7 +1983,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo // The queue should be removed as it is empty. 
ScopedExclusiveLock rexl(re); re.fetch(); - re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::LiveJobs, logContext); + re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::JobsToTransfer, logContext); log::ScopedParamContainer params(logContext); params.add("vid", mountInfo.vid) .add("queueObject", rq.getAddressIfSet()); @@ -2301,7 +2301,7 @@ bool OStoreDB::ArchiveJob::fail(const std::string& failureReason, log::LogContex objectstore::ArchiveQueue aq(m_oStoreDB.m_objectStore); objectstore::ScopedExclusiveLock aqlock; objectstore::Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aqlock, - *m_oStoreDB.m_agentReference, m_tapePool, objectstore::QueueType::LiveJobs, lc); + *m_oStoreDB.m_agentReference, m_tapePool, objectstore::QueueType::JobsToTransfer, lc); // Find the right job auto jl = m_archiveRequest.dumpJobs(); for (auto & j:jl) { @@ -2416,10 +2416,8 @@ bool OStoreDB::RetrieveJob::fail(const std::string& failureReason, log::LogConte // Get the best vid from the cache std::set<std::string> candidateVids; using serializers::RetrieveJobStatus; - std::set<serializers::RetrieveJobStatus> finishedStatuses( - {RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed}); for (auto & tf: m_retrieveRequest.getRetrieveFileQueueCriteria().archiveFile.tapeFiles) - if (!finishedStatuses.count(m_retrieveRequest.getJobStatus(tf.second.copyNb))) + if (m_retrieveRequest.getJobStatus(tf.second.copyNb)==serializers::RetrieveJobStatus::RJS_ToTransfer) candidateVids.insert(tf.second.vid); if (candidateVids.empty()) throw cta::exception::Exception("In OStoreDB::RetrieveJob::fail(): no active job after addJobFailure() returned false."); @@ -2445,7 +2443,7 @@ bool OStoreDB::RetrieveJob::fail(const std::string& failureReason, log::LogConte objectstore::RetrieveQueue rq(m_oStoreDB.m_objectStore); objectstore::ScopedExclusiveLock rql; objectstore::Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, - *m_oStoreDB.m_agentReference, bestVid, 
objectstore::QueueType::LiveJobs, logContext); + *m_oStoreDB.m_agentReference, bestVid, objectstore::QueueType::JobsToTransfer, logContext); auto rfqc = m_retrieveRequest.getRetrieveFileQueueCriteria(); auto & af = rfqc.archiveFile; auto & tf = af.tapeFiles.at(bestCopyNb); diff --git a/scheduler/OStoreDB/OStoreDBTest.cpp b/scheduler/OStoreDB/OStoreDBTest.cpp index 013837ead62fec9dcbea71193992cd948a9f3ff2..d33a6ec87a57f375b241a91cf77828f7bde413f3 100644 --- a/scheduler/OStoreDB/OStoreDBTest.cpp +++ b/scheduler/OStoreDB/OStoreDBTest.cpp @@ -141,7 +141,7 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) { RootEntry re(osdbi.getBackend()); ScopedSharedLock rel(re); re.fetch(); - aqAddr = re.getArchiveQueueAddress("Tapepool1", cta::objectstore::QueueType::LiveJobs); + aqAddr = re.getArchiveQueueAddress("Tapepool1", cta::objectstore::QueueType::JobsToTransfer); rel.release(); ArchiveQueue aq(aqAddr, osdbi.getBackend()); ScopedSharedLock aql(aq); diff --git a/scheduler/OStoreDB/QueueItor.cpp b/scheduler/OStoreDB/QueueItor.cpp index dbdb04d8eeba389c800b91f14f67f98c97fc80e5..d98771cebb3cf92a6f8e4701b611238f051ff4c1 100644 --- a/scheduler/OStoreDB/QueueItor.cpp +++ b/scheduler/OStoreDB/QueueItor.cpp @@ -38,7 +38,7 @@ QueueItor(objectstore::Backend &objectStore, const std::string &queue_id) : objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); - m_jobQueuesQueue = re.dumpArchiveQueues(objectstore::QueueType::LiveJobs); + m_jobQueuesQueue = re.dumpArchiveQueues(objectstore::QueueType::JobsToTransfer); } // Set queue iterator to the first queue in the list @@ -141,7 +141,7 @@ QueueItor(objectstore::Backend &objectStore, const std::string &queue_id) : objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); - m_jobQueuesQueue = re.dumpRetrieveQueues(objectstore::QueueType::LiveJobs); + m_jobQueuesQueue = re.dumpRetrieveQueues(objectstore::QueueType::JobsToTransfer); } // Find the first queue