diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 1c24f96e258a56504172ea3cdd451700dd979054..08955bd44cbb30e4ade1a764e7d440e0cef94ef6 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -51,9 +51,10 @@ struct ContainerTraitsTypes<ArchiveQueue> }; struct PoppedElementsSummary; struct PopCriteria { + PopCriteria(uint64_t f = 0, uint64_t b = 0) : files(f), bytes(b) {} PopCriteria& operator-=(const PoppedElementsSummary&); - uint64_t bytes = 0; - uint64_t files = 0; + uint64_t files; + uint64_t bytes; }; struct PoppedElementsSummary { uint64_t bytes = 0; diff --git a/objectstore/RetrieveQueueAlgorithms.cpp b/objectstore/RetrieveQueueAlgorithms.cpp index d7b16b8d1292c47a877863a84aaa3be837db15f2..575299865a650d496763abf91d921038f7e03db2 100644 --- a/objectstore/RetrieveQueueAlgorithms.cpp +++ b/objectstore/RetrieveQueueAlgorithms.cpp @@ -83,5 +83,4 @@ switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddre return ret; } - }} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 537308ee6e07ff93310feefb3465df2101b2211b..453e6f24babba33230a85cbdd70a4a591f1c0996 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -42,25 +42,29 @@ struct ContainerTraitsTypes<RetrieveQueue> typedef RetrieveRequest::JobDump ElementDescriptor; - struct PoppedElement {}; + struct PoppedElement { + std::unique_ptr<RetrieveRequest> retrieveRequest; + common::dataStructures::ArchiveFile archiveFile; + }; struct PoppedElementsSummary; struct PopCriteria { - PopCriteria(); - PopCriteria& operator-= (const PoppedElementsSummary &); + PopCriteria(uint64_t f = 0, uint64_t b = 0) : files(f), bytes(b) {} + PopCriteria& operator-=(const PoppedElementsSummary&); + uint64_t files; + uint64_t bytes; }; struct PoppedElementsSummary { //PoppedElementsSummary(); bool operator<(const PopCriteria&); PoppedElementsSummary& operator+=(const PoppedElementsSummary&); //PoppedElementsSummary(const PoppedElementsSummary&); - //void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&); + void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&); }; - struct PoppedElementsList { - PoppedElementsList(); + struct PoppedElementsList : public std::list<PoppedElement> { void insertBack(PoppedElementsList&&); + void insertBack(PoppedElement&&); }; struct PoppedElementsBatch { - //PoppedElementsBatch(); PoppedElementsList elements; PoppedElementsSummary summary; void addToLog(log::ScopedParamContainer&); @@ -75,5 +79,4 @@ getElementAddress(const Element &e) { return e.retrieveRequest->getAddressIfSet(); } - }} // namespace cta::objectstore diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index ca196c23620a90dccd1aa147c3e5735c4b382b4a..f1935f2f8dc53d1c0851c3754371ace25ff08459 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -471,17 +471,17 @@ auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& o // representation. // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest. // We could try and refactor this. - retRef.m_retieveRequest.archiveFileID = payload.archivefile().archivefileid(); + retRef.m_retrieveRequest.archiveFileID = payload.archivefile().archivefileid(); objectstore::EntryLogSerDeser el; el.deserialize(payload.schedulerrequest().entrylog()); - retRef.m_retieveRequest.creationLog = el; + retRef.m_retrieveRequest.creationLog = el; objectstore::DiskFileInfoSerDeser dfi; dfi.deserialize(payload.schedulerrequest().diskfileinfo()); - retRef.m_retieveRequest.diskFileInfo = dfi; - retRef.m_retieveRequest.dstURL = payload.schedulerrequest().dsturl(); - retRef.m_retieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl(); - retRef.m_retieveRequest.requester.name = payload.schedulerrequest().requester().name(); - retRef.m_retieveRequest.requester.group = payload.schedulerrequest().requester().group(); + retRef.m_retrieveRequest.diskFileInfo = dfi; + retRef.m_retrieveRequest.dstURL = payload.schedulerrequest().dsturl(); + retRef.m_retrieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl(); + retRef.m_retrieveRequest.requester.name = payload.schedulerrequest().requester().name(); + retRef.m_retrieveRequest.requester.group = payload.schedulerrequest().requester().group(); objectstore::ArchiveFileSerDeser af; af.deserialize(payload.archivefile()); retRef.m_archiveFile = af; @@ -514,7 +514,7 @@ const common::dataStructures::ArchiveFile& RetrieveRequest::AsyncOwnerUpdater::g // RetrieveRequest::AsyncOwnerUpdater::getRetrieveRequest() //------------------------------------------------------------------------------ const common::dataStructures::RetrieveRequest& RetrieveRequest::AsyncOwnerUpdater::getRetrieveRequest() { - return m_retieveRequest; + return m_retrieveRequest; } //------------------------------------------------------------------------------ diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index f8e1388bcfad4d47a3bc5a7b137a48f247d33eef..f0ac49dac365d168d1ac917aa08df3a569588247 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -96,7 +96,7 @@ public: private: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; - common::dataStructures::RetrieveRequest m_retieveRequest; + common::dataStructures::RetrieveRequest m_retrieveRequest; common::dataStructures::ArchiveFile m_archiveFile; }; // An owner updater factory. The owner MUST be previousOwner for the update to be executed. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 0858ef741deafb40071aee45535f64340a55aef0..d6fb7a5de6b799c0782abebde3c813799b85ff83 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -19,6 +19,7 @@ #include "OStoreDB.hpp" #include "MemQueues.hpp" #include "objectstore/ArchiveQueueAlgorithms.hpp" +#include "objectstore/RetrieveQueueAlgorithms.hpp" //#include "common/dataStructures/SecurityIdentity.hpp" #include "objectstore/DriveRegister.hpp" #include "objectstore/DriveState.hpp" @@ -1655,10 +1656,8 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun uint64_t bytesRequested, log::LogContext& logContext) { typedef objectstore::ContainerAlgorithms<ArchiveQueue> AQAlgos; AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); - AQAlgos::PopCriteria popCrieteria; - popCrieteria.files = filesRequested; - popCrieteria.bytes= bytesRequested; - auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCrieteria, logContext); + AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); + auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext); // We can construct the return value. std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret; for (auto & j: jobs.elements) { @@ -1723,370 +1722,27 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::getNextJobBatch() //------------------------------------------------------------------------------ -std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, log::LogContext& logContext) { - utils::Timer t, totalTime; - double driveRegisterCheckTime = 0; - double findQueueTime = 0; - double lockFetchQueueTime = 0; - double emptyQueueCleanupTime = 0; - double jobSelectionTime = 0; - double ownershipAdditionTime = 0; - double asyncUpdateLaunchTime = 0; - double jobsUpdateTime = 0; - double queueProcessAndCommitTime = 0; - double queueRemovalTime = 0; - double ownershipRemovalTime = 0; - // Find the next files to retrieve - // First, check we should not forcibly go down. In such an occasion, we just find noting to do. - // Get drive register - { - // Get the drive status. Failure is non-fatal. We will carry on. - try { - objectstore::RootEntry re(m_oStoreDB.m_objectStore); - re.fetchNoLock(); - objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_oStoreDB.m_objectStore); - dr.fetchNoLock(); - objectstore::DriveState ds(dr.getDriveAddress(mountInfo.drive), m_oStoreDB.m_objectStore); - ds.fetchNoLock(); - auto driveStateValue = ds.getState(); - if (!driveStateValue.desiredDriveState.up && driveStateValue.desiredDriveState.forceDown) { - logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJobBatch(): returning no job as we are forcibly going down."); - return std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> >(); - } - } catch (cta::exception::Exception & ex) { - log::ScopedParamContainer params (logContext); - params.add("exceptionMessage", ex.getMessageValue()); - logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJobBatch(): failed to check up/down status."); - } - driveRegisterCheckTime = t.secs(utils::Timer::resetCounter); - } - // Now, we should repeatedly fetch jobs from the queue until we fulfilled the request or there is nothing to get form the - // queue anymore. - // Requests are left as-is on errors. We will keep a list of them to avoid re-accessing them in the same batch. - std::set<std::string> retrieveRequestsToSkip; - // Prepare the returned jobs that we might accumulate in several rounds. - std::list<std::unique_ptr<OStoreDB::RetrieveJob>> privateRet; - uint64_t currentBytes=0; - uint64_t currentFiles=0; - size_t iterationCount=0; - while (true) { - double localFindQueueTime = 0; - double localLockFetchQueueTime = 0; - double localEmptyCleanupQueueTime = 0; - double localJobSelectionTime = 0; - double localOwnershipAdditionTime = 0; - double localAsyncLaunchTime = 0; - double localQueueProcessAndCommitTime = 0; - double localOwnershipRemovalTime = 0; - double localJobsUpdateTime = 0; - double localQueueRemovalTime = 0; - iterationCount++; - uint64_t beforeBytes=currentBytes; - uint64_t beforeFiles=currentFiles; - // Try and get access to a queue. - objectstore::RootEntry re(m_oStoreDB.m_objectStore); - re.fetchNoLock(); - std::string rqAddress; - auto rql = re.dumpRetrieveQueues(QueueType::LiveJobs); - for (auto & rqp : rql) { - if (rqp.vid == mountInfo.vid) - rqAddress = rqp.address; - } - if (!rqAddress.size()) break; - // try and lock the retrieve queue. Any failure from here on means the end of the getting jobs. - objectstore::RetrieveQueue rq(rqAddress, m_oStoreDB.m_objectStore); - objectstore::ScopedExclusiveLock rqLock; - findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter); - try { - try { - rqLock.lock(rq); - rq.fetch(); - lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter); - } catch (cta::exception::Exception & ex) { - // The queue is now absent. We can remove its reference in the root entry. - // A new queue could have been added in the mean time, and be non-empty. - // We will then fail to remove from the RootEntry (non-fatal). - ScopedExclusiveLock rexl(re); - re.fetch(); - try { - re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::LiveJobs, logContext); - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): de-referenced missing queue from root entry"); - } catch (RootEntry::ArchiveQueueNotEmpty & ex) { - // TODO: improve: if we fail here we could retry to fetch a job. - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("Message", ex.getMessageValue()); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not de-referenced missing queue from root entry"); - } - emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter); - continue; - } - // We now have the queue. - auto queueObject = rq.getAddressIfSet(); - auto queueSummaryBefore = rq.getJobsSummary(); - { - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("queueSize", rq.getJobsSummary().files); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): retrieve queue found."); - } - // We should build the list of jobs we intend to grab. We will attempt to - // dequeue them in one go, updating jobs in parallel. If some jobs turn out - // to not be there really, we will have to do several passes. - // We build directly the return value in the process. - auto candidateJobsFromQueue=rq.getCandidateList(bytesRequested, filesRequested, retrieveRequestsToSkip); - std::list<std::unique_ptr<OStoreDB::RetrieveJob>> candidateJobs; - // If we fail to find jobs in one round, we will exit. - for (auto & cj: candidateJobsFromQueue.candidates) { - currentFiles++; - currentBytes+=cj.size; - candidateJobs.emplace_back(new OStoreDB::RetrieveJob(cj.address, m_oStoreDB, *this)); - candidateJobs.back()->selectedCopyNb = cj.copyNb; - } - { - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("candidatesCount", candidateJobs.size()) - .add("currentFiles", currentFiles) - .add("currentBytes", currentBytes) - .add("requestedFiles", filesRequested) - .add("requestedBytes", bytesRequested); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): will process a set of candidate jobs."); - } - jobSelectionTime += localJobSelectionTime = t.secs(utils::Timer::resetCounter); - // We now have a batch of jobs to try and dequeue. Should not be empty. - // First add the jobs to the owned list of the agent. - std::list<std::string> addedJobs; - for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_retrieveRequest.getAddressIfSet()); - m_oStoreDB.m_agentReference->addBatchToOwnership(addedJobs, m_oStoreDB.m_objectStore); - ownershipAdditionTime += localOwnershipAdditionTime = t.secs(utils::Timer::resetCounter); - // We can now attempt to switch the ownership of the jobs. Depending on the type of failure (if any) we - // will adapt the rest. - // First, start the parallel updates of jobs - std::list<std::unique_ptr<objectstore::RetrieveRequest::AsyncOwnerUpdater>> jobUpdates; - for (const auto &j: candidateJobs) jobUpdates.emplace_back( - j->m_retrieveRequest.asyncUpdateOwner(j->selectedCopyNb, m_oStoreDB.m_agentReference->getAgentAddress(), rqAddress)); - asyncUpdateLaunchTime += localAsyncLaunchTime = t.secs(utils::Timer::resetCounter); - // Now run through the results of the asynchronous updates. Non-sucess results come in the form of exceptions. - std::list<std::string> jobsToForget; // The jobs either absent or not owned, for which we should just remove references (agent). - std::list<std::string> jobsToDequeue; // The jobs that should not be queued anymore. All of them indeed (invalid or successfully poped). - std::list<std::unique_ptr<OStoreDB::RetrieveJob>> validatedJobs; // The jobs we successfully validated. - auto j=candidateJobs.begin(); // We will iterate on 2 lists... - auto ju=jobUpdates.begin(); - while (ju!=jobUpdates.end()) { - // Get the processing status of update - try { - (*ju)->wait(); - // Getting here means the update went through... We can proceed with removing the - // job from the queue, and populating the job to report in memory. - jobsToDequeue.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); - (*j)->archiveFile = (*ju)->getArchiveFile(); - (*j)->retrieveRequest = (*ju)->getRetrieveRequest(); - (*j)->m_jobOwned = true; - (*j)->m_mountId = mountInfo.mountId; - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet()) - .add("fileId", (*j)->archiveFile.archiveFileID); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): popped one job"); - validatedJobs.emplace_back(std::move(*j)); - } catch (cta::exception::Exception & e) { - std::string debugType=typeid(e).name(); - if (typeid(e) == typeid(Backend::NoSuchObject) || - typeid(e) == typeid(Backend::WrongPreviousOwner)) { - // The object was not present or not owned, so we skip it. It should be removed from - // the queue. - jobsToDequeue.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); - // Log the event. - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet()); - logContext.log(log::WARNING, "In RetrieveMount::getNextJobBatch(): skipped job not owned or not present."); - } else if (typeid(e) == typeid(Backend::CouldNotUnlock)) { - // We failed to unlock the object. The request was successfully updated, so we do own it. This is a non-fatal - // situation, so we just issue a warning. Removing the request from our agent ownership would - // orphan it. - log::ScopedParamContainer params(logContext); - int demangleStatus; - char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet()); - if (!demangleStatus) { - params.add("exceptionType", exceptionTypeStr); - } else { - params.add("exceptionType", typeid(e).name()); - } - free(exceptionTypeStr); - exceptionTypeStr = nullptr; - params.add("message", e.getMessageValue()); - logContext.log(log::WARNING, "In RetrieveMount::getNextJobBatch(): Failed to unlock the request (lock expiration). Request remains selected."); - validatedJobs.emplace_back(std::move(*j)); - } else { - // This is not a success, yet we could not confirm the job status due to an unexpected error. - // We leave the queue as is. We forget about owning this job. This is an error. - log::ScopedParamContainer params(logContext); - int demangleStatus; - char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet()); - if (!demangleStatus) { - params.add("exceptionType", exceptionTypeStr); - } else { - params.add("exceptionType", typeid(e).name()); - } - free(exceptionTypeStr); - exceptionTypeStr = nullptr; - params.add("message", e.getMessageValue()); - logContext.log(log::ERR, "In RetrieveMount::getNextJobBatch(): unexpected error. Leaving the job queued."); - jobsToForget.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); - retrieveRequestsToSkip.insert((*j)->m_retrieveRequest.getAddressIfSet()); - } - // This job is not for us. - jobsToForget.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); - // We also need to update the counts. - currentFiles--; - currentBytes-=(*j)->archiveFile.fileSize; - } - jobsUpdateTime += localJobsUpdateTime = t.secs(utils::Timer::resetCounter); - // In all cases: move to the nexts. - ju=jobUpdates.erase(ju); - j=candidateJobs.erase(j); - } - // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue. - rq.removeJobsAndCommit(jobsToDequeue); - queueProcessAndCommitTime += localQueueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); - if (jobsToForget.size()) { - m_oStoreDB.m_agentReference->removeBatchFromOwnership(jobsToForget, m_oStoreDB.m_objectStore); - ownershipRemovalTime += localOwnershipRemovalTime = t.secs(utils::Timer::resetCounter); - } - // We can now add the validated jobs to the return value. - auto vj = validatedJobs.begin(); - while (vj != validatedJobs.end()) { - privateRet.emplace_back(std::move(*vj)); - vj=validatedJobs.erase(vj); - } - // Before going for another round, we can release the queue and delete it if we emptied it. - auto queueSummaryAfter=rq.getJobsSummary(); - rqLock.release(); - // If the queue is empty, we can get rid of it. - if (!queueSummaryAfter.files) { - try { - // The queue should be removed as it is empty. - ScopedExclusiveLock rexl(re); - re.fetch(); - re.removeRetrieveQueueAndCommit(mountInfo.vid, QueueType::LiveJobs, logContext); - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): deleted empty queue"); - } catch (cta::exception::Exception &ex) { - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("Message", ex.getMessageValue()); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not delete a presumably empty queue"); - } - queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter); - } - // We can now summarize this round - { - log::ScopedParamContainer params(logContext); - params.add("vid", mountInfo.vid) - .add("queueObject", rq.getAddressIfSet()) - .add("filesAdded", currentFiles - beforeFiles) - .add("bytesAdded", currentBytes - beforeBytes) - .add("filesBefore", beforeFiles) - .add("bytesBefore", beforeBytes) - .add("filesAfter", currentFiles) - .add("bytesAfter", currentBytes) - .add("queueJobsBefore", queueSummaryBefore.files) - .add("queueBytesBefore", queueSummaryBefore.bytes) - .add("queueJobsAfter", queueSummaryAfter.files) - .add("queueBytesAfter", queueSummaryAfter.bytes) - .add("queueObject", queueObject) - .add("findQueueTime", localFindQueueTime) - .add("lockFetchQueueTime", localLockFetchQueueTime) - .add("emptyQueueCleanupTime", localEmptyCleanupQueueTime) - .add("jobSelectionTime", localJobSelectionTime) - .add("ownershipAdditionTime", localOwnershipAdditionTime) - .add("asyncUpdateLaunchTime", localAsyncLaunchTime) - .add("jobsUpdateTime", localJobsUpdateTime) - .add("queueProcessAndCommitTime", localQueueProcessAndCommitTime) - .add("ownershipRemovalTime", localOwnershipRemovalTime) - .add("queueRemovalTime", localQueueRemovalTime) - .add("iterationTime", localFindQueueTime + localLockFetchQueueTime + localEmptyCleanupQueueTime - + localJobSelectionTime + localOwnershipAdditionTime + localAsyncLaunchTime - + localJobsUpdateTime + localQueueProcessAndCommitTime + localOwnershipRemovalTime - + localQueueRemovalTime) - .add("iterationCount", iterationCount); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): did one round of jobs retrieval."); - } - // We could be done now. - if (currentBytes >= bytesRequested || currentFiles >= filesRequested) - break; - // If we had exhausted the queue while selecting the jobs, we stop here, else we can go for another - // round. - if (!candidateJobsFromQueue.remainingFilesAfterCandidates) - break; - } catch (cta::exception::Exception & ex) { - log::ScopedParamContainer params (logContext); - params.add("exceptionMessage", ex.getMessageValue()); - logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (CTA exception) getting more jobs. Backtrace follows."); - logContext.logBacktrace(log::ERR, ex.backtrace()); - break; - } catch (std::exception & e) { - log::ScopedParamContainer params (logContext); - params.add("exceptionWhat", e.what()); - logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (std exception) getting more jobs."); - break; - } catch (...) { - logContext.log(log::ERR, "In OStoreDB::RetrieveMount::getNextJobBatch(): error (unknown exception) getting more jobs."); - break; - } - } - // We either ran out of jobs or fulfilled the requirements. Time to build up the reply. - // Log the outcome. - uint64_t nFiles=privateRet.size(); - uint64_t nBytes=0; - for (auto & j: privateRet) { - nBytes+=j->archiveFile.fileSize; - } +std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount:: +getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext) +{ + typedef objectstore::ContainerAlgorithms<RetrieveQueue> RQAlgos; + RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); + RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); + auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext); + // We can construct the return value + std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret; + for(auto &j : jobs.elements) { - log::ScopedParamContainer params(logContext); - params.add("tapepool", mountInfo.tapePool) - .add("files", nFiles) - .add("bytes", nBytes) - .add("driveRegisterCheckTime", driveRegisterCheckTime) - .add("findQueueTime", findQueueTime) - .add("lockFetchQueueTime", lockFetchQueueTime) - .add("emptyQueueCleanupTime", emptyQueueCleanupTime) - .add("jobSelectionTime", jobSelectionTime) - .add("ownershipAdditionTime", ownershipAdditionTime) - .add("asyncUpdateLaunchTime", asyncUpdateLaunchTime) - .add("jobsUpdateTime", jobsUpdateTime) - .add("queueProcessAndCommitTime", queueProcessAndCommitTime) - .add("ownershipRemovalTime", ownershipRemovalTime) - .add("schedulerDbTime", totalTime.secs()); - logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): jobs retrieval complete."); + std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), m_oStoreDB, *this)); + rj->archiveFile = j.archiveFile; + rj->retrieveRequest = j.retrieveRequest->getSchedulerRequest(); + rj->m_jobOwned = true; + rj->m_mountId = mountInfo.mountId; + ret.emplace_back(std::move(rj)); } - // We can construct the return value. - std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > ret; - for (auto & j: privateRet) ret.emplace_back(std::move(j)); return ret; } - //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::complete() //------------------------------------------------------------------------------ diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index a0852801beb2802954596389b5a7126999c9c1c6..223c2e127c6b7530e839c52665e35c45dfd4fb7f 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit a0852801beb2802954596389b5a7126999c9c1c6 +Subproject commit 223c2e127c6b7530e839c52665e35c45dfd4fb7f