Commit fbd5aa91 authored by Cristina Moraru's avatar Cristina Moraru
Browse files

Merge branch 'master' into rao_branch

parents 19d0fbbf b7cf5b21
......@@ -113,6 +113,7 @@ set (COMMON_LIB_SRC_FILES
SmartFILEPtr.cpp
CRC.cpp
threading/ChildProcess.cpp
threading/CondVar.cpp
threading/Daemon.cpp
threading/Mutex.cpp
threading/SocketPair.cpp
......@@ -158,6 +159,7 @@ set (COMMON_UNIT_TESTS_LIB_SRC_FILES
SmartFdTest.cpp
SmartArrayPtrTest.cpp
CRCTest.cpp
threading/CondVarTest.cpp
threading/DaemonTest.cpp
threading/SocketPairTest.cpp
threading/ThreadingBlockingQTests.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/exception/Exception.hpp"
#include "common/threading/CondVar.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/utils/utils.hpp"
namespace cta {
namespace threading {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
CondVar::CondVar() {
const int initRc = pthread_cond_init(&m_cond, nullptr);
if(0 != initRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: Failed to initialise condition variable");
}
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
CondVar::~CondVar() {
pthread_cond_destroy(&m_cond);
}
//------------------------------------------------------------------------------
// wait
//------------------------------------------------------------------------------
void CondVar::wait(MutexLocker &locker) {
if(!locker.m_locked) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: Underlying mutex is not locked.");
}
const int waitRc = pthread_cond_wait(&m_cond, &locker.m_mutex.m_mutex);
if(0 != waitRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_wait failed:" +
utils::errnoToString(waitRc));
}
}
//------------------------------------------------------------------------------
// signal
//------------------------------------------------------------------------------
void CondVar::signal() {
const int signalRc = pthread_cond_signal(&m_cond);
if(0 != signalRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_signal failed:" +
utils::errnoToString(signalRc));
}
}
//------------------------------------------------------------------------------
// broadcast
//------------------------------------------------------------------------------
void CondVar::broadcast() {
const int broadcastRc = pthread_cond_broadcast(&m_cond);
if(0 != broadcastRc) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: pthread_cond_broadcast failed:" +
utils::errnoToString(broadcastRc));
}
}
} // namespace threading
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <pthread.h>
namespace cta {
namespace threading {
/**
* Forward declaration of the class representing a mutex locker.
*/
class MutexLocker;
/**
* Class representing a POSIX thread conditional variable.
*/
class CondVar {
public:
/**
* Constructor.
*/
CondVar();
/**
* Destructor.
*/
~CondVar();
/**
* Delete the copy constructor.
*/
CondVar(const CondVar &) = delete;
/**
* Delete the move constructor.
*/
CondVar(const CondVar &&) = delete;
/**
* Delete the copy assignment operator.
*/
CondVar& operator=(const CondVar &) = delete;
/**
* Delete the move assignment operator.
*/
CondVar& operator=(const CondVar &&) = delete;
/**
* Waits on the specified MutexLocker and its corresponding Mutex.
*/
void wait(MutexLocker &);
/**
* Unblocks at least one waiting thread.
*/
void signal();
/**
* Unblocks all waiting threads.
*/
void broadcast();
private:
/**
* The underlying POSIX thread condition variable.
*/
pthread_cond_t m_cond;
}; // class CondVar
} // namespace threading
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/threading/CondVar.hpp"
#include "common/threading/Mutex.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Thread.hpp"
#include <gtest/gtest.h>
#include <stdint.h>
namespace unitTests {
class cta_threading_CondVarTest : public ::testing::Test {
protected:
virtual void SetUp() {
}
virtual void TearDown() {
}
};
class WaitingThread: public cta::threading::Thread {
public:
WaitingThread(cta::threading::CondVar &cond, cta::threading::Mutex &m):
m_cond(cond), m_mutex(m) {
}
void run() override {
cta::threading::MutexLocker locker(m_mutex);
m_cond.wait(locker);
}
private:
cta::threading::CondVar &m_cond;
cta::threading::Mutex &m_mutex;
}; // class WaitingThread
class CounterThread: public cta::threading::Thread {
public:
enum CounterType {ODD_COUNTER, EVEN_COUNTER};
enum NotificationType {SIGNAL_NOTIFICATION, BROADCAST_NOTIFICATION};
CounterThread(
const CounterType counterType,
const NotificationType notificationType,
cta::threading::Mutex &counterMutex,
cta::threading::CondVar &counterIsOdd,
cta::threading::CondVar &counterIsEven,
uint64_t &counter,
const uint64_t maxCount):
m_counterType(counterType),
m_notificationType(notificationType),
m_counterMutex(counterMutex),
m_counterIsOdd(counterIsOdd),
m_counterIsEven(counterIsEven),
m_counter(counter),
m_maxCount(maxCount) {
}
void run() override {
if(m_counterType == ODD_COUNTER) {
incrementOddCounterValuesUntilMax();
} else { // EVEN_COUNTER
incrementEvenCounterValuesUntilMax();
}
}
void incrementOddCounterValuesUntilMax() {
cta::threading::MutexLocker locker(m_counterMutex);
while(m_counter < m_maxCount) {
const bool counterValueIsOdd = m_counter % 2;
if(counterValueIsOdd) {
m_counter++;
if(m_notificationType == SIGNAL_NOTIFICATION) {
m_counterIsEven.signal();
} else { // BROADCAST_NOTIFICATION
m_counterIsEven.broadcast();
}
}
if(m_counter < m_maxCount) {
m_counterIsOdd.wait(locker);
}
}
}
void incrementEvenCounterValuesUntilMax() {
cta::threading::MutexLocker locker(m_counterMutex);
while(m_counter < m_maxCount) {
const bool counterValueIsEven = !(m_counter % 2);
if(counterValueIsEven) {
m_counter++;
if(m_notificationType == SIGNAL_NOTIFICATION) {
m_counterIsOdd.signal();
} else { // BROADCAST_NOTIFICATION
m_counterIsOdd.broadcast();
}
}
if(m_counter < m_maxCount) {
m_counterIsEven.wait(locker);
}
}
}
private:
const CounterType m_counterType;
const NotificationType m_notificationType;
cta::threading::Mutex &m_counterMutex;
cta::threading::CondVar &m_counterIsOdd;
cta::threading::CondVar &m_counterIsEven;
uint64_t &m_counter;
const uint64_t m_maxCount;
}; // class CounterThread
TEST_F(cta_threading_CondVarTest, waitAndSignal) {
using namespace cta::threading;
cta::threading::Mutex counterMutex;
cta::threading::CondVar counterIsOdd;
cta::threading::CondVar counterIsEven;
uint64_t counter = 0;
const uint64_t maxCounter = 1024;
CounterThread oddCounter(
CounterThread::ODD_COUNTER,
CounterThread::SIGNAL_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
CounterThread evenCounter(
CounterThread::EVEN_COUNTER,
CounterThread::SIGNAL_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
oddCounter.start();
evenCounter.start();
oddCounter.wait();
evenCounter.wait();
}
TEST_F(cta_threading_CondVarTest, waitAndBroadcast) {
using namespace cta::threading;
cta::threading::Mutex counterMutex;
cta::threading::CondVar counterIsOdd;
cta::threading::CondVar counterIsEven;
uint64_t counter = 0;
const uint64_t maxCounter = 1024;
CounterThread oddCounter(
CounterThread::ODD_COUNTER,
CounterThread::BROADCAST_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
CounterThread evenCounter(
CounterThread::EVEN_COUNTER,
CounterThread::BROADCAST_NOTIFICATION,
counterMutex,
counterIsOdd,
counterIsEven,
counter,
maxCounter);
oddCounter.start();
evenCounter.start();
oddCounter.wait();
evenCounter.wait();
}
} // namespace unitTests
......@@ -23,6 +23,12 @@
namespace cta {
namespace threading {
/**
* Forward declaration of the friend class that represents a pthread condition
* variable.
*/
class CondVar;
/**
* A simple exception throwing wrapper for pthread mutexes.
......@@ -35,6 +41,7 @@ public:
void lock() ;
void unlock();
private:
friend CondVar;
pthread_mutex_t m_mutex;
};
......
......@@ -21,11 +21,16 @@
#include "common/exception/Exception.hpp"
#include <pthread.h>
#include <semaphore.h>
namespace cta {
namespace threading {
/**
* Forward declaration of the friend class representing a pthread condition
* variable.
*/
class CondVar;
/**
* A simple scoped locker for mutexes. Highly recommended as
* the mutex will be released in all cases (exception, mid-code return, etc...)
......@@ -79,6 +84,7 @@ public:
}
private:
friend CondVar;
/**
* The mutex owened by this MutexLocker.
......
......@@ -18,13 +18,14 @@
#include "GetOptThreadSafe.hpp"
#include "common/exception/Exception.hpp"
#include "common/threading/MutexLocker.hpp"
#include <memory>
namespace cta { namespace utils {
GetOpThreadSafe::Reply GetOpThreadSafe::getOpt(const Request& request) {
std::lock_guard<std::mutex> lock(gMutex);
threading::MutexLocker locker(gMutex);
// Prepare the classic styled argv.
std::unique_ptr<char * []>argv(new char *[request.argv.size()]);
char ** p=argv.get();
......@@ -71,6 +72,6 @@ GetOpThreadSafe::Reply GetOpThreadSafe::getOpt(const Request& request) {
return ret;
}
std::mutex GetOpThreadSafe::gMutex;
threading::Mutex GetOpThreadSafe::gMutex;
}} // namespace cta::utils
\ No newline at end of file
}} // namespace cta::utils
......@@ -16,11 +16,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/threading/Mutex.hpp"
#include <vector>
#include <string>
#include <unistd.h>
#include <getopt.h>
#include <mutex>
namespace cta { namespace utils {
......@@ -45,7 +46,7 @@ public:
};
static Reply getOpt (const Request & request);
private:
static std::mutex gMutex;
static threading::Mutex gMutex;
};
}} // namespace cta::utils
\ No newline at end of file
}} // namespace cta::utils
......@@ -37,6 +37,7 @@ echo ${DATABASEURL} >/etc/cta/cta_catalogue_db.conf
# cta-taped setup
echo "taped BufferCount 10" > /etc/cta/cta.conf
echo "taped MountCriteria 2000000, 5" >> /etc/cta/cta.conf
echo "general ObjectStoreURL $OBJECTSTOREURL" >> /etc/cta/cta.conf
echo "${tpconfig}" > /etc/cta/TPCONFIG
......
......@@ -40,6 +40,7 @@ echo ${DATABASEURL} >/etc/cta/cta_catalogue_db.conf
# cta-taped setup
echo "taped BufferCount 10" > /etc/cta/cta.conf
echo "taped MountCriteria 2000000, 5" >> /etc/cta/cta.conf
echo "general ObjectStoreURL $OBJECTSTOREURL" >> /etc/cta/cta.conf
echo "${tpconfig}" > /etc/cta/TPCONFIG
......
No preview for this file type
No preview for this file type
......@@ -33,7 +33,7 @@ then propagated to CTA, rate limited). 2 sub use cases:
In addition, in order to make sure no changes were lost, implicit operations are needed: \begin{itemize} \item
fast reconciliation (in flight archive requests for sure, maybe retrieve requests as well) \item full or slow
reconciliation (complete name space scan) \end{itemize}
reconciliation (complete name space scan) \item synchronize the list of valid tape storage classes between EOS and CTA\end{itemize}
Finally, this section will describe the technical means for the interface between EOS and CTA and the performance
requirements.
......@@ -378,7 +378,9 @@ to CTA when the file is moved to the recycle bin in EOS, or when it is definitel
chain reconciliation should be devised. \item Slow reconciliation interface \item Action on storage class change
for a file? (postponed to repack?) \item Catalogue will also keep track of requests for each files (archive and
retrieve) so that queueing can be made idempotent. \item Chaining of archive and retrieve requests to retrieve
requests. Excution of retrieve requests as disk to disk copy if possible. \item CAtalogue files could hold the
necessary info to recreate the archive request if needed. \end{itemize}
requests. Excution of retrieve requests as disk to disk copy if possible. \item Catalogue files could hold the
necessary info to recreate the archive request if needed. \item The list of valid storage classes needs to be
synchronized between EOS and CTA. EOS should not allow a power user to label a directory with an invalid storage
class. CTA should not delete or invalidate a storage class that is being used by EOS.\end{itemize}
\end{document}
......@@ -17,6 +17,7 @@
*/
#include "common/exception/Exception.hpp"
#include "common/threading/MutexLocker.hpp"
#include "rdbms/ConnPool.hpp"
#include <iostream>
......@@ -56,10 +57,10 @@ void ConnPool::createConns(const uint64_t nbConns) {
// getConn
//------------------------------------------------------------------------------
PooledConn ConnPool::getConn() {
std::unique_lock<std::mutex> lock(m_connsMutex);
threading::MutexLocker locker(m_connsMutex);
while(m_conns.size() == 0 && m_nbConnsOnLoan == m_maxNbConns) {
m_connsCv.wait(lock);
m_connsCv.wait(locker);
}
if(m_conns.size() == 0) {
......@@ -99,7 +100,7 @@ void ConnPool::returnConn(std::unique_ptr<Conn> conn) {
// connection, if there is one, has been lost. Delete all the connections
// currently in the pool because their underlying TCP/IP connections may
// also have been lost.
std::unique_lock<std::mutex> lock(m_connsMutex);
threading::MutexLocker locker(m_connsMutex);
while(!m_conns.empty()) {
m_conns.pop_front();
}
......@@ -107,17 +108,17 @@ void ConnPool::returnConn(std::unique_ptr<Conn> conn) {
throw exception::Exception("Would have reached -1 connections on loan");
}
m_nbConnsOnLoan--;
m_connsCv.notify_one();
m_connsCv.signal();
return;
}
std::unique_lock<std::mutex> lock(m_connsMutex);
threading::MutexLocker locker(m_connsMutex);
if(0 == m_nbConnsOnLoan) {
throw exception::Exception("Would have reached -1 connections on loan");
}
m_nbConnsOnLoan--;
m_conns.push_back(std::move(conn));
m_connsCv.notify_one();
m_connsCv.signal();
// Else the connection is closed
} else {
......@@ -126,7 +127,7 @@ void ConnPool::returnConn(std::unique_ptr<Conn> conn) {
// connection, if there is one, has been lost. Delete all the connections
// currently in the pool because their underlying TCP/IP connections may
// also have been lost.
std::unique_lock<std::mutex> lock(m_connsMutex);
threading::MutexLocker locker(m_connsMutex);
while(!m_conns.empty()) {
m_conns.pop_front();
}
......@@ -134,7 +135,7 @@ void ConnPool::returnConn(std::unique_ptr<Conn> conn) {
throw exception::Exception("Would have reached -1 connections on loan");
}
m_nbConnsOnLoan--;
m_connsCv.notify_one();
m_connsCv.signal();
}
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
......
......@@ -18,14 +18,14 @@
#pragma once
#include "Conn.hpp"
#include "ConnFactory.hpp"
#include "PooledConn.hpp"
#include "common/threading/CondVar.hpp"
#include "common/threading/Mutex.hpp"
#include "rdbms/Conn.hpp"
#include "rdbms/ConnFactory.hpp"
#include "rdbms/PooledConn.hpp"
#include <condition_variable>
#include <list>