Commit 98281aa6 authored by Eric Cano

Generalized queue type to "ToTransfer", "ToReport" and "Failed".

"ToTransfer" are to be picked up by tape sessions.
"ToReport" Includes both successes and failures to report, as the mechanism to report is the same.
   They will be handled by the reporter, which shares the single thread of the garbage collector.
"Failed" Will be a (possibly non-queue) container which will contain the failed requests. The operators
   will be able to examine, relaunch or abandon those requests.

The states and lifecycles of the requests have been reworked to reflect these queue types.
The container algorithms have been adapted to handle the multiple queue/container types.
parent f2629518
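
For illustration, a minimal caller's-eye sketch of the resulting API shape, mirroring the test changes further down in this diff (setup of be, agentRef, requests and lc is elided):

  // Queueing for transfer: the target queue type is now an explicit parameter.
  ContainerAlgorithms<ArchiveQueue> archiveAlgos(be, agentRef);
  archiveAlgos.referenceAndSwitchOwnership("Tapepool", QueueType::JobsToTransfer, requests, lc);

  // Popping a batch is likewise scoped to a single queue type.
  ContainerTraits<ArchiveQueue>::PopCriteria popCriteria;
  auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", QueueType::JobsToTransfer, popCriteria, lc);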
......@@ -104,10 +104,10 @@ public:
template <class Element>
static ElementAddress getElementAddress(const Element & e);
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId,
log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext & lc);
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef,
const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock,
const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc);
static void addReferencesAndCommit(Container & cont, typename InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
static void addReferencesIfNecessaryAndCommit(Container & cont, typename InsertedElement::list & elemMemCont,
......@@ -136,13 +136,13 @@ public:
* are provided existing and owned by the algorithm's agent.
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
const typename ContainerTraits<C>::ContainerIdentifyer & prevContId,
QueueType queueType, const typename ContainerTraits<C>::ContainerIdentifyer & prevContId,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
C cont(m_backend);
ScopedExclusiveLock contLock;
log::TimingList timingList;
utils::Timer t;
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc);
ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
prevContId, timingList, t, lc);
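
For completeness, the full-signature overload above takes an explicit previous owner; a one-line sketch of that call shape, as exercised by the RetrieveQueue test later in this diff:

  // prevContId is the address of the previous owner (here, the agent itself).
  retrieveAlgos.referenceAndSwitchOwnership("VID", QueueType::JobsToTransfer, agentRef.getAgentAddress(), requests, lc);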
......@@ -182,7 +182,7 @@ public:
* This function is typically used by the garbage collector. We do not take care of dereferencing
* the object from the caller.
*/
void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType,
typename ContainerTraits<C>::ContainerAddress & previousOwnerAddress,
typename ContainerTraits<C>::ContainerAddress & contAddress,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
......@@ -190,7 +190,7 @@ public:
ScopedExclusiveLock contLock;
log::TimingList timingList;
utils::Timer t;
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, queueType, lc);
contAddress = cont.getAddressIfSet();
auto contSummaryBefore = ContainerTraits<C>::getContainerSummary(cont);
timingList.insertAndReset("queueLockFetchTime", t);
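
A sketch of the garbage-collector-side call after this change, as it appears in the GarbageCollector.cpp hunk further down (aqcl is the ContainerAlgorithms instance, jobsToAdd the elements to requeue):

  std::string queueAddress;
  aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, queueType, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc);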
......@@ -234,14 +234,14 @@ public:
* Addition of jobs to container. Convenience overload for cases when current agent is the previous owner
* (most cases except garbage collection).
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
referenceAndSwitchOwnership(contId, m_agentReference.getAgentAddress(), elements, lc);
referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc);
}
typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) {
QueueType queueType, typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) {
// Prepare the return value
typename ContainerTraits<C>::PoppedElementsBatch ret;
typename ContainerTraits<C>::PopCriteria unfulfilledCriteria = popCriteria;
......@@ -258,7 +258,7 @@ public:
iterationCount++;
ScopedExclusiveLock contLock;
try {
ContainerTraits<C>::getLockedAndFetchedNoCreate(cont, contLock, contId, lc);
ContainerTraits<C>::getLockedAndFetchedNoCreate(cont, contLock, contId, queueType, lc);
} catch (typename ContainerTraits<C>::NoSuchContainer &) {
localTimingList.insertAndReset("findLockFetchQueueTime", t);
timingList+=localTimingList;
......
......@@ -94,12 +94,12 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
ar.insert();
}
ContainerAlgorithms<ArchiveQueue> archiveAlgos(be, agentRef);
archiveAlgos.referenceAndSwitchOwnership("Tapepool", requests, lc);
archiveAlgos.referenceAndSwitchOwnership("Tapepool", QueueType::JobsToTransfer, requests, lc);
// Now get the requests back
ContainerTraits<ArchiveQueue>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto popedJobs = archiveAlgos.popNextBatch("Tapepool", popCriteria, lc);
auto popedJobs = archiveAlgos.popNextBatch("Tapepool", QueueType::JobsToTransfer, popCriteria, lc);
ASSERT_EQ(popedJobs.summary.files, 10);
}
......@@ -161,7 +161,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
requests.emplace_back(ContainerAlgorithms<RetrieveQueue>::InsertedElement{cta::make_unique<RetrieveRequest>(rrAddr, be), 1, i, 667, mp,
serializers::RetrieveJobStatus::RJS_Pending});
serializers::RetrieveJobStatus::RJS_ToTransfer});
auto & rr=*requests.back().retrieveRequest;
rr.initialize();
rr.setRetrieveFileQueueCriteria(rqc);
......@@ -176,7 +176,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
}
ContainerAlgorithms<RetrieveQueue> retrieveAlgos(be, agentRef);
try {
retrieveAlgos.referenceAndSwitchOwnership("VID", requests, lc);
retrieveAlgos.referenceAndSwitchOwnership("VID", QueueType::JobsToTransfer, agentRef.getAgentAddress(), requests, lc);
} catch (ContainerTraits<RetrieveQueue>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
......
......@@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
auto tpd=re.dumpArchiveQueues(QueueType::LiveJobs);
auto tpd=re.dumpArchiveQueues(QueueType::JobsToTransfer);
for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) {
if (tp->address == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
......
......@@ -27,8 +27,8 @@ const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQ
const std::string ContainerTraits<ArchiveQueue>::c_identifyerType = "tapepool";
void ContainerTraits<ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef,
const ContainerIdentifyer& contId, log::LogContext& lc) {
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc);
const ContainerIdentifyer& contId, QueueType queueType, log::LogContext& lc) {
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc);
}
void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, InsertedElement::list& elemMemCont,
......@@ -125,14 +125,14 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::lis
}
void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock,
const ContainerIdentifyer& cId, log::LogContext& lc) {
const ContainerIdentifyer& cId, QueueType queueType, log::LogContext& lc) {
// Try and get access to a queue.
size_t attemptCount = 0;
retry:
objectstore::RootEntry re(cont.m_objectStore);
re.fetchNoLock();
std::string aqAddress;
auto aql = re.dumpArchiveQueues(QueueType::LiveJobs);
auto aql = re.dumpArchiveQueues(queueType);
for (auto & aqp : aql) {
if (aqp.tapePool == cId)
aqAddress = aqp.address;
......@@ -153,7 +153,7 @@ void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont,
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc);
re.removeArchiveQueueAndCommit(cId, queueType, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
......@@ -261,7 +261,7 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope
RootEntry re(cont.m_objectStore);
ScopedExclusiveLock rexl(re);
re.fetch();
re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc);
re.removeArchiveQueueAndCommit(cId, QueueType::JobsToTransfer, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
......
......@@ -59,11 +59,11 @@ public:
template <class Element>
static ElementAddress getElementAddress(const Element & e) { return e.archiveRequest->getAddressIfSet(); }
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & contId,
log::LogContext & lc);
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef,
const ContainerIdentifyer & contId, QueueType queueType, log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock,
const ContainerIdentifyer & cId, QueueType queueType, log::LogContext & lc);
static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
......
......@@ -51,14 +51,14 @@ void cta::objectstore::ArchiveRequest::initialize() {
}
void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
const std::string& tapepool, const std::string& archivequeueaddress,
const std::string& tapepool, const std::string& initialOwner,
uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) {
checkPayloadWritable();
auto *j = m_payload.add_jobs();
j->set_copynb(copyNumber);
j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue);
j->set_status(serializers::ArchiveJobStatus::AJS_ToTransfer);
j->set_tapepool(tapepool);
j->set_owner(archivequeueaddress);
j->set_owner(initialOwner);
j->set_archivequeueaddress("");
j->set_totalretries(0);
j->set_retrieswithinmount(0);
......@@ -67,23 +67,32 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
j->set_maxtotalretries(maxTotalRetries);
}
bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
for (auto j2=jl->begin(); j2!=jl->end(); j2++) {
if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete &&
j2->status()!= serializers::ArchiveJobStatus::AJS_Failed)
return false;
QueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
checkPayloadReadable();
for (auto &j: m_payload.jobs()) {
if (j.copynb() == copyNumber) {
switch (j.status()) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
return QueueType::JobsToTransfer;
case serializers::ArchiveJobStatus::AJS_Complete:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and await completion of their sibling jobs.");
case serializers::ArchiveJobStatus::AJS_ToReport:
// We should report a success...
return QueueType::JobsToReport;
case serializers::ArchiveJobStatus::AJS_FailedToReport:
// We should report a failure. The report queue can be shared.
return QueueType::JobsToReport;
case serializers::ArchiveJobStatus::AJS_Failed:
return QueueType::FailedJobs;
case serializers::ArchiveJobStatus::AJS_Abandoned:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Abandoned jobs are not queueable. They are finished and await completion of their sibling jobs.");
}
return true;
}
}
throw NoSuchJob("In ArchiveRequest::setJobSuccessful(): job not found");
throw exception::Exception("In ArchiveRequest::getJobQueueType(): Copy number not found.");
}
bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId, const std::string & failureReason, log::LogContext & lc) {
checkPayloadWritable();
......@@ -106,8 +115,7 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
if (!finishIfNecessary(lc)) commit();
return true;
} else {
j.set_status(serializers::AJS_PendingMount);
commit();
j.set_status(serializers::AJS_ToTransfer);
return false;
}
}
......@@ -129,22 +137,6 @@ ArchiveRequest::RetryStatus ArchiveRequest::getRetryStatus(const uint16_t copyNu
throw cta::exception::Exception("In ArchiveRequest::getRetryStatus(): job not found");
}
void cta::objectstore::ArchiveRequest::setAllJobsLinkingToArchiveQueue() {
checkPayloadWritable();
auto * jl=m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
j->set_status(serializers::AJS_LinkingToArchiveQueue);
}
}
void cta::objectstore::ArchiveRequest::setAllJobsFailed() {
checkPayloadWritable();
auto * jl=m_payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
j->set_status(serializers::AJS_Failed);
}
}
void ArchiveRequest::setArchiveFile(const cta::common::dataStructures::ArchiveFile& archiveFile) {
checkPayloadWritable();
// TODO: factor out the archivefile structure from the flat ArchiveRequest.
......@@ -311,24 +303,18 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// The behavior here depends on which job the agent is supposed to own.
// We should first find this job (if any). This is for covering the case
// of a selected job. The Request could also still being connected to tape
// pools. In this case we will finish the connection to tape pools unconditionally.
// We need to find which job(s) we should actually work on. The job(s) will be
// requeued to the relevant queues depending on their statuses.
auto * jl = m_payload.mutable_jobs();
bool anythingGarbageCollected=false;
using serializers::ArchiveJobStatus;
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransfer,
ArchiveJobStatus::AJS_ToReport, ArchiveJobStatus::AJS_Failed});
for (auto j=jl->begin(); j!=jl->end(); j++) {
auto owner=j->owner();
auto status=j->status();
if (status==serializers::AJS_LinkingToArchiveQueue ||
( (status==serializers::AJS_Selected || status==serializers::AJS_PendingMount)
&& owner==presumedOwner)) {
// If the job was being connected to the tape pool or was selected
// by the dead agent, then we have to ensure it is indeed connected to
// the tape pool and set its status to pending.
// (Re)connect the job to the tape pool and make it pending.
// If we fail to reconnect, we have to fail the job and potentially
// finish the request.
if ( statusesImplyingQueueing.count(status) && owner==presumedOwner) {
// The job is in a state which implies queueing.
std::string queueObject="Not defined yet";
anythingGarbageCollected=true;
try {
......@@ -337,7 +323,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
// recreated (this will be done by helper).
ArchiveQueue aq(m_objectStore);
ScopedExclusiveLock aql;
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), QueueType::LiveJobs, lc);
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), getQueueType(status), lc);
queueObject=aq.getAddressIfSet();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
......@@ -350,7 +336,6 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
aq.addJobsIfNecessaryAndCommit(jta, agentReference, lc);
auto queueUpdateTime = t.secs(utils::Timer::resetCounter);
j->set_owner(aq.getAddressIfSet());
j->set_status(serializers::AJS_PendingMount);
commit();
aql.release();
auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter);
......@@ -405,21 +390,11 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
} catch (...) {
params.add("exceptionType", "unknown");
}
// This could be the end of the request, with various consequences.
// This is handled here:
if (finishIfNecessary(lc)) {
std::string message="In ArchiveRequest::garbageCollect(): failed to requeue the job. Failed it and removed the request as a consequence.";
if (backtrace.size()) message += " Backtrace follows.";
lc.log(log::ERR, message);
if (backtrace.size()) lc.logBacktrace(log::ERR, backtrace);
return;
} else {
commit();
lc.log(log::ERR, "In ArchiveRequest::garbageCollect(): failed to requeue the job and failed it.");
lc.log(log::ERR, "In ArchiveRequest::garbageCollect(): failed to requeue the job and failed it. Internal error: the job is now orphaned.");
}
}
}
}
if (!anythingGarbageCollected) {
log::ScopedParamContainer params(lc);
params.add("jobObject", getAddressIfSet())
......@@ -563,13 +538,18 @@ ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSucces
if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete &&
j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) {
retRef.m_isLastJob = false;
// A job that is complete but not the last has now finished its
// lifecycle and gets dereferenced.
j->set_owner("");
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
}
retRef.m_isLastJob = true;
// This is the last job: set the status to ToReport.
j->set_status(serializers::ArchiveJobStatus::AJS_ToReport);
oh.set_payload(payload.SerializePartialAsString());
throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString());
return oh.SerializeAsString();
}
}
std::stringstream err;
......@@ -593,16 +573,32 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
return j->owner();
}
QueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& status) {
using serializers::ArchiveJobStatus;
switch(status) {
case ArchiveJobStatus::AJS_ToTransfer:
return QueueType::JobsToTransfer;
case ArchiveJobStatus::AJS_ToReport:
return QueueType::JobsToReport;
case ArchiveJobStatus::AJS_Failed:
return QueueType::FailedJobs;
default:
throw cta::exception::Exception("In ArchiveRequest::getQueueType(): invalid status for queueing.");
}
}
std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& status) {
switch(status) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
return "ToTransfer";
case serializers::ArchiveJobStatus::AJS_ToReport:
return "ToReport";
case serializers::ArchiveJobStatus::AJS_Complete:
return "Complete";
case serializers::ArchiveJobStatus::AJS_Failed:
return "Failed";
case serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue:
return "LinkingToArchiveQueue";
case serializers::ArchiveJobStatus::AJS_PendingMount:
return "PendingMount";
case serializers::ArchiveJobStatus::AJS_Abandoned:
return "Abandoned";
default:
return std::string("Unknown (")+std::to_string((uint64_t) status)+")";
}
......
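
Taken together, the intended requeueing pattern for a caller (essentially what the GarbageCollector hunks below do; ar is a fetched ArchiveRequest and j one of its dumped jobs):

  try {
    // Terminal states (Complete, Abandoned) throw JobNotQueueable and are skipped.
    QueueType qt = ar->getJobQueueType(j.copyNb);
    // ... requeue the job to the (tapepool, qt) queue ...
  } catch (ArchiveRequest::JobNotQueueable &) {
    // Nothing to requeue for this job.
  }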
......@@ -24,6 +24,7 @@
#include "common/dataStructures/MountPolicy.hpp"
#include "common/dataStructures/UserIdentity.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "QueueType.hpp"
#include "common/Timer.hpp"
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
......@@ -42,9 +43,12 @@ public:
ArchiveRequest(Backend & os);
ArchiveRequest(GenericObject & go);
void initialize();
// Ownership of archive requests is managed per job. Object level owner has no meaning.
std::string getOwner() = delete;
void setOwner(const std::string &) = delete;
// Job management ============================================================
void addJob(uint16_t copyNumber, const std::string & tapepool,
const std::string & archivequeueaddress, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
const std::string & initialOwner, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
......@@ -61,10 +65,8 @@ public:
std::string statusToString(const serializers::ArchiveJobStatus & status);
bool finishIfNecessary(log::LogContext & lc);/**< Handling of the consequences of a job status change for the entire request.
* This function returns true if the request got finished. */
// Mark all jobs as pending mount (following their linking to a tape pool)
void setAllJobsLinkingToArchiveQueue();
// Mark all the jobs as being deleted, in case of a cancellation
void setAllJobsFailed();
CTA_GENERATE_EXCEPTION_CLASS(JobNotQueueable);
QueueType getJobQueueType(uint16_t copyNumber);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
// Set a job ownership
void setJobOwner(uint16_t copyNumber, const std::string & owner);
......@@ -110,6 +112,10 @@ public:
// Get a job owner
std::string getJobOwner(uint16_t copyNumber);
// Utility to convert status to queue type
static QueueType getQueueType(const serializers::ArchiveJobStatus &status);
// Request management ========================================================
void setSuccessful();
void setFailed();
......
......@@ -63,6 +63,7 @@ add_library (ctaobjectstore SHARED
ArchiveQueueAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
QueueType.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
DriveRegister.cpp
......
......@@ -264,9 +264,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
// 3 Now decide the fate of each fetched and owned object.
bool ownershipUdated=false;
using serializers::ArchiveJobStatus;
std::set<ArchiveJobStatus> inactiveArchiveJobStatuses({ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed});
using serializers::RetrieveJobStatus;
std::set<RetrieveJobStatus> inactiveRetrieveJobStatuses({RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed});
for (auto & obj: fetchedObjects) {
log::ScopedParamContainer params2(lc);
params2.add("objectAddress", obj->getAddressIfSet());
......@@ -297,15 +295,16 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
obj.reset();
bool jobRequeued=false;
for (auto &j: ar->dumpJobs()) {
if ((j.owner == agent.getAddressIfSet() || ar->getOwner() == agent.getAddressIfSet())
&& !inactiveArchiveJobStatuses.count(j.status)) {
archiveQueuesAndRequests[j.tapePool].emplace_back(ar);
log::ScopedParamContainer params3(lc);
params3.add("tapepool", j.tapePool)
.add("copynb", j.copyNb)
.add("fileId", ar->getArchiveFile().archiveFileID);
lc.log(log::INFO, "Selected archive request for requeueing to tape pool");
jobRequeued=true;
if ((j.owner == agent.getAddressIfSet())) {
try {
archiveQueuesAndRequests[std::make_tuple(j.tapePool, ar->getJobQueueType(j.copyNb))].emplace_back(ar);
log::ScopedParamContainer params3(lc);
params3.add("tapepool", j.tapePool)
.add("copynb", j.copyNb)
.add("fileId", ar->getArchiveFile().archiveFileID);
lc.log(log::INFO, "Selected archive request for requeueing to tape pool");
jobRequeued=true;
} catch (ArchiveRequest::JobNotQueueable &) {}
}
}
if (!jobRequeued) {
......@@ -323,17 +322,25 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
// Get the list of vids for non failed tape files.
std::set<std::string> candidateVids;
for (auto & j: rr->dumpJobs()) {
if (!inactiveRetrieveJobStatuses.count(j.status)) {
if(j.status==RetrieveJobStatus::RJS_ToTransfer) {
candidateVids.insert(rr->getArchiveFile().tapeFiles.at(j.copyNb).vid);
}
}
// A brief aside for the non-transfer cases.
if (candidateVids.empty()) {
log::ScopedParamContainer params3(lc);
params3.add("fileId", rr->getArchiveFile().archiveFileID);
lc.log(log::INFO, "No active retrieve job to requeue found. Marking request for normal GC (and probably deletion).");
otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
break;
// The request might need to be added to the failed-to-report or failed queue/container.
try {
retrieveQueuesAndRequests[std::make_tuple(rr->getArchiveFile().tapeFiles.begin()->second.vid, rr->getQueueType())].emplace_back(rr);
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params3(lc);
params3.add("fileId", rr->getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR, "Failed to determine destination queue for retrieve request. Marking request for normal GC (and probably deletion).");
otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
break;
}
}
// Back to the transfer case.
std::string vid;
try {
vid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, objectStore);
......@@ -344,7 +351,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
break;
}
retrieveQueuesAndRequests[vid].emplace_back(rr);
retrieveQueuesAndRequests[std::make_tuple(vid, QueueType::JobsToTransfer)].emplace_back(rr);
log::ScopedParamContainer params3(lc);
// Find copyNb for logging
size_t copyNb = std::numeric_limits<size_t>::max();
......@@ -377,15 +384,19 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
// and validate ownership.
//
// 1) Get the archive requests done.
for (auto & tapepool: archiveQueuesAndRequests) {
for (auto & archiveQueueIdAndReqs: archiveQueuesAndRequests) {
// The number of objects to requeue could be very high. In order to limit the time taken by the
// individual requeue operations, we limit the number of concurrently requeued objects to an
// arbitrary 500.
while (tapepool.second.size()) {
decltype (tapepool.second) currentJobBatch;
while (tapepool.second.size() && currentJobBatch.size() <= 500) {
currentJobBatch.emplace_back(std::move(tapepool.second.front()));
tapepool.second.pop_front();
std::string tapepool;
QueueType queueType;
std::tie(tapepool, queueType) = archiveQueueIdAndReqs.first;
auto & requestsList = archiveQueueIdAndReqs.second;
while (requestsList.size()) {
decltype (archiveQueueIdAndReqs.second) currentJobBatch;
while (requestsList.size() && currentJobBatch.size() <= 500) {
currentJobBatch.emplace_back(std::move(requestsList.front()));
requestsList.pop_front();
}
utils::Timer t;
typedef ContainerAlgorithms<ArchiveQueue> AqAlgos;
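
The container maps' key type is not visible in these hunks; judging by the std::make_tuple/std::tie usage above, it presumably changed from a plain name string to a (name, QueueType) tuple, along these lines:

  // Assumed member types after this change (the actual typedefs are outside the visible hunks):
  std::map<std::tuple<std::string /*tapepool*/, QueueType>,
           std::list<std::shared_ptr<ArchiveRequest>>> archiveQueuesAndRequests;
  std::map<std::tuple<std::string /*vid*/, QueueType>,
           std::list<std::shared_ptr<RetrieveRequest>>> retrieveQueuesAndRequests;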
......@@ -394,7 +405,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
for (auto & ar: currentJobBatch) {
// Determine the copy number and feed the queue with it.
for (auto &j: ar->dumpJobs()) {
if (j.tapePool == tapepool.first) {
if (j.tapePool == tapepool) {
jobsToAdd.push_back({ar, j.copyNb, ar->getArchiveFile(), ar->getMountPolicy()});
}
}
......@@ -403,7 +414,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
std::set<std::string> jobsNotRequeued;
std::string queueAddress;
try {
aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool.first, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc);
aqcl.referenceAndSwitchOwnershipIfNecessary(tapepool, queueType, agent.getAddressIfSet(), queueAddress, jobsToAdd, lc);
} catch (AqAlgos::OwnershipSwitchFailure & failure) {
for (auto &failedAR: failure.failedElements) {
try {
......@@ -450,7 +461,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
params.add("archiveRequestObject", arup.archiveRequest->getAddressIfSet())
.add("copyNb", arup.copyNb)
.add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
.add("tapepool", tapepool.first)
.add("tapepool", tapepool)
.add("archiveQueueObject", queueAddress)
.add("garbageCollectedPreviousOwner", agent.getAddressIfSet());
lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job.");
......@@ -486,7 +497,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a