Commit 00520fcf authored by Eric Cano's avatar Eric Cano
Browse files

Created a new asyncCreate call in backends.

parent 3fd17f30
......@@ -105,6 +105,7 @@ public:
/// A collection of exceptions allowing the user to find out which step failed.
CTA_GENERATE_EXCEPTION_CLASS(NoSuchObject);
CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotCreate);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotLock);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
......@@ -113,13 +114,36 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
/**
* A base class handling asynchronous creation of objects.
* Backends derive their own creator from it; the outcome of the creation is
* reported to the caller through wait().
*/
class AsyncCreator {
public:
/**
* Waits for completion (success) or throws an exception (failure).
*/
virtual void wait() = 0;
/**
* Destructor. Virtual so that a creator can be destroyed through a pointer
* to this base class.
*/
virtual ~AsyncCreator() {}
};
/**
* Triggers the asynchronous object creator.
*
* @param name The name of the object to be created.
* @param value The content to be stored in the newly created object.
* @return pointer to a newly created AsyncCreator (for RAII)
*/
virtual AsyncCreator * asyncCreate(const std::string & name, const std::string & value) = 0;
/**
* A base class handling asynchronous sequence of lock exclusive, fetch, call user
* operation, commit, unlock. Each operation will be asynchronous, and the result
* (success or exception) will be returned via the wait() function call.
*/
class AsyncUpdater {
public:
public:
/**
* Waits for completion (success) or throws exception (failure).
*/
......@@ -158,6 +182,15 @@ public:
*/
virtual ~AsyncDeleter() {}
};
/**
* Triggers the asynchronous object delete sequence, as described in
* AsyncDeleter class description.
* Pure virtual: each backend provides its own implementation.
*
* @param name The name of the object to be deleted.
* @return pointer to a newly created AsyncDeleter
*/
virtual AsyncDeleter * asyncDelete(const std::string & name) = 0;
/**
* A base class handling asynchronous fetch (lockfree).
......@@ -187,15 +220,6 @@ public:
*/
virtual AsyncLockfreeFetcher * asyncLockfreeFetch(const std::string & name) = 0;
/**
* Triggers the asynchronous object delete sequence, as described in
* AsyncDeleter class description.
*
* @param name The name of the object to be deleted.
* @return pointer to a newly created AsyncDeleter
*/
virtual AsyncDeleter * asyncDelete(const std::string & name) = 0;
/**
* Base class for the representation of the parameters of the BackendStore.
*/
......
......@@ -199,7 +199,7 @@ void BackendRados::create(std::string name, std::string content) {
int ret = -getRadosCtx().operate(name, &wop);
return ret;
},
std::string("In BackendRados::create, failed to create exclusively or write: ")
std::string("In BackendRados::create(), failed to create exclusively or write: ")
+ name);
} catch (cta::exception::Errnum & en) {
if (en.errorNumber() == EEXIST) {
......@@ -209,20 +209,20 @@ void BackendRados::create(std::string name, std::string content) {
// The lock function will delete it immediately, but we could have attempted to create the object in this very moment.
// We will stat-poll the object and retry the create as soon as it's gone.
uint64_t size;
time_t date;
time_t time;
cta::utils::Timer t;
restat:;
int statRet = getRadosCtx().stat(name, &size, &date);
int statRet = getRadosCtx().stat(name, &size, &time);
if (-ENOENT == statRet) {
// Object is gone already, let's retry.
goto retry;
} else if (!statRet) {
// If the size of the object is not zero, this is another problem.
if (size) {
en.getMessage() << " After statRet=" << statRet << " size=" << size << " date=" << date;
en.getMessage() << " After statRet=" << statRet << " size=" << size << " time=" << time;
throw en;
} else if (t.secs() > 10) {
en.getMessage() << " Object is still here after 10s. statRet=" << statRet << " size=" << size << " date=" << date;
en.getMessage() << " Object is still here after 10s. statRet=" << statRet << " size=" << size << " time=" << time;
throw en;
} else goto restat;
} else {
......@@ -825,6 +825,147 @@ void BackendRados::RadosWorkerThreadAndContext::run() {
}
}
/**
 * Starts the asynchronous creation of an object in the Rados backend.
 *
 * @param name The name of the object to be created.
 * @param value The content to be written into the object.
 * @return pointer to a newly allocated BackendRados::AsyncCreator, whose
 * constructor has already launched the creation.
 */
Backend::AsyncCreator* BackendRados::asyncCreate(const std::string& name, const std::string& value) {
  AsyncCreator * const creator = new AsyncCreator(*this, name, value);
  return creator;
}
/**
 * Launches the asynchronous, exclusive creation of an object.
 *
 * Builds a write operation (exclusive create followed by a full write of the
 * value) and submits it with aio_operate(); the completion is delivered to
 * createExclusiveCallback(). A failure raised inside the body is not thrown to
 * the caller: it is stored in the promise and resurfaces from wait().
 *
 * @param be The backend whose rados context is used for the operation.
 * @param name The name of the object to be created.
 * @param value The content to be written into the object.
 */
BackendRados::AsyncCreator::AsyncCreator(BackendRados& be, const std::string& name, const std::string& value):
m_backend(be), m_name(name), m_value(value), m_job(), m_jobFuture(m_job.get_future()) {
try {
librados::ObjectWriteOperation wop;
// Exclusive creation: the operation fails with EEXIST if the object is already there.
const bool createExclusive = true;
wop.create(createExclusive);
// The buffer list is a member, not a local, as it is handed to the asynchronous operation.
m_radosBufferList.clear();
m_radosBufferList.append(value.c_str(), value.size());
wop.write_full(m_radosBufferList);
// "this" is the argument the completion will pass back to createExclusiveCallback().
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, createExclusiveCallback, nullptr);
// m_radosTimeoutLogger times the whole asynchronous operation (checked in the callback),
// while the local rtl only times the submission call itself.
m_radosTimeoutLogger.reset();
RadosTimeoutLogger rtl;
int rc;
// The lambda always returns 0: the return code of aio_operate() is kept in rc and checked below.
cta::exception::Errnum::throwOnReturnedErrnoOrThrownStdException([&]() {
rc=m_backend.getRadosCtx().aio_operate(m_name, aioc, &wop);
return 0;
}, "In BackendRados::AsyncCreator::AsyncCreator(): failed m_backend.getRadosCtx().aio_operate()");
rtl.logIfNeeded("In BackendRados::AsyncCreator::AsyncCreator(): m_radosCtx.aio_operate() call", m_name);
// We do not keep the completion handle: release our reference to it.
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncCreator::AsyncCreator(): failed to launch aio_operate(): ")+m_name);
throw Backend::CouldNotCreate(errnum.getMessageValue());
}
} catch (...) {
// The operation could not be launched: report the failure through the promise so wait() rethrows it.
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
/**
 * Completion callback of the exclusive create + write operation.
 *
 * On success the promise is fulfilled. On EEXIST an asynchronous stat is
 * launched (handled by statCallback()) to find out whether the existing object
 * is a transient zero-sized one, in which case the creation gets retried. Any
 * other error, or a failure to launch the stat, is stored in the promise and
 * rethrown by wait().
 *
 * @param completion The rados completion carrying the result of the operation.
 * @param pThis Pointer to the AsyncCreator which launched the operation.
 */
void BackendRados::AsyncCreator::createExclusiveCallback(librados::completion_t completion, void* pThis) {
  AsyncCreator & ac = *static_cast<AsyncCreator *>(pThis);
  ac.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncCreator::createExclusiveCallback(): aio_operate callback", ac.m_name);
  try {
    // Check that the object could be created.
    const int createRet = rados_aio_get_return_value(completion);
    if (createRet) {
      if (EEXIST == -createRet) {
        // We can race with locking in some situations: attempting to lock a non-existing object creates it, of size
        // zero.
        // The lock function will delete it immediately, but we could have attempted to create the object in this very moment.
        // We will stat-poll the object and retry the create as soon as it's gone.
        // Prepare the retry timer (it will be used in the stat step).
        if (!ac.m_retryTimer) ac.m_retryTimer.reset(new utils::Timer());
        RadosTimeoutLogger rtl;
        int rc;
        librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, statCallback, nullptr);
        // The lambda always returns 0: the return code of aio_stat() is kept in rc and checked below.
        cta::exception::Errnum::throwOnReturnedErrnoOrThrownStdException([&]() {
          rc=ac.m_backend.getRadosCtx().aio_stat(ac.m_name, aioc, &ac.m_size, &ac.m_time);
          return 0;
        }, "In BackendRados::AsyncCreator::createExclusiveCallback(): failed m_backend.getRadosCtx().aio_stat()");
        rtl.logIfNeeded("In BackendRados::AsyncCreator::createExclusiveCallback(): m_radosCtx.aio_stat() call", ac.m_name);
        // We do not keep the completion handle: release our reference to it.
        aioc->release();
        if (rc) {
          // The stat could not be launched, so statCallback() will never run: fail now.
          cta::exception::Errnum errnum(-rc,
            std::string("In BackendRados::AsyncCreator::createExclusiveCallback(): failed to launch aio_stat(): ") + ac.m_name);
          throw Backend::CouldNotCreate(errnum.getMessageValue());
        }
        // The sequence continues in statCallback(). The promise must not be fulfilled here:
        // the object is not created yet, and a promise can only be satisfied once.
        return;
      } else {
        cta::exception::Errnum errnum(-createRet,
          std::string("In BackendRados::AsyncCreator::createExclusiveCallback(): could not create object: ") + ac.m_name);
        throw Backend::CouldNotCreate(errnum.getMessageValue());
      }
    }
    // Done!
    ANNOTATE_HAPPENS_BEFORE(&ac.m_job);
    ac.m_job.set_value();
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&ac.m_job);
    ac.m_job.set_exception(std::current_exception());
  }
}
/**
 * Completion callback of the stat launched after the creation hit EEXIST.
 *
 * - ENOENT: the object vanished in the meantime, the exclusive create + write
 *   is launched again (completion handled by createExclusiveCallback()).
 * - Stat succeeded with a non-zero size: the object genuinely exists, failure.
 * - Stat succeeded with a zero size: the stat is launched again, until the
 *   retry timer exceeds 10 seconds, after which this is a failure.
 * - Any other stat error, or a failure to launch the next step: failure.
 * Failures are stored in the promise and rethrown by wait().
 *
 * @param completion The rados completion carrying the result of the stat.
 * @param pThis Pointer to the AsyncCreator which launched the stat.
 */
void BackendRados::AsyncCreator::statCallback(librados::completion_t completion, void* pThis) {
  AsyncCreator & ac = *static_cast<AsyncCreator *>(pThis);
  ac.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncCreator::statCallback(): aio_stat callback", ac.m_name);
  try {
    const int statRet = rados_aio_get_return_value(completion);
    if (statRet) {
      if (ENOENT == -statRet) {
        // The object is gone while we tried to stat it. Fine. Let's retry the write.
        librados::ObjectWriteOperation wop;
        const bool createExclusive = true;
        wop.create(createExclusive);
        ac.m_radosBufferList.clear();
        ac.m_radosBufferList.append(ac.m_value.c_str(), ac.m_value.size());
        wop.write_full(ac.m_radosBufferList);
        librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, createExclusiveCallback, nullptr);
        ac.m_radosTimeoutLogger.reset();
        RadosTimeoutLogger rtl;
        int rc;
        // The lambda always returns 0: the return code of aio_operate() is kept in rc and checked below.
        cta::exception::Errnum::throwOnReturnedErrnoOrThrownStdException([&]() {
          rc=ac.m_backend.getRadosCtx().aio_operate(ac.m_name, aioc, &wop);
          return 0;
        }, "In BackendRados::AsyncCreator::statCallback(): failed m_backend.getRadosCtx().aio_operate()");
        rtl.logIfNeeded("In BackendRados::AsyncCreator::statCallback(): m_radosCtx.aio_operate() call", ac.m_name);
        aioc->release();
        if (rc) {
          cta::exception::Errnum errnum (-rc,
            std::string("In BackendRados::AsyncCreator::statCallback(): failed to launch aio_operate(): ")+ ac.m_name);
          throw Backend::CouldNotCreate(errnum.getMessageValue());
        }
      } else {
        // We had some other error. This is a failure.
        cta::exception::Errnum errnum(-statRet,
          std::string("In BackendRados::AsyncCreator::statCallback(): could not stat object: ") + ac.m_name);
        throw Backend::CouldNotCreate(errnum.getMessageValue());
      }
    } else {
      // We got our stat result: let's see if the object is zero sized.
      if (ac.m_size) {
        // We have a non-zero sized object. This is not just a race.
        exception::Errnum en(EEXIST, "In BackendRados::AsyncCreator::statCallback: object already exists: ");
        en.getMessage() << ac.m_name << " After statRet=" << -statRet
                        << " size=" << ac.m_size << " time=" << ac.m_time;
        throw en;
      } else {
        // The object is indeed zero-sized. We can just retry stat (for 10s max)
        if (ac.m_retryTimer && (ac.m_retryTimer->secs() > 10)) {
          exception::Errnum en(EEXIST, "In BackendRados::AsyncCreator::statCallback: Object is still here after 10s: ");
          en.getMessage() << ac.m_name << " After statRet=" << -statRet
                          << " size=" << ac.m_size << " time=" << ac.m_time;
          throw en;
        }
        RadosTimeoutLogger rtl;
        int rc;
        librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, statCallback, nullptr);
        // The lambda always returns 0: the return code of aio_stat() is kept in rc and checked below.
        cta::exception::Errnum::throwOnReturnedErrnoOrThrownStdException([&]() {
          rc=ac.m_backend.getRadosCtx().aio_stat(ac.m_name, aioc, &ac.m_size, &ac.m_time);
          return 0;
        }, "In BackendRados::AsyncCreator::statCallback(): failed m_backend.getRadosCtx().aio_stat()");
        rtl.logIfNeeded("In BackendRados::AsyncCreator::statCallback(): m_radosCtx.aio_stat() call", ac.m_name);
        // We do not keep the completion handle: release our reference to it.
        aioc->release();
        if (rc) {
          // The stat could not be launched, so no further callback will come: fail now
          // instead of leaving wait() blocked forever.
          cta::exception::Errnum errnum (-rc,
            std::string("In BackendRados::AsyncCreator::statCallback(): failed to launch aio_stat(): ")+ ac.m_name);
          throw Backend::CouldNotCreate(errnum.getMessageValue());
        }
      }
    }
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&ac.m_job);
    ac.m_job.set_exception(std::current_exception());
  }
}
/**
 * Blocks until the asynchronous creation sequence has finished.
 * Returns normally on success; rethrows the exception stored in the promise
 * (by the constructor or by one of the callbacks) on failure.
 */
