Commit b7213e30 authored by Victor Kotlyar

Implement batch reporting to the backend for successful retrieve jobs.

Process all successful reports asynchronously, and periodically check
and clear their statuses once they have finished.
At the end of the session, check/flush all reports remaining in the
successful reports queue.

Switch from the synchronous rados remove to the asynchronous aio_remove
for the zero-size object case in BackendRados::AsyncUpdater.
parent 1b8d6a29
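
The commit splits success reporting into a fire phase and a check phase. Below is a minimal sketch of the intended flow, using a hypothetical stand-in Job type and an illustrative flush threshold; it is not the actual RecallReportPacker code, which appears in the diff further down.

#include <cstddef>
#include <future>
#include <list>
#include <memory>

// Hypothetical stand-in for cta::RetrieveJob as changed by this commit:
// asyncComplete() launches the asynchronous deletion of the retrieve request,
// checkComplete() waits for it and rethrows any stored failure.
struct Job {
  std::future<void> m_pending;
  void asyncComplete() {
    m_pending = std::async(std::launch::async, [] { /* delete the request object */ });
  }
  void checkComplete() { m_pending.get(); }
};

void reportSuccesses(std::list<std::unique_ptr<Job>> jobs) {
  std::list<std::unique_ptr<Job>> reportedSuccessfully;
  const std::size_t flushThreshold = 500;  // illustrative value, not from the commit
  for (auto & job : jobs) {
    job->asyncComplete();                              // fire, do not block
    reportedSuccessfully.emplace_back(std::move(job));
    if (reportedSuccessfully.size() >= flushThreshold) {
      // Periodic check/clear of finished reports.
      for (auto & j : reportedSuccessfully) j->checkComplete();
      reportedSuccessfully.clear();
    }
  }
  // End of session: check/flush everything still in the successful reports queue.
  for (auto & j : reportedSuccessfully) j->checkComplete();
  reportedSuccessfully.clear();
}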
......@@ -108,6 +108,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotDelete);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
......@@ -138,6 +139,34 @@ public:
*/
virtual AsyncUpdater * asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) = 0;
/**
* A base class handling the asynchronous sequence of exclusive lock, then delete.
* Each operation will be asynchronous, and the result
* (success or exception) will be returned via the wait() function call.
*/
class AsyncDeleter {
public:
/**
* Waits for completion (success) or throws an exception (failure).
*/
virtual void wait() = 0;
/**
* Destructor
*/
virtual ~AsyncDeleter() {}
};
/**
* Triggers the asynchronous object delete sequence described in the
* AsyncDeleter class documentation.
*
* @param name The name of the object to be deleted.
* @return pointer to a newly created AsyncDeleter
*/
virtual AsyncDeleter * asyncDelete(const std::string & name) = 0;
/**
* Base class for the representation of the parameters of the BackendStore.
*/
......
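
From the caller's point of view the new interface mirrors asyncUpdate(): the factory returns a heap-allocated handle, and wait() either returns on success or rethrows the recorded failure. A hedged usage sketch follows; the helper function and its error handling are illustrative, and it assumes the Backend header above is included (namespace qualifiers omitted).

#include <memory>
#include <string>

// Illustrative helper, not part of the commit: delete "name" through any
// Backend implementation (BackendRados or BackendVFS in this commit).
void deleteObject(Backend & backend, const std::string & name) {
  std::unique_ptr<Backend::AsyncDeleter> deleter(backend.asyncDelete(name));
  // Other work can proceed here while the lock/stat/remove sequence runs.
  try {
    deleter->wait();                    // returns once the object is gone
  } catch (Backend::NoSuchObject &) {
    // The object was absent (or empty): treat as already deleted if acceptable.
  } catch (Backend::CouldNotDelete &) {
    throw;                              // a real failure: propagate to the caller
  }
}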
......@@ -338,12 +338,17 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
// Check the size. If zero, we locked an empty object: delete and throw an exception.
// Check the size. If zero, we locked an empty object: delete it and throw the exception in deleteEmptyCallback.
if (!au.m_size) {
// TODO. This is going to lock the callback thread of the rados context for a while.
// As this is not supposed to happen often, this is acceptable.
au.m_backend.remove(au.m_name);
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::statCallback(): no such object: ") + au.m_name);
// launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
return;
}
// Stat is done, we can launch the read operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, fetchCallback, nullptr);
......@@ -359,6 +364,23 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
}
}
void BackendRados::AsyncUpdater::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
AsyncUpdater & au = *((AsyncUpdater *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// The empty object was deleted: report the original condition (no such object).
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): no such object: ") + au.m_name);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
AsyncUpdater & au = *((AsyncUpdater *) pThis);
try {
......@@ -484,6 +506,153 @@ void BackendRados::AsyncUpdater::wait() {
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
Backend::AsyncDeleter* BackendRados::asyncDelete(const std::string & name)
{
return new AsyncDeleter(*this, name);
}
BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& name):
m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
// At construction time, we just fire a lock.
try {
// Rados does not have aio_lock, so we do the locking in an async task.
// The operation is lock (synchronous), then an async stat, then an async delete.
// The async function never fails; exceptions go to the promise (as everywhere).
m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
[this](){
try {
m_lockClient = BackendRados::createUniqueClientId();
struct timeval tv;
tv.tv_usec = 0;
tv.tv_sec = 60;
int rc;
// TODO: could be improved (but need aio_lock in rados, not available at the time
// of writing).
// Crude backoff: we measure the RTT of the call and back off by a fraction of this amount multiplied
// by the number of tries (capped by a maximum). The value is then randomized
// (between 50 and 150%).
size_t backoff=1;
utils::Timer t;
while (true) {
rc = m_backend.m_radosCtx.lock_exclusive(m_name, "lock", m_lockClient, "", &tv, 0);
if (-EBUSY != rc) break;
timespec ts;
auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
wait = std::min(wait, c_maxWait);
if (backoff>c_maxBackoff) backoff=1;
// We need a random number in [50, 150]
std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<size_t> distribution(50, 150);
decltype(wait) randFactor=distribution(dre);
wait=(wait * randFactor)/100;
ts.tv_sec = wait/(1000*1000);
ts.tv_nsec = (wait % (1000*1000)) * 1000;
nanosleep(&ts, nullptr);
}
if (rc) {
cta::exception::Errnum errnum(-rc,
std::string("In BackendRados::AsyncDeleter::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") +
m_name + "/" + "lock" + "/" + m_lockClient + "//");
throw CouldNotLock(errnum.getMessageValue());
}
// Locking is done, we can launch the stat operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr);
rc=m_backend.m_radosCtx.aio_stat(m_name, aioc, &m_size, &date);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter::lock_lambda(): failed to launch aio_stat(): ")+m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
)));
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
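
The retry loop above backs off by a fraction of the measured lock RTT, scaled by the attempt count, capped, and then randomized to 50-150% so concurrent lockers do not retry in lock-step. A standalone sketch of that computation follows; the constants are illustrative stand-ins for c_backoffFraction, c_maxWait and c_maxBackoff, whose real values live in BackendRados and are not shown in this diff.

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <random>

// Returns the next wait in microseconds; "backoff" is the per-lock attempt counter.
uint64_t nextWaitUs(uint64_t lockCallRttUs, std::size_t & backoff) {
  const uint64_t backoffFraction = 4;          // assumed value
  const uint64_t maxWaitUs = 2 * 1000 * 1000;  // assumed cap: 2 seconds
  const std::size_t maxBackoff = 16;           // assumed value
  uint64_t wait = lockCallRttUs * backoff++ / backoffFraction;
  wait = std::min(wait, maxWaitUs);
  if (backoff > maxBackoff) backoff = 1;
  // Randomize to 50-150% of the computed wait.
  std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
  std::uniform_int_distribution<uint64_t> dist(50, 150);
  return wait * dist(dre) / 100;
}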
void BackendRados::AsyncDeleter::statCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Get the object size (it's already locked).
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::statCallback(): could not stat object: ") + au.m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
// Check the size. If zero, we locked an empty object: delete and throw an exception.
if (!au.m_size) {
// launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::statCallback():"
" failed to launch aio_remove() for zero size object: ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
return;
}
// Stat is done, we can launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::deleteCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// The empty object was deleted: report the original condition (no such object).
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + au.m_name);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::wait() {
m_jobFuture.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
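
All the callbacks above funnel their outcome into the same std::promise: set_value() on success, set_exception() on any failure, and wait() surfaces the result by calling get() on the matching future. A minimal, rados-free sketch of that wiring follows; the error text is made up for illustration.

#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

int main() {
  std::promise<void> job;
  std::future<void> jobFuture = job.get_future();
  // Stand-in for a rados completion callback running on another thread.
  std::thread callback([&job] {
    try {
      throw std::runtime_error("could not delete object");  // simulated failure
    } catch (...) {
      job.set_exception(std::current_exception());
    }
  });
  callback.join();
  try {
    jobFuture.get();  // this is what wait() does: it rethrows the stored exception
  } catch (const std::exception & ex) {
    std::cerr << ex.what() << std::endl;  // prints: could not delete object
  }
  return 0;
}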
std::string BackendRados::Parameters::toStr() {
std::stringstream ret;
ret << "userId=" << m_userId << " pool=" << m_pool;
......
......@@ -120,6 +120,8 @@ public:
std::unique_ptr<std::future<void>> m_updateAsync;
/** The first callback operation (after checking existence) */
static void statCallback(librados::completion_t completion, void *pThis);
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
/** The second callback operation (after reading) */
static void fetchCallback(librados::completion_t completion, void *pThis);
/** The third callback operation (after writing) */
......@@ -130,6 +132,45 @@ public:
Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
/**
* A class following up the check-existence, lock, delete sequence.
* Constructor implicitly starts the lock step.
*/
class AsyncDeleter: public Backend::AsyncDeleter {
public:
AsyncDeleter(BackendRados & be, const std::string & name);
void wait() override;
private:
/** A reference to the backend */
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** Storage for stat operation (size) */
uint64_t m_size;
/** Storage for stat operation (date) */
time_t date;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_job, which will be extracted before any thread gets a chance to play with it. */
std::future<void> m_jobFuture;
/** A future used to hold the structure of the lock operation. It will be either empty or complete at
destruction time */
std::unique_ptr<std::future<void>> m_lockAsync;
/** A string used to identify the locker */
std::string m_lockClient;
/** A future used to hold the structure of the update operation. It will be either empty or complete at
destruction time */
std::unique_ptr<std::future<void>> m_updateAsync;
/** The first callback operation (after checking existence) */
static void statCallback(librados::completion_t completion, void *pThis);
/** The second callback operation (after deleting) */
static void deleteCallback(librados::completion_t completion, void *pThis);
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
};
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
class Parameters: public Backend::Parameters {
friend class BackendRados;
public:
......
......@@ -377,6 +377,46 @@ void BackendVFS::AsyncUpdater::wait() {
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
BackendVFS::AsyncDeleter::AsyncDeleter(BackendVFS & be, const std::string& name):
m_backend(be), m_name(name),
m_job(std::async(std::launch::async,
[&](){
std::unique_ptr<ScopedLock> sl;
try { // locking already throws proper exceptions for no such file.
sl.reset(m_backend.lockExclusive(m_name));
} catch (Backend::NoSuchObject &) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw;
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotLock(ex.getMessageValue());
}
try {
m_backend.remove(m_name);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotDelete(ex.getMessageValue());
}
try {
sl->release();
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotUnlock(ex.getMessageValue());
}
ANNOTATE_HAPPENS_BEFORE(&m_job);
}))
{}
Backend::AsyncDeleter* BackendVFS::asyncDelete(const std::string & name) {
// Create the object. Done.
return new AsyncDeleter(*this, name);
}
void BackendVFS::AsyncDeleter::wait() {
m_job.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
std::string BackendVFS::Parameters::toStr() {
std::stringstream ret;
......
......@@ -107,9 +107,27 @@ public:
/** The future that will both do the job and allow synchronization with the caller. */
std::future<void> m_job;
};
/**
* A class mimicking AIO using C++ async tasks
*/
class AsyncDeleter: public Backend::AsyncDeleter {
public:
AsyncDeleter(BackendVFS & be, const std::string & name);
void wait() override;
private:
/** A reference to the backend */
BackendVFS &m_backend;
/** The object name */
const std::string m_name;
/** The future that will both do the job and allow synchronization with the caller. */
std::future<void> m_job;
};
Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
class Parameters: public Backend::Parameters {
friend class BackendVFS;
public:
......
......@@ -124,13 +124,13 @@ public:
m_headerInterpreted = false;
m_payloadInterpreted = false;
}
void resetValues () {
m_existingObject = false;
m_headerInterpreted = false;
m_payloadInterpreted = false;
}
void setOwner(const std::string & owner) {
checkHeaderWritable();
m_header.set_owner(owner);
......
......@@ -476,5 +476,15 @@ std::string RetrieveRequest::dump() {
return headerDump;
}
RetrieveRequest::AsyncJobDeleter * RetrieveRequest::asyncDeleteJob() {
std::unique_ptr<AsyncJobDeleter> ret(new AsyncJobDeleter);
ret->m_backendDeleter.reset(m_objectStore.asyncDelete(getAddressIfSet()));
return ret.release();
}
void RetrieveRequest::AsyncJobDeleter::wait() {
m_backendDeleter->wait();
}
}} // namespace cta::objectstore
......@@ -57,6 +57,15 @@ public:
uint32_t totalRetries;
// TODO: status
};
// An asynchronous job deletion class.
class AsyncJobDeleter {
friend class RetrieveRequest;
public:
void wait();
private:
std::unique_ptr<Backend::AsyncDeleter> m_backendDeleter;
};
AsyncJobDeleter * asyncDeleteJob();
JobDump getJob(uint16_t copyNb);
std::list<JobDump> getJobs();
bool addJobFailure(uint16_t copyNumber, uint64_t mountId); /**< Returns true is the request is completely failed
......
......@@ -2594,26 +2594,28 @@ OStoreDB::RetrieveJob::~RetrieveJob() {
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::succeed()
// OStoreDB::RetrieveJob::asyncSucceed()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::succeed() {
// Lock the request and set the request as successful (delete it).
utils::Timer t;
objectstore::ScopedExclusiveLock rtfrl(m_retrieveRequest);
m_retrieveRequest.fetch();
std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
m_retrieveRequest.remove();
void OStoreDB::RetrieveJob::asyncSucceed() {
// Set the request as successful by deleting it (asynchronously).
m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob());
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::checkSucceed()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::checkSucceed() {
m_jobDelete->wait();
m_retrieveRequest.resetValues();
// We no longer own the job (which could be gone)
m_jobOwned = false;
// Remove ownership from the agent
const std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
m_agentReference.removeFromOwnership(rtfrAddress, m_objectStore);
log::LogContext lc(m_logger);
log::ScopedParamContainer params(lc);
params.add("requestObject", rtfrAddress)
.add("schedulerDbTime", t.secs());
lc.log(log::INFO, "In RetrieveJob::succeed(): deleted completed retrieve request.");
}
} // namespace cta
......@@ -188,7 +188,8 @@ public:
public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void succeed() override;
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
virtual void fail(log::LogContext &) override;
virtual ~RetrieveJob() override;
private:
......@@ -202,6 +203,7 @@ public:
objectstore::AgentReference & m_agentReference;
objectstore::RetrieveRequest m_retrieveRequest;
OStoreDB::RetrieveMount & m_retrieveMount;
std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete;
};
/* === Archive requests handling ========================================= */
......
......@@ -40,12 +40,19 @@ cta::RetrieveJob::RetrieveJob(RetrieveMount &mount,
transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
//------------------------------------------------------------------------------
// complete
// asyncComplete
//------------------------------------------------------------------------------
void cta::RetrieveJob::complete() {
m_dbJob->succeed();
void cta::RetrieveJob::asyncComplete() {
m_dbJob->asyncSucceed();
}
//------------------------------------------------------------------------------
// checkComplete
//------------------------------------------------------------------------------
void cta::RetrieveJob::checkComplete() {
m_dbJob->checkSucceed();
}
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
......
......@@ -72,14 +72,20 @@ public:
* Destructor.
*/
virtual ~RetrieveJob() throw();
/**
* Asynchronously indicates to the backend that the job was successful.
* The checksum and the size of the transfer should already be stored in
* the object beforehand. Result setting and calling complete are done in
* two different threads (disk write and reporter thread, respectively).
*/
virtual void asyncComplete();
/**
* Indicates that the job was successful. The checksum and the size of the
* transfer should already stored in the object beforehand. Result setting
* and calling complete are done in 2 different threads (disk write and
* reporter thread, respectively).
* Check that the asynchronous complete has finished and clean up the job structures.
*
*/
virtual void complete();
virtual void checkComplete();
/**
* Indicates that the job failed. Like for complete(), reason for failure
......
......@@ -336,7 +336,8 @@ public:
cta::common::dataStructures::RetrieveRequest retrieveRequest;
cta::common::dataStructures::ArchiveFile archiveFile;
uint64_t selectedCopyNb;
virtual void succeed() = 0;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
virtual void fail(log::LogContext &) = 0;
virtual ~RetrieveJob() {}
};
......
......@@ -521,7 +521,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
ASSERT_EQ(1, jobBatch.size());
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
retrieveJob->complete();
retrieveJob->asyncComplete();
retrieveJob->checkComplete();
jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, jobBatch.size());
}
......
......@@ -33,8 +33,8 @@ namespace cta {
cta::PositioningMethod::ByBlock), completes(0), failures(0) {
archiveFile.tapeFiles[1];
}
virtual void complete() override { completes++; }
virtual void asyncComplete() override { completes++; }
virtual void checkComplete() override {}
virtual void failed(cta::log::LogContext &) override { failures++; };
~MockRetrieveJob() throw() {}
......
......@@ -112,7 +112,14 @@ void RecallReportPacker::reportTestGoingToEnd(){
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
m_successfulRetrieveJob->complete();
m_successfulRetrieveJob->asyncComplete();
}
//------------------------------------------------------------------------------
//ReportSuccessful::waitForAsyncExecuteFinished
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::waitForAsyncExecuteFinished(){
m_successfulRetrieveJob->checkComplete();
}
//------------------------------------------------------------------------------
......@@ -217,6 +224,8 @@ void RecallReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread");
bool endFound = false;
std::list <std::unique_ptr<Report>> reportedSuccessfully;
while(1) {
std::string debugType;