diff --git a/objectstore/AgentRegister.cpp b/objectstore/AgentRegister.cpp index ce92b8a85795120b1eae08c2f99e09ad4999b68a..0536401cadf7e96304d3ef0327c9fe4b2ab45079 100644 --- a/objectstore/AgentRegister.cpp +++ b/objectstore/AgentRegister.cpp @@ -40,7 +40,7 @@ void cta::objectstore::AgentRegister::initialize() { m_payloadInterpreted = true; } -void cta::objectstore::AgentRegister::garbageCollect() { +void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner) { checkPayloadWritable(); if (!isEmpty()) { throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error")); diff --git a/objectstore/AgentRegister.hpp b/objectstore/AgentRegister.hpp index a695d093f401bf6f5850768203201a5378d4d608..fc81cd718a511aa0512e0566beaa188b5d6af63a 100644 --- a/objectstore/AgentRegister.hpp +++ b/objectstore/AgentRegister.hpp @@ -34,7 +34,7 @@ public: AgentRegister(const std::string & name, Backend & os); void initialize(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); - void garbageCollect(); + void garbageCollect(const std::string &presumedOwner); bool isEmpty(); void addAgent (std::string name); void removeAgent (const std::string & name); diff --git a/objectstore/ArchiveToFileRequest.cpp b/objectstore/ArchiveToFileRequest.cpp index fad961799b344784d6e8d7c5b7f5fa9e13686ab8..26134f6309930bab593296158694e48d88d032d2 100644 --- a/objectstore/ArchiveToFileRequest.cpp +++ b/objectstore/ArchiveToFileRequest.cpp @@ -19,6 +19,7 @@ #include "ArchiveToFileRequest.hpp" #include "GenericObject.hpp" #include "CreationLog.hpp" +#include "TapePool.hpp" cta::objectstore::ArchiveToFileRequest::ArchiveToFileRequest( const std::string& address, Backend& os): @@ -46,6 +47,7 @@ void cta::objectstore::ArchiveToFileRequest::addJob(uint16_t copyNumber, j->set_copynb(copyNumber); j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToTapePool); j->set_tapepool(tapepool); + j->set_owner(""); j->set_tapepooladdress(tapepooladdress); j->set_totalretries(0); j->set_retrieswithinmount(0); @@ -122,4 +124,70 @@ void cta::objectstore::ArchiveToFileRequest::setSize(uint64_t size) { m_payload.set_size(size); } +void cta::objectstore::ArchiveToFileRequest::garbageCollect(const std::string &presumedOwner) { + checkPayloadWritable(); + // The behavior here depends on which job the agent is supposed to own. + // We should first find this job (if any). This is for covering the case + // of a selected job. The Request could also still being connected to tape + // pools. In this case we will finish the connection to tape pools unconditionally. + auto * jl = m_payload.mutable_jobs(); + auto s= m_payload.size(); + s=s; + for (auto j=jl->begin(); j!=jl->end(); j++) { + auto owner=j->owner(); + auto status=j->status(); + if (status==serializers::AJS_LinkingToTapePool || + (status==serializers::AJS_Selected && owner==presumedOwner)) { + // If the job was being connected to the tape pool or was selected + // by the dead agent, then we have to ensure it is indeed connected to + // the tape pool and set its status to pending. + // (Re)connect the job to the tape pool and make it pending. + // If we fail to reconnect, we have to fail the job and potentially + // finish the request. + try { + TapePool tp(j->tapepooladdress(), m_objectStore); + ScopedExclusiveLock tpl(tp); + tp.fetch(); + JobDump jd; + jd.copyNb = j->copynb(); + jd.tapePool = j->tapepool(); + jd.tapePoolAddress = j->tapepooladdress(); + if (tp.addJobIfNecessary(jd, getAddressIfSet(), m_payload.size())) + tp.commit(); + j->set_status(serializers::AJS_Pending); + commit(); + } catch (...) { + j->set_status(serializers::AJS_Failed); + // This could be the end of the request, with various consequences. + // This is handled here: + if (finishIfNecessary()) + return; + } + } else { + return; + } + } +} + +bool cta::objectstore::ArchiveToFileRequest::finishIfNecessary() { + checkPayloadWritable(); + // This function is typically called after changing the status of one job + // in memory. If the job is complete, we will just remove it. + // TODO: we will have to push the result to the ArchiveToDirRequest when + // it gets implemented. + // If all the jobs are either complete or failed, we can remove the request. + auto & jl=m_payload.jobs(); + for (auto j=jl.begin(); j!=jl.end(); j++) { + if (j->status() != serializers::AJS_Complete + && j->status() != serializers::AJS_Failed) { + return false; + } + } + remove(); + return true; +} + + + + diff --git a/objectstore/ArchiveToFileRequest.hpp b/objectstore/ArchiveToFileRequest.hpp index f34b647b1226bfef359a574a4af85ff61c7a341e..c4e867a4926fceaba284008fdd474c11a2d941ca 100644 --- a/objectstore/ArchiveToFileRequest.hpp +++ b/objectstore/ArchiveToFileRequest.hpp @@ -34,8 +34,29 @@ public: ArchiveToFileRequest(const std::string & address, Backend & os); ArchiveToFileRequest(GenericObject & go); void initialize(); + // Job management ============================================================ void addJob(uint16_t copyNumber, const std::string & tapepool, const std::string & tapepooladdress); + void jobSelect(uint16_t copyNumber, const std::string & owner); + void jobSetPending(uint16_t copyNumber); + void jobSetSuccessful(uint16_t copyNumber); + void jobSetFailed(uint16_t copyNumber); + // Duplication of the protbuff statuses + enum jobStatus { + AJS_LinkingToTapePool = 0, + AJS_Pending = 1, + AJS_Selected = 2, + AJS_Complete = 3, + AJS_Failed = 99 + }; + enum jobStatus getJobStatus(uint16_t copyNumber); + // Handling of the consequences of a job status change for the entire request. + // This function returns true if the request got finished. + bool finishIfNecessary(); + // Request management ======================================================== + void setSuccessful(); + void setFailed(); + // =========================================================================== void setArchiveFile(const std::string & archiveFile); std::string getArchiveFile(); void setRemoteFile (const std::string & remoteFile); @@ -53,6 +74,7 @@ public: std::string tapePoolAddress; }; std::list<JobDump> dumpJobs(); + void garbageCollect(const std::string &presumedOwner); }; }} diff --git a/objectstore/DriveRegister.cpp b/objectstore/DriveRegister.cpp index f44f0dde283cb7ab4ea7e40853cefee1e479f6d5..8047e69b048014021b776249340ef1e7c2069088 100644 --- a/objectstore/DriveRegister.cpp +++ b/objectstore/DriveRegister.cpp @@ -19,6 +19,7 @@ #include "DriveRegister.hpp" #include "ProtocolBuffersAlgorithms.hpp" #include "GenericObject.hpp" +#include "RootEntry.hpp" cta::objectstore::DriveRegister::DriveRegister(const std::string & address, Backend & os): ObjectOps<serializers::DriveRegister>(os, address) { } @@ -37,8 +38,28 @@ void cta::objectstore::DriveRegister::initialize() { m_payloadInterpreted = true; } -void cta::objectstore::DriveRegister::garbageCollect() { +void cta::objectstore::DriveRegister::garbageCollect(const std::string &presumedOwner) { checkPayloadWritable(); + // If the agent is not anymore the owner of the object, then only the very + // last operation of the drive register creation failed. We have nothing to do. + if (presumedOwner != m_header.owner()) + return; + // If the owner is still the agent, we have 2 possibilities: + // 1) The register is referenced by the root entry. We just need to officialize + // the ownership on the reguster + { + RootEntry re(m_objectStore); + ScopedSharedLock rel (re); + re.fetch(); + try { + if (re.getAgentRegisterAddress() == getAddressIfSet()) { + setOwner(re.getAddressIfSet()); + commit(); + return; + } + } catch (RootEntry::NotAllocated &) {} + } + // 2) The tape pool is not referenced in the root entry. We can just clean it up. if (!isEmpty()) { throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error")); } diff --git a/objectstore/DriveRegister.hpp b/objectstore/DriveRegister.hpp index f91f767ebeb6103cc09c295b09f4706b44568b96..766e55d380324bedbbc8fb5d24a466433c501653 100644 --- a/objectstore/DriveRegister.hpp +++ b/objectstore/DriveRegister.hpp @@ -35,7 +35,7 @@ public: DriveRegister(GenericObject & go); void initialize(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); - void garbageCollect(); + void garbageCollect(const std::string &presumedOwner); bool isEmpty(); void addDrive (std::string name); void removeDrive (const std::string & name); diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index c5759b28d3051625ff55ccf97d694ddbfdf94d46..bf18dcdc08a9fbb11f33607450fff847c6f3b439 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -152,9 +152,9 @@ void GarbageCollector::checkHeartbeats() { } } - void GarbageCollector::cleanupDeadAgent(const std::string & name) { + void GarbageCollector::cleanupDeadAgent(const std::string & address) { // Check that we are still owners of the agent (sanity check). - Agent agent(name, m_objectStore); + Agent agent(address, m_objectStore); ScopedExclusiveLock agLock(agent); agent.fetch(); if (agent.getOwner() != m_ourAgent.getAddressIfSet()) { @@ -171,7 +171,7 @@ void GarbageCollector::checkHeartbeats() { go.fetch(); // Call GenericOpbject's garbage collect method, which in turn will // delegate to the object type's garbage collector. - go.garbageCollect(goLock); + go.garbageCollect(goLock, address); } // In all cases, relinquish ownership for this object agent.removeFromOwnership(*obj); diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index d2c8788f31564b3f557591016be5213b8aca7269..b6c94d8aa580ecf9fa6937aeb3b21d01b5ceb15d 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -206,13 +206,13 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { cta::objectstore::ScopedExclusiveLock rel(re); re.addOrGetAgentRegisterPointerAndCommit(agent, cl); rel.release(); - // Create an agent and add and agent register to it as an owned object + // Create an agent and add the drive register to it as an owned object cta::objectstore::Agent agA(be); agA.initialize(); agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); - // Create a new agent register, owned by agA (by hand as it is not an usual + // Create a new drive register, owned by agA (by hand as it is not an usual // situation) std::string tpName; { @@ -248,5 +248,136 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { ASSERT_NO_THROW(re.removeIfEmpty()); } +TEST(ObjectStore, GarbageCollectorArchiveToFileRequest) { + // Here we check that can successfully call ArchiveToFileRequests's garbage collector + cta::objectstore::BackendVFS be; + cta::objectstore::Agent agent(be); + agent.generateName("unitTestGarbageCollector"); + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::CreationLog cl(cta::UserIdentity(99, 99), + "unittesthost", time(NULL), "Creation of unit test agent register"); + cta::objectstore::ScopedExclusiveLock rel(re); + re.addOrGetAgentRegisterPointerAndCommit(agent, cl); + rel.release(); + // Create an agent + cta::objectstore::Agent agA(be); + agA.initialize(); + agA.generateName("unitTestAgentA"); + agA.setTimeout_us(0); + agA.insertAndRegisterSelf(); + // Several use cases are present for the ArchiveToFileRequests: + // - just referenced in agent ownership list, but not yet created. + // - just created but not linked to any tape pool + // - partially linked to tape pools + // - linked to all tape pools + // - In the 2 latter cases, the job could have been picked up for processing + // - already + //w + // Create 2 tape pools (not owned). + std::string tpAddr[2]; + for (int i=0; i<2; i++) + { + std::stringstream tpid; + tpid << "TapePool" << i; + tpAddr[i] = agent.nextId(tpid.str()); + cta::objectstore::TapePool tp(tpAddr[i], be); + tp.initialize(tpid.str()); + tp.setOwner(""); + tp.insert(); + } + // Create the various ATFR's, stopping one step further each time. + int pass=0; + while (true) + { + // -just referenced + std::string atfrAddr = agA.nextId("ArchiveToFileRequest"); + cta::objectstore::ScopedExclusiveLock agl(agA); + agA.fetch(); + agA.addToOwnership(atfrAddr); + agA.commit(); + if (pass < 1) { pass++; continue; } + // - created, but not linked to tape pools + cta::objectstore::ArchiveToFileRequest atfr(atfrAddr, be); + atfr.initialize(); + atfr.setArchiveFile("cta:/file"); + atfr.setRemoteFile("eos:/file"); + atfr.setPriority(0); + atfr.setSize(1000+pass); + cta::objectstore::CreationLog log(cta::objectstore::UserIdentity(123,456), + "unitTestHost", time(NULL), "ArchiveJobForGarbageCollection"); + atfr.setCreationLog(log); + atfr.addJob(1, "TapePool0", tpAddr[0]); + atfr.addJob(2, "TapePool1", tpAddr[1]); + atfr.setOwner(agA.getAddressIfSet()); + atfr.insert(); + cta::objectstore::ScopedExclusiveLock atfrl(atfr); + if (pass < 2) { pass++; continue; } + // - Referenced in the first tape pool + { + cta::objectstore::TapePool tp(tpAddr[0], be); + cta::objectstore::ScopedExclusiveLock tpl(tp); + tp.fetch(); + cta::objectstore::ArchiveToFileRequest::JobDump jd; + jd.copyNb = 1; + jd.tapePool = "TapePool0"; + jd.tapePoolAddress = tpAddr[0]; + tp.addJob(jd, atfr.getAddressIfSet(), 1000+pass); + tp.commit(); + } + if (pass < 3) { pass++; continue; } + // TODO: partially migrated or selected + // - Referenced in the second tape pool + { + cta::objectstore::TapePool tp(tpAddr[1], be); + cta::objectstore::ScopedExclusiveLock tpl(tp); + tp.fetch(); + cta::objectstore::ArchiveToFileRequest::JobDump jd; + jd.copyNb = 2; + jd.tapePool = "TapePool1"; + jd.tapePoolAddress = tpAddr[1]; + tp.addJob(jd, atfr.getAddressIfSet(), 1000); + tp.commit(); + } + if (pass < 4) { pass++; continue; } + // - Still marked a not owned but referenced in the agent + { + atfr.setOwner(""); + atfr.commit(); + } + break; + } + // Create the garbage collector and run it twice. + cta::objectstore::Agent gcAgent(be); + gcAgent.initialize(); + gcAgent.generateName("unitTestGarbageCollector"); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(); + { + cta::objectstore::GarbageCollector gc(be, gcAgent); + gc.runOnePass(); + gc.runOnePass(); + } + // All 4 requests should be linked in both tape pools + { + cta::objectstore::TapePool tp0(tpAddr[0], be); + cta::objectstore::ScopedExclusiveLock tp0l(tp0); + tp0.fetch(); + auto d=tp0.dumpJobs(); + ASSERT_EQ(4, tp0.getJobsSummary().files); + } + // Unregister gc's agent + cta::objectstore::ScopedExclusiveLock gcal(gcAgent); + gcAgent.removeAndUnregisterSelf(); + // We should not be able to remove the agent register (as it should be empty) + rel.lock(re); + re.fetch(); + ASSERT_NO_THROW(re.removeAgentRegisterAndCommit()); + ASSERT_NO_THROW(re.removeIfEmpty()); + // TODO: this unit test still leaks tape pools and requests +} } diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index 82442e169b7218b00591301e08fd0cea2478c389..98462c23899d120c1f976ed8842d42dc5f41643a 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -64,34 +64,37 @@ Backend& GenericObject::objectStore() { return m_objectStore; } +namespace { + using cta::objectstore::GenericObject; + using cta::objectstore::ScopedExclusiveLock; + template <class C> + void garbageCollectWithType(GenericObject * gop, ScopedExclusiveLock& lock, + const std::string &presumedOwner) { + C typedObject(*gop); + lock.transfer(typedObject); + typedObject.garbageCollect(presumedOwner); + // Release the lock now as if we let the caller do, it will point + // to the then-removed typedObject. + lock.release(); + } +} -void GenericObject::garbageCollect(ScopedExclusiveLock& lock) { +void GenericObject::garbageCollect(ScopedExclusiveLock& lock, + const std::string &presumedOwner) { checkHeaderWritable(); switch(m_header.type()) { - case serializers::AgentRegister_t: { - AgentRegister ar(*this); - lock.transfer(ar); - ar.garbageCollect(); - // Release the lock now as if we let the caller do, it will point - // to the then-removed ar. - lock.release(); - } break; - case serializers::TapePool_t: { - TapePool tp(*this); - lock.transfer(tp); - tp.garbageCollect(); - // Release the lock now as if we let the caller do, it will point - // to the then-removed tp. - lock.release(); - } break; - case serializers::DriveRegister_t: { - DriveRegister dr(*this); - lock.transfer(dr); - dr.garbageCollect(); - // Release the lock now as if we let the caller do, it will point - // to the then-removed dr. - lock.release(); - } break; + case serializers::AgentRegister_t: + garbageCollectWithType<AgentRegister>(this, lock, presumedOwner); + break; + case serializers::TapePool_t: + garbageCollectWithType<TapePool>(this, lock, presumedOwner); + break; + case serializers::DriveRegister_t: + garbageCollectWithType<DriveRegister>(this, lock, presumedOwner); + break; + case serializers::ArchiveToFileRequest_t: + garbageCollectWithType<ArchiveToFileRequest>(this, lock, presumedOwner); + break; default: { std::stringstream err; err << "In GenericObject::garbageCollect, unsupported type: " diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index 3a13f59a01a4f009fa35b92632093bd49e62ba7f..f8f12f024e7943f788d56f98a8062de7900aea38 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -56,8 +56,11 @@ public: * It also handles the passed lock and returns is unlocked. * The object is expected to be passed exclusive locked and already fetched. * No extra care will be required from the object + * + * @param lock reference to the generic object's lock + * @param presumedOwner address of the agent which pointed to the object */ - void garbageCollect(ScopedExclusiveLock & lock); + void garbageCollect(ScopedExclusiveLock & lock, const std::string &presumedOwner); CTA_GENERATE_EXCEPTION_CLASS(UnsupportedType); /** diff --git a/objectstore/ObjectOps.cpp b/objectstore/ObjectOps.cpp index 2d19c66ef52e411993cbd6e9185a5acca04378ac..bdf160b75ab267a664afba23e68187f672dd248a 100644 --- a/objectstore/ObjectOps.cpp +++ b/objectstore/ObjectOps.cpp @@ -35,4 +35,4 @@ namespace cta { namespace objectstore { MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RetrieveToFileRequest); #undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID -}} \ No newline at end of file +}} \ No newline at end of file diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index a1b0f1977e24dd4a433722511a7b7ebf53877654..c5a1ebb9500ffc438bf8afec14ca3bb2acf4e354 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -294,7 +294,7 @@ public: /** * This function should be overloaded in the inheriting classes */ - void garbageCollect() { + void garbageCollect(const std::string &presumedOwner) { throw WrongTypeForGarbageCollection("In ObjectOps::garbageCollect. " "This function should have been overloaded in derived class"); } diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index d3e6d90cc4e708117726c68be50521f785da5e43..cfcc3129056a23fbaa723e458f8f0eaa977ebe3b 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -465,7 +465,7 @@ std::string cta::objectstore::RootEntry::addOrGetTapePoolAndCommit(const std::st TapePool tp(tapePoolAddress, ObjectOps<serializers::RootEntry>::m_objectStore); tp.initialize(tapePool); tp.setOwner(agent.getAddressIfSet()); - tp.setBackupOwner(agent.getAddressIfSet()); + tp.setBackupOwner("root"); tp.insert(); ScopedExclusiveLock tpl(tp); // Now move the tape pool's ownership to the root entry @@ -571,13 +571,11 @@ std::string cta::objectstore::RootEntry::addOrGetDriveRegisterPointerAndCommit( agent.fetch(); agent.addToOwnership(drAddress); agent.commit(); - // Then create the drive register object + // Then create the drive register object DriveRegister dr(drAddress, m_objectStore); dr.initialize(); - // There is no garbage collection for a drive register: if it is not - // plugged to the root entry, it does not exist. - dr.setOwner(""); - dr.setBackupOwner(""); + dr.setOwner(agent.getAddressIfSet()); + dr.setBackupOwner(getAddressIfSet()); dr.insert(); // Take a lock on drive registry ScopedExclusiveLock drLock(dr); diff --git a/objectstore/TapePool.cpp b/objectstore/TapePool.cpp index 28cade2dcb7f3444a9f5f81586854de385773011..6010e4e0b57810e7ae8f5db3a04a84365b65e7d3 100644 --- a/objectstore/TapePool.cpp +++ b/objectstore/TapePool.cpp @@ -21,6 +21,7 @@ #include "ProtocolBuffersAlgorithms.hpp" #include "CreationLog.hpp" #include "Tape.hpp" +#include "RootEntry.hpp" cta::objectstore::TapePool::TapePool(const std::string& address, Backend& os): ObjectOps<serializers::TapePool>(os, address) { } @@ -141,25 +142,44 @@ std::string cta::objectstore::TapePool::getTapeAddress(const std::string& vid) { return serializers::findElement(m_payload.tapes(), vid).address(); } - - - - - bool cta::objectstore::TapePool::isEmpty() { checkPayloadReadable(); // Check we have no tapes in pool if (m_payload.tapes_size()) return false; // Check we have no archival jobs pending - if (m_payload.archivaljobs_size()) + if (m_payload.archivejobs_size()) return false; // If we made it to here, it seems the pool is indeed empty. return true; } -void cta::objectstore::TapePool::garbageCollect() { +void cta::objectstore::TapePool::garbageCollect(const std::string &presumedOwner) { checkPayloadWritable(); + // If the agent is not anymore the owner of the object, then only the very + // last operation of the tape pool creation failed. We have nothing to do. + if (presumedOwner != m_header.owner()) + return; + // If the owner is still the agent, there are 2 possibilities + // 1) The tape pool is referenced in the root entry, and then nothing is needed + // besides setting the tape pool's owner to the root entry's address in + // order to enable its usage. Before that, it was considered as a dangling + // pointer. + { + RootEntry re(m_objectStore); + ScopedSharedLock rel (re); + re.fetch(); + auto tpd=re.dumpTapePools(); + for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) { + if (tp->address == getAddressIfSet()) { + setOwner(re.getAddressIfSet()); + commit(); + return; + } + } + } + // 2) The tape pool is not referenced by the root entry. It is then effectively + // not accessible and should be discarded. if (!isEmpty()) { throw (NotEmpty("Trying to garbage collect a non-empty TapePool: internal error")); } @@ -179,8 +199,64 @@ std::string cta::objectstore::TapePool::getName() { void cta::objectstore::TapePool::addJob(const ArchiveToFileRequest::JobDump& job, const std::string & archiveToFileAddress, uint64_t size) { checkPayloadWritable(); - auto * j = m_payload.add_archivaljobs(); + auto * j = m_payload.add_archivejobs(); + j->set_address(archiveToFileAddress); + j->set_size(size); +} + +auto cta::objectstore::TapePool::getJobsSummary() -> JobsSummary { + checkPayloadReadable(); + JobsSummary ret; + ret.files = m_payload.archivejobs_size(); + ret.bytes = m_payload.archivejobstotalsize(); + return ret; +} + +bool cta::objectstore::TapePool::addJobIfNecessary(const ArchiveToFileRequest::JobDump& job, const std::string& archiveToFileAddress, uint64_t size) { + checkPayloadWritable(); + auto & jl=m_payload.archivejobs(); + for (auto j=jl.begin(); j!= jl.end(); j++) { + if (j->address() == archiveToFileAddress) + return false; + } + auto * j = m_payload.add_archivejobs(); j->set_address(archiveToFileAddress); j->set_size(size); + return true; +} + +void cta::objectstore::TapePool::removeJob(const std::string& archiveToFileAddress) { + checkPayloadWritable(); + auto * jl=m_payload.mutable_archivejobs(); + bool found = false; + do { + // Push the found entry all the way to the end. + for (size_t i=0; i<(size_t)jl->size(); i++) { + if (jl->Get(i).address() == archiveToFileAddress) { + found = true; + while (i+1 < (size_t)jl->size()) { + jl->SwapElements(i, i+1); + i++; + } + break; + } + } + // and remove it + if (found) + jl->RemoveLast(); + } while (found); } +auto cta::objectstore::TapePool::dumpJobs() -> std::list<JobDump> { + checkPayloadReadable(); + std::list<JobDump> ret; + auto & jl=m_payload.archivejobs(); + for (auto j=jl.begin(); j!=jl.end(); j++) { + ret.push_back(JobDump()); + ret.back().address = j->address(); + ret.back().size = j->size(); + } + return ret; +} + + diff --git a/objectstore/TapePool.hpp b/objectstore/TapePool.hpp index 3bda22618d1104cbae4696fd68421999c66c440f..74dc06bbc387ca8efbbdcbbbf2d6f893da31e8f6 100644 --- a/objectstore/TapePool.hpp +++ b/objectstore/TapePool.hpp @@ -67,13 +67,30 @@ public: // Archival jobs management ================================================== void addJob(const ArchiveToFileRequest::JobDump & job, const std::string & archiveToFileAddress, uint64_t size); + /// This version will check for existence of the job in the queue before + // returns true if a new job was inserted. + bool addJobIfNecessary(const ArchiveToFileRequest::JobDump & job, + const std::string & archiveToFileAddress, uint64_t size); + class JobsSummary { + public: + uint64_t files; + uint64_t bytes; + }; + JobsSummary getJobsSummary(); + void removeJob(const std::string &archiveToFileAddress); + class JobDump { + public: + uint64_t size; + std::string address; + }; + std::list<JobDump> dumpJobs(); // Check that the tape pool is empty (of both tapes and jobs) bool isEmpty(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); // Garbage collection - void garbageCollect(); + void garbageCollect(const std::string &presumedOwner); }; }} \ No newline at end of file diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 1f6f858a53c37f9f2602221a66e312c3898e9ed8..75560d2f01dfe38ff8c12e759945f697aeb32487 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -174,7 +174,7 @@ message TapePointer { message TapePool { required string name = 4100; repeated TapePointer tapes = 4101; - repeated ArchiveJobPointer archivaljobs = 4102; + repeated ArchiveJobPointer archivejobs = 4102; required uint64 ArchiveJobsTotalSize = 4103; } @@ -215,9 +215,10 @@ message ArchiveJobEntry { required uint32 copynb = 4400; required string tapepool = 4401; required string tapepooladdress = 4402; - required ArchiveJobStatus status = 4403; - required uint32 totalretries = 4404; - required uint32 retrieswithinmount = 4405; + required string owner = 4403; + required ArchiveJobStatus status = 4404; + required uint32 totalretries = 4405; + required uint32 retrieswithinmount = 4406; } message ArchiveToFileRequest { diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index fb6411d649b334922316fb87758e2f4aa3e74428..4c367e15eb2cd1608e694a8214eeca48f77198d8 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -319,7 +319,13 @@ void OStoreDB::createTape(const std::string& vid, objectstore::TapePool tp(tpAddress, m_objectStore); ScopedExclusiveLock tpl(tp); tp.fetch(); + // Check the tape pool is owned by the root entry. If not, it should be + // considered as a dangling pointer. + if (tp.getOwner() != re.getAddressIfSet()) + throw NoSuchTapePool("In OStoreDB::createTape: trying to create a tape in a" + " non-existing tape pool (dangling pointer)"); // Check that the tape exists and throw an exception if it does. + // TODO: we should check in all tape pools (or have a central index) try { tp.getTapeAddress(vid); throw TapeAlreadyExists("In OStoreDB::createTape: trying to create an existing tape."); @@ -345,6 +351,10 @@ std::list<cta::Tape> OStoreDB::getTapes() const { objectstore::TapePool tp(tpi->address, m_objectStore); ScopedSharedLock tpl(tp); tp.fetch(); + // Check the tape pool is owned by the root entry. If not, it should be + // considered as a dangling pointer (and skip it) + if (tp.getOwner() != re.getAddressIfSet()) + continue; auto tl=tp.dumpTapes(); for (auto ti=tl.begin(); ti!=tl.end(); ti++) { objectstore::Tape t(ti->address, m_objectStore); @@ -395,6 +405,10 @@ void OStoreDB::deleteLogicalLibrary(const SecurityIdentity& requester, objectstore::TapePool tp(tpp->address, m_objectStore); ScopedSharedLock tplock(tp); tp.fetch(); + // Check the tape pool is owned by the root entry. If not, it should be + // considered as a dangling pointer. + if (tp.getOwner() != re.getAddressIfSet()) + continue; auto tl=tp.dumpTapes(); for (auto t=tl.begin(); t!=tl.end(); t++) { if (t->logicalLibraryName == name) @@ -443,13 +457,36 @@ void OStoreDB::queue(const cta::ArchiveToFileRequest& rqst) { atfr.setOwner(m_agent->getAddressIfSet()); atfr.insert(); ScopedExclusiveLock atfrl(atfr); - // We can now plug the request onto its tape pools - for (auto j=jl.begin(); j!=jl.end(); j++) { - objectstore::TapePool tp(j->tapePoolAddress, m_objectStore); - ScopedExclusiveLock tpl(tp); - tp.fetch(); - tp.addJob(*j, atfr.getAddressIfSet(), atfr.getSize()); - tp.commit(); + // We can now plug the request onto its tape pools. + // We can discover at that point that a tape pool is actually not + // really owned by the root entry, and hence a dangling pointer + // We should then unlink the jobs from that already connected + // tape pools and abort the job creation. + // The list of done tape pools is held here for this purpose + std::list<std::string> linkedTapePools; + try { + for (auto j=jl.begin(); j!=jl.end(); j++) { + objectstore::TapePool tp(j->tapePoolAddress, m_objectStore); + ScopedExclusiveLock tpl(tp); + tp.fetch(); + if (tp.getOwner() != re.getAddressIfSet()) + throw NoSuchTapePool("In OStoreDB::queue: non-existing tape pool found " + "(dangling pointer): cancelling request creation."); + tp.addJob(*j, atfr.getAddressIfSet(), atfr.getSize()); + tp.commit(); + linkedTapePools.push_back(j->tapePoolAddress); + } + } catch (NoSuchTapePool &) { + // Unlink the request from already connected tape pools + for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) { + objectstore::TapePool tp(*tpa, m_objectStore); + ScopedExclusiveLock tpl(tp); + tp.fetch(); + tp.removeJob(atfr.getAddressIfSet()); + tp.commit(); + atfr.remove(); + } + throw; } // The request is now fully set. As it's multi-owned, we do not set the owner atfr.setOwner(""); @@ -461,6 +498,7 @@ void OStoreDB::queue(const cta::ArchiveToFileRequest& rqst) { m_agent->removeFromOwnership(atfr.getAddressIfSet()); m_agent->commit(); } + return; } void OStoreDB::queue(const ArchiveToDirRequest& rqst) { diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 87b1bef5c52e73367aba0ca8a298acefdc775b22..0a4eaf5ebc7fa3209da947b2d041437af892ccb1 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -103,6 +103,7 @@ public: /* === Tapes handling ==================================================== */ CTA_GENERATE_EXCEPTION_CLASS(TapeAlreadyExists); CTA_GENERATE_EXCEPTION_CLASS(NoSuchLibrary); + CTA_GENERATE_EXCEPTION_CLASS(NoSuchTapePool); virtual void createTape(const std::string& vid, const std::string& logicalLibraryName, const std::string& tapePoolName, const uint64_t capacityInBytes, const cta::CreationLog& creationLog);