diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 91cf6a0eaea431159a11d4ce908c144b7e5b6949..423902af009fb5175260b77052c27e939a31c350 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -39,6 +39,7 @@
 #include "common/archiveNS/TapeFileLocation.hpp"
 #include "ArchiveToTapeCopyRequest.hpp"
 #include "common/archiveNS/ArchiveFile.hpp"
+#include "objectstore/ArchiveRequest.hpp"
 #include <algorithm>
 #include <stdlib.h> /* srand, rand */
 #include <time.h> /* time */
@@ -368,6 +369,109 @@ OStoreDB::ArchiveToFileRequestCreation::~ArchiveToFileRequestCreation() {
   } catch (...) {}
 }
 
+void OStoreDB::ArchiveRequestCreation::complete() {
+  // We inherited all the objects from the creation.
+  // The lock is still held at this point.
+  // First, record that we are fine for the next step.
+  m_request.setAllJobsLinkingToTapePool();
+  m_request.commit();
+  objectstore::RootEntry re(m_objectStore);
+  // We can now plug the request onto its tape pools.
+  // We can discover at that point that a tape pool is not actually
+  // owned by the root entry, and is hence a dangling pointer.
+  // We should then unlink the jobs from the already connected
+  // tape pools and abort the request creation.
+  // The list of linked tape pools is held here for this purpose.
+  // Reconstruct the job list.
+  auto jl = m_request.dumpJobs();
+  std::list<std::string> linkedTapePools;
+  try {
+    for (auto j=jl.begin(); j!=jl.end(); j++) {
+      objectstore::TapePool tp(j->tapePoolAddress, m_objectStore);
+      ScopedExclusiveLock tpl(tp);
+      tp.fetch();
+      if (tp.getOwner() != re.getAddressIfSet())
+        throw NoSuchTapePool("In OStoreDB::ArchiveRequestCreation::complete: "
+          "non-existing tape pool found (dangling pointer): cancelling request creation.");
+      tp.addJob(*j, m_request.getAddressIfSet(), m_request.getDrData().getDrPath(),
+        m_request.getFileSize(), 0,
+        m_request.getCreationLog().getTime());
+      // Now that we have the tape pool handy, get the retry limits from it
+      // and assign them to the job.
+      m_request.setJobFailureLimits(j->copyNb, tp.getMaxRetriesWithinMount(),
+        tp.getMaxTotalRetries());
+      tp.commit();
+      linkedTapePools.push_back(j->tapePoolAddress);
+    }
+  } catch (NoSuchTapePool &) {
+    // Unlink the request from the already connected tape pools.
+    for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
+      objectstore::TapePool tp(*tpa, m_objectStore);
+      ScopedExclusiveLock tpl(tp);
+      tp.fetch();
+      tp.removeJob(m_request.getAddressIfSet());
+      tp.commit();
+    }
+    // Remove the request itself only once, after it is unlinked everywhere.
+    m_request.remove();
+    throw;
+  }
+  // The request is now fully set. As it's multi-owned, we do not set the owner,
+  // just to disown it from the agent.
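+  // (The tape pool job entries now reference the request; clearing the owner
+  // below only drops the creating agent's claim on it.)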
+  m_request.setOwner("");
+  m_request.commit();
+  m_lock.release();
+  // And remove the reference from the agent.
+  {
+    objectstore::ScopedExclusiveLock al(*m_agent);
+    m_agent->fetch();
+    m_agent->removeFromOwnership(m_request.getAddressIfSet());
+    m_agent->commit();
+  }
+  m_closed=true;
+  return;
+}
+
+void OStoreDB::ArchiveRequestCreation::cancel() {
+  // We inherited everything from the creation, and all we have to do here
+  // is delete the request from storage and dereference it from the
+  // agent's entry.
+  if (m_closed) {
+    throw ArchiveRequestAlreadyCompleteOrCanceled(
+      "In OStoreDB::ArchiveRequestCreation::cancel: trying to close "
+      "the request creation twice");
+  }
+  m_request.remove();
+  {
+    objectstore::ScopedExclusiveLock al(*m_agent);
+    m_agent->fetch();
+    m_agent->removeFromOwnership(m_request.getAddressIfSet());
+    m_agent->commit();
+  }
+  m_closed=true;
+  return;
+}
+
+OStoreDB::ArchiveRequestCreation::~ArchiveRequestCreation() {
+  // We have to determine whether complete() or cancel() was called, in which
+  // case there is nothing to do, or neither was, in which case we have to
+  // garbage collect the archive request. This will queue it to the appropriate
+  // tape pool(s) orphanedArchiveToFileCreations. The scheduler will then
+  // determine its fate depending on the status of the NS entry creation
+  // (no entry: just cancel; already created in the NS: carry on).
+  if (m_closed)
+    return;
+  try {
+    m_request.garbageCollect(m_agent->getAddressIfSet());
+    {
+      objectstore::ScopedExclusiveLock al(*m_agent);
+      m_agent->fetch();
+      m_agent->removeFromOwnership(m_request.getAddressIfSet());
+      m_agent->commit();
+    }
+    m_closed=true;
+  } catch (...) {}
+}
+
 void OStoreDB::createTapePool(const std::string& name,
   const uint32_t nbPartialTapes, const cta::CreationLog &creationLog) {
   RootEntry re(m_objectStore);
@@ -639,9 +743,9 @@ std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation>
   return ret;
 }
 
-std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation>
+std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation>
   OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId) {
-  return std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation>();
+  return std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation>();
 }
 
 void OStoreDB::deleteArchiveRequest(const SecurityIdentity& requester,
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index cf745b89b3c32255168bf008e618bc51d86992af..18106691b1271454124bdf500e3256bf62c28147 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -21,6 +21,7 @@
 #include "scheduler/SchedulerDatabase.hpp"
 #include "objectstore/Agent.hpp"
 #include "objectstore/ArchiveToFileRequest.hpp"
+#include "objectstore/ArchiveRequest.hpp"
 #include "objectstore/DriveRegister.hpp"
 #include "objectstore/RetrieveToFileRequest.hpp"
 #include "objectstore/SchedulerGlobalLock.hpp"
@@ -230,11 +231,29 @@ public:
     bool m_closed;
     friend class cta::OStoreDB;
   };
+
+  class ArchiveRequestCreation:
+    public cta::SchedulerDatabase::ArchiveRequestCreation {
+  public:
+    ArchiveRequestCreation(objectstore::Agent * agent,
+      objectstore::Backend & be): m_request(be), m_lock(), m_objectStore(be),
+      m_agent(agent), m_closed(false) {}
+    virtual void complete();
+    virtual void cancel();
+    virtual ~ArchiveRequestCreation();
+  private:
+    objectstore::ArchiveRequest m_request;
+    objectstore::ScopedExclusiveLock m_lock;
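+    // Handles inherited from the creating OStoreDB; not owned by this object.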
+    objectstore::Backend & m_objectStore;
+    objectstore::Agent * m_agent;
+    bool m_closed;
+    friend class cta::OStoreDB;
+  };
 
   virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation> queue(const ArchiveToFileRequest& rqst);
 
-  virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId);
+  virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId);
 
   CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest);
   CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);
diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp
index b71d55e523d2f696cf6bfc76c6597af5fed8edb0..e0e6f6be3492bb703f6ce122cad16d68767899a1 100644
--- a/scheduler/OStoreDB/OStoreDBFactory.hpp
+++ b/scheduler/OStoreDB/OStoreDBFactory.hpp
@@ -165,7 +165,7 @@ public:
     return m_OStoreDB.queue(rqst);
   }
 
-  virtual std::unique_ptr<ArchiveToFileRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId) {
+  virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId) {
     return m_OStoreDB.queue(request, archiveFileId);
   }
 
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index bf278916a5598b1a749b79ff80b5b8329b10b396..9c640358678839a3b7b41f5a731e3b4e7a3b5199 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -49,7 +49,7 @@ cta::Scheduler::~Scheduler() throw() {
 //------------------------------------------------------------------------------
 uint64_t cta::Scheduler::queueArchiveRequest(const cta::common::dataStructures::SecurityIdentity &requestPusher, const cta::common::dataStructures::ArchiveRequest &request) {
   const uint64_t archiveFileId = m_catalogue.getNextArchiveFileId();
-  std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCreation> requestCreation(m_db.queue(request, archiveFileId));
+  std::unique_ptr<SchedulerDatabase::ArchiveRequestCreation> requestCreation(m_db.queue(request, archiveFileId));
   requestCreation->complete();
 
   return 0;
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index e131b1bce118b197b7d836d0ff1d6593ff89926b..d8f8486055af7368ae08255d78b26f35fd975bca 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -65,6 +65,20 @@ public:
     virtual void cancel() = 0;
     virtual ~ArchiveToFileRequestCreation() {};
   };
+
+  /**
+   * Subclass allowing the tracking and automated cleanup of an
+   * ArchiveRequest on the SchedulerDB. The two operations (creation+complete,
+   * or cancel) surround an NS operation. This class can keep references, locks,
+   * etc. handy to simplify the implementation of completion and cancellation
+   * (plus the destructor in case the caller fails half way through).
+   */
+  class ArchiveRequestCreation {
+  public:
+    virtual void complete() = 0;
+    virtual void cancel() = 0;
+    virtual ~ArchiveRequestCreation() {};
+  };
 
   /**
   * Queues the specified request. DEPRECATED
@@ -78,7 +92,7 @@ public:
   *
   * @param rqst The request.
  */
-  virtual std::unique_ptr<ArchiveToFileRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId) = 0;
+  virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId) = 0;
 
   /**
   * Returns all of the queued archive requests. The returned requests are
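
For reference, a minimal sketch of the two-phase protocol the new
ArchiveRequestCreation interface is built for, as driven from the scheduler
side. The name-server step (nameServer.createFileEntry) is a hypothetical
placeholder, not an API introduced by this patch:

  // Phase 1: queue the request in the scheduler DB (jobs not yet visible).
  std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation>
    creation(db.queue(request, archiveFileId));
  try {
    // The NS entry is created between queue() and complete().
    nameServer.createFileEntry(request, archiveFileId);  // hypothetical call
    // Phase 2: link the jobs to their tape pools and disown from the agent.
    creation->complete();
  } catch (...) {
    // Roll back: delete the request and the agent's ownership reference.
    creation->cancel();
    throw;
  }
  // If neither complete() nor cancel() ran (caller died half way through),
  // ~ArchiveRequestCreation() garbage collects the request via the orphaned
  // creations queue of the relevant tape pool(s).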