Commit 4c847a70 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Adds failedTransfer() and failedReport() to Retrieve queue

parent ce62897f
......@@ -2326,6 +2326,7 @@ OStoreDB::ArchiveJob::~ArchiveJob() {
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::RetrieveJob()
//------------------------------------------------------------------------------
// Constructs a proxy for the RetrieveRequest object located at jobAddress in the
// object store. m_jobOwned starts out false: the job is not owned at construction
// time (ownership is tracked separately and checked by the fail*() methods).
OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, OStoreDB & oStoreDB, OStoreDB::RetrieveMount& rm):
m_jobOwned(false), m_oStoreDB(oStoreDB), m_retrieveRequest(jobAddress, m_oStoreDB.m_objectStore), m_retrieveMount(rm) { }
#if 0
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::fail()
//------------------------------------------------------------------------------
......@@ -2407,6 +2408,204 @@ bool OStoreDB::RetrieveJob::fail(const std::string& failureReason, log::LogConte
m_oStoreDB.m_agentReference->removeFromOwnership(m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
return false;
}
#endif
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::failTransfer()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::LogContext &lc) {
// Not implemented for retrieve jobs yet: fail fast and loudly so callers cannot
// silently depend on retry/requeueing behaviour that does not exist.
throw cta::exception::Exception("OStoreDB::RetrieveJob::failTransfer(): not implemented.");
// The disabled block below is a verbatim copy of ArchiveJob::failTransfer(),
// kept as a template for the future retrieve-side implementation. It still
// references archive-side state (m_archiveRequest, tapeFile, archiveFile,
// ArchiveQueue containers) that does not exist on RetrieveJob, so it must be
// adapted before being enabled.
// NOTE(review): "JobNowOwned" below looks like a typo for "JobNotOwned", and
// the exception text still says "ArchiveJob" -- confirm and fix when porting.
#if 0
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::ArchiveJob::failTransfer: cannot fail a job not owned");
// Lock the archive request. Fail the job.
objectstore::ScopedExclusiveLock arl(m_archiveRequest);
m_archiveRequest.fetch();
// Add a job failure. We will know what to do next..
typedef objectstore::ArchiveRequest::EnqueueingNextStep EnqueueingNextStep;
typedef EnqueueingNextStep::NextStep NextStep;
EnqueueingNextStep enQueueingNextStep =
m_archiveRequest.addTransferFailure(tapeFile.copyNb, m_mountId, failureReason, lc);
// First set the job status
m_archiveRequest.setJobStatus(tapeFile.copyNb, enQueueingNextStep.nextStatus);
// Now apply the decision.
// TODO: this will probably need to be factored out.
switch (enQueueingNextStep.nextStep) {
// Nothing to enqueue: the request stays owned by the agent and will be
// garbage collected (and thereby retried) at the end of the mount.
case NextStep::Nothing: {
m_archiveRequest.commit();
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO,
"In ArchiveJob::failTransfer(): left the request owned, to be garbage collected for retry at the end of the mount.");
return;
}
// The request is no longer needed at all: remove it from the object store.
case NextStep::Delete: {
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
m_archiveRequest.remove();
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO, "In ArchiveJob::failTransfer(): removed request");
return;
}
// Hand the job over to the report queue so the failure gets reported back.
case NextStep::EnqueueForReport: {
// Algorithms suppose the objects are not locked.
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
m_archiveRequest.commit();
arl.release();
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToReport> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt });
caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::JobsToReport, insertedElements, lc);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO, "In ArchiveJob::failTransfer(): enqueued job for reporting");
return;
}
// Requeue for another transfer attempt (possibly within the same mount).
case NextStep::EnqueueForTransfer: {
// Algorithms suppose the objects are not locked.
auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb);
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
m_archiveRequest.commit();
arl.release();
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt });
caAqtr.referenceAndSwitchOwnership(tapepool, objectstore::QueueType::JobsToTransfer,
insertedElements, lc);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO,
"In ArchiveJob::failTransfer(): requeued job for (potentially in-mount) retry.");
return;
}
// Retries exhausted: park the job in the failed-jobs container for operators.
case NextStep::StoreInFailedJobsContainer: {
// Algorithms suppose the objects are not locked.
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
m_archiveRequest.commit();
arl.release();
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueFailed> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt });
caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::FailedJobs, insertedElements, lc);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO,
"In ArchiveJob::failTransfer(): stored job in failed container for operator handling.");
return;
}
}
#endif
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::failReport()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::LogContext &lc) {
// Not implemented for retrieve jobs yet: fail fast and loudly so callers cannot
// silently depend on report-retry behaviour that does not exist.
throw cta::exception::Exception("OStoreDB::RetrieveJob::failReport(): not implemented.");
// The disabled block below is a verbatim copy of ArchiveJob::failReport(),
// kept as a template for the future retrieve-side implementation. It still
// references archive-side state (m_archiveRequest, tapeFile, archiveFile,
// ArchiveQueue containers) that does not exist on RetrieveJob, so it must be
// adapted before being enabled.
// NOTE(review): "JobNowOwned" below looks like a typo for "JobNotOwned", the
// exception text still says "ArchiveJob", and the ERR log message says
// "contained" where "container" was presumably meant -- fix when porting.
#if 0
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::ArchiveJob::failReport: cannot fail a job not owned");
// Lock the archive request. Fail the job.
objectstore::ScopedExclusiveLock arl(m_archiveRequest);
m_archiveRequest.fetch();
// Add a job failure. We will know what to do next..
typedef objectstore::ArchiveRequest::EnqueueingNextStep EnqueueingNextStep;
typedef EnqueueingNextStep::NextStep NextStep;
EnqueueingNextStep enQueueingNextStep =
m_archiveRequest.addReportFailure(tapeFile.copyNb, m_mountId, failureReason, lc);
// First set the job status
m_archiveRequest.setJobStatus(tapeFile.copyNb, enQueueingNextStep.nextStatus);
// Now apply the decision.
// TODO: this will probably need to be factored out.
switch (enQueueingNextStep.nextStep) {
// We have a reduced set of supported next steps as some are not compatible with this event (see default).
// Report failed but retries remain: requeue on the report queue.
case NextStep::EnqueueForReport: {
// Algorithms suppose the objects are not locked.
m_archiveRequest.commit();
arl.release();
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToReport> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt });
caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::JobsToReport, insertedElements, lc);
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("reportRetries", retryStatus.reportRetries)
.add("maxReportRetries", retryStatus.maxReportRetries);
lc.log(log::INFO, "In ArchiveJob::failReport(): requeued job for report retry.");
return;
}
// Any other next step (expected: StoreInFailedJobsContainer) ends up in the
// failed-jobs container; an unexpected step is logged as an error.
default: {
// Algorithms suppose the objects are not locked.
m_archiveRequest.commit();
arl.release();
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueFailed> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt });
caAqtr.referenceAndSwitchOwnership(tapeFile.vid, objectstore::QueueType::FailedJobs, insertedElements, lc);
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("failureReason", failureReason)
.add("requestObject", m_archiveRequest.getAddressIfSet())
.add("reportRetries", retryStatus.reportRetries)
.add("maxReportRetries", retryStatus.maxReportRetries);
if (enQueueingNextStep.nextStep == NextStep::StoreInFailedJobsContainer)
lc.log(log::INFO,
"In ArchiveJob::failReport(): stored job in failed container for operator handling.");
else
lc.log(log::ERR,
"In ArchiveJob::failReport(): stored job in failed contained after unexpected next step.");
return;
}
}
#endif
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::~RetrieveJob()
......
......@@ -217,7 +217,11 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
#if 0
bool fail(const std::string& failureReason, log::LogContext&) override;
#endif
void failTransfer(const std::string& failureReason, log::LogContext& lc) override;
void failReport(const std::string& failureReason, log::LogContext& lc) override;
virtual ~RetrieveJob() override;
private:
RetrieveJob(const std::string &, OStoreDB &, RetrieveMount &);
......
......@@ -58,6 +58,24 @@ void cta::RetrieveJob::checkComplete() {
m_dbJob->checkSucceed();
}
//------------------------------------------------------------------------------
// reportFailed
//------------------------------------------------------------------------------
// Signals that reporting the outcome of this retrieve job failed.
// Fully delegated to the scheduler DB, which decides the next step (report
// retry or failure handling) and performs any required queueing.
void cta::RetrieveJob::reportFailed(const std::string &failureReason, log::LogContext &lc) {
// This is fully delegated to the DB, which will handle the queueing for next steps (if any)
// NOTE(review): the OStoreDB backend currently throws "not implemented" here.
m_dbJob->failReport(failureReason, lc);
}
//------------------------------------------------------------------------------
// transferFailed
//------------------------------------------------------------------------------
// Signals that the data transfer for this retrieve job failed.
// Fully delegated to the scheduler DB, which applies the retry policy and
// performs any required queueing for the next step.
void cta::RetrieveJob::transferFailed(const std::string &failureReason, log::LogContext &lc) {
// This is fully delegated to the DB, which will handle the queueing for next steps (if any)
// NOTE(review): the OStoreDB backend currently throws "not implemented" here.
m_dbJob->failTransfer(failureReason, lc);
}
#if 0
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
......@@ -97,6 +115,7 @@ void cta::RetrieveJob::failed(const std::string & failureReason, log::LogContext
}
}
}
#endif
//------------------------------------------------------------------------------
// selectedTapeFile
......
......@@ -86,12 +86,26 @@ public:
*
*/
virtual void checkComplete();
#if 0
/**
* Indicates that the job failed. Reason for failure is indicated. Retry policy will
* be applied by the scheduler.
*/
virtual void failed(const std::string &failureReason, cta::log::LogContext &);
#endif
/**
* Triggers a scheduler update following the failure of the job. Retry policy will
* be applied by the scheduler.
*/
virtual void transferFailed(const std::string &failureReason, log::LogContext &lc);
/**
* Triggers a scheduler update following the failure of the report. Retry policy will
* be applied by the scheduler.
*/
virtual void reportFailed(const std::string &failureReason, log::LogContext &lc);
/**
* Helper function returning a reference to the currently selected tape file.
......
......@@ -347,7 +347,11 @@ public:
uint64_t selectedCopyNb;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
#if 0
virtual bool fail(const std::string & failureReason, log::LogContext &) = 0;
#endif
virtual void failTransfer(const std::string &failureReason, log::LogContext &lc) = 0;
virtual void failReport(const std::string &failureReason, log::LogContext &lc) = 0;
virtual ~RetrieveJob() {}
};
......
......@@ -436,8 +436,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
catalogue.tapeLabelled(s_vid, "tape_drive", lbpIsOn);
{
// Emulate a tape server by asking for a mount and then a file (and succeed
// the transfer)
// Emulate a tape server by asking for a mount and then a file (and succeed the transfer)
std::unique_ptr<cta::TapeMount> mount;
// This first initialization is normally done by the dataSession function.
cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName };
......@@ -537,8 +536,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
}
{
// Emulate a tape server by asking for a mount and then a file (and succeed
// the transfer)
// Emulate a tape server by asking for a mount and then a file (and succeed the transfer)
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
......@@ -551,8 +549,11 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
ASSERT_EQ(1, jobBatch.size());
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
retrieveJob->transferFailed("Retrieve transfer failed", lc);
#if 0
retrieveJob->asyncComplete();
retrieveJob->checkComplete();
#endif
jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, jobBatch.size());
}
......
......@@ -35,7 +35,7 @@ namespace cta {
}
virtual void asyncComplete() override { completes++; }
virtual void checkComplete() override {}
void failed(const std::string& failureReason, cta::log::LogContext&) override { failures++; };
void transferFailed(const std::string &failureReason, cta::log::LogContext&) override { failures++; };
~MockRetrieveJob() throw() {}
};
......
......@@ -211,7 +211,7 @@ void RecallReportPacker::ReportError::execute(RecallReportPacker& reportPacker){
reportPacker.m_lc.log(cta::log::ERR,"In RecallReportPacker::ReportError::execute(): failing retrieve job after exception.");
}
try {
m_failedRetrieveJob->failed(m_failureLog, reportPacker.m_lc);
m_failedRetrieveJob->transferFailed(m_failureLog, reportPacker.m_lc);
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer params(reportPacker.m_lc);
params.add("ExceptionMSG", ex.getMessageValue())
......
......@@ -58,7 +58,7 @@ protected:
void checkComplete() override {}
void failed(const std::string& failureReason, cta::log::LogContext&) override {
void transferFailed(const std::string &failureReason, cta::log::LogContext&) override {
failuresRef++;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment