From 8ebbd146dfe44891fc76dfeb0ec7c4bd569d8c00 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Thu, 26 Jul 2018 15:15:56 +0200 Subject: [PATCH] [os-generic-queues] Adds skeleton method definitions for RetrieveQueueAlgorithms --- objectstore/Algorithms.hpp | 8 +- objectstore/ArchiveQueueAlgorithms.cpp | 14 +- objectstore/RetrieveQueueAlgorithms.cpp | 261 ++++++++++++++++++++++++ objectstore/RetrieveQueueAlgorithms.hpp | 16 +- 4 files changed, 285 insertions(+), 14 deletions(-) diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index ba676865ee..5dd4f26ec7 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -139,7 +139,7 @@ struct ContainerTraits ElementsToSkipSet &elemtsToSkip, log::LogContext &lc); static const std::string c_containerTypeName; // = "genericContainer" - static const std::string c_identifyerType; // = "genericId" + static const std::string c_identifierType; // = "genericId" }; @@ -232,7 +232,7 @@ public: timingList.insertAndReset("queueUnlockTime", t); log::ScopedParamContainer params(lc); params.add("C", ContainerTraits<C>::c_containerTypeName) - .add(ContainerTraits<C>::c_identifyerType, contId) + .add(ContainerTraits<C>::c_identifierType, contId) .add("containerAddress", cont.getAddressIfSet()); contSummaryAfter.addDeltaToLog(contSummaryBefore, params); timingList.addToLog(params); @@ -378,7 +378,7 @@ public: } log::ScopedParamContainer params(lc); params.add("C", ContainerTraits<C>::c_containerTypeName) - .add(ContainerTraits<C>::c_identifyerType, contId) + .add(ContainerTraits<C>::c_identifierType, contId) .add("containerAddress", cont.getAddressIfSet()); ret.summary.addDeltaToLog(previousSummary, params); contSummaryAfter.addDeltaToLog(contSummaryBefore, params); @@ -390,7 +390,7 @@ public: { log::ScopedParamContainer params(lc); params.add("C", ContainerTraits<C>::c_containerTypeName) - .add(ContainerTraits<C>::c_identifyerType, contId); + .add(ContainerTraits<C>::c_identifierType, contId); ret.addToLog(params); timingList.addToLog(params); params.add("schedulerDbTime", totalTime.secs()); diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp index 05bbc4a2c3..d7951432b1 100644 --- a/objectstore/ArchiveQueueAlgorithms.cpp +++ b/objectstore/ArchiveQueueAlgorithms.cpp @@ -22,6 +22,14 @@ namespace cta { namespace objectstore { +template<> +const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQueue"; + +template<> +const std::string ContainerTraits<ArchiveQueue>::c_identifierType = "tapepool"; + +// ContainerTraitsTypes + void ContainerTraitsTypes<ArchiveQueue>::PoppedElementsSummary:: addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer ¶ms) { params.add("filesAdded", files - previous.files) @@ -64,11 +72,7 @@ addToLog(log::ScopedParamContainer ¶ms) { .add("files", summary.files); } -template<> -const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQueue"; - -template<> -const std::string ContainerTraits<ArchiveQueue>::c_identifyerType = "tapepool"; +// ContainerTraits template<> void ContainerTraits<ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef, diff --git a/objectstore/RetrieveQueueAlgorithms.cpp b/objectstore/RetrieveQueueAlgorithms.cpp index 575299865a..b40c406619 100644 --- a/objectstore/RetrieveQueueAlgorithms.cpp +++ b/objectstore/RetrieveQueueAlgorithms.cpp @@ -17,9 +17,66 @@ */ #include "RetrieveQueueAlgorithms.hpp" +#include "common/make_unique.hpp" namespace cta { namespace objectstore { +template<> +const std::string ContainerTraits<RetrieveQueue>::c_containerTypeName = "RetrieveQueue"; + +template<> +const std::string ContainerTraits<RetrieveQueue>::c_identifierType = "vid"; + +// ContainerTraitsTypes + +void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsSummary:: +addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer ¶ms) { +#if 0 + params.add("filesAdded", files - previous.files) + .add("bytesAdded", bytes - previous.bytes) + .add("filesBefore", previous.files) + .add("bytesBefore", previous.bytes) + .add("filesAfter", files) + .add("bytesAfter", bytes); +#endif +} + +void ContainerTraitsTypes<RetrieveQueue>::ContainerSummary:: +addDeltaToLog(const ContainerSummary &previous, log::ScopedParamContainer ¶ms) { +#if 0 + params.add("queueJobsBefore", previous.jobs) + .add("queueBytesBefore", previous.bytes) + .add("queueJobsAfter", jobs) + .add("queueBytesAfter", bytes); +#endif +} + +auto ContainerTraits<RetrieveQueue>::PopCriteria:: +operator-=(const PoppedElementsSummary &pes) -> PopCriteria & { + bytes -= pes.bytes; + files -= pes.files; + return *this; +} + +void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsList:: +insertBack(PoppedElementsList &&insertedList) { + for (auto &e: insertedList) { + std::list<PoppedElement>::emplace_back(std::move(e)); + } +} + +void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsList::insertBack(PoppedElement &&e) { + std::list<PoppedElement>::emplace_back(std::move(e)); +} + +void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsBatch:: +addToLog(log::ScopedParamContainer ¶ms) { + params.add("bytes", summary.bytes) + .add("files", summary.files); +} + +// ContainerTraits + template<> void ContainerTraits<RetrieveQueue>:: getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &agRef, @@ -28,6 +85,63 @@ getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &a Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc); } +template<> +void ContainerTraits<RetrieveQueue>:: +getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) +{ +#if 0 + // Try and get access to a queue. + size_t attemptCount = 0; + retry: + objectstore::RootEntry re(cont.m_objectStore); + re.fetchNoLock(); + std::string aqAddress; + auto aql = re.dumpArchiveQueues(QueueType::LiveJobs); + for (auto & aqp : aql) { + if (aqp.tapePool == cId) + aqAddress = aqp.address; + } + if (!aqAddress.size()) throw NoSuchContainer("In ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(): no such archive queue"); + // try and lock the archive queue. Any failure from here on means the end of the getting jobs. + cont.setAddress(aqAddress); + //findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter); + try { + if (contLock.isLocked()) contLock.release(); + contLock.lock(cont); + cont.fetch(); + //lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter); + } catch (cta::exception::Exception & ex) { + // The queue is now absent. We can remove its reference in the root entry. + // A new queue could have been added in the mean time, and be non-empty. + // We will then fail to remove from the RootEntry (non-fatal). + ScopedExclusiveLock rexl(re); + re.fetch(); + try { + re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc); + log::ScopedParamContainer params(lc); + params.add("tapepool", cId) + .add("queueObject", cont.getAddressIfSet()); + lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry"); + } catch (RootEntry::ArchiveQueueNotEmpty & ex) { + log::ScopedParamContainer params(lc); + params.add("tapepool", cId) + .add("queueObject", cont.getAddressIfSet()) + .add("Message", ex.getMessageValue()); + lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry"); + } catch (RootEntry::NoSuchArchiveQueue & ex) { + // Somebody removed the queue in the mean time. Barely worth mentioning. + log::ScopedParamContainer params(lc); + params.add("tapepool", cId) + .add("queueObject", cont.getAddressIfSet()); + lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done."); + } + //emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter); + attemptCount++; + goto retry; + } +#endif +} + template<> void ContainerTraits<RetrieveQueue>:: addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, AgentReference &agentRef, @@ -39,6 +153,38 @@ addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, Agen jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)}); } cont.addJobsAndCommit(jobsToAdd, agentRef, lc); +#if 0 + std::list<ArchiveQueue::JobToAdd> jobsToAdd; + for (auto & e: elemMemCont) { + ElementDescriptor jd; + jd.copyNb = e.copyNb; + jd.tapePool = cont.getTapePool(); + jd.owner = cont.getAddressIfSet(); + ArchiveRequest & ar = *e.archiveRequest; + jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize, + e.mountPolicy, time(nullptr)}); + } + cont.addJobsAndCommit(jobsToAdd, agentRef, lc); +#endif +} + +template<> +void ContainerTraits<RetrieveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, + InsertedElement::list& elemMemCont, AgentReference& agentRef, log::LogContext& lc) +{ +#if 0 + std::list<ArchiveQueue::JobToAdd> jobsToAdd; + for (auto & e: elemMemCont) { + ElementDescriptor jd; + jd.copyNb = e.copyNb; + jd.tapePool = cont.getTapePool(); + jd.owner = cont.getAddressIfSet(); + ArchiveRequest & ar = *e.archiveRequest; + jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize, + e.mountPolicy, time(nullptr)}); + } + cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc); +#endif } template<> @@ -52,6 +198,22 @@ removeReferencesAndCommit(Container &cont, OpFailure<InsertedElement>::list &ele cont.removeJobsAndCommit(elementsToRemove); } +template<> +void ContainerTraits<RetrieveQueue>:: +removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAddressList) { + cont.removeJobsAndCommit(elementAddressList); +} + +template<> +auto ContainerTraits<RetrieveQueue>:: +getContainerSummary(Container &cont) -> ContainerSummary { + ContainerSummary ret; +#if 0 + ret.JobsSummary::operator=(cont.getJobsSummary()); +#endif + return ret; +} + template<> auto ContainerTraits<RetrieveQueue>:: switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddress &contAddress, @@ -83,4 +245,103 @@ switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddre return ret; } +template<> +auto ContainerTraits<RetrieveQueue>:: +getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip, + log::LogContext &lc) -> PoppedElementsBatch +{ + PoppedElementsBatch ret; +#if 0 + auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip); + for (auto &cjfq: candidateJobsFromQueue.candidates) { + ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size, + common::dataStructures::ArchiveFile(), "", "", "", }); + ret.summary.bytes += cjfq.size; + ret.summary.files++; + } +#endif + return ret; +} + +template<> +auto ContainerTraits<RetrieveQueue>:: +getElementSummary(const PoppedElement &poppedElement) -> PoppedElementsSummary { + PoppedElementsSummary ret; +#if 0 + ret.bytes = poppedElement.bytes; + ret.files = 1; +#endif + return ret; +} + +template<> +auto ContainerTraits<RetrieveQueue>:: +switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress, + const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, + log::LogContext &lc) -> OpFailure<PoppedElement>::list +{ +#if 0 + std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters; + for (auto & e: popedElementBatch.elements) { + ArchiveRequest & ar = *e.archiveRequest; + auto copyNb = e.copyNb; + updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress)); + } + timingList.insertAndReset("asyncUpdateLaunchTime", t); + auto u = updaters.begin(); + auto e = popedElementBatch.elements.begin(); +#endif + OpFailure<PoppedElement>::list ret; +#if 0 + while (e != popedElementBatch.elements.end()) { + try { + u->get()->wait(); + e->archiveFile = u->get()->getArchiveFile(); + e->archiveReportURL = u->get()->getArchiveReportURL(); + e->errorReportURL = u->get()->getArchiveErrorReportURL(); + e->srcURL = u->get()->getSrcURL(); + } catch (...) { + ret.push_back(OpFailure<PoppedElement>()); + ret.back().element = &(*e); + ret.back().failure = std::current_exception(); + } + u++; + e++; + } + timingList.insertAndReset("asyncUpdateCompletionTime", t); +#endif + return ret; +} + +template<> +void ContainerTraits<RetrieveQueue>:: +trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, + log::LogContext &lc) +{ +#if 0 + if(cont.isEmpty()) { + // The current implementation is done unlocked. + contLock.release(); + try { + // The queue should be removed as it is empty. + RootEntry re(cont.m_objectStore); + ScopedExclusiveLock rexl(re); + re.fetch(); + re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc); + log::ScopedParamContainer params(lc); + params.add("tapepool", cId) + .add("queueObject", cont.getAddressIfSet()); + lc.log(log::INFO, "In ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(): deleted empty queue"); + } catch (cta::exception::Exception &ex) { + log::ScopedParamContainer params(lc); + params.add("tapepool", cId) + .add("queueObject", cont.getAddressIfSet()) + .add("Message", ex.getMessageValue()); + lc.log(log::INFO, "In ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(): could not delete a presumably empty queue"); + } + //queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter); + } +#endif +} + }} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 453e6f24ba..dcd733a434 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -26,7 +26,7 @@ namespace cta { namespace objectstore { template<> struct ContainerTraitsTypes<RetrieveQueue> { - struct ContainerSummary { + struct ContainerSummary : public RetrieveQueue::JobsSummary { void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&); }; @@ -54,10 +54,16 @@ struct ContainerTraitsTypes<RetrieveQueue> uint64_t bytes; }; struct PoppedElementsSummary { - //PoppedElementsSummary(); - bool operator<(const PopCriteria&); - PoppedElementsSummary& operator+=(const PoppedElementsSummary&); - //PoppedElementsSummary(const PoppedElementsSummary&); + uint64_t bytes = 0; + uint64_t files = 0; + bool operator<(const PopCriteria &pc) { + return bytes < pc.bytes && files < pc.files; + } + PoppedElementsSummary& operator+=(const PoppedElementsSummary &other) { + bytes += other.bytes; + files += other.files; + return *this; + } void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&); }; struct PoppedElementsList : public std::list<PoppedElement> { -- GitLab