diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 40416e2dd12dab8769c89ea0725ddefc43fb77d4..1217a4f7fa09c27522739e8089ffdf9f0626bed9 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -140,7 +140,7 @@ lc.log(log::DEBUG, "BackendRados::BackendRados() namespace set. About to test ac params.add("workThreads", m_threads.size()); lc.log(log::INFO, "In BackendRados::BackendRados(): created worker threads"); } catch (...) { - for (size_t i=0; i<m_threads.size(); i++) m_JobQueue.push(nullptr); + for (size_t i=0; i<m_threads.size(); i++) m_jobQueue.push(nullptr); for (auto &t: m_threads) { if (t) t->wait(); delete t; @@ -158,7 +158,7 @@ lc.log(log::DEBUG, "BackendRados::BackendRados() namespace set. About to test ac } BackendRados::~BackendRados() { - for (size_t i=0; i<m_threads.size(); i++) m_JobQueue.push(nullptr); + for (size_t i=0; i<m_threads.size(); i++) m_jobQueue.push(nullptr); for (auto &t: m_threads) { t->wait(); } @@ -602,7 +602,7 @@ BackendRados::RadosWorkerThreadAndContext::~RadosWorkerThreadAndContext() { void BackendRados::RadosWorkerThreadAndContext::run() { while (1) { - BackendRados::AsyncJob * j=m_parentBackend.m_JobQueue.pop(); + BackendRados::AsyncJob * j=m_parentBackend.m_jobQueue.pop(); if (j) { j->execute(); } else { @@ -611,7 +611,6 @@ void BackendRados::RadosWorkerThreadAndContext::run() { } } - Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) { return new AsyncUpdater(*this, name, update); @@ -619,6 +618,7 @@ Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std:: BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update): m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) { + m_updateJob.setParentUpdater(this); // At construction time, we just fire a lock. try { // Rados does not have aio_lock, so we do it in an async. @@ -680,75 +680,75 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion std::string("In BackendRados::AsyncUpdater::fetchCallback(): could not read object: ") + au.m_name); throw Backend::CouldNotFetch(errnum.getMessageValue()); } - // We can now launch the update operation - au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async, - [pThis](){ - AsyncUpdater & au = *((AsyncUpdater *) pThis); - try { - // The data is in the buffer list. - std::string value; - try { - au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value); - } catch (std::exception & ex) { - throw CouldNotUpdateValue( - std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") + - au.m_name + ": "+ ex.what()); - } - - bool updateWithDelete = false; - try { - // Execute the user's callback. - value=au.m_update(value); - } catch (AsyncUpdateWithDelete & ex) { - updateWithDelete = true; - } catch (...) { - // Let exceptions fly through. User knows his own exceptions. - throw; - } - - if(updateWithDelete) { - try { - au.m_backend.remove(au.m_name); - if (au.m_backend.exists(au.m_name)) { - throw exception::Exception("Object exists after remove"); - } - } catch (cta::exception::Exception &ex) { - throw CouldNotUpdateValue( - std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") + - au.m_name + ex.what()); - } - // Done! - ANNOTATE_HAPPENS_BEFORE(&au.m_job); - au.m_job.set_value(); - } else { - try { - // Prepare result in buffer list. - au.m_radosBufferList.clear(); - au.m_radosBufferList.append(value); - } catch (std::exception & ex) { - throw CouldNotUpdateValue( - std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + - au.m_name + ex.what()); - } - // Launch the write - librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr); - RadosTimeoutLogger rtl; - au.m_radosTimeoutLogger.reset(); - auto rc=au.m_backend.getRadosCtx().aio_write_full(au.m_name, aioc, au.m_radosBufferList); - rtl.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): m_radosCtx.aio_write_full() call", au.m_name); - aioc->release(); - if (rc) { - cta::exception::Errnum errnum (-rc, - "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name); - throw Backend::CouldNotCommit(errnum.getMessageValue()); - } - } - } catch (...) { - ANNOTATE_HAPPENS_BEFORE(&au.m_job); - au.m_job.set_exception(std::current_exception()); - } + // We can now launch the update operation (post to thread pool). + au.m_backend.m_jobQueue.push(&au.m_updateJob); + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncUpdater::UpdateJob::execute() { + AsyncUpdater & au = *m_parentUpdater; + try { + // The data is in the buffer list. + std::string value; + try { + au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") + + au.m_name + ": "+ ex.what()); + } + + bool updateWithDelete = false; + try { + // Execute the user's callback. + value=au.m_update(value); + } catch (AsyncUpdateWithDelete & ex) { + updateWithDelete = true; + } catch (...) { + // Let exceptions fly through. User knows his own exceptions. + throw; + } + + if(updateWithDelete) { + try { + au.m_backend.remove(au.m_name); + if (au.m_backend.exists(au.m_name)) { + throw exception::Exception("Object exists after remove"); } - ))); + } catch (cta::exception::Exception &ex) { + throw CouldNotUpdateValue( + std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") + + au.m_name + ex.what()); + } + // Done! + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_value(); + } else { + try { + // Prepare result in buffer list. + au.m_radosBufferList.clear(); + au.m_radosBufferList.append(value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + + au.m_name + ex.what()); + } + // Launch the write + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(m_parentUpdater, commitCallback, nullptr); + RadosTimeoutLogger rtl; + au.m_radosTimeoutLogger.reset(); + auto rc=au.m_backend.getRadosCtx().aio_write_full(au.m_name, aioc, au.m_radosBufferList); + rtl.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): m_radosCtx.aio_write_full() call", au.m_name); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, + "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name); + throw Backend::CouldNotCommit(errnum.getMessageValue()); + } + } } catch (...) { ANNOTATE_HAPPENS_BEFORE(&au.m_job); au.m_job.set_exception(std::current_exception()); diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index 60b5dd846cdb1ecd1ed985135596aab70d6996db..f90340c5cd7dbab4a9cf986dcdff1190a6f02215 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -203,7 +203,7 @@ private: /** * The queue for the thread-and-context pool. */ - cta::threading::BlockingQueue<AsyncJob *> m_JobQueue; + cta::threading::BlockingQueue<AsyncJob *> m_jobQueue; /** * The class for the worker threads @@ -229,7 +229,7 @@ private: public: /** - * A class following up the check existence-lock-fetch-update-write-unlock. Constructor implicitly + * A class following up the lock-fetch-update-write-unlock. Constructor implicitly * starts the lock step. */ class AsyncUpdater: public Backend::AsyncUpdater { @@ -256,9 +256,17 @@ public: std::string m_lockClient; /** The rados bufferlist used to hold the object data (read+write) */ ::librados::bufferlist m_radosBufferList; - /** A future the hole the the structure of the update operation. It will be either empty of complete at - destruction time */ - std::unique_ptr<std::future<void>> m_updateAsync; + /** An async job that will process the update of the object. */ + class UpdateJob: public AsyncJob { + public: + void setParentUpdater (AsyncUpdater * updater) { m_parentUpdater = updater; } + void execute() override; + private: + AsyncUpdater * m_parentUpdater = nullptr; + + }; + friend class UpdateJob; + UpdateJob m_updateJob; /** Async delete in case of zero sized object */ static void deleteEmptyCallback(librados::completion_t completion, void *pThis); /** The second callback operation (after reading) */