Commit 9744b1ea authored by David COME's avatar David COME
Browse files

Threading utils have been moved to castor/server

parent 96c0142a
......@@ -24,75 +24,43 @@
#pragma once
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/server/Threading.hpp"
namespace castor {
namespace tape {
namespace threading {
namespace server {
/**
* A helper class managing a thread safe message counter (we need it thread
* safe as the ClientInterface class will be used by both the getting of
* the work to be done and the reporting of the completed work, in parallel
* A helper class managing a thread safe message counter
* When C++11 will be used, just delete it to use std::atomic
*/
template <class T> struct AtomicCounter{
AtomicCounter(T init = 0): m_val(init) {};
T operator ++ () {
threading::MutexLocker ml(&m_mutex);
MutexLocker ml(&m_mutex);
return ++m_val;
}
T operator ++ (int) {
threading::MutexLocker ml(&m_mutex);
MutexLocker ml(&m_mutex);
return m_val++;
}
T operator -- () {
threading::MutexLocker ml(&m_mutex);
MutexLocker ml(&m_mutex);
return --m_val;
}
operator T() const {
threading::MutexLocker ml(&m_mutex);
MutexLocker ml(&m_mutex);
return m_val;
}
T getAndReset(){
threading::MutexLocker ml(&m_mutex);
MutexLocker ml(&m_mutex);
T old =m_val;
m_val=0;
return old;
}
private:
T m_val;
mutable threading::Mutex m_mutex;
mutable Mutex m_mutex;
};
template <class T> struct AtomicVariable{
operator T() const {
threading::MutexLocker ml(&m_mutex);
return m_val;
}
T operator=(const T& t){
threading::MutexLocker ml(&m_mutex);
m_val=t;
return t;
}
private:
T m_val;
mutable threading::Mutex m_mutex;
};
//A 1 way flag
struct AtomicFlag{
AtomicFlag(): m_set(false) {};
void set() {
threading::MutexLocker ml(&m_mutex);
m_set=true;
}
operator bool() const {
threading::MutexLocker ml(&m_mutex);
return m_set;
}
private:
bool m_set;
mutable threading::Mutex m_mutex;
};
}}}
}}
......@@ -21,29 +21,29 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <gtest/gtest.h>
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/server/Threading.hpp"
#include "castor/server/AtomicCounter.hpp"
namespace unitTest {
struct ThreadPlus : public castor::tape::threading::Thread{
ThreadPlus( castor::tape::threading::AtomicCounter<int>& c):count(c){}
struct ThreadPlus : public castor::server::Thread{
ThreadPlus( castor::server::AtomicCounter<int>& c):count(c){}
protected:
castor::tape::threading::AtomicCounter<int>& count;
castor::server::AtomicCounter<int>& count;
virtual void run (){
for(int i=0;i<100;++i)
++count;
}
};
struct ThreadMinus : public castor::tape::threading::Thread{
ThreadMinus( castor::tape::threading::AtomicCounter<int>& c):count(c){}
struct ThreadMinus : public castor::server::Thread{
ThreadMinus( castor::server::AtomicCounter<int>& c):count(c){}
protected:
castor::tape::threading::AtomicCounter<int>& count;
castor::server::AtomicCounter<int>& count;
virtual void run (){
for(int i=0;i<100;++i)
--count;
}
};
TEST(castor_tape_threading, AtomicCOunterTest) {
castor::tape::threading::AtomicCounter<int> c(42);
castor::server::AtomicCounter<int> c(42);
ThreadPlus t(c);
ThreadPlus t1(c);
......
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/server/Threading.hpp"
namespace castor {
namespace server {
//A 1 way flag : Once set, it can be reset
struct AtomicFlag{
AtomicFlag(): m_set(false) {};
void set() {
MutexLocker ml(&m_mutex);
m_set=true;
}
operator bool() const {
MutexLocker ml(&m_mutex);
return m_set;
}
private:
bool m_set;
mutable Mutex m_mutex;
};
}}
\ No newline at end of file
......@@ -31,11 +31,11 @@
#include <queue>
#include <exception>
#include "Threading.hpp"
#include "castor/server/Threading.hpp"
#include "castor/server/Semaphores.hpp"
namespace castor {
namespace tape {
namespace threading {
namespace server {
/***
* This simple class provides a thread-safe blocking queue
......@@ -126,6 +126,4 @@ private:
};
} //end of threading
} //end of tape
} //end of castor
}}
\ No newline at end of file
......@@ -46,15 +46,19 @@ set (SERVER_LIB_SRC_FILES
NotifierThread.cpp
ProcessCap.cpp
Queue.cpp
Mutex.cpp
SelectProcessThread.cpp
SignalThreadPool.cpp
Semaphores.cpp
SmartCap.cpp
TCPListenerThreadPool.cpp
ThreadNotification.cpp
UDPListenerThreadPool.cpp)
UDPListenerThreadPool.cpp
Threading.cpp ChildProcess.cpp
)
add_library (castorserver SHARED ${SERVER_LIB_SRC_FILES})
CastorSetLibraryVersions (castorserver)
target_link_libraries (castorserver cap castorclient castorcommon)
target_link_libraries (castorserver cap castorclient castorcommon Exception)
install (TARGETS castorserver LIBRARY DESTINATION ${CASTOR_DEST_LIB_DIR}
NAMELINK_SKIP)
......
......@@ -26,7 +26,7 @@
#include <stdlib.h>
#include <sys/wait.h>
void castor::tape::threading::ChildProcess::start(Cleanup & cleanup) {
void castor::server::ChildProcess::start(Cleanup & cleanup) {
m_pid = fork();
if (!m_pid) {
/* We are the child process. Do our stuff and exit. */
......@@ -34,13 +34,13 @@ void castor::tape::threading::ChildProcess::start(Cleanup & cleanup) {
exit(run());
} else if (-1 == m_pid) {
/* We are in the parent process, for failed */
throw castor::exception::Errnum("Failed to fork a child process in castor::tape::threading::ChildProcess::ChildProcess()");
throw castor::exception::Errnum("Failed to fork a child process in castor::server::ChildProcess::ChildProcess()");
}
/* In parent process, child is OK. */
m_started = true;
}
void castor::tape::threading::ChildProcess::parseStatus(int status) {
void castor::server::ChildProcess::parseStatus(int status) {
if (WIFEXITED(status)) {
m_finished = true;
m_exited = true;
......@@ -51,7 +51,7 @@ void castor::tape::threading::ChildProcess::parseStatus(int status) {
}
}
bool castor::tape::threading::ChildProcess::running() {
bool castor::server::ChildProcess::running() {
/* Checking for a running process before starting gets an exception */
if (!m_started) throw ProcessNeverStarted();
/* If we are not aware of process exiting, let's check and collect exit code */
......@@ -60,33 +60,33 @@ bool castor::tape::threading::ChildProcess::running() {
int status, ret;
castor::exception::Errnum::throwOnMinusOne(
ret = waitpid(m_pid, &status, WNOHANG),
"Error from waitpid in castor::tape::threading::ChildProcess::running()");
"Error from waitpid in castor::server::ChildProcess::running()");
if (ret == m_pid) parseStatus(status);
}
return !m_finished;
}
void castor::tape::threading::ChildProcess::wait() {
void castor::server::ChildProcess::wait() {
/* Checking for a running process before starting gets an exception */
if (!m_started) throw ProcessNeverStarted();
if (m_finished) return;
int status, ret;
castor::exception::Errnum::throwOnMinusOne(
ret = waitpid(m_pid, &status, 0),
"Error from waitpid in castor::tape::threading::ChildProcess::wait()");
"Error from waitpid in castor::server::ChildProcess::wait()");
/* Check child status*/
if (ret == m_pid) parseStatus(status);
if(!m_finished)
throw castor::tape::Exception("Process did not exit after waitpid().");
}
int castor::tape::threading::ChildProcess::exitCode() {
int castor::server::ChildProcess::exitCode() {
if (!m_started) throw ProcessNeverStarted();
if (!m_finished) {
int status, ret;
castor::exception::Errnum::throwOnMinusOne(
ret = waitpid(m_pid, &status, WNOHANG),
"Error from waitpid in castor::tape::threading::ChildProcess::running()");
"Error from waitpid in castor::server::ChildProcess::running()");
if (ret == m_pid) parseStatus(status);
}
/* Check child status*/
......@@ -99,7 +99,7 @@ int castor::tape::threading::ChildProcess::exitCode() {
return m_exitCode;
}
void castor::tape::threading::ChildProcess::kill() {
void castor::server::ChildProcess::kill() {
if (!m_started) throw ProcessNeverStarted();
::kill(m_pid, SIGTERM);
}
......@@ -23,13 +23,13 @@
#pragma once
#include "castor/exception/Errnum.hpp"
#include "../exception/Exception.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include <unistd.h>
namespace castor {
namespace tape {
namespace threading {
namespace server {
/**
* A class allowing forking of a child process, and subsequent follow up
* of the child process. Status check, killing, return code collection.
......@@ -97,6 +97,4 @@ namespace threading {
virtual int run() = 0;
void parseStatus(int status);
};
} // namespace threading
} // namespace tape
} // namespace castor
}}
#include "castor/server/Mutex.hpp"
//------------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
castor::server::Mutex::Mutex() {
pthread_mutexattr_t attr;
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_init(&attr),
"Error from pthread_mutexattr_init in castor::server::Mutex::Mutex()");
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK),
"Error from pthread_mutexattr_settype in castor::server::Mutex::Mutex()");
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_init(&m_mutex, &attr),
"Error from pthread_mutex_init in castor::server::Mutex::Mutex()");
try {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_destroy(&attr),
"Error from pthread_mutexattr_destroy in castor::server::Mutex::Mutex()");
} catch (...) {
pthread_mutex_destroy(&m_mutex);
throw;
}
}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
castor::server::Mutex::~Mutex() {
pthread_mutex_destroy(&m_mutex);
}
//------------------------------------------------------------------------------
//lock
//------------------------------------------------------------------------------
void castor::server::Mutex::lock() {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_lock(&m_mutex),
"Error from pthread_mutex_lock in castor::server::Mutex::lock()");
}
//------------------------------------------------------------------------------
//unlock
//------------------------------------------------------------------------------
void castor::server::Mutex::unlock() {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_unlock(&m_mutex),
"Error from pthread_mutex_unlock in castor::server::Mutex::unlock()");
}
/******************************************************************************
* Payload.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include <pthread.h>
#include <semaphore.h>
#include "castor/exception/Errnum.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
namespace castor {
namespace server {
/**
* A simple exception throwing wrapper for pthread mutexes.
* Inspired from the interface of Qt.
*/
class Mutex {
public:
Mutex() ;
~Mutex();
void lock() ;
void unlock();
private:
pthread_mutex_t m_mutex;
};
/**
* A simple scoped locker for mutexes. Highly recommended as
* the mutex will be released in all cases (exception, mid-code return, etc...)
* To use, simply instanciate and forget.
* @param m pointer to a Mutex instance
*/
class MutexLocker {
public:
MutexLocker(Mutex * m) :m_mutex(m) {m->lock();}
~MutexLocker() { try { m_mutex->unlock(); } catch (...) {} }
private:
Mutex * m_mutex;
};
}}
\ No newline at end of file
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "Threading.hpp"
#include "castor/server/Semaphores.hpp"
#include "castor/server/Threading.hpp"
#include "castor/exception/Errnum.hpp"
#include "castor/exception/Exception.hpp"
#include <errno.h>
#include <typeinfo>
#include <stdlib.h>
#include <cxxabi.h>
#include "../../../../h/Cthread_api.h"
#include "castor/BaseObject.hpp"
/* Implmentations of the threading primitives */
/* Mutex */
//------------------------------------------------------------------------------
//constructor
//------------------------------------------------------------------------------
castor::tape::threading::Mutex::Mutex() {
pthread_mutexattr_t attr;
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_init(&attr),
"Error from pthread_mutexattr_init in castor::tape::threading::Mutex::Mutex()");
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK),
"Error from pthread_mutexattr_settype in castor::tape::threading::Mutex::Mutex()");
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_init(&m_mutex, &attr),
"Error from pthread_mutex_init in castor::tape::threading::Mutex::Mutex()");
try {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutexattr_destroy(&attr),
"Error from pthread_mutexattr_destroy in castor::tape::threading::Mutex::Mutex()");
} catch (...) {
pthread_mutex_destroy(&m_mutex);
throw;
}
}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
castor::tape::threading::Mutex::~Mutex() {
pthread_mutex_destroy(&m_mutex);
}
//------------------------------------------------------------------------------
//lock
//------------------------------------------------------------------------------
void castor::tape::threading::Mutex::lock() {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_lock(&m_mutex),
"Error from pthread_mutex_lock in castor::tape::threading::Mutex::lock()");
}
//------------------------------------------------------------------------------
//unlock
//------------------------------------------------------------------------------
void castor::tape::threading::Mutex::unlock() {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_unlock(&m_mutex),
"Error from pthread_mutex_unlock in castor::tape::threading::Mutex::unlock()");
}
//------------------------------------------------------------------------------
//PosixSemaphore constructor
//------------------------------------------------------------------------------
castor::tape::threading::PosixSemaphore::PosixSemaphore(int initial)
castor::server::PosixSemaphore::PosixSemaphore(int initial)
{
castor::exception::Errnum::throwOnReturnedErrno(
sem_init(&m_sem, 0, initial),
"Error from sem_init in castor::tape::threading::PosixSemaphore::PosixSemaphore()");
"Error from sem_init in castor::server::PosixSemaphore::PosixSemaphore()");
}
//------------------------------------------------------------------------------
//PosixSemaphore destructor
//------------------------------------------------------------------------------
castor::tape::threading::PosixSemaphore::~PosixSemaphore() {
castor::server::PosixSemaphore::~PosixSemaphore() {
/* There is a danger of destroying the semaphore in the consumer
while the producer is still referring to the object.
This mutex prevents this from happening. (The release method locks it). */
......@@ -100,56 +26,56 @@ castor::tape::threading::PosixSemaphore::~PosixSemaphore() {
//------------------------------------------------------------------------------
//acquire
//------------------------------------------------------------------------------
void castor::tape::threading::PosixSemaphore::acquire()
void castor::server::PosixSemaphore::acquire()
{
int ret;
/* If we receive EINTR, we should just keep trying (signal interruption) */
while((ret = sem_wait(&m_sem)) && EINTR == errno) {}
/* If it was not EINTR, it's a failure */
castor::exception::Errnum::throwOnNonZero(ret,
"Error from sem_wait in castor::tape::threading::PosixSemaphore::acquire()");
"Error from sem_wait in castor::server::PosixSemaphore::acquire()");
}
//------------------------------------------------------------------------------
//tryAcquire
//------------------------------------------------------------------------------
bool castor::tape::threading::PosixSemaphore::tryAcquire()
bool castor::server::PosixSemaphore::tryAcquire()
{
int ret = sem_trywait(&m_sem);
if (!ret) return true;
if (ret && EAGAIN == errno) return false;
castor::exception::Errnum::throwOnNonZero(ret,
"Error from sem_trywait in castor::tape::threading::PosixSemaphore::tryAcquire()");
"Error from sem_trywait in castor::server::PosixSemaphore::tryAcquire()");
/* unreacheable, just for compiler happiness */
return false;
}
//------------------------------------------------------------------------------
//release
//------------------------------------------------------------------------------
void castor::tape::threading::PosixSemaphore::release(int n)
void castor::server::PosixSemaphore::release(int n)
{
for (int i=0; i<n; i++) {
MutexLocker ml(&m_mutexPosterProtection);
castor::exception::Errnum::throwOnNonZero(sem_post(&m_sem),
"Error from sem_post in castor::tape::threading::PosixSemaphore::release()");
"Error from sem_post in castor::server::PosixSemaphore::release()");
}
}
//------------------------------------------------------------------------------
//CondVarSemaphore constructor
//------------------------------------------------------------------------------
castor::tape::threading::CondVarSemaphore::CondVarSemaphore(int initial)
castor::server::CondVarSemaphore::CondVarSemaphore(int initial)
:m_value(initial) {
castor::exception::Errnum::throwOnReturnedErrno(
pthread_cond_init(&m_cond, NULL),
"Error from pthread_cond_init in castor::tape::threading::CondVarSemaphore::CondVarSemaphore()");
"Error from pthread_cond_init in castor::server::CondVarSemaphore::CondVarSemaphore()");
castor::exception::Errnum::throwOnReturnedErrno(
pthread_mutex_init(&m_mutex, NULL),
"Error from pthread_mutex_init in castor::tape::threading::CondVarSemaphore::CondVarSemaphore()");
"Error from pthread_mutex_init in castor::server::CondVarSemaphore::CondVarSemaphore()");
}
//------------------------------------------------------------------------------
//CondVarSemaphore destructor
//------------------------------------------------------------------------------
castor::tape::threading::CondVarSemaphore::~CondVarSemaphore() {
castor::server::CondVarSemaphore::~CondVarSemaphore() {
/* Barrier protecting the last user */
pthread_mutex_lock(&m_mutex);
pthread_mutex_unlock(&m_mutex);
......@@ -160,30 +86,30 @@ castor::tape::threading::CondVarSemaphore::~CondVarSemaphore() {
//------------------------------------------------------------------------------