diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 928dfe931dac510fbf3e971c3b22adfd9d59c1d2..a77e290b79c00f1906da669ed619d9c7a02ba31f 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -752,6 +752,10 @@ JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& s case ArchiveJobStatus::AJS_ToReportToUserForTransfer: case ArchiveJobStatus::AJS_ToReportToUserForFailure: return JobQueueType::JobsToReportToUser; + case ArchiveJobStatus::AJS_ToReportToRepackForSuccess: + return JobQueueType::JobsToReportToRepackForSuccess; + case ArchiveJobStatus::AJS_ToReportToRepackForFailure: + return JobQueueType::JobsToReportToRepackForFailure; case ArchiveJobStatus::AJS_Failed: return JobQueueType::FailedJobs; default: diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index 5652f629dd5b1d733ded86fa8151c581a4d5e12d..04111a9fd76b84275fc91edc6306d19b96680b78 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -93,7 +93,6 @@ private: AgentReference & m_ourAgentReference; AgentRegister m_agentRegister; std::map<std::string, AgentWatchdog * > m_watchedAgents; - //void garbageCollectArchiveRequests(Agent& agent, OwnedObjectSorter &ownedObjectSorter,log::LogContext & lc); }; }} diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index f85cab280cdd73e2c40d2fcc8c64eb46a7c24a65..4fbaeb5226797a136d899f1f6b82e06b547a0961 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -1643,7 +1643,48 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { ar.fetchNoLock(); ASSERT_EQ(ar.getJobOwner(2),aq.getAddressIfSet()); } + { + //Test the AJS_ToTransferForUser auto garbage collection + cta::objectstore::AgentReference agentRefToTransferForUserAutoGC("ToTransferForUserAutoGC", dl); + cta::objectstore::Agent agentToTransferForUserAutoGC(agentRefToTransferForUserAutoGC.getAgentAddress(), be); + agentToTransferForUserAutoGC.initialize(); + agentToTransferForUserAutoGC.setTimeout_us(0); + agentToTransferForUserAutoGC.insertAndRegisterSelf(lc); + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToTransferForUser), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToTransferForUserAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser); + ar.commit(); + agentRefToTransferForUserAutoGC.addToOwnership(ar.getAddressIfSet(),be); + + ar.garbageCollect(agentRefToTransferForUserAutoGC.getAgentAddress(),agentRef,lc,catalogue); + sel.release(); + + { + //The Archive Request should be queued in the ArchiveQueueToTransferForUser + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqToTransferForUser(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToTransferForUser), be); + + aqToTransferForUser.fetchNoLock(); + + auto jobs = aqToTransferForUser.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqToTransferForUser.getAddressIfSet()); + } + } { //Test the AJS_ToReportToUserForFailure Garbage collection cta::objectstore::AgentReference agentRefToReportToUserForFailure("ToReportToUserForFailure", dl); @@ -1670,7 +1711,47 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { gc.runOnePass(lc); - //The Archive Request should be queued in the ArchiveQueueToReportToUserForFailure + //The Archive Request should be queued in the ArchiveQueueToReportForUser + { + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqToReportToUserForFailure(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); + + aqToReportToUserForFailure.fetchNoLock(); + + auto jobs = aqToReportToUserForFailure.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForFailure.getAddressIfSet()); + } + } + { + //Test the AJS_ToReportToUserForFailure Auto Garbage collection + cta::objectstore::AgentReference agentRefToReportToUserForFailureAutoGC("ToReportToUserForFailureAutoGC", dl); + cta::objectstore::Agent agentToReportToUserForFailureAutoGC(agentRefToReportToUserForFailureAutoGC.getAgentAddress(), be); + agentToReportToUserForFailureAutoGC.initialize(); + agentToReportToUserForFailureAutoGC.setTimeout_us(0); + agentToReportToUserForFailureAutoGC.insertAndRegisterSelf(lc); + + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToReportToUserForFailureAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure); + ar.commit(); + agentRefToReportToUserForFailureAutoGC.addToOwnership(ar.getAddressIfSet(),be); + ar.garbageCollect(agentRefToReportToUserForFailureAutoGC.getAgentAddress(),agentRef,lc,catalogue); + + //The Archive Request should be queued in the ArchiveQueueToReportForUser { re.fetchNoLock(); cta::objectstore::ArchiveQueue aqToReportToUserForFailure(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); @@ -1714,7 +1795,7 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { gc.runOnePass(lc); - //The Archive Request should be queued in the ArchiveQueueToReportToUserForFailure + //The Archive Request should be queued in the ArchiveQueueToReportForUser { re.fetchNoLock(); cta::objectstore::ArchiveQueue aqToReportToUserForTransfer(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); @@ -1731,7 +1812,45 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForTransfer.getAddressIfSet()); } } + { + //Test the AJS_ToReportToUserForTransfer Auto Garbage collection + cta::objectstore::AgentReference agentRefToReportToUserForTransferAutoGC("ToReportToUserForTransferAutoGC", dl); + cta::objectstore::Agent agentToReportToUserForTransferAutoGC(agentRefToReportToUserForTransferAutoGC.getAgentAddress(), be); + agentToReportToUserForTransferAutoGC.initialize(); + agentToReportToUserForTransferAutoGC.setTimeout_us(0); + agentToReportToUserForTransferAutoGC.insertAndRegisterSelf(lc); + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToReportToUserForTransferAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer); + ar.commit(); + agentRefToReportToUserForTransferAutoGC.addToOwnership(ar.getAddressIfSet(),be); + ar.garbageCollect(agentRefToReportToUserForTransferAutoGC.getAgentAddress(),agentRef,lc,catalogue); + + //The Archive Request should be queued in the ArchiveQueueToReportForUser + { + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqToReportToUserForTransfer(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be); + + aqToReportToUserForTransfer.fetchNoLock(); + + auto jobs = aqToReportToUserForTransfer.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForTransfer.getAddressIfSet()); + } + } { //Test the garbage collection of an AJS_Failed job cta::objectstore::AgentReference agentRefFailed("Failed", dl); @@ -1776,6 +1895,46 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { } } + { + //Test the AJS_Failed job Auto Garbage collection + cta::objectstore::AgentReference agentRefFailedAutoGC("FailedAutoGC", dl); + cta::objectstore::Agent agentFailedAutoGC(agentRefFailedAutoGC.getAgentAddress(), be); + agentFailedAutoGC.initialize(); + agentFailedAutoGC.setTimeout_us(0); + agentFailedAutoGC.insertAndRegisterSelf(lc); + + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::FailedJobs), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefFailedAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_Failed); + ar.commit(); + agentRefFailedAutoGC.addToOwnership(ar.getAddressIfSet(),be); + ar.garbageCollect(agentRefFailedAutoGC.getAgentAddress(),agentRef,lc,catalogue); + + //The Archive Request should be queued in the ArchiveQueueFailed + { + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqFailed(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::FailedJobs), be); + + aqFailed.fetchNoLock(); + + auto jobs = aqFailed.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqFailed.getAddressIfSet()); + } + } + //Add Repack informations to test the garbage collection of Archive Requests for Repack //Create a repack info object for the garbage collection of Jobs ToReportToRepackForSuccess and ToReportToRepackForFailure cta::objectstore::ArchiveRequest::RepackInfo ri; @@ -1835,6 +1994,46 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { } } + { + //Test the AJS_ToReportToRepackForSuccess job Auto Garbage collection + cta::objectstore::AgentReference agentRefToReportToRepackForSuccessAutoGC("ToReportToRepackForSuccessAutoGC", dl); + cta::objectstore::Agent agentToReportToRepackForSuccessAutoGC(agentRefToReportToRepackForSuccessAutoGC.getAgentAddress(), be); + agentToReportToRepackForSuccessAutoGC.initialize(); + agentToReportToRepackForSuccessAutoGC.setTimeout_us(0); + agentToReportToRepackForSuccessAutoGC.insertAndRegisterSelf(lc); + + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToReportToRepackForSuccessAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess); + ar.commit(); + agentRefToReportToRepackForSuccessAutoGC.addToOwnership(ar.getAddressIfSet(),be); + ar.garbageCollect(agentRefToReportToRepackForSuccessAutoGC.getAgentAddress(),agentRef,lc,catalogue); + + //The Archive Request should be queued in the ArchiveQueueToReportToRepackForSuccess + { + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqToReportToRepackForSuccess(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be); + + aqToReportToRepackForSuccess.fetchNoLock(); + + auto jobs = aqToReportToRepackForSuccess.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqToReportToRepackForSuccess.getAddressIfSet()); + } + } + { //Test the garbage collection of an AJS_ToReportToRepackForFailure job cta::objectstore::AgentReference agentRefToReportToRepackForFailure("ToReportToRepackForFailure", dl); @@ -1861,6 +2060,45 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) { gc.runOnePass(lc); + //The Archive Request should be queued in the ArchiveQueueToReportToRepackForFailure + { + re.fetchNoLock(); + cta::objectstore::ArchiveQueue aqToReportToRepackForFailure(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be); + + aqToReportToRepackForFailure.fetchNoLock(); + + auto jobs = aqToReportToRepackForFailure.dumpJobs(); + ASSERT_EQ(1,jobs.size()); + + auto& job = jobs.front(); + ASSERT_EQ(2,job.copyNb); + + ar.fetchNoLock(); + ASSERT_EQ(ar.getJobOwner(2),aqToReportToRepackForFailure.getAddressIfSet()); + } + } + { + //Test the AJS_ToReportToRepackForFailure job Auto Garbage collection + cta::objectstore::AgentReference agentRefToReportToRepackForFailureAutoGC("ToReportToRepackForFailureAutoGC", dl); + cta::objectstore::Agent agentToReportToRepackForFailureAutoGC(agentRefToReportToRepackForFailureAutoGC.getAgentAddress(), be); + agentToReportToRepackForFailureAutoGC.initialize(); + agentToReportToRepackForFailureAutoGC.setTimeout_us(0); + agentToReportToRepackForFailureAutoGC.insertAndRegisterSelf(lc); + + cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be); + cta::objectstore::ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({ar.getAddressIfSet()}); + aql.release(); + + cta::objectstore::ScopedExclusiveLock sel(ar); + ar.fetch(); + ar.setJobOwner(2,agentRefToReportToRepackForFailureAutoGC.getAgentAddress()); + ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure); + ar.commit(); + agentRefToReportToRepackForFailureAutoGC.addToOwnership(ar.getAddressIfSet(),be); + ar.garbageCollect(agentRefToReportToRepackForFailureAutoGC.getAgentAddress(),agentRef,lc,catalogue); + //The Archive Request should be queued in the ArchiveQueueToReportToRepackForFailure { re.fetchNoLock();