Commit 391ca9a8 authored by Eric Cano

Reworked ArchiveRequest jobs lifecycles.

Changed the lifecycle of the ArchiveRequest to handle the various
combinations of several jobs and their respective success/failures.
Most notably, the request now holds a reportdecided boolean, which
is set when deciding to report. This happens when failing to archive
one copy (first failure), or when all copies are transferred (success
for all copies).
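
A minimal sketch of that decision, using illustrative names rather than the
actual ArchiveRequest fields: the request reports at most once, either on the
first transfer failure or once every copy has been transferred.

  // Hypothetical model of the report decision; not the actual CTA code.
  #include <algorithm>
  #include <vector>

  enum class CopyOutcome { Pending, Transferred, Failed };

  struct RequestState {
    std::vector<CopyOutcome> copies;   // outcome of each archive copy
    bool reportDecided = false;        // mirrors the new reportdecided boolean
  };

  // Returns true when a report should be queued: on the first failure,
  // or when all copies are transferred; never more than once per request.
  bool shouldQueueReport(RequestState& r, CopyOutcome latestOutcome) {
    if (r.reportDecided) return false;
    const bool firstFailure = (latestOutcome == CopyOutcome::Failed);
    const bool allTransferred = std::all_of(r.copies.begin(), r.copies.end(),
        [](CopyOutcome c) { return c == CopyOutcome::Transferred; });
    if (firstFailure || allTransferred) {
      r.reportDecided = true;
      return true;
    }
    return false;
  }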

Added support for in-mount retries. On failure, the job will be
requeued (giving the same session a chance to pick it up again) if
the in-mount retry limit is not exceeded. Otherwise, the job is
left owned by the session, to be picked up by the garbage collector
at tape unmount.
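
As an illustration of the retry policy (the counter and limit names below are
made up, not the real protobuf fields), the decision on transfer failure can
be sketched as:

  // Hypothetical sketch of the in-mount retry decision.
  #include <cstdint>

  enum class NextStep { RequeueInSameSession, LeaveToGarbageCollector, MarkFailed };

  struct JobRetryCounters {
    uint16_t retriesWithinMount = 0;
    uint16_t maxRetriesWithinMount = 2;
    uint16_t totalRetries = 0;
    uint16_t maxTotalRetries = 6;
  };

  NextStep onTransferFailure(JobRetryCounters& j) {
    ++j.retriesWithinMount;
    ++j.totalRetries;
    if (j.totalRetries >= j.maxTotalRetries)
      return NextStep::MarkFailed;              // no retries left at all
    if (j.retriesWithinMount >= j.maxRetriesWithinMount)
      return NextStep::LeaveToGarbageCollector; // collected at tape unmount
    return NextStep::RequeueInSameSession;      // another chance in this mount
  }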

Made the disk reporter generic, dealing with both success and failure.
Improved mount policy support for queueing.
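
In other words, callers now go through a single asynchronous entry point
instead of a success-specific one. A simplified sketch of the resulting
interface (names shortened, not the exact CTA declarations):

  // Simplified sketch of the generic reporter interface.
  struct DiskReporterSketch {
    // One call covers both success and failure reports; the query string
    // supplied at construction determines what is reported.
    virtual void asyncReport() = 0;
    virtual ~DiskReporterSketch() = default;
  };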

Expanded the information available in elements popped from archive queues.

Added optional parameters to ArchiveRequest::asyncUpdateJobOwner() to
cover various cases.
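
For example (hypothetical, simplified signature; the real method takes a
cta::optional<serializers::ArchiveJobStatus>), the ownership update can
optionally change the job status in the same round trip:

  // Illustrative sketch of an ownership update with an optional status change.
  #include <optional>
  #include <string>

  enum class JobStatus { ToTransfer, ToReportForTransfer, ToReportForFailure, Failed };

  struct Job {
    std::string owner;
    JobStatus status = JobStatus::ToTransfer;
  };

  void updateJobOwner(Job& j, const std::string& newOwner,
                      const std::optional<JobStatus>& newStatus = std::nullopt) {
    j.owner = newOwner;
    // Most callers pass nothing and leave the status untouched.
    if (newStatus) j.status = *newStatus;
  }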

Updated the archive job statuses.

Clarified naming of functions (transfer/report failure instead of bare
"failure").

Updated garbage collector for new archive job statuses.

Added support for report retries and batch reporting in the scheduler
database.
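
A rough sketch of the report-retry accounting (hypothetical names; the actual
logic lives in ArchiveRequest::addReportFailure below): after too many failed
report attempts the job is stored in the failed-jobs container, otherwise it
is requeued for another report attempt.

  // Hypothetical sketch of the report retry decision.
  #include <cstdint>

  enum class ReportNextStep { EnqueueForReport, StoreInFailedJobsContainer };

  struct ReportCounters {
    uint16_t totalReportRetries = 0;
    uint16_t maxReportRetries = 2;
  };

  ReportNextStep onReportFailure(ReportCounters& c) {
    ++c.totalReportRetries;
    if (c.totalReportRetries >= c.maxReportRetries)
      return ReportNextStep::StoreInFailedJobsContainer;  // give up reporting
    return ReportNextStep::EnqueueForReport;              // retry the report
  }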

Updated obsolete wording in MigrationReportPacker log messages and error
counts.
parent 2de7844a
......@@ -22,7 +22,7 @@ namespace cta { namespace eos {
class DiskReporter {
public:
virtual void asyncReportArchiveFullyComplete() = 0;
virtual void asyncReport() = 0;
virtual ~DiskReporter() {};
};
......
......@@ -26,7 +26,7 @@ namespace cta { namespace eos {
EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue, std::promise<void>& reporterState):
m_fs(hostURL), m_query(queryValue), m_reporterState(reporterState) {}
void EOSReporter::asyncReportArchiveFullyComplete() {
void EOSReporter::asyncReport() {
auto qcOpaque = XrdCl::QueryCode::OpaqueFile;
XrdCl::Buffer arg (m_query.size());
arg.FromString(m_query);
......
......@@ -29,7 +29,7 @@ const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded
class EOSReporter: public DiskReporter, public XrdCl::ResponseHandler {
public:
EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState);
void asyncReportArchiveFullyComplete() override;
void asyncReport() override;
private:
XrdCl::FileSystem m_fs;
std::string m_query;
......
......@@ -25,7 +25,7 @@ namespace cta { namespace eos {
class NullReporter: public DiskReporter {
public:
NullReporter() {};
void asyncReportArchiveFullyComplete() override {};
void asyncReport() override {};
};
}} // namespace cta::disk
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
......@@ -185,9 +185,18 @@ public:
}
}
/**
* Addition of jobs to container. Convenience overload for cases when current agent is the previous owner
* (most cases except garbage collection).
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc);
}
/** Reference objects in the container if needed and then switch their ownership (if needed). Objects
* are expected to be owned by agent, and not listed in the container but situations might vary.
* This function is typically used by the garbage collector. We do noe take care of dereferencing
* This function is typically used by the garbage collector. We do not take care of dereferencing
* the object from the caller.
*/
void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType,
......@@ -238,16 +247,6 @@ public:
}
}
/**
* Addition of jobs to container. Convenience overload for cases when current agent is the previous owner
* (most cases except garbage collection).
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId, QueueType queueType,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
referenceAndSwitchOwnership(contId, queueType, m_agentReference.getAgentAddress(), elements, lc);
}
typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
QueueType queueType, typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) {
// Prepare the return value
......
......@@ -63,6 +63,7 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
agent.initialize();
agent.insertAndRegisterSelf(lc);
ContainerAlgorithms<ArchiveQueue>::InsertedElement::list requests;
std::list<std::unique_ptr<cta::objectstore::ArchiveRequest>> archiveRequests;
for (size_t i=0; i<10; i++) {
std::string arAddr = agentRef.nextId("ArchiveRequest");
agentRef.addToOwnership(arAddr, be);
......@@ -79,12 +80,14 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
aFile.diskInstance = "eoseos";
aFile.fileSize = 667;
aFile.storageClass = "sc";
requests.emplace_back(ContainerAlgorithms<ArchiveQueue>::InsertedElement{cta::make_unique<ArchiveRequest>(arAddr, be), 1, aFile, mp});
archiveRequests.emplace_back(new cta::objectstore::ArchiveRequest(arAddr, be));
requests.emplace_back(ContainerAlgorithms<ArchiveQueue>::InsertedElement{archiveRequests.back().get(), 1, aFile, mp,
cta::nullopt});
auto & ar=*requests.back().archiveRequest;
auto copyNb = requests.back().copyNb;
ar.initialize();
ar.setArchiveFile(aFile);
ar.addJob(copyNb, "TapePool0", agentRef.getAgentAddress(), 1, 1);
ar.addJob(copyNb, "TapePool0", agentRef.getAgentAddress(), 1, 1, 1);
ar.setMountPolicy(mp);
ar.setArchiveReportURL("");
ar.setArchiveErrorReportURL("");
......
......@@ -40,8 +40,13 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
cta::common::dataStructures::MountPolicy mp;
if (e.mountPolicy)
mp=*e.mountPolicy;
else
mp=cta::common::dataStructures::MountPolicy();
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
mp, time(nullptr)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
......@@ -55,8 +60,13 @@ void ContainerTraits<ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container&
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
cta::common::dataStructures::MountPolicy mp;
if (e.mountPolicy)
mp=*e.mountPolicy;
else
mp=cta::common::dataStructures::MountPolicy();
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
mp, time(nullptr)});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
......@@ -110,7 +120,7 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::lis
for (auto & e: elemMemCont) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress, cta::nullopt));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
......@@ -198,8 +208,17 @@ auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont
PoppedElementsBatch ret;
auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
common::dataStructures::ArchiveFile(), "", "", "", });
ret.elements.emplace_back(PoppedElement());
ContainerTraits<ArchiveQueue>::PoppedElement & elem = ret.elements.back();
elem.archiveRequest = cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore);
elem.copyNb = cjfq.copyNb;
elem.bytes = cjfq.size;
elem.archiveFile = common::dataStructures::ArchiveFile();
elem.srcURL = "";
elem.archiveReportURL = "";
elem.errorReportURL = "";
elem.latestError = "";
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
ret.summary.bytes += cjfq.size;
ret.summary.files++;
}
......@@ -211,8 +230,17 @@ auto ContainerTraits<ArchiveQueueToReport>::getPoppingElementsCandidates(Contain
PoppedElementsBatch ret;
auto candidateJobsFromQueue=cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
common::dataStructures::ArchiveFile(), "", "", "", });
ret.elements.emplace_back(PoppedElement());
ContainerTraits<ArchiveQueue>::PoppedElement & elem = ret.elements.back();
elem.archiveRequest = cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore);
elem.copyNb = cjfq.copyNb;
elem.bytes = cjfq.size;
elem.archiveFile = common::dataStructures::ArchiveFile();
elem.srcURL = "";
elem.archiveReportURL = "";
elem.errorReportURL = "";
elem.latestError = "";
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
ret.summary.files++;
}
return ret;
......
......@@ -19,6 +19,7 @@
#pragma once
#include "Algorithms.hpp"
#include "ArchiveQueue.hpp"
#include "common/optional.hpp"
namespace cta { namespace objectstore {
......@@ -32,10 +33,11 @@ public:
static const std::string c_containerTypeName; //= "ArchiveQueue";
static const std::string c_identifyerType; // = "tapepool";
struct InsertedElement {
std::shared_ptr<ArchiveRequest> archiveRequest;
ArchiveRequest* archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
cta::optional<cta::common::dataStructures::MountPolicy> mountPolicy;
cta::optional<serializers::ArchiveJobStatus> newStatus;
typedef std::list<InsertedElement> list;
};
......@@ -91,9 +93,11 @@ public:
uint16_t copyNb;
uint64_t bytes;
common::dataStructures::ArchiveFile archiveFile;
std::string srcURL;
std::string archiveReportURL;
std::string errorReportURL;
std::string srcURL;
std::string latestError;
SchedulerDatabase::ArchiveJob::ReportType reportType;
};
class PoppedElementsSummary;
class PopCriteria {
......@@ -145,7 +149,7 @@ public:
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress, cta::nullopt));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
......@@ -158,6 +162,7 @@ public:
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
//if (u->get()->)
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
......
......@@ -46,13 +46,15 @@ cta::objectstore::ArchiveRequest::ArchiveRequest(GenericObject& go):
void cta::objectstore::ArchiveRequest::initialize() {
// Setup underlying object
ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>::initialize();
// Setup the fields in the payload
m_payload.set_reportdecided(false);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
const std::string& tapepool, const std::string& initialOwner,
uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) {
uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries, uint16_t maxReportRetries) {
checkPayloadWritable();
auto *j = m_payload.add_jobs();
j->set_copynb(copyNumber);
......@@ -65,6 +67,8 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
j->set_lastmountwithfailure(0);
j->set_maxretrieswithinmount(maxRetiesWithinMount);
j->set_maxtotalretries(maxTotalRetries);
j->set_totalreportretries(0);
j->set_maxreportretries(maxReportRetries);
}
QueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
......@@ -76,10 +80,10 @@ QueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
return QueueType::JobsToTransfer;
case serializers::ArchiveJobStatus::AJS_Complete:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion.");
case serializers::ArchiveJobStatus::AJS_ToReport:
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
// We should report a success...
return QueueType::JobsToReport;
case serializers::ArchiveJobStatus::AJS_FailedToReport:
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
// We should report a failure. The report queue can be shared.
return QueueType::JobsToReport;
case serializers::ArchiveJobStatus::AJS_Failed:
......@@ -93,11 +97,10 @@ QueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
}
bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId, const std::string & failureReason, log::LogContext & lc) {
auto cta::objectstore::ArchiveRequest::addTransferFailure(uint16_t copyNumber,
uint64_t mountId, const std::string & failureReason, log::LogContext & lc) -> EnqueueingNextStep {
checkPayloadWritable();
// Find the job and update the number of failures
// (and return the job status: failed (true) or to be retried (false))
// Find the job and update the number of failures
for (size_t i=0; i<(size_t)m_payload.jobs_size(); i++) {
auto &j=*m_payload.mutable_jobs(i);
if (j.copynb() == copyNumber) {
......@@ -111,13 +114,44 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
* j.mutable_failurelogs()->Add() = failureReason;
}
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::AJS_Failed);
if (!finishIfNecessary(lc)) commit();
return true;
// We have to determine if this was the last copy to fail/succeed.
return determineNextStep(copyNumber, JobEvent::TransferFailed, lc);
} else {
j.set_status(serializers::AJS_ToTransfer);
return false;
EnqueueingNextStep ret;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToTransfer;
// Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter
// case, the job will remain owned by this session and get garbage collected.
if (j.retrieswithinmount() >= j.maxretrieswithinmount())
ret.nextStep = EnqueueingNextStep::NextStep::Nothing;
else
ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForTransfer;
return ret;
}
}
throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job");
}
auto ArchiveRequest::addReportFailure(uint16_t copyNumber, uint64_t sessionId, const std::string& failureReason,
log::LogContext& lc) -> EnqueueingNextStep {
checkPayloadWritable();
// Find the job and update the number of failures
for (size_t i=0; i<(size_t)m_payload.jobs_size(); i++) {
auto &j=*m_payload.mutable_jobs(i);
if (j.copynb() == copyNumber) {
j.set_totalreportretries(j.totalreportretries() + 1);
* j.mutable_reportfailurelogs()->Add() = failureReason;
}
EnqueueingNextStep ret;
if (j.totalreportretries() >= j.maxreportretries()) {
// Status is now failed
ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed;
ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
} else {
// Status is unchanged
ret.nextStatus = j.status();
ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport;
}
return ret;
}
throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job");
}
......@@ -308,8 +342,8 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
auto * jl = m_payload.mutable_jobs();
bool anythingGarbageCollected=false;
using serializers::ArchiveJobStatus;
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransfer,
ArchiveJobStatus::AJS_ToReport, ArchiveJobStatus::AJS_Failed});
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransfer, ArchiveJobStatus::AJS_ToReportForTransfer,
ArchiveJobStatus::AJS_ToReportForFailure, ArchiveJobStatus::AJS_Failed});
for (auto j=jl->begin(); j!=jl->end(); j++) {
auto owner=j->owner();
auto status=j->status();
......@@ -418,12 +452,12 @@ void ArchiveRequest::setJobOwner(
}
ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16_t copyNumber,
const std::string& owner, const std::string& previousOwner) {
const std::string& owner, const std::string& previousOwner, const cta::optional<serializers::ArchiveJobStatus>& newStatus) {
std::unique_ptr<AsyncJobOwnerUpdater> ret(new AsyncJobOwnerUpdater);
// Passing a reference to the unique pointer led to strange behaviors.
auto & retRef = *ret;
ret->m_updaterCallback=
[this, copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string {
[this, copyNumber, owner, previousOwner, &retRef, newStatus](const std::string &in)->std::string {
// We have a locked and fetched object, so we just need to work on its representation.
retRef.m_timingReport.lockFetchTime = retRef.m_timer.secs(utils::Timer::resetCounter);
serializers::ObjectHeader oh;
......@@ -456,6 +490,8 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
}
j->set_owner(owner);
}
// If a status change was requested, do it.
if (newStatus) j->set_status(*newStatus);
// We also need to gather all the job content for the user to get in-memory
// representation.
// TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
......@@ -476,6 +512,10 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
retRef.m_archiveReportURL = payload.archivereporturl();
retRef.m_archiveErrorReportURL = payload.archiveerrorreporturl();
retRef.m_srcURL = payload.srcurl();
if (j->failurelogs_size()) {
retRef.m_latestError = j->failurelogs(j->failurelogs_size()-1);
}
retRef.m_jobStatus = j->status();
oh.set_payload(payload.SerializePartialAsString());
retRef.m_timingReport.processTime = retRef.m_timer.secs(utils::Timer::resetCounter);
return oh.SerializeAsString();
......@@ -514,8 +554,8 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
return m_srcURL;
}
ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSuccessful(const uint16_t copyNumber ) {
std::unique_ptr<AsyncJobSuccessfulUpdater> ret(new AsyncJobSuccessfulUpdater);
ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint16_t copyNumber ) {
std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater);
// Passing a reference to the unique pointer led to strange behaviors.
auto & retRef = *ret;
ret->m_updaterCallback=
......@@ -531,23 +571,25 @@ ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSucces
serializers::ArchiveRequest payload;
payload.ParseFromString(oh.payload());
auto * jl = payload.mutable_jobs();
bool otherJobsToTransfer = false;
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() != copyNumber && j->status() == serializers::ArchiveJobStatus::AJS_ToTransfer)
otherJobsToTransfer = true;
}
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
for (auto j2=jl->begin(); j2!=jl->end(); j2++) {
if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete &&
j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) {
retRef.m_isLastJob = false;
// The complete but not last job has now finished its
// lifecycle, and will get dereferenced.
j->set_owner("");
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
if (otherJobsToTransfer || m_payload.reportdecided()) {
// There are still other jobs to report or another failed and will
// report. No next step for this job.
j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
j->set_owner("");
retRef.m_doReportTransferSuccess = false;
} else {
// We will report success with this job as it is the last to transfer and no report (for failure)
// happened.
j->set_status(serializers::ArchiveJobStatus::AJS_ToReportForTransfer);
retRef.m_doReportTransferSuccess = true;
}
retRef.m_isLastJob = true;
// If this is the last job, we indeed need to set the status to ToReport.
j->set_status(serializers::ArchiveJobStatus::AJS_ToReport);
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
......@@ -560,10 +602,20 @@ ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSucces
return ret.release();
}
void ArchiveRequest::AsyncJobSuccessfulUpdater::wait() {
void ArchiveRequest::AsyncTransferSuccessfulUpdater::wait() {
m_backendUpdater->wait();
}
ArchiveRequest::AsyncRequestDeleter* ArchiveRequest::asyncDeleteRequest() {
std::unique_ptr<AsyncRequestDeleter> ret(new AsyncRequestDeleter);
ret->m_backendDeleter.reset(m_objectStore.asyncDelete(getAddressIfSet()));
return ret.release();
}
void ArchiveRequest::AsyncRequestDeleter::wait() {
m_backendDeleter->wait();
}
std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
checkPayloadReadable();
auto jl = m_payload.jobs();
......@@ -578,7 +630,8 @@ QueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& stat
switch(status) {
case ArchiveJobStatus::AJS_ToTransfer:
return QueueType::JobsToTransfer;
case ArchiveJobStatus::AJS_ToReport:
case ArchiveJobStatus::AJS_ToReportForTransfer:
case ArchiveJobStatus::AJS_ToReportForFailure:
return QueueType::JobsToReport;
case ArchiveJobStatus::AJS_Failed:
return QueueType::FailedJobs;
......@@ -591,8 +644,10 @@ std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus&
switch(status) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
return "ToTransfer";
case serializers::ArchiveJobStatus::AJS_ToReport:
return "ToReport";
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
return "ToReportForTransfer";
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
return "ToReportForFailure";
case serializers::ArchiveJobStatus::AJS_Complete:
return "Complete";
case serializers::ArchiveJobStatus::AJS_Failed:
......@@ -604,31 +659,86 @@ std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus&
}
}
std::string ArchiveRequest::eventToString(JobEvent jobEvent) {
switch(jobEvent) {
case JobEvent::ReportFailed:
return "ReportFailed";
case JobEvent::TransferFailed:
return "EventFailed";
default:
return std::string("Unknown (")+std::to_string((uint64_t) jobEvent)+")";
}
}
bool ArchiveRequest::finishIfNecessary(log::LogContext & lc) {
auto ArchiveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent jobEvent,
log::LogContext& lc) -> EnqueueingNextStep {
checkPayloadWritable();
// This function is typically called after changing the status of one job
// in memory. If the request is complete, we will just remove it.
// If all the jobs are either complete or failed, we can remove the request.
// We have to determine which next step should be taken.
// - When transfer succeeds or fails:
// - If the job got transferred and is not the last (other(s) remain to transfer), it becomes complete.
// - If the job failed and is not the last, we will queue it as "failed" in the failed jobs queue.
// - If the job is the last and all jobs succeeded, this job becomes ToReportForTransfer.
// - If the job is the last and any (including this one) failed, this job becomes ToReportForFailure.
// - When a report completes or fails:
// - If the report was for a failure, the job is
auto & jl=m_payload.jobs();
using serializers::ArchiveJobStatus;
std::set<serializers::ArchiveJobStatus> finishedStatuses(
{ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed});
for (auto & j: jl)
if (!finishedStatuses.count(j.status()))
return false;
log::ScopedParamContainer params(lc);
size_t failureNumber = 0;
for (auto failure: getFailures()) {
params.add(std::string("failure")+std::to_string(failureNumber++), failure);
// Validate the current status.
// Get status
cta::optional<ArchiveJobStatus> currentStatus;
for (auto &j:jl) { if (j.copynb() == copyNumberUpdated) currentStatus = j.status(); }
if (!currentStatus) {
std::stringstream err;
err << "In ArchiveRequest::updateJobStatus(): copynb not found : " << copyNumberUpdated
<< "exiing ones: ";
for (auto &j: jl) err << j.copynb() << " ";
throw cta::exception::Exception(err.str());
}
remove();
params.add("archiveRequestObject", getAddressIfSet());
for (auto & j: jl) {
params.add(std::string("statusForCopyNb")+std::to_string(j.copynb()), statusToString(j.status()));
// Check status compatibility with event.
switch (jobEvent) {
case JobEvent::TransferFailed:
if (*currentStatus != ArchiveJobStatus::AJS_ToTransfer) {
// Wrong status, but the context leaves no ambiguity. Just warn.
log::ScopedParamContainer params(lc);
params.add("event", eventToString(jobEvent))
.add("status", statusToString(*currentStatus))
.add("fileId", m_payload.archivefileid());
lc.log(log::WARNING, "In ArchiveRequest::updateJobStatus(): unexpected status. Assuming ToTransfer.");
}
break;
case JobEvent::ReportFailed:
if (*currentStatus != ArchiveJobStatus::AJS_ToReportForFailure && *currentStatus != ArchiveJobStatus::AJS_ToReportForTransfer) {
// Wrong status, but end status will be the same anyway.
log::ScopedParamContainer params(lc);
params.add("event", eventToString(jobEvent))
.add("status", statusToString(*currentStatus))
.add("fileId", m_payload.archivefileid());
lc.log(log::WARNING, "In ArchiveRequest::updateJobStatus(): unexpected status. Failing the job.");
}
}
lc.log(log::INFO, "In ArchiveRequest::finishIfNecessary(): Removed completed request.");
return true;
// We are in the normal cases now.
EnqueueingNextStep ret;
switch (jobEvent) {
case JobEvent::TransferFailed:
{
if (!m_payload.reportdecided()) {
m_payload.set_reportdecided(true);
ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportForFailure;
} else {
ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed;
}
}
break;
case JobEvent::ReportFailed:
{
ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed;
}
}
return ret;
}
std::string ArchiveRequest::dump() {
......@@ -652,6 +762,25 @@ std::list<std::string> ArchiveRequest::getFailures() {
return ret;
}
void ArchiveRequest::setJobStatus(uint16_t copyNumber, const serializers::ArchiveJobStatus& status) {
checkPayloadWritable();