diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 57fef41d6e5444d15df99c9ca9290990ef1c8dfe..c61d34f1cca57e097d50fe1fc819ff0656287211 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -144,34 +144,33 @@ std::string ArchiveQueue::getTapePool() { return m_payload.tapepool(); } -void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job, - const std::string & archiveRequestAddress, uint64_t archiveFileId, - uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, - time_t startTime) { +void ArchiveQueue::addJobs(std::list<JobToAdd> & jobsToAdd) { checkPayloadWritable(); - // Keep track of the mounting criteria - ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - maxDriveAllowedMap.incCount(policy.maxDrivesAllowed); - ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - priorityMap.incCount(policy.archivePriority); - ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - minArchiveRequestAgeMap.incCount(policy.archiveMinRequestAge); - if (m_payload.pendingarchivejobs_size()) { - if ((uint64_t)startTime < m_payload.oldestjobcreationtime()) - m_payload.set_oldestjobcreationtime(startTime); - m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + fileSize); - } else { - m_payload.set_archivejobstotalsize(fileSize); - m_payload.set_oldestjobcreationtime(startTime); + for (auto & jta: jobsToAdd) { + // Keep track of the mounting criteria + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.incCount(jta.policy.archivePriority); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge); + if (m_payload.pendingarchivejobs_size()) { + if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime()) + m_payload.set_oldestjobcreationtime(jta.startTime); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize); + } else { + m_payload.set_archivejobstotalsize(jta.fileSize); + m_payload.set_oldestjobcreationtime(jta.startTime); + } + auto * j = m_payload.add_pendingarchivejobs(); + j->set_address(jta.archiveRequestAddress); + j->set_size(jta.fileSize); + j->set_fileid(jta.archiveFileId); + j->set_copynb(jta.job.copyNb); + j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); + j->set_priority(jta.policy.archivePriority); + j->set_minarchiverequestage(jta.policy.archiveMinRequestAge); } - auto * j = m_payload.add_pendingarchivejobs(); - j->set_address(archiveRequestAddress); - j->set_size(fileSize); - j->set_fileid(archiveFileId); - j->set_copynb(job.copyNb); - j->set_maxdrivesallowed(policy.maxDrivesAllowed); - j->set_priority(policy.archivePriority); - j->set_minarchiverequestage(policy.archiveMinRequestAge); } auto ArchiveQueue::getJobsSummary() -> JobsSummary { diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index 943add52387a1962b0dcad72b1714f93688048e4..3beaf1bff54c66086165c6acde2cad59aae02af4 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -56,10 +56,16 @@ public: void setTapePool(const std::string & name); std::string getTapePool(); - // Archive jobs management =================================================== - void addJob(const ArchiveRequest::JobDump & job, - const std::string & archiveRequestAddress, 
uint64_t archiveFileId, - uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime); + // Archive jobs management =================================================== + struct JobToAdd { + ArchiveRequest::JobDump & job; + const std::string archiveRequestAddress; + uint64_t archiveFileId; + uint64_t fileSize; + const cta::common::dataStructures::MountPolicy policy; + time_t startTime; + }; + void addJobs(std::list<JobToAdd> & jobsToAdd); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addJobIfNecessary(const ArchiveRequest::JobDump & job, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 5b3e6edbc98705fe92973a44b7a15ffe09bf7d9c..28351d42e30b60fbeac1f829ca7f8c3564141832 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -329,7 +329,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { cta::objectstore::ArchiveQueue aq(tpAddr[i], be); } // Create the various ATFR's, stopping one step further each time. - int pass=0; + unsigned int pass=0; while (true) { // -just referenced @@ -385,7 +385,9 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.archiveMinRequestAge = 0; policy.archivePriority = 1; policy.maxDrivesAllowed = 1; - aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)); + std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; + jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000U+pass, policy, time(NULL)}); + aq.addJobs(jta); aq.commit(); } if (pass < 4) { pass++; continue; } @@ -403,7 +405,9 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.archiveMinRequestAge = 0; policy.archivePriority = 1; policy.maxDrivesAllowed = 1; - aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)); + std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; + jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)}); + aq.addJobs(jta); aq.commit(); } if (pass < 5) { pass++; continue; } diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 7ce519ecd748006dc2e95120cc1149bca5062824..dc5cca52b0684fbae1c440b993d8041268a9636f 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -24,31 +24,43 @@ namespace cta { namespace ostoredb { template<> -void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobToQueue( - objectstore::ArchiveRequest::JobDump& job, objectstore::ArchiveRequest& request, objectstore::ArchiveQueue& queue) { - auto af = request.getArchiveFile(); - queue.addJob(job, request.getAddressIfSet(), af.archiveFileID, - af.fileSize, request.getMountPolicy(), request.getEntryLog().time); - // Back reference the queue in the job and archive request - job.owner = queue.getAddressIfSet(); - request.setJobOwner(job.copyNb, job.owner); +void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueue( + std::list<MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::JobAndRequest> & jobsToAdd, + objectstore::ArchiveQueue& queue) { + std::list<objectstore::ArchiveQueue::JobToAdd> jtal; + auto queueAddress = queue.getAddressIfSet(); + for (auto & j: jobsToAdd) { + jtal.push_back({j.job, j.request.getAddressIfSet(), j.request.getArchiveFile().archiveFileID, 
j.request.getArchiveFile().fileSize,
+      j.request.getMountPolicy(), j.request.getEntryLog().time});
+    // We pre-mark (in memory) the request as being owned by the queue.
+    // The actual commit of the request will happen after the queue's,
+    // so the back reference will be valid.
+    j.job.owner = queueAddress;
+    j.request.setJobOwner(j.job.copyNb, j.job.owner);
+  }
+  queue.addJobs(jtal);
 }
 
 template<>
-void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobToQueue(
-    objectstore::RetrieveRequest::JobDump& job, objectstore::RetrieveRequest& request, objectstore::RetrieveQueue& queue) {
-  // We need to find corresponding to the copyNb
-  for (auto & j: request.getArchiveFile().tapeFiles) {
-    if (j.second.copyNb == job.copyNb) {
-      auto criteria = request.getRetrieveFileQueueCriteria();
-      queue.addJob(j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize,
-        criteria.mountPolicy, request.getEntryLog().time);
-      request.setActiveCopyNumber(j.second.copyNb);
-      request.setOwner(queue.getAddressIfSet());
-      goto jobAdded;
+void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueue(
+    std::list<MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::JobAndRequest> & jobsToAdd,
+    objectstore::RetrieveQueue &queue) {
+  for (auto & jta: jobsToAdd) {
+    // We need to find the tape file corresponding to the copyNb
+    auto & job = jta.job;
+    auto & request = jta.request;
+    for (auto & j: request.getArchiveFile().tapeFiles) {
+      if (j.second.copyNb == job.copyNb) {
+        auto criteria = request.getRetrieveFileQueueCriteria();
+        queue.addJob(j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize,
+          criteria.mountPolicy, request.getEntryLog().time);
+        request.setActiveCopyNumber(j.second.copyNb);
+        request.setOwner(queue.getAddressIfSet());
+        break;
+      }
     }
-  } jobAdded:;
+  }
 }
 
 template<>
diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp
index 682fe93dbed7958f032b52e5939f033b1cb63791..ad92c81e1a8baa975543a1f4109208882d083c74 100644
--- a/scheduler/OStoreDB/MemQueues.hpp
+++ b/scheduler/OStoreDB/MemQueues.hpp
@@ -178,8 +178,14 @@ private:
   static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToNewQueue(typename Request::JobDump & job, const std::string & queueIndex,
     Request & request, OStoreDB & oStoreDB, log::LogContext & logContext, threading::MutexLocker &globalLock);
 
+  /** Struct holding the job plus request data */
+  struct JobAndRequest {
+    typename Request::JobDump & job;
+    Request & request;
+  };
+
   /** Helper function handling the difference between archive and retrieve (vid vs tapepool) */
-  static void specializedAddJobToQueue(typename Request::JobDump & job, Request & request, Queue & queue);
+  static void specializedAddJobsToQueue(std::list<JobAndRequest> & jobsToAdd, Queue & queue);
 
   /** Helper function updating the cached retrieve queue stats. Noop for archive queues */
   static void specializedUpdateCachedQueueStats(Queue &queue);
@@ -301,16 +307,20 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share
     qBytesBefore+=j.size;
   }
   size_t addedJobs=1;
+  // Build the list of jobs to add to the queue
+  std::list<JobAndRequest> jta;
   // First add the job for this thread
-  specializedAddJobToQueue(job, request, queue);
+  jta.push_back({job, request});
   // We are done with the queue: release the lock to make helgrind happy.
   ulq.unlock();
   // We do the same for all the queued requests
   for (auto &maqr: maq->m_requests) {
     // Add the job
-    specializedAddJobToQueue(maqr->m_job, maqr->m_request, queue);
+    jta.push_back({maqr->m_job, maqr->m_request});
     addedJobs++;
   }
+  // Actually add the jobs.
+  specializedAddJobsToQueue(jta, queue);
   double inMemoryQueueProcessTime = timer.secs(utils::Timer::resetCounter);
   // We can now commit the multi-request addition to the object store
   queue.commit();
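
For illustration only (not part of the patch): a minimal sketch of how calling code can batch work through the new ArchiveQueue::addJobs() interface, modeled on the GarbageCollectorTest hunk above. The helper name queueAllJobsOfRequest and the assumption that the queue object is already locked and fetched are hypothetical; JobToAdd, addJobs(), getAddressIfSet(), getArchiveFile() and commit() are the members shown in this diff, and the include paths follow the file paths in the diff.

#include <ctime>
#include <list>
#include "objectstore/ArchiveQueue.hpp"
#include "objectstore/ArchiveRequest.hpp"

// Hypothetical helper: queue several job dumps of one archive request with a
// single ArchiveQueue::addJobs() call instead of the old per-job addJob().
// Assumes aq has already been locked and fetched by the caller.
void queueAllJobsOfRequest(cta::objectstore::ArchiveQueue & aq,
    cta::objectstore::ArchiveRequest & ar,
    std::list<cta::objectstore::ArchiveRequest::JobDump> & jobs,
    const cta::common::dataStructures::MountPolicy & policy) {
  std::list<cta::objectstore::ArchiveQueue::JobToAdd> jta;
  for (auto & jd: jobs) {
    // JobToAdd aggregates the per-job data that addJob() previously took as
    // separate arguments (request address, file id, size, policy, start time).
    jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID,
        ar.getArchiveFile().fileSize, policy, time(nullptr)});
  }
  // One call updates the mount criteria maps and the pending job list,
  // followed by a single commit of the queue object.
  aq.addJobs(jta);
  aq.commit();
}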