From 441eeab5933eeb8465d3895d3a97bbabc4d27786 Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@cern.ch>
Date: Tue, 9 Jul 2019 11:25:58 +0200
Subject: [PATCH] Corrected the RetrieveRequest::garbageCollect method

Added some unit tests to test garbage collection of all types of
Retrieve Requests (ToTransferForUser, ToReportToUser,
ToReportToRepackForSuccess, ToReportToRepackForFailure)
---
 objectstore/ArchiveRequest.cpp       |  12 +-
 objectstore/ArchiveRequest.hpp       |   7 +
 objectstore/GarbageCollector.cpp     |  27 +-
 objectstore/GarbageCollector.hpp     |   6 +-
 objectstore/GarbageCollectorTest.cpp | 603 +++++++++++++++++++++++++++
 objectstore/RetrieveRequest.cpp      |   6 +-
 6 files changed, 643 insertions(+), 18 deletions(-)

diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index fbb8ee03c6..928dfe931d 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -381,12 +381,10 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
   auto * jl = m_payload.mutable_jobs();
   bool anythingGarbageCollected=false;
   using serializers::ArchiveJobStatus;
-  std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransferForUser, ArchiveJobStatus::AJS_ToReportToUserForTransfer,
-    ArchiveJobStatus::AJS_ToReportToUserForFailure, ArchiveJobStatus::AJS_Failed});
   for (auto j=jl->begin(); j!=jl->end(); j++) {
     auto owner=j->owner();
     auto status=j->status();
-    if ( statusesImplyingQueueing.count(status) && owner==presumedOwner) {
+    if ( c_statusesImplyingQueueing.count(status) && owner==presumedOwner) {
       // The job is in a state which implies queuing.
       std::string queueObject="Not defined yet";
       anythingGarbageCollected=true;
@@ -396,7 +394,13 @@
         // recreated (this will be done by helper).
         ArchiveQueue aq(m_objectStore);
         ScopedExclusiveLock aql;
-        Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc);
+        std::string containerId;
+        if(!c_statusesImplyingQueueingByRepackRequestAddress.count(status)){
+          containerId = j->tapepool();
+        } else {
+          containerId = m_payload.repack_info().repack_request_address();
+        }
+        Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq, aql, agentReference, containerId, getQueueType(status), lc);
         queueObject=aq.getAddressIfSet();
         ArchiveRequest::JobDump jd;
         jd.copyNb = j->copynb();
diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp
index 27b1f00402..6936f967a1 100644
--- a/objectstore/ArchiveRequest.hpp
+++ b/objectstore/ArchiveRequest.hpp
@@ -83,6 +83,13 @@ public:
    * success/failure scenario.
    */
   serializers::ArchiveJobStatus nextStatus;
 };
+  const std::set<serializers::ArchiveJobStatus> c_statusesImplyingQueueing = {serializers::ArchiveJobStatus::AJS_ToTransferForUser, serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer,
+    serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure, serializers::ArchiveJobStatus::AJS_Failed,
+    serializers::ArchiveJobStatus::AJS_ToTransferForRepack, serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure,
+    serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess
+  };
+  const std::set<serializers::ArchiveJobStatus> c_statusesImplyingQueueingByRepackRequestAddress {serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure,
+    serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess};
 private:
   /**
    * Determine and set the new status of the job and determine whether and where the request should be queued
diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp
index 6ff68c09a4..91da1174e8 100644
--- a/objectstore/GarbageCollector.cpp
+++ b/objectstore/GarbageCollector.cpp
@@ -339,9 +339,17 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
       }
       // Small parenthesis for non transfer cases.
       if (candidateVids.empty()) {
-        // The request might need to be added to the failed to report of failed queue/container.
+        //If the queueType of the RetrieveRequest is FailedJobs or JobsToReportToUser, it needs to be requeued in a queue identified by the vid of the tape
+        //If the queueType is JobsToReportToRepackForSuccess or JobsToReportToRepackForFailure, it needs to be requeued in a queue identified by the RepackRequest's address
         try {
-          retrieveQueuesAndRequests[std::make_tuple(rr->getArchiveFile().tapeFiles.begin()->vid, rr->getQueueType())].emplace_back(rr);
+          std::string vid = rr->getArchiveFile().tapeFiles.begin()->vid;
+          if(rr->getQueueType() != JobQueueType::FailedJobs && rr->getQueueType() != JobQueueType::JobsToReportToUser){
+            retrieveQueuesAndRequests[std::make_tuple(rr->getRepackInfo().repackRequestAddress, rr->getQueueType(),vid)].emplace_back(rr);
+          } else {
+            // The request has failed and might need to be added to the to-report or failed queue/container.
+            retrieveQueuesAndRequests[std::make_tuple(vid, rr->getQueueType(),vid)].emplace_back(rr);
+          }
+          break;
         } catch (cta::exception::Exception & ex) {
           log::ScopedParamContainer params3(lc);
           params3.add("fileId", rr->getArchiveFile().archiveFileID)
@@ -362,7 +370,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
             otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
             break;
           }
-          retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransferForUser)].emplace_back(rr);
+          retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransferForUser,vid)].emplace_back(rr);
           log::ScopedParamContainer params3(lc);
           // Find copyNb for logging
           size_t copyNb = std::numeric_limits<size_t>::max();
@@ -373,8 +381,8 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
             .add("tapeVid", vid)
             .add("fSeq", fSeq);
           lc.log(log::INFO, "Selected vid to be requeued for retrieve request.");
-          break;
         }
+        break;
       default:
         // For other objects, we will not implement any optimization and simply call
         // their individual garbageCollect method.
@@ -520,9 +528,10 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
   // 2) Get the retrieve requests done. They are simpler as retrieve requests are fully owned.
  // They should hence not have changed since we pre-fetched them.
   for (auto & retriveQueueIdAndReqs: retrieveQueuesAndRequests) {
-    std::string vid;
+    std::string containerIdentifier;
     JobQueueType queueType;
-    std::tie(vid, queueType) = retriveQueueIdAndReqs.first;
+    std::string vid;
+    std::tie(containerIdentifier, queueType, vid) = retriveQueueIdAndReqs.first;
     auto & requestsList = retriveQueueIdAndReqs.second;
     while (requestsList.size()) {
       decltype (retriveQueueIdAndReqs.second) currentJobBatch;
@@ -545,7 +554,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
         // Get the retrieve queue and add references to the jobs to it.
         RetrieveQueue rq(objectStore);
         ScopedExclusiveLock rql;
-        Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql, agentReference, vid, queueType, lc);
+        Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql, agentReference, containerIdentifier, queueType, lc);
         queueLockFetchTime = t.secs(utils::Timer::resetCounter);
         auto jobsSummary=rq.getJobsSummary();
         filesBefore=jobsSummary.jobs;
@@ -557,9 +566,9 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
         for (auto & rr: currentJobBatch) {
           // Determine the copy number and feed the queue with it.
           for (auto &tf: rr->getArchiveFile().tapeFiles) {
-            if (tf.vid == vid) {
+            if (tf.vid == vid) {
               jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize,
-                rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
+                rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
             }
           }
         }
diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp
index f9eedf80f1..639274f8c6 100644
--- a/objectstore/GarbageCollector.hpp
+++ b/objectstore/GarbageCollector.hpp
@@ -28,7 +28,7 @@
 
 /**
  * Plan => Garbage collector keeps track of the agents.
- * If an agent is declared dead => tape ownership of owned objects
+ * If an agent is declared dead => take ownership of owned objects
  * Using the backup owner, re-post the object to the container.
  * All containers will have a "repost" method, which is more thorough
 * (and expensive) than the usual one. It can for example prevent double posting.
@@ -56,8 +56,10 @@ public:
   /** Structure allowing the sorting of owned objects, so they can be requeued in batches,
    * one batch per queue. */
   struct OwnedObjectSorter {
+    // TODO: tuple[0] = containerIdentifier (tapepool or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=tapepoolOfTheJob
     std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests;
-    std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
+    //tuple[0] = containerIdentifier (vid or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=vidOfTheJob
+    std::map<std::tuple<std::string, JobQueueType,std::string>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
     std::list<std::shared_ptr<GenericObject>> otherObjects;
     //Sorter m_sorter;
     /// Fill up the fetchedObjects with objects of interest.
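Note on the new retrieve map key (an illustrative sketch, not part of the patch: the type and member names below are simplified, hypothetical stand-ins for the CTA objectstore types): the map is now keyed by a three-element tuple so that repack report queues can be addressed by the RepackRequest's address, while the job's vid stays in the key because it is still needed to find the matching copy number when requeueing.

#include <list>
#include <map>
#include <memory>
#include <string>
#include <tuple>

enum class JobQueueType { JobsToTransferForUser, JobsToReportToUser, FailedJobs,
                          JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure };

// Hypothetical stand-in for cta::objectstore::RetrieveRequest.
struct FakeRetrieveRequest {
  JobQueueType queueType;            // what getQueueType() would return
  std::string vid;                   // tape holding the copy to requeue
  std::string repackRequestAddress;  // what getRepackInfo().repackRequestAddress would return
};

// tuple[0] = container identifier (vid or RepackRequest address),
// tuple[1] = queue type, tuple[2] = vid of the job.
using RetrieveQueueKey = std::tuple<std::string, JobQueueType, std::string>;

RetrieveQueueKey makeRetrieveQueueKey(const FakeRetrieveRequest& rr) {
  const bool keyedByRepackAddress =
      rr.queueType == JobQueueType::JobsToReportToRepackForSuccess ||
      rr.queueType == JobQueueType::JobsToReportToRepackForFailure;
  // Repack report queues live under the RepackRequest's address;
  // user-facing queues stay keyed by the tape vid.
  return {keyedByRepackAddress ? rr.repackRequestAddress : rr.vid, rr.queueType, rr.vid};
}

std::map<RetrieveQueueKey, std::list<std::shared_ptr<FakeRetrieveRequest>>> retrieveQueuesAndRequests;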
diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp
index 752d708d40..c6366eec3e 100644
--- a/objectstore/GarbageCollectorTest.cpp
+++ b/objectstore/GarbageCollectorTest.cpp
@@ -1083,4 +1083,607 @@ TEST(ObjectStore, GarbageCollectorRepackRequestStarting) {
   ASSERT_NE(std::string::npos,logToCheck.find("MSG=\"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : The status Starting have no corresponding queue.\""));
 }
 
+TEST(ObjectStore, GarbageCollectorRetrieveAllStatusesAndQueues) {
+// We will need a log object
+#ifdef STDOUT_LOGGING
+  cta::log::StdoutLogger dl("dummy", "unitTest");
+#else
+  cta::log::DummyLogger dl("dummy", "unitTest");
+#endif
+  cta::log::LogContext lc(dl);
+  // We need a dummy catalogue
+  cta::catalogue::DummyCatalogue catalogue;
+  // Here we check that we can successfully call the RetrieveRequest's garbage collector
+  cta::objectstore::BackendVFS be;
+  // Create the root entry
+  cta::objectstore::RootEntry re(be);
+  re.initialize();
+  re.insert();
+  // Create the agent register
+  cta::objectstore::EntryLogSerDeser el("user0",
+    "unittesthost", time(NULL));
+  cta::objectstore::ScopedExclusiveLock rel(re);
+  // Create the agent for objects creation
+  cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
+  // Finish root creation.
+  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
+  rel.release();
+  // continue agent creation.
+  cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
+  agent.initialize();
+  agent.setTimeout_us(10000);
+  agent.insertAndRegisterSelf(lc);
+  // Create all agents to be garbage collected
+  cta::objectstore::AgentReference agentRefToTransferForUser("ToTransferForUser", dl);
+  cta::objectstore::Agent agentToTransferForUser(agentRefToTransferForUser.getAgentAddress(), be);
+  agentToTransferForUser.initialize();
+  agentToTransferForUser.setTimeout_us(0);
+  agentToTransferForUser.insertAndRegisterSelf(lc);
+
+  std::string retrieveRequestAddress = agentRefToTransferForUser.nextId("RetrieveRequest");
+  agentRefToTransferForUser.addToOwnership(retrieveRequestAddress, be);
+
+  cta::objectstore::RetrieveRequest rr(retrieveRequestAddress, be);
+
+  rr.initialize();
+  cta::common::dataStructures::RetrieveFileQueueCriteria rqc;
+  rqc.archiveFile.archiveFileID = 123456789L;
+  rqc.archiveFile.diskFileId = "eos://diskFile";
+  rqc.archiveFile.checksumBlob.insert(cta::checksum::NONE, "");
+  rqc.archiveFile.creationTime = 0;
+  rqc.archiveFile.reconciliationTime = 0;
+  rqc.archiveFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
+  rqc.archiveFile.diskInstance = "eoseos";
+  rqc.archiveFile.fileSize = 1000;
+  rqc.archiveFile.storageClass = "sc";
+  {
+    cta::common::dataStructures::TapeFile tf;
+    tf.blockId=0;
+    tf.fileSize=1;
+    tf.copyNb=2;
+    tf.creationTime=time(nullptr);
+    tf.fSeq=1;
+    tf.vid="Tape0";
+    rqc.archiveFile.tapeFiles.push_back(tf);
+  }
+  rqc.mountPolicy.archiveMinRequestAge = 1;
+  rqc.mountPolicy.archivePriority = 1;
+  rqc.mountPolicy.creationLog.time = time(nullptr);
+  rqc.mountPolicy.lastModificationLog.time = time(nullptr);
+  rqc.mountPolicy.maxDrivesAllowed = 1;
+  rqc.mountPolicy.retrieveMinRequestAge = 1;
+  rqc.mountPolicy.retrievePriority = 1;
+  rr.setRetrieveFileQueueCriteria(rqc);
+  cta::common::dataStructures::RetrieveRequest sReq;
+  sReq.archiveFileID = rqc.archiveFile.archiveFileID;
+  sReq.creationLog.time=time(nullptr);
+  rr.setSchedulerRequest(sReq);
+ 
rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToTransferForUser); + rr.setOwner(agentToTransferForUser.getAddressIfSet()); + rr.setActiveCopyNumber(0); + rr.insert(); + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc); + + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + + { + //The Retrieve Request should now be queued in the RetrieveQueueToTransferForUser + re.fetchNoLock(); + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + auto jobs = rq.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + rr.fetchNoLock(); + ASSERT_EQ(rr.getOwner(),rq.getAddressIfSet()); + } + + { + //Test the RetrieveRequest::garbageCollect method for RetrieveQueueToTransferForUser + cta::objectstore::AgentReference agentRefToTransferForUserAutoGc("ToTransferForUser", dl); + cta::objectstore::Agent agentToTransferForUserAutoGc(agentRefToTransferForUserAutoGc.getAgentAddress(), be); + agentToTransferForUserAutoGc.initialize(); + agentToTransferForUserAutoGc.setTimeout_us(0); + agentToTransferForUserAutoGc.insertAndRegisterSelf(lc); + + cta::objectstore::ScopedExclusiveLock sel(rr); + rr.fetch(); + rr.setOwner(agentRefToTransferForUserAutoGc.getAgentAddress()); + agentRefToTransferForUserAutoGc.addToOwnership(rr.getAddressIfSet(),be); + + ASSERT_NO_THROW(rr.garbageCollect(agentRefToTransferForUserAutoGc.getAgentAddress(),agentRef,lc,catalogue)); + sel.release(); + //The Retrieve Request should now be queued in the RetrieveQueueToTransferForUser + re.fetchNoLock(); + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + auto jobs = rq.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + rr.fetchNoLock(); + ASSERT_EQ(rr.getOwner(),rq.getAddressIfSet()); + } + + { + //Test the Garbage collection of the RetrieveRequest with a reportToUserForFailure job + cta::objectstore::AgentReference agentRefToReportToUser("ToReportToUser", dl); + cta::objectstore::Agent agentToReportToUser(agentRefToReportToUser.getAgentAddress(), be); + agentToReportToUser.initialize(); + agentToReportToUser.setTimeout_us(0); + agentToReportToUser.insertAndRegisterSelf(lc); + + cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + rq.removeJobsAndCommit({rr.getAddressIfSet()}); + rql.release(); + + { + cta::objectstore::ScopedExclusiveLock sel(rr); + rr.fetch(); + rr.setOwner(agentRefToReportToUser.getAgentAddress()); + rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure); + rr.commit(); + } + + agentRefToReportToUser.addToOwnership(rr.getAddressIfSet(),be); + + gc.runOnePass(lc); + + //The Retrieve Request should be queued in the RetrieveQueueToReportToUser + re.fetchNoLock(); + cta::objectstore::RetrieveQueue rqToReportToUser(re.getRetrieveQueueAddress("Tape0", 
cta::objectstore::JobQueueType::JobsToReportToUser), be);
+    rqToReportToUser.fetchNoLock();
+
+    auto jobs = rqToReportToUser.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+  }
+
+  {
+    //Test the RetrieveRequest::garbageCollect method for ToReportToUserForFailure job
+    cta::objectstore::AgentReference agentRefToReportToUserAutoGc("ToReportForUser", dl);
+    cta::objectstore::Agent agentToReportToUserAutoGc(agentRefToReportToUserAutoGc.getAgentAddress(), be);
+    agentToReportToUserAutoGc.initialize();
+    agentToReportToUserAutoGc.setTimeout_us(0);
+    agentToReportToUserAutoGc.insertAndRegisterSelf(lc);
+
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToReportToUser), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefToReportToUserAutoGc.getAgentAddress());
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure);
+      rr.commit();
+
+      agentRefToReportToUserAutoGc.addToOwnership(rr.getAddressIfSet(),be);
+
+      ASSERT_NO_THROW(rr.garbageCollect(agentRefToReportToUserAutoGc.getAgentAddress(),agentRef,lc,catalogue));
+    }
+
+    //The Retrieve Request should now be queued in the RetrieveQueueToReportToUser
+
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqToReportToUser(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToReportToUser), be);
+    rqToReportToUser.fetchNoLock();
+
+    auto jobs = rqToReportToUser.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+
+    rr.fetchNoLock();
+    ASSERT_EQ(rqToReportToUser.getAddressIfSet(),rr.getOwner());
+  }
+
+  {
+    //Test the Garbage collection of the RetrieveRequest with a RJS_Failed job
+    cta::objectstore::AgentReference agentRefFailedJob("FailedJob", dl);
+    cta::objectstore::Agent agentFailedJob(agentRefFailedJob.getAgentAddress(), be);
+    agentFailedJob.initialize();
+    agentFailedJob.setTimeout_us(0);
+    agentFailedJob.insertAndRegisterSelf(lc);
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::JobsToReportToUser), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefFailedJob.getAgentAddress());
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_Failed);
+      rr.commit();
+    }
+    agentRefFailedJob.addToOwnership(rr.getAddressIfSet(),be);
+
+    gc.runOnePass(lc);
+
+    //The Retrieve Request should be queued in the RetrieveQueueFailed
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqFailed(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::FailedJobs), be);
+    rqFailed.fetchNoLock();
+
+    auto jobs = rqFailed.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+  }
+
+  {
+    //Test the RetrieveRequest::garbageCollect method for RJS_Failed job
+    cta::objectstore::AgentReference agentRefFailedJobAutoGc("FailedJob", dl);
+    cta::objectstore::Agent agentFailedJobAutoGc(agentRefFailedJobAutoGc.getAgentAddress(), be);
+    agentFailedJobAutoGc.initialize();
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::FailedJobs), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefFailedJobAutoGc.getAgentAddress());
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_Failed);
+      rr.commit();
+
+
+      agentRefFailedJobAutoGc.addToOwnership(rr.getAddressIfSet(),be);
+
+      ASSERT_NO_THROW(rr.garbageCollect(agentRefFailedJobAutoGc.getAgentAddress(),agentRef,lc,catalogue));
+    }
+
+    //The Retrieve Request should now be queued in the RetrieveQueueFailed
+
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqFailed(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::FailedJobs), be);
+    rqFailed.fetchNoLock();
+
+    auto jobs = rqFailed.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+
+    rr.fetchNoLock();
+    ASSERT_EQ(rqFailed.getAddressIfSet(),rr.getOwner());
+  }
+
+  //Create a repack info object for the garbage collection of Jobs ToReportToRepackForSuccess and ToReportToRepackForFailure
+  cta::objectstore::RetrieveRequest::RepackInfo ri;
+  ri.isRepack = true;
+  ri.fSeq = 1;
+  ri.fileBufferURL = "testFileBufferURL";
+  ri.repackRequestAddress = "repackRequestAddress";
+
+  {
+    //Test the Garbage collection of the RetrieveRequest with a Retrieve job ToReportToRepackForSuccess
+    cta::objectstore::AgentReference agentRefToReportToRepackForSuccess("ToReportToRepackForSuccess", dl);
+    cta::objectstore::Agent agentToReportToRepackForSuccess(agentRefToReportToRepackForSuccess.getAgentAddress(), be);
+    agentToReportToRepackForSuccess.initialize();
+    agentToReportToRepackForSuccess.setTimeout_us(0);
+    agentToReportToRepackForSuccess.insertAndRegisterSelf(lc);
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress("Tape0", cta::objectstore::JobQueueType::FailedJobs), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefToReportToRepackForSuccess.getAgentAddress());
+      //Add the repack information to the RetrieveRequest
+      rr.setRepackInfo(ri);
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess);
+      rr.commit();
+    }
+    agentRefToReportToRepackForSuccess.addToOwnership(rr.getAddressIfSet(),be);
+
+    gc.runOnePass(lc);
+
+    //The Retrieve Request should be queued in the RetrieveQueueToReportToRepackForSuccess
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqToReportToRepackForSuccess(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
+    rqToReportToRepackForSuccess.fetchNoLock();
+
+    auto jobs = rqToReportToRepackForSuccess.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+  }
+
+  {
+    //Test the RetrieveRequest::garbageCollect method for RJS_ToReportToRepackForSuccess job
+    cta::objectstore::AgentReference agentRefToReportToRepackForSuccessJobAutoGc("ToReportToRepackForSuccessAutoGC", dl);
+    cta::objectstore::Agent agentToReportToRepackForSuccessJobAutoGc(agentRefToReportToRepackForSuccessJobAutoGc.getAgentAddress(), be);
+    agentToReportToRepackForSuccessJobAutoGc.initialize();
+    agentToReportToRepackForSuccessJobAutoGc.setTimeout_us(0);
+    agentToReportToRepackForSuccessJobAutoGc.insertAndRegisterSelf(lc);
+
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefToReportToRepackForSuccessJobAutoGc.getAgentAddress());
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess);
+      rr.commit();
+
+      agentRefToReportToRepackForSuccessJobAutoGc.addToOwnership(rr.getAddressIfSet(),be);
+
+      ASSERT_NO_THROW(rr.garbageCollect(agentRefToReportToRepackForSuccessJobAutoGc.getAgentAddress(),agentRef,lc,catalogue));
+    }
+
+    //The Retrieve Request should now be queued in the RetrieveQueueToReportToRepackForSuccess
+
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqToReportToRepackForSuccess(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
+    rqToReportToRepackForSuccess.fetchNoLock();
+
+    auto jobs = rqToReportToRepackForSuccess.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+
+    rr.fetchNoLock();
+    ASSERT_EQ(rqToReportToRepackForSuccess.getAddressIfSet(),rr.getOwner());
+  }
+
+  {
+    //Test the Garbage collection of the RetrieveRequest with a Retrieve job ToReportToRepackForFailure
+    cta::objectstore::AgentReference agentRefToReportToRepackForFailure("ToReportToRepackForFailure", dl);
+    cta::objectstore::Agent agentToReportToRepackForFailure(agentRefToReportToRepackForFailure.getAgentAddress(), be);
+    agentToReportToRepackForFailure.initialize();
+    agentToReportToRepackForFailure.setTimeout_us(0);
+    agentToReportToRepackForFailure.insertAndRegisterSelf(lc);
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    cta::objectstore::ScopedExclusiveLock sel(rr);
+    rr.fetch();
+    rr.setOwner(agentRefToReportToRepackForFailure.getAgentAddress());
+
+    rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForFailure);
+    rr.commit();
+    sel.release();
+
+    agentRefToReportToRepackForFailure.addToOwnership(rr.getAddressIfSet(),be);
+
+    gc.runOnePass(lc);
+
+    //The Retrieve Request should be queued in the RetrieveQueueToReportToRepackForFailure
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqToReportToRepackForFailure(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be);
+    rqToReportToRepackForFailure.fetchNoLock();
+
+    auto jobs = rqToReportToRepackForFailure.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+  }
+
+  {
+    //Test the RetrieveRequest::garbageCollect method for RJS_ToReportToRepackForFailure job
+    cta::objectstore::AgentReference agentRefToReportToRepackForFailureJobAutoGc("ToReportToRepackForFailureAutoGC", dl);
+    cta::objectstore::Agent agentToReportToRepackForFailureJobAutoGc(agentRefToReportToRepackForFailureJobAutoGc.getAgentAddress(), be);
+    agentToReportToRepackForFailureJobAutoGc.initialize();
+    agentToReportToRepackForFailureJobAutoGc.setTimeout_us(0);
+    agentToReportToRepackForFailureJobAutoGc.insertAndRegisterSelf(lc);
+
+
+    cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    rq.removeJobsAndCommit({rr.getAddressIfSet()});
+    rql.release();
+
+    {
+      cta::objectstore::ScopedExclusiveLock sel(rr);
+      rr.fetch();
+      rr.setOwner(agentRefToReportToRepackForFailureJobAutoGc.getAgentAddress());
+      rr.setJobStatus(2,cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForFailure);
+      rr.commit();
+
+      agentRefToReportToRepackForFailureJobAutoGc.addToOwnership(rr.getAddressIfSet(),be);
+
+      ASSERT_NO_THROW(rr.garbageCollect(agentRefToReportToRepackForFailureJobAutoGc.getAgentAddress(),agentRef,lc,catalogue));
+    }
+
+    //The Retrieve Request should now be queued in the RetrieveQueueToReportToRepackForFailure
+
+    re.fetchNoLock();
+    cta::objectstore::RetrieveQueue rqToReportToRepackForFailure(re.getRetrieveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be);
+    rqToReportToRepackForFailure.fetchNoLock();
+
+    auto jobs = rqToReportToRepackForFailure.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+
+    rr.fetchNoLock();
+    ASSERT_EQ(rqToReportToRepackForFailure.getAddressIfSet(),rr.getOwner());
+  }
+}
+
+TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) {
+  // We will need a log object
+#ifdef STDOUT_LOGGING
+  cta::log::StdoutLogger dl("dummy", "unitTest");
+#else
+  cta::log::DummyLogger dl("dummy", "unitTest");
+#endif
+  cta::log::LogContext lc(dl);
+  // We need a dummy catalogue
+  cta::catalogue::DummyCatalogue catalogue;
+  // Here we check that we can successfully call the ArchiveRequest's garbage collector
+  cta::objectstore::BackendVFS be;
+  // Create the root entry
+  cta::objectstore::RootEntry re(be);
+  re.initialize();
+  re.insert();
+  // Create the agent register
+  cta::objectstore::EntryLogSerDeser el("user0",
+    "unittesthost", time(NULL));
+  cta::objectstore::ScopedExclusiveLock rel(re);
+  // Create the agent for objects creation
+  cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
+  // Finish root creation.
+  re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
+  rel.release();
+  // continue agent creation.
+ cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + // Create all agents to be garbage collected + cta::objectstore::AgentReference agentRefToTransferForUser("ToTransferForUser", dl); + cta::objectstore::Agent agentToTransferForUser(agentRefToTransferForUser.getAgentAddress(), be); + agentToTransferForUser.initialize(); + agentToTransferForUser.setTimeout_us(0); + agentToTransferForUser.insertAndRegisterSelf(lc); + + std::string archiveRequestAddress = agentRefToTransferForUser.nextId("ArchiveRequest"); + agentRefToTransferForUser.addToOwnership(archiveRequestAddress, be); + + std::string tapePool = "tapePool"; + + cta::objectstore::ArchiveRequest ar(archiveRequestAddress, be); + ar.initialize(); + cta::common::dataStructures::ArchiveFile aFile; + aFile.archiveFileID = 123456789L; + aFile.diskFileId = "eos://diskFile"; + aFile.checksumBlob.insert(cta::checksum::NONE, ""); + aFile.creationTime = 0; + aFile.reconciliationTime = 0; + aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); + aFile.diskInstance = "eoseos"; + aFile.fileSize = 667; + aFile.storageClass = "sc"; + ar.setArchiveFile(aFile); + ar.addJob(2, tapePool, agentRefToTransferForUser.getAgentAddress(), 1, 1, 1); + cta::common::dataStructures::MountPolicy mp; + ar.setMountPolicy(mp); + ar.setArchiveReportURL(""); + ar.setArchiveErrorReportURL(""); + ar.setRequester(cta::common::dataStructures::RequesterIdentity("user0", "group0")); + ar.setSrcURL("root://eoseos/myFile"); + ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr))); + ar.insert(); + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc); + + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + + { + //The Archive Request should now be queued in the ArchiveQueueToTransferForUser + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + auto jobs = aq.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aq.getAddressIfSet()); + } + + { + // TODO : Implement the garbage collection of Archive requests with the new Repack Statuses + /*cta::objectstore::AgentReference agentRefToReportForUserForFailure("ToReportToUserForFailure", dl); + cta::objectstore::Agent agentToReportToUserForFailure(agentRefToReportForUserForFailure.getAgentAddress(), be); + agentToReportToUserForFailure.initialize(); + agentToReportToUserForFailure.setTimeout_us(0); + agentToReportToUserForFailure.insertAndRegisterSelf(lc); + + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToReportForUserForFailure.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure); + ar.commit(); + 
sel.release();
+
+    agentRefToReportForUserForFailure.addToOwnership(ar.getAddressIfSet(),be);
+
+    gc.runOnePass(lc);
+
+    //The Archive Request should be queued in the ArchiveQueueToReportToUserForFailure
+    re.fetchNoLock();
+    cta::objectstore::ArchiveQueue aqToReportToUserForFailure(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
+
+    aqToReportToUserForFailure.fetchNoLock();
+
+    auto jobs = aqToReportToUserForFailure.dumpJobs();
+    ASSERT_EQ(1,jobs.size());
+
+    auto& job = jobs.front();
+    ASSERT_EQ(2,job.copyNb);
+    ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForFailure.getAddressIfSet());
+    */
+  }
+}
+
 }
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index 321d3153ee..b83332cb11 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -153,7 +153,7 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
   // filter on tape availability.
   try {
     // If we have to fetch the status of the tapes and queued for the non-disabled vids.
-    auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
+    bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
     goto queueForTransfer;
   } catch (Helpers::NoTapeAvailableForRetrieve &) {}
 queueForFailure:;
@@ -195,11 +195,11 @@ queueForFailure:;
     err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << activeCopyNb;
     throw exception::Exception(err.str());
   }
 failedVidFound:;
   // We now need to grab the failed queue and queue the request.
   RetrieveQueue rq(m_objectStore);
   ScopedExclusiveLock rql;
-  Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, JobQueueType::JobsToReportToUser, lc);
+  Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, activeVid, getQueueType(), lc);
   // Enqueue the job
   objectstore::MountPolicySerDeser mp;
   std::list<RetrieveQueue::JobToAdd> jta;
-- 
GitLab
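For reference, the behavioural change in the failure path can be summarised as follows (a minimal sketch under simplified, hypothetical names, not CTA's actual implementation): instead of always requeueing under JobsToReportToUser, garbageCollect() now derives the queue type from the active job's status, which is exactly what the new unit tests assert for each status.

#include <stdexcept>

// Simplified stand-ins for cta::objectstore::serializers::RetrieveJobStatus and
// cta::objectstore::JobQueueType (assumption: reduced to the statuses exercised
// by the tests above).
enum class RetrieveJobStatus { ToTransferForUser, ToReportToUserForFailure, Failed,
                               ToReportToRepackForSuccess, ToReportToRepackForFailure };
enum class JobQueueType { JobsToTransferForUser, JobsToReportToUser, FailedJobs,
                          JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure };

// Status -> queue mapping that the tests assert: each garbage-collected request
// must land in the queue corresponding to its active job's status.
JobQueueType queueTypeFor(RetrieveJobStatus s) {
  switch (s) {
    case RetrieveJobStatus::ToTransferForUser:          return JobQueueType::JobsToTransferForUser;
    case RetrieveJobStatus::ToReportToUserForFailure:   return JobQueueType::JobsToReportToUser;
    case RetrieveJobStatus::Failed:                     return JobQueueType::FailedJobs;
    case RetrieveJobStatus::ToReportToRepackForSuccess: return JobQueueType::JobsToReportToRepackForSuccess;
    case RetrieveJobStatus::ToReportToRepackForFailure: return JobQueueType::JobsToReportToRepackForFailure;
  }
  throw std::logic_error("no queue for this status");
}

Combined with the container-identifier selection shown earlier (vid for user queues, RepackRequest address for repack report queues), this determines both arguments the patched code passes to Helpers::getLockedAndFetchedJobQueue.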