From f6b0786be43cff86b2eb059bd7efd13fbc2b933a Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Fri, 29 Jun 2018 18:01:41 +0200
Subject: [PATCH] Moved ArchiveMount::getNextJobBatch to generic algorithms.

---
 objectstore/Algorithms.hpp             |   8 +-
 objectstore/ArchiveQueueAlgorithms.cpp |   7 +-
 objectstore/ArchiveQueueAlgorithms.hpp |   4 +
 scheduler/OStoreDB/OStoreDB.cpp        | 398 ++-----------------------
 4 files changed, 39 insertions(+), 378 deletions(-)

diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp
index 2f971c188d..de18721379 100644
--- a/objectstore/Algorithms.hpp
+++ b/objectstore/Algorithms.hpp
@@ -120,6 +120,7 @@ public:
     m_backend(backend), m_agentReference(agentReference) {}
 
   typedef typename ContainerTraits<C>::InsertedElement InsertedElement;
+  typedef typename ContainerTraits<C>::PopCriteria PopCriteria;
 
   /** Reference objects in the container and then switch their ownership them. Objects
    * are provided existing and owned by algorithm's agent. Returns a list of
@@ -175,7 +176,8 @@ public:
     typename ContainerTraits<C>::ElementsToSkipSet elementsToSkip;
     log::TimingList timingList;
     utils::Timer t, totalTime;
-    while (ret.summary < popCriteria) {
+    bool unexpectedException = false;
+    while (!unexpectedException && ret.summary < popCriteria) {
       typename ContainerTraits<C>::PoppedElementsSummary previousSummary = ret.summary;
       log::TimingList localTimingList;
       // Get a container if it exists
@@ -233,7 +235,7 @@ public:
       std::list<typename ContainerTraits<C>::ElementAddress> elementsToDereferenceFromAgent;
       for (auto &e: failedOwnershipSwitchElements) {
         try {
-          throw e.failure;
+          std::rethrow_exception(e.failure);
         } catch (Backend::NoSuchObject &) {
           elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
           elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
@@ -249,6 +251,8 @@
           elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
           elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
           elementsToSkip.insert(ContainerTraits<C>::getElementAddress(*e.element));
+          // In this kind of situation we do not try to carry on, as recovery becomes too complex.
+          unexpectedException = true;
         }
       }
       // We are done with the sorting. Apply the decisions...
diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp
index deea1026bd..b194a5a64f 100644
--- a/objectstore/ArchiveQueueAlgorithms.cpp
+++ b/objectstore/ArchiveQueueAlgorithms.cpp
@@ -172,7 +172,8 @@ auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont
   PoppedElementsBatch ret;
   auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
   for (auto &cjfq: candidateJobsFromQueue.candidates) {
-    ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size});
+    ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
+      common::dataStructures::ArchiveFile(), "", "", ""});
     ret.summary.bytes += cjfq.size;
     ret.summary.files++;
   }
@@ -219,6 +220,10 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch
   while (e != popedElementBatch.elements.end()) {
     try {
       u->get()->wait();
+      e->archiveFile = u->get()->getArchiveFile();
+      e->archiveReportURL = u->get()->getArchiveReportURL();
+      e->errorReportURL = u->get()->getArchiveErrorReportURL();
+      e->srcURL = u->get()->getSrcURL();
     } catch (...) {
       ret.push_back(OpFailure<PoppedElement>());
       ret.back().element = &(*e);
diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp
index 90a42f82b2..2909ee319d 100644
--- a/objectstore/ArchiveQueueAlgorithms.hpp
+++ b/objectstore/ArchiveQueueAlgorithms.hpp
@@ -87,6 +87,10 @@ public:
     std::unique_ptr<ArchiveRequest> archiveRequest;
     uint16_t copyNb;
     uint64_t bytes;
+    common::dataStructures::ArchiveFile archiveFile;
+    std::string archiveReportURL;
+    std::string errorReportURL;
+    std::string srcURL;
   };
   class PoppedElementsSummary;
   class PopCriteria {
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 832ce9c219..cd898d9b0b 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -18,6 +18,7 @@
 
 #include "OStoreDB.hpp"
 #include "MemQueues.hpp"
+#include "objectstore/ArchiveQueueAlgorithms.hpp"
 //#include "common/dataStructures/SecurityIdentity.hpp"
 #include "objectstore/DriveRegister.hpp"
 #include "objectstore/DriveState.hpp"
@@ -1650,383 +1651,30 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou
 //------------------------------------------------------------------------------
 std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
     uint64_t bytesRequested, log::LogContext& logContext) {
-  utils::Timer t, totalTime;
-  double driveRegisterCheckTime = 0;
-  double findQueueTime = 0;
-  double lockFetchQueueTime = 0;
-  double emptyQueueCleanupTime = 0;
-  double jobSelectionTime = 0;
-  double ownershipAdditionTime = 0;
-  double asyncUpdateLaunchTime = 0;
-  double jobsUpdateTime = 0;
-  double queueProcessAndCommitTime = 0;
-  double queueRemovalTime = 0;
-  double ownershipRemovalTime = 0;
-  // Find the next files to archive
-  // First, check we should not forcibly go down. In such an occasion, we just find noting to do.
-  // Get drive register
-  {
-    // Get the archive queue. Failure is non-fatal. We will carry on.
-    try {
-      objectstore::RootEntry re(m_oStoreDB.m_objectStore);
-      re.fetchNoLock();
-      objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_oStoreDB.m_objectStore);
-      dr.fetchNoLock();
-      objectstore::DriveState ds(dr.getDriveAddress(mountInfo.drive), m_oStoreDB.m_objectStore);
-      ds.fetchNoLock();
-      auto driveStateValue = ds.getState();
-      if (!driveStateValue.desiredDriveState.up && driveStateValue.desiredDriveState.forceDown) {
-        logContext.log(log::INFO, "In OStoreDB::ArchiveMount::getNextJobBatch(): returning no job as we are forcibly going down.");
-        return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >();
-      }
-    } catch (cta::exception::Exception & ex) {
-      log::ScopedParamContainer params (logContext);
-      params.add("exceptionMessage", ex.getMessageValue());
-      logContext.log(log::INFO, "In OStoreDB::ArchiveMount::getNextJobBatch(): failed to check up/down status.");
-    }
-    driveRegisterCheckTime = t.secs(utils::Timer::resetCounter);
-  }
-  // Now, we should repeatedly fetch jobs from the queue until we fulfilled the request or there is nothing to get form the
-  // queue anymore.
-  // Requests are left as-is on errors. We will keep a list of them to avoid re-accessing them in the same batch.
-  std::set<std::string> archiveRequestsToSkip;
-  // Prepare the returned jobs that we might accumulate in several rounds.
-  std::list<std::unique_ptr<OStoreDB::ArchiveJob>> privateRet;
-  uint64_t currentBytes=0;
-  uint64_t currentFiles=0;
-  size_t iterationCount=0;
-  while (true) {
-    double localFindQueueTime = 0;
-    double localLockFetchQueueTime = 0;
-    double localEmptyCleanupQueueTime = 0;
-    double localJobSelectionTime = 0;
-    double localOwnershipAdditionTime = 0;
-    double localAsyncLaunchTime = 0;
-    double localQueueProcessAndCommitTime = 0;
-    double localOwnershipRemovalTime = 0;
-    double localJobsUpdateTime = 0;
-    double localQueueRemovalTime = 0;
-    iterationCount++;
-    uint64_t beforeBytes=currentBytes;
-    uint64_t beforeFiles=currentFiles;
-    // Try and get access to a queue.
-    objectstore::RootEntry re(m_oStoreDB.m_objectStore);
-    re.fetchNoLock();
-    std::string aqAddress;
-    auto aql = re.dumpArchiveQueues(QueueType::LiveJobs);
-    for (auto & aqp : aql) {
-      if (aqp.tapePool == mountInfo.tapePool)
-        aqAddress = aqp.address;
-    }
-    if (!aqAddress.size()) break;
-    // try and lock the archive queue. Any failure from here on means the end of the getting jobs.
-    objectstore::ArchiveQueue aq(aqAddress, m_oStoreDB.m_objectStore);
-    objectstore::ScopedExclusiveLock aqlock;
-    findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
-    try {
-      try {
-        aqlock.lock(aq);
-        aq.fetch();
-        lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
-      } catch (cta::exception::Exception & ex) {
-        // The queue is now absent. We can remove its reference in the root entry.
-        // A new queue could have been added in the mean time, and be non-empty.
-        // We will then fail to remove from the RootEntry (non-fatal).
-        ScopedExclusiveLock rexl(re);
-        re.fetch();
-        try {
-          re.removeArchiveQueueAndCommit(mountInfo.tapePool, QueueType::LiveJobs, logContext);
-          log::ScopedParamContainer params(logContext);
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet());
-          logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
-        } catch (RootEntry::ArchiveQueueNotEmpty & ex) {
-          // TODO: improve: if we fail here we could retry to fetch a job.
-          log::ScopedParamContainer params(logContext);
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet())
-                .add("Message", ex.getMessageValue());
-          logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
-        } catch (RootEntry::NoSuchArchiveQueue & ex) {
-          // Somebody removed the queue in the mean time. Barely worth mentioning.
-          log::ScopedParamContainer params(logContext);
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet());
-          logContext.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
-        }
-        emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
-        continue;
-      }
-      // We now have the queue.
-      auto queueObject = aq.getAddressIfSet();
-      auto queueSummaryBefore = aq.getJobsSummary();
-      {
-        log::ScopedParamContainer params(logContext);
-        params.add("tapepool", mountInfo.tapePool)
-              .add("queueObject", queueObject)
-              .add("jobs", queueSummaryBefore.jobs)
-              .add("bytes", queueSummaryBefore.bytes);
-        logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): archive queue found.");
-      }
-      // The queue will give us a list of files to try and grab. We will attempt to
-      // dequeue them in one go, updating jobs in parallel. If some jobs turn out
-      // to not be there really, we will have to do several passes.
-      // We build directly the return value in the process.
-      auto candidateJobsFromQueue=aq.getCandidateList(bytesRequested, filesRequested, archiveRequestsToSkip);
-      std::list<std::unique_ptr<OStoreDB::ArchiveJob>> candidateJobs;
-      // If we fail to find jobs in one round, we will exit.
-      for (auto & cj: candidateJobsFromQueue.candidates) {
-        currentFiles++;
-        currentBytes+=cj.size;
-        candidateJobs.emplace_back(new OStoreDB::ArchiveJob(cj.address, m_oStoreDB, *this));
-        candidateJobs.back()->tapeFile.copyNb = cj.copyNb;
-      }
-      {
-        log::ScopedParamContainer params(logContext);
-        params.add("tapepool", mountInfo.tapePool)
-              .add("queueObject", aq.getAddressIfSet())
-              .add("candidatesCount", candidateJobs.size())
-              .add("currentFiles", currentFiles)
-              .add("currentBytes", currentBytes)
-              .add("requestedFiles", filesRequested)
-              .add("requestedBytes", bytesRequested);
-        logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): will process a set of candidate jobs.");
-      }
-      jobSelectionTime += localJobSelectionTime = t.secs(utils::Timer::resetCounter);
-      // We now have a batch of jobs to try and dequeue. Should not be empty.
-      // First add the jobs to the owned list of the agent.
-      std::list<std::string> addedJobs;
-      for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_archiveRequest.getAddressIfSet());
-      m_oStoreDB.m_agentReference->addBatchToOwnership(addedJobs, m_oStoreDB.m_objectStore);
-      ownershipAdditionTime += localOwnershipAdditionTime = t.secs(utils::Timer::resetCounter);
-      // We can now attempt to switch the ownership of the jobs. Depending on the type of failure (if any) we
-      // will adapt the rest.
-      // First, start the parallel updates of jobs
-      std::list<std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater>> jobUpdates;
-      for (const auto &j: candidateJobs) jobUpdates.emplace_back(
-        j->m_archiveRequest.asyncUpdateJobOwner(j->tapeFile.copyNb, m_oStoreDB.m_agentReference->getAgentAddress(), aqAddress));
-      asyncUpdateLaunchTime += localAsyncLaunchTime = t.secs(utils::Timer::resetCounter);
-      // Now run through the results of the asynchronous updates. Non-success results come in the form of exceptions.
-      std::list<std::string> jobsToForget; // The jobs either absent or not owned, for which we should just remove references (agent).
-      std::list<std::string> jobsToDequeue; // The jobs that should not be queued anymore. All of them indeed (invalid or successfully poped).
-      std::list<std::unique_ptr<OStoreDB::ArchiveJob>> validatedJobs; // The jobs we successfully validated.
-      auto j=candidateJobs.begin(); // We will iterate on 2 lists...
-      auto ju=jobUpdates.begin();
-      while (ju!=jobUpdates.end()) {
-        // Get the processing status of update
-        try {
-          (*ju)->wait();
-          // Getting here means the update went through... We can proceed with removing the
-          // job from the queue, and populating the job to report in memory.
-          jobsToDequeue.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
-          (*j)->archiveFile = (*ju)->getArchiveFile();
-          (*j)->srcURL = (*ju)->getSrcURL();
-          (*j)->archiveReportURL = (*ju)->getArchiveReportURL();
-          (*j)->errorReportURL = (*ju)->getArchiveErrorReportURL();
-          (*j)->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
-          (*j)->tapeFile.vid = mountInfo.vid;
-          (*j)->tapeFile.blockId =
-            std::numeric_limits<decltype((*j)->tapeFile.blockId)>::max();
-          (*j)->m_jobOwned = true;
-          (*j)->m_mountId = mountInfo.mountId;
-          (*j)->m_tapePool = mountInfo.tapePool;
-          log::ScopedParamContainer params(logContext);
-          auto timingsReport = (*ju)->getTimeingsReport();
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet())
-                .add("requestObject", (*j)->m_archiveRequest.getAddressIfSet())
-                .add("fileId", (*j)->archiveFile.archiveFileID)
-                .add("lockFetchTime", timingsReport.lockFetchTime)
-                .add("processTime", timingsReport.processTime)
-                .add("commitUnlockTime", timingsReport.commitUnlockTime);
-          logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): popped one job");
-          validatedJobs.emplace_back(std::move(*j));
-        } catch (cta::exception::Exception & e) {
-          std::string debugType=typeid(e).name();
-          if (typeid(e) == typeid(Backend::NoSuchObject) ||
-              typeid(e) == typeid(Backend::WrongPreviousOwner)) {
-            // The object was not present or not owned, so we skip it. It should be removed from
-            // the queue.
-            jobsToDequeue.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
-            // Log the event.
-            log::ScopedParamContainer params(logContext);
-            params.add("tapepool", mountInfo.tapePool)
-                  .add("queueObject", aq.getAddressIfSet())
-                  .add("requestObject", (*j)->m_archiveRequest.getAddressIfSet());
-            logContext.log(log::WARNING, "In ArchiveMount::getNextJobBatch(): skipped job not owned or not present.");
-          } else if (typeid(e) == typeid(Backend::CouldNotUnlock)) {
-            // We failed to unlock the object. The request was successfully updated, so we do own it. This is a non-fatal
-            // situation, so we just issue a warning. Removing the request from our agent ownership would
-            // orphan it.
-            log::ScopedParamContainer params(logContext);
-            int demangleStatus;
-            char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus);
-            params.add("tapepool", mountInfo.tapePool)
-                  .add("queueObject", aq.getAddressIfSet())
-                  .add("requestObject", (*j)->m_archiveRequest.getAddressIfSet());
-            if (!demangleStatus) {
-              params.add("exceptionType", exceptionTypeStr);
-            } else {
-              params.add("exceptionType", typeid(e).name());
-            }
-            free(exceptionTypeStr);
-            exceptionTypeStr = nullptr;
-            params.add("message", e.getMessageValue());
-            logContext.log(log::WARNING, "In ArchiveMount::getNextJobBatch(): Failed to unlock the request (lock expiration). Request remains selected.");
-            validatedJobs.emplace_back(std::move(*j));
-          } else {
-            // This is not a success, yet we could not confirm the job status due to an unexpected error.
-            // We leave the queue as is. We forget about owning this job. This is an error.
-            log::ScopedParamContainer params(logContext);
-            int demangleStatus;
-            char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus);
-            params.add("tapepool", mountInfo.tapePool)
-                  .add("queueObject", aq.getAddressIfSet())
-                  .add("requestObject", (*j)->m_archiveRequest.getAddressIfSet());
-            if (!demangleStatus) {
-              params.add("exceptionType", exceptionTypeStr);
-            } else {
-              params.add("exceptionType", typeid(e).name());
-            }
-            free(exceptionTypeStr);
-            exceptionTypeStr = nullptr;
-            params.add("message", e.getMessageValue());
-            logContext.log(log::ERR, "In ArchiveMount::getNextJobBatch(): unexpected error. Leaving the job queued.");
-            jobsToForget.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
-            archiveRequestsToSkip.insert((*j)->m_archiveRequest.getAddressIfSet());
-          }
-          // This job is not for us.
-          jobsToForget.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
-          // We also need to update the counts.
-          currentFiles--;
-          currentBytes-=(*j)->archiveFile.fileSize;
-        }
-        jobsUpdateTime += localJobsUpdateTime = t.secs(utils::Timer::resetCounter);
-        // In all cases: move to the nexts.
-        ju=jobUpdates.erase(ju);
-        j=candidateJobs.erase(j);
-      }
-      // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue.
-      aq.removeJobsAndCommit(jobsToDequeue);
-      queueProcessAndCommitTime += localQueueProcessAndCommitTime = t.secs(utils::Timer::resetCounter);
-      if (jobsToForget.size()) {
-        m_oStoreDB.m_agentReference->removeBatchFromOwnership(jobsToForget, m_oStoreDB.m_objectStore);
-        ownershipRemovalTime += localOwnershipRemovalTime = t.secs(utils::Timer::resetCounter);
-      }
-      // We can now add the validated jobs to the return value.
-      auto vj = validatedJobs.begin();
-      while (vj != validatedJobs.end()) {
-        privateRet.emplace_back(std::move(*vj));
-        vj=validatedJobs.erase(vj);
-      }
-      // Before going for another round, we can release the queue and delete it if we emptied it.
-      auto queueSummaryAfter=aq.getJobsSummary();
-      aqlock.release();
-      // If the queue is empty, we can get rid of it.
-      if (!queueSummaryAfter.jobs) {
-        try {
-          // The queue should be removed as it is empty.
-          ScopedExclusiveLock rexl(re);
-          re.fetch();
-          re.removeArchiveQueueAndCommit(mountInfo.tapePool, QueueType::LiveJobs, logContext);
-          log::ScopedParamContainer params(logContext);
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet());
-          logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): deleted empty queue");
-        } catch (cta::exception::Exception &ex) {
-          log::ScopedParamContainer params(logContext);
-          params.add("tapepool", mountInfo.tapePool)
-                .add("queueObject", aq.getAddressIfSet())
-                .add("Message", ex.getMessageValue());
-          logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not delete a presumably empty queue");
-        }
-        queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter);
-      }
-      // We can now summarize this round
-      {
-        log::ScopedParamContainer params(logContext);
-        params.add("tapepool", mountInfo.tapePool)
-              .add("queueObject", aq.getAddressIfSet())
-              .add("filesAdded", currentFiles - beforeFiles)
-              .add("bytesAdded", currentBytes - beforeBytes)
-              .add("filesBefore", beforeFiles)
-              .add("bytesBefore", beforeBytes)
-              .add("filesAfter", currentFiles)
-              .add("bytesAfter", currentBytes)
-              .add("queueJobsBefore", queueSummaryBefore.jobs)
-              .add("queueBytesBefore", queueSummaryBefore.bytes)
-              .add("queueJobsAfter", queueSummaryAfter.jobs)
-              .add("queueBytesAfter", queueSummaryAfter.bytes)
-              .add("queueObject", queueObject)
-              .add("findQueueTime", localFindQueueTime)
-              .add("lockFetchQueueTime", localLockFetchQueueTime)
-              .add("emptyQueueCleanupTime", localEmptyCleanupQueueTime)
-              .add("jobSelectionTime", localJobSelectionTime)
-              .add("ownershipAdditionTime", localOwnershipAdditionTime)
-              .add("asyncUpdateLaunchTime", localAsyncLaunchTime)
-              .add("jobsUpdateTime", localJobsUpdateTime)
-              .add("queueProcessAndCommitTime", localQueueProcessAndCommitTime)
-              .add("ownershipRemovalTime", localOwnershipRemovalTime)
-              .add("queueRemovalTime", localQueueRemovalTime)
-              .add("iterationTime", localFindQueueTime + localLockFetchQueueTime + localEmptyCleanupQueueTime
-                + localJobSelectionTime + localOwnershipAdditionTime + localAsyncLaunchTime
-                + localJobsUpdateTime + localQueueProcessAndCommitTime + localOwnershipRemovalTime
-                + localQueueRemovalTime)
-              .add("iterationCount", iterationCount);
-        logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): did one round of jobs retrieval.");
-      }
-      // We could be done now.
-      if (currentBytes >= bytesRequested || currentFiles >= filesRequested)
-        break;
-      // If we had exhausted the queue while selecting the jobs, we stop here, else we can go for another
-      // round.
-      if (!candidateJobsFromQueue.remainingFilesAfterCandidates)
-        break;
-    } catch (cta::exception::Exception & ex) {
-      log::ScopedParamContainer params (logContext);
-      params.add("exceptionMessage", ex.getMessageValue());
-      logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (CTA exception) getting more jobs. Backtrace follows.");
-      logContext.logBacktrace(log::ERR, ex.backtrace());
-      break;
-    } catch (std::exception & e) {
-      log::ScopedParamContainer params (logContext);
-      params.add("exceptionWhat", e.what());
-      logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (std exception) getting more jobs.");
-      break;
-    } catch (...) {
-      logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (unknown exception) getting more jobs.");
-      break;
-    }
-  }
-  // We either ran out of jobs or fulfilled the requirements. Time to build up the reply.
-  // Log the outcome.
-  uint64_t nFiles=privateRet.size();
-  uint64_t nBytes=0;
-  for (auto & j: privateRet) {
-    nBytes+=j->archiveFile.fileSize;
-  }
-  {
-    log::ScopedParamContainer params(logContext);
-    params.add("tapepool", mountInfo.tapePool)
-          .add("files", nFiles)
-          .add("bytes", nBytes)
-          .add("driveRegisterCheckTime", driveRegisterCheckTime)
-          .add("findQueueTime", findQueueTime)
-          .add("lockFetchQueueTime", lockFetchQueueTime)
-          .add("emptyQueueCleanupTime", emptyQueueCleanupTime)
-          .add("jobSelectionTime", jobSelectionTime)
-          .add("ownershipAdditionTime", ownershipAdditionTime)
-          .add("asyncUpdateLaunchTime", asyncUpdateLaunchTime)
-          .add("jobsUpdateTime", jobsUpdateTime)
-          .add("queueProcessAndCommitTime", queueProcessAndCommitTime)
-          .add("ownershipRemovalTime", ownershipRemovalTime)
-          .add("queueRemovalTime", queueRemovalTime)
-          .add("schedulerDbTime", totalTime.secs());
-    logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete.");
-  }
+  typedef objectstore::ContainerAlgorithms<ArchiveQueue> AQAlgos;
+  AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
+  AQAlgos::PopCriteria popCriteria;
+  popCriteria.files = filesRequested;
+  popCriteria.bytes = bytesRequested;
+  auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext);
   // We can construct the return value.
   std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
-  for (auto & j: privateRet) ret.emplace_back(std::move(j));
+  for (auto & j: jobs.elements) {
+    std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB, *this));
+    aj->tapeFile.copyNb = j.copyNb;
+    aj->archiveFile = j.archiveFile;
+    aj->archiveReportURL = j.archiveReportURL;
+    aj->errorReportURL = j.errorReportURL;
+    aj->srcURL = j.srcURL;
+    aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
+    aj->tapeFile.vid = mountInfo.vid;
+    aj->tapeFile.blockId =
+      std::numeric_limits<decltype(aj->tapeFile.blockId)>::max();
+    aj->m_jobOwned = true;
+    aj->m_mountId = mountInfo.mountId;
+    aj->m_tapePool = mountInfo.tapePool;
+    ret.emplace_back(std::move(aj));
+  }
   return ret;
 }
-- 
GitLab
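
For readers new to the objectstore algorithms layer, the following sketch models the generic pop loop that getNextJobBatch() now delegates to. It is a minimal, self-contained toy, not the real API: MockQueue, the element layouts and main() are invented stand-ins for ContainerTraits<ArchiveQueue>, and only the control flow (a criteria-driven loop that pops candidate batches and carries a skip set across rounds) is taken from the Algorithms.hpp hunks above.

// Toy model of the generic popNextBatch() loop in Algorithms.hpp.
// MockQueue and the element layout are hypothetical stand-ins; only the
// control flow mirrors the patch.
#include <cstdint>
#include <iostream>
#include <list>
#include <set>
#include <string>

struct PopCriteria { uint64_t files = 0; uint64_t bytes = 0; };

struct PoppedElementsSummary {
  uint64_t files = 0;
  uint64_t bytes = 0;
  // The pop loop runs while the summary has not yet fulfilled the criteria
  // (same comparison shape as "ret.summary < popCriteria" in the patch).
  bool operator<(const PopCriteria &c) const { return files < c.files && bytes < c.bytes; }
};

struct PoppedElement { std::string address; uint64_t bytes = 0; };

// Stand-in for an archive queue in the object store.
struct MockQueue {
  std::list<PoppedElement> jobs;
  // Analogous to getPoppingElementsCandidates(): hand out candidates up to the
  // unfulfilled criteria, skipping elements that failed in a previous round.
  std::list<PoppedElement> pop(const PopCriteria &unfulfilled, const std::set<std::string> &skip) {
    std::list<PoppedElement> batch;
    PoppedElementsSummary handedOut;
    while (!jobs.empty() && handedOut < unfulfilled) {
      if (!skip.count(jobs.front().address)) {
        handedOut.files++;
        handedOut.bytes += jobs.front().bytes;
        batch.push_back(jobs.front());
      }
      jobs.pop_front();
    }
    return batch;
  }
};

// Shape of ContainerAlgorithms<C>::popNextBatch(): accumulate rounds until the
// criteria are met or the container is exhausted, carrying the skip set across rounds.
std::list<PoppedElement> popNextBatch(MockQueue &q, const PopCriteria &criteria) {
  std::list<PoppedElement> ret;
  PoppedElementsSummary summary;
  std::set<std::string> elementsToSkip;  // filled on ownership-switch failures in the real code
  while (summary < criteria) {
    PopCriteria unfulfilled{criteria.files - summary.files, criteria.bytes - summary.bytes};
    auto batch = q.pop(unfulfilled, elementsToSkip);
    if (batch.empty()) break;  // container exhausted: stop, as in the real loop
    for (const auto &e : batch) { summary.files++; summary.bytes += e.bytes; }
    ret.splice(ret.end(), batch);
  }
  return ret;
}

int main() {
  MockQueue q;
  for (int i = 0; i < 5; i++) q.jobs.push_back({"job" + std::to_string(i), 100});
  auto popped = popNextBatch(q, PopCriteria{3, 10000});
  std::cout << "popped " << popped.size() << " jobs" << std::endl;  // prints: popped 3 jobs
}

Under this simplified model, elementsToSkip plays the role that archiveRequestsToSkip played in the removed OStoreDB code; moving that bookkeeping inside the shared algorithm is what lets getNextJobBatch() shrink to the few lines added above.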