diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 7253eacd2c321d7b01bf733a8ffa8c7a3b368c72..ce43be435b33119ce76504b7eacde5771cff7d2d 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -27,7 +27,7 @@ std::mutex MemArchiveQueue::g_mutex; std::map<std::string, MemArchiveQueue *> MemArchiveQueue::g_queues; void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job, - objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB) { + objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext) { // 1) Take the global lock (implicit in the constructor) std::unique_lock<std::mutex> ul(g_mutex); try { @@ -50,6 +50,7 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu // Wait for our request completion (this could throw, if there was a problem) maqr.m_promise.get_future().get(); } catch (std::out_of_range &) { + utils::Timer timer; // The queue for our tape pool does not exist. We will create it, wait for // the necessary amount of time or requests and release it. // Create the queue @@ -71,11 +72,14 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu g_queues.erase(i); // Our mem queue is now unreachable so we can let the global part go ul.unlock(); + double waitTime = timer.secs(utils::Timer::resetCounter); // We can now proceed with the queuing of the jobs. try { objectstore::ArchiveQueue aq(oStoreDB.m_objectStore); objectstore::ScopedExclusiveLock aql; oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool); + size_t aqSizeBefore=aq.dumpJobs().size(); + size_t addedJobs=1; // First add the job for this thread { auto af = archiveRequest.getArchiveFile(); @@ -95,9 +99,21 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu // Back reference the queue in the job and archive request maqr->m_job.ArchiveQueueAddress = aq.getAddressIfSet(); archiveRequest.setJobArchiveQueueAddress(maqr->m_job.copyNb, aq.getAddressIfSet()); + addedJobs++; } // We can now commit the multi-request addition to the object store aq.commit(); + size_t aqSizeAfter=aq.dumpJobs().size(); + { + log::ScopedParamContainer params(logContext); + params.add("objectQueue", aq.getAddressIfSet()) + .add("sizeBefore", aqSizeBefore) + .add("sizeAfter", aqSizeAfter) + .add("addedJobs", addedJobs) + .add("waitTime", waitTime) + .add("enqueueTime", timer.secs()); + logContext.log(log::INFO, "In MemArchiveQueue::sharedAddToArchiveQueue"); + } // And finally release all the user threads for (auto &maqr: maq.m_requests) { maqr->m_promise.set_value(); diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index 5b1ab4f74a7f4ca1a13c835e33f49aa9fe386b7c..383cbbaa317d66a5871089a75691bd4915857287 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -21,6 +21,7 @@ #include "objectstore/ArchiveRequest.hpp" #include "objectstore/ArchiveQueue.hpp" +#include "common/log/LogContext.hpp" #include <future> @@ -57,9 +58,12 @@ public: * * @param job to be added to the ArchiveQueue (contains the tape pool name) * @param archiveRequest the request itself. + * @param oStoreDB reference to the object store, allowing creation of the queue + * if needed + * @param logContext log context to log addition of jobs to the queue. */ static void sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump & job, - objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB); + objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext); private: /** Mutex that should be locked before attempting any operation */ diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 5cbce2b203510666ccdf95fff4223308fadba7fa..95c937cdc5200de664ded4469f884d87202744d2 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -367,7 +367,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: try { for (auto &j: aReq.dumpJobs()) { currentTapepool = j.tapePool; - ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this); + ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this, logContext); linkedTapePools.push_back(j.ArchiveQueueAddress); aReq.setJobOwner(j.copyNb, j.ArchiveQueueAddress); log::ScopedParamContainer params(logContext);