Commit 349b91ec authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Adapted the garbage collector to honour new statuses of Archive Request for Repack

parent b4b21a16
......@@ -303,14 +303,21 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
obj.reset();
bool jobRequeued=false;
for (auto &j: ar->dumpJobs()) {
if ((j.owner == agent.getAddressIfSet())) {
if ((j.owner == agent.getAddressIfSet() && ar->c_statusesImplyingQueueing.count(j.status))) {
std::string containerIdentifier;
try {
archiveQueuesAndRequests[std::make_tuple(j.tapePool, ar->getJobQueueType(j.copyNb))].emplace_back(ar);
if(ar->c_statusesImplyingQueueingByRepackRequestAddress.count(j.status)){
containerIdentifier = ar->getRepackInfo().repackRequestAddress;
} else {
containerIdentifier = j.tapePool;
}
archiveQueuesAndRequests[std::make_tuple(containerIdentifier, ar->getJobQueueType(j.copyNb),j.tapePool)].emplace_back(ar);
log::ScopedParamContainer params3(lc);
params3.add("tapePool", j.tapePool)
.add("containerIdentifier", containerIdentifier)
.add("copynb", j.copyNb)
.add("fileId", ar->getArchiveFile().archiveFileID);
lc.log(log::INFO, "Selected archive request for requeueing to tape pool");
lc.log(log::INFO, "Selected archive request for requeueing to the corresponding queue");
jobRequeued=true;
} catch (ArchiveRequest::JobNotQueueable &) {}
}
......@@ -397,6 +404,109 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
fetchedObjects.clear();
}
/**
 * Requeues a batch of garbage-collected archive jobs into the archive queue type
 * selected by the ArchiveSpecificQueue template parameter.
 *
 * For each request in @p jobs, the job(s) whose tape pool matches @p tapepool are
 * collected and handed to the container algorithms, which reference the queue
 * identified by @p containerIdentifier and switch job ownership away from the
 * garbage-collected agent. Jobs whose ownership switch fails with
 * NoSuchObject/WrongPreviousOwner are dropped (logged as errors); jobs failing
 * with any other error are recorded in @p jobsIndividuallyGCed and appended to
 * otherObjects for request-by-request garbage collection.
 *
 * @param jobs                  batch of archive requests to requeue.
 * @param queueAddress          out: address of the queue the jobs were added to.
 * @param containerIdentifier   queue container key (tape pool name, or the repack
 *                              request address for repack report queues).
 * @param tapepool              tape pool used to select the copy number(s) to requeue.
 * @param jobsIndividuallyGCed  out: addresses of requests needing individual GC.
 * @param agent                 the garbage-collected agent (previous owner).
 * @param agentReference        reference of the garbage collector's own agent.
 * @param objectStore           backend object store.
 * @param lc                    log context.
 */
template<typename ArchiveSpecificQueue>
void GarbageCollector::OwnedObjectSorter::executeArchiveAlgorithm(std::list<std::shared_ptr<ArchiveRequest>> &jobs,std::string &queueAddress, const std::string& containerIdentifier, const std::string& tapepool,
  std::set<std::string> & jobsIndividuallyGCed, Agent& agent, AgentReference& agentReference,
  Backend &objectStore, log::LogContext& lc)
{
  typedef ContainerAlgorithms<ArchiveQueue,ArchiveSpecificQueue> AqAlgos;
  AqAlgos aqcl(objectStore, agentReference);
  typename decltype(aqcl)::InsertedElement::list jobsToAdd;
  for (auto & ar: jobs) {
    // Determine the copy number and feed the queue with it.
    for (auto &j: ar->dumpJobs()) {
      if (j.tapePool == tapepool) {
        jobsToAdd.push_back({ar.get(), j.copyNb, ar->getArchiveFile(), ar->getMountPolicy(), cta::nullopt});
      }
    }
  }
  std::set<std::string> jobsNotRequeued;
  try {
    aqcl.referenceAndSwitchOwnershipIfNecessary(containerIdentifier, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc);
  } catch (typename AqAlgos::OwnershipSwitchFailure & failure) {
    for (auto &failedAR: failure.failedElements) {
      try {
        std::rethrow_exception(failedAR.failure);
      } catch (cta::exception::Exception & e) {
        // Update did not go through. It could be benign
        std::string debugType=typeid(e).name();
        auto & arup=*failedAR.element;
        jobsNotRequeued.insert(arup.archiveRequest->getAddressIfSet());
        if (typeid(e) == typeid(Backend::NoSuchObject) || typeid(e) == typeid(Backend::WrongPreviousOwner)) {
          // The object was not present or not owned during update, so we skip it.
          // This is nevertheless unexpected (from previous fetch, so this is an error).
          log::ScopedParamContainer params(lc);
          params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
                .add("copyNb", arup.copyNb)
                .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
                .add("exceptionType", debugType);
          // Fixed: previous message named lockFetchAndUpdateArchiveJobs(), the
          // function this code was extracted from, not the function actually running.
          lc.log(log::ERR,
              "In GarbageCollector::OwnedObjectSorter::executeArchiveAlgorithm(): "
              "failed to requeue gone/not owned archive job. Removed from queue.");
        } else {
          // We have an unexpected error. We will handle this with the request-by-request garbage collection.
          log::ScopedParamContainer params(lc);
          params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
                .add("copyNb", arup.copyNb)
                .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
                .add("exceptionType", debugType)
                .add("exceptionMessage", e.getMessageValue());
          lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::executeArchiveAlgorithm(): "
              "failed to requeue archive job with unexpected error. "
              "Removing from queue and will re-run individual garbage collection.");
          // We will re-run the individual GC for this one.
          jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet());
          otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), objectStore));
        }
      }
    }
  }
  // We can now log individually requeued jobs.
  for (auto & arup: jobsToAdd) {
    if (!jobsNotRequeued.count(arup.archiveRequest->getAddressIfSet())) {
      // OK, the job made it to the queue
      log::ScopedParamContainer params(lc);
      params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
            .add("copyNb", arup.copyNb)
            .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
            .add("tapePool", tapepool)
            .add("archiveQueueObject", queueAddress)
            .add("garbageCollectedPreviousOwner", agent.getAddressIfSet());
      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::executeArchiveAlgorithm(): requeued archive job.");
    }
  }
  jobsToAdd.clear();
}
/**
 * Dispatches a batch of archive jobs to the requeueing algorithm matching the
 * target queue type, and returns the address of the queue the jobs were added to
 * (empty string if the queue type is not handled).
 *
 * @param jobs                  batch of archive requests to requeue.
 * @param jobQueueType          type of queue the jobs must be requeued to.
 * @param containerIdentifier   queue container key (tape pool name or repack request address).
 * @param tapepool              tape pool of the jobs to requeue.
 * @param jobsIndividuallyGCed  out: addresses of requests needing individual GC.
 * @param agent                 the garbage-collected agent (previous owner).
 * @param agentReference        reference of the garbage collector's own agent.
 * @param objectstore           backend object store.
 * @param lc                    log context.
 */
std::string GarbageCollector::OwnedObjectSorter::dispatchArchiveAlgorithms(std::list<std::shared_ptr<ArchiveRequest>> &jobs,const JobQueueType& jobQueueType, const std::string& containerIdentifier,
  const std::string& tapepool,std::set<std::string> & jobsIndividuallyGCed,
  Agent& agent, AgentReference& agentReference, Backend & objectstore, log::LogContext &lc) {
  // The queue address is filled in by the selected algorithm and returned to the caller.
  std::string queueAddress;
  switch (jobQueueType) {
    case JobQueueType::JobsToTransferForUser:
      executeArchiveAlgorithm<ArchiveQueueToTransferForUser>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    case JobQueueType::JobsToReportToUser:
      executeArchiveAlgorithm<ArchiveQueueToReportForUser>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    case JobQueueType::JobsToTransferForRepack:
      executeArchiveAlgorithm<ArchiveQueueToTransferForRepack>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    case JobQueueType::JobsToReportToRepackForSuccess:
      executeArchiveAlgorithm<ArchiveQueueToReportToRepackForSuccess>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    case JobQueueType::JobsToReportToRepackForFailure:
      executeArchiveAlgorithm<ArchiveQueueToReportToRepackForFailure>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    case JobQueueType::FailedJobs:
      executeArchiveAlgorithm<ArchiveQueueFailed>(jobs, queueAddress, containerIdentifier, tapepool,
        jobsIndividuallyGCed, agent, agentReference, objectstore, lc);
      return queueAddress;
    default:
      // Presumably unreachable for valid inputs; an unhandled queue type yields an
      // empty address and no requeueing — TODO confirm against callers.
      break;
  }
  return queueAddress;
}
//TODO : We should record the VID in the ArchiveRequest object to allow the requeueing in the proper report queue (currently, the report queue is selected
//by tapepool, which works but is not the most efficient way to report the request (contention problem)
void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore,
......@@ -409,9 +519,10 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
// The number of objects to requeue could be very high. In order to limit the time taken by the
// individual requeue operations, we limit the number of concurrently requeued objects to an
// arbitrary 500.
std::string containerIdentifier;
std::string tapepool;
JobQueueType queueType;
std::tie(tapepool, queueType) = archiveQueueIdAndReqs.first;
std::tie(containerIdentifier, queueType, tapepool) = archiveQueueIdAndReqs.first;
auto & requestsList = archiveQueueIdAndReqs.second;
while (requestsList.size()) {
decltype (archiveQueueIdAndReqs.second) currentJobBatch;
......@@ -419,76 +530,11 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
currentJobBatch.emplace_back(std::move(requestsList.front()));
requestsList.pop_front();
}
utils::Timer t;
typedef ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> AqAlgos;
AqAlgos aqcl(objectStore, agentReference);
decltype(aqcl)::InsertedElement::list jobsToAdd;
for (auto & ar: currentJobBatch) {
// Determine the copy number and feed the queue with it.
for (auto &j: ar->dumpJobs()) {
if (j.tapePool == tapepool) {
jobsToAdd.push_back({ar.get(), j.copyNb, ar->getArchiveFile(), ar->getMountPolicy(), cta::nullopt});
}
}
}
std::set<std::string> jobsIndividuallyGCed;
std::set<std::string> jobsNotRequeued;
std::string queueAddress;
try {
aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc);
} catch (AqAlgos::OwnershipSwitchFailure & failure) {
for (auto &failedAR: failure.failedElements) {
try {
std::rethrow_exception(failedAR.failure);
} catch (cta::exception::Exception & e) {
// Update did not go through. It could be benign
std::string debugType=typeid(e).name();
auto & arup=*failedAR.element;
jobsNotRequeued.insert(arup.archiveRequest->getAddressIfSet());
if (typeid(e) == typeid(Backend::NoSuchObject) || typeid(e) == typeid(Backend::WrongPreviousOwner)) {
// The object was not present or not owned during update, so we skip it.
// This is nevertheless unexpected (from previous fetch, so this is an error).
log::ScopedParamContainer params(lc);
params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
.add("copyNb", arup.copyNb)
.add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
.add("exceptionType", debugType);
lc.log(log::ERR,
"In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): "
"failed to requeue gone/not owned archive job. Removed from queue.");
} else {
// We have an unexpected error. We will handle this with the request-by-request garbage collection.
log::ScopedParamContainer params(lc);
params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
.add("copyNb", arup.copyNb)
.add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
.add("exceptionType", debugType)
.add("exceptionMessage", e.getMessageValue());
lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): "
"failed to requeue archive job with unexpected error. "
"Removing from queue and will re-run individual garbage collection.");
// We will re-run the individual GC for this one.
jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet());
otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), objectStore));
}
}
}
}
// We can now log individually requeued jobs.
for (auto & arup: jobsToAdd) {
if (!jobsNotRequeued.count(arup.archiveRequest->getAddressIfSet())) {
// OK, the job made it to the queue
log::ScopedParamContainer params(lc);
params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
.add("copyNb", arup.copyNb)
.add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
.add("tapePool", tapepool)
.add("archiveQueueObject", queueAddress)
.add("garbageCollectedPreviousOwner", agent.getAddressIfSet());
lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job.");
}
}
jobsToAdd.clear();
utils::Timer t;
//Dispatch the archive algorithms
dispatchArchiveAlgorithms(currentJobBatch,queueType,containerIdentifier,tapepool,jobsIndividuallyGCed,agent,agentReference,objectStore,lc);
// We can now forget pool level list. But before that, we can remove the objects
// from agent ownership if this was the last reference to it.
// The usage of use_count() is safe here because we are in a single threaded environment.
......
......@@ -56,8 +56,8 @@ public:
/** Structure allowing the sorting of owned objects, so they can be requeued in batches,
* one batch per queue. */
struct OwnedObjectSorter {
// TODO: tuple[0] = containerIdentifier (tapepool or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=tapepoolOfTheJob
std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests;
//tuple[0] = containerIdentifier (tapepool or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=tapepoolOfTheJob
std::map<std::tuple<std::string, JobQueueType ,std::string>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests;
//tuple[0] = containerIdentifier (vid or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=vidOfTheJob
std::map<std::tuple<std::string, JobQueueType,std::string>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
std::list<std::shared_ptr<GenericObject>> otherObjects;
......@@ -76,6 +76,15 @@ public:
void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore,
cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
//Sorter& getSorter();
std::string dispatchArchiveAlgorithms(std::list<std::shared_ptr<ArchiveRequest>> &jobs,const JobQueueType& jobQueueType, const std::string& containerIdentifier,
const std::string& tapepool,std::set<std::string> & jobsIndividuallyGCed,
Agent& agent, AgentReference& agentReference, Backend & objectstore, log::LogContext &lc);
template<typename ArchiveSpecificQueue>
void executeArchiveAlgorithm(std::list<std::shared_ptr<ArchiveRequest>> &jobs,std::string &queueAddress, const std::string& containerIdentifier, const std::string& tapepool,
std::set<std::string> & jobsIndividuallyGCed, Agent& agent, AgentReference& agentReference,
Backend &objectStore, log::LogContext& lc);
};
private:
......
......@@ -1645,9 +1645,9 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) {
}
{
// TODO : Implement the garbage collection of Archive requests with the new Repack Statuses
/*cta::objectstore::AgentReference agentRefToReportForUserForFailure("ToReportToUserForFailure", dl);
cta::objectstore::Agent agentToReportToUserForFailure(agentRefToReportForUserForFailure.getAgentAddress(), be);
//Test the AJS_ToReportToUserForFailure Garbage collection
cta::objectstore::AgentReference agentRefToReportToUserForFailure("ToReportToUserForFailure", dl);
cta::objectstore::Agent agentToReportToUserForFailure(agentRefToReportToUserForFailure.getAgentAddress(), be);
agentToReportToUserForFailure.initialize();
agentToReportToUserForFailure.setTimeout_us(0);
agentToReportToUserForFailure.insertAndRegisterSelf(lc);
......@@ -1661,28 +1661,222 @@ TEST(ObjectStore, GarbageCollectorArchiveAllStatusesAndQueues) {
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setJobOwner(2,agentRefToReportForUserForFailure.getAgentAddress());
ar.setJobOwner(2,agentRefToReportToUserForFailure.getAgentAddress());
ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure);
ar.commit();
sel.release();
agentRefToReportForUserForFailure.addToOwnership(ar.getAddressIfSet(),be);
agentRefToReportToUserForFailure.addToOwnership(ar.getAddressIfSet(),be);
gc.runOnePass(lc);
//The Archive Request should be queued in the ArchiveQueueToReportToUserForFailure
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqToReportToUserForFailure(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
{
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqToReportToUserForFailure(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
aqToReportToUserForFailure.fetchNoLock();
auto jobs = aqToReportToUserForFailure.dumpJobs();
ASSERT_EQ(1,jobs.size());
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ar.fetchNoLock();
ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForFailure.getAddressIfSet());
}
}
{
//Test the AJS_ToReportToUserForTransfer Garbage collection
cta::objectstore::AgentReference agentRefToReportToUserForTransfer("ToReportToUserForTransfer", dl);
cta::objectstore::Agent agentToReportToUserForTransfer(agentRefToReportToUserForTransfer.getAgentAddress(), be);
agentToReportToUserForTransfer.initialize();
agentToReportToUserForTransfer.setTimeout_us(0);
agentToReportToUserForTransfer.insertAndRegisterSelf(lc);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJobsAndCommit({ar.getAddressIfSet()});
aql.release();
aqToReportToUserForFailure.fetchNoLock();
auto jobs = aqToReportToUserForFailure.dumpJobs();
ASSERT_EQ(1,jobs.size());
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setJobOwner(2,agentRefToReportToUserForTransfer.getAgentAddress());
ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer);
ar.commit();
sel.release();
agentRefToReportToUserForTransfer.addToOwnership(ar.getAddressIfSet(),be);
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForFailure.getAddressIfSet());
*/
gc.runOnePass(lc);
//The Archive Request should be queued in the ArchiveQueueToReportToUserForFailure
{
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqToReportToUserForTransfer(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
aqToReportToUserForTransfer.fetchNoLock();
auto jobs = aqToReportToUserForTransfer.dumpJobs();
ASSERT_EQ(1,jobs.size());
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ar.fetchNoLock();
ASSERT_EQ(ar.getJobOwner(2),aqToReportToUserForTransfer.getAddressIfSet());
}
}
{
//Test the garbage collection of an AJS_Failed job
cta::objectstore::AgentReference agentRefFailed("Failed", dl);
cta::objectstore::Agent agentFailed(agentRefFailed.getAgentAddress(), be);
agentFailed.initialize();
agentFailed.setTimeout_us(0);
agentFailed.insertAndRegisterSelf(lc);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::JobsToReportToUser), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJobsAndCommit({ar.getAddressIfSet()});
aql.release();
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setJobOwner(2,agentRefFailed.getAgentAddress());
ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_Failed);
ar.commit();
sel.release();
agentRefFailed.addToOwnership(ar.getAddressIfSet(),be);
gc.runOnePass(lc);
//The Archive Request should be queued in the ArchiveQueueFailed
{
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqFailed(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::FailedJobs), be);
aqFailed.fetchNoLock();
auto jobs = aqFailed.dumpJobs();
ASSERT_EQ(1,jobs.size());
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ar.fetchNoLock();
ASSERT_EQ(ar.getJobOwner(2),aqFailed.getAddressIfSet());
}
}
//Add Repack informations to test the garbage collection of Archive Requests for Repack
//Create a repack info object for the garbage collection of Jobs ToReportToRepackForSuccess and ToReportToRepackForFailure
cta::objectstore::ArchiveRequest::RepackInfo ri;
ri.isRepack = true;
ri.fSeq = 1;
ri.fileBufferURL = "testFileBufferURL";
ri.repackRequestAddress = "repackRequestAddress";
{
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setRepackInfo(ri);
ar.commit();
}
{
//Test the Garbage collection of an AJS_ToReportToRepackForSuccess job
cta::objectstore::AgentReference agentRefToReportToRepackForSuccess("ToReportToUserForTransfer", dl);
cta::objectstore::Agent agentToReportToRepackForSuccess(agentRefToReportToRepackForSuccess.getAgentAddress(), be);
agentToReportToRepackForSuccess.initialize();
agentToReportToRepackForSuccess.setTimeout_us(0);
agentToReportToRepackForSuccess.insertAndRegisterSelf(lc);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tapePool, cta::objectstore::JobQueueType::FailedJobs), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJobsAndCommit({ar.getAddressIfSet()});
aql.release();
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setJobOwner(2,agentRefToReportToRepackForSuccess.getAgentAddress());
ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess);
ar.commit();
sel.release();
agentRefToReportToRepackForSuccess.addToOwnership(ar.getAddressIfSet(),be);
gc.runOnePass(lc);
//The Archive Request should be queued in the ArchiveQueueToReportToRepackForSuccess
{
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqToReportToRepackForSuccess(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
aqToReportToRepackForSuccess.fetchNoLock();
auto jobs = aqToReportToRepackForSuccess.dumpJobs();
ASSERT_EQ(1,jobs.size());
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ar.fetchNoLock();
ASSERT_EQ(ar.getJobOwner(2),aqToReportToRepackForSuccess.getAddressIfSet());
}
}
{
//Test the garbage collection of an AJS_ToReportToRepackForFailure job
cta::objectstore::AgentReference agentRefToReportToRepackForFailure("ToReportToRepackForFailure", dl);
cta::objectstore::Agent agentToReportToRepackForFailure(agentRefToReportToRepackForFailure.getAgentAddress(), be);
agentToReportToRepackForFailure.initialize();
agentToReportToRepackForFailure.setTimeout_us(0);
agentToReportToRepackForFailure.insertAndRegisterSelf(lc);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJobsAndCommit({ar.getAddressIfSet()});
aql.release();
cta::objectstore::ScopedExclusiveLock sel(ar);
ar.fetch();
ar.setJobOwner(2,agentRefToReportToRepackForFailure.getAgentAddress());
ar.setJobStatus(2,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure);
ar.commit();
sel.release();
agentRefToReportToRepackForFailure.addToOwnership(ar.getAddressIfSet(),be);
gc.runOnePass(lc);
//The Archive Request should be queued in the ArchiveQueueToReportToRepackForFailure
{
re.fetchNoLock();
cta::objectstore::ArchiveQueue aqToReportToRepackForFailure(re.getArchiveQueueAddress(ri.repackRequestAddress, cta::objectstore::JobQueueType::JobsToReportToRepackForFailure), be);
aqToReportToRepackForFailure.fetchNoLock();
auto jobs = aqToReportToRepackForFailure.dumpJobs();
ASSERT_EQ(1,jobs.size());
auto& job = jobs.front();
ASSERT_EQ(2,job.copyNb);
ar.fetchNoLock();
ASSERT_EQ(ar.getJobOwner(2),aqToReportToRepackForFailure.getAddressIfSet());
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment