Commit 7d83cc53 authored by Eric Cano's avatar Eric Cano
Browse files

Initial import of draft version of the object store based on both Ceph and VFS.

Some legacy utilities from castor were imported alongside, with some adaptations.
parent 87e85017
nbproject/
......@@ -7,6 +7,11 @@ project(cta)
set(CMAKE_C_FLAGS_RELWITHDEBINFO "-g")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-g")
set(CMAKE_DISABLE_SOURCE_CHANGES ON)
set(CMAKE_DISABLE_IN_SOURCE_BUILD ON)
list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)
add_subdirectory(libs)
add_subdirectory(xroot_clients)
add_subdirectory(xroot_plugins)
add_subdirectory(objectstore)
# From, https://raw.githubusercontent.com/Kitware/CMake/master/Modules/FindProtobuf.cmake
# cut down to solve our problem and nothing more
#=============================================================================
# Copyright 2009 Kitware, Inc.
# Copyright 2009-2011 Philip Lowman <philip@yhbt.com>
# Copyright 2008 Esben Mose Hansen, Ange Optimization ApS
#
# Distributed under the OSI-approved BSD License (the "License");
# see accompanying file Copyright.txt for details.
#
# This software is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the License for more information.
#=============================================================================
# (To distribute this file outside of CMake, substitute the full
# License text for the above reference.)
find_program(PROTOBUF_PROTOC_EXECUTABLE
NAMES protoc
DOC "The Google Protocol Buffers Compiler"
)
message(STATUS "protoc is at ${PROTOBUF_PROTOC_EXECUTABLE} ")
function(PROTOBUF_GENERATE_CPP SRCS HDRS)
if(NOT ARGN)
message(SEND_ERROR "Error: PROTOBUF_GENERATE_CPP() called without any proto files")
return()
endif()
set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR})
set(${SRCS})
set(${HDRS})
foreach(FIL ${ARGN})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
get_filename_component(FIL_WE ${FIL} NAME_WE)
list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.cc")
list(APPEND ${HDRS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.h")
add_custom_command(
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.cc"
"${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.pb.h"
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
ARGS --cpp_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
DEPENDS ${ABS_FIL}
COMMENT "Running C++ protocol buffer compiler on ${FIL}"
VERBATIM )
endforeach()
set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE)
set(${SRCS} ${${SRCS}} PARENT_SCOPE)
set(${HDRS} ${${HDRS}} PARENT_SCOPE)
endfunction()
\ No newline at end of file
#pragma once
#include "ObjectStores.hpp"
#include "ContextHandle.hpp"
class Agent {
public:
Agent(ObjectStore & os): m_objectStore(os) {};
~Agent() {
for (size_t i=0; i < c_handleCount; i++) {
m_contexts[i].release();
}
}
void act() {
}
private:
ObjectStore & m_objectStore;
static const size_t c_handleCount = 100;
ContextHandleImplementation<myOS> m_contexts[c_handleCount];
ContextHandleImplementation<myOS> getFreeContext() {
for (size_t i=0; i < c_handleCount; i++) {
if (!m_contexts[i].isSet())
return m_contexts[i];
}
throw cta::exception::Exception("Could not find free context slot");
}
};
\ No newline at end of file
#pragma once
#include <sstream>
#include <string>
#include <sys/syscall.h>
#include <ctime>
class AgentId {
public:
AgentId(std::string agentType): m_nextId(0) {
std::stringstream aid;
// Get time
time_t now = time(0);
struct tm localNow;
localtime_r(&now, &localNow);
// Get hostname
char host[200];
cta::exception::Errnum::throwOnMinusOne(gethostname(host, sizeof(host)),
"In AgentId::AgentId: failed to gethostname");
// gettid is a safe system call (never fails)
aid << agentType << "-" << host << "-" << syscall(SYS_gettid) << "-"
<< 1900 + localNow.tm_year
<< std::setfill('0') << std::setw(2) << 1 + localNow.tm_mon
<< localNow.tm_mday << "-"
<< localNow.tm_hour << ":"
<< localNow.tm_min << ":"
<< localNow.tm_sec;
m_agentId = aid.str();
}
std::string name() {
return m_agentId;
}
std::string nextId() {
std::stringstream id;
id << m_agentId << "-" << m_nextId++;
return id.str();
}
private:
std::string m_agentId;
uint64_t m_nextId;
};
\ No newline at end of file
cmake_minimum_required (VERSION 2.6)
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${CMAKE_BINARY_DIR})
find_package(Protobuf)
set (CTAProtoFiles
cta.proto)
PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp
${CTAProtoSources}
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
exception/strerror_r_wrapper.cpp
threading/ChildProcess.cpp
threading/Mutex.cpp
threading/Threading.cpp)
target_link_libraries(tapeResourceMangerTest
protobuf
rados)
#pragma once
#include "ObjectStores.hpp"
template <class C>
class ContextHandleImplementation: public ContextHandle {};
template <>
class ContextHandleImplementation <ObjectStoreVFS>: public ContextHandle {
public:
ContextHandleImplementation(): m_set(false), m_fd(-1) {}
void set(int fd) { m_set = true; m_fd = fd; __sync_synchronize(); }
int get(int) { return m_fd; }
void reset() { m_set = false; m_fd = -1; __sync_synchronize(); }
bool isSet() { return m_set; }
void release() { __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); }
private:
bool m_set;
volatile int m_fd;
};
template <>
class ContextHandleImplementation <ObjectStoreRados>: public ContextHandle {
public:
ContextHandleImplementation(): m_set(false) {}
void set(int) { m_set = true; }
void reset() { m_set = false; }
bool isSet() { return m_set; }
void release() { reset(); }
private:
bool m_set;
};
\ No newline at end of file
#pragma once
#include "ObjectOps.hpp"
#include "exception/Exception.hpp"
class FIFO: private ObjectOps<cta::objectstore::FIFO> {
public:
FIFO(ObjectStore & os, const std::string & name, ContextHandle & context):
ObjectOps<cta::objectstore::FIFO>(os, name),
m_locked(false) {
cta::objectstore::FIFO fs;
updateFromObjectStore(fs, context);
}
void lock(ContextHandle & context) {
if (m_locked) throw cta::exception::Exception(
"In FIFO::lock: already locked");
lockExclusiveAndRead(m_currentState, context);
m_locked = true;
}
std::string peek() {
std::string ret;
if (!m_locked) throw cta::exception::Exception(
"In FIFO::pop: FIFO should be locked");
return m_currentState.name(m_currentState.readpointer());
return ret;
}
void popAndUnlock(ContextHandle & context) {
if (!m_locked) throw cta::exception::Exception(
"In FIFO::pop: FIFO should be locked");
m_currentState.set_readpointer(m_currentState.readpointer()+1);
if (m_currentState.readpointer() > 100) {
compactCurrentState();
}
write(m_currentState);
unlock(context);
m_locked = false;
}
void push(std::string name, ContextHandle & context) {
cta::objectstore::FIFO fs;
lockExclusiveAndRead(fs, context);
fs.add_name(name);
write(fs);
unlock(context);
}
std::string dump(ContextHandle & context) {
cta::objectstore::FIFO fs;
updateFromObjectStore(fs, context);
std::stringstream ret;
ret<< "<<<< FIFO dump start" << std::endl
<< "Read pointer=" << fs.readpointer() << std::endl
<< "Array size=" << fs.name_size() << std::endl;
for (int i=fs.readpointer(); i<fs.name_size(); i++) {
ret << "name[phys=" << i << " ,log=" << i-fs.readpointer()
<< "]=" << fs.name(i) << std::endl;
}
ret<< ">>>> FIFO dump end." << std::endl;
return ret.str();
}
private:
bool m_locked;
cta::objectstore::FIFO m_currentState;
void compactCurrentState() {
uint64_t oldReadPointer = m_currentState.readpointer();
uint64_t oldSize = m_currentState.name_size();
// Copy the elements at position oldReadPointer + i to i (squash all)
// before the read pointer
for (int i = oldReadPointer; i<m_currentState.name_size(); i++) {
*m_currentState.mutable_name(i-oldReadPointer) = m_currentState.name(i);
}
// Shorten the name array by oldReadPointer elements
for (uint64_t i = 0; i < oldReadPointer; i++) {
m_currentState.mutable_name()->RemoveLast();
}
// reset the read pointer
m_currentState.set_readpointer(0);
// Check the size is as expected
if ((uint64_t)m_currentState.name_size() != oldSize - oldReadPointer) {
std::stringstream err;
err << "In FIFO::compactCurrentState: wrong size after compaction: "
<< "oldSize=" << oldSize << " oldReadPointer=" << oldReadPointer
<< " newSize=" << m_currentState.name_size();
throw cta::exception::Exception(err.str());
}
}
};
\ No newline at end of file
#pragma once
#include "ObjectOps.hpp"
#include "FIFO.hpp"
class JobPool: private ObjectOps<cta::objectstore::jobPool> {
public:
JobPool(ObjectStore & os, const std::string & name, ContextHandle & context):
ObjectOps<cta::objectstore::jobPool>(os, name) {
cta::objectstore::jobPool jps;
updateFromObjectStore(jps, context);
}
void PostRecallJob (const std::string & string, ContextHandle & context) {
cta::objectstore::jobPool jps;
lockExclusiveAndRead(jps, context);
}
private:
// The following functions are hidden from the user in order to provide
// higher level functionnality
//std::string get
};
\ No newline at end of file
#pragma once
#include "ObjectStores.hpp"
template <class C>
class ObjectOps {
public:
ObjectOps(ObjectStore & os, const std::string & name):m_name(name),
m_objectStore(os) {}
void updateFromObjectStore (C & val, ContextHandle & context) {
m_objectStore.lockShared(m_name, context);
std::string reStr = m_objectStore.read(m_name);
m_objectStore.unlock(m_name, context);
val.ParseFromString(reStr);
}
void lockExclusiveAndRead (C & val, ContextHandle & context) {
m_objectStore.lockExclusive(m_name, context);
// Re-read to get latest version (lock upgrade could be useful here)
std::string reStr = m_objectStore.read(m_name);
cta::objectstore::RootEntry res;
val.ParseFromString(reStr);
}
void write (C & val) {
m_objectStore.atomicOverwrite(m_name, val.SerializeAsString());
}
void unlock (ContextHandle & context) {
// release the lock, and return the register name
m_objectStore.unlock(m_name, context);
}
template <class C2>
void writeChild (const std::string & name, C2 & val) {
m_objectStore.atomicOverwrite(name, val.SerializeAsString());
}
private:
std::string m_name;
ObjectStore & m_objectStore;
};
\ No newline at end of file
#pragma once
#include <string>
#include <errno.h>
#include <stdlib.h>
#include <ftw.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <fstream>
#include <rados/librados.hpp>
class ContextHandle {
public:
template <typename T>
void set(T);
template <typename T>
T get(T);
void reset();
bool isSet();
void release();
};
class ObjectStore {
public:
virtual ~ObjectStore() {}
virtual void atomicOverwrite(std::string name, std::string content) = 0;
virtual std::string read(std::string name) = 0;
virtual void lockShared(std::string name, ContextHandle & context) = 0;
virtual void lockExclusive(std::string name, ContextHandle & context) = 0;
virtual void unlock(std::string name, ContextHandle & context) = 0;
virtual std::string path() { return ""; }
virtual std::string user() { return ""; }
virtual std::string pool() { return ""; }
};
// An implementation of the object store primitives, using flock to lock,
// so that several threads can compete for the same file (locks are per file
// descriptor). The lock will be done on a special file (XXXX.lock), which will
// not be moved (as opposed to the content itself, which needs to be moved to
// be atomically created)
// The context here is a pointer to an element of a per thread int array, that will
// hold the file descriptors. This int is chosen for containing -1 (free).
// When the thread is killed, the destructor will be able to safely close
// all the file descriptors in the array.
// The fd will be deleted from the array before calling close.
// This will be a fd leak source, but this might be good enough for this test's
// purpose.
class ObjectStoreVFS: public ObjectStore {
public:
ObjectStoreVFS():m_deleteOnExit(true) {
// Create the directory for storage
char tplt[] = "/tmp/jobStoreVFSXXXXXX";
mkdtemp(tplt);
// If template is an empty string, we failed, otherwise, we're fine.
if (*tplt) {
m_root = tplt;
} else {
throw cta::exception::Errnum("Failed to create temporary directory");
}
}
ObjectStoreVFS(std::string path, std::string user, std::string pool):
m_root(path), m_deleteOnExit(false) {}
virtual ~ObjectStoreVFS() {
if (m_deleteOnExit) {
// Delete the created directories recursively
nftw (m_root.c_str(), deleteFileOrDirectory, 100, FTW_DEPTH);
}
}
virtual std::string path() {
return m_root;
}
virtual void atomicOverwrite(std::string name, std::string content) {
// When entering here, we should hold an exclusive lock on the *context
// file descriptor. We will create a new file, lock it immediately exclusively,
// create the new content in it, move it over the old file, and close the *context
// file descriptor.
std::string tempPath = m_root+"/." + name + ".pre-overwrite";
std::string targetPath = m_root+"/" + name;
// Create the new version of the file, make sure it's visible, lock it.
int fd= ::creat(tempPath.c_str(), S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::atomicOverwrite, failed to creat the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::atomicOverwrite, failed to write to the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(::close(fd),
"In ObjectStoreVFS::atomicOverwrite, failed to close the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(
::rename(tempPath.c_str(), targetPath.c_str()),
"In ObjectStoreVFS::atomicOverwrite, failed to rename the file");
}
virtual std::string read(std::string name) {
std::string path = m_root+"/" + name;
std::string ret;
std::ifstream file(path.c_str());
if (!file) {
throw cta::exception::Errnum(
std::string("In ObjectStoreVFS::read, failed to open file for read: ") +
path);
}
char buff[200];
while(!file.eof()) {
file.read(buff, sizeof(buff));
ret.append(buff,file.gcount());
}
return ret;
}
void lockHelper(std::string name, ContextHandle & context, int type) {
std::string path = m_root + "/" + name + ".lock";
context.set(::open(path.c_str(), O_CREAT, S_IRWXU));
cta::exception::Errnum::throwOnMinusOne(context.get(0),
"In ObjectStoreVFS::lockHelper, failed to open the lock file.");
cta::exception::Errnum::throwOnMinusOne(::flock(context.get(0), LOCK_EX),
"In ObjectStoreVFS::lockHelper, failed to flock the lock file.");
}
virtual void lockExclusive(std::string name, ContextHandle & context) {
lockHelper(name, context, LOCK_EX);
}
virtual void lockShared(std::string name, ContextHandle & context) {
lockHelper(name, context, LOCK_SH);
}
virtual void unlock(std::string name, ContextHandle & context) {
::flock(context.get(0), LOCK_UN);
int fd= context.get(0);
context.reset();
::close(fd);
}
private:
std::string m_root;
bool m_deleteOnExit;
// Utility function to be used in the destructor
static int deleteFileOrDirectory(const char *fpath, const struct stat *sb,
int tflag, struct FTW *ftwbuf) {
switch (tflag) {
case FTW_D:
case FTW_DNR:
case FTW_DP:
rmdir (fpath);
break;
default:
unlink (fpath);
break;
}
return 0;
}
};
class ObjectStoreRados: public ObjectStore {
public:
ObjectStoreRados(std::string path, std::string userId, std::string pool):
m_user(userId), m_pool(pool), m_cluster(), m_radosCtx() {
cta::exception::Errnum::throwOnNonZero(m_cluster.init(userId.c_str()),
"In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init");
try {
cta::exception::Errnum::throwOnNonZero(m_cluster.conf_read_file(NULL),
"In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
cta::exception::Errnum::throwOnNonZero(m_cluster.conf_parse_env(NULL),
"In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
cta::exception::Errnum::throwOnNonZero(m_cluster.connect(),
"In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect");
cta::exception::Errnum::throwOnNonZero(m_cluster.ioctx_create(pool.c_str(), m_radosCtx),
"In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create");
} catch (...) {
m_cluster.shutdown();
throw;
}
}
virtual ~ObjectStoreRados() {
m_radosCtx.close();
m_cluster.shutdown();
}
virtual std::string user() {
return m_user;
}
virtual std::string pool() {
return m_pool;
}
virtual void atomicOverwrite(std::string name, std::string content) {
ceph::bufferlist bl;
bl.append(content.c_str(), content.size());
cta::exception::Errnum::throwOnNonZero(m_radosCtx.write_full(name, bl),
std::string("In ObjectStoreRados::atomicOverwrite, failed to write_full ")
+ name);
}
virtual std::string read(std::string name) {
std::string ret;
uint64_t size;
time_t time;
cta::exception::Errnum::throwOnNonZero(m_radosCtx.stat(name, &size, &time),
std::string("In ObjectStoreRados::read, failed to stat ")
+ name);
librados::bufferlist bl;
cta::exception::Errnum::throwOnNegative(m_radosCtx.read(name, bl, size, 0),
std::string("In ObjectStoreRados::read, failed to read ")
+ name);
bl.copy(0, size, ret);
return ret;
}
virtual void lockExclusive(std::string name, ContextHandle & context) {
// Build a unique client name: host:thread
char buff[200];
cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)),
"In ObjectStoreRados::lockExclusive: failed to gethostname");
pid_t tid = syscall(SYS_gettid);
std::stringstream client;
client << buff << ":" << tid;
struct timeval tv;
tv.tv_usec = 0;
tv.tv_sec = 60;
int rc;
do {
rc=m_radosCtx.lock_exclusive(name, "lock", client.str(), "", &tv, 0);
} while (-EBUSY==rc);
if (0!=rc) {
std::cout << "Oh-oh, rc=" << rc << std::endl;
}
cta::exception::Errnum::throwOnNonZero(rc,
std::string("In ObjectStoreRados::lockExclusive, failed to lock_exclusive ")+
name);
}
virtual void lockShared(std::string name, ContextHandle & context) {
// Build a unique client name: host:thread
char buff[200];
cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)),