diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..80f408ededabb8406b9751db415c62db69e4954f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +nbproject/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 02a2594604ad0de736589f5d5300a82dbd4723a6..2a474d40431a8aa0f3891c1de3740a67acc73bed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,11 @@ project(cta) set(CMAKE_C_FLAGS_RELWITHDEBINFO "-g") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-g") +set(CMAKE_DISABLE_SOURCE_CHANGES ON) +set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) +list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) + add_subdirectory(libs) add_subdirectory(xroot_clients) add_subdirectory(xroot_plugins) +add_subdirectory(objectstore) diff --git a/cmake/FindProtobuf.cmake b/cmake/FindProtobuf.cmake new file mode 100644 index 0000000000000000000000000000000000000000..ecc6ff39a514b764b35d658e677cd606751b354b --- /dev/null +++ b/cmake/FindProtobuf.cmake @@ -0,0 +1,54 @@ +# From, https://raw.githubusercontent.com/Kitware/CMake/master/Modules/FindProtobuf.cmake +# cut down to solve our problem and nothing more +#============================================================================= +# Copyright 2009 Kitware, Inc. +# Copyright 2009-2011 Philip Lowman <philip@yhbt.com> +# Copyright 2008 Esben Mose Hansen, Ange Optimization ApS +# +# Distributed under the OSI-approved BSD License (the "License"); +# see accompanying file Copyright.txt for details. +# +# This software is distributed WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the License for more information. +#============================================================================= +# (To distribute this file outside of CMake, substitute the full +# License text for the above reference.) + +find_program(PROTOBUF_PROTOC_EXECUTABLE + NAMES protoc + DOC "The Google Protocol Buffers Compiler" +) + +message(STATUS "protoc is at ${PROTOBUF_PROTOC_EXECUTABLE} ") +function(PROTOBUF_GENERATE_CPP SRCS HDRS) + if(NOT ARGN) + message(SEND_ERROR "Error: PROTOBUF_GENERATE_CPP() called without any proto files") + return() + endif() + + set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR}) + + set(${SRCS}) + set(${HDRS}) + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(FIL_WE ${FIL} NAME_WE) + + list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.cc") + list(APPEND ${HDRS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.h") + + add_custom_command( + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.h" + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} + ARGS --cpp_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL} + DEPENDS ${ABS_FIL} + COMMENT "Running C++ protocol buffer compiler on ${FIL}" + VERBATIM ) + endforeach() + + set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE) + set(${SRCS} ${${SRCS}} PARENT_SCOPE) + set(${HDRS} ${${HDRS}} PARENT_SCOPE) +endfunction() \ No newline at end of file diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp new file mode 100644 index 0000000000000000000000000000000000000000..85de0ef4ecbcee57cb306e3f05008a7fafead67d --- /dev/null +++ b/objectstore/Agent.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "ObjectStores.hpp" +#include "ContextHandle.hpp" + +class Agent { +public: + Agent(ObjectStore & os): m_objectStore(os) {}; + ~Agent() { + for (size_t i=0; i < c_handleCount; i++) { + m_contexts[i].release(); + } + } + void act() { + + } +private: + ObjectStore & m_objectStore; + static const size_t c_handleCount = 100; + ContextHandleImplementation<myOS> m_contexts[c_handleCount]; + ContextHandleImplementation<myOS> getFreeContext() { + for (size_t i=0; i < c_handleCount; i++) { + if (!m_contexts[i].isSet()) + return m_contexts[i]; + } + throw cta::exception::Exception("Could not find free context slot"); + } +}; \ No newline at end of file diff --git a/objectstore/AgentId.hpp b/objectstore/AgentId.hpp new file mode 100644 index 0000000000000000000000000000000000000000..88a0ad6179ecf0528bcc8c7a6bbf26eb59e62246 --- /dev/null +++ b/objectstore/AgentId.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include <sstream> +#include <string> +#include <sys/syscall.h> +#include <ctime> + +class AgentId { +public: + AgentId(std::string agentType): m_nextId(0) { + std::stringstream aid; + // Get time + time_t now = time(0); + struct tm localNow; + localtime_r(&now, &localNow); + // Get hostname + char host[200]; + cta::exception::Errnum::throwOnMinusOne(gethostname(host, sizeof(host)), + "In AgentId::AgentId: failed to gethostname"); + // gettid is a safe system call (never fails) + aid << agentType << "-" << host << "-" << syscall(SYS_gettid) << "-" + << 1900 + localNow.tm_year + << std::setfill('0') << std::setw(2) << 1 + localNow.tm_mon + << localNow.tm_mday << "-" + << localNow.tm_hour << ":" + << localNow.tm_min << ":" + << localNow.tm_sec; + m_agentId = aid.str(); + } + std::string name() { + return m_agentId; + } + std::string nextId() { + std::stringstream id; + id << m_agentId << "-" << m_nextId++; + return id.str(); + } +private: + std::string m_agentId; + uint64_t m_nextId; +}; \ No newline at end of file diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..af8cb5030c509c543c35b73ac978730ce8eec268 --- /dev/null +++ b/objectstore/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required (VERSION 2.6) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_BINARY_DIR}) + +find_package(Protobuf) + +set (CTAProtoFiles + cta.proto) + +PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles}) + +add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp + ${CTAProtoSources} + exception/Backtrace.cpp + exception/Errnum.cpp + exception/Exception.cpp + exception/strerror_r_wrapper.cpp + threading/ChildProcess.cpp + threading/Mutex.cpp + threading/Threading.cpp) + +target_link_libraries(tapeResourceMangerTest + protobuf + rados) diff --git a/objectstore/ContextHandle.hpp b/objectstore/ContextHandle.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c32ec5903b8572fbe3f7e96df9d8dc32f8639a60 --- /dev/null +++ b/objectstore/ContextHandle.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include "ObjectStores.hpp" + +template <class C> +class ContextHandleImplementation: public ContextHandle {}; + +template <> +class ContextHandleImplementation <ObjectStoreVFS>: public ContextHandle { +public: + ContextHandleImplementation(): m_set(false), m_fd(-1) {} + void set(int fd) { m_set = true; m_fd = fd; __sync_synchronize(); } + int get(int) { return m_fd; } + void reset() { m_set = false; m_fd = -1; __sync_synchronize(); } + bool isSet() { return m_set; } + void release() { __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); } +private: + bool m_set; + volatile int m_fd; +}; + +template <> +class ContextHandleImplementation <ObjectStoreRados>: public ContextHandle { +public: + ContextHandleImplementation(): m_set(false) {} + void set(int) { m_set = true; } + void reset() { m_set = false; } + bool isSet() { return m_set; } + void release() { reset(); } +private: + bool m_set; +}; \ No newline at end of file diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp new file mode 100644 index 0000000000000000000000000000000000000000..7342741cb0152b675c9e8b916dfba7fef769ac88 --- /dev/null +++ b/objectstore/FIFO.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include "ObjectOps.hpp" +#include "exception/Exception.hpp" + +class FIFO: private ObjectOps<cta::objectstore::FIFO> { +public: + FIFO(ObjectStore & os, const std::string & name, ContextHandle & context): + ObjectOps<cta::objectstore::FIFO>(os, name), + m_locked(false) { + cta::objectstore::FIFO fs; + updateFromObjectStore(fs, context); + } + + void lock(ContextHandle & context) { + if (m_locked) throw cta::exception::Exception( + "In FIFO::lock: already locked"); + lockExclusiveAndRead(m_currentState, context); + m_locked = true; + } + + std::string peek() { + std::string ret; + if (!m_locked) throw cta::exception::Exception( + "In FIFO::pop: FIFO should be locked"); + return m_currentState.name(m_currentState.readpointer()); + return ret; + } + + void popAndUnlock(ContextHandle & context) { + if (!m_locked) throw cta::exception::Exception( + "In FIFO::pop: FIFO should be locked"); + m_currentState.set_readpointer(m_currentState.readpointer()+1); + if (m_currentState.readpointer() > 100) { + compactCurrentState(); + } + write(m_currentState); + unlock(context); + m_locked = false; + } + + void push(std::string name, ContextHandle & context) { + cta::objectstore::FIFO fs; + lockExclusiveAndRead(fs, context); + fs.add_name(name); + write(fs); + unlock(context); + } + + std::string dump(ContextHandle & context) { + cta::objectstore::FIFO fs; + updateFromObjectStore(fs, context); + std::stringstream ret; + ret<< "<<<< FIFO dump start" << std::endl + << "Read pointer=" << fs.readpointer() << std::endl + << "Array size=" << fs.name_size() << std::endl; + for (int i=fs.readpointer(); i<fs.name_size(); i++) { + ret << "name[phys=" << i << " ,log=" << i-fs.readpointer() + << "]=" << fs.name(i) << std::endl; + } + ret<< ">>>> FIFO dump end." << std::endl; + return ret.str(); + } + +private: + bool m_locked; + cta::objectstore::FIFO m_currentState; + + void compactCurrentState() { + uint64_t oldReadPointer = m_currentState.readpointer(); + uint64_t oldSize = m_currentState.name_size(); + // Copy the elements at position oldReadPointer + i to i (squash all) + // before the read pointer + for (int i = oldReadPointer; i<m_currentState.name_size(); i++) { + *m_currentState.mutable_name(i-oldReadPointer) = m_currentState.name(i); + } + // Shorten the name array by oldReadPointer elements + for (uint64_t i = 0; i < oldReadPointer; i++) { + m_currentState.mutable_name()->RemoveLast(); + } + // reset the read pointer + m_currentState.set_readpointer(0); + // Check the size is as expected + if ((uint64_t)m_currentState.name_size() != oldSize - oldReadPointer) { + std::stringstream err; + err << "In FIFO::compactCurrentState: wrong size after compaction: " + << "oldSize=" << oldSize << " oldReadPointer=" << oldReadPointer + << " newSize=" << m_currentState.name_size(); + throw cta::exception::Exception(err.str()); + } + } +}; \ No newline at end of file diff --git a/objectstore/JobPool.hpp b/objectstore/JobPool.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f9eb086e3f889fa97348448605018598f1c62766 --- /dev/null +++ b/objectstore/JobPool.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include "ObjectOps.hpp" +#include "FIFO.hpp" + +class JobPool: private ObjectOps<cta::objectstore::jobPool> { +public: + JobPool(ObjectStore & os, const std::string & name, ContextHandle & context): + ObjectOps<cta::objectstore::jobPool>(os, name) { + cta::objectstore::jobPool jps; + updateFromObjectStore(jps, context); + } + + void PostRecallJob (const std::string & string, ContextHandle & context) { + cta::objectstore::jobPool jps; + lockExclusiveAndRead(jps, context); + } + +private: + // The following functions are hidden from the user in order to provide + // higher level functionnality + //std::string get + +}; \ No newline at end of file diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d91484bf0c639005d941930a1a8b0fe584d14d5d --- /dev/null +++ b/objectstore/ObjectOps.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "ObjectStores.hpp" + +template <class C> +class ObjectOps { +public: + ObjectOps(ObjectStore & os, const std::string & name):m_name(name), + m_objectStore(os) {} + + void updateFromObjectStore (C & val, ContextHandle & context) { + m_objectStore.lockShared(m_name, context); + std::string reStr = m_objectStore.read(m_name); + m_objectStore.unlock(m_name, context); + val.ParseFromString(reStr); + } + + void lockExclusiveAndRead (C & val, ContextHandle & context) { + m_objectStore.lockExclusive(m_name, context); + // Re-read to get latest version (lock upgrade could be useful here) + std::string reStr = m_objectStore.read(m_name); + cta::objectstore::RootEntry res; + val.ParseFromString(reStr); + } + + void write (C & val) { + m_objectStore.atomicOverwrite(m_name, val.SerializeAsString()); + } + + void unlock (ContextHandle & context) { + // release the lock, and return the register name + m_objectStore.unlock(m_name, context); + } + + template <class C2> + void writeChild (const std::string & name, C2 & val) { + m_objectStore.atomicOverwrite(name, val.SerializeAsString()); + } +private: + std::string m_name; + ObjectStore & m_objectStore; +}; \ No newline at end of file diff --git a/objectstore/ObjectStores.hpp b/objectstore/ObjectStores.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ae175452a9abba12dcc8ed37a668268d4a5237d1 --- /dev/null +++ b/objectstore/ObjectStores.hpp @@ -0,0 +1,281 @@ +#pragma once + +#include <string> +#include <errno.h> +#include <stdlib.h> +#include <ftw.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/file.h> +#include <sys/syscall.h> +#include <fstream> +#include <rados/librados.hpp> + +class ContextHandle { +public: + template <typename T> + void set(T); + template <typename T> + T get(T); + void reset(); + bool isSet(); + void release(); +}; + +class ObjectStore { +public: + virtual ~ObjectStore() {} + virtual void atomicOverwrite(std::string name, std::string content) = 0; + virtual std::string read(std::string name) = 0; + virtual void lockShared(std::string name, ContextHandle & context) = 0; + virtual void lockExclusive(std::string name, ContextHandle & context) = 0; + virtual void unlock(std::string name, ContextHandle & context) = 0; + virtual std::string path() { return ""; } + virtual std::string user() { return ""; } + virtual std::string pool() { return ""; } +}; + + + +// An implementation of the object store primitives, using flock to lock, +// so that several threads can compete for the same file (locks are per file +// descriptor). The lock will be done on a special file (XXXX.lock), which will +// not be moved (as opposed to the content itself, which needs to be moved to +// be atomically created) +// The context here is a pointer to an element of a per thread int array, that will +// hold the file descriptors. This int is chosen for containing -1 (free). +// When the thread is killed, the destructor will be able to safely close +// all the file descriptors in the array. +// The fd will be deleted from the array before calling close. +// This will be a fd leak source, but this might be good enough for this test's +// purpose. +class ObjectStoreVFS: public ObjectStore { +public: + ObjectStoreVFS():m_deleteOnExit(true) { + // Create the directory for storage + char tplt[] = "/tmp/jobStoreVFSXXXXXX"; + mkdtemp(tplt); + // If template is an empty string, we failed, otherwise, we're fine. + if (*tplt) { + m_root = tplt; + } else { + throw cta::exception::Errnum("Failed to create temporary directory"); + } + } + ObjectStoreVFS(std::string path, std::string user, std::string pool): + m_root(path), m_deleteOnExit(false) {} + virtual ~ObjectStoreVFS() { + if (m_deleteOnExit) { + // Delete the created directories recursively + nftw (m_root.c_str(), deleteFileOrDirectory, 100, FTW_DEPTH); + } + } + virtual std::string path() { + return m_root; + } + virtual void atomicOverwrite(std::string name, std::string content) { + // When entering here, we should hold an exclusive lock on the *context + // file descriptor. We will create a new file, lock it immediately exclusively, + // create the new content in it, move it over the old file, and close the *context + // file descriptor. + std::string tempPath = m_root+"/." + name + ".pre-overwrite"; + std::string targetPath = m_root+"/" + name; + // Create the new version of the file, make sure it's visible, lock it. + int fd= ::creat(tempPath.c_str(), S_IRWXU); + cta::exception::Errnum::throwOnMinusOne(fd, + "In ObjectStoreVFS::atomicOverwrite, failed to creat the pre-overwrite file"); + cta::exception::Errnum::throwOnMinusOne( + ::write(fd, content.c_str(), content.size()), + "In ObjectStoreVFS::atomicOverwrite, failed to write to the pre-overwrite file"); + cta::exception::Errnum::throwOnMinusOne(::close(fd), + "In ObjectStoreVFS::atomicOverwrite, failed to close the pre-overwrite file"); + cta::exception::Errnum::throwOnMinusOne( + ::rename(tempPath.c_str(), targetPath.c_str()), + "In ObjectStoreVFS::atomicOverwrite, failed to rename the file"); + } + virtual std::string read(std::string name) { + std::string path = m_root+"/" + name; + std::string ret; + std::ifstream file(path.c_str()); + if (!file) { + throw cta::exception::Errnum( + std::string("In ObjectStoreVFS::read, failed to open file for read: ") + + path); + } + char buff[200]; + while(!file.eof()) { + file.read(buff, sizeof(buff)); + ret.append(buff,file.gcount()); + } + return ret; + } + + void lockHelper(std::string name, ContextHandle & context, int type) { + std::string path = m_root + "/" + name + ".lock"; + context.set(::open(path.c_str(), O_CREAT, S_IRWXU)); + cta::exception::Errnum::throwOnMinusOne(context.get(0), + "In ObjectStoreVFS::lockHelper, failed to open the lock file."); + cta::exception::Errnum::throwOnMinusOne(::flock(context.get(0), LOCK_EX), + "In ObjectStoreVFS::lockHelper, failed to flock the lock file."); + } + + virtual void lockExclusive(std::string name, ContextHandle & context) { + lockHelper(name, context, LOCK_EX); + } + + virtual void lockShared(std::string name, ContextHandle & context) { + lockHelper(name, context, LOCK_SH); + } + + virtual void unlock(std::string name, ContextHandle & context) { + ::flock(context.get(0), LOCK_UN); + int fd= context.get(0); + context.reset(); + ::close(fd); + } + + + + +private: + std::string m_root; + bool m_deleteOnExit; + // Utility function to be used in the destructor + static int deleteFileOrDirectory(const char *fpath, const struct stat *sb, + int tflag, struct FTW *ftwbuf) { + switch (tflag) { + case FTW_D: + case FTW_DNR: + case FTW_DP: + rmdir (fpath); + break; + default: + unlink (fpath); + break; + } + return 0; + } +}; + +class ObjectStoreRados: public ObjectStore { +public: + ObjectStoreRados(std::string path, std::string userId, std::string pool): + m_user(userId), m_pool(pool), m_cluster(), m_radosCtx() { + cta::exception::Errnum::throwOnNonZero(m_cluster.init(userId.c_str()), + "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init"); + try { + cta::exception::Errnum::throwOnNonZero(m_cluster.conf_read_file(NULL), + "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file"); + cta::exception::Errnum::throwOnNonZero(m_cluster.conf_parse_env(NULL), + "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file"); + cta::exception::Errnum::throwOnNonZero(m_cluster.connect(), + "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect"); + cta::exception::Errnum::throwOnNonZero(m_cluster.ioctx_create(pool.c_str(), m_radosCtx), + "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create"); + } catch (...) { + m_cluster.shutdown(); + throw; + } + } + virtual ~ObjectStoreRados() { + m_radosCtx.close(); + m_cluster.shutdown(); + } + virtual std::string user() { + return m_user; + } + virtual std::string pool() { + return m_pool; + } + virtual void atomicOverwrite(std::string name, std::string content) { + ceph::bufferlist bl; + bl.append(content.c_str(), content.size()); + cta::exception::Errnum::throwOnNonZero(m_radosCtx.write_full(name, bl), + std::string("In ObjectStoreRados::atomicOverwrite, failed to write_full ") + + name); + } + + virtual std::string read(std::string name) { + std::string ret; + uint64_t size; + time_t time; + cta::exception::Errnum::throwOnNonZero(m_radosCtx.stat(name, &size, &time), + std::string("In ObjectStoreRados::read, failed to stat ") + + name); + librados::bufferlist bl; + cta::exception::Errnum::throwOnNegative(m_radosCtx.read(name, bl, size, 0), + std::string("In ObjectStoreRados::read, failed to read ") + + name); + bl.copy(0, size, ret); + return ret; + } + + + virtual void lockExclusive(std::string name, ContextHandle & context) { + // Build a unique client name: host:thread + char buff[200]; + cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)), + "In ObjectStoreRados::lockExclusive: failed to gethostname"); + pid_t tid = syscall(SYS_gettid); + std::stringstream client; + client << buff << ":" << tid; + struct timeval tv; + tv.tv_usec = 0; + tv.tv_sec = 60; + int rc; + do { + rc=m_radosCtx.lock_exclusive(name, "lock", client.str(), "", &tv, 0); + } while (-EBUSY==rc); + if (0!=rc) { + std::cout << "Oh-oh, rc=" << rc << std::endl; + } + cta::exception::Errnum::throwOnNonZero(rc, + std::string("In ObjectStoreRados::lockExclusive, failed to lock_exclusive ")+ + name); + } + + + virtual void lockShared(std::string name, ContextHandle & context) { + // Build a unique client name: host:thread + char buff[200]; + cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)), + "In ObjectStoreRados::lockExclusive: failed to gethostname"); + pid_t tid = syscall(SYS_gettid); + std::stringstream client; + client << buff << ":" << tid; + struct timeval tv; + tv.tv_usec = 0; + tv.tv_sec = 60; + int rc; + do { + rc=m_radosCtx.lock_shared(name, "lock", client.str(), "", "", &tv, 0); + } while (-EBUSY==rc); + if (0!=rc) { + std::cout << "Oh-oh, rc=" << rc << std::endl; + } + cta::exception::Errnum::throwOnNonZero(rc, + std::string("In ObjectStoreRados::lockShared, failed to lock_shared ")+ + name); + } + + virtual void unlock(std::string name, ContextHandle & context) { + // Build a unique client name: host:thread + char buff[200]; + cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)), + "In ObjectStoreRados::lockExclusive: failed to gethostname"); + pid_t tid = syscall(SYS_gettid); + std::stringstream client; + client << buff << ":" << tid; + cta::exception::Errnum::throwOnNonZero( + m_radosCtx.unlock(name, "lock", client.str()), + std::string("In ObjectStoreRados::lockExclusive, failed to lock_exclusive ")+ + name); + } + + +private: + std::string m_user; + std::string m_pool; + librados::Rados m_cluster; + librados::IoCtx m_radosCtx; +}; \ No newline at end of file diff --git a/objectstore/ObjectStructureDumper.hpp b/objectstore/ObjectStructureDumper.hpp new file mode 100644 index 0000000000000000000000000000000000000000..fbdba068a5c185d4bbef6e736ec0557108cc6e88 --- /dev/null +++ b/objectstore/ObjectStructureDumper.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "RootEntry.hpp" +#include "Register.hpp" +#include <iostream> + +class ObjectStrucutreDumper { +public: + std::string dump(ObjectStore & os, ContextHandle & context) { + std::stringstream ret; + ret << "<< Structure dump start" << std::endl; + RootEntry re(os, context); + ret << re.dump(context); + try { + Register ar(os, re.getAgentRegister(context), context); + ret << ar.dump("root->agentRegister", context); + } catch (RootEntry::NotAllocatedEx &) {} + ret << ">> Structure dump end" << std::endl; + return ret.str(); + } +}; \ No newline at end of file diff --git a/objectstore/Register.hpp b/objectstore/Register.hpp new file mode 100644 index 0000000000000000000000000000000000000000..3e22932e7e868f5908f26bcb160656284d317d8c --- /dev/null +++ b/objectstore/Register.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include "ObjectOps.hpp" +#include <algorithm> + +class Register: private ObjectOps<cta::objectstore::Register> { +public: + Register(ObjectStore & os, const std::string & name, ContextHandle & context): + ObjectOps<cta::objectstore::Register>(os, name) { + // Check that the entry is present and readable (depending on implementation + // of object store, locking might or might not succeed) + cta::objectstore::Register rs; + updateFromObjectStore(rs, context); + } + + void addElement (std::string name, ContextHandle & context) { + cta::objectstore::Register rs; + lockExclusiveAndRead(rs, context); + rs.add_elements(name); + write(rs); + unlock(context); + } + + void removeElement (const std::string & name, ContextHandle & context) { + cta::objectstore::Register rs; + lockExclusiveAndRead(rs, context); + bool found; + do { + found = false; + for (int i=0; i<rs.elements_size(); i++) { + if (name == rs.elements(i)) { + found = true; + rs.mutable_elements()->SwapElements(i, rs.elements_size()-1); + rs.mutable_elements()->RemoveLast(); + break; + } + } + } while (found); + write(rs); + unlock(context); + } + + std::list<std::string> getElements(ContextHandle & context) { + cta::objectstore::Register rs; + updateFromObjectStore(rs, context); + std::list<std::string> ret; + for (int i=0; i<rs.elements_size(); i++) { + ret.push_back(rs.elements(i)); + } + return ret; + } + + std::string dump(const std::string & title, ContextHandle & context) { + cta::objectstore::Register rs; + updateFromObjectStore(rs, context); + std::stringstream ret; + ret<< "<<<< Register " << title << " dump start" << std::endl + << "Array size=" << rs.elements_size() << std::endl; + for (int i=0; i<rs.elements_size(); i++) { + ret << "element[" << i << "]=" << rs.elements(i) << std::endl; + } + ret<< ">>>> Register " << title << " dump end" << std::endl; + return ret.str(); + } +}; \ No newline at end of file diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp new file mode 100644 index 0000000000000000000000000000000000000000..8e52936401522ada2b6914b4c5fcc4d3e7b517bf --- /dev/null +++ b/objectstore/RootEntry.hpp @@ -0,0 +1,166 @@ +#pragma once + +#include <google/protobuf/text_format.h> +#include "objectstore/cta.pb.h" + +#include "ObjectStores.hpp" +#include "ObjectOps.hpp" + +class RootEntry: private ObjectOps<cta::objectstore::RootEntry> { +public: + // Initializer. + static void init(ObjectStore & os) { + // check existence of root entry before creating it. We expect read to fail. + try { + os.read(s_rootEntryName); + throw cta::exception::Exception("In RootEntry::init: root entry already exists"); + } catch (...) {} + cta::objectstore::RootEntry res; + os.atomicOverwrite(s_rootEntryName, res.SerializeAsString()); + } + // construtor, when the backend store exists. + // Checks the existence and correctness of the root entry + RootEntry(ObjectStore & os, ContextHandle & context): + ObjectOps<cta::objectstore::RootEntry>(os, s_rootEntryName) { + // Check that the root entry is readable. + cta::objectstore::RootEntry res; + updateFromObjectStore(res, context); + } + + class NotAllocatedEx: public cta::exception::Exception { + public: + NotAllocatedEx(const std::string & context): cta::exception::Exception(context) {} + }; + + // Get the name of the agent register (or exception if not available) + std::string getAgentRegister(ContextHandle & context) { + // Check if the agent register exists + cta::objectstore::RootEntry res; + updateFromObjectStore(res, context); + // If the registry is defined, return it, job done. + if (res.agentregister().size()) + return res.agentregister(); + throw NotAllocatedEx("In RootEntry::getAgentRegister: agentRegister not yet allocated"); + } + + // Get the name of a (possibly freshly created) agent register + std::string allocateOrGetAgentRegister(ContextHandle & context, std::string agentRegistryId) { + // Check if the agent register exists + try { + return getAgentRegister(context); + } catch (NotAllocatedEx &) { + // If we get here, the agent register is not created yet, so we have to do it: + // lock the entry again, for writing + cta::objectstore::RootEntry res; + lockExclusiveAndRead(res, context); + // If the registry is already defined, somebody was faster. We're done. + if (res.agentregister().size()) { + unlock(context); + return res.agentregister(); + } + // We will really create the register + // decide on the object's name + std::string arName ("agentRegister-"); + arName += agentRegistryId; + // Record the agent in the intent log + res.add_agentregisterintentlog(arName); + // Commit the intents + write(res); + // The potential object can now be garbage collected if we die from here. + // Create the object, then lock. The name should be unique, so no race. + cta::objectstore::Register ars; + writeChild(arName, ars); + // If we lived that far, we can update the root entry to point to our + // new agent register, and remove the name from the intent log. + res.set_agentregister(arName); + res.mutable_agentregisterintentlog()->RemoveLast(); + write(res); + // release the lock, and return the register name + unlock(context); + return arName; + } + } + + // Get the name of the JobPool (or exception if not available) + std::string getJobPool(ContextHandle & context) { + // Check if the job pool exists + cta::objectstore::RootEntry res; + updateFromObjectStore(res, context); + // If the registry is defined, return it, job done. + if (res.jobpool().size()) + return res.jobpool(); + throw NotAllocatedEx("In RootEntry::getJobPool: jobPool not yet allocated"); + } + + // Get the name of a (possibly freshly created) job pool + std::string allocateOrGetJobPool(ContextHandle & context, std::string jobPoolId) { + // Check if the job pool exists + try { + return getJobPool(context); + } catch (NotAllocatedEx &) { + // If we get here, the job pool is not created yet, so we have to do it: + // lock the entry again, for writing + cta::objectstore::RootEntry res; + lockExclusiveAndRead(res, context); + // If the registry is already defined, somebody was faster. We're done. + if (res.jobpool().size()) { + unlock(context); + return res.jobpool(); + } + // We will really create the register + // decide on the object's name + std::string jpName ("jobPool-"); + jpName += jobPoolId; + // Record the agent in the intent log + res.add_jobpoolintentlog(jpName); + // Commit the intents + write(res); + // The potential object can now be garbage collected if we die from here. + // Create the object, then lock. The name should be unique, so no race. + cta::objectstore::jobPool jps; + jps.set_migration(""); + jps.set_recall(""); + writeChild(jpName, jps); + // If we lived that far, we can update the root entry to point to our + // new agent register, and remove the name from the intent log. + res.set_jobpool(jpName); + res.mutable_jobpoolintentlog()->RemoveLast(); + write(res); + // release the lock, and return the register name + unlock(context); + return jpName; + } + } + + // Dump the root entry + std::string dump (ContextHandle & context) { + std::stringstream ret; + cta::objectstore::RootEntry res; + updateFromObjectStore(res, context); + ret << "<<<< Root entry dump start" << std::endl; + if (res.has_agentregister()) ret << "agentRegister=" << res.agentregister() << std::endl; + for (int i=0; i<res.agentregisterintentlog_size(); i++) { + ret << "agentRegisterIntentLog=" << res.agentregisterintentlog(i) << std::endl; + } + if (res.has_jobpool()) ret << "jobPool=" << res.jobpool() << std::endl; + for (int i=0; i<res.jobpoolintentlog_size(); i++) { + ret << "jobPoolIntentLog=" << res.jobpoolintentlog(i) << std::endl; + } + if (res.has_driveregister()) ret << "driveRegister=" << res.driveregister() << std::endl; + for (int i=0; i<res.driveregisterintentlog_size(); i++) { + ret << "driveRegisterIntentLog=" << res.driveregisterintentlog(i) << std::endl; + } + if (res.has_taperegister()) ret << "tapeRegister=" << res.taperegister() << std::endl; + for (int i=0; i<res.taperegisterintentlog_size(); i++) { + ret << "tapeRegisterIntentLog=" << res.taperegisterintentlog(i) << std::endl; + } + ret << ">>>> Root entry dump start" << std::endl; + return ret.str(); + } + + private: + static const std::string s_rootEntryName; +}; + +const std::string RootEntry::s_rootEntryName("root"); + diff --git a/objectstore/cta.proto b/objectstore/cta.proto new file mode 100644 index 0000000000000000000000000000000000000000..7aae95cf394ff7255ab164db7e81e4f5fc6d5c4e --- /dev/null +++ b/objectstore/cta.proto @@ -0,0 +1,84 @@ +package cta.objectstore; + +// The root entry: it point to an agent register and a job pool register. +// There is an integrated intent log, but the updates should be very rare (besides first registers creation). +message RootEntry { + optional string agentregister = 1; + repeated string agentregisterintentlog = 2; + optional string jobpool = 3; + repeated string jobpoolintentlog = 4; + optional string driveregister = 5; + repeated string driveregisterintentlog = 6; + optional string taperegister = 7; + repeated string taperegisterintentlog = 8; +} + +// The registers (simple name arrays) +message Register { + repeated string elements = 100; +} + +// A basic FIFO +// poping is done by increasing the read pointer, and from time to time +// collapsing the name array. +// There is no write pointer because we always append at the end of the name +// array. +message FIFO { + required uint64 readPointer = 200; + repeated string name = 201; +} + +// The agents's elements: +message ObjectCreationIntent { + required string type = 1101; + required string name = 1102; + required string container = 1103; +} + +message ObjectOwnershipIntent { + required string type = 1111; + required string name = 1112; +} + +// The agent (session) record. +message Agent { + required string name = 1000; + repeated ObjectCreationIntent creationintent = 1001; + repeated ObjectOwnershipIntent ownershipintent = 1002; +} + +// The tape record +message Tape { + required string type = 2001; + required string format = 2002; + required string vid = 2003; + required uint64 maxFseq = 2004; + required string status = 2005 ; +} + +// The drive record +message Drive { + required string name = 3001; + required string status = 3002; +} + +// The jobs +message MigrationJob { + required string owner = 4001; + required string status = 4002; + required string source = 4003; + required string destination = 4004; +} + +message RecallJob { + required string owner = 5001; + required string status = 5002; + required string source = 5003; + required string destination = 5004; +} + +// The job pool +message jobPool { + required string migration = 7001; + required string recall = 7002; +} diff --git a/objectstore/exception/Backtrace.cpp b/objectstore/exception/Backtrace.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e146b0fbd434250bc597bf417e294e6768914389 --- /dev/null +++ b/objectstore/exception/Backtrace.cpp @@ -0,0 +1,170 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include <execinfo.h> +#include <cxxabi.h> +#include <stdlib.h> +#include "Backtrace.hpp" + +#ifdef COLLECTEXTRABACKTRACEINFOS +#include <bfd.h> +#include <sstream> +namespace castor { + namespace exception { + class bfdContext { + public: + bfdContext(); + ~bfdContext(); + std::string collectExtraInfos(const std::string &address); + private: + class mutex { + public: + mutex() { pthread_mutex_init(&m_mutex, NULL); } + void lock() { pthread_mutex_lock(&m_mutex); } + void unlock() { pthread_mutex_unlock(&m_mutex); } + private: + pthread_mutex_t m_mutex; + }; + mutex m_mutex; + bfd* m_abfd; + asymbol **m_syms; + asection *m_text; + }; + } +} + +// code dedicated to extracting more information in the backtraces, typically +// resolving line numbers from adresses +// This code is compiled only in debug mode (where COLLECTEXTRABACKTRACEINFOS will be defined) +// as it's pretty heavy +cta::exception::bfdContext::bfdContext(): +m_abfd(NULL), m_syms(NULL), m_text(NULL) +{ + char ename[1024]; + int l = readlink("/proc/self/exe",ename,sizeof(ename)); + if (l != -1) { + ename[l] = 0; + bfd_init(); + m_abfd = bfd_openr(ename, 0); + if (m_abfd) { + /* oddly, this is required for it to work... */ + bfd_check_format(m_abfd,bfd_object); + unsigned storage_needed = bfd_get_symtab_upper_bound(m_abfd); + m_syms = (asymbol **) malloc(storage_needed); + bfd_canonicalize_symtab(m_abfd, m_syms); + m_text = bfd_get_section_by_name(m_abfd, ".text"); + } + } +} + +cta::exception::bfdContext::~bfdContext() { + free (m_syms); + /* According the bfd documentation, closing the bfd frees everything */ + m_text=NULL; + bfd_close(m_abfd); +} + +std::string cta::exception::bfdContext::collectExtraInfos(const std::string& address) { + std::ostringstream result; + m_mutex.lock(); + if (m_abfd && m_text && m_syms) { + std::stringstream ss; + long offset; + ss << std::hex << address; + ss >> offset; + offset -= m_text->vma; + if (offset > 0) { + const char *file; + const char *func; + unsigned line; + if (bfd_find_nearest_line(m_abfd, m_text, m_syms, offset, &file, &func, &line) + && file) { + int status(-1); + char * demangledFunc = abi::__cxa_demangle(func, NULL, NULL, &status); + result << "at " << file << ":" << line << " (" << address << ")"; + free (demangledFunc); + } + } + } + m_mutex.unlock(); + return result.str(); +} + +namespace cta { + namespace exception { + bfdContext g_bfdContext; + } +} +#endif // COLLECTEXTRABACKTRACEINFOS + +cta::exception::Backtrace::Backtrace(bool fake): m_trace() { + if (fake) return; + void * array[200]; + g_lock.lock(); + size_t depth = ::backtrace(array, sizeof(array)/sizeof(void*)); + g_lock.unlock(); + char ** strings = ::backtrace_symbols(array, depth); + + if (!strings) + m_trace = ""; + else { + for (size_t i=0; i<depth; i++) { + std::string line(strings[i]); + /* Demangle the c++, if possible. We expect the c++ function name's to live + * between a '(' and a + + * line format: /usr/lib/somelib.so.1(_Mangle2Mangle3Ev+0x123) [0x12345] */ + if ((std::string::npos != line.find("(")) && (std::string::npos != line.find("+"))) { + std::string before, theFunc, after, addr; + before = line.substr(0, line.find("(")+1); + theFunc = line.substr(line.find("(")+1, line.find("+")-line.find("(")-1); + after = line.substr(line.find("+"), line.find("[")-line.find("+")+1); + addr = line.substr(line.find("[")+1, line.find("]")-line.find("[")-1); + int status(-1); + char * demangled = abi::__cxa_demangle(theFunc.c_str(), NULL, NULL, &status); + if (0 == status) { + m_trace += before; + m_trace += demangled; + m_trace += after; +#ifdef COLLECTEXTRABACKTRACEINFOS + m_trace += g_bfdContext.collectExtraInfos(addr); +#else + m_trace += addr; +#endif // COLLECTEXTRABACKTRACEINFOS + m_trace += "]"; + } else { + m_trace += strings[i]; + } + m_trace += "\n"; + free(demangled); + } else { + m_trace += strings[i]; + m_trace += "\n"; + } + } + free (strings); + } +} + +/* Implementation of the singleton lock */ +cta::exception::Backtrace::mutex cta::exception::Backtrace::g_lock; diff --git a/objectstore/exception/Backtrace.hpp b/objectstore/exception/Backtrace.hpp new file mode 100644 index 0000000000000000000000000000000000000000..4191f5cae92943a908d9a5d4d02f652a2ac80eaf --- /dev/null +++ b/objectstore/exception/Backtrace.hpp @@ -0,0 +1,53 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include <string> +#include <pthread.h> + +namespace cta { + namespace exception { + class Backtrace { + public: + Backtrace(bool fake=false); + operator std::string() const { return m_trace; } + Backtrace& operator= (const Backtrace& bt) { m_trace = bt.m_trace; return *this; } + private: + std::string m_trace; + /** + * Singleton lock around the apparently racy backtrace(). + * We write it with no error check as it's used only here. + * We need a class in order to have a constructor for the global object. + */ + class mutex { + public: + mutex() { pthread_mutex_init(&m_mutex, NULL); } + void lock() { pthread_mutex_lock(&m_mutex); } + void unlock() { pthread_mutex_unlock(&m_mutex); } + private: + pthread_mutex_t m_mutex; + }; + static mutex g_lock; + }; + } +} + diff --git a/objectstore/exception/Errnum.cpp b/objectstore/exception/Errnum.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6a61ed65417e901653f20a553bd3a21c3fffbabd --- /dev/null +++ b/objectstore/exception/Errnum.cpp @@ -0,0 +1,85 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "exception/Errnum.hpp" +#include "exception/strerror_r_wrapper.h" +#include <string.h> +#include <errno.h> + +using namespace cta::exception; + +Errnum::Errnum(std::string what):Exception("") { + m_errnum = errno; + ErrnumConstructorBottomHalf(what); +} + +Errnum::Errnum(int err, std::string what):Exception("") { + m_errnum = err; + ErrnumConstructorBottomHalf(what); +} + +void Errnum::ErrnumConstructorBottomHalf(const std::string & what) { + char buf[100]; + + if(int ret = strerror_r_wrapper(m_errnum, buf, sizeof(buf))) { + // sstrerror_r() failed + const int new_errno = errno; + std::stringstream w; + w << "Errno=" << m_errnum << ". In addition, failed to read the corresponding error string (sstrerror_r gave errno=" + << new_errno << ")"; + m_strerror = w.str(); + } else { + // sstrerror_r() succeeded + m_strerror = buf; + } + std::stringstream w2; + if (what.size()) + w2 << what << " "; + w2 << "Errno=" << m_errnum << ": " << m_strerror; + getMessage().str(w2.str()); +} + +void Errnum::throwOnReturnedErrno (int err, std::string context) { + if (err) throw Errnum(err, context); +} + +void Errnum::throwOnNonZero(int status, std::string context) { + if (status) throw Errnum(context); +} + +void Errnum::throwOnZero(int status, std::string context) { + if (!status) throw Errnum(context); +} + +void Errnum::throwOnNull(void * f, std::string context) { + if (NULL == f) throw Errnum(context); +} + +void Errnum::throwOnNegative(int ret, std::string context) { + if (ret < 0) throw Errnum(context); +} + +void Errnum::throwOnMinusOne(int ret, std::string context) { + if (-1 == ret) throw Errnum(context); +} diff --git a/objectstore/exception/Errnum.hpp b/objectstore/exception/Errnum.hpp new file mode 100644 index 0000000000000000000000000000000000000000..08aca0321f1e48e91addf97409cbb716bc61e7c5 --- /dev/null +++ b/objectstore/exception/Errnum.hpp @@ -0,0 +1,50 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include "Exception.hpp" + +namespace cta { +namespace exception { + class Errnum: public cta::exception::Exception { + public: + Errnum(std::string what = ""); + Errnum (int err, std::string what = ""); + virtual ~Errnum() throw() {}; + int errorNumber() const { return m_errnum; } + std::string strError() const { return m_strerror; } + static void throwOnReturnedErrno(int err, std::string context = ""); + static void throwOnNonZero(int status, std::string context = ""); + static void throwOnZero(int status, std::string context = ""); + static void throwOnNull(void * f, std::string context = ""); + static void throwOnNegative(int ret, std::string context = ""); + static void throwOnMinusOne(int ret, std::string context = ""); + protected: + void ErrnumConstructorBottomHalf(const std::string & what); + int m_errnum; + std::string m_strerror; + }; +} +} diff --git a/objectstore/exception/Exception.cpp b/objectstore/exception/Exception.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ec6b2e3cfb0b2a5d436416a0ea11e401d34c7aaa --- /dev/null +++ b/objectstore/exception/Exception.cpp @@ -0,0 +1,79 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "exception/Exception.hpp" + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +cta::exception::Exception::Exception(int se, std::string context, bool embedBacktrace) : + m_message(context), m_backtrace(!embedBacktrace) {} + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +cta::exception::Exception::Exception(std::string context, bool embedBacktrace) : + m_message(context), m_backtrace(!embedBacktrace) {} + +//------------------------------------------------------------------------------ +// copy constructor +//------------------------------------------------------------------------------ +cta::exception::Exception::Exception( + const cta::exception::Exception& rhs): + std::exception() { + m_message << rhs.m_message.str(); + m_backtrace = rhs.m_backtrace; +} + + +//------------------------------------------------------------------------------ +// assignment operator +//------------------------------------------------------------------------------ +cta::exception::Exception& cta::exception::Exception::operator=( + const cta::exception::Exception &rhs) { + m_message << rhs.m_message.str(); + return *this; +} + +//------------------------------------------------------------------------------ +// what operator +//------------------------------------------------------------------------------ +const char * cta::exception::Exception::what() const throw () { + m_what = getMessageValue(); + m_what += "\n"; + m_what += (std::string) m_backtrace; + return m_what.c_str(); +} + +//------------------------------------------------------------------------------ +// destructor +//------------------------------------------------------------------------------ +cta::exception::Exception::~Exception() throw() {} +//------------------------------------------------------------------------------ +// setWhat +//------------------------------------------------------------------------------ +void cta::exception::Exception::setWhat(const std::string& what) { + getMessage() << what; +} + diff --git a/objectstore/exception/Exception.hpp b/objectstore/exception/Exception.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b1ca73e7ad578bbdf246d39c2f1eb2caf91bea09 --- /dev/null +++ b/objectstore/exception/Exception.hpp @@ -0,0 +1,145 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include "exception/Backtrace.hpp" + +#include <exception> +#include <sstream> + +namespace cta { + + namespace exception { + + /** + * class Exception + * A simple exception used for error handling in castor + */ + class Exception: public std::exception { + + public: + + /** + * Empty Constructor + * @param serrno the serrno code of the corresponding C error + * @param context optional context string added to the message + * at initialisation time. + * @param embedBacktrace whether to embed a backtrace of where the + * exception was throw in the message + */ + Exception(int se, std::string context="", bool embedBacktrace=true); + + /** + * Empty Constructor with implicit serrno = SEINERNAL; + * @param context optional context string added to the message + * at initialisation time. + * @param embedBacktrace whether to embed a backtrace of where the + * exception was throw in the message + */ + Exception(std::string context="", bool embedBacktrace=true); + + /** + * Copy Constructor + */ + Exception(const Exception& rhs); + + /** + * Assignment operator + */ + Exception& operator=(const Exception &rhs); + + /** + * Empty Destructor, explicitely non-throwing (needed for std::exception + * inheritance) + */ + virtual ~Exception() throw (); + + /** + * Get the value of m_message + * A message explaining why this exception was raised + * @return the value of m_message + */ + std::ostringstream& getMessage() { + return m_message; + } + + /** + * Get the value of m_message + * A message explaining why this exception was raised + * @return the value of m_message + */ + const std::ostringstream& getMessage() const { + return m_message; + } + + /** + * Get the value of m_message as a sting, for const-c orrectness + * @return the value as a string. + */ + std::string getMessageValue() const { + return m_message.str(); + } + + /** + * Get the backtrace's contents + * @return backtrace in a standard string. + */ + std::string const backtrace() const { + return (std::string)m_backtrace; + } + + /** + * Updates the m_what member with a concatenation of the message and + * the stack trace. + * @return pointer to m_what's contents + */ + virtual const char * what() const throw (); + + private: + /// A message explaining why this exception was raised + std::ostringstream m_message; + + /** + * Placeholder for the what result. It has to be a member + * of the object, and not on the stack of the "what" function. + */ + mutable std::string m_what; + + protected: + void setWhat(const std::string &w); + + /** + * Backtrace object. Its constructor does the heavy lifting of + * generating the backtrace. + */ + Backtrace m_backtrace; + + }; + + } // end of exception namespace + +} // end of castor namespace + + diff --git a/objectstore/exception/strerror_r_wrapper.cpp b/objectstore/exception/strerror_r_wrapper.cpp new file mode 100644 index 0000000000000000000000000000000000000000..921b5212cbbcd6c4c6be5aa55a985407a4a3ad15 --- /dev/null +++ b/objectstore/exception/strerror_r_wrapper.cpp @@ -0,0 +1,52 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "strerror_r_wrapper.h" + +#if defined(linux) +/* + * Undefine _GNU_SOURCE and define _XOPEN_SOURCE as being 600 so that the + * XSI compliant version of strerror_r() will be used + */ +#undef _GNU_SOURCE +#define _XOPEN_SOURCE 600 +#endif + +#include <string.h> + +/******************************************************************************* + * strerror_r_wrapper + ******************************************************************************/ +extern "C" int strerror_r_wrapper(int errnum, char *buf, size_t buflen) { + /* This function should be compiled using a C++ compiler and not a C compiler. + * + * C++ compilers are better at spotting whether the GNU version or the + * XSI complicant version of sterror_() is being used. This is because the + * difference between the two versions is their return types. The GNU + * version returns a 'char *' whereas the XSI compliant version returns an + * 'int'. A C compiler may allow the strerror_r() function to return a + * 'char *' and have that 'char *' assigned to an 'int'. A C++ compiler + * usually gives an error if this is tried. + */ + return strerror_r(errnum, buf, buflen); +} diff --git a/objectstore/exception/strerror_r_wrapper.h b/objectstore/exception/strerror_r_wrapper.h new file mode 100644 index 0000000000000000000000000000000000000000..12619a10e420871e28f7896be59255cdb8446536 --- /dev/null +++ b/objectstore/exception/strerror_r_wrapper.h @@ -0,0 +1,64 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include <stddef.h> + +/* The following EXTERN_C marco has been intentionally copied from */ +/* h/osdep.h instead of just including h/osdep.h. The reason for this is */ +/* this header file must include the minimum number of header files because */ +/* the implementation file common/strerror_r_wrapper.cpp will undefine */ +/* _GNU_SOURCE and define _XOPEN_SOURCE as being 600. */ +/* */ +/* Macros for externalization (UNIX) (J.-D.Durand) */ +#if defined(__cplusplus) +#define EXTERN_C extern "C" +#else +#define EXTERN_C extern +#endif + +/** + * This function wraps the XSI compliant version of strerror_r() and therefore + * writes the string representation of the specified error number to the + * specified buffer. + * + * This function should be compiled using a C++ compiler and not a C compiler. + * + * C++ compilers are better at spotting whether the GNU version or the + * XSI complicant version of sterror_() is being used. This is because the + * difference between the two versions is their return types. The GNU + * version returns a 'char *' whereas the XSI compliant version returns an + * 'int'. A C compiler may allow the strerror_r() function to return a + * 'char *' and have that 'char *' assigned to an 'int'. A C++ compiler + * usually gives an error if this is tried. + * + * @param errnum The error number. + * @param buf The buffer. + * @param buflen The length of the buffer. + * @return 0 on success and -1 on error. If -1 is returned then errno is set + * to either EINVAL to indicate the error number is invalid, or to ERANGE to + * indicate the supplied error buffer is not large enough. + */ +EXTERN_C int strerror_r_wrapper(int errnum, char *buf, size_t buflen); + diff --git a/objectstore/tapeResourceManagerTest.cpp b/objectstore/tapeResourceManagerTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7576c51523e31c8f4873329e7a15fcf9bb8cb0fd --- /dev/null +++ b/objectstore/tapeResourceManagerTest.cpp @@ -0,0 +1,75 @@ +#include "threading/Threading.hpp" +#include "threading/ChildProcess.hpp" +#include "exception/Exception.hpp" +#include "exception/Errnum.hpp" +#include <iostream> +#include "ObjectStores.hpp" +typedef ObjectStoreRados myOS; +#include "RootEntry.hpp" +#include "Agent.hpp" +#include "ContextHandle.hpp" +#include "AgentId.hpp" +#include "ObjectStructureDumper.hpp" +#include "JobPool.hpp" + + + +class jobExecutorThread: public cta::threading::Thread { +public: + jobExecutorThread(Agent & a): cta::threading::Thread(), m_agent(a) {} +private: + virtual void run () { + // make the agent act + m_agent.act(); + } + Agent & m_agent; +}; + +class jobExecutorProcess: public cta::threading::ChildProcess { +public: + jobExecutorProcess(Agent & a): cta::threading::ChildProcess(), m_agent(a) {} +private: + virtual int run () { + // increase 100 time root entry's + try { + m_agent.act(); + } catch (std::exception & e) { + std::cout << e.what() << std::endl; + } catch (...) { + return -1; + } + return 0; + } + Agent & m_agent; +}; + +class dummyCleanup: public cta::threading::ChildProcess::Cleanup { + virtual void operator()() {} +} dc; + +int main(void){ + try { + myOS os("", "tapetest", "tapetest"); + //myOS os; + std::cout << "os.path=" << os.path() << " os.user=" << os.user() + << "os.pool=" << os.pool() << std::endl; + // Initialize the root entry + RootEntry::init(os); + ContextHandleImplementation<myOS> ctx; + // Dump the structure + ObjectStrucutreDumper osd; + std::cout << osd.dump(os, ctx) << std::endl; + // Get hold of the root entry + RootEntry re(os,ctx); + // Create and populate the job queues + AgentId aid("masterProcess"); + Register agentRegister(os, re.allocateOrGetJobPool(ctx, aid.nextId()), ctx); + JobPool jobPool(os, re.allocateOrGetJobPool(ctx, aid.nextId()), ctx); + // Dump again + std::cout << osd.dump(os, ctx) << std::endl; + } catch (std::exception &e) { + std::cout << "got exception: " << e.what() << std::endl; + } + + return 0; +} diff --git a/objectstore/threading/ChildProcess.cpp b/objectstore/threading/ChildProcess.cpp new file mode 100644 index 0000000000000000000000000000000000000000..49577f06ad889c7d0d6d65f3e7b562a19835cad6 --- /dev/null +++ b/objectstore/threading/ChildProcess.cpp @@ -0,0 +1,105 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "ChildProcess.hpp" +#include <unistd.h> +#include <stdlib.h> +#include <sys/wait.h> + +void cta::threading::ChildProcess::start(Cleanup & cleanup) { + m_pid = fork(); + if (!m_pid) { + /* We are the child process. Do our stuff and exit. */ + cleanup(); + exit(run()); + } else if (-1 == m_pid) { + /* We are in the parent process, for failed */ + throw cta::exception::Errnum("Failed to fork a child process in cta::threading::ChildProcess::ChildProcess()"); + } + /* In parent process, child is OK. */ + m_started = true; +} + +void cta::threading::ChildProcess::parseStatus(int status) { + if (WIFEXITED(status)) { + m_finished = true; + m_exited = true; + m_exitCode = WEXITSTATUS(status); + } else if (WIFSIGNALED(status)) { + m_finished = true; + m_wasKilled = true; + } +} + +bool cta::threading::ChildProcess::running() { + /* Checking for a running process before starting gets an exception */ + if (!m_started) throw ProcessNeverStarted(); + /* If we are not aware of process exiting, let's check and collect exit code */ + if (!m_finished) { + /* Re-check the status now. */ + int status, ret; + cta::exception::Errnum::throwOnMinusOne( + ret = waitpid(m_pid, &status, WNOHANG), + "Error from waitpid in cta::threading::ChildProcess::running()"); + if (ret == m_pid) parseStatus(status); + } + return !m_finished; +} + +void cta::threading::ChildProcess::wait() { + /* Checking for a running process before starting gets an exception */ + if (!m_started) throw ProcessNeverStarted(); + if (m_finished) return; + int status, ret; + cta::exception::Errnum::throwOnMinusOne( + ret = waitpid(m_pid, &status, 0), + "Error from waitpid in cta::threading::ChildProcess::wait()"); + /* Check child status*/ + if (ret == m_pid) parseStatus(status); + if(!m_finished) + throw cta::exception::Exception("Process did not exit after waitpid()."); +} + +int cta::threading::ChildProcess::exitCode() { + if (!m_started) throw ProcessNeverStarted(); + if (!m_finished) { + int status, ret; + cta::exception::Errnum::throwOnMinusOne( + ret = waitpid(m_pid, &status, WNOHANG), + "Error from waitpid in cta::threading::ChildProcess::running()"); + if (ret == m_pid) parseStatus(status); + } + /* Check child status*/ + if (!m_finished) { + throw ProcessStillRunning(); + } + if (!m_exited) { + throw ProcessWasKilled(); + } + return m_exitCode; +} + +void cta::threading::ChildProcess::kill() { + if (!m_started) throw ProcessNeverStarted(); + ::kill(m_pid, SIGTERM); +} diff --git a/objectstore/threading/ChildProcess.hpp b/objectstore/threading/ChildProcess.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b2c4c51aab045fc583bac32f9a54f7620de60f64 --- /dev/null +++ b/objectstore/threading/ChildProcess.hpp @@ -0,0 +1,100 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ +#pragma once + +#include "exception/Errnum.hpp" +#include "exception/Exception.hpp" +#include <unistd.h> + + +namespace cta { +namespace threading { + + /** + * A class allowing forking of a child process, and subsequent follow up + * of the child process. Status check, killing, return code collection. + */ + class ChildProcess { + public: + /** + * Helper functor for child to clean up unneeded parent resources + * after forking. + */ + class Cleanup { + public: + virtual void operator() () = 0; + virtual ~Cleanup() {} + }; + /** + * Exceptions for wrong usage. + */ + class ProcessStillRunning: public cta::exception::Exception { + public: + ProcessStillRunning(const std::string & what = "Process still running"): + cta::exception::Exception::Exception(what) {} + }; + + class ProcessNeverStarted: public cta::exception::Exception { + public: + ProcessNeverStarted(const std::string & what = "Process never started"): + cta::exception::Exception::Exception(what) {} + }; + + class ProcessWasKilled: public cta::exception::Exception { + public: + ProcessWasKilled(const std::string & what = "Process was killed"): + cta::exception::Exception::Exception(what) {} + }; + + ChildProcess(): m_started(false), m_finished(false), m_exited(false), + m_wasKilled(false) {} + /* Clean up leftover child processes (hopefully not useful) */ + virtual ~ChildProcess() { if (m_started && !m_finished) kill(); }; + /** start function, taking as an argument a callback for parent's + * resources cleanup. A child process can only be fired once. */ + void start(Cleanup & cleanup) ; + /** Check running status */ + bool running() ; + /** Wait for completion */ + void wait() ; + /** collect exit code */ + int exitCode() ; + /** kill */ + void kill() ; + private: + pid_t m_pid; + /** Was the process started? */ + bool m_started; + /** As the process finished? */ + bool m_finished; + /** Did the process exit cleanly? */ + bool m_exited; + /** Was the process killed? */ + bool m_wasKilled; + int m_exitCode; + /** The function actually being run in the child process. The value returned + * by run() will be the exit code of the process (if we get that far) */ + virtual int run() = 0; + void parseStatus(int status); + }; +}} diff --git a/objectstore/threading/Mutex.cpp b/objectstore/threading/Mutex.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0be4f3c17f089f2a1a4fe76dd0e5920ebc9f0102 --- /dev/null +++ b/objectstore/threading/Mutex.cpp @@ -0,0 +1,49 @@ +#include "threading/Mutex.hpp" +#include "exception/Errnum.hpp" +#include "exception/Exception.hpp" + +//------------------------------------------------------------------------------ +//constructor +//------------------------------------------------------------------------------ +cta::threading::Mutex::Mutex() { + pthread_mutexattr_t attr; + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutexattr_init(&attr), + "Error from pthread_mutexattr_init in cta::threading::Mutex::Mutex()"); + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK), + "Error from pthread_mutexattr_settype in cta::threading::Mutex::Mutex()"); + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutex_init(&m_mutex, &attr), + "Error from pthread_mutex_init in cta::threading::Mutex::Mutex()"); + try { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutexattr_destroy(&attr), + "Error from pthread_mutexattr_destroy in cta::threading::Mutex::Mutex()"); + } catch (...) { + pthread_mutex_destroy(&m_mutex); + throw; + } +} +//------------------------------------------------------------------------------ +//destructor +//------------------------------------------------------------------------------ +cta::threading::Mutex::~Mutex() { + pthread_mutex_destroy(&m_mutex); +} +//------------------------------------------------------------------------------ +//lock +//------------------------------------------------------------------------------ +void cta::threading::Mutex::lock() { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutex_lock(&m_mutex), + "Error from pthread_mutex_lock in cta::threading::Mutex::lock()"); +} +//------------------------------------------------------------------------------ +//unlock +//------------------------------------------------------------------------------ +void cta::threading::Mutex::unlock() { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_mutex_unlock(&m_mutex), + "Error from pthread_mutex_unlock in cta::threading::Mutex::unlock()"); +} diff --git a/objectstore/threading/Mutex.hpp b/objectstore/threading/Mutex.hpp new file mode 100644 index 0000000000000000000000000000000000000000..dced78cbdc29bc92832e358a033466b29cd1d700 --- /dev/null +++ b/objectstore/threading/Mutex.hpp @@ -0,0 +1,44 @@ +/****************************************************************************** + * Payload.hpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ +#pragma once + +#include <pthread.h> +#include <semaphore.h> + +namespace cta { +namespace threading { + /** + * A simple exception throwing wrapper for pthread mutexes. + * Inspired from the interface of Qt. + */ + class Mutex { + public: + Mutex() ; + ~Mutex(); + void lock() ; + void unlock(); + private: + pthread_mutex_t m_mutex; + }; +}} diff --git a/objectstore/threading/Threading.cpp b/objectstore/threading/Threading.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f25e9885973eb8146f70c3700f2cbf255902a42f --- /dev/null +++ b/objectstore/threading/Threading.cpp @@ -0,0 +1,100 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "Threading.hpp" +#include <errno.h> +#include <typeinfo> +#include <stdlib.h> +#include <cxxabi.h> + +/* Implmentations of the threading primitives */ +//------------------------------------------------------------------------------ +//start +//------------------------------------------------------------------------------ +void cta::threading::Thread::start() + { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_create(&m_thread, NULL, pthread_runner, this), + "Error from pthread_create in cta::threading::Thread::start()"); +} +//------------------------------------------------------------------------------ +//wait +//------------------------------------------------------------------------------ +void cta::threading::Thread::wait() + { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_join(m_thread, NULL), + "Error from pthread_join in cta::threading::Thread::wait()"); + if (m_hadException) { + std::string w = "Uncaught exception of type \""; + w += m_type; + w += "\" in Thread.run():\n>>>>\n"; + w += m_what; + w += "<<<< End of uncaught exception"; + throw UncaughtExceptionInThread(w); + } +} +//------------------------------------------------------------------------------ +//cancel +//------------------------------------------------------------------------------ +void cta::threading::Thread::cancel() + { + cta::exception::Errnum::throwOnReturnedErrno( + pthread_cancel(m_thread), + "Error from pthread_cancel in cta::threading::Thread::cancel()"); +} +//------------------------------------------------------------------------------ +//pthread_runner +//------------------------------------------------------------------------------ +void * cta::threading::Thread::pthread_runner (void * arg) { + + /* static_casting a pointer to and from void* preserves the address. + * See https://stackoverflow.com/questions/573294/when-to-use-reinterpret-cast + */ + Thread * _this = static_cast<Thread *>(arg); + + // Set the thread cancellation type to immediate, for use in the tapeResourceManager tests + cta::exception::Errnum::throwOnReturnedErrno(::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL), + "Error from pthread_setcancelstate in cta::threading::Thread::pthread_runner"); + cta::exception::Errnum::throwOnReturnedErrno(::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL), + "Error from pthread_setcanceltype in cta::threading::Thread::pthread_runner"); + try { + _this->run(); + } catch (std::exception & e) { + _this->m_hadException = true; + int status = -1; + char * demangled = abi::__cxa_demangle(typeid(e).name(), NULL, NULL, &status); + if (!status) { + _this->m_type += demangled; + } else { + _this->m_type = typeid(e).name(); + } + free(demangled); + _this->m_what = e.what(); + } catch (...) { + _this->m_hadException = true; + _this->m_type = "unknown"; + _this->m_what = "uncaught non-standard exception"; + } + return NULL; +} diff --git a/objectstore/threading/Threading.hpp b/objectstore/threading/Threading.hpp new file mode 100644 index 0000000000000000000000000000000000000000..67bf43d14d15aeccae98801e994dc27704dc5e97 --- /dev/null +++ b/objectstore/threading/Threading.hpp @@ -0,0 +1,67 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include <pthread.h> +#include <semaphore.h> +#include "exception/Errnum.hpp" +#include "exception/Exception.hpp" +#include "threading/Mutex.hpp" + +namespace cta { +namespace threading { + + + + /** + * An exception class thrown by the Thread class. + */ + class UncaughtExceptionInThread: public cta::exception::Exception { + public: + UncaughtExceptionInThread(const std::string& w= ""): cta::exception::Exception(w) {} + }; + + /** + * A Thread class, based on the Qt interface. To be used, on should + * inherit from it, and implement the run() method. + * The thread is started with start() and joined with wait(). + */ + class Thread { + public: + Thread(): m_hadException(false), m_what("") {} + virtual ~Thread () {} + void start() ; + void wait() ; + void cancel(); + protected: + virtual void run () = 0; + private: + pthread_t m_thread; + bool m_hadException; + std::string m_what; + std::string m_type; + static void * pthread_runner (void * arg); + }; + +}}