diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp
index 998a97835842e415f2ed13c0a58111ae184ac4b0..d3c95ca91026de80724187bd450bb783b75e7183 100644
--- a/scheduler/OStoreDB/MemQueues.cpp
+++ b/scheduler/OStoreDB/MemQueues.cpp
@@ -26,7 +26,7 @@ std::mutex MemArchiveQueue::g_mutex;
 
 std::map<std::string, MemArchiveQueue *> MemArchiveQueue::g_queues;
 
-void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job,
+std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job,
   objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext) {
   // 1) Take the global lock (implicit in the constructor)
   std::unique_lock<std::mutex> ul(g_mutex);
@@ -48,7 +48,7 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu
     ulq.unlock();
     ul.unlock();
     // Wait for our request completion (this could throw, if there was a problem)
-    maqr.m_promise.get_future().get();
+    return maqr.m_promise.get_future().get();
   } catch (std::out_of_range &) {
     utils::Timer timer;
     // The queue for our tape pool does not exist. We will create it, wait for
@@ -75,8 +75,11 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu
     double waitTime = timer.secs(utils::Timer::resetCounter);
     // We can now proceed with the queuing of the jobs.
     try {
-      objectstore::ArchiveQueue aq(oStoreDB.m_objectStore);
-      objectstore::ScopedExclusiveLock aql;
+      std::shared_ptr<SharedQueueLock> ret(new SharedQueueLock(logContext));
+      ret->m_queue.reset(new objectstore::ArchiveQueue(oStoreDB.m_objectStore));
+      ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
+      auto & aq = *ret->m_queue;
+      auto & aql = *ret->m_lock;
       oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool);
       size_t aqSizeBefore=aq.dumpJobs().size();
       size_t addedJobs=1;
@@ -112,13 +115,16 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu
             .add("addedJobs", addedJobs)
             .add("waitTime", waitTime)
             .add("enqueueTime", timer.secs());
-        logContext.log(log::INFO, "In MemArchiveQueue::sharedAddToArchiveQueue(): add batch of jobs to the queue.");
+        logContext.log(log::INFO, "In MemArchiveQueue::sharedAddToArchiveQueue(): added batch of jobs to the queue.");
       }
+      // We will also count how much time we mutually wait for the other threads.
+      ret->m_timer.reset();
       // And finally release all the user threads
       for (auto &maqr: maq.m_requests) {
-        maqr->m_promise.set_value();
+        maqr->m_promise.set_value(ret);
       }
       // Done!
+      return ret;
     } catch (...) {
       try {
         std::rethrow_exception(std::current_exception());
@@ -160,6 +166,13 @@ void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDu
   }
 }
 
+SharedQueueLock::~SharedQueueLock() {
+  m_lock->release();
+  log::ScopedParamContainer params(m_logContext);
+  params.add("objectQueue", m_queue->getAddressIfSet())
+        .add("waitAndUnlockTime", m_timer.secs());
+  m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
+}
 
 void MemArchiveQueue::add(MemArchiveQueueRequest& request) {
   m_requests.emplace_back(&request);
diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp
index 383cbbaa317d66a5871089a75691bd4915857287..cf2578340de53be5e3da5334dd92c3f82ba9e328 100644
--- a/scheduler/OStoreDB/MemQueues.hpp
+++ b/scheduler/OStoreDB/MemQueues.hpp
@@ -29,6 +29,23 @@ namespace cta {
 // Forward declaration
 class OStoreDB;
 namespace ostoredb {
 
+/**
+ * A container to which the ownership of the archive queue (and more important,
+ * its lock) will be passed. This container will be passed as a shared pointer
+ * to the caller of sharedAddToArchiveQueue, so they can delete their copy AFTER
+ * updating the ownership of their requests.
+ */
+class SharedQueueLock {
+  friend class MemArchiveQueue;
+public:
+  SharedQueueLock(log::LogContext & logContext): m_logContext(logContext) {}
+  ~SharedQueueLock();
+private:
+  std::unique_ptr<objectstore::ScopedExclusiveLock> m_lock;
+  std::unique_ptr<objectstore::ArchiveQueue> m_queue;
+  log::LogContext & m_logContext;
+  utils::Timer m_timer;
+};
 class MemArchiveQueueRequest {
   friend class MemArchiveQueue;
@@ -38,7 +55,7 @@ public:
 private:
   objectstore::ArchiveRequest::JobDump & m_job;
   objectstore::ArchiveRequest & m_archiveRequest;
-  std::promise<void> m_promise;
+  std::promise<std::shared_ptr<SharedQueueLock>> m_promise;
 };
 
 class MemArchiveQueue {
@@ -62,7 +79,7 @@ public:
    * if needed
    * @param logContext log context to log addition of jobs to the queue.
    */
-  static void sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump & job,
+  static std::shared_ptr<SharedQueueLock> sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump & job,
     objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext);
 
 private:
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 780fff0084fc4516d70c91df776c81b7222d598b..8a0984cfeb6cdc3a8fdef0f54941ff44d1abbf77 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -367,7 +367,8 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
   try {
     for (auto &j: aReq.dumpJobs()) {
       currentTapepool = j.tapePool;
-      ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this, logContext);
+      // The shared lock will be released automatically at the end of this scope.
+      auto shareLock = ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this, logContext);
       linkedTapePools.push_back(j.ArchiveQueueAddress);
       aReq.setJobOwner(j.copyNb, j.ArchiveQueueAddress);
       log::ScopedParamContainer params(logContext);
diff --git a/tests/helgrind.suppr b/tests/helgrind.suppr
index 2fc975432ef307f6ca980e041d6b547b593521ac..f8acc4b9529548f59c763e3a51a38b8f5d21d439 100644
--- a/tests/helgrind.suppr
+++ b/tests/helgrind.suppr
@@ -287,16 +287,6 @@
    ...
 }
 
-{
-   heap_address_collision
-   Helgrind:Race
-   ...
-   fun:_ZNSt7promiseIvED1Ev
-   fun:_ZN3cta11objectstore14AgentReference6ActionD1Ev
-   fun:_ZN3cta11objectstore14AgentReference19removeFromOwnershipERKSsRNS0_7BackendE
-   ...
-}
-
 {
    dubious_cond_var_in_promise
    Helgrind:Misc
@@ -308,20 +298,10 @@
 }
 
 {
-   Stack_allocated_object_deletion_probably_coincidence_of_address
-   Helgrind:Race
-   ...
-   fun:_ZN3cta11objectstore14AgentReference6ActionD1Ev
-   fun:_ZN3cta11objectstore14AgentReference14addToOwnershipERKSsRNS0_7BackendE
-   ...
-}
-
-{
-   Objects_passed_form_thread_to_thread
+   Locks taken from 2 objects
    Helgrind:Race
    ...
+   fun:_ZNSt7promiseIvED1Ev
+   fun:_ZNKSt14default_deleteISt7promiseIvEEclEPS1_
    fun:_ZNSt10unique_ptrISt7promiseIvESt14default_deleteIS1_EED1Ev
    fun:_ZN3cta11objectstore14AgentReference21queueAndExecuteActionERNS1_6ActionERNS0_7BackendE
-   ...
-}
-
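
Note (illustration only, not part of the patch): the change hands every waiting thread a std::shared_ptr to an object that owns the archive queue lock, so the lock is released only after the last thread has finished updating the ownership of its request. Below is a minimal standalone sketch of that promise/shared_ptr pattern, with invented names (QueueLockHolder, queueMutex) rather than the CTA classes.

#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

std::mutex queueMutex;  // stands in for the object store queue lock

// Owns the lock; it is released when the last shared_ptr copy is destroyed.
struct QueueLockHolder {
  std::unique_lock<std::mutex> lock{queueMutex};
  ~QueueLockHolder() { std::cout << "queue lock released\n"; }
};

int main() {
  std::vector<std::promise<std::shared_ptr<QueueLockHolder>>> promises(3);
  std::vector<std::thread> waiters;
  for (auto & p : promises) {
    waiters.emplace_back([&p]() {
      // Each waiter receives a copy of the holder and can keep working with
      // the still-locked queue before dropping its copy.
      auto holder = p.get_future().get();
      std::cout << "waiter finished its post-queueing work\n";
    });
  }
  {
    // The queueing thread takes the lock once, does the batch work, then
    // hands the same holder to every waiter before dropping its own copy.
    auto holder = std::make_shared<QueueLockHolder>();
    for (auto & p : promises) { p.set_value(holder); }
  }
  for (auto & t : waiters) { t.join(); }
  return 0;
}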