diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp index 9979289e3fd0b3e00b15ad7bd0c5abc473745337..b04227134d854582ea070dfa11a7bf5f20f473da 100644 --- a/objectstore/Backend.hpp +++ b/objectstore/Backend.hpp @@ -169,13 +169,10 @@ public: class AsyncLockfreeFetcher { public: - /** - * Waits for completion (success) of throws exception (failure). - */ - virtual void wait() = 0; /** * Returns the result of the async operation. * Only could be call once and will throw an exception for second call. + * Also throws an exception if a problem was encountered. */ virtual std::string get() = 0; diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 725d60da9f3af7ff962c09a09e5a51dc427adcd7..6c0fa737a41f056f66e837fbba4d0b7b75cf132a 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -682,7 +682,7 @@ void BackendRados::AsyncLockfreeFetcher::statCallback(librados::completion_t com // Get the object size if (rados_aio_get_return_value(completion)) { cta::exception::Errnum errnum(-rados_aio_get_return_value(completion), - std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name); + std::string("In BackendRados::AsyncLockfreeFetcher::statCallback(): could not stat object: ") + au.m_name); throw Backend::NoSuchObject(errnum.getMessageValue()); } // Check the size. @@ -694,7 +694,7 @@ void BackendRados::AsyncLockfreeFetcher::statCallback(librados::completion_t com const auto rc = au.m_backend.m_radosCtx.aio_read(au.m_name, aioc, &au.m_radosBufferList, au.m_size, 0); aioc->release(); if (rc) { - cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_read(): ")+au.m_name); + cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncLockfreeFetcher::statCallback(): failed to launch aio_read(): ")+au.m_name); throw Backend::NoSuchObject(errnum.getMessageValue()); } } catch (...) { @@ -731,16 +731,11 @@ void BackendRados::AsyncLockfreeFetcher::fetchCallback(librados::completion_t co } } -void BackendRados::AsyncLockfreeFetcher::wait() { - m_jobFuture.wait(); - ANNOTATE_HAPPENS_AFTER(&m_job); - ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); -} - std::string BackendRados::AsyncLockfreeFetcher::get() { - return m_jobFuture.get(); + auto ret=m_jobFuture.get(); ANNOTATE_HAPPENS_AFTER(&m_job); ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); + return ret; } std::string BackendRados::Parameters::toStr() { diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index a6b0421f395fe042f58ae7efdc033473cf7a9658..72de10059648a4f178106c0d829d731516a59c10 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -178,7 +178,6 @@ public: class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher { public: AsyncLockfreeFetcher(BackendRados & be, const std::string & name); - void wait() override; std::string get() override; private: /** A reference to the backend */ diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index 2cd0a50a167e73f44375a751f3763e2aae4f55d3..598e082d1559ea85342e931420caeee4891aefa6 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -449,16 +449,11 @@ Backend::AsyncLockfreeFetcher* BackendVFS::asyncLockfreeFetch(const std::string return new AsyncLockfreeFetcher(*this, name); } -void BackendVFS::AsyncLockfreeFetcher::wait() { - m_job.wait(); - ANNOTATE_HAPPENS_AFTER(&m_job); - ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); -} - std::string BackendVFS::AsyncLockfreeFetcher::get() { - return m_job.get(); + auto ret=m_job.get(); ANNOTATE_HAPPENS_AFTER(&m_job); ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job); + return ret; } }} // end of cta::objectstore diff --git a/objectstore/BackendVFS.hpp b/objectstore/BackendVFS.hpp index 3b8359f439def9a798cf35eb79b5979a333d54af..94141b002590ecf3dfd94d6659287e6c0b3d4ba2 100644 --- a/objectstore/BackendVFS.hpp +++ b/objectstore/BackendVFS.hpp @@ -130,7 +130,6 @@ public: class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher { public: AsyncLockfreeFetcher(BackendVFS & be, const std::string & name); - void wait() override; std::string get() override; private: /** A reference to the backend */ diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 63dbc8f178fc0ad3da31e5bbea145b5d1a7b8fc3..fbe628ee2646ca2b7657bcd70bbd824d8ab93430 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -195,11 +195,12 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon // Parallel fetch (lock free) all the objects to assess their status (check ownership, // type and decide to which queue they will go. std::list<std::shared_ptr<GenericObject>> ownedObjects; + std::map<GenericObject *, std::unique_ptr<GenericObject::AsyncLockfreeFetcher>> ownedObjectsFetchers; for (auto & obj : ownedObjectAddresses) { // Create the generic objects and fetch them ownedObjects.emplace_back(new GenericObject(obj, m_objectStore)); if (ownedObjects.back()->exists()) { - ownedObjects.back()->asyncLockfreeFetch(); + ownedObjectsFetchers[ownedObjects.back().get()].reset(ownedObjects.back()->asyncLockfreeFetch()); } else { agent.removeFromOwnership(ownedObjects.back()->getAddressIfSet()); agent.commit(); @@ -210,8 +211,9 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon for (auto & obj : ownedObjects) { params.add("objectAddress", obj->getAddressIfSet()); - obj->waitAndGetAsyncLockfreeFetch(); - if (obj->getOwnerWithNoLock() != agent.getAddressIfSet()) { + ownedObjectsFetchers.at(obj.get())->wait(); + ownedObjectsFetchers.erase(obj.get()); + if (obj->getOwner() != agent.getAddressIfSet()) { lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping object which is not owned by this agent"); } else { switch (obj->getTypeWithNoLock()) { diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index cea1c0274a5f8785c128e17cba24214bfbf86c53..27a596e4c3b913f030d9ef321c43edd9166be1c3 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -38,28 +38,29 @@ void GenericObject::fetch() { m_existingObject = true; // Get the header from the object store. We don't care for the type auto objData=m_objectStore.read(getAddressIfSet()); - if (!m_header.ParseFromString(objData)) { - // Use a the tolerant parser to assess the situation. - m_header.ParsePartialFromString(objData); - throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") + - m_header.InitializationErrorString()); - } - m_headerInterpreted = true; + fetchBottomHalf(objData); } -void GenericObject::asyncLockfreeFetch() { - // Get the header from the object store. We don't care for the type - m_asyncLockfreeFetcher.reset(m_objectStore.asyncLockfreeFetch(getAddressIfSet())); +GenericObject::AsyncLockfreeFetcher::AsyncLockfreeFetcher(GenericObject& obj): +m_object(obj) {} + + +auto GenericObject::asyncLockfreeFetch() -> AsyncLockfreeFetcher * { + std::unique_ptr<AsyncLockfreeFetcher> ret (new AsyncLockfreeFetcher(*this)); + ret->m_backendFetcher.reset(m_objectStore.asyncLockfreeFetch(getAddressIfSet())); + return ret.release(); } -void GenericObject::waitAndGetAsyncLockfreeFetch() { - m_asyncLockfreeFetcher->wait(); - const auto objData = m_asyncLockfreeFetcher->get(); - - m_existingObject = true; - if (!m_header.ParseFromString(objData)) { +void GenericObject::AsyncLockfreeFetcher::wait() { + auto objData=m_backendFetcher->get(); + m_object.m_noLock=true; + m_object.fetchBottomHalf(objData); +} + +void GenericObject::fetchBottomHalf(const std::string& rawFetchedObject) { + if (!m_header.ParseFromString(rawFetchedObject)) { // Use a the tolerant parser to assess the situation. - m_header.ParsePartialFromString(objData); + m_header.ParsePartialFromString(rawFetchedObject); throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") + m_header.InitializationErrorString()); } diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index 1b2c1e9694db070cec16254194b25f0cce2ee094..0098188b955e02b69696545333b9a3de3bf8ab6c 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -35,16 +35,25 @@ public: * types of objects */ void fetch(); - /** - * Asynchronously fetch object without lock. - */ - void asyncLockfreeFetch(); + /** An asynchronous lockfree fetcher class */ + class AsyncLockfreeFetcher { + friend class GenericObject; + public: + void wait(); + private: + AsyncLockfreeFetcher(GenericObject & obj); + std::unique_ptr<Backend::AsyncLockfreeFetcher> m_backendFetcher; + GenericObject & m_object; + }; + friend AsyncLockfreeFetcher; - /** - * wait for async object fetch and get the result. - */ - void waitAndGetAsyncLockfreeFetch(); + /** A lockfree fetcher factory */ + AsyncLockfreeFetcher * asyncLockfreeFetch(); + +private: + void fetchBottomHalf(const std::string & rawFetchedObject); +public: /** Overload of ObjectOps's implementation: we will leave the payload transparently * untouched and only deal with header parameters */ void commit(); diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index ea1777da72f0ac19eae77770356b36b29e227e55..661c61024eba2174dbc9068172e12b351d42680e 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -141,12 +141,6 @@ public: return m_header.owner(); } - std::string getOwnerWithNoLock() { - if (!m_headerInterpreted) - throw NotFetched("In ObjectOps::checkHeaderReadable: header not yet fetched or initialized"); - return m_header.owner(); - } - void setBackupOwner(const std::string & owner) { checkHeaderWritable(); m_header.set_backupowner(owner);