diff --git a/common/optional.hpp b/common/optional.hpp index 364ac870bd0211221018f8894d6adf7b4293eaf0..e551acd2ea2275362cb94f20ec4e940fb050a4e2 100644 --- a/common/optional.hpp +++ b/common/optional.hpp @@ -26,6 +26,10 @@ namespace cta { struct nullopt_t { nullopt_t() {} + friend std::ostream& operator<<(std::ostream& os, const nullopt_t &nopt) + { + return os; + } }; extern const nullopt_t nullopt; @@ -331,5 +335,4 @@ template<class T> bool operator>=(const T& value, const optional<T>& opt) { template<class T> void swap(optional<T>& lhs, optional<T>& rhs) { lhs.swap(rhs); } - } diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index d404a2a2b91ef09ab77ab7de121567e74b80e09b..9fba2bc2b6e64bc3948f91fbfe9632ff166d66f3 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -41,19 +41,20 @@ public: typedef typename ContainerTraits<Q,C>::PopCriteria PopCriteria; typedef typename ContainerTraits<Q,C>::OwnershipSwitchFailure OwnershipSwitchFailure; typedef typename ContainerTraits<Q,C>::PoppedElementsBatch PoppedElementsBatch; + typedef typename ContainerTraits<Q,C>::QueueType JobQueueType; /** * Reference objects in the container and then switch their ownership. Objects * are provided existing and owned by algorithm's agent. */ void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, - const typename ContainerTraits<Q,C>::QueueType queueType, const typename ContainerTraits<Q,C>::ContainerAddress & prevContAddress, + const typename ContainerTraits<Q,C>::ContainerAddress & prevContAddress, typename ContainerTraits<Q,C>::InsertedElement::list & elements, log::LogContext & lc) { C cont(m_backend); ScopedExclusiveLock contLock; log::TimingList timingList; utils::Timer t; - ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc); + ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); ContainerTraits<Q,C>::addReferencesAndCommit(cont, elements, m_agentReference, lc); auto failedOwnershipSwitchElements = ContainerTraits<Q,C>::switchElementsOwnership(elements, cont.getAddressIfSet(), prevContAddress, timingList, t, lc); @@ -94,9 +95,9 @@ public: * Addition of jobs to container. Convenience overload for cases when current agent is the previous owner * (most cases except garbage collection). */ - void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, JobQueueType queueType, + void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, typename ContainerTraits<Q,C>::InsertedElement::list &elements, log::LogContext &lc) { - referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc); + referenceAndSwitchOwnership(contId, m_agentReference.getAgentAddress(), elements, lc); } /** @@ -106,7 +107,7 @@ public: * might vary. This function is typically used by the garbage collector. We do not take care of * dereferencing the object from the caller. */ - void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, JobQueueType queueType, + void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId, typename ContainerTraits<Q,C>::ContainerAddress & previousOwnerAddress, typename ContainerTraits<Q,C>::ContainerAddress & contAddress, typename ContainerTraits<Q,C>::InsertedElement::list & elements, log::LogContext & lc) { @@ -114,7 +115,8 @@ public: ScopedExclusiveLock contLock; log::TimingList timingList; utils::Timer t; - ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc); + typename ContainerTraits<Q,C>::QueueType queueType; + ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); contAddress = cont.getAddressIfSet(); auto contSummaryBefore = ContainerTraits<Q,C>::getContainerSummary(cont); timingList.insertAndReset("queueLockFetchTime", t); @@ -296,7 +298,6 @@ public: */ PoppedElementsBatch popNextBatch( const typename ContainerTraits<Q,C>::ContainerIdentifier &contId, - JobQueueType queueType, typename ContainerTraits<Q,C>::PopCriteria &popCriteria, log::LogContext &lc) { @@ -317,7 +318,7 @@ public: iterationCount++; ScopedExclusiveLock contLock; try { - ContainerTraits<Q,C>::getLockedAndFetchedNoCreate(cont, contLock, contId, queueType, lc); + ContainerTraits<Q,C>::getLockedAndFetchedNoCreate(cont, contLock, contId, lc); } catch (typename ContainerTraits<Q,C>::NoSuchContainer &) { localTimingList.insertAndReset("findLockFetchQueueTime", t); timingList+=localTimingList; diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 8ec4867b61ea7d30c0c72a21d0a0b356902c2a4d..208c57e00f6186704eb7ca7e1fb60ff0761ecd51 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -150,12 +150,12 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) { ar.insert(); } ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> archiveAlgos(be, agentRef); - archiveAlgos.referenceAndSwitchOwnership("Tapepool", JobQueueType::JobsToTransfer, requests, lc); + archiveAlgos.referenceAndSwitchOwnership("Tapepool", requests, lc); // Now get the requests back ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PopCriteria popCriteria; popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max(); popCriteria.files = 100; - auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", JobQueueType::JobsToTransfer, popCriteria, lc); + auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", popCriteria, lc); ASSERT_EQ(poppedJobs.summary.files, 10); } @@ -210,7 +210,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { auto a1 = agentRef2.getAgentAddress(); auto a2 = agentRef2.getAgentAddress(); ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2); - retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", JobQueueType::JobsToTransfer, + retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", a2, a1, requests2, lc); } @@ -218,14 +218,14 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) { try { ASSERT_EQ(requests.size(), 10); - retrieveAlgos.referenceAndSwitchOwnership("VID", JobQueueType::JobsToTransfer, + retrieveAlgos.referenceAndSwitchOwnership("VID", agentRef.getAgentAddress(), requests, lc); // Now get the requests back ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria; popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max(); popCriteria.files = 100; - auto poppedJobs = retrieveAlgos.popNextBatch("VID", JobQueueType::JobsToTransfer, popCriteria, lc); + auto poppedJobs = retrieveAlgos.popNextBatch("VID", popCriteria, lc); ASSERT_EQ(poppedJobs.summary.files, 10); // Validate that the summary has the same information as the popped elements diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 1156f0c1eae5b91852efc28c1c296f51ca8083d6..2e92d62a469f8109a70acad1bc523eaa52cd9fc6 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -33,7 +33,7 @@ struct ContainerTraits<ArchiveQueue,C> void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&); }; - typedef cta::objectstore::JobQueueType QueueType; + struct QueueType; struct InsertedElement { ArchiveRequest* archiveRequest; @@ -84,7 +84,7 @@ struct ContainerTraits<ArchiveQueue,C> PoppedElementsSummary summary; void addToLog(log::ScopedParamContainer&); }; - + typedef ArchiveQueue Container; typedef std::string ContainerAddress; typedef std::string ElementAddress; @@ -120,9 +120,9 @@ struct ContainerTraits<ArchiveQueue,C> static bool trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetched(Container &cont, ScopedExclusiveLock &contLock, AgentReference &agRef, - const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, AgentReference &agentRef, log::LogContext &lc); static void addReferencesIfNecessaryAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, @@ -145,9 +145,6 @@ struct ContainerTraits<ArchiveQueue,C> static const std::string c_containerTypeName; static const std::string c_identifierType; -private: - static bool trimContainerIfNeeded(Container &cont, QueueType queueType, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, log::LogContext &lc); }; @@ -202,7 +199,7 @@ addToLog(log::ScopedParamContainer ¶ms) { template<typename C> bool ContainerTraits<ArchiveQueue,C>:: -trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock & contLock, +trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifier & cId, log::LogContext& lc) { if (!cont.isEmpty()) return false; @@ -210,10 +207,12 @@ trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock contLock.release(); try { // The queue should be removed as it is empty. + ContainerTraits<ArchiveQueue,C>::QueueType queueType; + //TODO : Voir si le queue type correspondra au bon type RootEntry re(cont.m_objectStore); ScopedExclusiveLock rexl(re); re.fetch(); - re.removeArchiveQueueAndCommit(cId, queueType, lc); + re.removeArchiveQueueAndCommit(cId, queueType.value, lc); log::ScopedParamContainer params(lc); params.add("tapepool", cId) .add("queueObject", cont.getAddressIfSet()); @@ -232,15 +231,15 @@ trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock template<typename C> void ContainerTraits<ArchiveQueue,C>:: getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef, - const ContainerIdentifier& contId, QueueType queueType, log::LogContext& lc) + const ContainerIdentifier& contId, log::LogContext& lc) { - Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType, lc); + ContainerTraits<ArchiveQueue,C>::QueueType queueType; + Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType.value, lc); } template<typename C> void ContainerTraits<ArchiveQueue,C>:: -getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, const ContainerIdentifier& cId, - QueueType queueType, log::LogContext& lc) +getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, const ContainerIdentifier& cId, log::LogContext& lc) { // Try and get access to a queue. size_t attemptCount = 0; @@ -248,7 +247,8 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons objectstore::RootEntry re(cont.m_objectStore); re.fetchNoLock(); std::string aqAddress; - auto aql = re.dumpArchiveQueues(queueType); + ContainerTraits<ArchiveQueue,C>::QueueType queueType; + auto aql = re.dumpArchiveQueues(queueType.value); for (auto & aqp : aql) { if (aqp.tapePool == cId) aqAddress = aqp.address; @@ -269,7 +269,7 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeArchiveQueueAndCommit(cId, queueType, lc); + re.removeArchiveQueueAndCommit(cId, queueType.value, lc); log::ScopedParamContainer params(lc); params.add("tapepool", cId) .add("queueObject", cont.getAddressIfSet()); @@ -476,4 +476,19 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PoppedElementsSumma } }; +template<> +struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::QueueType { + objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer; +}; + +template<> +struct ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::QueueType { + objectstore::JobQueueType value = objectstore::JobQueueType::FailedJobs; +}; + +template<> +struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType { + objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReport; +}; + }} // namespace cta::objectstore diff --git a/objectstore/ArchiveQueueFailedAlgorithms.cpp b/objectstore/ArchiveQueueFailedAlgorithms.cpp index af73ce15378a4ff512c9ab49c1e4e5bdac8735ff..f44eb7a11d552cab9b6111e099b86808b82a4b54 100644 --- a/objectstore/ArchiveQueueFailedAlgorithms.cpp +++ b/objectstore/ArchiveQueueFailedAlgorithms.cpp @@ -30,12 +30,4 @@ const std::string ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::c_containerT template<> const std::string ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::c_identifierType = "tapepool"; -template<> -bool ContainerTraits<ArchiveQueue,ArchiveQueueFailed>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, JobQueueType::FailedJobs, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/objectstore/ArchiveQueueToReportAlgorithms.cpp b/objectstore/ArchiveQueueToReportAlgorithms.cpp index a26cf0711f95d252af89796eaf3df97daf4dc9d3..0baeae53cb776edbaae608079344b9896c489716 100644 --- a/objectstore/ArchiveQueueToReportAlgorithms.cpp +++ b/objectstore/ArchiveQueueToReportAlgorithms.cpp @@ -75,12 +75,4 @@ getContainerSummary(Container& cont) -> ContainerSummary { return ret; } -template<> -bool ContainerTraits<ArchiveQueue,ArchiveQueueToReport>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, JobQueueType::JobsToReport, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/objectstore/ArchiveQueueToTransferAlgorithms.cpp b/objectstore/ArchiveQueueToTransferAlgorithms.cpp index fcb14fe043eefe83eba35f4de1dddd6251b49147..66ffa45089c327a1d7870773c2a67c09de32578a 100644 --- a/objectstore/ArchiveQueueToTransferAlgorithms.cpp +++ b/objectstore/ArchiveQueueToTransferAlgorithms.cpp @@ -69,12 +69,4 @@ addToLog(log::ScopedParamContainer ¶ms) { .add("files", summary.files); } -template<> -bool ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, JobQueueType::JobsToTransfer, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index c43edc55c75c4e92704e31c1201d1e1e81aa625b..7bc7bef6adb5093def67e7f82ede8a7f5bab45aa 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -413,7 +413,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a std::set<std::string> jobsNotRequeued; std::string queueAddress; try { - aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, queueType, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc); + aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc); } catch (AqAlgos::OwnershipSwitchFailure & failure) { for (auto &failedAR: failure.failedElements) { try { diff --git a/objectstore/RepackQueueAlgorithms.hpp b/objectstore/RepackQueueAlgorithms.hpp index cbac95132afb0261692e6312701c8a4583ec80e0..f0c50e7a0fc27f1f32dae2ec5d65d54556e544cd 100644 --- a/objectstore/RepackQueueAlgorithms.hpp +++ b/objectstore/RepackQueueAlgorithms.hpp @@ -35,8 +35,8 @@ struct ContainerTraits<RepackQueue,C> void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&); }; - typedef cta::objectstore::RepackQueueType QueueType; - + struct QueueType; + struct InsertedElement { std::unique_ptr<RepackRequest> repackRequest; cta::optional<serializers::RepackRequestStatus> newStatus; @@ -112,9 +112,9 @@ struct ContainerTraits<RepackQueue,C> static bool trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetched(Container &cont, ScopedExclusiveLock &contLock, AgentReference &agRef, - const ContainerIdentifier &cId, RepackQueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, RepackQueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, AgentReference &agentRef, log::LogContext &lc); static void addReferencesIfNecessaryAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, @@ -142,9 +142,6 @@ struct ContainerTraits<RepackQueue,C> static const std::string c_containerTypeName; static const std::string c_identifierType; -private: - static bool trimContainerIfNeeded(Container &cont, JobQueueType queueType, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, log::LogContext &lc); }; @@ -216,7 +213,7 @@ addToLog(log::ScopedParamContainer ¶ms) { template<typename C> bool ContainerTraits<RepackQueue,C>:: -trimContainerIfNeeded(Container& cont, JobQueueType queueType, ScopedExclusiveLock & contLock, +trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifier & cId, log::LogContext& lc) { // Repack queues are one per status, so we do not need to trim them. @@ -226,15 +223,15 @@ trimContainerIfNeeded(Container& cont, JobQueueType queueType, ScopedExclusiveLo template<typename C> void ContainerTraits<RepackQueue,C>:: getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef, - const ContainerIdentifier& contId, RepackQueueType queueType, log::LogContext& lc) + const ContainerIdentifier& contId, log::LogContext& lc) { - Helpers::getLockedAndFetchedRepackQueue(cont, aqL, agRef, queueType, lc); + ContainerTraits<RepackQueue,C>::QueueType queueType; + Helpers::getLockedAndFetchedRepackQueue(cont, aqL, agRef, queueType.value, lc); } template<typename C> void ContainerTraits<RepackQueue,C>:: -getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, const ContainerIdentifier& cId, - RepackQueueType queueType, log::LogContext& lc) +getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, const ContainerIdentifier& cId, log::LogContext& lc) { // Try and get access to a queue. size_t attemptCount = 0; @@ -242,8 +239,9 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons objectstore::RootEntry re(cont.m_objectStore); re.fetchNoLock(); std::string rpkQAddress; + ContainerTraits<RepackQueue,C>::QueueType queueType; try { - rpkQAddress = re.getRepackQueueAddress(queueType); + rpkQAddress = re.getRepackQueueAddress(queueType.value); } catch (RootEntry::NotAllocated &) { throw NoSuchContainer("In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): no such repack queue"); } @@ -262,7 +260,7 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeRepackQueueAndCommit(queueType, lc); + re.removeRepackQueueAndCommit(queueType.value, lc); log::ScopedParamContainer params(lc); params.add("queueObject", cont.getAddressIfSet()); lc.log(log::INFO, @@ -392,6 +390,13 @@ getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary { return ret; } +template<> +struct ContainerTraits<RepackQueue,RepackQueuePending>::QueueType{ + objectstore::RepackQueueType value = objectstore::RepackQueueType::Pending; +}; - +template<> +struct ContainerTraits<RepackQueue,RepackQueueToExpand>::QueueType{ + objectstore::RepackQueueType value = objectstore::RepackQueueType::ToExpand; +}; }} // namespace cta::objectstore diff --git a/objectstore/RepackQueuePendingAlgorithms.cpp b/objectstore/RepackQueuePendingAlgorithms.cpp index 6f7f288d6790786e992790e5dd92c1b3c7e8ed5a..7a9ed8b4ed22f3b9028d7dd8da46c2bc15069c65 100644 --- a/objectstore/RepackQueuePendingAlgorithms.cpp +++ b/objectstore/RepackQueuePendingAlgorithms.cpp @@ -26,14 +26,6 @@ const std::string ContainerTraits<RepackQueue,RepackQueuePending>::c_containerTy template<> const std::string ContainerTraits<RepackQueue,RepackQueuePending>::c_identifierType = "uniqueQueue"; -template<> -bool ContainerTraits<RepackQueue,RepackQueuePending>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, JobQueueType::FailedJobs, contLock, cId, lc); -} - template<> auto ContainerTraits<RepackQueue,RepackQueuePending>::getContainerSummary(Container &cont) -> ContainerSummary { diff --git a/objectstore/RepackQueueToExpandAlgorithms.cpp b/objectstore/RepackQueueToExpandAlgorithms.cpp index 8b137891791fe96927ad78e64b0aad7bded08bdc..476a078e931ae9286acaa7ef09a865173bd2df29 100644 --- a/objectstore/RepackQueueToExpandAlgorithms.cpp +++ b/objectstore/RepackQueueToExpandAlgorithms.cpp @@ -1 +1,18 @@ +#include "RepackQueueAlgorithms.hpp" +namespace cta { namespace objectstore { + template<> + const std::string ContainerTraits<RepackQueue,RepackQueueToExpand>::c_containerTypeName = "RepackQueueToExpand"; + + template<> + const std::string ContainerTraits<RepackQueue,RepackQueueToExpand>::c_identifierType = "uniqueQueue"; + + template<> + auto ContainerTraits<RepackQueue,RepackQueueToExpand>::getContainerSummary(Container &cont) -> ContainerSummary + { + ContainerSummary ret; + ret.requests = cont.getRequestsSummary().requests; + return ret; + } + +}} \ No newline at end of file diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index f93dfb3debd533f7af14aea630bb7315a796fa45..d33c1c93a55c7686f9315ee01ce7e8c7f0a7cd61 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -33,8 +33,8 @@ struct ContainerTraits<RetrieveQueue,C> void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&) const; }; - typedef cta::objectstore::JobQueueType QueueType; - + struct QueueType; + struct InsertedElement { RetrieveRequest *retrieveRequest; uint16_t copyNb; @@ -120,9 +120,9 @@ struct ContainerTraits<RetrieveQueue,C> static bool trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetched(Container &cont, ScopedExclusiveLock &contLock, AgentReference &agRef, - const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc); + const ContainerIdentifier &cId, log::LogContext &lc); static void addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, AgentReference &agentRef, log::LogContext &lc); static void addReferencesIfNecessaryAndCommit(Container &cont, typename InsertedElement::list &elemMemCont, @@ -144,10 +144,6 @@ struct ContainerTraits<RetrieveQueue,C> static const std::string c_containerTypeName; static const std::string c_identifierType; - -private: - static bool trimContainerIfNeeded(Container &cont, QueueType queueType, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, log::LogContext &lc); }; @@ -204,15 +200,16 @@ addToLog(log::ScopedParamContainer ¶ms) const { template<typename C> void ContainerTraits<RetrieveQueue,C>:: getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &agRef, - const ContainerIdentifier &contId, QueueType queueType, log::LogContext &lc) + const ContainerIdentifier &contId, log::LogContext &lc) { - Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType, lc); + ContainerTraits<RetrieveQueue,C>::QueueType queueType; + Helpers::getLockedAndFetchedJobQueue<Container>(cont, aqL, agRef, contId, queueType.value, lc); } template<typename C> void ContainerTraits<RetrieveQueue,C>:: getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, - const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc) + const ContainerIdentifier &cId, log::LogContext &lc) { // Try and get access to a queue. size_t attemptCount = 0; @@ -221,7 +218,8 @@ retry: objectstore::RootEntry re(cont.m_objectStore); re.fetchNoLock(); std::string rqAddress; - auto rql = re.dumpRetrieveQueues(queueType); + ContainerTraits<RetrieveQueue,C>::QueueType queueType; + auto rql = re.dumpRetrieveQueues(queueType.value); for (auto &rqp : rql) { if (rqp.vid == cId) rqAddress = rqp.address; @@ -241,7 +239,7 @@ retry: ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeRetrieveQueueAndCommit(cId, queueType, lc); + re.removeRetrieveQueueAndCommit(cId, queueType.value, lc); log::ScopedParamContainer params(lc); params.add("tapeVid", cId) .add("queueObject", cont.getAddressIfSet()); @@ -390,7 +388,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container template<typename C> bool ContainerTraits<RetrieveQueue,C>:: -trimContainerIfNeeded(Container &cont, QueueType queueType, ScopedExclusiveLock &contLock, +trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { if(!cont.isEmpty()) return false; @@ -398,10 +396,11 @@ trimContainerIfNeeded(Container &cont, QueueType queueType, ScopedExclusiveLock contLock.release(); try { // The queue should be removed as it is empty + ContainerTraits<RetrieveQueue,C>::QueueType queueType; RootEntry re(cont.m_objectStore); ScopedExclusiveLock rexl(re); re.fetch(); - re.removeRetrieveQueueAndCommit(cId, queueType, lc); + re.removeRetrieveQueueAndCommit(cId, queueType.value, lc); log::ScopedParamContainer params(lc); params.add("tapeVid", cId) .add("queueObject", cont.getAddressIfSet()); @@ -460,4 +459,19 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PoppedElementsSum } }; +template<> +struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::QueueType{ + objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer; +}; + +template<> +struct ContainerTraits<RetrieveQueue,RetrieveQueueFailed>::QueueType{ + objectstore::JobQueueType value = objectstore::JobQueueType::FailedJobs; +}; + +template<> +struct ContainerTraits<RetrieveQueue,RetrieveQueueToReport>::QueueType{ + objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReport; +}; + }} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueFailedAlgorithms.cpp b/objectstore/RetrieveQueueFailedAlgorithms.cpp index d8892c7a3d52c318e208fe84adff83817d07c531..9ac63960c935a90e29dc3a219cfb580e487513da 100644 --- a/objectstore/RetrieveQueueFailedAlgorithms.cpp +++ b/objectstore/RetrieveQueueFailedAlgorithms.cpp @@ -47,12 +47,4 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, return ret; } -template<> -bool ContainerTraits<RetrieveQueue,RetrieveQueueFailed>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, QueueType::FailedJobs, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueToReportAlgorithms.cpp b/objectstore/RetrieveQueueToReportAlgorithms.cpp index 421267836bf8dc1923a47d3d15779f3be3f9d8c0..d8865a30417913058c340783e69443b31d9b707c 100644 --- a/objectstore/RetrieveQueueToReportAlgorithms.cpp +++ b/objectstore/RetrieveQueueToReportAlgorithms.cpp @@ -47,12 +47,4 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, return ret; } -template<> -bool ContainerTraits<RetrieveQueue,RetrieveQueueToReport>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, QueueType::JobsToReport, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueToTransferAlgorithms.cpp b/objectstore/RetrieveQueueToTransferAlgorithms.cpp index 01454a0947edb90f2c545ea1124ce6bf861f98ce..3e196af0ce55bceac846f82f7dc4c3f5ed5f5848 100644 --- a/objectstore/RetrieveQueueToTransferAlgorithms.cpp +++ b/objectstore/RetrieveQueueToTransferAlgorithms.cpp @@ -64,12 +64,4 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, return ret; } -template<> -bool ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>:: -trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, - log::LogContext &lc) -{ - return trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc); -} - }} // namespace cta::objectstore diff --git a/scheduler/CMakeLists.txt b/scheduler/CMakeLists.txt index ab194673e5a0e957be55f24e774b0da3419c59c2..036429339604bb54efff9cb46285cff405614e86 100644 --- a/scheduler/CMakeLists.txt +++ b/scheduler/CMakeLists.txt @@ -21,7 +21,8 @@ set (CTA_SCHEDULER_SRC_FILES OStoreDB/OStoreDB.cpp OStoreDB/OStoreDBWithAgent.cpp LabelMount.cpp - DiskReportRunner.cpp) + DiskReportRunner.cpp + RepackRequestManager.cpp) find_package(Protobuf3 REQUIRED) include_directories (${PROTOBUF3_INCLUDE_DIRS}) diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 8afb1c4d0f11145a5d2475cc4b1e3d432df3effa..2ce2631aa4e1e6937c025dc72f236e433fdfb3d1 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -693,7 +693,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch // so we can got for another round. AQTRAlgo::PopCriteria criteria; criteria.files = filesRequested; - auto jobs = aqtrAlgo.popNextBatch(queueList.front().tapePool, JobQueueType::JobsToReport, criteria, logContext); + auto jobs = aqtrAlgo.popNextBatch(queueList.front().tapePool, criteria, logContext); if (jobs.elements.empty()) continue; for (auto & j: jobs.elements) { std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), *this)); @@ -779,7 +779,7 @@ void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob* cta::nullopt, serializers::ArchiveJobStatus::AJS_Failed}); } try { - caAQF.referenceAndSwitchOwnership(queue.first, JobQueueType::FailedJobs, m_agentReference->getAgentAddress(), + caAQF.referenceAndSwitchOwnership(queue.first, m_agentReference->getAgentAddress(), insertedElements, lc); } catch (exception::Exception & ex) { log::ScopedParamContainer params(lc); @@ -1071,7 +1071,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, elements.push_back(RQPAlgo::InsertedElement()); elements.back().repackRequest=std::move(rr); RQPAlgo rqpAlgo(m_objectStore, *m_agentReference); - rqpAlgo.referenceAndSwitchOwnership(nullopt, RepackQueueType::Pending, m_agentReference->getAgentAddress(), + rqpAlgo.referenceAndSwitchOwnership(nullopt, m_agentReference->getAgentAddress(), elements, lc); } } @@ -1174,7 +1174,7 @@ auto OStoreDB::RepackRequestPromotionStatistics::promotePendingRequestsForExpans insertedElements.push_back(RQTEAlgo::InsertedElement()); insertedElements.back().repackRequest = std::move(rr.repackRequest); } - rqteAlgo.referenceAndSwitchOwnership(nullopt, RepackQueueType::ToExpand, m_agentReference.getAgentAddress(), + rqteAlgo.referenceAndSwitchOwnership(nullopt, m_agentReference.getAgentAddress(), insertedElements, lc); } ret.promotedRequests = poppedElements.summary.requests; @@ -1246,7 +1246,24 @@ auto OStoreDB::getRepackStatisticsNoLock() -> std::unique_ptr<SchedulerDatabase: // OStoreDB::getNextRequestToExpand() //------------------------------------------------------------------------------ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRequestToExpand() { - throw exception::Exception("In OStoreDB::getNextRequestToExpand(): not implemented."); + typedef objectstore::ContainerAlgorithms<RepackQueue,RepackQueueToExpand> RQTEAlgo; + RQTEAlgo rqteAlgo(m_objectStore, *m_agentReference); + RootEntry re(m_objectStore); + re.fetchNoLock(); + log::LogContext lc(m_logger); + while(true){ + RQTEAlgo::PopCriteria criteria; + //A faire : faire en sorte que popNextBatch fonctionne + //rqteAlgo.popNextBatch(cta::nullopt,criteria,lc); + } + /*popNextBatch(queueList.front().vid, objectstore::JobQueueType::JobsToReport, criteria, logContext); + */ + + /*objectstore::RepackRequest repackRequest(rrAddresses.front()); + RepackIndex::Repack + std::unique_ptr<SchedulerDatabase::RepackRequest> ret;*/ + //return ret; + throw exception::Exception("In OStoreDB::getNextRequestToExpand(): not implemented."); } //------------------------------------------------------------------------------ @@ -1320,8 +1337,9 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round. RQTRAlgo::PopCriteria criteria; + RQTRAlgo::JobQueueType jobQueueType; criteria.files = filesRequested; - auto jobs = rqtrAlgo.popNextBatch(queueList.front().vid, objectstore::JobQueueType::JobsToReport, criteria, logContext); + auto jobs = rqtrAlgo.popNextBatch(queueList.front().vid, criteria, logContext); if(jobs.elements.empty()) continue; for(auto &j : jobs.elements) { @@ -1355,7 +1373,7 @@ getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logCont // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round. RQTRAlgo::PopCriteria criteria; criteria.files = filesRequested; - auto jobs = rqtrAlgo.popNextBatch(queueList.front().vid, objectstore::JobQueueType::FailedJobs, criteria, logContext); + auto jobs = rqtrAlgo.popNextBatch(queueList.front().vid, criteria, logContext); if(jobs.elements.empty()) continue; for(auto &j : jobs.elements) { @@ -2102,7 +2120,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> AQAlgos; AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); - auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, objectstore::JobQueueType::JobsToTransfer, popCriteria, logContext); + auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext); // We can construct the return value. std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret; for (auto & j: jobs.elements) { @@ -2182,7 +2200,7 @@ getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContex typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> RQAlgos; RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); - auto jobs = rqAlgos.popNextBatch(mountInfo.vid, objectstore::JobQueueType::JobsToTransfer, popCriteria, logContext); + auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext); // We can construct the return value std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret; for(auto &j : jobs.elements) @@ -2386,7 +2404,7 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct for (auto &list: insertedElementsLists) { try { utils::Timer tLocal; - aqtrCa.referenceAndSwitchOwnership(list.first, JobQueueType::JobsToReport, m_oStoreDB.m_agentReference->getAgentAddress(), + aqtrCa.referenceAndSwitchOwnership(list.first, m_oStoreDB.m_agentReference->getAgentAddress(), list.second, lc); log::ScopedParamContainer params(lc); params.add("tapeVid", list.first) @@ -2475,7 +2493,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tapeFile.copyNb) @@ -2498,8 +2516,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapepool, objectstore::JobQueueType::JobsToTransfer, - insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapepool, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tapeFile.copyNb) @@ -2522,7 +2539,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::FailedJobs, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tapeFile.copyNb) @@ -2567,7 +2584,7 @@ void OStoreDB::ArchiveJob::failReport(const std::string& failureReason, log::Log CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, insertedElements, lc); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) @@ -2587,7 +2604,7 @@ void OStoreDB::ArchiveJob::failReport(const std::string& failureReason, log::Log CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaAqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); - caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::JobQueueType::FailedJobs, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tapeFile.vid, insertedElements, lc); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) @@ -2864,7 +2881,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: rel.release(); CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); - caRqtr.referenceAndSwitchOwnership(tf.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); + caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", selectedCopyNb) @@ -2919,7 +2936,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: }); CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); - caRqtr.referenceAndSwitchOwnership(bestVid, objectstore::JobQueueType::JobsToTransfer, insertedElements, lc); + caRqtr.referenceAndSwitchOwnership(bestVid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", selectedCopyNb) @@ -3005,7 +3022,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo insertedElements.push_back(CaAqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToReportForFailure }); - caAqtr.referenceAndSwitchOwnership(tf.vid, objectstore::JobQueueType::JobsToReport, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tf.copyNb) @@ -3023,7 +3040,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo insertedElements.push_back(CaAqtr::InsertedElement{ &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_Failed }); - caAqtr.referenceAndSwitchOwnership(tf.vid, objectstore::JobQueueType::FailedJobs, insertedElements, lc); + caAqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); params.add("fileId", archiveFile.archiveFileID) .add("copyNb", tf.copyNb) diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp index 9acd3a6b5ce36038ddb6a349e747e8d13d57c7a8..09b843fdc8f0ea4d95296d30dfcfec9f566ab04e 100644 --- a/scheduler/RepackRequestManager.cpp +++ b/scheduler/RepackRequestManager.cpp @@ -17,6 +17,7 @@ */ #include "RepackRequestManager.hpp" +#include "Scheduler.hpp" namespace cta { @@ -26,7 +27,7 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { // First expand any request to expand // TODO: implement expansion // Next promote requests to ToExpand if needed - m_scheduler.promoteRepackRequestsToToExpand(); + m_scheduler.promoteRepackRequestsToToExpand(lc); } diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index fd4350b690e6f2a8524ee90887b5313005e7ebc3..296d02d223108eaed82b013a059a2464a9f1517c 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -28,6 +28,7 @@ #include "common/exception/NonRetryableError.hpp" #include "common/exception/UserError.hpp" #include "common/make_unique.hpp" +#include "objectstore/RepackRequest.hpp" #include <iostream> #include <sstream> @@ -355,21 +356,26 @@ void Scheduler::promoteRepackRequestsToToExpand(log::LogContext & lc) { .add("toEnpandBefore", stats.toEnpandBefore) .add("pendingAfter", stats.pendingAfter) .add("toExpandAfter", stats.toExpandAfter); + lc.log(log::INFO,"In Scheduler::promoteRepackRequestsToToExpand(): Promoted repack request to \"to expand\""); } } } //------------------------------------------------------------------------------ -// getNextRepackJobToExpand +// getNextRepackRequestToExpand //------------------------------------------------------------------------------ -std::unique_ptr<RepackRequest> Scheduler::getNextRepackJobToExpand() { - throw exception::Exception("In Scheduler::getNextRepackJobToExpand(): not implemented."); +std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() { + auto repackRequest = m_db.getNextRequestToExpand(); + //CONTINUER + std::unique_ptr<RepackRequest> repReqRet(new RepackRequest()); + + throw exception::Exception("In Scheduler::getNextRepackRequestToExpand(): not implemented."); } //------------------------------------------------------------------------------ // expandRepackRequest //------------------------------------------------------------------------------ -void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repqckRequest, log::TimingList&, utils::Timer&, log::LogContext&) { +void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext&) { throw exception::Exception("In Scheduler::expandRepackRequest(): not implemented"); } diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 976e9d9b799b65bed91ab7d19aaddd543408010c..ea310d590f94493dfa42cc6eab7c394d73c0ac87 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -337,7 +337,7 @@ public: // Promotion of requests void promoteRepackRequestsToToExpand(log::LogContext & lc); // Expansion support - std::unique_ptr<RepackRequest> getNextRepackJobToExpand(); + std::unique_ptr<RepackRequest> getNextRepackRequestToExpand(); void expandRepackRequest(std::unique_ptr<RepackRequest> & repqckRequest, log::TimingList& , utils::Timer &, log::LogContext &); /* ============================== Retrieve reporting support ============================== */ diff --git a/tapeserver/daemon/MaintenanceHandler.cpp b/tapeserver/daemon/MaintenanceHandler.cpp index f2f1bcb4bac6e7b4c5127eb376ff02aa54a33c18..e119b891c5feac1fc23a5de10bdd1a89000ce6dc 100644 --- a/tapeserver/daemon/MaintenanceHandler.cpp +++ b/tapeserver/daemon/MaintenanceHandler.cpp @@ -314,6 +314,7 @@ int MaintenanceHandler::runChild() { "In MaintenanceHandler::runChild(): About to run a GC pass."); gc.runOnePass(m_processManager.logContext()); diskReportRunner.runOnePass(m_processManager.logContext()); + repackRequestManager.runOnePass(m_processManager.logContext()); try { server::SocketPair::poll(pollList, s_pollInterval - t.secs(), server::SocketPair::Side::parent); receivedMessage=true;