From 765b7e0abeec8879d32ca582601b6b022c002440 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Mon, 18 Sep 2017 16:33:02 +0200 Subject: [PATCH] Work in progree: Changed layout of GenericObject::asyncLockfreeFetch so it matches the model of previous sync operations like ArchiveRequest::asyncUpdateJobOwner(). Also made the no lock management match the model of previously developped nolock operations. Removed Backend::AsyncLockfreeFetcher::wait() as get() will pass the exceptions anyway. --- objectstore/Backend.hpp | 5 +---- objectstore/BackendRados.cpp | 13 ++++-------- objectstore/BackendRados.hpp | 1 - objectstore/BackendVFS.cpp | 9 ++------ objectstore/BackendVFS.hpp | 1 - objectstore/GarbageCollector.cpp | 8 +++++--- objectstore/GenericObject.cpp | 35 ++++++++++++++++---------------- objectstore/GenericObject.hpp | 25 +++++++++++++++-------- objectstore/ObjectOps.hpp | 6 ------ 9 files changed, 47 insertions(+), 56 deletions(-) diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp index 9979289e3f..b04227134d 100644 --- a/objectstore/Backend.hpp +++ b/objectstore/Backend.hpp @@ -169,13 +169,10 @@ public: class AsyncLockfreeFetcher { public: - /** - * Waits for completion (success) of throws exception (failure). - */ - virtual void wait() = 0; /** * Returns the result of the async operation. * Only could be call once and will throw an exception for second call. + * Also throws an exception if a problem was encountered. */ virtual std::string get() = 0; diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 725d60da9f..6c0fa737a4 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -682,7 +682,7 @@ void BackendRados::AsyncLockfreeFetcher::statCallback(librados::completion_t com // Get the object size if (rados_aio_get_return_value(completion)) { cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), - std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name); + std::string("In BackendRados::AsyncLockfreeFetcher::statCallback(): could not stat object: ") + au.m_name); throw Backend::NoSuchObject(errnum.getMessageValue()); } // Check the size. @@ -694,7 +694,7 @@ void BackendRados::AsyncLockfreeFetcher::statCallback(librados::completion_t com const auto rc = au.m_backend.m_radosCtx.aio_read(au.m_name, aioc, &au.m_radosBufferList, au.m_size, 0); aioc->release(); if (rc) { - cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_read(): ")+au.m_name); + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncLockfreeFetcher::statCallback(): failed to launch aio_read(): ")+au.m_name); throw Backend::NoSuchObject(errnum.getMessageValue()); } } catch (...) { @@ -731,16 +731,11 @@ void BackendRados::AsyncLockfreeFetcher::fetchCallback(librados::completion_t co } } -void BackendRados::AsyncLockfreeFetcher::wait() { - m_jobFuture.wait(); - ANNOTATE_HAPPENS_AFTER(&m_job); - ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); -} - std::string BackendRados::AsyncLockfreeFetcher::get() { - return m_jobFuture.get(); + auto ret=m_jobFuture.get(); ANNOTATE_HAPPENS_AFTER(&m_job); ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); + return ret; } std::string BackendRados::Parameters::toStr() { diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index a6b0421f39..72de100596 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -178,7 +178,6 @@ public: class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher { public: AsyncLockfreeFetcher(BackendRados & be, const std::string & name); - void wait() override; std::string get() override; private: /** A reference to the backend */ diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index 2cd0a50a16..598e082d15 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -449,16 +449,11 @@ Backend::AsyncLockfreeFetcher* BackendVFS::asyncLockfreeFetch(const std::string return new AsyncLockfreeFetcher(*this, name); } -void BackendVFS::AsyncLockfreeFetcher::wait() { - m_job.wait(); - ANNOTATE_HAPPENS_AFTER(&m_job); - ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); -} - std::string BackendVFS::AsyncLockfreeFetcher::get() { - return m_job.get(); + auto ret=m_job.get(); ANNOTATE_HAPPENS_AFTER(&m_job); ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); + return ret; } }} // end of cta::objectstore diff --git a/objectstore/BackendVFS.hpp b/objectstore/BackendVFS.hpp index 3b8359f439..94141b0025 100644 --- a/objectstore/BackendVFS.hpp +++ b/objectstore/BackendVFS.hpp @@ -130,7 +130,6 @@ public: class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher { public: AsyncLockfreeFetcher(BackendVFS & be, const std::string & name); - void wait() override; std::string get() override; private: /** A reference to the backend */ diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 63dbc8f178..fbe628ee26 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -195,11 +195,12 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon // Parallel fetch (lock free) all the objects to assess their status (check ownership, // type and decide to which queue they will go. std::list<std::shared_ptr<GenericObject>> ownedObjects; + std::map<GenericObject *, std::unique_ptr<GenericObject::AsyncLockfreeFetcher>> ownedObjectsFetchers; for (auto & obj : ownedObjectAddresses) { // Create the generic objects and fetch them ownedObjects.emplace_back(new GenericObject(obj, m_objectStore)); if (ownedObjects.back()->exists()) { - ownedObjects.back()->asyncLockfreeFetch(); + ownedObjectsFetchers[ownedObjects.back().get()].reset(ownedObjects.back()->asyncLockfreeFetch()); } else { agent.removeFromOwnership(ownedObjects.back()->getAddressIfSet()); agent.commit(); @@ -210,8 +211,9 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon for (auto & obj : ownedObjects) { params.add("objectAddress", obj->getAddressIfSet()); - obj->waitAndGetAsyncLockfreeFetch(); - if (obj->getOwnerWithNoLock() != agent.getAddressIfSet()) { + ownedObjectsFetchers.at(obj.get())->wait(); + ownedObjectsFetchers.erase(obj.get()); + if (obj->getOwner() != agent.getAddressIfSet()) { lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping object which is not owned by this agent"); } else { switch (obj->getTypeWithNoLock()) { diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index cea1c0274a..27a596e4c3 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -38,28 +38,29 @@ void GenericObject::fetch() { m_existingObject = true; // Get the header from the object store. We don't care for the type auto objData=m_objectStore.read(getAddressIfSet()); - if (!m_header.ParseFromString(objData)) { - // Use a the tolerant parser to assess the situation. - m_header.ParsePartialFromString(objData); - throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") + - m_header.InitializationErrorString()); - } - m_headerInterpreted = true; + fetchBottomHalf(objData); } -void GenericObject::asyncLockfreeFetch() { - // Get the header from the object store. We don't care for the type - m_asyncLockfreeFetcher.reset(m_objectStore.asyncLockfreeFetch(getAddressIfSet())); +GenericObject::AsyncLockfreeFetcher::AsyncLockfreeFetcher(GenericObject& obj): +m_object(obj) {} + + +auto GenericObject::asyncLockfreeFetch() -> AsyncLockfreeFetcher * { + std::unique_ptr<AsyncLockfreeFetcher> ret (new AsyncLockfreeFetcher(*this)); + ret->m_backendFetcher.reset(m_objectStore.asyncLockfreeFetch(getAddressIfSet())); + return ret.release(); } -void GenericObject::waitAndGetAsyncLockfreeFetch() { - m_asyncLockfreeFetcher->wait(); - const auto objData = m_asyncLockfreeFetcher->get(); - - m_existingObject = true; - if (!m_header.ParseFromString(objData)) { +void GenericObject::AsyncLockfreeFetcher::wait() { + auto objData=m_backendFetcher->get(); + m_object.m_noLock=true; + m_object.fetchBottomHalf(objData); +} + +void GenericObject::fetchBottomHalf(const std::string& rawFetchedObject) { + if (!m_header.ParseFromString(rawFetchedObject)) { // Use a the tolerant parser to assess the situation. - m_header.ParsePartialFromString(objData); + m_header.ParsePartialFromString(rawFetchedObject); throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") + m_header.InitializationErrorString()); } diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index 1b2c1e9694..0098188b95 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -35,16 +35,25 @@ public: * types of objects */ void fetch(); - /** - * Asynchronously fetch object without lock. - */ - void asyncLockfreeFetch(); + /** An asynchronous lockfree fetcher class */ + class AsyncLockfreeFetcher { + friend class GenericObject; + public: + void wait(); + private: + AsyncLockfreeFetcher(GenericObject & obj); + std::unique_ptr<Backend::AsyncLockfreeFetcher> m_backendFetcher; + GenericObject & m_object; + }; + friend AsyncLockfreeFetcher; - /** - * wait for async object fetch and get the result. - */ - void waitAndGetAsyncLockfreeFetch(); + /** A lockfree fetcher factory */ + AsyncLockfreeFetcher * asyncLockfreeFetch(); + +private: + void fetchBottomHalf(const std::string & rawFetchedObject); +public: /** Overload of ObjectOps's implementation: we will leave the payload transparently * untouched and only deal with header parameters */ void commit(); diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index ea1777da72..661c61024e 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -141,12 +141,6 @@ public: return m_header.owner(); } - std::string getOwnerWithNoLock() { - if (!m_headerInterpreted) - throw NotFetched("In ObjectOps::checkHeaderReadable: header not yet fetched or initialized"); - return m_header.owner(); - } - void setBackupOwner(const std::string & owner) { checkHeaderWritable(); m_header.set_backupowner(owner); -- GitLab