Skip to content
Snippets Groups Projects
Commit 05d66ea9 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Adds methods to resolve missing symbols

parent c7b5b789
No related branches found
No related tags found
No related merge requests found
......@@ -736,6 +736,7 @@ auto ArchiveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent jobE
ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed;
}
}
break;
case JobEvent::ReportFailed:
{
ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
......
......@@ -284,6 +284,45 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uin
tf->set_status(serializers::RetrieveJobStatus::RJS_ToTransfer);
}
//------------------------------------------------------------------------------
// RetrieveRequest::addTransferFailure()
//------------------------------------------------------------------------------
// Records a transfer failure for the job identified by copyNumber and tells the caller what to do next.
//
// The job's retry counters are updated (the per-mount counter restarts at 1 when the failure happens on a
// different mount than the previous one) and failureReason is appended to the job's failure log.
//
// @param copyNumber     copy number of the job that failed
// @param mountId        mount on which the failure happened
// @param failureReason  text appended to the job's failure log
// @param lc             log context, forwarded to determineNextStep()
// @returns the next step/status for the caller to apply
// @throws NoSuchJob if the request holds no job with this copy number
auto RetrieveRequest::addTransferFailure(uint16_t copyNumber, uint64_t mountId, const std::string &failureReason,
log::LogContext &lc) -> EnqueueingNextStep
{
  checkPayloadWritable();
  for (auto &j : *m_payload.mutable_jobs()) {
    // Only the job for the failed copy is of interest. Jobs for other copies must neither be updated
    // nor drive the decision below.
    if (j.copynb() != copyNumber) continue;

    // Update the failure counters
    if (j.lastmountwithfailure() == mountId) {
      j.set_retrieswithinmount(j.retrieswithinmount() + 1);
    } else {
      j.set_retrieswithinmount(1);
      j.set_lastmountwithfailure(mountId);
    }
    j.set_totalretries(j.totalretries() + 1);
    *j.mutable_failurelogs()->Add() = failureReason;

    if (j.totalretries() >= j.maxtotalretries()) {
      // Retries are exhausted: let determineNextStep() decide between reporting and final failure.
      return determineNextStep(copyNumber, JobEvent::TransferFailed, lc);
    }

    EnqueueingNextStep ret;
    ret.nextStatus = serializers::RetrieveJobStatus::RJS_ToTransfer;
    // Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter
    // case, the job will remain owned by this session and get garbage collected.
    if (j.retrieswithinmount() >= j.maxretrieswithinmount()) {
      ret.nextStep = EnqueueingNextStep::NextStep::Nothing;
    } else {
      ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForTransfer;
    }
    return ret;
  }
  throw NoSuchJob("In RetrieveRequest::addTransferFailure(): could not find job");
}
//------------------------------------------------------------------------------
// RetrieveRequest::getLastActiveVid()
//------------------------------------------------------------------------------
......@@ -493,6 +532,98 @@ std::string RetrieveRequest::statusToString(const serializers::RetrieveJobStatus
}
}
//------------------------------------------------------------------------------
// RetrieveRequest::eventToString()
//------------------------------------------------------------------------------
// Returns the name of a JobEvent for use in log messages.
//
// Each known enumerator maps to its own name. Any other value is rendered as "Unknown (<numeric value>)"
// so that an unexpected event is still identifiable in the logs.
std::string RetrieveRequest::eventToString(JobEvent jobEvent) {
  switch(jobEvent) {
    case JobEvent::ReportFailed:   return "ReportFailed";
    case JobEvent::TransferFailed: return "TransferFailed";
  }
  return std::string("Unknown (") + std::to_string(static_cast<unsigned int>(jobEvent)) + ")";
}
//------------------------------------------------------------------------------
// RetrieveRequest::determineNextStep()
//
// We have to determine which next step should be taken:
//
// * When transfer succeeds or fails:
// - If the job got transferred and is not the last (other(s) remain to transfer), it becomes complete.
// - If the job failed and is not the last, we will queue it as "failed" in the failed jobs queue.
// - If the job is the last and all jobs succeeded, this jobs becomes ToReportForTransfer
// - If the job is the last and any (including this one) failed, this job becomes ToReportForFailure.
//
// * When report completes or fails:
// - If the report failed (JobEvent::ReportFailed), the job gets status RJS_Failed and is stored in the
//   failed jobs container.
//------------------------------------------------------------------------------
// Decides what the caller should do with a job after a failure event.
//
// The job's current status is only checked for plausibility: a mismatch with the event is logged as a
// warning and processing continues. The job status itself is not modified here; the chosen status is
// returned for the caller to apply. The request's "failure reported" flag is set the first time a
// TransferFailed event is processed.
//
// @param copyNumberUpdated  copy number of the job the event applies to
// @param jobEvent           the event that happened to the job
// @param lc                 log context used for the warnings
// @returns the next step and the next status for the job
// @throws cta::exception::Exception if the request holds no job with this copy number
auto RetrieveRequest::determineNextStep(uint16_t copyNumberUpdated, JobEvent jobEvent,
log::LogContext& lc) -> EnqueueingNextStep
{
  checkPayloadWritable();
  auto &jl = m_payload.jobs();
  using serializers::RetrieveJobStatus;

  // Look up the current status of the job
  cta::optional<RetrieveJobStatus> currentStatus;
  for (auto &j : jl) {
    if (j.copynb() == copyNumberUpdated) currentStatus = j.status();
  }
  if (!currentStatus) {
    std::stringstream err;
    err << "In RetrieveRequest::determineNextStep(): copynb not found : " << copyNumberUpdated
        << ", existing ones: ";
    for (auto &j : jl) err << j.copynb() << " ";
    throw cta::exception::Exception(err.str());
  }

  // Logs a warning carrying the event, the status found and the archive file ID.
  auto warnUnexpectedStatus = [&](const std::string &message) {
    log::ScopedParamContainer params(lc);
    params.add("event", eventToString(jobEvent))
          .add("status", statusToString(*currentStatus))
          .add("fileId", m_payload.archivefile().archivefileid());
    lc.log(log::WARNING, message);
  };

  // Check status compatibility with event
  switch (jobEvent)
  {
    case JobEvent::TransferFailed:
      if (*currentStatus != RetrieveJobStatus::RJS_ToTransfer) {
        // Wrong status, but the context leaves no ambiguity. Just warn.
        warnUnexpectedStatus("In RetrieveRequest::determineNextStep(): unexpected status. Assuming ToTransfer.");
      }
      break;
    case JobEvent::ReportFailed:
      if (*currentStatus != RetrieveJobStatus::RJS_FailedToReport) {
        // Wrong status, but end status will be the same anyway
        warnUnexpectedStatus("In RetrieveRequest::determineNextStep(): unexpected status. Failing the job.");
      }
      break;
  }

  // We are in the normal cases now
  EnqueueingNextStep ret;
  switch (jobEvent)
  {
    case JobEvent::TransferFailed:
      if (!m_payload.failurereported()) {
        // First definitive failure of the request: it still has to be reported.
        m_payload.set_failurereported(true);
        ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport;
        ret.nextStatus = serializers::RetrieveJobStatus::RJS_FailedToReport;
      } else {
        // A failure was already flagged for reporting: this job goes straight to the failed jobs container.
        ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
        ret.nextStatus = serializers::RetrieveJobStatus::RJS_Failed;
      }
      break;
    case JobEvent::ReportFailed:
      ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
      ret.nextStatus = serializers::RetrieveJobStatus::RJS_Failed;
      break;
  }
  return ret;
}
//------------------------------------------------------------------------------
// RetrieveRequest::getJobStatus()
//------------------------------------------------------------------------------
......@@ -703,4 +834,18 @@ void RetrieveRequest::setFailureReported() {
m_payload.set_failurereported(true);
}
//------------------------------------------------------------------------------
// RetrieveRequest::setJobStatus()
//------------------------------------------------------------------------------
// Sets the status of the job whose copy number equals copyNumber.
//
// Only the first matching job is updated. An exception::Exception is thrown when no job matches.
void RetrieveRequest::setJobStatus(uint64_t copyNumber, const serializers::RetrieveJobStatus& status) {
  checkPayloadWritable();
  for (auto &job : *m_payload.mutable_jobs()) {
    if (job.copynb() != copyNumber) continue;
    job.set_status(status);
    return;
  }
  throw exception::Exception("In RetrieveRequest::setJobStatus(): job not found.");
}
}} // namespace cta::objectstore
......@@ -93,6 +93,22 @@ public:
//! The copy number to enqueue. It could be different from the updated one in mixed success/failure scenario.
serializers::RetrieveJobStatus nextStatus;
};
private:
/*!
 * Determine the next step and the next status of a job after a failure event.
 *
 * Determines whether the request should be queued or deleted after the job status change. This method
 * only handles failures, which have a more varied array of possibilities.
 *
 * The job status itself is not written by this method: the chosen status is returned in
 * EnqueueingNextStep::nextStatus and applying it is left to the caller. The request's "failure reported"
 * flag is set when the first TransferFailed event is processed. A job status that does not match the
 * event is logged as a warning and does not abort processing.
 *
 * @param[in] copyNumberToUpdate the copy number to update
 * @param[in] jobEvent the event that happened to the job
 * @param[in] lc the log context
 *
 * @returns The next step to be taken by the caller (OStoreDB), which is in charge of the queueing
 * and status setting
 *
 * @throws cta::exception::Exception if no job with the given copy number exists in the request
 */
EnqueueingNextStep determineNextStep(uint16_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext &lc);
public:
/*!
 * Record a transfer failure for a job and return the next step to take with the job.
 *
 * Updates the job's retry counters (within the mount and in total) and appends failureReason to the
 * job's failure log. When the total retry limit is reached the decision is delegated to
 * determineNextStep(); otherwise the job stays in status RJS_ToTransfer and is either requeued or left
 * alone, depending on the per-mount retry limit.
 *
 * @param[in] copyNumber the copy number of the job that failed
 * @param[in] sessionId compared against the job's last mount with failure to decide whether the
 *            per-mount retry counter continues or restarts (this parameter is named mountId in the definition)
 * @param[in] failureReason text appended to the job's failure log
 * @param[in] lc the log context
 *
 * @returns The next step to be taken by the caller
 *
 * @throws NoSuchJob when the job lookup fails
 */
EnqueueingNextStep addTransferFailure(uint16_t copyNumber, uint64_t sessionId, const std::string &failureReason, log::LogContext &lc);
//! Returns queue type depending on the compound statuses of all retrieve requests
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment