diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp
index 35f988b497e0b125c9f37cafdb18f0f49039eea6..be2d7b035ca6127f477161e9b131329526f0c93c 100644
--- a/objectstore/ArchiveQueue.cpp
+++ b/objectstore/ArchiveQueue.cpp
@@ -29,6 +29,9 @@ namespace cta { namespace objectstore {
 ArchiveQueue::ArchiveQueue(const std::string& address, Backend& os):
   ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>(os, address) { }
 
+ArchiveQueue::ArchiveQueue(Backend& os):
+  ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>(os) { }
+
 ArchiveQueue::ArchiveQueue(GenericObject& go):
   ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>(go.objectStore()) {
   // Here we transplant the generic object into the new object
diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp
index fc416b67165eb8ad097cb80ab6ad062f1248ab9c..d237b0c288996035a3dd8eaa1e4d9d88cb052d4f 100644
--- a/objectstore/ArchiveQueue.hpp
+++ b/objectstore/ArchiveQueue.hpp
@@ -40,6 +40,9 @@ public:
   // Constructor
   ArchiveQueue(const std::string & address, Backend & os);
   
+  // Undefined object constructor
+  ArchiveQueue(Backend & os);
+
   // Upgrader form generic object
   ArchiveQueue(GenericObject & go);
 
diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp
index 42cbcaacb146089b8688b7abab0b7358faf0baec..8927d3bb210a42af1b0bdf90d24da8ff0f8a5c16 100644
--- a/objectstore/RootEntry.cpp
+++ b/objectstore/RootEntry.cpp
@@ -31,10 +31,12 @@
 
 namespace cta { namespace objectstore {
 
+const std::string RootEntry::address("root");
+
 // construtor, when the backend store exists. 
 // Checks the existence and correctness of the root entry
 RootEntry::RootEntry(Backend & os):
-  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, "root") {}
+  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, address) {}
 
 RootEntry::RootEntry(GenericObject& go):
   ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(go.objectStore()) {
diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp
index df22fe32f6afc0ded6485a60489e17d32479b075..0a8b3d71cdda8e1937afd981b2f6b54dffa680dd 100644
--- a/objectstore/RootEntry.hpp
+++ b/objectstore/RootEntry.hpp
@@ -34,6 +34,9 @@ class GenericObject;
 
 class RootEntry: public ObjectOps<serializers::RootEntry, serializers::RootEntry_t> {
 public:
+  // The conventional address of the root entry
+  static const std::string address; // = "root"
+
   // Constructor
   RootEntry(Backend & os);
   RootEntry(GenericObject & go);
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index d594b8e918ea54c39446b0d7b3ee8c34ca421e47..d0c3926b27371c9165147ac8776d37505f542ba5 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -167,7 +167,6 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
   std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
   return ret;
 }
-
 /* Old getMountInfo
 //------------------------------------------------------------------------------
 // OStoreDB::getMountInfo()
@@ -277,6 +276,52 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
 }
 */
 
+//------------------------------------------------------------------------------
+// OStoreDB::getLockedAndFetchedArchiveQueue()
+//------------------------------------------------------------------------------
+void OStoreDB::getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue& archiveQueue,
+  cta::objectstore::ScopedExclusiveLock& archiveQueueLock, const std::string& tapePool) {
+  // TODO: if necessary, we could use a singleton caching object here to accelerate
+  // lookups.
+  // Getting a locked AQ is the name of the game.
+  // Try and find an existing one first, create if needed
+  for (size_t i=0; i<5; i++) {
+    {
+      RootEntry re (m_objectStore);
+      ScopedSharedLock rel(re);
+      re.fetch();
+      try {
+        archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
+      } catch (const cta::exception::Exception &) {
+        // Lookup failed (presumably no queue is registered for this tape pool yet):
+        // switch to an exclusive lock so the queue can be created.
+        rel.release();
+        ScopedExclusiveLock rexl(re);
+        // The root entry may have changed while it was unlocked: re-read it so
+        // that we neither act on nor commit a stale copy.
+        re.fetch();
+        archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, *m_agentReference));
+      }
+    }
+    try {
+      archiveQueueLock.lock(archiveQueue);
+      archiveQueue.fetch();
+      return;
+    } catch (const cta::exception::Exception &) {
+      // We have a (rare) opportunity for a race condition, where we identify the
+      // queue and it gets deleted before we manage to lock it.
+      // The locking or fetching will fail in this case.
+      // We hence allow ourselves to retry a couple times.
+      // NOTE(review): if lock() succeeded and fetch() threw, archiveQueueLock may
+      // still be held when we retry; confirm whether it must be released here.
+      continue;
+    }
+  }
+  throw cta::exception::Exception(std::string(
+      "In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ") +
+      tapePool);
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::queueArchive()
 //------------------------------------------------------------------------------
@@ -305,21 +350,18 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
   aReq.setRequester(request.requester);
   aReq.setSrcURL(request.srcURL);
   aReq.setEntryLog(request.creationLog);
-  // We will need to identity tapepools is order to construct the request
-  RootEntry re(m_objectStore);
-  // TODO: need a softer locking, and a conbined usage of addOrGetArchiveQueueAndCommit and getArchiveQueue.
-  ScopedExclusiveLock rel(re);
-  re.fetch();
   std::list<cta::objectstore::ArchiveRequest::JobDump> jl;
   for (auto & copy:criteria.copyToPoolMap) {
-    std::string aqaddr = re.addOrGetArchiveQueueAndCommit(copy.second, *m_agentReference);
+    ArchiveQueue aq(m_objectStore);
+    ScopedExclusiveLock aql;
+    getLockedAndFetchedArchiveQueue(aq, aql, copy.second);
     const uint32_t hardcodedRetriesWithinMount = 3;
     const uint32_t hardcodedTotalRetries = 6;
-    aReq.addJob(copy.first, copy.second, aqaddr, hardcodedRetriesWithinMount, hardcodedTotalRetries);
+    aReq.addJob(copy.first, copy.second, aq.getAddressIfSet(), hardcodedRetriesWithinMount, hardcodedTotalRetries);
     jl.push_back(cta::objectstore::ArchiveRequest::JobDump());
     jl.back().copyNb = copy.first;
     jl.back().tapePool = copy.second;
-    jl.back().ArchiveQueueAddress = aqaddr;
+    jl.back().ArchiveQueueAddress = aq.getAddressIfSet();
   }
   if (jl.empty()) {
     throw ArchiveRequestHasNoCopies("In OStoreDB::queue: the archive to file request has no copy");
@@ -342,7 +384,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
       objectstore::ArchiveQueue aq(j.ArchiveQueueAddress, m_objectStore);
       ScopedExclusiveLock aql(aq);
       aq.fetch();
-      if (aq.getOwner() != re.getAddressIfSet())
+      if (aq.getOwner() != RootEntry::address)
           throw NoSuchArchiveQueue("In OStoreDB::queue: non-existing archive queue found "
               "(dangling pointer): canceling request creation.");
       aq.addJob(j, aReq.getAddressIfSet(), aReq.getArchiveFile().archiveFileID,
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 10660e7c1940c2d793ace95659e5ba4dcf115b1c..abb3a210a8474f9c63a4d412ddd6b9a4d3a3e0e3 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -22,6 +22,7 @@
 #include "objectstore/Agent.hpp"
 #include "objectstore/AgentReference.hpp"
 #include "objectstore/ArchiveRequest.hpp"
+#include "objectstore/ArchiveQueue.hpp"
 #include "objectstore/ArchiveRequest.hpp"
 #include "objectstore/DriveRegister.hpp"
 #include "objectstore/RetrieveRequest.hpp"
@@ -156,6 +157,18 @@ public:
 
   void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
     const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) override;
+
+  /**
+   * Find or create an archive queue, and return it locked and fetched to the caller
+   * (ArchiveQueue and ScopedExclusiveLock objects are provided empty)
+   * @param archiveQueue the ArchiveQueue object, empty
+   * @param archiveQueueLock the lock, not initialized
+   * @param tapePool the name of the needed tape pool
+   * @throws cta::exception::Exception if no queue could be found or created and locked after 5 attempts
+   */
+  void getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue & archiveQueue,
+    cta::objectstore::ScopedExclusiveLock & archiveQueueLock,
+    const std::string & tapePool);
 
   CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest);
   CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);