Commit 8c2df253 authored by Eric Cano

Removed usage of std::mutex where helgrind is used.

See commit de5f707b: std::mutex does not call pthread_mutex_destroy, and this
confuses helgrind when two mutexes exist successively at the same address. This
should remove spurious false positives in CI.
parent fd5561f4
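The issue referenced above: with glibc/libstdc++, std::mutex wraps a statically
initializable pthread mutex and its destructor never calls pthread_mutex_destroy,
so when one mutex dies and another is later constructed at the same address,
helgrind treats them as the same lock and emits bogus reports. A minimal sketch of
the kind of wrapper this commit switches to, assuming threading::Mutex is a thin
pthread wrapper (the real CTA class throws cta::exception::Exception and may
differ in detail):

#include <pthread.h>
#include <stdexcept>

namespace threading {
class Mutex {
public:
  Mutex() {
    if (pthread_mutex_init(&m_mutex, nullptr))
      throw std::runtime_error("In Mutex::Mutex(): pthread_mutex_init failed");
  }
  ~Mutex() {
    // The call std::mutex never makes: it lets helgrind retire this lock, so a
    // future mutex constructed at the same address is recognized as a new one.
    pthread_mutex_destroy(&m_mutex);
  }
  Mutex(const Mutex &) = delete;
  Mutex & operator=(const Mutex &) = delete;
  void lock() {
    if (pthread_mutex_lock(&m_mutex))
      throw std::runtime_error("In Mutex::lock(): pthread_mutex_lock failed");
  }
  void unlock() {
    if (pthread_mutex_unlock(&m_mutex))
      throw std::runtime_error("In Mutex::unlock(): pthread_mutex_unlock failed");
  }
private:
  pthread_mutex_t m_mutex;
};
} // namespace threading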
@@ -18,6 +18,7 @@
 #pragma once
 #include "Mutex.hpp"
 #include "common/exception/Exception.hpp"
+#include <pthread.h>
 #include <semaphore.h>
@@ -41,7 +42,29 @@ public:
   MutexLocker(Mutex & m): m_mutex(m) {
     m.lock();
   }
+  /**
+   * Unlocker.
+   */
+  void unlock() {
+    if (!m_locked) {
+      throw exception::Exception("In MutexLocker::unlock(): trying to unlock an already unlocked mutex");
+    }
+    m_mutex.unlock();
+    m_locked = false;
+  }
+  /**
+   * Locker.
+   */
+  void lock() {
+    if (m_locked) {
+      throw exception::Exception("In MutexLocker::lock(): trying to relock a locked mutex");
+    }
+    m_mutex.lock();
+    m_locked = true;
+  }
   /**
    * Destructor.
    */
@@ -59,6 +82,11 @@ private:
   /**
    * The mutex owned by this MutexLocker.
    */
   Mutex & m_mutex;
+  /**
+   * Tracking of the state of the mutex.
+   */
+  bool m_locked = true;
 }; // class MutexLocker
......
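With the unlock()/lock() members added above, threading::MutexLocker can stand in
for both std::lock_guard and std::unique_lock at the call sites below: it locks on
construction, can release and re-acquire the mutex explicitly (as
sharedAddToArchiveQueueWithNewQueue does with globalLock.unlock()), and m_locked
lets the destructor skip the unlock when the caller already released. A usage
sketch, assuming a destructor that unlocks only while m_locked is still true:

threading::Mutex m;
{
  threading::MutexLocker ml(m);  // locked on construction (RAII)
  // ... touch state shared with other threads ...
  ml.unlock();                   // release early, like std::unique_lock
  // ... slow work that must not hold the lock ...
  ml.lock();                     // re-acquire; throws if already locked
}                                // destructor unlocks, since m_locked is true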
@@ -51,7 +51,7 @@ AgentReference::AgentReference(const std::string & clientType) {
   m_agentAddress = aid.str();
   // Initialize the serialization token for queued actions (the lock will make
   // helgrind happy, but is not really needed).
-  std::unique_lock<std::mutex> ulg(m_currentQueueMutex);
+  threading::MutexLocker ml(m_currentQueueMutex);
   m_nextQueueExecutionPromise.reset(new std::promise<void>);
   m_nextQueueExecutionFuture = m_nextQueueExecutionPromise->get_future();
   ANNOTATE_HAPPENS_BEFORE(m_nextQueueExecutionPromise.get());
@@ -107,10 +107,10 @@ void AgentReference::bumpHeatbeat(objectstore::Backend& backend) {
 void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objectstore::Backend& backend) {
   // First, we need to determine whether a queue exists or not.
   // If so, we just use it; if not, we create and serve it.
-  std::unique_lock<std::mutex> ulGlobal(m_currentQueueMutex);
+  threading::MutexLocker ulGlobal(m_currentQueueMutex);
   if (m_currentQueue) {
     // There is already a queue.
-    std::unique_lock<std::mutex> ulQueue(m_currentQueue->mutex);
+    threading::MutexLocker ulQueue(m_currentQueue->mutex);
     m_currentQueue->queue.push_back(action);
     // If it is time to run, wake up the serving thread.
     if (m_currentQueue->queue.size() + 1 >= m_maxQueuedItems) {
@@ -131,7 +131,7 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
   // To make sure there are no lifetime issues, we make it a shared_ptr.
   std::shared_ptr<ActionQueue> q(new ActionQueue);
   // Lock the queue.
-  std::unique_lock<std::mutex> ulq(q->mutex);
+  threading::MutexLocker ulq(q->mutex);
   // Get it referenced.
   m_currentQueue = q;
   // Get our execution promise and future and leave one behind.
@@ -172,7 +172,7 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
   appyAction(*action, ag);
   // Then those of the other threads
   for (auto a: q->queue) {
-    std::lock_guard<std::mutex> lg(a->mutex);
+    threading::MutexLocker ml(a->mutex);
     appyAction(*a, ag);
   }
   // and commit
@@ -183,7 +183,7 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
   promiseForNextQueue->set_value();
   // We now pass the exception to all threads
   for (auto a: q->queue) {
-    std::lock_guard<std::mutex> lg(a->mutex);
+    threading::MutexLocker ml(a->mutex);
     ANNOTATE_HAPPENS_BEFORE(&a->promise);
     a->promise.set_exception(std::current_exception());
   }
@@ -195,7 +195,7 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
   promiseForNextQueue->set_value();
   // and release the other threads
   for (auto a: q->queue) {
-    std::lock_guard<std::mutex> lg(a->mutex);
+    threading::MutexLocker ml(a->mutex);
     ANNOTATE_HAPPENS_BEFORE(&a->promise);
     a->promise.set_value();
   }
......
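For context, the serialization token mentioned in the constructor works as a baton
pass: each thread that opens a new queue takes the future left by the previous
batch and leaves a fresh promise behind for the next one, so batches commit
strictly in order. A simplified, self-contained sketch of that pattern (names
hypothetical; the real code keeps these members in AgentReference, guarded by
m_currentQueueMutex):

#include <future>
#include <memory>

std::future<void> nextQueueExecutionFuture;  // baton left for the next batch

void runOneBatch() {
  // Under the queue mutex: take the baton and leave a fresh one behind.
  std::future<void> myTurn = std::move(nextQueueExecutionFuture);
  auto promiseForNextQueue = std::make_shared<std::promise<void>>();
  nextQueueExecutionFuture = promiseForNextQueue->get_future();
  // Outside the lock: wait for the previous batch, work, pass the baton.
  myTurn.wait();                     // previous batch has fully committed
  // ... apply this batch's actions and commit to the object store ...
  promiseForNextQueue->set_value();  // unblock the batch after us
}

int main() {
  std::promise<void> bootstrap;      // the constructor primes the chain
  nextQueueExecutionFuture = bootstrap.get_future();
  bootstrap.set_value();
  runOneBatch();
  runOneBatch();
}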
@@ -20,6 +20,8 @@
 #include "common/helgrind_annotator.hpp"
 #include "objectstore/Backend.hpp"
+#include "common/threading/Mutex.hpp"
+#include "common/threading/MutexLocker.hpp"
 #include <atomic>
 #include <string>
 #include <future>
@@ -125,11 +127,11 @@ private:
   /**
    * A mutex ensuring the object will not be released before the promise's result
    * is fully pushed.
    */
-  std::mutex mutex;
+  threading::Mutex mutex;
   ~Action() {
     // The setting of the promise's result is protected by this mutex, so
     // destruction can only happen after the promise setting is complete.
-    std::lock_guard<std::mutex> lm(mutex);
+    threading::MutexLocker ml(mutex);
   }
 };
@@ -137,7 +139,7 @@ private:
  /**
   * The queue with the lock and flush control.
   */
  struct ActionQueue {
-   std::mutex mutex;
+   threading::Mutex mutex;
    std::list<std::shared_ptr<Action>> queue;
    std::promise<void> promise;
  };
@@ -157,7 +159,7 @@ private:
   */
  void queueAndExecuteAction(std::shared_ptr<Action> action, objectstore::Backend& backend);
- std::mutex m_currentQueueMutex;
+ threading::Mutex m_currentQueueMutex;
  std::shared_ptr<ActionQueue> m_currentQueue;
  /**
   * This pointer holds a promise that will be picked up by the thread managing
......
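The Action destructor above (and MemArchiveQueueRequest below) relies on a small
idiom: the object locks its own mutex in its destructor, while the thread
fulfilling the promise holds that same mutex, so the waiter cannot finish
destroying the object while set_value()/set_exception() is still in flight. A
minimal sketch of the idiom, with a hypothetical Request type and reusing the
threading::Mutex/MutexLocker sketched earlier:

#include <future>

struct Request {
  threading::Mutex mutex;      // protects the lifetime, not the data
  std::promise<void> promise;
  ~Request() {
    // Blocks until the fulfilling thread has released the mutex, i.e.
    // until the promise result is completely set.
    threading::MutexLocker ml(mutex);
  }
};

void fulfil(Request & r) {     // runs in the serving thread
  threading::MutexLocker ml(r.mutex);
  r.promise.set_value();       // may wake a waiter that then destroys r...
}                              // ...but only once we unlock here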
@@ -22,14 +22,14 @@
 namespace cta { namespace ostoredb {

-std::mutex MemArchiveQueue::g_mutex;
+threading::Mutex MemArchiveQueue::g_mutex;

 std::map<std::string, std::shared_ptr<MemArchiveQueue>> MemArchiveQueue::g_queues;

 std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job,
   objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext) {
   // 1) Take the global lock (implicit in the constructor).
-  std::unique_lock<std::mutex> globalLock(g_mutex);
+  threading::MutexLocker globalLock(g_mutex);
   std::shared_ptr<MemArchiveQueue> q;
   try {
     // 2) Determine if the queue exists already or not.
@@ -40,7 +40,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
   }
   // It does: we just ride the train: queue ourselves.
   // Lock the queue.
-  std::unique_lock<std::mutex> ulq(q->m_mutex);
+  threading::MutexLocker ulq(q->m_mutex);
   std::shared_ptr<MemArchiveQueueRequest> maqr(new MemArchiveQueueRequest(job, archiveRequest));
   // Extract the future before the other thread gets a chance to touch the promise.
   auto resultFuture = maqr->m_promise.get_future();
@@ -69,7 +69,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
 std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNewQueue(
     objectstore::ArchiveRequest::JobDump& job, objectstore::ArchiveRequest& archiveRequest,
-    OStoreDB& oStoreDB, log::LogContext& logContext, std::unique_lock<std::mutex>& globalLock) {
+    OStoreDB& oStoreDB, log::LogContext& logContext, threading::MutexLocker& globalLock) {
   utils::Timer timer;
   // Re-check the queue is not there.
   if (g_queues.end() != g_queues.find(job.tapePool)) {
@@ -95,7 +95,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
   // Our mem queue is now unreachable, so we can let the global part go.
   globalLock.unlock();
   // Lock the queue to make sure the last user is done posting.
-  std::unique_lock<std::mutex> ulq(maq->m_mutex);
+  threading::MutexLocker ulq(maq->m_mutex);
   double waitTime = timer.secs(utils::Timer::resetCounter);
   // We can now proceed with the queuing of the jobs in the object store.
   try {
@@ -148,7 +148,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
   // And finally release all the user threads.
   for (auto & maqr: maq->m_requests) {
     {
-      std::lock_guard<std::mutex> lg(maqr->m_mutex);
+      threading::MutexLocker ml(maqr->m_mutex);
       ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
       maqr->m_returnValue = ret;
       maqr->m_promise.set_value();
@@ -170,7 +170,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
   // Something went wrong. We should inform the other threads.
   for (auto & maqr: maq->m_requests) {
     try {
-      std::lock_guard<std::mutex> lg(maqr->m_mutex);
+      threading::MutexLocker ml(maqr->m_mutex);
       ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
       maqr->m_promise.set_exception(std::current_exception());
     } catch (...) {
......
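Helgrind has no built-in knowledge of the synchronization inside std::promise and
std::future, so every set_value()/set_exception() above is preceded by
ANNOTATE_HAPPENS_BEFORE on the promise's address; the waiting side (not shown in
this diff) presumably pairs it with ANNOTATE_HAPPENS_AFTER on the same address once
the future is ready. A sketch of the pairing, assuming
common/helgrind_annotator.hpp exposes the standard valgrind annotation macros:

#include "common/helgrind_annotator.hpp"
#include <future>

std::promise<void> p;
std::future<void> f = p.get_future();

// Producer side, as in the loops above:
ANNOTATE_HAPPENS_BEFORE(&p);   // everything written before this point...
p.set_value();

// Consumer side (assumed, mirroring the producer):
f.wait();
ANNOTATE_HAPPENS_AFTER(&p);    // ...is ordered before anything after this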
@@ -22,6 +22,8 @@
 #include "objectstore/ArchiveRequest.hpp"
 #include "objectstore/ArchiveQueue.hpp"
 #include "common/log/LogContext.hpp"
+#include "common/threading/Mutex.hpp"
+#include "common/threading/MutexLocker.hpp"
 #include <unistd.h>
 #include <syscall.h>
@@ -55,7 +57,7 @@ public:
   MemArchiveQueueRequest(objectstore::ArchiveRequest::JobDump & job,
     objectstore::ArchiveRequest & archiveRequest): m_job(job), m_archiveRequest(archiveRequest), m_tid(::syscall(SYS_gettid)) {}
   virtual ~MemArchiveQueueRequest() {
-    std::lock_guard<std::mutex> lg(m_mutex);
+    threading::MutexLocker ml(m_mutex);
   }
 private:
   objectstore::ArchiveRequest::JobDump & m_job;
@@ -63,7 +65,7 @@ private:
   std::promise<void> m_promise;
   std::shared_ptr<SharedQueueLock> m_returnValue;
   // Mutex protecting users against premature deletion.
-  std::mutex m_mutex;
+  threading::Mutex m_mutex;
   // Helper for debugging.
   pid_t m_tid;
 };
@@ -94,7 +96,7 @@ public:
 private:
   /** Mutex that should be locked before attempting any operation */
-  std::mutex m_mutex;
+  threading::Mutex m_mutex;
   /** Add the object */
   void add(std::shared_ptr<MemArchiveQueueRequest>& request);
   /** Static function implementing the shared addition of archive requests to
@@ -102,12 +104,12 @@ private:
   static const size_t g_maxQueuedElements = 100;
   std::list<std::shared_ptr<MemArchiveQueueRequest>> m_requests;
   std::promise<void> m_promise;
-  static std::mutex g_mutex;
+  static threading::Mutex g_mutex;
   static std::map<std::string, std::shared_ptr<MemArchiveQueue>> g_queues;
   /** Helper function for sharedAddToArchiveQueue */
   static std::shared_ptr<SharedQueueLock> sharedAddToArchiveQueueWithNewQueue(objectstore::ArchiveRequest::JobDump & job,
-    objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext, std::unique_lock<std::mutex> & globalLock);
+    objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext, threading::MutexLocker & globalLock);
 };
}} // namespace cta::ostoredb
\ No newline at end of file