void BackendRados::AsyncCreator::wait() {
m_jobFuture.get();
// Happens-after annotation matching the ANNOTATE_HAPPENS_BEFORE(&m_job) issued
// right before the promise is fulfilled. Not reached when get() throws.
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update)
{
return new AsyncUpdater(*this, name, update);
......@@ -1067,7 +1208,7 @@ BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& na
rtl.logIfNeeded("In BackendRados::AsyncDeleter::AsyncDeleter(): m_radosCtx.aio_remove() call", m_name);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+m_name);
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter(): failed to launch aio_remove(): ")+m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
} catch (...) {
......
......@@ -266,6 +266,42 @@ private:
std::vector<RadosWorkerThreadAndContext *> m_threads;
public:
/**
* A class following up the async creation. Constructor implicitly starts the creation.
* The result (success or exception) is delivered by wait().
*/
class AsyncCreator: public Backend::AsyncCreator {
public:
/** Launches the exclusive creation of object "name" with content "value". */
AsyncCreator(BackendRados & be, const std::string & name, const std::string & value);
/** Waits for the end of the creation sequence; throws if it failed. */
void wait() override;
private:
/** A reference to the backend */
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** The content of object */
std::string m_value;
/** Storage for stat operation (time) in case of EEXIST (we retry on zero sized objects, others are a genuine error). */
time_t m_time;
/** Storage for stat operation (size) in case of EEXIST (we retry on zero sized objects, others are a genuine error). */
uint64_t m_size;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_job, which will be extracted before any thread gets a chance to play with it.
* Must stay declared after m_job: the constructor initializes it from m_job.get_future(). */
std::future<void> m_jobFuture;
/** The rados bufferlist used to hold the object data written by the create operation */
::librados::bufferlist m_radosBufferList;
/** Callback for the write operation */
static void createExclusiveCallback(librados::completion_t completion, void *pThis);
/** Callback for stat operation, handling potential retries after EEXIST */
static void statCallback(librados::completion_t completion, void *pThis);
/** Instrumentation for rados calls timing */
RadosTimeoutLogger m_radosTimeoutLogger;
/** Timer for retries (created only when needed) */
std::unique_ptr<cta::utils::Timer> m_retryTimer;
};
/** Starts the asynchronous creation of object "name" with content "value"; returns a newly allocated AsyncCreator. */
Backend::AsyncCreator* asyncCreate(const std::string& name, const std::string& value) override;
/**
* A class following up the lock-fetch-update-write-unlock. Constructor implicitly
* starts the lock step.
......@@ -281,8 +317,6 @@ public:
const std::string m_name;
/** The operation on the object */
std::function <std::string(const std::string &)> & m_update;
/** Storage for stat operation (date) */
time_t date;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_jobs, which will be extracted before any thread gets a chance to play with it. */
......@@ -331,8 +365,6 @@ public:
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** Storage for stat operation (date) */
time_t date;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_jobs, which will be extracted before any thread gets a chance to play with it. */
......
......@@ -331,6 +331,56 @@ BackendVFS::ScopedLock * BackendVFS::lockShared(std::string name, uint64_t timeo
return ret.release();
}
/**
 * Starts the asynchronous creation of an object in the VFS backend.
 *
 * The work runs in a std::async task: the object file is created exclusively
 * under the backend root and filled with the value, then the companion lock
 * file ".<name>.lock" is created exclusively. On failure, the files created by
 * this task are removed and the exception is rethrown; it reaches the caller
 * through wait().
 *
 * @param be The backend, providing the root directory.
 * @param name The name of the object to be created.
 * @param value The content to be written into the object.
 */
BackendVFS::AsyncCreator::AsyncCreator(BackendVFS& be, const std::string& name, const std::string& value):
  m_backend(be), m_name(name), m_value(value),
  // The task only captures "this" and works on the member copies: the constructor's
  // reference parameters may no longer be valid by the time the task runs.
  m_job(std::async(std::launch::async,
    [this](){
      const std::string path = m_backend.m_root + "/" + m_name;
      const std::string lockPath = m_backend.m_root + "/." + m_name + ".lock";
      bool fileCreated = false;
      bool lockCreated = false;
      try {
        // Create and fill up the path
        // TODO: lax permissions to get prototype going. Should be revisited
        int fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG | S_IRWXO);
        cta::exception::Errnum::throwOnMinusOne(fd,
            "In AsyncCreator::AsyncCreator::lambda, failed to open the file");
        fileCreated = true;
#ifdef LOW_LEVEL_TRACING
        ::printf("In BackendVFS::create(): created object %s, tid=%li\n", m_name.c_str(), ::syscall(SYS_gettid));
#endif
        try {
          cta::exception::Errnum::throwOnMinusOne(
              ::write(fd, m_value.c_str(), m_value.size()),
              "In AsyncCreator::AsyncCreator::lambda, failed to write to file");
        } catch (...) {
          // Do not leak the descriptor when the write fails.
          ::close(fd);
          throw;
        }
        cta::exception::Errnum::throwOnMinusOne(::close(fd),
            "In AsyncCreator::AsyncCreator::lambda, failed to close the file");
        // Create the lock file
        // TODO: lax permissions to get prototype going. Should be revisited
        int fdLock = ::open(lockPath.c_str(), O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG | S_IRWXO);
        cta::exception::Errnum::throwOnMinusOne(fdLock,
            std::string("In AsyncCreator::AsyncCreator::lambda, failed to create the lock file: ") + m_name);
        // Only flag the lock file once we know we created it: a failed exclusive open
        // must not lead to unlinking a lock file this task does not own.
        lockCreated = true;
        cta::exception::Errnum::throwOnMinusOne(::close(fdLock),
            std::string("In AsyncCreator::AsyncCreator::lambda, failed to close the lock file: ") + m_name);
      } catch (...) {
        // Roll back whatever this task created, then let the future carry the exception.
        if (fileCreated) unlink(path.c_str());
        if (lockCreated) unlink(lockPath.c_str());
        throw;
      }
    }))
{}
/**
 * Starts the asynchronous creation of an object in the VFS backend.
 *
 * @param name The name of the object to be created.
 * @param value The content to be written into the object.
 * @return pointer to a newly allocated BackendVFS::AsyncCreator, whose
 * constructor has already started the creation task.
 */
