diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp index 7b9a5ca0db36fc4330d4553fa0175d10cb5cd337..0de0bbe4e7d651b4ceac526f4d8e18256352f659 100644 --- a/objectstore/Backend.hpp +++ b/objectstore/Backend.hpp @@ -108,6 +108,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch); CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue); CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit); + CTA_GENERATE_EXCEPTION_CLASS(CouldNotDelete); CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock); CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete); @@ -138,6 +139,34 @@ public: */ virtual AsyncUpdater * asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) = 0; + + /** + * A base class handling asynchronous sequence of lock exclusive, delete. + * Each operation will be asynchronous, and the result + * (success or exception) will be returned via the wait() function call. + */ + class AsyncDeleter { + public: + /** + * Waits for completion (success) or throws an exception (failure). + */ + virtual void wait() = 0; + + /** + * Destructor + */ + virtual ~AsyncDeleter() {} + }; + + /** + * Triggers the asynchronous object delete sequence, as described in + * AsyncDeleter class description. + * + * @param name The name of the object to be deleted. + * @return pointer to a newly created AsyncDeleter + */ + virtual AsyncDeleter * asyncDelete(const std::string & name) = 0; + /** * Base class for the representation of the parameters of the BackendStore. 
*/ diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 18f7a3187da43f0870f2cbbbce06ae46296b4565..4631049d8cb936f7780313e5ffc954b2459688da 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -338,12 +338,17 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion, std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name); throw Backend::NoSuchObject(errnum.getMessageValue()); } - // Check the size. If zero, we locked an empty object: delete and throw an exception. + // Check the size. If zero, we locked an empty object: delete and throw an exception in the deleteCallback if (!au.m_size) { - // TODO. This is going to lock the callback thread of the rados context for a while. - // As this is not supposde to happen often, this is acceptable. - au.m_backend.remove(au.m_name); - throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::statCallback(): no such object: ") + au.m_name); + // launch the delete operation (async). + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + return; } // Stat is done, we can launch the read operation (async). librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, fetchCallback, nullptr); @@ -359,6 +364,23 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion, } } +void BackendRados::AsyncUpdater::deleteEmptyCallback(librados::completion_t completion, void* pThis) { + AsyncUpdater & au = *((AsyncUpdater *) pThis); + try { + // Check that the object could be deleted. 
+ if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): could not delete object: ") + au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + // object deleted then throw an exception + throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): no such object: ") + au.m_name); + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_exception(std::current_exception()); + } +} + void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) { AsyncUpdater & au = *((AsyncUpdater *) pThis); try { @@ -484,6 +506,153 @@ void BackendRados::AsyncUpdater::wait() { ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); } + +Backend::AsyncDeleter* BackendRados::asyncDelete(const std::string & name) +{ + return new AsyncDeleter(*this, name); +} + +BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& name): + m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) { + // At construction time, we just fire a lock. + try { + // Rados does not have aio_lock, so we do it in an async. + // Operation is lock (synchronous), and then launch an async stat, then delete. + // The async function never fails, exceptions go to the promise (as everywhere). + m_lockAsync.reset(new std::future<void>(std::async(std::launch::async, + [this](){ + try { + m_lockClient = BackendRados::createUniqueClientId(); + struct timeval tv; + tv.tv_usec = 0; + tv.tv_sec = 60; + int rc; + // TODO: could be improved (but need aio_lock in rados, not available at the time + // of writing). + // Crude backoff: we will measure the RTT of the call and backoff a fraction of this amount multiplied + // by the number of tries (and capped by a maximum). 
Then the value will be randomized + // (betweend and 50-150%) + size_t backoff=1; + utils::Timer t; + while (true) { + rc = m_backend.m_radosCtx.lock_exclusive(m_name, "lock", m_lockClient, "", &tv, 0); + if (-EBUSY != rc) break; + timespec ts; + auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction; + wait = std::min(wait, c_maxWait); + if (backoff>c_maxBackoff) backoff=1; + // We need to get a random number [50, 150] + std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count()); + std::uniform_int_distribution<size_t> distribution(50, 150); + decltype(wait) randFactor=distribution(dre); + wait=(wait * randFactor)/100; + ts.tv_sec = wait/(1000*1000); + ts.tv_nsec = (wait % (1000*1000)) * 1000; + nanosleep(&ts, nullptr); + } + if (rc) { + cta::exception::Errnum errnum(-rc, + std::string("In BackendRados::AsyncDeleter::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") + + m_name + "/" + "lock" + "/" + m_lockClient + "//"); + throw CouldNotLock(errnum.getMessageValue()); + } + // Locking is done, we can launch the stat operation (async). + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr); + rc=m_backend.m_radosCtx.aio_stat(m_name, aioc, &m_size, &date); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter::lock_lambda(): failed to launch aio_stat(): ")+m_name); + throw Backend::NoSuchObject(errnum.getMessageValue()); + } + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + m_job.set_exception(std::current_exception()); + } + } + ))); + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncDeleter::statCallback(librados::completion_t completion, void* pThis) { + AsyncDeleter & au = *((AsyncDeleter *) pThis); + try { + // Get the object size (it's already locked). 
+ if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + std::string("In BackendRados::AsyncDeleter::statCallback(): could not stat object: ") + au.m_name); + throw Backend::NoSuchObject(errnum.getMessageValue()); + } + // Check the size. If zero, we locked an empty object: delete and throw an exception. + if (!au.m_size) { + // launch the delete operation (async). + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::statCallback():" + " failed to launch aio_remove() for zero size object: ")+au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + return; + } + // Stat is done, we can launch the delete operation (async). + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completion, void* pThis) { + AsyncDeleter & au = *((AsyncDeleter *) pThis); + try { + // Check that the object could be deleted. + if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + std::string("In BackendRados::AsyncDeleter::deleteCallback(): could not delete object: ") + au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + // Done! 
+ ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_value(); + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) { + AsyncDeleter & au = *((AsyncDeleter *) pThis); + try { + // Check that the object could be deleted. + if (rados_aio_get_return_value(completion)) { + cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), + std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + au.m_name); + throw Backend::CouldNotDelete(errnum.getMessageValue()); + } + // object deleted then throw an exception + throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + au.m_name); + } catch (...) { + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_exception(std::current_exception()); + } +} + +void BackendRados::AsyncDeleter::wait() { + m_jobFuture.get(); + ANNOTATE_HAPPENS_AFTER(&m_job); + ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); +} + std::string BackendRados::Parameters::toStr() { std::stringstream ret; ret << "userId=" << m_userId << " pool=" << m_pool; diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index 81421b82fdf9e2ef61ed18eca4016dc0dee835db..b4895ac574e484870448e0b47c2db99da6ace0f6 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -120,6 +120,8 @@ public: std::unique_ptr<std::future<void>> m_updateAsync; /** The first callback operation (after checking existence) */ static void statCallback(librados::completion_t completion, void *pThis); + /** Async delete in case of zero sized object */ + static void deleteEmptyCallback(librados::completion_t completion, void *pThis); /** The second callback operation (after reading) */ static void fetchCallback(librados::completion_t completion, void *pThis); /** The third callback operation (after 
writing) */ @@ -130,6 +132,45 @@ public: Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override; + /** + * A class following up the check existence-lock-delete. + * Constructor implicitly starts the lock step. + */ + class AsyncDeleter: public Backend::AsyncDeleter { + public: + AsyncDeleter(BackendRados & be, const std::string & name); + void wait() override; + private: + /** A reference to the backend */ + BackendRados &m_backend; + /** The object name */ + const std::string m_name; + /** Storage for stat operation (size) */ + uint64_t m_size; + /** Storage for stat operation (date) */ + time_t date; + /** The promise that will both do the job and allow synchronization with the caller. */ + std::promise<void> m_job; + /** The future from m_job, which will be extracted before any thread gets a chance to play with it. */ + std::future<void> m_jobFuture; + /** A future used to hold the structure of the lock operation. It will be either empty or complete at + destruction time */ + std::unique_ptr<std::future<void>> m_lockAsync; + /** A string used to identify the locker */ + std::string m_lockClient; + /** A future to hold the structure of the update operation. 
It will be either empty or complete at + destruction time */ + std::unique_ptr<std::future<void>> m_updateAsync; + /** The first callback operation (after checking existence) */ + static void statCallback(librados::completion_t completion, void *pThis); + /** The second callback operation (after deleting) */ + static void deleteCallback(librados::completion_t completion, void *pThis); + /** Async delete in case of zero sized object */ + static void deleteEmptyCallback(librados::completion_t completion, void *pThis); + }; + + Backend::AsyncDeleter* asyncDelete(const std::string & name) override; + class Parameters: public Backend::Parameters { friend class BackendRados; public: diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index a8680b6565c35241b3537f2c83e96641b0cdaad2..dbef4b2681786157510ba1c9997b028d79f4beb1 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -377,6 +377,46 @@ void BackendVFS::AsyncUpdater::wait() { ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); } +BackendVFS::AsyncDeleter::AsyncDeleter(BackendVFS & be, const std::string& name): + m_backend(be), m_name(name), + m_job(std::async(std::launch::async, + [&](){ + std::unique_ptr<ScopedLock> sl; + try { // locking already throws proper exceptions for no such file. 
+ sl.reset(m_backend.lockExclusive(m_name)); + } catch (Backend::NoSuchObject &) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw; + } catch (cta::exception::Exception & ex) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw Backend::CouldNotLock(ex.getMessageValue()); + } + try { + m_backend.remove(m_name); + } catch (cta::exception::Exception & ex) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw Backend::CouldNotDelete(ex.getMessageValue()); + } + try { + sl->release(); + } catch (cta::exception::Exception & ex) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw Backend::CouldNotUnlock(ex.getMessageValue()); + } + ANNOTATE_HAPPENS_BEFORE(&m_job); + })) +{} + +Backend::AsyncDeleter* BackendVFS::asyncDelete(const std::string & name) { + // Create the object. Done. + return new AsyncDeleter(*this, name); +} + +void BackendVFS::AsyncDeleter::wait() { + m_job.get(); + ANNOTATE_HAPPENS_AFTER(&m_job); + ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); +} std::string BackendVFS::Parameters::toStr() { std::stringstream ret; diff --git a/objectstore/BackendVFS.hpp b/objectstore/BackendVFS.hpp index d55523f29479cb5686c9c381f3f2e4cbef1538e1..51398cede09b809f4cea7466dff7b32f8c213e6b 100644 --- a/objectstore/BackendVFS.hpp +++ b/objectstore/BackendVFS.hpp @@ -107,9 +107,27 @@ public: /** The future that will both do the job and allow synchronization with the caller. */ std::future<void> m_job; }; + + /** + * A class mimicking AIO using C++ async tasks + */ + class AsyncDeleter: public Backend::AsyncDeleter { + public: + AsyncDeleter(BackendVFS & be, const std::string & name); + void wait() override; + private: + /** A reference to the backend */ + BackendVFS &m_backend; + /** The object name */ + const std::string m_name; + /** The future that will both do the job and allow synchronization with the caller. 
*/ + std::future<void> m_job; + }; Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override; + Backend::AsyncDeleter* asyncDelete(const std::string & name) override; + class Parameters: public Backend::Parameters { friend class BackendVFS; public: diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 3dbd6b3aef1c945c895cb72dc33097e622bea317..661c61024eba2174dbc9068172e12b351d42680e 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -124,13 +124,13 @@ public: m_headerInterpreted = false; m_payloadInterpreted = false; } - + void resetValues () { m_existingObject = false; m_headerInterpreted = false; m_payloadInterpreted = false; } - + void setOwner(const std::string & owner) { checkHeaderWritable(); m_header.set_owner(owner); diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index f8623f377729dd4d36c388d2b34a7a354d3ba43b..1c71cc2dbc994efdf4c43988bc185531c35218af 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -476,5 +476,15 @@ std::string RetrieveRequest::dump() { return headerDump; } +RetrieveRequest::AsyncJobDeleter * RetrieveRequest::asyncDeleteJob() { + std::unique_ptr<AsyncJobDeleter> ret(new AsyncJobDeleter); + ret->m_backendDeleter.reset(m_objectStore.asyncDelete(getAddressIfSet())); + return ret.release(); +} + +void RetrieveRequest::AsyncJobDeleter::wait() { + m_backendDeleter->wait(); +} + }} // namespace cta::objectstore diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 7a1f3f22c6f48bc834cd3ff1a5ad94c397bd05c3..2e8efbed04fe269f383867414b78a88054d6fb9d 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -57,6 +57,15 @@ public: uint32_t totalRetries; // TODO: status }; + // An asynchronous job deleting class. 
+ class AsyncJobDeleter { + friend class RetrieveRequest; + public: + void wait(); + private: + std::unique_ptr<Backend::AsyncDeleter> m_backendDeleter; + }; + AsyncJobDeleter * asyncDeleteJob(); JobDump getJob(uint16_t copyNb); std::list<JobDump> getJobs(); bool addJobFailure(uint16_t copyNumber, uint64_t mountId); /**< Returns true is the request is completely failed diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 4437c1427f1dae3ff6d55f60ac97d8da11bb3f1a..abc1acf316b63f67c0dd3fc197bb14a96e9c1371 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2594,26 +2594,28 @@ OStoreDB::RetrieveJob::~RetrieveJob() { } //------------------------------------------------------------------------------ -// OStoreDB::RetrieveJob::succeed() +// OStoreDB::RetrieveJob::asyncSucceed() //------------------------------------------------------------------------------ -void OStoreDB::RetrieveJob::succeed() { - // Lock the request and set the request as successful (delete it). - utils::Timer t; - objectstore::ScopedExclusiveLock rtfrl(m_retrieveRequest); - m_retrieveRequest.fetch(); - std::string rtfrAddress = m_retrieveRequest.getAddressIfSet(); - m_retrieveRequest.remove(); +void OStoreDB::RetrieveJob::asyncSucceed() { + // set the request as successful (delete it). 
+ m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob()); +} + +//------------------------------------------------------------------------------ +// OStoreDB::RetrieveJob::checkSucceed() +//------------------------------------------------------------------------------ +void OStoreDB::RetrieveJob::checkSucceed() { + m_jobDelete->wait(); + m_retrieveRequest.resetValues(); + // We no more own the job (which could be gone) m_jobOwned = false; // Remove ownership form the agent + const std::string rtfrAddress = m_retrieveRequest.getAddressIfSet(); m_agentReference.removeFromOwnership(rtfrAddress, m_objectStore); - log::LogContext lc(m_logger); - log::ScopedParamContainer params(lc); - params.add("requestObject", rtfrAddress) - .add("schedulerDbTime", t.secs()); - lc.log(log::INFO, "In RetrieveJob::succeed(): deleted completed retrieve request."); } + } // namespace cta diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index df6da7fc8b94ad7ec7182bbf2f68bd6dde19b6e6..9d22e05fe10812165ca770cdda04e51468ebd945 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -188,7 +188,8 @@ public: public: CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); - virtual void succeed() override; + virtual void asyncSucceed() override; + virtual void checkSucceed() override; virtual void fail(log::LogContext &) override; virtual ~RetrieveJob() override; private: @@ -202,6 +203,7 @@ public: objectstore::AgentReference & m_agentReference; objectstore::RetrieveRequest m_retrieveRequest; OStoreDB::RetrieveMount & m_retrieveMount; + std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete; }; /* === Archive requests handling ========================================= */ diff --git a/scheduler/RetrieveJob.cpp b/scheduler/RetrieveJob.cpp index d27da1c188d42e8a70d02969c410043a266d2713..f9a31574ee28342567096a99ef1ae2cf6f4da1bb 100644 --- a/scheduler/RetrieveJob.cpp +++ 
b/scheduler/RetrieveJob.cpp @@ -40,12 +40,19 @@ cta::RetrieveJob::RetrieveJob(RetrieveMount &mount, transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {} //------------------------------------------------------------------------------ -// complete +// asyncComplete //------------------------------------------------------------------------------ -void cta::RetrieveJob::complete() { - m_dbJob->succeed(); +void cta::RetrieveJob::asyncComplete() { + m_dbJob->asyncSucceed(); } - + +//------------------------------------------------------------------------------ +// checkComplete +//------------------------------------------------------------------------------ +void cta::RetrieveJob::checkComplete() { + m_dbJob->checkSucceed(); +} + //------------------------------------------------------------------------------ // failed //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveJob.hpp b/scheduler/RetrieveJob.hpp index d17ccde95f31ec8fb96fb77eb0b1e167da29a2b5..ef61d045b69bdb703bb09d6831f982b392bd2683 100644 --- a/scheduler/RetrieveJob.hpp +++ b/scheduler/RetrieveJob.hpp @@ -72,14 +72,20 @@ public: * Destructor. */ virtual ~RetrieveJob() throw(); - + + /** + * Asynchronously indicates to the backend that the job was successful. + * The checksum and the size of the transfer should already be stored in the + * object beforehand. Result setting and calling complete are done in 2 + * different threads (disk write and reporter thread, respectively). + */ + virtual void asyncComplete(); + /** - * Indicates that the job was successful. The checksum and the size of the - * transfer should already stored in the object beforehand. Result setting - * and calling complete are done in 2 different threads (disk write and - * reporter thread, respectively). 
+ * Check that asynchronous complete is finished and cleanup the job structures + * */ - virtual void complete(); + virtual void checkComplete(); /** * Indicates that the job failed. Like for complete(), reason for failure diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 89426ea1ed58d13c70b96af988323dee05bea536..d4321688e64ac18b0841a74121b8655aeee747c8 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -336,7 +336,8 @@ public: cta::common::dataStructures::RetrieveRequest retrieveRequest; cta::common::dataStructures::ArchiveFile archiveFile; uint64_t selectedCopyNb; - virtual void succeed() = 0; + virtual void asyncSucceed() = 0; + virtual void checkSucceed() = 0; virtual void fail(log::LogContext &) = 0; virtual ~RetrieveJob() {} }; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 35bcc403a48334a5bf0d5147d2b955c1f8499306..ba9b9a845b15590ec45cb9d4bfd831959e3debe3 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -521,7 +521,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { ASSERT_EQ(1, jobBatch.size()); retrieveJob.reset(jobBatch.front().release()); ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get()); - retrieveJob->complete(); + retrieveJob->asyncComplete(); + retrieveJob->checkComplete(); jobBatch = retrieveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, jobBatch.size()); } diff --git a/scheduler/testingMocks/MockRetrieveJob.hpp b/scheduler/testingMocks/MockRetrieveJob.hpp index 3693a0a8f8957d76fe7a2b3df41739d1bd248cf6..dbb9d2d3a10364c0e34e769fea1901979a9adf62 100644 --- a/scheduler/testingMocks/MockRetrieveJob.hpp +++ b/scheduler/testingMocks/MockRetrieveJob.hpp @@ -33,8 +33,8 @@ namespace cta { cta::PositioningMethod::ByBlock), completes(0), failures(0) { archiveFile.tapeFiles[1]; } - - virtual void complete() override { completes++; } + virtual void asyncComplete() override { completes++; } + virtual void checkComplete() 
override {} virtual void failed(cta::log::LogContext &) override { failures++; }; ~MockRetrieveJob() throw() {} diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp index 7b7576c5024710a5e6178276d6763621a978cd85..2466fe548353e054971c74c89d88fa7940218434 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp @@ -112,7 +112,14 @@ void RecallReportPacker::reportTestGoingToEnd(){ //ReportSuccessful::execute //------------------------------------------------------------------------------ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){ - m_successfulRetrieveJob->complete(); + m_successfulRetrieveJob->asyncComplete(); +} + +//------------------------------------------------------------------------------ +//ReportSuccessful::waitForAsyncExecuteFinished +//------------------------------------------------------------------------------ +void RecallReportPacker::ReportSuccessful::waitForAsyncExecuteFinished(){ + m_successfulRetrieveJob->checkComplete(); } //------------------------------------------------------------------------------ @@ -217,6 +224,8 @@ void RecallReportPacker::WorkerThread::run(){ m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker")); m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread"); bool endFound = false; + + std::list <std::unique_ptr<Report>> reportedSuccessfully; while(1) { std::string debugType; std::unique_ptr<Report> rep(m_parent.m_fifo.pop()); @@ -242,6 +251,17 @@ void RecallReportPacker::WorkerThread::run(){ // as opposed to migrations where one failure fails the session. 
try { rep->execute(m_parent); + if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful)) { + reportedSuccessfully.emplace_back(std::move(rep)); + cta::utils::Timer timing; + const unsigned int checkedReports = m_parent.flushCheckAndFinishAsyncExecute(reportedSuccessfully); + if(checkedReports) { + cta::log::ScopedParamContainer params(m_parent.m_lc); + params.add("checkedReports", checkedReports) + .add("reportingTime", timing.secs()); + m_parent.m_lc.log(cta::log::DEBUG, "After flushCheckAndFinishAsyncExecute()"); + } + } } catch(const cta::exception::Exception& e){ //we get there because to tried to close the connection and it failed //either from the catch a few lines above or directly from rep->execute @@ -275,6 +295,42 @@ void RecallReportPacker::WorkerThread::run(){ } if (endFound) break; } + + try { + cta::utils::Timer timing; + const unsigned int checkedReports = m_parent.fullCheckAndFinishAsyncExecute(reportedSuccessfully); + if (checkedReports) { + cta::log::ScopedParamContainer params(m_parent.m_lc); + params.add("checkedReports", checkedReports) + .add("reportingTime", timing.secs()); + m_parent.m_lc.log(cta::log::DEBUG, "After fullCheckAndFinishAsyncExecute()"); + } + } catch(const cta::exception::Exception& e){ + cta::log::ScopedParamContainer params(m_parent.m_lc); + params.add("exceptionWhat", e.getMessageValue()) + .add("exceptionType", typeid(e).name()); + m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception."); + if (m_parent.m_watchdog) { + m_parent.m_watchdog->addToErrorCount("Error_clientCommunication"); + m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + } + } catch(const std::exception& e){ + cta::log::ScopedParamContainer params(m_parent.m_lc); + params.add("exceptionWhat", e.what()) + .add("exceptionType", typeid(e).name()); + m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception."); + if (m_parent.m_watchdog) { + 
m_parent.m_watchdog->addToErrorCount("Error_clientCommunication"); + m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + } + } catch(...){ + m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception."); + if (m_parent.m_watchdog) { + m_parent.m_watchdog->addToErrorCount("Error_clientCommunication"); + m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + } + } + // Drain the fifo in case we got an exception if (!endFound) { while (1) { @@ -305,6 +361,38 @@ bool RecallReportPacker::errorHappened() { return m_errorHappened || (m_watchdog && m_watchdog->errorHappened()); } +//------------------------------------------------------------------------------ +//flushCheckAndFinishAsyncExecute() +//------------------------------------------------------------------------------ +unsigned int RecallReportPacker::flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) { + unsigned int checkedReports = 0; + if (reportedSuccessfully.size() >= RECALL_REPORT_PACKER_FLUSH_SIZE) { + while (!reportedSuccessfully.empty()) { + std::unique_ptr<Report> report=std::move(reportedSuccessfully.back()); + reportedSuccessfully.pop_back(); + if (!report.get()) continue; + report->waitForAsyncExecuteFinished(); + checkedReports ++; + } + } + return checkedReports; +} + +//------------------------------------------------------------------------------ +//fullCheckAndFinishAsyncExecute() +//------------------------------------------------------------------------------ +unsigned int RecallReportPacker::fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) { + unsigned int checkedReports = 0; + while (!reportedSuccessfully.empty()) { + std::unique_ptr<Report> report=std::move(reportedSuccessfully.back()); + reportedSuccessfully.pop_back(); + if (!report.get()) continue; + report->waitForAsyncExecuteFinished(); + checkedReports++; + } + return checkedReports; +} + 
//------------------------------------------------------------------------------ //reportTapeDone() //------------------------------------------------------------------------------ diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp index d917307c2ba2bab7f2f302a5d7e4db6c3d3464fd..3d8defd399274dc95c8ada1b680638bf84a62c9f 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp @@ -125,6 +125,7 @@ private: public: virtual ~Report(){} virtual void execute(RecallReportPacker& packer)=0; + virtual void waitForAsyncExecuteFinished() {}; virtual bool goingToEnd() {return false;} }; class ReportTestGoingToEnd : public Report { @@ -144,6 +145,7 @@ private: ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob): m_successfulRetrieveJob(std::move(successfulRetrieveJob)){} void execute(RecallReportPacker& reportPacker) override; + void waitForAsyncExecuteFinished() override; }; class ReportError : public Report { /** @@ -220,6 +222,27 @@ private: * Tracking of the disk thread end */ bool m_diskThreadComplete; +public: + /* + * Check whether the flush limit is reached and, if so, wait for the async execute of each report to finish. + * + * @param reportedSuccessfully The successful reports to check + * @return The number of reports processed + */ + unsigned int flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully); + + /* + * Wait for the async execute of all reports to finish. + * + * @param reportedSuccessfully The successful reports to check + * @return The number of reports processed + */ + unsigned int fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully); + + /* + * The limit for successful reports to trigger flush. 
+ */ + const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 32; }; }}}} diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp index 9f254e454d7be03569c52111fa78727256840253..ff950c5d0ecfbccc1e6d33cdf57d648007d6342e 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp @@ -51,11 +51,12 @@ protected: public: MockRetrieveJobExternalStats(cta::RetrieveMount & rm, int & completes, int &failures): MockRetrieveJob(rm), completesRef(completes), failuresRef(failures) {} - - virtual void complete() override { + + virtual void asyncComplete() override { completesRef++; } + virtual void checkComplete() override {} virtual void failed(cta::log::LogContext &) override { failuresRef++; diff --git a/tests/helgrind.suppr b/tests/helgrind.suppr index 0d762ca6a983e85a94cc6f62700c29c1f965734c..f21bee3b7e44de54a36971f5333e3170fea23c5c 100644 --- a/tests/helgrind.suppr +++ b/tests/helgrind.suppr @@ -469,6 +469,55 @@ fun:_ZN9__gnu_cxx13new_allocatorINSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncUpdaterC1ERS6_RKSsRSt8functionIFSsSA_EEEUlvE_vEEvEEE9constructISI_ISH_EEEvPT_DpOT0_ } +{ + SharePtrRace3 + Helgrind:Race + fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEED1Ev + fun:_ZN9__gnu_cxx13new_allocatorINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS3_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS8_RKSsEUlvE_vEEvEC1EOSF_EUlvE_vEEEEE7destroyISL_EEvPT_ + 
fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE10_S_destroyISK_EENSt9enable_ifIXsrNSM_16__destroy_helperIT_EE5valueEvE4typeERSL_PSQ_ + fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE7destroyISK_EEvRSL_PT_ + fun:_ZNSt23_Sp_counted_ptr_inplaceINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEESaISK_ELN9__gnu_cxx12_Lock_policyE2EE10_M_disposeEv + fun:_ZNSt16_Sp_counted_baseILN9__gnu_cxx12_Lock_policyE2EE10_M_releaseEv + fun:_ZNSt14__shared_countILN9__gnu_cxx12_Lock_policyE2EED1Ev + fun:_ZNSt12__shared_ptrINSt6thread10_Impl_baseELN9__gnu_cxx12_Lock_policyE2EED1Ev + fun:_ZNSt10shared_ptrINSt6thread10_Impl_baseEED1Ev + fun:_ZNSt6threadC1IZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_IEEEOT_DpOT0_ + fun:_ZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_ + fun:_ZN9__gnu_cxx13new_allocatorINSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEEE9constructISE_ISD_EEEvPT_DpOT0_ +} + +{ + SharePtrRace4 + Helgrind:Race + fun:_ZNSt6thread10_Impl_baseD1Ev + fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEED1Ev + 
fun:_ZN9__gnu_cxx13new_allocatorINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS3_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS8_RKSsEUlvE_vEEvEC1EOSF_EUlvE_vEEEEE7destroyISL_EEvPT_ + fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE10_S_destroyISK_EENSt9enable_ifIXsrNSM_16__destroy_helperIT_EE5valueEvE4typeERSL_PSQ_ + fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE7destroyISK_EEvRSL_PT_ + fun:_ZNSt23_Sp_counted_ptr_inplaceINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEESaISK_ELN9__gnu_cxx12_Lock_policyE2EE10_M_disposeEv + fun:_ZNSt16_Sp_counted_baseILN9__gnu_cxx12_Lock_policyE2EE10_M_releaseEv + fun:_ZNSt14__shared_countILN9__gnu_cxx12_Lock_policyE2EED1Ev + fun:_ZNSt12__shared_ptrINSt6thread10_Impl_baseELN9__gnu_cxx12_Lock_policyE2EED1Ev + fun:_ZNSt10shared_ptrINSt6thread10_Impl_baseEED1Ev + fun:_ZNSt6threadC1IZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_IEEEOT_DpOT0_ + fun:_ZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_ +} + +{ + SharePtrRace5 + Helgrind:Race + fun:_ZSt9call_onceIMNSt13__future_base11_State_baseEFvRSt8functionIFSt10unique_ptrINS0_12_Result_baseENS4_8_DeleterEEvEERbEIKPS1_St17reference_wrapperIS8_ESF_IbEEEvRSt9once_flagOT_DpOT0_ + fun:_ZNSt13__future_base11_State_base13_M_set_resultESt8functionIFSt10unique_ptrINS_12_Result_baseENS3_8_DeleterEEvEEb + 
fun:_ZZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_ENKUlvE_clEv + fun:_ZNSt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_EUlvE_vEE9_M_invokeIIEEEvSt12_Index_tupleIIXspT_EEE + fun:_ZNSt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_EUlvE_vEEclEv + fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEE6_M_runEv + obj:/usr/lib64/libstdc++.so.6.0.19 + fun:mythread_wrapper + fun:start_thread + fun:clone +} + { AsyncGccRace Helgrind:Race