Commit 3a824734 authored by Eric Cano's avatar Eric Cano
Browse files

Implemented missing OStoreDB::RetrieveJob::fail()

The retrieve request now gets properly queued in case of retrieve error.
The errors are counted and the request gets deleted eventually.
A new field was added to the retrieve request in the object store. This commit
will fail on upgrade if there are retrieve requests still queued at update time.
Cleaned up some unused structures in cta.proto
Minor modifications to ArchiveJobs.
parent 6650bb41
......@@ -89,23 +89,24 @@ bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
// Find the job and update the number of failures (and return the new count)
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
if (j->lastmountwithfailure() == mountId) {
j->set_retrieswithinmount(j->retrieswithinmount() + 1);
// Find the job and update the number of failures
// (and return the job status: failed (true) or to be retried (false))
for (auto & j: *jl) {
if (j.copynb() == copyNumber) {
if (j.lastmountwithfailure() == mountId) {
j.set_retrieswithinmount(j.retrieswithinmount() + 1);
} else {
j->set_retrieswithinmount(1);
j->set_lastmountwithfailure(mountId);
j.set_retrieswithinmount(1);
j.set_lastmountwithfailure(mountId);
}
j->set_totalretries(j->totalretries() + 1);
j.set_totalretries(j.totalretries() + 1);
}
if (j->totalretries() >= j->maxtotalretries()) {
j->set_status(serializers::AJS_Failed);
if (j.totalretries() >= j.maxtotalretries()) {
j.set_status(serializers::AJS_Failed);
finishIfNecessary();
return true;
} else {
j->set_status(serializers::AJS_PendingMount);
j.set_status(serializers::AJS_PendingMount);
return false;
}
}
......@@ -492,17 +493,15 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
bool ArchiveRequest::finishIfNecessary() {
checkPayloadWritable();
// This function is typically called after changing the status of one job
// in memory. If the job is complete, we will just remove it.
// TODO: we will have to push the result to the ArchiveToDirRequest when
// it gets implemented.
// in memory. If the request is complete, we will just remove it.
// If all the jobs are either complete or failed, we can remove the request.
auto & jl=m_payload.jobs();
for (auto j=jl.begin(); j!=jl.end(); j++) {
if (j->status() != serializers::AJS_Complete
&& j->status() != serializers::AJS_Failed) {
using serializers::ArchiveJobStatus;
std::set<serializers::ArchiveJobStatus> finishedStatuses(
{ArchiveJobStatus::AJS_Complete, ArchiveJobStatus::AJS_Failed});
for (auto & j: jl)
if (!finishedStatuses.count(j.status()))
return false;
}
}
remove();
return true;
}
......
......@@ -47,11 +47,10 @@ public:
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId); //< returns true the job failed
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId); //< returns true the job is failed
serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
// Handling of the consequences of a job status change for the entire request.
// This function returns true if the request got finished.
bool finishIfNecessary();
bool finishIfNecessary(); /**< Handling of the consequences of a job status change for the entire request.
* This function returns true if the request got finished. */
// Mark all jobs as pending mount (following their linking to a tape pool)
void setAllJobsLinkingToArchiveQueue();
// Mark all the jobs as being deleted, in case of a cancellation
......
......@@ -163,6 +163,7 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uin
checkPayloadWritable();
auto *tf = m_payload.add_jobs();
tf->set_copynb(copyNb);
tf->set_lastmountwithfailure(0);
tf->set_maxretrieswithinmount(maxRetiesWithinMount);
tf->set_maxtotalretries(maxTotalRetries);
tf->set_retrieswithinmount(0);
......@@ -170,27 +171,6 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uin
tf->set_status(serializers::RetrieveJobStatus::RJS_Pending);
}
//------------------------------------------------------------------------------
// setJobSuccessful
//------------------------------------------------------------------------------
// Marks the job carrying copyNumber as complete.
// Returns true when, after this update, every job of the request is either
// complete or failed, and false when at least one job is still outstanding.
// Throws NoSuchJob when no job carries the requested copy number.
bool RetrieveRequest::setJobSuccessful(uint16_t copyNumber) {
  checkPayloadWritable();
  auto * jobs = m_payload.mutable_jobs();
  for (auto & job: *jobs) {
    if (job.copynb() != copyNumber)
      continue;
    job.set_status(serializers::RetrieveJobStatus::RJS_Complete);
    // With this job done, check whether any other job is still pending.
    for (const auto & other: *jobs) {
      const bool finished =
          other.status() == serializers::RetrieveJobStatus::RJS_Complete ||
          other.status() == serializers::RetrieveJobStatus::RJS_Failed;
      if (!finished)
        return false;
    }
    return true;
  }
  throw NoSuchJob("In RetrieveRequest::setJobSuccessful(): job not found");
}
//------------------------------------------------------------------------------
// setSchedulerRequest
//------------------------------------------------------------------------------
......@@ -304,12 +284,57 @@ auto RetrieveRequest::getJobs() -> std::list<JobDump> {
return ret;
}
RetrieveRequest::FailuresCount RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t sessionId) {
throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
// Records one more failure for the job identified by copyNumber.
//
// The per-mount retry counter is incremented when the failure happened in the
// same mount as the previous one (mountId matches lastmountwithfailure) and is
// restarted at 1 otherwise. The total retry counter is always incremented.
//
// Returns true when the job reached its maximum number of total retries AND
// the whole request got finished (and removed) by finishIfNecessary().
// Returns false when the job is set back to pending for a retry, or when the
// job is failed but other jobs of the request are not finished yet. The
// request will go through a full requeueing in the caller if it is retried.
//
// Throws NoSuchJob when no job carries the requested copy number.
bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId) {
  checkPayloadWritable();
  // Iterate by reference: iterating by value would update a temporary copy of
  // each job and silently drop the counters and the status set below.
  for (auto & j: *m_payload.mutable_jobs()) {
    // Only the job matching the copy number may decide the outcome; other
    // jobs are skipped so that an unknown copy number reaches the throw.
    if (j.copynb() != copyNumber)
      continue;
    if (j.lastmountwithfailure() == mountId) {
      j.set_retrieswithinmount(j.retrieswithinmount() + 1);
    } else {
      j.set_retrieswithinmount(1);
      j.set_lastmountwithfailure(mountId);
    }
    j.set_totalretries(j.totalretries() + 1);
    if (j.totalretries() >= j.maxtotalretries()) {
      j.set_status(serializers::RJS_Failed);
      return finishIfNecessary();
    }
    j.set_status(serializers::RJS_Pending);
    return false;
  }
  throw NoSuchJob("In RetrieveRequest::addJobFailure(): could not find job");
}
void RetrieveRequest::finish() {
throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
// Removes the request once none of its jobs can make progress anymore.
// Meant to be called after the status of one job was changed in memory.
// Returns false (leaving the request untouched) as soon as one job is neither
// complete nor failed; otherwise removes the request and returns true.
bool RetrieveRequest::finishIfNecessary() {
  checkPayloadWritable();
  for (const auto & job: m_payload.jobs()) {
    const auto status = job.status();
    const bool finished =
        status == serializers::RetrieveJobStatus::RJS_Complete ||
        status == serializers::RetrieveJobStatus::RJS_Failed;
    if (!finished)
      return false;
  }
  // Every job is in a final state: the request itself can go.
  remove();
  return true;
}
// Returns the status of the job carrying copyNumber.
// Throws exception::Exception when the request holds no such job.
serializers::RetrieveJobStatus RetrieveRequest::getJobStatus(uint16_t copyNumber) {
  checkPayloadReadable();
  const auto & jobs = m_payload.jobs();
  for (auto job = jobs.begin(); job != jobs.end(); ++job) {
    if (job->copynb() == copyNumber) {
      return job->status();
    }
  }
  // No job matched: report the copy number we were asked for.
  std::stringstream message;
  message << "In RetrieveRequest::getJobStatus(): could not find job for copynb=" << copyNumber;
  throw exception::Exception(message.str());
}
void RetrieveRequest::setActiveCopyNumber(uint32_t activeCopyNb) {
......
......@@ -48,7 +48,6 @@ public:
void addJob(uint64_t copyNumber, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
class JobDump {
public:
uint64_t copyNb;
......@@ -60,15 +59,11 @@ public:
};
JobDump getJob(uint16_t copyNb);
std::list<JobDump> getJobs();
struct FailuresCount {
uint16_t failuresWithinMount;
uint16_t totalFailures;
};
FailuresCount addJobFailure(uint16_t copyNumber, uint64_t sessionId);
bool addJobFailure(uint16_t copyNumber, uint64_t mountId); /**< Returns true if the request is completely failed
(telling whether we should requeue or not). */
bool finishIfNecessary(); /**< Handling of the consequences of a job status change for the entire request.
* This function returns true if the request got finished. */
serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber);
// Handling of the consequences of a job status. This is simpler that archival
// as one finish is enough.
void finish();
// Mark all jobs as pending mount (following their linking to a tape pool)
void setAllJobsLinkingToTapePool();
// Mark all the jobs as being deleted, in case of a cancellation
......
......@@ -222,19 +222,6 @@ message FilePathAndStatus {
required UserIdentity owner = 4453;
}
// One job of an archive request, identified by its copy number.
// The retry fields below are the ones maintained by
// ArchiveRequest::addJobFailure().
message ArchiveJobEntry {
// Copy number identifying the job within its request.
required uint32 copynb = 4400;
required string tapepool = 4401;
required string archivequeueaddress = 4402;
required string owner = 4403;
required ArchiveJobStatus status = 4404;
// Incremented on every recorded failure.
required uint32 totalretries = 4405;
// Failures recorded within the mount stored in lastmountwithfailure;
// restarted at 1 when a failure comes from a different mount.
required uint32 retrieswithinmount = 4406;
// Identifier of the mount in which the latest failure was recorded.
required uint64 lastmountwithfailure = 4407;
// The job is marked failed once totalretries reaches this value.
required uint32 maxtotalretries = 4408;
// NOTE(review): presumably a per-mount retry limit; it is not consulted by
// the addJobFailure() code visible here - confirm where it is enforced.
required uint32 maxretrieswithinmount = 4409;
}
// ------------- Retrieve Jobs -------------------------------------------------
// The status of the individual retrieve jobs. The jobs are materialised
......@@ -331,6 +318,19 @@ message MountPolicy {
required string comment = 8988;
}
// One job of an archive request, identified by its copy number.
// The retry fields below are the ones maintained by
// ArchiveRequest::addJobFailure().
message ArchiveJob {
// Copy number identifying the job within its request.
required uint32 copynb = 4400;
required string tapepool = 4401;
required string archivequeueaddress = 4402;
required string owner = 4403;
required ArchiveJobStatus status = 4404;
// Incremented on every recorded failure.
required uint32 totalretries = 4405;
// Failures recorded within the mount stored in lastmountwithfailure;
// restarted at 1 when a failure comes from a different mount.
required uint32 retrieswithinmount = 4406;
// Identifier of the mount in which the latest failure was recorded.
required uint64 lastmountwithfailure = 4407;
// The job is marked failed once totalretries reaches this value.
required uint32 maxtotalretries = 4408;
// NOTE(review): presumably a per-mount retry limit; it is not consulted by
// the addJobFailure() code visible here - confirm where it is enforced.
required uint32 maxretrieswithinmount = 4409;
}
message ArchiveRequest {
required uint64 archiveFileID = 8990;
required MountPolicy mountpolicy = 8995;
......@@ -347,7 +347,7 @@ message ArchiveRequest {
required string srcurl = 9080;
required string storageclass = 9090;
required EntryLog creationlog = 9091;
repeated ArchiveJobEntry jobs = 9092;
repeated ArchiveJob jobs = 9092;
}
message SchedulerRetrieveRequest {
......@@ -365,6 +365,7 @@ message RetrieveJob {
required uint32 retrieswithinmount = 9203;
required uint32 totalretries = 9204;
required RetrieveJobStatus status = 9205;
required uint64 lastmountwithfailure = 9206;
}
message RetrieveRequest {
......
......@@ -1536,7 +1536,7 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount>
// latest known state of the drive (and its absence of updating if needed)
// Prepare the return value
std::unique_ptr<OStoreDB::RetrieveMount> privateRet(
new OStoreDB::RetrieveMount(m_objectStore, m_agentReference));
new OStoreDB::RetrieveMount(m_objectStore, m_agentReference, m_catalogue));
auto &rm = *privateRet;
// Check we hold the scheduling lock
if (!m_lockTaken)
......@@ -1896,8 +1896,8 @@ OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress,
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::RetrieveMount()
//------------------------------------------------------------------------------
OStoreDB::RetrieveMount::RetrieveMount(objectstore::Backend& os, objectstore::AgentReference& ar):
m_objectStore(os), m_agentReference(ar) { }
// Keeps references to the object store backend, the agent reference and the
// catalogue; the mount does not take ownership of any of them.
OStoreDB::RetrieveMount::RetrieveMount(
    objectstore::Backend& backend,
    objectstore::AgentReference& agentRef,
    catalogue::Catalogue & cat):
  m_objectStore(backend),
  m_agentReference(agentRef),
  m_catalogue(cat) {
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getMountInfo()
......@@ -1969,7 +1969,7 @@ auto OStoreDB::RetrieveMount::getNextJob(log::LogContext & logContext) -> std::u
// Prepare the return value
auto job=rq.dumpJobs().front();
std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob(
job.address, m_objectStore, m_agentReference, *this));
job.address, m_objectStore, m_catalogue, m_agentReference, *this));
privateRet->selectedCopyNb = job.copyNb;
objectstore::ScopedExclusiveLock rrl;
try {
......@@ -2252,17 +2252,80 @@ OStoreDB::ArchiveJob::~ArchiveJob() {
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::RetrieveJob()
//------------------------------------------------------------------------------
OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress,
objectstore::Backend& os, objectstore::AgentReference& ar,
OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress,
objectstore::Backend& os, catalogue::Catalogue & c, objectstore::AgentReference& ar,
OStoreDB::RetrieveMount& rm): m_jobOwned(false),
m_objectStore(os), m_agentReference(ar), m_retrieveRequest(jobAddress, os),
m_objectStore(os), m_catalogue(c), m_agentReference(ar), m_retrieveRequest(jobAddress, os),
m_retrieveMount(rm) { }
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::fail()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::fail() {
throw NotImplemented("");
// Reports a failure of this retrieve job.
//
// The failure is recorded in the retrieve request. If the request ends up
// finished, the agent simply drops its ownership of it and an error is
// logged. Otherwise the request is requeued on the retrieve queue of the best
// remaining tape and ownership is handed over to that queue.
//
// Throws JobNowOwned when the job is not owned by this object,
// cta::exception::Exception when no candidate tape is left although the
// request was not finished, and RetrieveRequestHasNoCopies when the selected
// vid matches none of the tape files of the request.
//
// NOTE(review): m_mountId is not initialised by the RetrieveJob constructor
// shown in this change - confirm it is set before fail() can be called.
void OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
if (!m_jobOwned)
throw JobNowOwned("In OStoreDB::RetrieveJob::fail: cannot fail a job not owned");
// Lock the retrieve request and fetch its current state before changing it.
objectstore::ScopedExclusiveLock rrl(m_retrieveRequest);
m_retrieveRequest.fetch();
// Add a job failure. A true return value means the request got finished.
if (m_retrieveRequest.addJobFailure(selectedCopyNb, m_mountId)) {
// The job will not be retried. Either another job for the same request is
// queued and keeps the request referenced, or the request has been deleted.
// In any case, we can forget it.
m_agentReference.removeFromOwnership(m_retrieveRequest.getAddressIfSet(), m_objectStore);
m_jobOwned = false;
log::ScopedParamContainer params(logContext);
params.add("object", m_retrieveRequest.getAddressIfSet());
logContext.log(log::ERR, "In OStoreDB::RetrieveJob::fail(): request was definitely failed and deleted.");
return;
}
// The job still has a chance: requeue it to the best tape.
// Candidate vids are the tapes of all the jobs which are neither complete
// nor failed.
std::set<std::string> candidateVids;
using serializers::RetrieveJobStatus;
std::set<serializers::RetrieveJobStatus> finishedStatuses(
{RetrieveJobStatus::RJS_Complete, RetrieveJobStatus::RJS_Failed});
for (auto & tf: m_retrieveRequest.getRetrieveFileQueueCriteria().archiveFile.tapeFiles)
if (!finishedStatuses.count(m_retrieveRequest.getJobStatus(tf.second.copyNb)))
candidateVids.insert(tf.second.vid);
// addJobFailure() returned false, so at least one job should still be active.
if (candidateVids.empty())
throw cta::exception::Exception("In OStoreDB::RetrieveJob::fail(): no active job after addJobFailure() returned false.");
std::string bestVid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore);
// Check that the requested retrieve job (for the provided vid) exists, and record the copynb.
// bestCopyNb is only read after the vidFound label, which is reached
// exclusively through the goto below, i.e. after it has been assigned.
uint64_t bestCopyNb;
for (auto & tf: m_retrieveRequest.getRetrieveFileQueueCriteria().archiveFile.tapeFiles) {
if (tf.second.vid == bestVid) {
bestCopyNb = tf.second.copyNb;
goto vidFound;
}
}
// Reached only when the loop above found no tape file for bestVid.
{
std::stringstream err;
err << "In OStoreDB::RetrieveJob::fail(): no tape file for requested vid. archiveId="
<< m_retrieveRequest.getRetrieveFileQueueCriteria().archiveFile.archiveFileID
<< " vid=" << bestVid;
throw RetrieveRequestHasNoCopies(err.str());
}
vidFound:
{
// Add the request to the queue.
objectstore::RetrieveQueue rq(m_objectStore);
objectstore::ScopedExclusiveLock rql;
objectstore::Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, m_agentReference, bestVid);
auto rfqc = m_retrieveRequest.getRetrieveFileQueueCriteria();
auto & af=rfqc.archiveFile;
// presumably tapeFiles is keyed by copy number - verify against its declaration
auto & tf = af.tapeFiles.at(bestCopyNb);
auto sr = m_retrieveRequest.getSchedulerRequest();
rq.addJobIfNecessary(bestCopyNb, tf.fSeq, m_retrieveRequest.getAddressIfSet(), af.fileSize, rfqc.mountPolicy, sr.creationLog.time);
// The queue becomes the owner of the request; the change is committed while
// both the request and the queue are still locked.
m_retrieveRequest.setOwner(rq.getAddressIfSet());
m_retrieveRequest.commit();
// We do not own the request anymore
m_jobOwned = false;
// The lock on the queue is released here (has to be after the request commit for consistency).
}
rrl.release();
// And relinquish ownership from the agent
m_agentReference.removeFromOwnership(m_retrieveRequest.getAddressIfSet(), m_objectStore);
}
//------------------------------------------------------------------------------
......@@ -2343,16 +2406,11 @@ OStoreDB::RetrieveJob::~RetrieveJob() {
// OStoreDB::RetrieveJob::succeed()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveJob::succeed() {
// Lock the request and set the job as successful.
// Lock the request and set the request as successful (delete it).
objectstore::ScopedExclusiveLock rtfrl(m_retrieveRequest);
m_retrieveRequest.fetch();
std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
if (m_retrieveRequest.setJobSuccessful(selectedCopyNb)) {
m_retrieveRequest.remove();
} else {
m_retrieveRequest.commit();
}
// We no more own the job (which could be gone)
m_retrieveRequest.remove();
m_jobOwned = false;
// Remove ownership form the agent
m_agentReference.removeFromOwnership(rtfrAddress, m_objectStore);
......
......@@ -162,9 +162,10 @@ public:
class RetrieveMount: public SchedulerDatabase::RetrieveMount {
friend class TapeMountDecisionInfo;
private:
RetrieveMount(objectstore::Backend &, objectstore::AgentReference &);
RetrieveMount(objectstore::Backend &, objectstore::AgentReference &, catalogue::Catalogue &);
objectstore::Backend & m_objectStore;
objectstore::AgentReference & m_agentReference;
catalogue::Catalogue & m_catalogue;
public:
const MountInfo & getMountInfo() override;
std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) override;
......@@ -180,17 +181,18 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void succeed() override;
virtual void fail() override;
virtual void fail(log::LogContext &) override;
virtual ~RetrieveJob() override;
private:
RetrieveJob(const std::string &, objectstore::Backend &,
RetrieveJob(const std::string &, objectstore::Backend &, catalogue::Catalogue &,
objectstore::AgentReference &, RetrieveMount &);
bool m_jobOwned;
uint64_t m_mountId;
objectstore::Backend & m_objectStore;
catalogue::Catalogue & m_catalogue;
objectstore::AgentReference & m_agentReference;
objectstore::RetrieveRequest m_retrieveRequest;
RetrieveMount & m_retrieveMount;
OStoreDB::RetrieveMount & m_retrieveMount;
};
/* === Archive requests handling ========================================= */
......
......@@ -49,8 +49,8 @@ void cta::RetrieveJob::complete() {
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::RetrieveJob::failed() {
m_dbJob->fail();
void cta::RetrieveJob::failed(log::LogContext &lc) {
m_dbJob->fail(lc);
}
//------------------------------------------------------------------------------
......
......@@ -86,7 +86,7 @@ public:
* should already be recorded in the object beforehand. Retry policy will
* be applied by the scheduler.
*/
virtual void failed();
virtual void failed(cta::log::LogContext &);
/**
* Helper function returning a reference to the currently selected tape file.
......
......@@ -345,7 +345,7 @@ public:
cta::common::dataStructures::ArchiveFile archiveFile;
uint64_t selectedCopyNb;
virtual void succeed() = 0;
virtual void fail() = 0;
virtual void fail(log::LogContext &) = 0;
virtual ~RetrieveJob() {}
};
......
......@@ -34,8 +34,8 @@ namespace cta {
archiveFile.tapeFiles[1];
}
virtual void complete() { completes++; }
virtual void failed() { failures++; };
virtual void complete() override { completes++; }
virtual void failed(cta::log::LogContext &) override { failures++; };
~MockRetrieveJob() throw() {}
};
......
......@@ -201,7 +201,7 @@ bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd() {
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
parent.m_errorHappened=true;
parent.m_lc.log(cta::log::ERR,m_failedRetrieveJob->failureMessage);
m_failedRetrieveJob->failed();
m_failedRetrieveJob->failed(parent.m_lc);
}
//------------------------------------------------------------------------------
......
......@@ -52,12 +52,12 @@ protected:
MockRetrieveJobExternalStats(cta::RetrieveMount & rm, int & completes, int &failures):
MockRetrieveJob(rm), completesRef(completes), failuresRef(failures) {}
virtual void complete() {
virtual void complete() override {
completesRef++;
}
virtual void failed() {
virtual void failed(cta::log::LogContext &) override {
failuresRef++;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment