Commit 22870c12 authored by Eric Cano's avatar Eric Cano
Browse files

Used thread pool instead of direct posting for async fetcher in BackenRados.

parent 0a6b6e19
......@@ -879,19 +879,32 @@ void BackendRados::AsyncDeleter::wait() {
BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(BackendRados& be, const std::string& name):
m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
// At construction time, we just start the read.
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, fetchCallback, nullptr);
RadosTimeoutLogger rtl;
m_radosTimeoutLogger.reset();
auto rc=m_backend.getRadosCtx().aio_read(m_name, aioc, &m_radosBufferList, std::numeric_limits<int32_t>::max(), 0);
rtl.logIfNeeded("BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): m_radosCtx.aio_read()", m_name);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): failed to launch aio_read(): ")+m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
// At construction, just post the aio poster to the thread pool.
m_aioReadPoster.setParentFatcher(this);
m_backend.m_jobQueue.push(&m_aioReadPoster);
}
void BackendRados::AsyncLockfreeFetcher::AioReadPoster::execute() {
AsyncLockfreeFetcher & au = *m_parentFetcher;
try {
// Just start the read.
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(m_parentFetcher, fetchCallback, nullptr);
RadosTimeoutLogger rtl;
au.m_radosTimeoutLogger.reset();
auto rc=au.m_backend.getRadosCtx().aio_read(au.m_name, aioc, &au.m_radosBufferList, std::numeric_limits<int32_t>::max(), 0);
rtl.logIfNeeded("BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): m_radosCtx.aio_read()", au.m_name);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): failed to launch aio_read(): ")+au.m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncLockfreeFetcher::fetchCallback(librados::completion_t completion, void* pThis) {
AsyncLockfreeFetcher & au = *((AsyncLockfreeFetcher *) pThis);
au.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncLockfreeFetcher::fetchCallback(): aio_read callback", au.m_name);
......
......@@ -263,7 +263,6 @@ public:
void execute() override;
private:
AsyncUpdater * m_parentUpdater = nullptr;
};
friend class UpdateJob;
UpdateJob m_updateJob;
......@@ -326,6 +325,16 @@ public:
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** The aio posting task */
class AioReadPoster: public AsyncJob {
public:
void setParentFatcher (AsyncLockfreeFetcher * fetcher) { m_parentFetcher = fetcher; }
void execute() override;
private:
AsyncLockfreeFetcher * m_parentFetcher = nullptr;
};
friend class AioReadPoster;
AioReadPoster m_aioReadPoster;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<std::string> m_job;
/** The future from m_jobs, which will be extracted before any thread gets a chance to play with it. */
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment