Commit 1e32adb2 authored by Eric Cano

Added support for retrieve failure reporting to EOS.

parent 70688412
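In short, this commit makes a definitively failed retrieve job report its error back to EOS: the error message is Base64-encoded, appended to the errorReportURL carried by the scheduler request, and posted through the retrieve mount's disk reporter. The condensed sketch below is for illustration only (it is not part of the diff) and uses only names introduced in the hunks that follow.

#include <cryptopp/base64.h>
#include <future>
#include <memory>
#include <string>

// Illustrative summary of the reporting path wired up by this commit.
void reportRetrieveFailure(cta::RetrieveMount &mount, cta::SchedulerDatabase::RetrieveJob &dbJob,
    const std::string &errorReport, cta::log::LogContext &lc) {
  if (!dbJob.fail(lc)) return;  // not a final failure: the job was requeued, nothing to report yet
  // Base64-encode the error message so it can travel as a URL suffix.
  std::string base64ErrorReport;
  CryptoPP::StringSource ss(errorReport, true,
    new CryptoPP::Base64Encoder(new CryptoPP::StringSink(base64ErrorReport), false /* insertLineBreaks */));
  std::string fullReportURL = dbJob.retrieveRequest.errorReportURL + base64ErrorReport;
  // Report asynchronously through the mount's disk reporter and wait for completion.
  std::promise<void> reporterState;
  std::unique_ptr<cta::eos::DiskReporter> reporter(mount.createDiskReporter(fullReportURL, reporterState));
  reporter->asyncReportArchiveFullyComplete();
  reporterState.get_future().get();
}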
......@@ -45,6 +45,7 @@ struct RetrieveRequest {
UserIdentity requester;
uint64_t archiveFileID;
std::string dstURL;
std::string errorReportURL;
DiskFileInfo diskFileInfo;
EntryLog creationLog;
......
......@@ -87,10 +87,10 @@ bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) {
bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId, log::LogContext & lc) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
// Find the job and update the number of failures
// (and return the job status: failed (true) or to be retried (false))
for (auto & j: *jl) {
for (size_t i=0; i<(size_t)m_payload.jobs_size(); i++) {
auto &j=*m_payload.mutable_jobs(i);
if (j.copynb() == copyNumber) {
if (j.lastmountwithfailure() == mountId) {
j.set_retrieswithinmount(j.retrieswithinmount() + 1);
......@@ -102,16 +102,31 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
}
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::AJS_Failed);
finishIfNecessary(lc);
if (!finishIfNecessary(lc)) commit();
return true;
} else {
j.set_status(serializers::AJS_PendingMount);
commit();
return false;
}
}
throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job");
}
ArchiveRequest::RetryStatus ArchiveRequest::getRetryStatus(const uint16_t copyNumber) {
checkPayloadReadable();
for (auto &j: m_payload.jobs()) {
if (copyNumber == j.copynb()) {
RetryStatus ret;
ret.retriesWithinMount = j.retrieswithinmount();
ret.maxRetriesWithinMount = j.maxretrieswithinmount();
ret.totalRetries = j.totalretries();
ret.maxTotalRetries = j.maxtotalretries();
return ret;
}
}
throw cta::exception::Exception("In ArchiveRequest::getRetryStatus(): job not found()");
}
void cta::objectstore::ArchiveRequest::setAllJobsLinkingToArchiveQueue() {
checkPayloadWritable();
......
......@@ -49,6 +49,13 @@ public:
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId, log::LogContext &lc); //< returns true if the job is failed
struct RetryStatus {
uint64_t retriesWithinMount = 0;
uint64_t maxRetriesWithinMount = 0;
uint64_t totalRetries = 0;
uint64_t maxTotalRetries = 0;
};
RetryStatus getRetryStatus(uint16_t copyNumber);
serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
std::string statusToString(const serializers::ArchiveJobStatus & status);
bool finishIfNecessary(log::LogContext & lc);/**< Handling of the consequences of a job status change for the entire request.
......
......@@ -215,6 +215,7 @@ void RetrieveRequest::setSchedulerRequest(const cta::common::dataStructures::Ret
sr->mutable_requester()->set_group(retrieveRequest.requester.group);
sr->set_archivefileid(retrieveRequest.archiveFileID);
sr->set_dsturl(retrieveRequest.dstURL);
sr->set_retrieveerrorreporturl(retrieveRequest.errorReportURL);
DiskFileInfoSerDeser dfisd(retrieveRequest.diskFileInfo);
dfisd.serialize(*sr->mutable_diskfileinfo());
objectstore::EntryLogSerDeser el(retrieveRequest.creationLog);
......@@ -233,6 +234,7 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques
objectstore::EntryLogSerDeser el(ret.creationLog);
el.deserialize(m_payload.schedulerrequest().entrylog());
ret.dstURL = m_payload.schedulerrequest().dsturl();
ret.errorReportURL = m_payload.schedulerrequest().retrieveerrorreporturl();
objectstore::DiskFileInfoSerDeser dfisd;
dfisd.deserialize(m_payload.schedulerrequest().diskfileinfo());
ret.diskFileInfo = dfisd;
......@@ -314,11 +316,11 @@ auto RetrieveRequest::getJobs() -> std::list<JobDump> {
//------------------------------------------------------------------------------
bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, log::LogContext & lc) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
// Find the job and update the number of failures
// (and return the full request status: failed (true) or to be retried (false))
// The request will go through a full requeueing if retried (in caller).
for (auto j: *jl) {
for (size_t i=0; i<(size_t)m_payload.jobs_size(); i++) {
auto &j=*m_payload.mutable_jobs(i);
if (j.copynb() == copyNumber) {
if (j.lastmountwithfailure() == mountId) {
j.set_retrieswithinmount(j.retrieswithinmount() + 1);
......@@ -330,15 +332,40 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, log::
}
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::RJS_Failed);
return finishIfNecessary(lc);
bool ret=finishIfNecessary(lc);
if (!ret) commit();
return ret;
} else {
j.set_status(serializers::RJS_Pending);
commit();
return false;
}
}
throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job");
}
//------------------------------------------------------------------------------
// RetrieveRequest::getRetryStatus()
//------------------------------------------------------------------------------
RetrieveRequest::RetryStatus RetrieveRequest::getRetryStatus(const uint16_t copyNumber) {
checkPayloadReadable();
for (auto &j: m_payload.jobs()) {
if (copyNumber == j.copynb()) {
RetryStatus ret;
ret.retriesWithinMount = j.retrieswithinmount();
ret.maxRetriesWithinMount = j.maxretrieswithinmount();
ret.totalRetries = j.totalretries();
ret.maxTotalRetries = j.maxtotalretries();
return ret;
}
}
throw cta::exception::Exception("In RetrieveRequest::getRetryStatus(): job not found()");
}
//------------------------------------------------------------------------------
// RetrieveRequest::statusToString()
//------------------------------------------------------------------------------
std::string RetrieveRequest::statusToString(const serializers::RetrieveJobStatus& status) {
switch(status) {
case serializers::RetrieveJobStatus::RJS_Complete:
......@@ -446,6 +473,7 @@ auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& o
dfi.deserialize(payload.schedulerrequest().diskfileinfo());
retRef.m_retieveRequest.diskFileInfo = dfi;
retRef.m_retieveRequest.dstURL = payload.schedulerrequest().dsturl();
retRef.m_retieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl();
retRef.m_retieveRequest.requester.name = payload.schedulerrequest().requester().name();
retRef.m_retieveRequest.requester.group = payload.schedulerrequest().requester().group();
objectstore::ArchiveFileSerDeser af;
......
......@@ -66,6 +66,13 @@ public:
std::list<JobDump> getJobs();
bool addJobFailure(uint16_t copyNumber, uint64_t mountId, log::LogContext & lc); /**< Returns true if the request is completely failed
(telling whether we should requeue or not). */
struct RetryStatus {
uint64_t retriesWithinMount = 0;
uint64_t maxRetriesWithinMount = 0;
uint64_t totalRetries = 0;
uint64_t maxTotalRetries = 0;
};
RetryStatus getRetryStatus(uint16_t copyNumber);
std::string statusToString(const serializers::RetrieveJobStatus & status);
bool finishIfNecessary(log::LogContext & lc); /**< Handling of the consequences of a job status change for the entire request.
* This function returns true if the request got finished. */
......
......@@ -342,6 +342,7 @@ message SchedulerRetrieveRequest {
required string dstURL = 9102;
required DiskFileInfo diskfileinfo = 9103;
required EntryLog entrylog = 9106;
required string retrieveerrorreporturl = 9110;
}
message RetrieveJob {
......
......@@ -1974,7 +1974,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
rqAddress = rqp.address;
}
if (!rqAddress.size()) break;
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
// try and lock the retrieve queue. Any failure from here on means the end of the getting jobs.
objectstore::RetrieveQueue rq(rqAddress, m_oStoreDB.m_objectStore);
objectstore::ScopedExclusiveLock rqLock;
findQueueTime += t.secs(utils::Timer::resetCounter);
......@@ -2442,6 +2442,17 @@ void OStoreDB::ArchiveJob::fail(log::LogContext & lc) {
m_jobOwned = false;
return;
}
{
auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tapeFile.copyNb)
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
lc.log(log::INFO, "OStoreDB::ArchiveJob::fail(): increased the error count for retrieve job.");
}
// The job still has a chance, return it to its original tape pool's queue
objectstore::ArchiveQueue aq(m_oStoreDB.m_objectStore);
objectstore::ScopedExclusiveLock aqlock;
......@@ -2528,7 +2539,7 @@ OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, OStoreDB & oSt
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::fail()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
bool OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::RetrieveJob::fail: cannot fail a job not owned");
// Lock the retrieve request. Fail the job.
......@@ -2544,7 +2555,17 @@ void OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
log::ScopedParamContainer params(logContext);
params.add("object", m_retrieveRequest.getAddressIfSet());
logContext.log(log::ERR, "In OStoreDB::RetrieveJob::fail(): request was definitely failed and deleted.");
return;
return true;
} else {
auto retryStatus = m_retrieveRequest.getRetryStatus(selectedCopyNb);
log::ScopedParamContainer params(logContext);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", selectedCopyNb)
.add("retriesWithinMount", retryStatus.retriesWithinMount)
.add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount)
.add("totalRetries", retryStatus.totalRetries)
.add("maxTotalRetries", retryStatus.maxTotalRetries);
logContext.log(log::INFO, "OStoreDB::RetrieveJob::fail(): increased the error count for retrieve job.");
}
// The job still has a chance, requeue it to the best tape.
// Get the best vid from the cache
......@@ -2595,6 +2616,7 @@ void OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
rrl.release();
// And relinquish ownership from the agent
m_oStoreDB.m_agentReference->removeFromOwnership(m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
return false;
}
//------------------------------------------------------------------------------
......
......@@ -190,7 +190,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
virtual void fail(log::LogContext &) override;
virtual bool fail(log::LogContext &) override; ///< Returns true if this failure is final (we will not retry).
virtual ~RetrieveJob() override;
private:
RetrieveJob(const std::string &, OStoreDB &, RetrieveMount &);
......
......@@ -17,6 +17,11 @@
*/
#include "scheduler/RetrieveJob.hpp"
#include "common/Timer.hpp"
#include "eos/DiskReporter.hpp"
#include "RetrieveMount.hpp"
#include <cryptopp/base64.h>
#include <future>
//------------------------------------------------------------------------------
// destructor
......@@ -56,8 +61,40 @@ void cta::RetrieveJob::checkComplete() {
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::RetrieveJob::failed(log::LogContext &lc) {
m_dbJob->fail(lc);
void cta::RetrieveJob::failed(const std::string & errorReport, log::LogContext &lc) {
if (m_dbJob->fail(lc)) {
std::string base64ErrorReport;
// Construct a pipe: error message -> Base64 encode -> result goes into base64ErrorReport.
// Note: the second Base64Encoder argument is CryptoPP's insertLineBreaks flag; it must stay
// false so the encoded report remains a single line when appended to the report URL.
const bool insertLineBreaksInBase64Output = false;
CryptoPP::StringSource ss1(errorReport, true,
new CryptoPP::Base64Encoder(
new CryptoPP::StringSink(base64ErrorReport), insertLineBreaksInBase64Output));
std::string fullReportURL = m_dbJob->retrieveRequest.errorReportURL + base64ErrorReport;
// The full report URL is now ready; fire the asynchronous reporter and wait for its completion.
std::promise<void> reporterState;
utils::Timer t;
std::unique_ptr<cta::eos::DiskReporter> reporter(m_mount.createDiskReporter(fullReportURL, reporterState));
reporter->asyncReportArchiveFullyComplete();
try {
reporterState.get_future().get();
log::ScopedParamContainer params(lc);
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("errorReport", errorReport)
.add("reportTime", t.secs());
lc.log(log::INFO, "In RetrieveJob::failed(): reported error to client.");
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
.add("diskFileId", m_dbJob->archiveFile.diskFileId)
.add("errorReport", errorReport)
.add("exceptionMsg", ex.getMessageValue())
.add("reportTime", t.secs());
lc.log(log::ERR, "In RetrieveJob::failed(): failed to report error to client.");
}
}
}
//------------------------------------------------------------------------------
......
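Side note on the Base64 step above: CryptoPP's Base64Encoder inserts line breaks every 72 characters by default, which would corrupt the report URL, so the encoder must be constructed with insertLineBreaks set to false. A minimal, self-contained check of that assumption:

#include <cryptopp/base64.h>
#include <cassert>
#include <string>

int main() {
  std::string encoded;
  // insertLineBreaks = false keeps the Base64 output on one line, suitable as a URL suffix.
  CryptoPP::StringSource ss("mount failed: drive error", true,
    new CryptoPP::Base64Encoder(new CryptoPP::StringSink(encoded), false));
  assert(encoded.find('\n') == std::string::npos);
  return 0;
}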
......@@ -92,7 +92,7 @@ public:
* should already be recorded in the object beforehand. Retry policy will
* be applied by the scheduler.
*/
virtual void failed(cta::log::LogContext &);
virtual void failed(const std::string & errorReport, cta::log::LogContext &);
/**
* Helper function returning a reference to the currently selected tape file.
......
......@@ -156,6 +156,13 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
}
}
//------------------------------------------------------------------------------
// createDiskReporter()
//------------------------------------------------------------------------------
cta::eos::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL,
std::promise<void>& reporterState) {
return m_reporterFactory.createDiskReporter(URL, reporterState);
}
//------------------------------------------------------------------------------
// tapeComplete()
......
......@@ -23,6 +23,7 @@
#include "scheduler/RetrieveMount.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/TapeMount.hpp"
#include "eos/DiskReporterFactory.hpp"
#include <memory>
#include <queue>
......@@ -143,6 +144,15 @@ namespace cta {
*/
virtual void waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> > & successfulRetrieveJobs, cta::log::LogContext &logContext);
/**
* Creates a disk reporter for the retrieve job (this is a wrapper).
* @param URL: report address
* @param reporterState void promise to be set when the report is done asynchronously.
* @return pointer to the reporter created.
*/
eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState);
/**
* Destructor.
*/
......@@ -170,6 +180,8 @@ namespace cta {
*/
bool m_diskRunning;
/** An initialized-once factory for disk error reports (indirectly used by RetrieveJobs) */
eos::DiskReporterFactory m_reporterFactory;
}; // class RetrieveMount
......
......@@ -182,6 +182,7 @@ void Scheduler::queueRetrieve(
.add("diskFileGroup", request.diskFileInfo.group)
.add("diskFileRecoveryBlob", postEllipsis(request.diskFileInfo.recoveryBlob, 20))
.add("dstURL", request.dstURL)
.add("errorReportURL", request.errorReportURL)
.add("creationHost", request.creationLog.host)
.add("creationTime", request.creationLog.time)
.add("creationUser", request.creationLog.username)
......
......@@ -329,7 +329,7 @@ public:
uint64_t selectedCopyNb;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
virtual void fail(log::LogContext &) = 0;
virtual bool fail(log::LogContext &) = 0; ///< Returns true if this failure is final (we will not retry).
virtual ~RetrieveJob() {}
};
......
......@@ -35,7 +35,7 @@ namespace cta {
}
virtual void asyncComplete() override { completes++; }
virtual void checkComplete() override {}
virtual void failed(cta::log::LogContext &) override { failures++; };
virtual void failed(const std::string & errorReport, cta::log::LogContext &) override { failures++; };
~MockRetrieveJob() throw() {}
};
......
......@@ -201,8 +201,12 @@ bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() {
//------------------------------------------------------------------------------
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
parent.m_errorHappened=true;
parent.m_lc.log(cta::log::ERR,m_failedRetrieveJob->failureMessage);
m_failedRetrieveJob->failed(parent.m_lc);
{
cta::log::ScopedParamContainer params(parent.m_lc);
params.add("errorMessage", m_failedRetrieveJob->failureMessage);
parent.m_lc.log(cta::log::ERR, "In RecallReportPacker::ReportError::execute(): processing error message");
}
m_failedRetrieveJob->failed(m_failedRetrieveJob->failureMessage, parent.m_lc);
}
//------------------------------------------------------------------------------
......
......@@ -58,7 +58,7 @@ protected:
virtual void checkComplete() override {}
virtual void failed(cta::log::LogContext &) override {
virtual void failed(const std::string & errorReport, cta::log::LogContext &) override {
failuresRef++;
}
......
......@@ -503,6 +503,7 @@ void RequestMessage::processPREPARE(const cta::eos::Notification &notification,
cta::common::dataStructures::RetrieveRequest request;
request.requester = originator;
request.dstURL = notification.transport().dst_url();
request.errorReportURL = notification.transport().error_report_url();
request.diskFileInfo = diskFileInfo;
request.creationLog.host = m_cliIdentity.host;
request.creationLog.username = m_cliIdentity.username;
......