diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index c61d34f1cca57e097d50fe1fc819ff0656287211..400c264662f19a64e9f7a14447cfa36288a8135f 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -144,7 +144,7 @@ std::string ArchiveQueue::getTapePool() { return m_payload.tapepool(); } -void ArchiveQueue::addJobs(std::list<JobToAdd> & jobsToAdd) { +void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) { checkPayloadWritable(); for (auto & jta: jobsToAdd) { // Keep track of the mounting criteria @@ -171,6 +171,7 @@ void ArchiveQueue::addJobs(std::list<JobToAdd> & jobsToAdd) { j->set_priority(jta.policy.archivePriority); j->set_minarchiverequestage(jta.policy.archiveMinRequestAge); } + commit(); } auto ArchiveQueue::getJobsSummary() -> JobsSummary { diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index 3beaf1bff54c66086165c6acde2cad59aae02af4..0f758092aac7f6e43d1c3c1d4e80d88041ec5b95 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -65,7 +65,7 @@ public: const cta::common::dataStructures::MountPolicy policy; time_t startTime; }; - void addJobs(std::list<JobToAdd> & jobsToAdd); + void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addJobIfNecessary(const ArchiveRequest::JobDump & job, diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 67296e1fddd7492ce0653113cc6d95969a360d20..bed103ee7a66952f54159ade2bddc1bf0e841557 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -387,8 +387,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.maxDrivesAllowed = 1; std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000U+pass, policy, time(NULL)}); - aq.addJobs(jta); - aq.commit(); + aq.addJobsAndCommit(jta); } if (pass < 4) { pass++; continue; } // TODO: partially migrated or selected @@ -407,8 +406,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.maxDrivesAllowed = 1; std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)}); - aq.addJobs(jta); - aq.commit(); + aq.addJobsAndCommit(jta); } if (pass < 5) { pass++; continue; } // - Still marked a not owned but referenced in the agent @@ -590,8 +588,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { rq.fetch(); std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; jta.push_back({1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time}); - rq.addJobs(jta); - rq.commit(); + rq.addJobsAndCommit(jta); } if (pass < 5) { pass++; continue; } // - Still marked a not owned but referenced in the agent diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 7bc1345791bcd0756502387f5e1a041f78d588a4..3bd7ecaeed9a3df546a32c4bf2753a0cad9a9a4e 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -99,7 +99,7 @@ std::string cta::objectstore::RetrieveQueue::dump() { return headerDump; } -void cta::objectstore::RetrieveQueue::addJobs(std::list<cta::objectstore::RetrieveQueue::JobToAdd> & jobsToAdd) { +void cta::objectstore::RetrieveQueue::addJobsAndCommit(std::list<cta::objectstore::RetrieveQueue::JobToAdd> & jobsToAdd) { checkPayloadWritable(); // Keep track of the mounting criteria ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); @@ -134,6 +134,7 @@ void cta::objectstore::RetrieveQueue::addJobs(std::list<cta::objectstore::Retrie i--; } } + commit(); } bool cta::objectstore::RetrieveQueue::addJobIfNecessary(uint64_t copyNb, uint64_t fSeq, diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index b9bb22bec03e7fe5bb650e0271681df90a9d7b85..52405ff9995b2eff41584223b713cc603d3f5aac 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -53,7 +53,7 @@ public: const cta::common::dataStructures::MountPolicy policy; time_t startTime; }; - void addJobs(std::list<JobToAdd> & jobsToAdd); + void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addJobIfNecessary(uint64_t copyNb, uint64_t fSeq, diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index d14d7ad462bdd0ffcd22ab5be4676c02d4c1aafd..857f553c0fe57e5ec3eb97dff82dab778f7207ae 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -24,7 +24,7 @@ namespace cta { namespace ostoredb { template<> -void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueue( +void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueueAndCommit( std::list<MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::JobAndRequest> & jobsToAdd, objectstore::ArchiveQueue& queue) { std::list<objectstore::ArchiveQueue::JobToAdd> jtal; @@ -38,11 +38,11 @@ void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializ j.job.owner = queueAddress; j.request.setJobOwner(j.job.copyNb, j.job.owner); } - queue.addJobs(jtal); + queue.addJobsAndCommit(jtal); } template<> -void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueue( +void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueueAndCommit( std::list<MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::JobAndRequest> & jobsToAdd, objectstore::RetrieveQueue &queue) { std::list<objectstore::RetrieveQueue::JobToAdd> jtal; @@ -63,7 +63,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special } jobAdded:; } - queue.addJobs(jtal); + queue.addJobsAndCommit(jtal); } template<> diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index ad92c81e1a8baa975543a1f4109208882d083c74..70c596c8d6b9c34b2aacbc10872f6981b1caeb09 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -185,7 +185,7 @@ private: }; /** Helper function handling the difference between archive and retrieve (vid vs tapepool) */ - static void specializedAddJobsToQueue(std::list<JobAndRequest> & jobsToAdd, Queue & queue); + static void specializedAddJobsToQueueAndCommit(std::list<JobAndRequest> & jobsToAdd, Queue & queue); /** Helper function updating the cached retrieve queue stats. Noop for archive queues */ static void specializedUpdateCachedQueueStats(Queue &queue); @@ -320,11 +320,8 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share addedJobs++; } // Actually ass the jobs. - specializedAddJobsToQueue(jta, queue); - double inMemoryQueueProcessTime = timer.secs(utils::Timer::resetCounter); - // We can now commit the multi-request addition to the object store - queue.commit(); - double queueCommitTime = timer.secs(utils::Timer::resetCounter); + specializedAddJobsToQueueAndCommit(jta, queue); + double queueProcessAndCommitTime = timer.secs(utils::Timer::resetCounter); // Update the cache stats in memory as we hold the queue. specializedUpdateCachedQueueStats(queue); double cacheUpdateTime = timer.secs(utils::Timer::resetCounter); @@ -351,10 +348,9 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share .add("addedJobs", addedJobs) .add("waitTime", waitTime) .add("getFetchedQueueTime", getFetchedQueueTime) - .add("inMemoryQueueProcessTime", inMemoryQueueProcessTime) - .add("queueCommitTime", queueCommitTime) + .add("queueProcessAndCommitTime", queueProcessAndCommitTime) .add("cacheUpdateTime", cacheUpdateTime) - .add("totalEnqueueTime", getFetchedQueueTime + inMemoryQueueProcessTime + queueCommitTime + .add("totalEnqueueTime", getFetchedQueueTime + queueProcessAndCommitTime + cacheUpdateTime + timer.secs()); logContext.log(log::INFO, "In MemQueue::sharedAddToNewQueue(): added batch of jobs to the queue."); }