Commit ff1339ef authored by Eric Cano's avatar Eric Cano
Browse files

Started to move a production implementation of the object store.

The context handle concept is being dropped, with scoped lock objects in place.
The VFS backent store now compiles.
parent 18c05d87
#pragma once
#include <string>
namespace cta { namespace objectstore {
/**
* Interface to the backend stores that we can use.
*/
class Backend {
public:
virtual ~Backend() {}
/**
* Create an object (and possibly the necessary locking structures)
* @param name name of the object
* @param content the object's content
*/
virtual void create(std::string name, std::string content) = 0;
/**
* Overwrite an existing object atomically
* @param name name of the object
* @param content new content of the object
*/
virtual void atomicOverwrite(std::string name, std::string content) = 0;
/**
* Read the content of an object
* @param name name of the object
* @return the content of the object, as a string.
*/
virtual std::string read(std::string name) = 0;
/**
* Delete an object (and possibly its locking structure)
* @param name name of the object
*/
virtual void remove(std::string name) = 0;
/**
* Tests the existence of the object
* @param name
* @return true if the object is found
*/
virtual bool exists(std::string name) = 0;
/**
* RAII class holding locks
*/
class ScopedLock {
public:
/**
* Explicitely releases the lock
*/
virtual void release() = 0;
/**
* Destructor (implicitly releases the lock).
*/
virtual ~ScopedLock() {}
};
/**
* Locks the object shared
* @param name name of the object
* @return pointer to a newly created scoped lock object (for RAII)
*/
virtual ScopedLock * lockShared(std::string name) = 0;
/**
* Locks the object exclusively
* @param name name of the object
* @return pointer to a newly created scoped lock object (for RAII)
*/
virtual ScopedLock * lockExclusive(std::string name) = 0;
/**
* Base class for the representation of the parameters of the BackendStore.
*/
class Parameters {
public:
/**
* Turns parameter class into string representation
* @return the string representation
*/
virtual std::string toStr() = 0;
/**
* Virtual destructor
*/
virtual ~Parameters() {}
};
/**
* Returns a type specific representation of the parameters
* @return pointer to the newly created representation.
*/
virtual Parameters * getParams() = 0;
};
}} // end of cta::objectstore
#pragma once
#include <string>
#include <errno.h>
#include <stdlib.h>
#include <ftw.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <fstream>
#include <rados/librados.hpp>
#include "exception/Exception.hpp"
#include "exception/Errnum.hpp"
#include "Backend.hpp"
namespace cta { namespace objectstore {
class ContextHandle {
public:
ContextHandle(): m_set(false), m_hasFd(false), m_fd(-1) {}
void set(int fd) { m_set = true; m_fd = fd; m_hasFd = true; __sync_synchronize(); }
void set() { m_set = true; }
int get(int) { return m_fd; }
void reset() { m_set = false; m_fd = -1; m_hasFd = false; __sync_synchronize(); }
bool isSet() { return m_set; }
void release() { if (!m_hasFd) return; __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); }
~ContextHandle() { release(); }
private:
bool m_set;
bool m_hasFd;
volatile int m_fd;
};
class ObjectStore {
public:
virtual ~ObjectStore() {}
virtual void create(std::string name, std::string content) = 0;
virtual void atomicOverwrite(std::string name, std::string content) = 0;
virtual std::string read(std::string name) = 0;
virtual void lockShared(std::string name, ContextHandle & context, std::string where="") = 0;
virtual void lockExclusive(std::string name, ContextHandle & context, std::string where="") = 0;
virtual void unlock(std::string name, ContextHandle & context, std::string where="") = 0;
virtual void remove(std::string name) = 0;
virtual std::string path() { return ""; }
virtual std::string user() { return ""; }
virtual std::string pool() { return ""; }
};
// An implementation of the object store primitives, using flock to lock,
// so that several threads can compete for the same file (locks are per file
// descriptor). The lock will be done on a special file (XXXX.lock), which will
// not be moved (as opposed to the content itself, which needs to be moved to
// be atomically created)
// The context here is a pointer to an element of a per thread int array, that will
// hold the file descriptors. This int is chosen for containing -1 (free).
// When the thread is killed, the destructor will be able to safely close
// all the file descriptors in the array.
// The fd will be deleted from the array before calling close.
// This will be a fd leak source, but this might be good enough for this test's
// purpose.
class ObjectStoreVFS: public ObjectStore {
public:
ObjectStoreVFS():m_deleteOnExit(true) {
// Create the directory for storage
char tplt[] = "/tmp/jobStoreVFSXXXXXX";
mkdtemp(tplt);
// If template is an empty string, we failed, otherwise, we're fine.
if (*tplt) {
m_root = tplt;
} else {
throw cta::exception::Errnum("Failed to create temporary directory");
}
}
void noDeleteOnExit() {
m_deleteOnExit = false;
}
ObjectStoreVFS(std::string path, std::string user, std::string pool):
m_root(path), m_deleteOnExit(false) {}
virtual ~ObjectStoreVFS() {
if (m_deleteOnExit) {
// Delete the created directories recursively
nftw (m_root.c_str(), deleteFileOrDirectory, 100, FTW_DEPTH);
}
}
virtual std::string path() {
return m_root;
}
virtual void create(std::string name, std::string content){
std::string path = m_root+ "/" + name;
std::string lockPath = m_root + "/." + name + ".lock";
try {
int fd= ::creat(path.c_str(), S_IRWXU);
// Create and fill up the path
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::create, failed to creat the file");
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::create, failed to write to file");
cta::exception::Errnum::throwOnMinusOne(::close(fd),
"In ObjectStoreVFS::create, failed to close the file");
// Create the lock file
int fdLock= ::creat(lockPath.c_str(), S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(fdLock,
"In ObjectStoreVFS::create, failed to creat the lock file");
cta::exception::Errnum::throwOnMinusOne(::close(fdLock),
"In ObjectStoreVFS::create, failed to close the lock file");
} catch (...) {
unlink(path.c_str());
unlink(lockPath.c_str());
throw;
}
}
virtual void atomicOverwrite(std::string name, std::string content) {
// When entering here, we should hold an exclusive lock on the *context
// file descriptor. We will create a new file, lock it immediately exclusively,
// create the new content in it, move it over the old file, and close the *context
// file descriptor.
std::string tempPath = m_root+"/." + name + ".pre-overwrite";
std::string targetPath = m_root+"/" + name;
// Create the new version of the file, make sure it's visible, lock it.
int fd= ::creat(tempPath.c_str(), S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::atomicOverwrite, failed to creat the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::atomicOverwrite, failed to write to the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(::close(fd),
"In ObjectStoreVFS::atomicOverwrite, failed to close the pre-overwrite file");
std::stringstream err;
err << "In ObjectStoreVFS::atomicOverwrite, failed to rename the file"
<< " tempPath=" << tempPath << " targetPath=" << targetPath << " tid=" << syscall(SYS_gettid);
cta::exception::Errnum::throwOnMinusOne(
::rename(tempPath.c_str(), targetPath.c_str()),
err.str());
}
virtual std::string read(std::string name) {
std::string path = m_root+"/" + name;
std::string ret;
std::ifstream file(path.c_str());
if (!file) {
throw cta::exception::Errnum(
std::string("In ObjectStoreVFS::read, failed to open file for read: ") +
path);
}
char buff[200];
while(!file.eof()) {
file.read(buff, sizeof(buff));
ret.append(buff,file.gcount());
}
return ret;
}
virtual void remove(std::string name) {
std::string path = m_root+"/" + name;
std::string lockPath = m_root+"/." + name + ".lock";
cta::exception::Errnum::throwOnNonZero(unlink(path.c_str()), "Failed to remove object file");
cta::exception::Errnum::throwOnNonZero(unlink(lockPath.c_str()), "Failed to remove lock file.");
}
void lockHelper(std::string name, ContextHandle & context, int type) {
std::string path = m_root + "/." + name + ".lock";
context.set(::open(path.c_str(), O_RDONLY, S_IRWXU));
cta::exception::Errnum::throwOnMinusOne(context.get(0),
"In ObjectStoreVFS::lockHelper, failed to open the lock file.");
cta::exception::Errnum::throwOnMinusOne(::flock(context.get(0), LOCK_EX),
"In ObjectStoreVFS::lockHelper, failed to flock the lock file.");
}
virtual void lockExclusive(std::string name, ContextHandle & context, std::string where) {
lockHelper(name, context, LOCK_EX);
//std::cout << "LockedExclusive " << name << " with fd=" << context.get(0) << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
virtual void lockShared(std::string name, ContextHandle & context, std::string where) {
lockHelper(name, context, LOCK_SH);
//std::cout << "LockedShared " << name << " with fd=" << context.get(0) << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
virtual void unlock(std::string name, ContextHandle & context, std::string where) {
::flock(context.get(0), LOCK_UN);
int fd= context.get(0);
context.reset();
::close(fd);
//std::cout << "Unlocked " << name << " with fd=" << fd << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
private:
std::string m_root;
bool m_deleteOnExit;
// Utility function to be used in the destructor
static int deleteFileOrDirectory(const char *fpath, const struct stat *sb,
int tflag, struct FTW *ftwbuf) {
switch (tflag) {
case FTW_D:
case FTW_DNR:
case FTW_DP:
rmdir (fpath);
break;
default:
unlink (fpath);
break;
}
return 0;
}
};
class ObjectStoreRados: public ObjectStore {
class ObjectStoreRados: public Backend {
public:
ObjectStoreRados(std::string path, std::string userId, std::string pool):
m_user(userId), m_pool(pool), m_cluster(), m_radosCtx() {
......@@ -345,4 +136,4 @@ private:
librados::IoCtx m_radosCtx;
};
}}
\ No newline at end of file
}} // end of cta::objectstore
#include "BackendVFS.hpp"
#include "exception/Errnum.hpp"
#include <fstream>
#include <stdlib.h>
#include <ftw.h>
#include <fcntl.h>
#include <sys/syscall.h>
#include <sys/file.h>
#include <stdio.h>
#include <memory>
namespace cta { namespace objectstore {
BackendVFS::BackendVFS() : m_deleteOnExit(true) {
// Create the directory for storage
char tplt[] = "/tmp/jobStoreVFSXXXXXX";
mkdtemp(tplt);
// If template is an empty string, we failed, otherwise, we're fine.
if (*tplt) {
m_root = tplt;
} else {
throw cta::exception::Errnum("Failed to create temporary directory");
}
}
BackendVFS::BackendVFS(std::string path):
m_root(path), m_deleteOnExit(false) {}
void BackendVFS::noDeleteOnExit() {
m_deleteOnExit = false;
}
void BackendVFS::deleteOnExit() {
m_deleteOnExit = true;
}
int deleteFileOrDirectory(const char* fpath,
const struct ::stat* sb, int tflag, struct FTW* ftwbuf) {
switch (tflag) {
case FTW_D:
case FTW_DNR:
case FTW_DP:
rmdir(fpath);
break;
default:
unlink(fpath);
break;
}
return 0;
}
BackendVFS::~BackendVFS() {
if (m_deleteOnExit) {
// Delete the created directories recursively
nftw (m_root.c_str(), deleteFileOrDirectory, 100, FTW_DEPTH);
}
}
void BackendVFS::create(std::string name, std::string content) {
std::string path = m_root + "/" + name;
std::string lockPath = m_root + "/." + name + ".lock";
try {
int fd = ::creat(path.c_str(), S_IRWXU);
// Create and fill up the path
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::create, failed to creat the file");
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::create, failed to write to file");
cta::exception::Errnum::throwOnMinusOne(::close(fd),
"In ObjectStoreVFS::create, failed to close the file");
// Create the lock file
int fdLock = ::creat(lockPath.c_str(), S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(fdLock,
"In ObjectStoreVFS::create, failed to creat the lock file");
cta::exception::Errnum::throwOnMinusOne(::close(fdLock),
"In ObjectStoreVFS::create, failed to close the lock file");
} catch (...) {
unlink(path.c_str());
unlink(lockPath.c_str());
throw;
}
}
void BackendVFS::atomicOverwrite(std::string name, std::string content) {
// When entering here, we should hold an exclusive lock on the *context
// file descriptor. We will create a new file, lock it immediately exclusively,
// create the new content in it, move it over the old file, and close the *context
// file descriptor.
std::string tempPath = m_root + "/." + name + ".pre-overwrite";
std::string targetPath = m_root + "/" + name;
// Create the new version of the file, make sure it's visible, lock it.
int fd = ::creat(tempPath.c_str(), S_IRWXU);
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::atomicOverwrite, failed to creat the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::atomicOverwrite, failed to write to the pre-overwrite file");
cta::exception::Errnum::throwOnMinusOne(::close(fd),
"In ObjectStoreVFS::atomicOverwrite, failed to close the pre-overwrite file");
std::stringstream err;
err << "In ObjectStoreVFS::atomicOverwrite, failed to rename the file"
<< " tempPath=" << tempPath << " targetPath=" << targetPath << " tid=" << syscall(SYS_gettid);
cta::exception::Errnum::throwOnMinusOne(
::rename(tempPath.c_str(), targetPath.c_str()),
err.str());
}
std::string BackendVFS::read(std::string name) {
std::string path = m_root + "/" + name;
std::string ret;
std::ifstream file(path.c_str());
if (!file) {
throw cta::exception::Errnum(
std::string("In ObjectStoreVFS::read, failed to open file for read: ") +
path);
}
char buff[200];
while (!file.eof()) {
file.read(buff, sizeof (buff));
ret.append(buff, file.gcount());
}
return ret;
}
void BackendVFS::remove(std::string name) {
std::string path = m_root + "/" + name;
std::string lockPath = m_root + "/." + name + ".lock";
cta::exception::Errnum::throwOnNonZero(unlink(path.c_str()), "Failed to remove object file");
cta::exception::Errnum::throwOnNonZero(unlink(lockPath.c_str()), "Failed to remove lock file.");
}
bool BackendVFS::exists(std::string name) {
std::string path = m_root + "/" + name;
if (::access(path.c_str(), F_OK))
return false;
return true;
}
BackendVFS::Parameters* BackendVFS::getParams() {
std::auto_ptr<Parameters> ret(new Parameters);
ret->m_path = m_root;
return ret.release();
}
void BackendVFS::ScopedLock::release() {
if (!m_fdSet) return;
::flock(m_fd, LOCK_UN);
::close(m_fd);
m_fdSet = false;
//std::cout << "Unlocked " << name << " with fd=" << fd << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
BackendVFS::ScopedLock * BackendVFS::lockHelper(
std::string name, int type) {
std::string path = m_root + "/." + name + ".lock";
std::auto_ptr<ScopedLock> ret(new ScopedLock);
ret->set(::open(path.c_str(), O_RDONLY, S_IRWXU));
cta::exception::Errnum::throwOnMinusOne(ret->m_fd,
"In BackendStoreVFS::lockHelper, failed to open the lock file.");
cta::exception::Errnum::throwOnMinusOne(::flock(ret->m_fd, LOCK_EX),
"In BackendStoreVFS::lockHelper, failed to flock the lock file.");
return ret.release();
}
BackendVFS::ScopedLock * BackendVFS::lockExclusive(std::string name) {
return lockHelper(name, LOCK_EX);
//std::cout << "LockedExclusive " << name << " with fd=" << context.get(0) << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
BackendVFS::ScopedLock * BackendVFS::lockShared(std::string name) {
return lockHelper(name, LOCK_SH);
//std::cout << "LockedShared " << name << " with fd=" << context.get(0) << " @" << where << " tid=" << syscall(SYS_gettid) << std::endl;
}
std::string BackendVFS::Parameters::toStr() {
std::stringstream ret;
ret << "path=" << m_path;
return ret.str();
}
}} // end of cta::objectstore
#pragma once
#include "Backend.hpp"
namespace cta { namespace objectstore {
/**
* An implementation of the object store primitives, using flock to lock,
* so that several threads can compete for the same file (locks are per file
* descriptor). The lock will be done on a special file (XXXX.lock), which will
* not be moved (as opposed to the content itself, which needs to be moved to
* be atomically created)
*/
class BackendVFS: public Backend {
public:
/**
* Default constructor, generating automatically a directory in /tmp
* Following the use of this constructor, the object store WILL be
* destroyed by default on destruction. This can be overridden with
* noDeleteOnExit()
*/
BackendVFS();
/**
* Passive constructor, using an existing store. It will NOT destroy the
* storage on exit, but this can be changed with deleteOnExit()
* @param path
*/
BackendVFS(std::string path);
/**
* Instructs the object to not delete the storage on exit. This is useful
* to create automatically a storage in /tmp, to then use it from several
* programs (potentially in parallel).
*/
void noDeleteOnExit();
/**
* Instructs the object to DO delete the storage on exit.
*/
void deleteOnExit();
virtual ~BackendVFS();
virtual void create(std::string name, std::string content);
virtual void atomicOverwrite(std::string name, std::string content);
virtual std::string read(std::string name);
virtual void remove(std::string name);
virtual bool exists(std::string name);
class ScopedLock: public Backend::ScopedLock {
friend class BackendVFS;
public:
virtual void release();
virtual ~ScopedLock() { release(); }
private:
ScopedLock(): m_fdSet(false) {}
void set(int fd) { m_fd=fd; m_fdSet=true; }
bool m_fdSet;
int m_fd;
};
virtual ScopedLock * lockExclusive(std::string name);
virtual ScopedLock * lockShared(std::string name);
class Parameters: public Backend::Parameters {
friend class BackendVFS;
public:
/**
* The standard-issue params to string for logging
* @return a string representation of the parameters for logging
*/
virtual std::string toStr();
/**
* A more specific member, giving access to the path itself
* @return the path to the VFS storage.
*/
std::string getPath() { return m_path; }
private:
std::string m_path;
};
virtual Parameters * getParams();
private:
std::string m_root;