From f60aafd8b6ab3d948d2493ab19e32a820283d71c Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Mon, 1 Oct 2018 16:59:52 +0200 Subject: [PATCH] Renamed QueueType to JobQueueType. Created RepackQueueType. --- objectstore/Algorithms.hpp | 8 +- objectstore/AlgorithmsTest.cpp | 10 +-- objectstore/ArchiveQueue.cpp | 2 +- objectstore/ArchiveQueueAlgorithms.hpp | 4 +- objectstore/ArchiveQueueFailedAlgorithms.cpp | 2 +- .../ArchiveQueueToReportAlgorithms.cpp | 2 +- .../ArchiveQueueToTransferAlgorithms.cpp | 2 +- objectstore/ArchiveRequest.cpp | 29 ++++--- objectstore/ArchiveRequest.hpp | 6 +- objectstore/CMakeLists.txt | 3 +- objectstore/GarbageCollector.cpp | 8 +- objectstore/GarbageCollector.hpp | 6 +- objectstore/GarbageCollectorTest.cpp | 12 +-- objectstore/Helpers.cpp | 30 +++---- objectstore/Helpers.hpp | 7 +- .../{QueueType.cpp => JobQueueType.cpp} | 10 +-- .../{QueueType.hpp => JobQueueType.hpp} | 4 +- objectstore/RepackQueueType.cpp | 34 ++++++++ objectstore/RepackQueueType.hpp | 26 ++++++ objectstore/RetrieveQueueAlgorithms.hpp | 4 +- objectstore/RetrieveRequest.cpp | 14 ++-- objectstore/RetrieveRequest.hpp | 4 +- objectstore/RootEntry.cpp | 80 ++++++++++--------- objectstore/RootEntry.hpp | 33 ++++---- objectstore/RootEntryTest.cpp | 10 +-- ...objectstore-dereference-removed-queues.cpp | 8 +- scheduler/OStoreDB/MemQueues.hpp | 4 +- scheduler/OStoreDB/OStoreDB.cpp | 32 ++++---- scheduler/OStoreDB/OStoreDBTest.cpp | 2 +- scheduler/OStoreDB/QueueItor.cpp | 4 +- 30 files changed, 239 insertions(+), 161 deletions(-) rename objectstore/{QueueType.cpp => JobQueueType.cpp} (84%) rename objectstore/{QueueType.hpp => JobQueueType.hpp} (87%) create mode 100644 objectstore/RepackQueueType.cpp create mode 100644 objectstore/RepackQueueType.hpp diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index 4969c4a0f3..a21768905a 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -45,7 +45,7 @@ public: * are provided existing and owned by algorithm's agent. */ void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, - QueueType queueType, const typename ContainerTraits<Q,C>::ContainerIdentifier & prevContId, + const typename ContainerTraits<Q,C>::QueueType queueType, const typename ContainerTraits<Q,C>::ContainerAddress & prevContAddress, typename ContainerTraits<Q,C>::InsertedElement::list & elements, log::LogContext & lc) { C cont(m_backend); ScopedExclusiveLock contLock; @@ -92,7 +92,7 @@ public: * Addition of jobs to container. Convenience overload for cases when current agent is the previous owner * (most cases except garbage collection). */ - void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, QueueType queueType, + void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, JobQueueType queueType, typename ContainerTraits<Q,C>::InsertedElement::list &elements, log::LogContext &lc) { referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc); } @@ -104,7 +104,7 @@ public: * might vary. This function is typically used by the garbage collector. We do not take care of * dereferencing the object from the caller. */ - void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, QueueType queueType, + void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, JobQueueType queueType, typename ContainerTraits<Q,C>::ContainerAddress & previousOwnerAddress, typename ContainerTraits<Q,C>::ContainerAddress & contAddress, typename ContainerTraits<Q,C>::InsertedElement::list & elements, log::LogContext & lc) { @@ -154,7 +154,7 @@ public: typename ContainerTraits<Q,C>::PoppedElementsBatch popNextBatch( const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, - QueueType queueType, + JobQueueType queueType, typename ContainerTraits<Q,C>::PopCriteria &popCriteria, log::LogContext &lc) { diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 7e74e2bcb5..e319fbbbc3 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -150,12 +150,12 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) { ar.insert(); } ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> archiveAlgos(be, agentRef); - archiveAlgos.referenceAndSwitchOwnership("Tapepool", QueueType::JobsToTransfer, requests, lc); + archiveAlgos.referenceAndSwitchOwnership("Tapepool", JobQueueType::JobsToTransfer, requests, lc); // Now get the requests back ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PopCriteria popCriteria; popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max(); popCriteria.files = 100; - auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", QueueType::JobsToTransfer, popCriteria, lc); + auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", JobQueueType::JobsToTransfer, popCriteria, lc); ASSERT_EQ(poppedJobs.summary.files, 10); } @@ -210,7 +210,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { auto a1 = agentRef2.getAgentAddress(); auto a2 = agentRef2.getAgentAddress(); ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2); - retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", QueueType::JobsToTransfer, + retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", JobQueueType::JobsToTransfer, a2, a1, requests2, lc); } @@ -218,14 +218,14 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { try { ASSERT_EQ(requests.size(), 10); - retrieveAlgos.referenceAndSwitchOwnership("VID", QueueType::JobsToTransfer, + retrieveAlgos.referenceAndSwitchOwnership("VID", JobQueueType::JobsToTransfer, agentRef.getAgentAddress(), requests, lc); // Now get the requests back ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria; popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max(); popCriteria.files = 100; - auto poppedJobs = retrieveAlgos.popNextBatch("VID", QueueType::JobsToTransfer, popCriteria, lc); + auto poppedJobs = retrieveAlgos.popNextBatch("VID", JobQueueType::JobsToTransfer, popCriteria, lc); ASSERT_EQ(poppedJobs.summary.files, 10); // Validate that the summary has the same information as the popped elements diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 55d303d652..790e470451 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen RootEntry re(m_objectStore); ScopedSharedLock rel (re); re.fetch(); - auto tpd=re.dumpArchiveQueues(QueueType::JobsToTransfer); + auto tpd=re.dumpArchiveQueues(JobQueueType::JobsToTransfer); for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) { if (tp->address == getAddressIfSet()) { setOwner(re.getAddressIfSet()); diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 8000d515d7..ea9ca917d3 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -32,6 +32,8 @@ struct ContainerTraits<ArchiveQueue,C> struct ContainerSummary : public ArchiveQueue::JobsSummary { void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&); }; + + typedef cta::objectstore::JobQueueType QueueType; struct InsertedElement { ArchiveRequest* archiveRequest; @@ -231,7 +233,7 @@ void ContainerTraits<ArchiveQueue,C>:: getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef, const ContainerIdentifier& contId, QueueType queueType, log::LogContext& lc) { - Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc); + Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType, lc); } template<typename C> diff --git a/objectstore/ArchiveQueueFailedAlgorithms.cpp b/objectstore/ArchiveQueueFailedAlgorithms.cpp index 2553eff4b4..7637420293 100644 --- a/objectstore/ArchiveQueueFailedAlgorithms.cpp +++ b/objectstore/ArchiveQueueFailedAlgorithms.cpp @@ -35,7 +35,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueFailed>:: trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { - trimContainerIfNeeded(cont, QueueType::FailedJobs, contLock, cId, lc); + trimContainerIfNeeded(cont, JobQueueType::FailedJobs, contLock, cId, lc); } }} // namespace cta::objectstore diff --git a/objectstore/ArchiveQueueToReportAlgorithms.cpp b/objectstore/ArchiveQueueToReportAlgorithms.cpp index 2f96bca620..2b7e76bf23 100644 --- a/objectstore/ArchiveQueueToReportAlgorithms.cpp +++ b/objectstore/ArchiveQueueToReportAlgorithms.cpp @@ -80,7 +80,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueToReport>:: trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { - trimContainerIfNeeded(cont, QueueType::JobsToReport, contLock, cId, lc); + trimContainerIfNeeded(cont, JobQueueType::JobsToReport, contLock, cId, lc); } }} // namespace cta::objectstore diff --git a/objectstore/ArchiveQueueToTransferAlgorithms.cpp b/objectstore/ArchiveQueueToTransferAlgorithms.cpp index bc2cdb574c..c8cd63500e 100644 --- a/objectstore/ArchiveQueueToTransferAlgorithms.cpp +++ b/objectstore/ArchiveQueueToTransferAlgorithms.cpp @@ -74,7 +74,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>:: trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { - trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc); + trimContainerIfNeeded(cont, JobQueueType::JobsToTransfer, contLock, cId, lc); } }} // namespace cta::objectstore diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 119ceba720..61677bd460 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -71,23 +71,26 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber, j->set_maxreportretries(maxReportRetries); } -QueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) { +//------------------------------------------------------------------------------ +// ArchiveRequest::getJobQueueType() +//------------------------------------------------------------------------------ +JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) { checkPayloadReadable(); for (auto &j: m_payload.jobs()) { if (j.copynb() == copyNumber) { switch (j.status()) { case serializers::ArchiveJobStatus::AJS_ToTransfer: - return QueueType::JobsToTransfer; + return JobQueueType::JobsToTransfer; case serializers::ArchiveJobStatus::AJS_Complete: throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion."); case serializers::ArchiveJobStatus::AJS_ToReportForTransfer: // We should report a success... - return QueueType::JobsToReport; + return JobQueueType::JobsToReport; case serializers::ArchiveJobStatus::AJS_ToReportForFailure: // We should report a failure. The report queue can be shared. - return QueueType::JobsToReport; + return JobQueueType::JobsToReport; case serializers::ArchiveJobStatus::AJS_Failed: - return QueueType::FailedJobs; + return JobQueueType::FailedJobs; case serializers::ArchiveJobStatus::AJS_Abandoned: throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Abandoned jobs are not queueable. They are finished and pend siblings completion."); } @@ -357,7 +360,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer // recreated (this will be done by helper). ArchiveQueue aq(m_objectStore); ScopedExclusiveLock aql; - Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc); + Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc); queueObject=aq.getAddressIfSet(); ArchiveRequest::JobDump jd; jd.copyNb = j->copynb(); @@ -629,21 +632,27 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) { return j->owner(); } -QueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& status) { +//------------------------------------------------------------------------------ +// ArchiveRequest::getQueueType() +//------------------------------------------------------------------------------ +JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& status) { using serializers::ArchiveJobStatus; switch(status) { case ArchiveJobStatus::AJS_ToTransfer: - return QueueType::JobsToTransfer; + return JobQueueType::JobsToTransfer; case ArchiveJobStatus::AJS_ToReportForTransfer: case ArchiveJobStatus::AJS_ToReportForFailure: - return QueueType::JobsToReport; + return JobQueueType::JobsToReport; case ArchiveJobStatus::AJS_Failed: - return QueueType::FailedJobs; + return JobQueueType::FailedJobs; default: throw cta::exception::Exception("In ArchiveRequest::getQueueType(): invalid status for queueing."); } } +//------------------------------------------------------------------------------ +// ArchiveRequest::statusToString() +//------------------------------------------------------------------------------ std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& status) { switch(status) { case serializers::ArchiveJobStatus::AJS_ToTransfer: diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 9020ca385f..d5277f08d3 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -24,7 +24,7 @@ #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/UserIdentity.hpp" #include "common/dataStructures/ArchiveFile.hpp" -#include "QueueType.hpp" +#include "JobQueueType.hpp" #include "common/Timer.hpp" #include "common/optional.hpp" #include "ObjectOps.hpp" @@ -98,7 +98,7 @@ public: EnqueueingNextStep addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string & failureReason, log::LogContext &lc); //< returns next step to take with the job CTA_GENERATE_EXCEPTION_CLASS(JobNotQueueable); - QueueType getJobQueueType(uint16_t copyNumber); + JobQueueType getJobQueueType(uint16_t copyNumber); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); // Set a job ownership void setJobOwner(uint16_t copyNumber, const std::string & owner); @@ -162,7 +162,7 @@ public: std::string getJobOwner(uint16_t copyNumber); // Utility to convert status to queue type - static QueueType getQueueType(const serializers::ArchiveJobStatus &status); + static JobQueueType getQueueType(const serializers::ArchiveJobStatus &status); // =========================================================================== // TODO: ArchiveFile comes with extraneous information. diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index 07f0c5e436..ae270e204a 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -70,13 +70,14 @@ add_library (ctaobjectstore SHARED RetrieveQueueToTransferAlgorithms.cpp RetrieveQueueToReportAlgorithms.cpp RetrieveQueueFailedAlgorithms.cpp - QueueType.cpp + JobQueueType.cpp ArchiveRequest.cpp RetrieveRequest.cpp DriveRegister.cpp DriveState.cpp RepackIndex.cpp RepackRequest.cpp + RepackQueueType.cpp BackendVFS.cpp BackendRados.cpp BackendPopulator.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 8f5fa13d50..6351443861 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -351,7 +351,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore)); break; } - retrieveQueuesAndRequests[std::make_tuple(vid, QueueType::JobsToTransfer)].emplace_back(rr); + retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransfer)].emplace_back(rr); log::ScopedParamContainer params3(lc); // Find copyNb for logging size_t copyNb = std::numeric_limits<size_t>::max(); @@ -389,7 +389,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a // individual requeue operations, we limit the number of concurrently requeued objects to an // arbitrary 500. std::string tapepool; - QueueType queueType; + JobQueueType queueType; std::tie(tapepool, queueType) = archiveQueueIdAndReqs.first; auto & requestsList = archiveQueueIdAndReqs.second; while (requestsList.size()) { @@ -508,7 +508,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& // Then should hence not have changes since we pre-fetched them. for (auto & retriveQueueIdAndReqs: retrieveQueuesAndRequests) { std::string vid; - QueueType queueType; + JobQueueType queueType; std::tie(vid, queueType) = retriveQueueIdAndReqs.first; auto & requestsList = retriveQueueIdAndReqs.second; while (requestsList.size()) { @@ -532,7 +532,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& // Get the retrieve queue and add references to the jobs to it. RetrieveQueue rq(objectStore); ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, vid, queueType, lc); + Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql, agentReference, vid, queueType, lc); queueLockFetchTime = t.secs(utils::Timer::resetCounter); auto jobsSummary=rq.getJobsSummary(); filesBefore=jobsSummary.files; diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index 88900c473a..b9aa9d8270 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -22,7 +22,7 @@ #include "Agent.hpp" #include "AgentWatchdog.hpp" #include "AgentRegister.hpp" -#include "QueueType.hpp" +#include "JobQueueType.hpp" #include "common/log/LogContext.hpp" /** @@ -55,8 +55,8 @@ public: /** Structure allowing the sorting of owned objects, so they can be requeued in batches, * one batch per queue. */ struct OwnedObjectSorter { - std::map<std::tuple<std::string, QueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; - std::map<std::tuple<std::string, QueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; + std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; + std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; std::list<std::shared_ptr<GenericObject>> otherObjects; /// Fill up the fetchedObjects with objects of interest. void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index c046101026..92c60d4988 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -341,7 +341,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { re.fetch(); std::stringstream tapePoolName; tapePoolName << "TapePool" << i; - tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::QueueType::JobsToTransfer, lc); + tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransfer, lc); cta::objectstore::ArchiveQueue aq(tpAddr[i], be); } // Create the various ATFR's, stopping one step further each time. @@ -463,7 +463,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { std::list<std::string> tapePools = { "TapePool0", "TapePool1" }; for (auto & tp: tapePools) { // Empty queue - cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::QueueType::JobsToTransfer), be); + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::JobQueueType::JobsToTransfer), be); cta::objectstore::ScopedExclusiveLock aql(aq); aq.fetch(); std::list<std::string> ajtr; @@ -473,7 +473,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { aq.removeJobsAndCommit(ajtr); aql.release(); // Remove queues from root - re.removeArchiveQueueAndCommit(tp, cta::objectstore::QueueType::JobsToTransfer, lc); + re.removeArchiveQueueAndCommit(tp, cta::objectstore::JobQueueType::JobsToTransfer, lc); } ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc)); @@ -533,7 +533,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { re.fetch(); std::stringstream vid; vid << "Tape" << i; - tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::QueueType::JobsToTransfer, lc); + tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransfer, lc); cta::objectstore::RetrieveQueue rq(tAddr[i], be); } // Create the various ATFR's, stopping one step further each time. @@ -646,7 +646,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { std::list<std::string> retrieveQueues = { "Tape0", "Tape1" }; for (auto & vid: retrieveQueues) { // Empty queue - cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid, cta::objectstore::QueueType::JobsToTransfer), be); + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid, cta::objectstore::JobQueueType::JobsToTransfer), be); cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); std::list<std::string> jtrl; @@ -656,7 +656,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { rq.removeJobsAndCommit(jtrl); rql.release(); // Remove queues from root - re.removeRetrieveQueueAndCommit(vid, cta::objectstore::QueueType::JobsToTransfer, lc); + re.removeRetrieveQueueAndCommit(vid, cta::objectstore::JobQueueType::JobsToTransfer, lc); } ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc)); diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index f7893cb6e6..c90c002799 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -35,9 +35,9 @@ namespace cta { namespace objectstore { // Helpers::getLockedAndFetchedQueue <ArchiveQueue> () //------------------------------------------------------------------------------ template <> -void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, +void Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference, - const std::string& tapePool, QueueType queueType, log::LogContext & lc) { + const cta::optional<std::string>& tapePool, JobQueueType queueType, log::LogContext & lc) { // TODO: if necessary, we could use a singleton caching object here to accelerate // lookups. // Getting a locked AQ is the name of the game. @@ -58,13 +58,13 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, re.fetchNoLock(); rootFetchNoLockTime = t.secs(utils::Timer::resetCounter); try { - archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool, queueType)); + archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool.value(), queueType)); } catch (cta::exception::Exception & ex) { ScopedExclusiveLock rexl(re); rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter); re.fetch(); rootRefetchTime = t.secs(utils::Timer::resetCounter); - archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference, queueType, lc)); + archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool.value(), agentReference, queueType, lc)); addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter); } } @@ -108,10 +108,10 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, re.fetch(); rootRefetchTime += t.secs(utils::Timer::resetCounter); try { - re.removeArchiveQueueAndCommit(tapePool, queueType, lc); + re.removeArchiveQueueAndCommit(tapePool.value(), queueType, lc); rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); - params.add("tapepool", tapePool) + params.add("tapepool", tapePool.value()) .add("queueObject", archiveQueue.getAddressIfSet()) .add("exceptionMsg", ex.getMessageValue()); lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): removed reference to gone archive queue from root entry."); @@ -144,7 +144,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, archiveQueue.resetAddress(); throw cta::exception::Exception(std::string( "In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ") - + tapePool); + + tapePool.value()); } @@ -152,9 +152,9 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue, // Helpers::getLockedAndFetchedQueue <RetrieveQueue> () //------------------------------------------------------------------------------ template <> -void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQueue, +void Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(RetrieveQueue& retrieveQueue, ScopedExclusiveLock& retrieveQueueLock, AgentReference& agentReference, - const std::string& vid, QueueType queueType, log::LogContext & lc) { + const cta::optional<std::string>& vid, JobQueueType queueType, log::LogContext & lc) { // TODO: if necessary, we could use a singleton caching object here to accelerate // lookups. // Getting a locked AQ is the name of the game. @@ -175,13 +175,13 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue re.fetchNoLock(); rootFetchNoLockTime = t.secs(utils::Timer::resetCounter); try { - retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid, queueType)); + retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid.value(), queueType)); } catch (cta::exception::Exception & ex) { ScopedExclusiveLock rexl(re); rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter); re.fetch(); rootRefetchTime = t.secs(utils::Timer::resetCounter); - retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference, queueType, lc)); + retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid.value(), agentReference, queueType, lc)); addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter); } } @@ -225,10 +225,10 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue re.fetch(); rootRefetchTime += t.secs(utils::Timer::resetCounter); try { - re.removeRetrieveQueueAndCommit(vid, queueType, lc); + re.removeRetrieveQueueAndCommit(vid.value(), queueType, lc); rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); - params.add("vid", vid) + params.add("vid", vid.value()) .add("queueObject", retrieveQueue.getAddressIfSet()) .add("exceptionMsg", ex.getMessageValue()); lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): removed reference to gone retrieve queue from root entry."); @@ -261,7 +261,7 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue retrieveQueue.resetAddress(); throw cta::exception::Exception(std::string( "In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: ") - + vid); + + vid.value()); } //------------------------------------------------------------------------------ @@ -412,7 +412,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS continue; std::string rqAddr; try { - std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, QueueType::JobsToTransfer); + std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, JobQueueType::JobsToTransfer); } catch (cta::exception::Exception &) { ret.push_back(SchedulerDatabase::RetrieveQueueStatistics()); ret.back().vid=tf.second.vid; diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp index bf622480ca..52a0176490 100644 --- a/objectstore/Helpers.hpp +++ b/objectstore/Helpers.hpp @@ -23,7 +23,7 @@ #include "common/threading/MutexLocker.hpp" #include "catalogue/Catalogue.hpp" #include "scheduler/OStoreDB/OStoreDB.hpp" -#include "QueueType.hpp" +#include "JobQueueType.hpp" #include <string> #include <set> #include <future> @@ -52,9 +52,10 @@ public: * @param tapePool or vid the name of the needed tape pool */ template <class Queue> - static void getLockedAndFetchedQueue(Queue & queue, + static void getLockedAndFetchedJobQueue(Queue & queue, ScopedExclusiveLock & queueLock, AgentReference & agentReference, - const std::string & tapePoolOrVid, QueueType queueType, log::LogContext & lc); + const cta::optional<std::string> & tapePoolOrVid, JobQueueType queueType, log::LogContext & lc); + CTA_GENERATE_EXCEPTION_CLASS(NoTapeAvailableForRetrieve); /** diff --git a/objectstore/QueueType.cpp b/objectstore/JobQueueType.cpp similarity index 84% rename from objectstore/QueueType.cpp rename to objectstore/JobQueueType.cpp index ad0d2df50c..7a84a0551e 100644 --- a/objectstore/QueueType.cpp +++ b/objectstore/JobQueueType.cpp @@ -16,17 +16,17 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "QueueType.hpp" +#include "JobQueueType.hpp" namespace cta { namespace objectstore { -std::string toString(QueueType queueType) { +std::string toString(JobQueueType queueType) { switch (queueType) { - case QueueType::FailedJobs: + case JobQueueType::FailedJobs: return "failedJobs"; - case QueueType::JobsToReport: + case JobQueueType::JobsToReport: return "jobsToReport"; - case QueueType::JobsToTransfer: + case JobQueueType::JobsToTransfer: return "jobsToTranfer"; default: return "Unknown queue type."; diff --git a/objectstore/QueueType.hpp b/objectstore/JobQueueType.hpp similarity index 87% rename from objectstore/QueueType.hpp rename to objectstore/JobQueueType.hpp index 8180ccacbf..97cdab1e9f 100644 --- a/objectstore/QueueType.hpp +++ b/objectstore/JobQueueType.hpp @@ -21,6 +21,6 @@ #include <string> namespace cta { namespace objectstore { -enum class QueueType { JobsToTransfer, FailedJobs, JobsToReport }; -std::string toString(QueueType queueType); +enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReport }; +std::string toString(JobQueueType queueType); }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RepackQueueType.cpp b/objectstore/RepackQueueType.cpp new file mode 100644 index 0000000000..21f0b97fed --- /dev/null +++ b/objectstore/RepackQueueType.cpp @@ -0,0 +1,34 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RepackQueueType.hpp" +#include "common/exception/Exception.hpp" + +namespace cta { namespace objectstore { + +std::string toString(RepackQueueType queueType) { + switch(queueType) { + case RepackQueueType::Pending: + return "Pending"; + case RepackQueueType::ToExpand: + return "ToExpand"; + } + throw exception::Exception("In toString(RepackQueueType): unexpected queue type."); +} + +}} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RepackQueueType.hpp b/objectstore/RepackQueueType.hpp new file mode 100644 index 0000000000..a1e0b2d681 --- /dev/null +++ b/objectstore/RepackQueueType.hpp @@ -0,0 +1,26 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> + +namespace cta { namespace objectstore { +enum class RepackQueueType { Pending, ToExpand }; +std::string toString(RepackQueueType queueType); +}} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 2e0280e8d7..b2df660434 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -32,6 +32,8 @@ struct ContainerTraits<RetrieveQueue,C> ContainerSummary(const RetrieveQueue::JobsSummary &c) : RetrieveQueue::JobsSummary() {} void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&) const; }; + + typedef cta::objectstore::JobQueueType QueueType; struct InsertedElement { std::unique_ptr<RetrieveRequest> retrieveRequest; @@ -206,7 +208,7 @@ void ContainerTraits<RetrieveQueue,C>:: getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &agRef, const ContainerIdentifier &contId, QueueType queueType, log::LogContext &lc) { - Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc); + Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType, lc); } template<typename C> diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 6f7d36720d..712d3c8784 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -153,8 +153,8 @@ queueForFailure:; RetrieveQueue rq(m_objectStore); ScopedExclusiveLock rql; // We need to know if this failure got reported yet. - QueueType queueType=isFailureReported()?QueueType::FailedJobs:QueueType::JobsToReport; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, queueType, lc); + JobQueueType queueType=isFailureReported()?JobQueueType::FailedJobs:JobQueueType::JobsToReport; + Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, queueType, lc); // Enqueue the job objectstore::MountPolicySerDeser mp; std::list<RetrieveQueue::JobToAdd> jta; @@ -212,7 +212,7 @@ queueForTransfer:; // We now need to grab the queue and requeue the request. RetrieveQueue rq(m_objectStore); ScopedExclusiveLock rql; - Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, QueueType::JobsToTransfer, lc); + Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, JobQueueType::JobsToTransfer, lc); // Enqueue the job objectstore::MountPolicySerDeser mp; mp.deserialize(m_payload.mountpolicy()); @@ -459,14 +459,14 @@ RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint16_t copy //------------------------------------------------------------------------------ // RetrieveRequest::getQueueType() //------------------------------------------------------------------------------ -QueueType RetrieveRequest::getQueueType() { +JobQueueType RetrieveRequest::getQueueType() { checkPayloadReadable(); bool hasToReport=false; for (auto &j: m_payload.jobs()) { // Any job is to be transfered => To transfer switch(j.status()) { case serializers::RetrieveJobStatus::RJS_ToTransfer: - return QueueType::JobsToTransfer; + return JobQueueType::JobsToTransfer; break; case serializers::RetrieveJobStatus::RJS_FailedToReport: // Else any job to report => to report. @@ -475,8 +475,8 @@ QueueType RetrieveRequest::getQueueType() { default: break; } } - if (hasToReport) return QueueType::JobsToReport; - return QueueType::FailedJobs; + if (hasToReport) return JobQueueType::JobsToReport; + return JobQueueType::FailedJobs; } //------------------------------------------------------------------------------ diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 5fdfee4db2..e0e2da0a0c 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -21,7 +21,7 @@ #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" #include "TapeFileSerDeser.hpp" -#include "QueueType.hpp" +#include "JobQueueType.hpp" #include <list> #include "common/dataStructures/DiskFileInfo.hpp" #include "common/dataStructures/EntryLog.hpp" @@ -78,7 +78,7 @@ public: }; RetryStatus getRetryStatus(uint16_t copyNumber); /// Returns queue type depending on the compound statuses of all retrieve requests. - QueueType getQueueType(); + JobQueueType getQueueType(); std::list<std::string> getFailures(); std::string statusToString(const serializers::RetrieveJobStatus & status); serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 80711b415f..d54d2c2f64 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -68,11 +68,11 @@ bool RootEntry::isEmpty() { if (m_payload.has_schedulerlockpointer() && m_payload.schedulerlockpointer().address().size()) return false; - for (auto &qt: {QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs}) { + for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs}) { if (archiveQueuePointers(qt).size()) return false; } - for (auto &qt: {QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs}) { + for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs}) { if (retrieveQueuePointers(qt).size()) return false; } @@ -100,52 +100,52 @@ void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference // ========== Queue types and helper functions ================================= // ============================================================================= -const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>& RootEntry::archiveQueuePointers(QueueType queueType) { +const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>& RootEntry::archiveQueuePointers(JobQueueType queueType) { switch(queueType) { - case QueueType::JobsToTransfer: + case JobQueueType::JobsToTransfer: return m_payload.livearchivejobsqueuepointers(); - case QueueType::JobsToReport: + case JobQueueType::JobsToReport: return m_payload.archivejobstoreportqueuepointers(); - case QueueType::FailedJobs: + case JobQueueType::FailedJobs: return m_payload.failedarchivejobsqueuepointers(); default: throw cta::exception::Exception("In RootEntry::archiveQueuePointers(): unknown queue type."); } } -::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>* RootEntry::mutableArchiveQueuePointers(QueueType queueType) { +::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>* RootEntry::mutableArchiveQueuePointers(JobQueueType queueType) { switch(queueType) { - case QueueType::JobsToTransfer: + case JobQueueType::JobsToTransfer: return m_payload.mutable_livearchivejobsqueuepointers(); - case QueueType::JobsToReport: + case JobQueueType::JobsToReport: return m_payload.mutable_archivejobstoreportqueuepointers(); - case QueueType::FailedJobs: + case JobQueueType::FailedJobs: return m_payload.mutable_failedarchivejobsqueuepointers(); default: throw cta::exception::Exception("In RootEntry::mutableArchiveQueuePointers(): unknown queue type."); } } -const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>& RootEntry::retrieveQueuePointers(QueueType queueType) { +const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>& RootEntry::retrieveQueuePointers(JobQueueType queueType) { switch(queueType) { - case QueueType::JobsToTransfer: + case JobQueueType::JobsToTransfer: return m_payload.liveretrievejobsqueuepointers(); - case QueueType::JobsToReport: + case JobQueueType::JobsToReport: return m_payload.retrievefailurestoreportqueuepointers(); - case QueueType::FailedJobs: + case JobQueueType::FailedJobs: return m_payload.failedretrievejobsqueuepointers(); default: throw cta::exception::Exception("In RootEntry::retrieveQueuePointers(): unknown queue type."); } } -::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>* RootEntry::mutableRetrieveQueuePointers(QueueType queueType) { +::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>* RootEntry::mutableRetrieveQueuePointers(JobQueueType queueType) { switch(queueType) { - case QueueType::JobsToTransfer: + case JobQueueType::JobsToTransfer: return m_payload.mutable_liveretrievejobsqueuepointers(); - case QueueType::JobsToReport: + case JobQueueType::JobsToReport: return m_payload.mutable_retrievefailurestoreportqueuepointers(); - case QueueType::FailedJobs: + case JobQueueType::FailedJobs: return m_payload.mutable_failedretrievejobsqueuepointers(); default: throw cta::exception::Exception("In RootEntry::mutableRetrieveQueuePointers(): unknown queue type."); @@ -167,7 +167,7 @@ namespace { } std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef, - QueueType queueType, log::LogContext & lc) { + JobQueueType queueType, log::LogContext & lc) { checkPayloadWritable(); // Check the archive queue does not already exist try { @@ -176,9 +176,9 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool // Insert the archive queue pointer in the root entry, then the queue. std::string archiveQueueNameHeader = "ArchiveQueue"; switch(queueType) { - case QueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break; - case QueueType::JobsToReport: archiveQueueNameHeader+="ToReport"; break; - case QueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break; + case JobQueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break; + case JobQueueType::JobsToReport: archiveQueueNameHeader+="ToReport"; break; + case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break; default: break; } std::string archiveQueueAddress = agentRef.nextId(archiveQueueNameHeader+"-"+tapePool); @@ -197,7 +197,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool return archiveQueueAddress; } -void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, QueueType queueType, log::LogContext & lc) { +void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, JobQueueType queueType, log::LogContext & lc) { checkPayloadWritable(); // find the address of the archive queue object try { @@ -246,7 +246,8 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, QueueTy commit(); { log::ScopedParamContainer params(lc); - params.add("tapePool", tapePool); + params.add("tapePool", tapePool) + .add("queueType", toString(queueType)); lc.log(log::INFO, "In RootEntry::removeArchiveQueueAndCommit(): removed archive queue reference."); } } catch (serializers::NotFound &) { @@ -255,11 +256,11 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, QueueTy } } -void RootEntry::removeMissingArchiveQueueReference(const std::string& tapePool, QueueType queueType) { +void RootEntry::removeMissingArchiveQueueReference(const std::string& tapePool, JobQueueType queueType) { serializers::removeOccurences(mutableArchiveQueuePointers(queueType), tapePool); } -std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool, QueueType queueType) { +std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool, JobQueueType queueType) { checkPayloadReadable(); try { auto & tpp = serializers::findElement(archiveQueuePointers(queueType), tapePool); @@ -269,7 +270,7 @@ std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool, Queue } } -auto RootEntry::dumpArchiveQueues(QueueType queueType) -> std::list<ArchiveQueueDump> { +auto RootEntry::dumpArchiveQueues(JobQueueType queueType) -> std::list<ArchiveQueueDump> { checkPayloadReadable(); std::list<ArchiveQueueDump> ret; auto & tpl = archiveQueuePointers(queueType); @@ -295,7 +296,7 @@ namespace { } std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef, - QueueType queueType, log::LogContext & lc) { + JobQueueType queueType, log::LogContext & lc) { checkPayloadWritable(); // Check the retrieve queue does not already exist try { @@ -306,9 +307,9 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag // The make of the vid in the object name will be handy. std::string retrieveQueueNameHeader = "RetrieveQueue"; switch(queueType) { - case QueueType::JobsToTransfer: retrieveQueueNameHeader+="ToTransfer"; break; - case QueueType::JobsToReport: retrieveQueueNameHeader+="ToReport"; break; - case QueueType::FailedJobs: retrieveQueueNameHeader+="Failed"; break; + case JobQueueType::JobsToTransfer: retrieveQueueNameHeader+="ToTransfer"; break; + case JobQueueType::JobsToReport: retrieveQueueNameHeader+="ToReport"; break; + case JobQueueType::FailedJobs: retrieveQueueNameHeader+="Failed"; break; default: break; } std::string retrieveQueueAddress = agentRef.nextId(retrieveQueueNameHeader+"-"+vid); @@ -327,11 +328,11 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag return retrieveQueueAddress; } -void RootEntry::removeMissingRetrieveQueueReference(const std::string& vid, QueueType queueType) { +void RootEntry::removeMissingRetrieveQueueReference(const std::string& vid, JobQueueType queueType) { serializers::removeOccurences(mutableRetrieveQueuePointers(queueType), vid); } -void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, QueueType queueType, log::LogContext & lc) { +void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, JobQueueType queueType, log::LogContext & lc) { checkPayloadWritable(); // find the address of the retrieve queue object try { @@ -359,7 +360,7 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, QueueType q std::stringstream err; err << "Unexpected vid found in retrieve queue pointed to for vid: " << vid << " found: " << rq.getVid(); - throw WrongArchiveQueue(err.str()); + throw WrongRetrieveQueue(err.str()); } // Check the retrieve queue is empty if (!rq.isEmpty()) { @@ -380,17 +381,18 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, QueueType q commit(); { log::ScopedParamContainer params(lc); - params.add("vid", vid); + params.add("vid", vid) + .add("queueType", toString(queueType)); lc.log(log::INFO, "In RootEntry::removeRetrieveQueueAndCommit(): removed retrieve queue reference."); } } catch (serializers::NotFound &) { // No such tape pool. Nothing to to. - throw NoSuchRetrieveQueue("In RootEntry::addOrGetRetrieveQueueAndCommit: trying to remove non-existing retrieve queue"); + throw NoSuchRetrieveQueue("In RootEntry::removeRetrieveQueueAndCommit: trying to remove non-existing retrieve queue"); } } -std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, QueueType queueType) { +std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, JobQueueType queueType) { checkPayloadReadable(); try { auto & rqp = serializers::findElement(retrieveQueuePointers(queueType), vid); @@ -400,7 +402,7 @@ std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, QueueType } } -auto RootEntry::dumpRetrieveQueues(QueueType queueType) -> std::list<RetrieveQueueDump> { +auto RootEntry::dumpRetrieveQueues(JobQueueType queueType) -> std::list<RetrieveQueueDump> { checkPayloadReadable(); std::list<RetrieveQueueDump> ret; auto & tpl = retrieveQueuePointers(queueType); @@ -695,7 +697,7 @@ void RootEntry::removeSchedulerGlobalLockAndCommit(log::LogContext & lc) { } // ============================================================================= -// ================ Repack index lock manipulation ============================= +// ================ Repack index manipulation ================================== // ============================================================================= std::string RootEntry::getRepackIndexAddress() { diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 2a1cc7a195..8ad0ff7d01 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -20,7 +20,7 @@ #include "objectstore/cta.pb.h" -#include "QueueType.hpp" +#include "JobQueueType.hpp" #include "Backend.hpp" #include "ObjectOps.hpp" #include "EntryLogSerDeser.hpp" @@ -62,13 +62,13 @@ public: // Queue types and helper functions ========================================== private: const ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::ArchiveQueuePointer >& - archiveQueuePointers(QueueType queueType); + archiveQueuePointers(JobQueueType queueType); ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::ArchiveQueuePointer >* - mutableArchiveQueuePointers(QueueType queueType); + mutableArchiveQueuePointers(JobQueueType queueType); const ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::RetrieveQueuePointer >& - retrieveQueuePointers(QueueType queueType); + retrieveQueuePointers(JobQueueType queueType); ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::RetrieveQueuePointer >* - mutableRetrieveQueuePointers(QueueType queueType); + mutableRetrieveQueuePointers(JobQueueType queueType); public: // ArchiveQueue handling ==================================================== @@ -77,37 +77,38 @@ public: /** This function implicitly creates the archive queue structure and updates * the pointer to it. It will implicitly commit the object to the store. */ std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, AgentReference & agentRef, - QueueType queueType, log::LogContext & lc); + JobQueueType queueType, log::LogContext & lc); /** This function implicitly deletes the tape pool structure. * Fails if it not empty*/ CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue); - void removeArchiveQueueAndCommit(const std::string & tapePool, QueueType queueType, log::LogContext & lc); + void removeArchiveQueueAndCommit(const std::string & tapePool, JobQueueType queueType, log::LogContext & lc); /** This function is used in a cleanup utility. Removes unconditionally the reference to the archive queue */ - void removeMissingArchiveQueueReference(const std::string & tapePool, QueueType queueType); - void removeArchiveQueueIfAddressMatchesAndCommit(const std::string & tapePool, const std::string & archiveQueueAddress, QueueType queueType); - std::string getArchiveQueueAddress(const std::string & tapePool, QueueType queueType); + void removeMissingArchiveQueueReference(const std::string & tapePool, JobQueueType queueType); + void removeArchiveQueueIfAddressMatchesAndCommit(const std::string & tapePool, const std::string & archiveQueueAddress, JobQueueType queueType); + std::string getArchiveQueueAddress(const std::string & tapePool, JobQueueType queueType); struct ArchiveQueueDump { std::string tapePool; std::string address; }; - std::list<ArchiveQueueDump> dumpArchiveQueues(QueueType queueType); + std::list<ArchiveQueueDump> dumpArchiveQueues(JobQueueType queueType); // RetrieveQueue handling ==================================================== CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotEmpty); + CTA_GENERATE_EXCEPTION_CLASS(WrongRetrieveQueue); /** This function implicitly creates the retrieve queue structure and updates * the pointer to it. It will implicitly commit the object to the store. */ std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, AgentReference & agentRef, - QueueType queueType, log::LogContext & lc); + JobQueueType queueType, log::LogContext & lc); /** This function is used in a cleanup utility. Removes unconditionally the reference to the retrieve queue */ - void removeMissingRetrieveQueueReference(const std::string & address, QueueType queueType); + void removeMissingRetrieveQueueReference(const std::string & address, JobQueueType queueType); CTA_GENERATE_EXCEPTION_CLASS(NoSuchRetrieveQueue); - void removeRetrieveQueueAndCommit(const std::string & vid, QueueType queueType, log::LogContext & lc); - std::string getRetrieveQueueAddress(const std::string & vid, QueueType queueType); + void removeRetrieveQueueAndCommit(const std::string & vid, JobQueueType queueType, log::LogContext & lc); + std::string getRetrieveQueueAddress(const std::string & vid, JobQueueType queueType); struct RetrieveQueueDump { std::string vid; std::string address; }; - std::list<RetrieveQueueDump> dumpRetrieveQueues(QueueType queueType); + std::list<RetrieveQueueDump> dumpRetrieveQueues(JobQueueType queueType); // Drive register manipulations ============================================== CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty); diff --git a/objectstore/RootEntryTest.cpp b/objectstore/RootEntryTest.cpp index 3bd8cdcb4e..d06cf5d231 100644 --- a/objectstore/RootEntryTest.cpp +++ b/objectstore/RootEntryTest.cpp @@ -97,9 +97,9 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - ASSERT_THROW(re.getArchiveQueueAddress("tapePool1", cta::objectstore::QueueType::JobsToTransfer), + ASSERT_THROW(re.getArchiveQueueAddress("tapePool1", cta::objectstore::JobQueueType::JobsToTransfer), cta::objectstore::RootEntry::NoSuchArchiveQueue); - tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, cta::objectstore::QueueType::JobsToTransfer, lc); + tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, cta::objectstore::JobQueueType::JobsToTransfer, lc); // Check that we car read it cta::objectstore::ArchiveQueue aq(tpAddr1, be); cta::objectstore::ScopedSharedLock aql(aq); @@ -110,7 +110,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, cta::objectstore::QueueType::JobsToTransfer, lc); + tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, cta::objectstore::JobQueueType::JobsToTransfer, lc); ASSERT_TRUE(be.exists(tpAddr2)); } { @@ -118,7 +118,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - re.removeArchiveQueueAndCommit("tapePool2", cta::objectstore::QueueType::JobsToTransfer, lc); + re.removeArchiveQueueAndCommit("tapePool2", cta::objectstore::JobQueueType::JobsToTransfer, lc); ASSERT_FALSE(be.exists(tpAddr2)); } // Unregister the agent @@ -129,7 +129,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); re.removeAgentRegisterAndCommit(lc); - re.removeArchiveQueueAndCommit("tapePool1", cta::objectstore::QueueType::JobsToTransfer, lc); + re.removeArchiveQueueAndCommit("tapePool1", cta::objectstore::JobQueueType::JobsToTransfer, lc); ASSERT_FALSE(be.exists(tpAddr1)); re.removeIfEmpty(lc); ASSERT_FALSE(re.exists()); diff --git a/objectstore/cta-objectstore-dereference-removed-queues.cpp b/objectstore/cta-objectstore-dereference-removed-queues.cpp index d440b024bb..bcb329eade 100644 --- a/objectstore/cta-objectstore-dereference-removed-queues.cpp +++ b/objectstore/cta-objectstore-dereference-removed-queues.cpp @@ -58,14 +58,14 @@ int main(int argc, char ** argv) { cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); std::list<std::string> missingArchiveQueues, missingRetrieveQueues; - for (auto & aq: re.dumpArchiveQueues(cta::objectstore::QueueType::JobsToTransfer)) { + for (auto & aq: re.dumpArchiveQueues(cta::objectstore::JobQueueType::JobsToTransfer)) { if (!be->exists(aq.address)) { missingArchiveQueues.emplace_back(aq.tapePool); std::cout << "The archive queue for tape pool " << aq.tapePool << " at address " << aq.address << " is missing and will be dereferenced." << std::endl; } } - for (auto & rq: re.dumpRetrieveQueues(cta::objectstore::QueueType::JobsToTransfer)) { + for (auto & rq: re.dumpRetrieveQueues(cta::objectstore::JobQueueType::JobsToTransfer)) { if (!be->exists(rq.address)) { missingRetrieveQueues.emplace_back(rq.vid); std::cout << "The retrieve queue for vid " << rq.vid << " at address " << rq.address @@ -74,11 +74,11 @@ int main(int argc, char ** argv) { } // Actually do the job for (auto & tp: missingArchiveQueues) { - re.removeMissingArchiveQueueReference(tp, cta::objectstore::QueueType::JobsToTransfer); + re.removeMissingArchiveQueueReference(tp, cta::objectstore::JobQueueType::JobsToTransfer); std::cout << "Archive queue for tape pool " << tp << "dereferenced." << std::endl; } for (auto & vid: missingRetrieveQueues) { - re.removeMissingRetrieveQueueReference(vid, cta::objectstore::QueueType::JobsToTransfer); + re.removeMissingRetrieveQueueReference(vid, cta::objectstore::JobQueueType::JobsToTransfer); std::cout << "Retrieve queue for vid " << vid << "dereferenced." << std::endl; } if (missingArchiveQueues.size() || missingRetrieveQueues.size()) { diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index 9165ce11d3..b7c17a5c3a 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -300,8 +300,8 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share ret->m_lock.reset(new objectstore::ScopedExclusiveLock); auto & queue = *ret->m_queue; auto & aql = *ret->m_lock; - objectstore::Helpers::getLockedAndFetchedQueue<Queue>(queue, aql, - *oStoreDB.m_agentReference, queueIndex, objectstore::QueueType::JobsToTransfer, logContext); + objectstore::Helpers::getLockedAndFetchedJobQueue<Queue>(queue, aql, + *oStoreDB.m_agentReference, queueIndex, objectstore::JobQueueType::JobsToTransfer, logContext); double getFetchedQueueTime = timer.secs(utils::Timer::resetCounter); size_t qJobsBefore=queue.dumpJobs().size(); uint64_t qBytesBefore=0; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 940e045384..7059dbb254 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -182,7 +182,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro log::LogContext & logContext) { utils::Timer t, t2; // Walk the archive queues for statistics - for (auto & aqp: re.dumpArchiveQueues(QueueType::JobsToTransfer)) { + for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransfer)) { objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore); // debug utility variable std::string __attribute__((__unused__)) poolName = aqp.tapePool; @@ -227,7 +227,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive queue."); } // Walk the retrieve queues for statistics - for (auto & rqp: re.dumpRetrieveQueues(QueueType::JobsToTransfer)) { + for (auto & rqp: re.dumpRetrieveQueues(JobQueueType::JobsToTransfer)) { RetrieveQueue rqueue(rqp.address, m_objectStore); // debug utility variable std::string __attribute__((__unused__)) vid = rqp.vid; @@ -392,7 +392,7 @@ void OStoreDB::trimEmptyQueues(log::LogContext& lc) { RootEntry re(m_objectStore); ScopedExclusiveLock rel(re); re.fetch(); - for (auto & queueType: { QueueType::JobsToTransfer, QueueType::JobsToReport, QueueType::FailedJobs} ) { + for (auto & queueType: { JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs} ) { try { auto archiveQueueList = re.dumpArchiveQueues(queueType); for (auto & a: archiveQueueList) { @@ -669,14 +669,14 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch RootEntry re(m_objectStore); re.fetchNoLock(); while (true) { - auto queueList = re.dumpArchiveQueues(QueueType::JobsToReport); + auto queueList = re.dumpArchiveQueues(JobQueueType::JobsToReport); std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret; if (queueList.empty()) return ret; // Try to get jobs from the first queue. If it is empty, it will be trimmed, // so we can got for another round. AQTRAlgo::PopCriteria criteria; criteria.files = filesRequested; - auto jobs = aqtrAlgo.popNextBatch(queueList.front().tapePool, QueueType::JobsToReport, criteria, logContext); + auto jobs = aqtrAlgo.popNextBatch(queueList.front().tapePool, JobQueueType::JobsToReport, criteria, logContext); if (jobs.elements.empty()) continue; for (auto & j: jobs.elements) { std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), *this)); @@ -762,7 +762,7 @@ void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob* cta::nullopt, serializers::ArchiveJobStatus::AJS_Failed}); } try { - caAQF.referenceAndSwitchOwnership(queue.first, QueueType::FailedJobs, m_agentReference->getAgentAddress(), + caAQF.referenceAndSwitchOwnership(queue.first, JobQueueType::FailedJobs, m_agentReference->getAgentAddress(), insertedElements, lc); } catch (exception::Exception & ex) { log::ScopedParamContainer params(lc); @@ -1779,7 +1779,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> AQAlgos; AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); - auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, objectstore::QueueType::JobsToTransfer, popCriteria, logContext); + auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, objectstore::JobQueueType::JobsToTransfer, popCriteria, logContext); // We can construct the return value. std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret; for (auto & j: jobs.elements) { @@ -1859,7 +1859,7 @@ getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContex typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> RQAlgos; RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); - auto jobs = rqAlgos.popNextBatch(mountInfo.vid, objectstore::QueueType::JobsToTransfer, popCriteria, logContext); + auto jobs = rqAlgos.popNextBatch(mountInfo.vid, objectstore::JobQueueType::JobsToTransfer, popCriteria, logContext); // We can construct the return value std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret; for(auto &j : jobs.elements) @@ -2063,7 +2063,7 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct for (auto &list: insertedElementsLists) { try { utils::Timer tLocal; - aqtrCa.referenceAndSwitchOwnership(list.first, QueueType::JobsToReport, m_oStoreDB.m_agentReference->getAgentAddress(), + aqtrCa.referenceAndSwitchOwnership(list.first, JobQueueType::JobsToReport, m_oStoreDB.m_agentReference->getAgentAddress(), list.second, lc); log::ScopedParamContainer params(lc); params.add("VID", list.first) @@ -2152,7 +2152,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::JobsToReport, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tapeFile.copyNb) @@ -2175,7 +2175,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapepool, objectstore::QueueType::JobsToTransfer, + caAqtr.referenceAndSwitchOwnership(tapepool, objectstore::JobQueueType::JobsToTransfer, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) @@ -2199,7 +2199,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::FailedJobs, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::FailedJobs, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tapeFile.copyNb) @@ -2244,7 +2244,7 @@ void OStoreDB::ArchiveJob::failReport(const std::string& failureReason, log::Log CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::JobsToReport, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) @@ -2264,7 +2264,7 @@ void OStoreDB::ArchiveJob::failReport(const std::string& failureReason, log::Log CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::FailedJobs, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::FailedJobs, insertedElements, lc); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) @@ -2424,8 +2424,8 @@ bool OStoreDB::RetrieveJob::fail(const std::string& failureReason, log::LogConte // Add the request to the queue. objectstore::RetrieveQueue rq(m_oStoreDB.m_objectStore); objectstore::ScopedExclusiveLock rql; - objectstore::Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, - *m_oStoreDB.m_agentReference, bestVid, objectstore::QueueType::JobsToTransfer, logContext); + objectstore::Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, + *m_oStoreDB.m_agentReference, bestVid, objectstore::JobQueueType::JobsToTransfer, logContext); auto rfqc = m_retrieveRequest.getRetrieveFileQueueCriteria(); auto & af = rfqc.archiveFile; auto & tf = af.tapeFiles.at(bestCopyNb); diff --git a/scheduler/OStoreDB/OStoreDBTest.cpp b/scheduler/OStoreDB/OStoreDBTest.cpp index d0f235eaa4..4a21ffde40 100644 --- a/scheduler/OStoreDB/OStoreDBTest.cpp +++ b/scheduler/OStoreDB/OStoreDBTest.cpp @@ -141,7 +141,7 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) { RootEntry re(osdbi.getBackend()); ScopedSharedLock rel(re); re.fetch(); - aqAddr = re.getArchiveQueueAddress("Tapepool1", cta::objectstore::QueueType::JobsToTransfer); + aqAddr = re.getArchiveQueueAddress("Tapepool1", cta::objectstore::JobQueueType::JobsToTransfer); rel.release(); ArchiveQueue aq(aqAddr, osdbi.getBackend()); ScopedSharedLock aql(aq); diff --git a/scheduler/OStoreDB/QueueItor.cpp b/scheduler/OStoreDB/QueueItor.cpp index d98771cebb..837d31048f 100644 --- a/scheduler/OStoreDB/QueueItor.cpp +++ b/scheduler/OStoreDB/QueueItor.cpp @@ -38,7 +38,7 @@ QueueItor(objectstore::Backend &objectStore, const std::string &queue_id) : objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); - m_jobQueuesQueue = re.dumpArchiveQueues(objectstore::QueueType::JobsToTransfer); + m_jobQueuesQueue = re.dumpArchiveQueues(objectstore::JobQueueType::JobsToTransfer); } // Set queue iterator to the first queue in the list @@ -141,7 +141,7 @@ QueueItor(objectstore::Backend &objectStore, const std::string &queue_id) : objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); - m_jobQueuesQueue = re.dumpRetrieveQueues(objectstore::QueueType::JobsToTransfer); + m_jobQueuesQueue = re.dumpRetrieveQueues(objectstore::JobQueueType::JobsToTransfer); } // Find the first queue -- GitLab