Backend::AsyncCreator* BackendVFS::asyncCreate(const std::string& name, const std::string& value) {
  // Constructing the creator is all it takes: the job is launched by its constructor.
  AsyncCreator * const creator = new AsyncCreator(*this, name, value);
  return creator;
}
/**
 * Blocks until the creation task has finished.
 * Returns normally on success; rethrows the exception raised by the task on failure.
 */
void BackendVFS::AsyncCreator::wait() {
m_job.get();
// Synchronization annotations on the job; not reached when get() throws.
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name, std::function<std::string(const std::string&)>& update):
m_backend(be), m_name(name), m_update(update),
m_job(std::async(std::launch::async,
......
......@@ -90,6 +90,24 @@ public:
ScopedLock * lockExclusive(std::string name, uint64_t timeout_us=0) override;
ScopedLock * lockShared(std::string name, uint64_t timeout_us=0) override;
/**
* A class mimicking AIO using C++ async tasks.
* The creation runs in a std::async task started by the constructor; wait()
* collects its result (success or exception).
*/
class AsyncCreator: public Backend::AsyncCreator {
public:
/** Starts the task creating object "name" with content "value". */
AsyncCreator(BackendVFS & be, const std::string & name, const std::string & value);
/** Waits for the end of the creation task; throws if it failed. */
void wait() override;
private:
/** A reference to the backend */
BackendVFS &m_backend;
/** The object name */
const std::string m_name;
/** The object value */
std::string m_value;
/** The future that will both do the job and allow synchronization with the caller.
* Declared last so that the members above are initialized before the task starts. */
std::future<void> m_job;
};
/**
* A class mimicking AIO using C++ async tasks
......@@ -147,6 +165,8 @@ public:
void run() override;
};
/** Starts the asynchronous creation of object "name" with content "value"; returns a newly allocated AsyncCreator. */
Backend::AsyncCreator* asyncCreate(const std::string& name, const std::string& value) override;
/** Backend override: starts an asynchronous update of object "name" using the "update" function. */
Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
/** Backend override: starts an asynchronous deletion of object "name". */
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment