diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp index cd2632053bf2ab33050e5f61dc071d9c75abf376..a20bd81e869970164a95b81f13eee4fb7b4d5107 100644 --- a/objectstore/Backend.hpp +++ b/objectstore/Backend.hpp @@ -18,8 +18,10 @@ #pragma once +#include "common/exception/Exception.hpp" #include <string> #include <list> +#include <functional> namespace cta { namespace objectstore { @@ -62,7 +64,7 @@ public: * @param name * @return true if the object is found */ - virtual bool exists(std::string name) = 0; + virtual bool exists(std::string name) = 0; /** * Lists all objects @@ -99,7 +101,42 @@ public: * @return pointer to a newly created scoped lock object (for RAII) */ virtual ScopedLock * lockExclusive(std::string name) = 0; - + + /// A collection of exceptions allowing the user to find out which step failed. + CTA_GENERATE_EXCEPTION_CLASS(NoSuchObject); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotLock); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock); + + /** + * A base class handling asynchronous sequence of lock exclusive, fetch, call user + * operation, commit, unlock. Each operation will be asynchronous, and the result + * (success or exception) will be returned via the wait() function call. + */ + class AsyncUpdater { + public: + /** + * Waits for completion (success) of throws exception (failure). + */ + virtual void wait() = 0; + + /** + * Destructor + */ + virtual ~AsyncUpdater() {} + }; + + /** + * Triggers the asynchronous object update sequence, as described in AsyncUpdater + * class description. + * @param update a callable/lambda that will receive the fetched value as a + * parameter and return the updated value for commit. + * @return pointer to a newly created AsyncUpdater (for RAII) + */ + virtual AsyncUpdater * asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) = 0; + /** * Base class for the representation of the parameters of the BackendStore. */ diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index f00871b830fe8e85a938a1e0657cba2c2e938a13..afaf15e10e506b4529de660e6cdb235e68a3a6c8 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -193,6 +193,179 @@ BackendRados::ScopedLock* BackendRados::lockShared(std::string name) { return ret.release(); } +Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) +{ + return new AsyncUpdater(*this, name, update); +} + +BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update): m_backend(be), m_name(name), m_update(update) { + try { + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr); + // At construction time, we just fire a stat. + auto rc=m_backend.m_radosCtx.aio_stat(name, aioc, &m_size, &date); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, "In BackendRados::AsyncUpdater::AsyncUpdater(): failed to launch aio_stat()"); + throw Backend::NoSuchObject(errnum.getMessageValue()); + } + } catch (...) { + m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion, void* pThis) { + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // Check that the object exists. + if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + "In BackendRados::AsyncUpdater::statCallback(): could not stat object: "); + throw Backend::NoSuchObject(errnum.getMessageValue()); + } + // It does! Let's lock it. Rados does not have aio_lock, so we do it in an async. + // Operation is lock (synchronous), and then launch an async read. + // The async function never fails, exceptions go to the promise (as everywhere). + au.m_lockAsync.reset(new std::future<void>(std::async(std::launch::async, + [pThis](){ + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + au.m_lockClient = BackendRados::createUniqueClientId(); + struct timeval tv; + tv.tv_usec = 0; + tv.tv_sec = 10; + int rc; + // Unfortunately, those loops will run in a limited number of threads, + // limiting the parallelism of the locking. + // TODO: could be improved (but need aio_lock in rados, not available at the time + // of writing). + do { + rc = au.m_backend.m_radosCtx.lock_exclusive(au.m_name, "lock", au.m_lockClient, "", &tv, 0); + } while (-EBUSY == rc); + if (rc) { + cta::exception::Errnum errnum(-rc, + std::string("In BackendRados::AsyncUpdater::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") + + au.m_name + "/" + "lock" + "/" + au.m_lockClient + "//"); + throw CouldNotLock(errnum.getMessageValue()); + } + // Locking is done, we can launch the read operation (async). + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, fetchCallback, nullptr); + rc=au.m_backend.m_radosCtx.aio_read(au.m_name, aioc, &au.m_radosBufferList, au.m_size, 0); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, "In BackendRados::AsyncUpdater::AsyncUpdater(): failed to launch aio_stat()"); + throw Backend::CouldNotFetch(errnum.getMessageValue()); + } + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } + } + ))); + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) { + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // Check that the object could be read. + if (rados_aio_get_return_value(completion)<0) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + "In BackendRados::AsyncUpdater::statCallback(): could not read object: "); + throw Backend::CouldNotFetch(errnum.getMessageValue()); + } + // We can now launch the update operation + au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async, + [pThis](){ + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // The data is in the buffer list. + std::string value; + try { + au.m_radosBufferList.copy(0, au.m_size, value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") + + ex.what()); + } + try { + // Execute the user's callback. + value=au.m_update(value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to call update(): ") + + ex.what()); + } + try { + // Prepare result in buffer list. + au.m_radosBufferList.clear(); + au.m_radosBufferList.append(value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + + ex.what()); + } + // Launch the write + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, + "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()"); + throw Backend::CouldNotCommit(errnum.getMessageValue()); + } + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } + } + ))); + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::commitCallback(librados::completion_t completion, void* pThis) { + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // Check that the object could be written. + if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + "In BackendRados::AsyncUpdater::commitCallback(): could not write object: "); + throw Backend::CouldNotCommit(errnum.getMessageValue()); + } + // Launch the async unlock. + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, unlockCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_unlock(au.m_name, "lock", au.m_lockClient, aioc); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, "In BackendRados::AsyncUpdater::commitCallback(): failed to launch aio_unlock()"); + throw Backend::CouldNotUnlock(errnum.getMessageValue()); + } + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::unlockCallback(librados::completion_t completion, void* pThis) { + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // Check that the object could be unlocked. + if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + "In BackendRados::AsyncUpdater::unlockCallback(): could not unlock object: "); + throw Backend::CouldNotUnlock(errnum.getMessageValue()); + } + // Done! + au.m_job.set_value(); + } catch (...) { + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::wait() { + m_job.get_future().get(); +} + std::string BackendRados::Parameters::toStr() { std::stringstream ret; ret << "userId=" << m_userId << " pool=" << m_pool; diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index 684d6ac1ed2511ca1f41eb74962f0f56d3067dfb..899bb6aa4e4bf811c0e752cf33855be0b89b5fbe 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -20,6 +20,7 @@ #include "Backend.hpp" #include "rados/librados.hpp" +#include <future> namespace cta { namespace objectstore { @@ -28,6 +29,8 @@ namespace cta { namespace objectstore { */ class BackendRados: public Backend { public: + class AsyncUpdater; + friend class AsyncUpdater; /** * The constructor, connecting to the storage pool 'pool' using the user id * 'userId' @@ -35,32 +38,32 @@ public: * @param pool */ BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace = ""); - virtual ~BackendRados(); - virtual std::string user() { + ~BackendRados() override; + std::string user() { return m_user; } - virtual std::string pool() { + std::string pool() { return m_pool; } - virtual void create(std::string name, std::string content); + void create(std::string name, std::string content) override; - virtual void atomicOverwrite(std::string name, std::string content); + void atomicOverwrite(std::string name, std::string content) override; - virtual std::string read(std::string name); + std::string read(std::string name) override; - virtual void remove(std::string name); + void remove(std::string name) override; - virtual bool exists(std::string name); + bool exists(std::string name) override; - virtual std::list<std::string> list(); + std::list<std::string> list() override; class ScopedLock: public Backend::ScopedLock { friend class BackendRados; public: - virtual void release(); - virtual ~ScopedLock(); + void release() override; + ~ScopedLock() override; private: ScopedLock(librados::IoCtx & ioCtx): m_lockSet(false), m_context(ioCtx) {} void set(const std::string & oid, const std::string clientId); @@ -71,14 +74,56 @@ public: }; private: - std::string createUniqueClientId(); + static std::string createUniqueClientId(); public: - virtual ScopedLock * lockExclusive(std::string name); + ScopedLock * lockExclusive(std::string name) override; - - virtual ScopedLock * lockShared(std::string name); + ScopedLock * lockShared(std::string name) override; + + /** + * A class following up the check existence-lock-fetch-update-write-unlock. Constructor implicitly + * starts the lock step. + */ + class AsyncUpdater: public Backend::AsyncUpdater { + public: + AsyncUpdater(BackendRados & be, const std::string & name, std::function <std::string(const std::string &)> & update); + void wait() override; + private: + /** A reference to the backend */ + BackendRados &m_backend; + /** The object name */ + const std::string m_name; + /** The operation on the object */ + std::function <std::string(const std::string &)> & m_update; + /** Storage for stat operation (size) */ + uint64_t m_size; + /** Storage for stat operation (date) */ + time_t date; + /** The promise that will both do the job and allow synchronization with the caller. */ + std::promise<void> m_job; + /** A future used to hold the structure of the lock operation. It will be either empty of complete at + destruction time */ + std::unique_ptr<std::future<void>> m_lockAsync; + /** A string used to identify the locker */ + std::string m_lockClient; + /** The rados bufferlist used to hold the object data (read+write) */ + ::librados::bufferlist m_radosBufferList; + /** A future the hole the the structure of the update operation. It will be either empty of complete at + destruction time */ + std::unique_ptr<std::future<void>> m_updateAsync; + /** The first callback operation (after checking existence) */ + static void statCallback(librados::completion_t completion, void *pThis); + /** The second callback operation (after reading) */ + static void fetchCallback(librados::completion_t completion, void *pThis); + /** The third callback operation (after writing) */ + static void commitCallback(librados::completion_t completion, void *pThis); + /** The fourth callback operation (after unlocking) */ + static void unlockCallback(librados::completion_t completion, void *pThis); + }; + Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override; + class Parameters: public Backend::Parameters { friend class BackendRados; public: @@ -86,17 +131,17 @@ public: * The standard-issue params to string for logging * @return a string representation of the parameters for logging */ - virtual std::string toStr(); - virtual std::string toURL(); + std::string toStr() override; + std::string toURL() override; private: std::string m_userId; std::string m_pool; std::string m_namespace; }; - virtual Parameters * getParams(); + Parameters * getParams() override; - virtual std::string typeName() { + std::string typeName() override { return "cta::objectstore::BackendRados"; } diff --git a/objectstore/BackendTest.cpp b/objectstore/BackendTest.cpp index 357286ac840c79393ba0aadecbb3c2dfe1f68540..7d1fa99f7712e2d1c500a17f739afe59fbd75e32 100644 --- a/objectstore/BackendTest.cpp +++ b/objectstore/BackendTest.cpp @@ -85,6 +85,21 @@ TEST_P(BackendAbstractTest, LockingInterface) { ASSERT_FALSE(m_os->exists(nonExistingObject)); } +TEST_P(BackendAbstractTest, AsyncIOInterface) { + // Create object to update. + const std::string testValue = "1234"; + const std::string testSecondValue = "12345"; + const std::string testObjectName = "testObject"; + try {m_os->remove(testObjectName);}catch(...){} + m_os->create(testObjectName, testValue); + // Launch update of object via asynchronous IO + std::function<std::string(const std::string &)> updaterCallback=[&](const std::string &s)->std::string{return testSecondValue;}; + std::unique_ptr<cta::objectstore::Backend::AsyncUpdater> updater(m_os->asyncUpdate(testObjectName,updaterCallback)); + updater->wait(); + ASSERT_EQ(testSecondValue, m_os->read(testObjectName)); + m_os->remove(testObjectName); +} + TEST_P(BackendAbstractTest, ParametersInterface) { //std::cout << "Type=" << m_os->typeName() << std::endl; std::unique_ptr<cta::objectstore::Backend::Parameters> params( diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index a7ecf76d2da7f64b91b7c0f08d0ab803c82c2da3..6082ab0fe768d8bdd1ed931aa33d70ae855daddb 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -279,6 +279,25 @@ BackendVFS::ScopedLock * BackendVFS::lockShared(std::string name) { return ret.release(); } +BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name, std::function<std::string(const std::string&)>& update): + m_backend(be), m_name(name), m_update(update), + m_job(std::async(std::launch::async, + [&](){ + std::unique_ptr<ScopedLock> sl(m_backend.lockExclusive(m_name)); + m_backend.atomicOverwrite(m_name, m_update(m_backend.read(m_name))); + })) +{} + +Backend::AsyncUpdater* BackendVFS::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) { + // Create the object. Done. + return new AsyncUpdater(*this, name, update); +} + +void BackendVFS::AsyncUpdater::wait() { + m_job.get(); +} + + std::string BackendVFS::Parameters::toStr() { std::stringstream ret; ret << "path=" << m_path; diff --git a/objectstore/BackendVFS.hpp b/objectstore/BackendVFS.hpp index 6894ae5b45d4c45d0022d3c2af4c7ff2122c7e84..6d39ebc99933febac93183ee3685fa96a7b4a20c 100644 --- a/objectstore/BackendVFS.hpp +++ b/objectstore/BackendVFS.hpp @@ -19,6 +19,8 @@ #pragma once #include "Backend.hpp" +#include <future> +#include <functional> namespace cta { namespace objectstore { /** @@ -57,25 +59,25 @@ public: */ void deleteOnExit(); - virtual ~BackendVFS(); + ~BackendVFS() override; - virtual void create(std::string name, std::string content); + void create(std::string name, std::string content) override; - virtual void atomicOverwrite(std::string name, std::string content); + void atomicOverwrite(std::string name, std::string content) override; - virtual std::string read(std::string name); + std::string read(std::string name) override; - virtual void remove(std::string name); + void remove(std::string name) override; - virtual bool exists(std::string name); + bool exists(std::string name) override; - virtual std::list<std::string> list(); + std::list<std::string> list() override; class ScopedLock: public Backend::ScopedLock { friend class BackendVFS; public: - virtual void release(); - virtual ~ScopedLock() { release(); } + void release() override; + ~ScopedLock() override { release(); } private: ScopedLock(): m_fdSet(false) {} void set(int fd, const std::string & path) { m_fd=fd; m_fdSet=true; m_path=path; } @@ -84,11 +86,30 @@ public: int m_fd; }; - virtual ScopedLock * lockExclusive(std::string name); + ScopedLock * lockExclusive(std::string name) override; - virtual ScopedLock * lockShared(std::string name); + ScopedLock * lockShared(std::string name) override; + + /** + * A class mimicking AIO using C++ async tasks + */ + class AsyncUpdater: public Backend::AsyncUpdater { + public: + AsyncUpdater(BackendVFS & be, const std::string & name, std::function <std::string(const std::string &)> & update); + void wait() override; + private: + /** A reference to the backend */ + BackendVFS &m_backend; + /** The object name */ + const std::string m_name; + /** The operation on the object */ + std::function <std::string(const std::string &)> & m_update; + /** The future that will both do the job and allow synchronization with the caller. */ + std::future<void> m_job; + }; + + Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override; - class Parameters: public Backend::Parameters { friend class BackendVFS; public: @@ -96,13 +117,13 @@ public: * The standard-issue params to string for logging * @return a string representation of the parameters for logging */ - virtual std::string toStr(); + std::string toStr() override; /** * The standard-issue params to URL * @return a string representation of the parameters for logging */ - virtual std::string toURL(); + std::string toURL() override; /** * A more specific member, giving access to the path itself @@ -113,10 +134,10 @@ public: std::string m_path; }; - virtual Parameters * getParams(); + Parameters * getParams() override; - virtual std::string typeName() { + std::string typeName() override { return "cta::objectstore::BackendVFS"; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 0f06b227dd9ef24d5ab79304f97612a4de6c25e1..aa778cb975c74aa68e0ea33e47da452c24d00bf3 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -1540,8 +1540,8 @@ INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestVFS, DataTransferSessionTes #ifdef TEST_RADOS static cta::OStoreDBFactory<cta::objectstore::BackendRados> OStoreDBFactoryRados("rados://tapetest@tapetest"); -INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestRados, SchedulerTest, - ::testing::Values(SchedulerTestParam(OStoreDBFactoryRados))); +INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestRados, DataTransferSessionTest, + ::testing::Values(DataTransferSessionTestParam(OStoreDBFactoryRados))); #endif } // namespace unitTest