diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp index e44f909c53a99a1a98fc36c4b52d46c6ac9b4cb3..d41e119999b0750dd4b8ada0311ea9839ddf2403 100644 --- a/catalogue/DummyCatalogue.hpp +++ b/catalogue/DummyCatalogue.hpp @@ -69,7 +69,7 @@ public: common::dataStructures::ArchiveFileSummary getTapeFileSummary(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } std::list<common::dataStructures::TapePool> getTapePools() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } std::list<common::dataStructures::Tape> getTapes(const TapeSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } - common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + // getTapesByVid is implemented below (and works). std::list<TapeForWriting> getTapesForWriting(const std::string& logicalLibraryName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } bool isAdmin(const common::dataStructures::SecurityIdentity& admin) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void modifyAdminHostComment(const common::dataStructures::SecurityIdentity& admin, const std::string& hostName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } @@ -109,6 +109,34 @@ public: void tapeMountedForArchive(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } void tapeMountedForRetrieve(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } bool tapePoolExists(const std::string& tapePoolName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + + // Special functions for unit tests. + void addEnabledTape(const std::string & vid) { + threading::MutexLocker lm(m_tapeEnablingMutex); + m_tapeEnabling[vid]=true; + } + void addDisabledTape(const std::string & vid) { + threading::MutexLocker lm(m_tapeEnablingMutex); + m_tapeEnabling[vid]=false; + } + common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const { + // Minimal implementation of VidToMap for retrieve request unit tests. We just support + // disabled status for the tapes. + // If the tape is not listed, it is listed as disabled in the return value. + threading::MutexLocker lm(m_tapeEnablingMutex); + common::dataStructures::VidToTapeMap ret; + for (const auto & v: vids) { + try { + ret[v].disabled = !m_tapeEnabling.at(v); + } catch (std::out_of_range &) { + ret[v].disabled = true; + } + } + return ret; + } +private: + mutable threading::Mutex m_tapeEnablingMutex; + std::map<std::string, bool> m_tapeEnabling; }; }} // namespace cta::catalogue. \ No newline at end of file diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 62e56eb042462b80d7566259ff6bd4c881917f2b..5d533778cc996aee7341045188eb0c029cef951f 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -32,7 +32,9 @@ #include "AgentRegister.hpp" #include "DriveRegister.hpp" #include "ArchiveRequest.hpp" +#include "RetrieveRequest.hpp" #include "ArchiveQueue.hpp" +#include "RetrieveQueue.hpp" #include "EntryLogSerDeser.hpp" #include "catalogue/DummyCatalogue.hpp" @@ -465,4 +467,183 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { // TODO: this unit test still leaks tape pools and requests } +TEST(ObjectStore, GarbageCollectorRetrieveRequest) { + // We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("unitTest"); +#else + cta::log::DummyLogger dl("unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue(dl); + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv"); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(); + // Create an agent to garbage be collected + cta::objectstore::AgentReference agrA("unitTestAgentA"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be); + agA.initialize(); + agA.setTimeout_us(0); + agA.insertAndRegisterSelf(); + // Several use cases are present for the RetrieveRequests: + // - just referenced in agent ownership list, but not yet created. + // - just created but not linked to any tape + // - partially linked to tape + // - When requeueing the request, the tape could be disabled, in which case + // it will be deleted. + // + // Create 2 retrieve queues + std::string tAddr[2]; + for (int i=0; i<2; i++) + { + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::stringstream vid; + vid << "Tape" << i; + tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef); + cta::objectstore::RetrieveQueue rq(tAddr[i], be); + } + // Create the various ATFR's, stopping one step further each time. + int pass=0; + while (true) + { + // -just referenced + std::string atfrAddr = agrA.nextId("RetrieveRequest"); + agrA.addToOwnership(atfrAddr, be); + if (pass < 1) { pass++; continue; } + // - created, but not linked to tape pools. Those jobs will be queued by the garbage + // collector. + cta::objectstore::RetrieveRequest rr(atfrAddr, be); + rr.initialize(); + cta::common::dataStructures::RetrieveFileQueueCriteria rqc; + rqc.archiveFile.archiveFileID = 123456789L; + rqc.archiveFile.diskFileId = "eos://diskFile"; + rqc.archiveFile.checksumType = ""; + rqc.archiveFile.checksumValue = ""; + rqc.archiveFile.creationTime = 0; + rqc.archiveFile.reconciliationTime = 0; + rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); + rqc.archiveFile.diskInstance = "eoseos"; + rqc.archiveFile.fileSize = 667; + rqc.archiveFile.storageClass = "sc"; + rqc.archiveFile.tapeFiles[1].blockId=0; + rqc.archiveFile.tapeFiles[1].compressedSize=1; + rqc.archiveFile.tapeFiles[1].compressedSize=1; + rqc.archiveFile.tapeFiles[1].copyNb=1; + rqc.archiveFile.tapeFiles[1].creationTime=time(nullptr); + rqc.archiveFile.tapeFiles[1].fSeq=pass; + rqc.archiveFile.tapeFiles[1].vid="Tape0"; + rqc.archiveFile.tapeFiles[2].blockId=0; + rqc.archiveFile.tapeFiles[2].compressedSize=1; + rqc.archiveFile.tapeFiles[2].compressedSize=1; + rqc.archiveFile.tapeFiles[2].copyNb=2; + rqc.archiveFile.tapeFiles[2].creationTime=time(nullptr); + rqc.archiveFile.tapeFiles[2].fSeq=pass; + rqc.archiveFile.tapeFiles[2].vid="Tape1"; + rqc.mountPolicy.archiveMinRequestAge = 1; + rqc.mountPolicy.archivePriority = 1; + rqc.mountPolicy.creationLog.time = time(nullptr); + rqc.mountPolicy.lastModificationLog.time = time(nullptr); + rqc.mountPolicy.maxDrivesAllowed = 1; + rqc.mountPolicy.retrieveMinRequestAge = 1; + rqc.mountPolicy.retrievePriority = 1; + rr.setRetrieveFileQueueCriteria(rqc); + cta::common::dataStructures::RetrieveRequest sReq; + sReq.archiveFileID = rqc.archiveFile.archiveFileID; + sReq.creationLog.time=time(nullptr); + rr.setSchedulerRequest(sReq); + rr.addJob(1, 1, 1); + rr.addJob(2, 1, 1); + rr.setOwner(agA.getAddressIfSet()); + rr.setActiveCopyNumber(0); + rr.insert(); + cta::objectstore::ScopedExclusiveLock rrl(rr); + if (pass < 3) { pass++; continue; } + // - Reference job in the first tape + { + cta::objectstore::RetrieveQueue rq(tAddr[0], be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + rq.addJob(1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time); + rq.commit(); + } + if (pass < 5) { pass++; continue; } + // - Still marked a not owned but referenced in the agent + { + rr.setOwner(tAddr[0]); + rr.setActiveCopyNumber(1); + rr.commit(); + } + break; + } + // Mark the tape as enabled + catalogue.addEnabledTape("Tape0"); + // Create the garbage collector and run it twice. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + gc.runOnePass(lc); + } + // All 4 requests should be linked in the first tape queue + { + cta::objectstore::RetrieveQueue rq(tAddr[0], be); + cta::objectstore::ScopedExclusiveLock tp0lock(rq); + rq.fetch(); + auto dump=rq.dumpJobs(); + // We expect all jobs with sizes 1002-1005 inclusive to be connected to + // their respective tape pools. + ASSERT_EQ(5, rq.getJobsSummary().files); + } + // Unregister gc's agent + cta::objectstore::ScopedExclusiveLock gcal(gcAgent); + gcAgent.fetch(); + gcAgent.removeAndUnregisterSelf(); + // We should not be able to remove the agent register (as it should be empty) + rel.lock(re); + re.fetch(); + // Remove jobs from retrieve queue + std::list<std::string> retrieveQueues = { "Tape0", "Tape1" }; + for (auto & vid: retrieveQueues) { + // Empty queue + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid), be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + for (auto &j: rq.dumpJobs()) { + rq.removeJob(j.address); + } + rq.commit(); + rql.release(); + // Remove queues from root + re.removeRetrieveQueueAndCommit(vid); + } + + ASSERT_NO_THROW(re.removeAgentRegisterAndCommit()); + ASSERT_NO_THROW(re.removeIfEmpty()); + // TODO: this unit test still leaks tape pools and requests +} + } diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index fe6dc3bd9d6f4a40ddd184de54960b7bf07be5ea..4da82bba892bf5ddfa8ed192f8408cdf99a824b2 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -86,7 +86,7 @@ void Helpers::getLockedAndFetchedRetrieveQueue(RetrieveQueue& retrieveQueue, Sco ScopedSharedLock rel(re); re.fetch(); try { - retrieveQueue.setAddress(re.getRetrieveQueue(vid)); + retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid)); } catch (cta::exception::Exception & ex) { rel.release(); ScopedExclusiveLock rexl(re); @@ -258,7 +258,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS continue; std::string rqAddr; try { - std::string rqAddr = re.getRetrieveQueue(tf.second.vid); + std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid); } catch (cta::exception::Exception &) { ret.push_back(SchedulerDatabase::RetrieveQueueStatistics()); ret.back().vid=tf.second.vid; diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index e721eabb55299f26bd892fe6a8012904937c2b82..596ae24135d894c126f7a14f41456c33f4420c04 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -133,6 +133,49 @@ void cta::objectstore::RetrieveQueue::addJob(uint64_t copyNb, uint64_t fSeq, } } +bool cta::objectstore::RetrieveQueue::addJobIfNecessary(uint64_t copyNb, uint64_t fSeq, + const std::string & retrieveRequestAddress, uint64_t size, + const cta::common::dataStructures::MountPolicy & policy, time_t startTime) { + checkPayloadWritable(); + // Check if the job is present and skip insertion if so + for (auto &j: m_payload.retrievejobs()) { + if (j.address() == retrieveRequestAddress) + return false; + } + // Keep track of the mounting criteria + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.incCount(policy.maxDrivesAllowed); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.incCount(policy.retrievePriority); + ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + minRetrieveRequestAgeMap.incCount(policy.retrieveMinRequestAge); + if (m_payload.retrievejobs_size()) { + if (m_payload.oldestjobcreationtime() > (uint64_t)startTime) { + m_payload.set_oldestjobcreationtime(startTime); + } + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + size); + } else { + m_payload.set_oldestjobcreationtime(startTime); + m_payload.set_retrievejobstotalsize(size); + } + auto * j = m_payload.add_retrievejobs(); + j->set_address(retrieveRequestAddress); + j->set_size(size); + j->set_copynb(copyNb); + j->set_fseq(fSeq); + j->set_priority(policy.retrievePriority); + j->set_minretrieverequestage(policy.retrieveMinRequestAge); + j->set_maxdrivesallowed(policy.maxDrivesAllowed); + // move the the new job in the right spot on the queue. + // i points to the newly added job all the time. + size_t i=m_payload.retrievejobs_size() - 1; + while (i > 0 && m_payload.retrievejobs(i).fseq() < m_payload.retrievejobs(i - 1).fseq()) { + m_payload.mutable_retrievejobs()->SwapElements(i-1, i); + i--; + } + return true; +} + cta::objectstore::RetrieveQueue::JobsSummary cta::objectstore::RetrieveQueue::getJobsSummary() { checkPayloadReadable(); JobsSummary ret; diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 5091da0bf2b97dd941e68c17a47ecde2a0e51b27..5f3cdd529bf747fd37b347692f23d38549bab3ef 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -48,6 +48,11 @@ public: void addJob(uint64_t copyNb, uint64_t fSeq, const std::string & retrieveRequestAddress, uint64_t size, const cta::common::dataStructures::MountPolicy & policy, time_t startTime); + /// This version will check for existence of the job in the queue before + // returns true if a new job was actually inserted. + bool addJobIfNecessary(uint64_t copyNb, uint64_t fSeq, + const std::string & retrieveRequestAddress, uint64_t size, + const cta::common::dataStructures::MountPolicy & policy, time_t startTime); struct JobsSummary { uint64_t files; uint64_t bytes; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 7efcae8c60fa6569e77eb1ccf8b089d79a36269e..6425696eca59df5fa6ee908c0576d65b53956f43 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -64,7 +64,13 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // Check the request is indeed owned by the right owner. - if (getOwner() != presumedOwner) return; + if (getOwner() != presumedOwner) { + log::ScopedParamContainer params(lc); + params.add("jobObject", getAddressIfSet()) + .add("presumedOwner", presumedOwner) + .add("owner", getOwner()); + lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): no garbage collection needed."); + } // The owner is indeed the right one. We should requeue the request if possible. // Find the vids for active jobs in the request (pending ones). using serializers::RetrieveJobStatus; @@ -130,7 +136,7 @@ jobFound:; // Enqueue add the job to the queue objectstore::MountPolicySerDeser mp; mp.deserialize(m_payload.mountpolicy()); - rq.addJob(bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), + rq.addJobIfNecessary(bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), mp, m_payload.schedulerrequest().entrylog().time()); auto jobsSummary=rq.getJobsSummary(); rq.commit(); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index e5c0b1669b3a10ea58c9e8cef78fbacdc6e7e1f2..2b503f4a10f1a05b621c415f6aa5c337a55edd36 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -83,17 +83,13 @@ public: void setSchedulerRequest(const cta::common::dataStructures::RetrieveRequest & retrieveRequest); cta::common::dataStructures::RetrieveRequest getSchedulerRequest(); - void setArchiveFile(const cta::common::dataStructures::ArchiveFile & archiveFile); - cta::common::dataStructures::ArchiveFile getArchiveFile(); - void setRetrieveFileQueueCriteria(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria(); + cta::common::dataStructures::ArchiveFile getArchiveFile(); + cta::common::dataStructures::EntryLog getEntryLog(); void setActiveCopyNumber(uint32_t activeCopyNb); uint32_t getActiveCopyNumber(); - - void setEntryLog(const cta::common::dataStructures::EntryLog &creationLog); - cta::common::dataStructures::EntryLog getEntryLog(); // =========================================================================== std::list<JobDump> dumpJobs(); std::string dump(); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 6c0628ae6d50984be0b595d670ec4e8ab2341946..9393d4e203f3c2126ffe7576e8a1e3499ee7f800 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -110,7 +110,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool } catch (serializers::NotFound &) {} // Insert the archive queue, then its pointer, with agent intent log update // First generate the intent. We expect the agent to be passed locked. - std::string archiveQueueAddress = agentRef.nextId("archiveQueue"); + std::string archiveQueueAddress = agentRef.nextId("ArchiveQueue"); agentRef.addToOwnership(archiveQueueAddress, m_objectStore); // Then create the tape pool queue object ArchiveQueue aq(archiveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); @@ -229,7 +229,7 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag } catch (serializers::NotFound &) {} // Insert the retrieve queue, then its pointer, with agent intent log update // First generate the intent. We expect the agent to be passed locked. - std::string retrieveQueueAddress = agentRef.nextId("retriveQueue"); + std::string retrieveQueueAddress = agentRef.nextId("RetriveQueue"); agentRef.addToOwnership(retrieveQueueAddress, m_objectStore); // Then create the tape pool queue object RetrieveQueue rq(retrieveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); @@ -269,7 +269,7 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid) { rql.lock(rq); rq.fetch(); } catch (cta::exception::Exception & ex) { - // The archive queue seems to not be there. Make sure this is the case: + // The retrieve queue seems to not be there. Make sure this is the case: if (rq.exists()) { // We failed to access the queue, yet it is present. This is an error. // Let the exception pass through. @@ -306,8 +306,14 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid) { } -std::string RootEntry::getRetrieveQueue(const std::string& vid) { - throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); +std::string RootEntry::getRetrieveQueueAddress(const std::string& vid) { + checkPayloadReadable(); + try { + auto & rqp = serializers::findElement(m_payload.retrievequeuepointers(), vid); + return rqp.address(); + } catch (serializers::NotFound &) { + throw NoSuchRetrieveQueue("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated"); + } } void RootEntry::removeArchiveQueueIfAddressMatchesAndCommit(const std::string& tapePool, const std::string& archiveQueueAddress) { @@ -338,7 +344,7 @@ std::string RootEntry::addOrGetDriveRegisterPointerAndCommit( return getDriveRegisterAddress(); } catch (NotAllocated &) { // decide on the object's name and add to agent's intent. - std::string drAddress (agentRef.nextId("driveRegister")); + std::string drAddress (agentRef.nextId("DriveRegister")); agentRef.addToOwnership(drAddress, m_objectStore); // Then create the drive register object DriveRegister dr(drAddress, m_objectStore); @@ -426,7 +432,7 @@ std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(AgentReference& age return m_payload.agentregisterpointer().address(); } // decide on the object's name - std::string arAddress (agentRef.nextId("agentRegister")); + std::string arAddress (agentRef.nextId("AgentRegister")); // Record the agent registry in our own intent addIntendedAgentRegistry(arAddress); commit(); @@ -545,7 +551,7 @@ std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(AgentReference& agen return getSchedulerGlobalLock(); } catch (NotAllocated &) { // decide on the object's name and add to agent's intent. - std::string sglAddress (agentRef.nextId("schedulerGlobalLock")); + std::string sglAddress (agentRef.nextId("SchedulerGlobalLock")); agentRef.addToOwnership(sglAddress, m_objectStore); // Then create the drive register object SchedulerGlobalLock sgl(sglAddress, m_objectStore); diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 70ef5964580e0de09b28ffe9501aa529efe85d82..530471d770f5ddad750275262c3100a2a50ae23f 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -87,7 +87,7 @@ public: void removeMissingRetrieveQueueReference(const std::string & address); CTA_GENERATE_EXCEPTION_CLASS(NoSuchRetrieveQueue); void removeRetrieveQueueAndCommit(const std::string & vid); - std::string getRetrieveQueue(const std::string & vid); + std::string getRetrieveQueueAddress(const std::string & vid); struct RetrieveQueueDump { std::string vid; std::string address; diff --git a/objectstore/cta-objectstore-collect-orphaned-object.cpp b/objectstore/cta-objectstore-collect-orphaned-object.cpp index 5506f4f7376f820fe24c067ce868a117e3efc941..98a85ffec62b9c846523738b88bc34781912bc0c 100644 --- a/objectstore/cta-objectstore-collect-orphaned-object.cpp +++ b/objectstore/cta-objectstore-collect-orphaned-object.cpp @@ -27,6 +27,7 @@ #include "Agent.hpp" #include "RootEntry.hpp" #include "ArchiveRequest.hpp" +#include "RetrieveRequest.hpp" #include "GenericObject.hpp" #include "common/log/StringLogger.hpp" #include "catalogue/CatalogueFactory.hpp" @@ -56,7 +57,7 @@ int main(int argc, char ** argv) { } catch (std::bad_cast &){} std::cout << "Object store path: " << be->getParams()->toURL() << std::endl; // Try and open the object. - cta::objectstore::GenericObject go(argv[2], *be); + cta::objectstore::GenericObject go(argv[3], *be); cta::objectstore::ScopedExclusiveLock gol(go); std::cout << "Object address: " << go.getAddressIfSet() << std::endl; go.fetch(); @@ -73,7 +74,7 @@ int main(int argc, char ** argv) { gol.release(); bool someGcDone=false; gcpass: - cta::objectstore::ArchiveRequest ar(argv[2], *be); + cta::objectstore::ArchiveRequest ar(argv[3], *be); cta::objectstore::ScopedExclusiveLock arl(ar); ar.fetch(); for (auto & j: ar.dumpJobs()) { @@ -94,6 +95,24 @@ int main(int argc, char ** argv) { } break; } + case cta::objectstore::serializers::ObjectType::RetrieveRequest_t: + { + // Reopen the object as an ArchiveRequest + std::cout << "The object is an RetrieveRequest" << std::endl; + gol.release(); + cta::objectstore::RetrieveRequest rr(argv[3], *be); + cta::objectstore::ScopedExclusiveLock rrl(rr); + rr.fetch(); + if (!be->exists(rr.getOwner())) { + std::cout << "Owner " << rr.getOwner() << " does not exist." << std::endl; + rr.garbageCollect(rr.getOwner(), agr, lc, *catalogue); + rr.fetch(); + std::cout << "New owner for request is " << rr.getOwner() << std::endl; + } else { + std::cout << "No request was not orphaned." << std::endl; + } + break; + } default: std::cout << "Object type: " << go.type() << " not supported for this operation" << std::endl; break; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 52d723620fe023d19f2c3805ee78b992a1dcf9ff..25febcb40d97ba46adb4e1b6bac2208d00c46f3e 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -777,7 +777,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& [vid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == vid; })) throw RetrieveRequestHasNoCopies("In OStoreDB::queueRetrieve(): no tape file for requested vid."); // In order to post the job, construct it first in memory. - objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveToFileRequest"), m_objectStore); + objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveRequest"), m_objectStore); rReq.initialize(); rReq.setSchedulerRequest(rqst); rReq.setRetrieveFileQueueCriteria(criteria);