Commit 1a6c0f34 authored by Steven Murray's avatar Steven Murray
Browse files

cta/CTA#130 Helgrind gets spurious when using `std::mutex` in the catalogue.

Added cta::threading::CondVar.

The new CondVar class is a C++ wrapper around a
pthread condition variable.
parent e5daf633
......@@ -113,6 +113,7 @@ set (COMMON_LIB_SRC_FILES
SmartFILEPtr.cpp
CRC.cpp
threading/ChildProcess.cpp
threading/CondVar.cpp
threading/Daemon.cpp
threading/Mutex.cpp
threading/SocketPair.cpp
......@@ -158,6 +159,7 @@ set (COMMON_UNIT_TESTS_LIB_SRC_FILES
SmartFdTest.cpp
SmartArrayPtrTest.cpp
CRCTest.cpp
threading/CondVarTest.cpp
threading/DaemonTest.cpp
threading/SocketPairTest.cpp
threading/ThreadingBlockingQTests.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/exception/Exception.hpp"
#include "common/threading/CondVar.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/utils/utils.hpp"
namespace cta {
namespace threading {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
CondVar::CondVar() {
const int initRc = pthread_cond_init(&m_cond, nullptr);
if(0 != initRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: Failed to initialise condition variable");
}
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
CondVar::~CondVar() {
pthread_cond_destroy(&m_cond);
}
//------------------------------------------------------------------------------
// wait
//------------------------------------------------------------------------------
void CondVar::wait(MutexLocker &locker) {
if(!locker.m_locked) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: Underlying mutex is not locked.");
}
const int waitRc = pthread_cond_wait(&m_cond, &locker.m_mutex.m_mutex);
if(0 != waitRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_wait failed:" +
utils::errnoToString(waitRc));
}
}
//------------------------------------------------------------------------------
// signal
//------------------------------------------------------------------------------
void CondVar::signal() {
const int signalRc = pthread_cond_signal(&m_cond);
if(0 != signalRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_signal failed:" +
utils::errnoToString(signalRc));
}
}
//------------------------------------------------------------------------------
// broadcast
//------------------------------------------------------------------------------
void CondVar::broadcast() {
const int broadcastRc = pthread_cond_broadcast(&m_cond);
if(0 != broadcastRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_broadcast failed:" +
utils::errnoToString(broadcastRc));
}
}
} // namespace threading
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <pthread.h>
namespace cta {
namespace threading {
/**
* Forward declaration of the class representing a mutex locker.
*/
class MutexLocker;
/**
* Class representing a POSIX thread conditional variable.
*/
class CondVar {
public:
/**
* Constructor.
*/
CondVar();
/**
* Destructor.
*/
~CondVar();
/**
* Delete the copy constructor.
*/
CondVar(const CondVar &) = delete;
/**
* Delete the move constructor.
*/
CondVar(const CondVar &&) = delete;
/**
* Delete the copy assignment operator.
*/
CondVar& operator=(const CondVar &) = delete;
/**
* Delete the move assignment operator.
*/
CondVar& operator=(const CondVar &&) = delete;
/**
* Waits on the specified MutexLocker and its corresponding Mutex.
*/
void wait(MutexLocker &);
/**
* Unblocks at least one waiting thread.
*/
void signal();
/**
* Unblocks all waiting threads.
*/
void broadcast();
private:
/**
* The underlying POSIX thread condition variable.
*/
pthread_cond_t m_cond;
}; // class CondVar
} // namespace threading
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/threading/CondVar.hpp"
#include "common/threading/Mutex.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Thread.hpp"
#include <gtest/gtest.h>
#include <stdint.h>
namespace unitTests {
class cta_threading_CondVarTest : public ::testing::Test {
protected:
virtual void SetUp() {
}
virtual void TearDown() {
}
};
class WaitingThread: public cta::threading::Thread {
public:
WaitingThread(cta::threading::CondVar &cond, cta::threading::Mutex &m):
m_cond(cond), m_mutex(m) {
}
void run() override {
cta::threading::MutexLocker locker(m_mutex);
m_cond.wait(locker);
}
private:
cta::threading::CondVar &m_cond;
cta::threading::Mutex &m_mutex;
}; // class WaitingThread
class CounterThread: public cta::threading::Thread {
public:
enum CounterType {ODD_COUNTER, EVEN_COUNTER};
enum NotificationType {SIGNAL_NOTIFICATION, BROADCAST_NOTIFICATION};
CounterThread(
const CounterType counterType,
const NotificationType notificationType,
cta::threading::Mutex &counterMutex,
cta::threading::CondVar &counterIsOdd,
cta::threading::CondVar &counterIsEven,
uint64_t &counter,
const uint64_t maxCount):
m_counterType(counterType),
m_notificationType(notificationType),
m_counterMutex(counterMutex),
m_counterIsOdd(counterIsOdd),
m_counterIsEven(counterIsEven),
m_counter(counter),
m_maxCount(maxCount) {
}
void run() override {
if(m_counterType == ODD_COUNTER) {
incrementOddCounterValuesUntilMax();
} else { // EVEN_COUNTER
incrementEvenCounterValuesUntilMax();
}
}
void incrementOddCounterValuesUntilMax() {
cta::threading::MutexLocker locker(m_counterMutex);
while(m_counter < m_maxCount) {
const bool counterValueIsOdd = m_counter % 2;
if(counterValueIsOdd) {
m_counter++;
if(m_notificationType == SIGNAL_NOTIFICATION) {
m_counterIsEven.signal();
} else { // BROADCAST_NOTIFICATION
m_counterIsEven.broadcast();
}
}
if(m_counter < m_maxCount) {
m_counterIsOdd.wait(locker);
}
}
}
void incrementEvenCounterValuesUntilMax() {
cta::threading::MutexLocker locker(m_counterMutex);
while(m_counter < m_maxCount) {
const bool counterValueIsEven = !(m_counter % 2);
if(counterValueIsEven) {
m_counter++;
if(m_notificationType == SIGNAL_NOTIFICATION) {
m_counterIsOdd.signal();
} else { // BROADCAST_NOTIFICATION
m_counterIsOdd.broadcast();
}
}
if(m_counter < m_maxCount) {
m_counterIsEven.wait(locker);
}
}
}
private:
const CounterType m_counterType;
const NotificationType m_notificationType;
cta::threading::Mutex &m_counterMutex;
cta::threading::CondVar &m_counterIsOdd;
cta::threading::CondVar &m_counterIsEven;
uint64_t &m_counter;
const uint64_t m_maxCount;
}; // class CounterThread
TEST_F(cta_threading_CondVarTest, waitAndSignal) {
using namespace cta::threading;
cta::threading::Mutex counterMutex;
cta::threading::CondVar counterIsOdd;
cta::threading::CondVar counterIsEven;
uint64_t counter = 0;
const uint64_t maxCounter = 1024;
CounterThread oddCounter(
CounterThread::ODD_COUNTER,
CounterThread::SIGNAL_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
CounterThread evenCounter(
CounterThread::EVEN_COUNTER,
CounterThread::SIGNAL_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
oddCounter.start();
evenCounter.start();
oddCounter.wait();
evenCounter.wait();
}
TEST_F(cta_threading_CondVarTest, waitAndBroadcast) {
using namespace cta::threading;
cta::threading::Mutex counterMutex;
cta::threading::CondVar counterIsOdd;
cta::threading::CondVar counterIsEven;
uint64_t counter = 0;
const uint64_t maxCounter = 1024;
CounterThread oddCounter(
CounterThread::ODD_COUNTER,
CounterThread::BROADCAST_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
CounterThread evenCounter(
CounterThread::EVEN_COUNTER,
CounterThread::BROADCAST_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
oddCounter.start();
evenCounter.start();
oddCounter.wait();
evenCounter.wait();
}
} // namespace unitTests
......@@ -23,6 +23,12 @@
namespace cta {
namespace threading {
/**
* Forward declaration of the friend class that represents a pthread condition
* variable.
*/
class CondVar;
/**
* A simple exception throwing wrapper for pthread mutexes.
......@@ -35,6 +41,7 @@ public:
void lock() ;
void unlock();
private:
friend CondVar;
pthread_mutex_t m_mutex;
};
......
......@@ -25,6 +25,12 @@
namespace cta {
namespace threading {
/**
* Forward declaration of the friend class representing a pthread condition
* variable.
*/
class CondVar;
/**
* A simple scoped locker for mutexes. Highly recommended as
* the mutex will be released in all cases (exception, mid-code return, etc...)
......@@ -78,6 +84,7 @@ public:
}
private:
friend CondVar;
/**
* The mutex owened by this MutexLocker.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment