Commit 608ab462 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Corrected the RetrieveRequest::garbageCollect method

Added some unit tests to test all garbage collection of all type of Retrieve Requests (ToTransferForUser, ToReportToUser, ToReportToRepackForSuccess, ToReportToRepackForFailure)
parent ef4aea8c
......@@ -381,12 +381,10 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
auto * jl = m_payload.mutable_jobs();
bool anythingGarbageCollected=false;
using serializers::ArchiveJobStatus;
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransferForUser, ArchiveJobStatus::AJS_ToReportToUserForTransfer,
ArchiveJobStatus::AJS_ToReportToUserForFailure, ArchiveJobStatus::AJS_Failed});
for (auto j=jl->begin(); j!=jl->end(); j++) {
auto owner=j->owner();
auto status=j->status();
if ( statusesImplyingQueueing.count(status) && owner==presumedOwner) {
if ( c_statusesImplyingQueueing.count(status) && owner==presumedOwner) {
// The job is in a state which implies queuing.
std::string queueObject="Not defined yet";
anythingGarbageCollected=true;
......@@ -396,7 +394,13 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
// recreated (this will be done by helper).
ArchiveQueue aq(m_objectStore);
ScopedExclusiveLock aql;
Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc);
std::string containerId;
if(!c_statusesImplyingQueueingByRepackRequestAddress.count(status)){
containerId = j->tapepool();
} else {
containerId = m_payload.repack_info().repack_request_address();
}
Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq, aql, agentReference, containerId, getQueueType(status), lc);
queueObject=aq.getAddressIfSet();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
......
......@@ -83,6 +83,13 @@ public:
* success/failure scenario. */
serializers::ArchiveJobStatus nextStatus;
};
const std::set<serializers::ArchiveJobStatus> c_statusesImplyingQueueing = {serializers::ArchiveJobStatus::AJS_ToTransferForUser, serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer,
serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure, serializers::ArchiveJobStatus::AJS_Failed,
serializers::ArchiveJobStatus::AJS_ToTransferForRepack, serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure,
serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess
};
const std::set<serializers::ArchiveJobStatus> c_statusesImplyingQueueingByRepackRequestAddress {serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure,
serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess};
private:
/**
* Determine and set the new status of the job and determine whether and where the request should be queued
......
......@@ -339,9 +339,17 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
}
// Small parenthesis for non transfer cases.
if (candidateVids.empty()) {
// The request might need to be added to the failed to report of failed queue/container.
//If the queueType of the RetrieveRequest is FailedJobs or JobsToReportToUser, it needs to be requeued in a queue identified by the vid of the tape
//If queueType is JobsToReportToRepackForSuccess or JobsToReportToRepackForFailure, it needs to be requeued in a queue identified by the RepackRequest's address
try {
retrieveQueuesAndRequests[std::make_tuple(rr->getArchiveFile().tapeFiles.begin()->vid, rr->getQueueType())].emplace_back(rr);
std::string vid = rr->getArchiveFile().tapeFiles.begin()->vid;
if(rr->getQueueType() != JobQueueType::FailedJobs && rr->getQueueType() != JobQueueType::JobsToReportToUser){
retrieveQueuesAndRequests[std::make_tuple(rr->getRepackInfo().repackRequestAddress, rr->getQueueType(),vid)].emplace_back(rr);
} else {
// The request has failed, might need to be added to the failed to report of failed queue/container.
retrieveQueuesAndRequests[std::make_tuple(vid, rr->getQueueType(),vid)].emplace_back(rr);
}
break;
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params3(lc);
params3.add("fileId", rr->getArchiveFile().archiveFileID)
......@@ -362,7 +370,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
break;
}
retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransferForUser)].emplace_back(rr);
retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransferForUser,vid)].emplace_back(rr);
log::ScopedParamContainer params3(lc);
// Find copyNb for logging
size_t copyNb = std::numeric_limits<size_t>::max();
......@@ -373,8 +381,8 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
.add("tapeVid", vid)
.add("fSeq", fSeq);
lc.log(log::INFO, "Selected vid to be requeued for retrieve request.");
break;
}
break;
default:
// For other objects, we will not implement any optimization and simply call
// their individual garbageCollect method.
......@@ -520,9 +528,10 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
// 2) Get the retrieve requests done. They are simpler as retrieve requests are fully owned.
// Then should hence not have changes since we pre-fetched them.
for (auto & retriveQueueIdAndReqs: retrieveQueuesAndRequests) {
std::string vid;
std::string containerIdentifier;
JobQueueType queueType;
std::tie(vid, queueType) = retriveQueueIdAndReqs.first;
std::string vid;
std::tie(containerIdentifier, queueType, vid) = retriveQueueIdAndReqs.first;
auto & requestsList = retriveQueueIdAndReqs.second;
while (requestsList.size()) {
decltype (retriveQueueIdAndReqs.second) currentJobBatch;
......@@ -545,7 +554,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
// Get the retrieve queue and add references to the jobs to it.
RetrieveQueue rq(objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql, agentReference, vid, queueType, lc);
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql, agentReference, containerIdentifier, queueType, lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary=rq.getJobsSummary();
filesBefore=jobsSummary.jobs;
......@@ -557,9 +566,9 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
for (auto & rr: currentJobBatch) {
// Determine the copy number and feed the queue with it.
for (auto &tf: rr->getArchiveFile().tapeFiles) {
if (tf.vid == vid) {
if (tf.vid == vid) {
jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize,
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
}
}
}
......
......@@ -28,7 +28,7 @@
/**
* Plan => Garbage collector keeps track of the agents.
* If an agent is declared dead => tape ownership of owned objects
* If an agent is declared dead => take ownership of owned objects
* Using the backup owner, re-post the objet to the container.
* All containers will have a "repost" method, which is more thorough
* (and expensive) than the usual one. It can for example prevent double posting.
......@@ -56,8 +56,10 @@ public:
/** Structure allowing the sorting of owned objects, so they can be requeued in batches,
* one batch per queue. */
struct OwnedObjectSorter {
// TODO: tuple[0] = containerIdentifier (tapepool or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=tapepoolOfTheJob
std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests;
std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
//tuple[0] = containerIdentifier (vid or Repack Request's address), tuple[1]=jobQueueType, tuple[2]=vidOfTheJob
std::map<std::tuple<std::string, JobQueueType,std::string>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
std::list<std::shared_ptr<GenericObject>> otherObjects;
//Sorter m_sorter;
/// Fill up the fetchedObjects with objects of interest.
......
This diff is collapsed.
......@@ -153,7 +153,7 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
// filter on tape availability.
try {
// If we have to fetch the status of the tapes and queued for the non-disabled vids.
auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
goto queueForTransfer;
} catch (Helpers::NoTapeAvailableForRetrieve &) {}
queueForFailure:;
......@@ -195,11 +195,11 @@ queueForFailure:;
err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << activeCopyNb;
throw exception::Exception(err.str());
}
failedVidFound:;
failedVidFound:;qqq
// We now need to grab the failed queue and queue the request.
RetrieveQueue rq(m_objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, JobQueueType::JobsToReportToUser, lc);
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, activeVid, getQueueType(), lc);
// Enqueue the job
objectstore::MountPolicySerDeser mp;
std::list<RetrieveQueue::JobToAdd> jta;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment