Commit f0402bf1 authored by Eric Cano's avatar Eric Cano
Browse files

Used thread pool instead of std::async for computing updates in BackenRados.

parent db777150
......@@ -140,7 +140,7 @@ lc.log(log::DEBUG, "BackendRados::BackendRados() namespace set. About to test ac
params.add("workThreads", m_threads.size());
lc.log(log::INFO, "In BackendRados::BackendRados(): created worker threads");
} catch (...) {
for (size_t i=0; i<m_threads.size(); i++) m_JobQueue.push(nullptr);
for (size_t i=0; i<m_threads.size(); i++) m_jobQueue.push(nullptr);
for (auto &t: m_threads) {
if (t) t->wait();
delete t;
......@@ -158,7 +158,7 @@ lc.log(log::DEBUG, "BackendRados::BackendRados() namespace set. About to test ac
}
BackendRados::~BackendRados() {
for (size_t i=0; i<m_threads.size(); i++) m_JobQueue.push(nullptr);
for (size_t i=0; i<m_threads.size(); i++) m_jobQueue.push(nullptr);
for (auto &t: m_threads) {
t->wait();
}
......@@ -602,7 +602,7 @@ BackendRados::RadosWorkerThreadAndContext::~RadosWorkerThreadAndContext() {
void BackendRados::RadosWorkerThreadAndContext::run() {
while (1) {
BackendRados::AsyncJob * j=m_parentBackend.m_JobQueue.pop();
BackendRados::AsyncJob * j=m_parentBackend.m_jobQueue.pop();
if (j) {
j->execute();
} else {
......@@ -611,7 +611,6 @@ void BackendRados::RadosWorkerThreadAndContext::run() {
}
}
Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update)
{
return new AsyncUpdater(*this, name, update);
......@@ -619,6 +618,7 @@ Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::
BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update):
m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) {
m_updateJob.setParentUpdater(this);
// At construction time, we just fire a lock.
try {
// Rados does not have aio_lock, so we do it in an async.
......@@ -680,75 +680,75 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion
std::string("In BackendRados::AsyncUpdater::fetchCallback(): could not read object: ") + au.m_name);
throw Backend::CouldNotFetch(errnum.getMessageValue());
}
// We can now launch the update operation
au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async,
[pThis](){
AsyncUpdater & au = *((AsyncUpdater *) pThis);
try {
// The data is in the buffer list.
std::string value;
try {
au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
au.m_name + ": "+ ex.what());
}
bool updateWithDelete = false;
try {
// Execute the user's callback.
value=au.m_update(value);
} catch (AsyncUpdateWithDelete & ex) {
updateWithDelete = true;
} catch (...) {
// Let exceptions fly through. User knows his own exceptions.
throw;
}
if(updateWithDelete) {
try {
au.m_backend.remove(au.m_name);
if (au.m_backend.exists(au.m_name)) {
throw exception::Exception("Object exists after remove");
}
} catch (cta::exception::Exception &ex) {
throw CouldNotUpdateValue(
std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
au.m_name + ex.what());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} else {
try {
// Prepare result in buffer list.
au.m_radosBufferList.clear();
au.m_radosBufferList.append(value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") +
au.m_name + ex.what());
}
// Launch the write
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
RadosTimeoutLogger rtl;
au.m_radosTimeoutLogger.reset();
auto rc=au.m_backend.getRadosCtx().aio_write_full(au.m_name, aioc, au.m_radosBufferList);
rtl.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): m_radosCtx.aio_write_full() call", au.m_name);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc,
"In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name);
throw Backend::CouldNotCommit(errnum.getMessageValue());
}
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
// We can now launch the update operation (post to thread pool).
au.m_backend.m_jobQueue.push(&au.m_updateJob);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncUpdater::UpdateJob::execute() {
AsyncUpdater & au = *m_parentUpdater;
try {
// The data is in the buffer list.
std::string value;
try {
au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
au.m_name + ": "+ ex.what());
}
bool updateWithDelete = false;
try {
// Execute the user's callback.
value=au.m_update(value);
} catch (AsyncUpdateWithDelete & ex) {
updateWithDelete = true;
} catch (...) {
// Let exceptions fly through. User knows his own exceptions.
throw;
}
if(updateWithDelete) {
try {
au.m_backend.remove(au.m_name);
if (au.m_backend.exists(au.m_name)) {
throw exception::Exception("Object exists after remove");
}
)));
} catch (cta::exception::Exception &ex) {
throw CouldNotUpdateValue(
std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
au.m_name + ex.what());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} else {
try {
// Prepare result in buffer list.
au.m_radosBufferList.clear();
au.m_radosBufferList.append(value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") +
au.m_name + ex.what());
}
// Launch the write
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(m_parentUpdater, commitCallback, nullptr);
RadosTimeoutLogger rtl;
au.m_radosTimeoutLogger.reset();
auto rc=au.m_backend.getRadosCtx().aio_write_full(au.m_name, aioc, au.m_radosBufferList);
rtl.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): m_radosCtx.aio_write_full() call", au.m_name);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc,
"In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name);
throw Backend::CouldNotCommit(errnum.getMessageValue());
}
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
......
......@@ -203,7 +203,7 @@ private:
/**
* The queue for the thread-and-context pool.
*/
cta::threading::BlockingQueue<AsyncJob *> m_JobQueue;
cta::threading::BlockingQueue<AsyncJob *> m_jobQueue;
/**
* The class for the worker threads
......@@ -229,7 +229,7 @@ private:
public:
/**
* A class following up the check existence-lock-fetch-update-write-unlock. Constructor implicitly
* A class following up the lock-fetch-update-write-unlock. Constructor implicitly
* starts the lock step.
*/
class AsyncUpdater: public Backend::AsyncUpdater {
......@@ -256,9 +256,17 @@ public:
std::string m_lockClient;
/** The rados bufferlist used to hold the object data (read+write) */
::librados::bufferlist m_radosBufferList;
/** A future the hole the the structure of the update operation. It will be either empty of complete at
destruction time */
std::unique_ptr<std::future<void>> m_updateAsync;
/** An async job that will process the update of the object. */
class UpdateJob: public AsyncJob {
public:
void setParentUpdater (AsyncUpdater * updater) { m_parentUpdater = updater; }
void execute() override;
private:
AsyncUpdater * m_parentUpdater = nullptr;
};
friend class UpdateJob;
UpdateJob m_updateJob;
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
/** The second callback operation (after reading) */
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment