diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 28351d42e30b60fbeac1f829ca7f8c3564141832..67296e1fddd7492ce0653113cc6d95969a360d20 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -588,7 +588,9 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { cta::objectstore::RetrieveQueue rq(tAddr[0], be); cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); - rq.addJob(1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time); + std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; + jta.push_back({1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time}); + rq.addJobs(jta); rq.commit(); } if (pass < 5) { pass++; continue; } diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 9caeb93a6d901a36cffe53d6ea25aefd133771e1..7bc1345791bcd0756502387f5e1a041f78d588a4 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -99,40 +99,40 @@ std::string cta::objectstore::RetrieveQueue::dump() { return headerDump; } -void cta::objectstore::RetrieveQueue::addJob(uint64_t copyNb, uint64_t fSeq, - const std::string & retrieveRequestAddress, uint64_t size, - const cta::common::dataStructures::MountPolicy & policy, time_t startTime) { +void cta::objectstore::RetrieveQueue::addJobs(std::list<cta::objectstore::RetrieveQueue::JobToAdd> & jobsToAdd) { checkPayloadWritable(); // Keep track of the mounting criteria ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - maxDriveAllowedMap.incCount(policy.maxDrivesAllowed); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - priorityMap.incCount(policy.retrievePriority); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); - minRetrieveRequestAgeMap.incCount(policy.retrieveMinRequestAge); - if (m_payload.retrievejobs_size()) { - if (m_payload.oldestjobcreationtime() > (uint64_t)startTime) { - m_payload.set_oldestjobcreationtime(startTime); + for (auto & jta: jobsToAdd) { + maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); + priorityMap.incCount(jta.policy.retrievePriority); + minRetrieveRequestAgeMap.incCount(jta.policy.retrieveMinRequestAge); + if (m_payload.retrievejobs_size()) { + if (m_payload.oldestjobcreationtime() > (uint64_t)jta.startTime) { + m_payload.set_oldestjobcreationtime(jta.startTime); + } + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + jta.size); + } else { + m_payload.set_oldestjobcreationtime(jta.startTime); + m_payload.set_retrievejobstotalsize(jta.size); + } + auto * j = m_payload.add_retrievejobs(); + j->set_address(jta.retrieveRequestAddress); + j->set_size(jta.size); + j->set_copynb(jta.copyNb); + j->set_fseq(jta.fSeq); + j->set_priority(jta.policy.retrievePriority); + j->set_minretrieverequestage(jta.policy.retrieveMinRequestAge); + j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); + // move the the new job in the right spot on the queue. + // i points to the newly added job all the time. + size_t i=m_payload.retrievejobs_size() - 1; + while (i > 0 && m_payload.retrievejobs(i).fseq() < m_payload.retrievejobs(i - 1).fseq()) { + m_payload.mutable_retrievejobs()->SwapElements(i-1, i); + i--; } - m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + size); - } else { - m_payload.set_oldestjobcreationtime(startTime); - m_payload.set_retrievejobstotalsize(size); - } - auto * j = m_payload.add_retrievejobs(); - j->set_address(retrieveRequestAddress); - j->set_size(size); - j->set_copynb(copyNb); - j->set_fseq(fSeq); - j->set_priority(policy.retrievePriority); - j->set_minretrieverequestage(policy.retrieveMinRequestAge); - j->set_maxdrivesallowed(policy.maxDrivesAllowed); - // move the the new job in the right spot on the queue. - // i points to the newly added job all the time. - size_t i=m_payload.retrievejobs_size() - 1; - while (i > 0 && m_payload.retrievejobs(i).fseq() < m_payload.retrievejobs(i - 1).fseq()) { - m_payload.mutable_retrievejobs()->SwapElements(i-1, i); - i--; } } diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index e816303a72dbcdb18c4524a90c3456508ac41dd1..b9bb22bec03e7fe5bb650e0271681df90a9d7b85 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -45,9 +45,15 @@ public: std::string dump(); // Retrieve jobs management ================================================== - void addJob(uint64_t copyNb, uint64_t fSeq, - const std::string & retrieveRequestAddress, uint64_t size, - const cta::common::dataStructures::MountPolicy & policy, time_t startTime); + struct JobToAdd { + uint64_t copyNb; + uint64_t fSeq; + const std::string retrieveRequestAddress; + uint64_t size; + const cta::common::dataStructures::MountPolicy policy; + time_t startTime; + }; + void addJobs(std::list<JobToAdd> & jobsToAdd); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addJobIfNecessary(uint64_t copyNb, uint64_t fSeq, diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index dc5cca52b0684fbae1c440b993d8041268a9636f..d14d7ad462bdd0ffcd22ab5be4676c02d4c1aafd 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -45,22 +45,25 @@ template<> void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueue( std::list<MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::JobAndRequest> & jobsToAdd, objectstore::RetrieveQueue &queue) { + std::list<objectstore::RetrieveQueue::JobToAdd> jtal; + auto queueAddress = queue.getAddressIfSet(); for (auto & jta: jobsToAdd) { - // We need to find corresponding to the copyNb + // We need to find the job corresponding to the copyNb auto & job = jta.job; auto & request = jta.request; for (auto & j: request.getArchiveFile().tapeFiles) { if (j.second.copyNb == job.copyNb) { auto criteria = request.getRetrieveFileQueueCriteria(); - queue.addJob(j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize, - criteria.mountPolicy, request.getEntryLog().time); + jtal.push_back({j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize, + criteria.mountPolicy, request.getEntryLog().time}); request.setActiveCopyNumber(j.second.copyNb); - request.setOwner(queue.getAddressIfSet()); + request.setOwner(queueAddress); goto jobAdded; } } jobAdded:; } + queue.addJobs(jtal); } template<>