diff --git a/objectstore/RetrieveQueueAlgorithms.cpp b/objectstore/RetrieveQueueAlgorithms.cpp index b40c40661994122ac2de4fc889dea0b75b47702b..67cc64fe1c95a1d630006e5cd03be0217de71b65 100644 --- a/objectstore/RetrieveQueueAlgorithms.cpp +++ b/objectstore/RetrieveQueueAlgorithms.cpp @@ -31,6 +31,7 @@ const std::string ContainerTraits<RetrieveQueue>::c_identifierType = "vid"; void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsSummary:: addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer ¶ms) { + throw std::runtime_error("1 Not implemented."); #if 0 params.add("filesAdded", files - previous.files) .add("bytesAdded", bytes - previous.bytes) @@ -43,6 +44,7 @@ addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer & void ContainerTraitsTypes<RetrieveQueue>::ContainerSummary:: addDeltaToLog(const ContainerSummary &previous, log::ScopedParamContainer ¶ms) { + throw std::runtime_error("2 Not implemented."); #if 0 params.add("queueJobsBefore", previous.jobs) .add("queueBytesBefore", previous.bytes) @@ -89,57 +91,54 @@ template<> void ContainerTraits<RetrieveQueue>:: getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { -#if 0 // Try and get access to a queue. size_t attemptCount = 0; - retry: + +retry: objectstore::RootEntry re(cont.m_objectStore); re.fetchNoLock(); - std::string aqAddress; - auto aql = re.dumpArchiveQueues(QueueType::LiveJobs); - for (auto & aqp : aql) { - if (aqp.tapePool == cId) - aqAddress = aqp.address; + std::string rqAddress; + auto rql = re.dumpRetrieveQueues(QueueType::LiveJobs); + for (auto &rqp : rql) { + if (rqp.vid == cId) + rqAddress = rqp.address; } - if (!aqAddress.size()) throw NoSuchContainer("In ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(): no such archive queue"); - // try and lock the archive queue. Any failure from here on means the end of the getting jobs. - cont.setAddress(aqAddress); - //findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter); + if(rqAddress.empty()) throw NoSuchContainer("In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): no such retrieve queue"); + + // try and lock the retrieve queue. Any failure from here on means the end of the getting jobs. + cont.setAddress(rqAddress); try { - if (contLock.isLocked()) contLock.release(); + if(contLock.isLocked()) contLock.release(); contLock.lock(cont); cont.fetch(); - //lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter); - } catch (cta::exception::Exception & ex) { + } catch(cta::exception::Exception & ex) { // The queue is now absent. We can remove its reference in the root entry. - // A new queue could have been added in the mean time, and be non-empty. + // A new queue could have been added in the meantime, and be non-empty. // We will then fail to remove from the RootEntry (non-fatal). ScopedExclusiveLock rexl(re); re.fetch(); try { - re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc); + re.removeRetrieveQueueAndCommit(cId, QueueType::LiveJobs, lc); log::ScopedParamContainer params(lc); - params.add("tapepool", cId) + params.add("vid", cId) .add("queueObject", cont.getAddressIfSet()); - lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry"); - } catch (RootEntry::ArchiveQueueNotEmpty & ex) { + lc.log(log::INFO, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): dereferenced missing queue from root entry"); + } catch (RootEntry::RetrieveQueueNotEmpty &ex) { log::ScopedParamContainer params(lc); - params.add("tapepool", cId) + params.add("vid", cId) .add("queueObject", cont.getAddressIfSet()) .add("Message", ex.getMessageValue()); - lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry"); - } catch (RootEntry::NoSuchArchiveQueue & ex) { - // Somebody removed the queue in the mean time. Barely worth mentioning. + lc.log(log::INFO, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): could not dereference missing queue from root entry"); + } catch (RootEntry::NoSuchRetrieveQueue &ex) { + // Somebody removed the queue in the meantime. Barely worth mentioning. log::ScopedParamContainer params(lc); - params.add("tapepool", cId) + params.add("vid", cId) .add("queueObject", cont.getAddressIfSet()); - lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done."); + lc.log(log::DEBUG, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): could not dereference missing queue from root entry: already done."); } - //emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter); attemptCount++; goto retry; } -#endif } template<> @@ -147,6 +146,7 @@ void ContainerTraits<RetrieveQueue>:: addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, AgentReference &agentRef, log::LogContext &lc) { + throw std::runtime_error("4 Not implemented."); std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; @@ -172,6 +172,7 @@ template<> void ContainerTraits<RetrieveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, InsertedElement::list& elemMemCont, AgentReference& agentRef, log::LogContext& lc) { + throw std::runtime_error("5 Not implemented."); #if 0 std::list<ArchiveQueue::JobToAdd> jobsToAdd; for (auto & e: elemMemCont) { @@ -207,6 +208,7 @@ removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAdd template<> auto ContainerTraits<RetrieveQueue>:: getContainerSummary(Container &cont) -> ContainerSummary { + throw std::runtime_error("6 Not implemented."); ContainerSummary ret; #if 0 ret.JobsSummary::operator=(cont.getJobsSummary()); @@ -250,6 +252,7 @@ auto ContainerTraits<RetrieveQueue>:: getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip, log::LogContext &lc) -> PoppedElementsBatch { + throw std::runtime_error("7 Not implemented."); PoppedElementsBatch ret; #if 0 auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip); @@ -266,6 +269,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, template<> auto ContainerTraits<RetrieveQueue>:: getElementSummary(const PoppedElement &poppedElement) -> PoppedElementsSummary { + throw std::runtime_error("8 Not implemented."); PoppedElementsSummary ret; #if 0 ret.bytes = poppedElement.bytes; @@ -280,6 +284,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc) -> OpFailure<PoppedElement>::list { + throw std::runtime_error("9 Not implemented."); #if 0 std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters; for (auto & e: popedElementBatch.elements) { @@ -318,6 +323,7 @@ void ContainerTraits<RetrieveQueue>:: trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc) { + throw std::runtime_error("10 Not implemented."); #if 0 if(cont.isEmpty()) { // The current implementation is done unlocked.