Commit 5675a989 authored by Eric Cano's avatar Eric Cano
Browse files

Remodeled the async deletion of archive requests on updates.

This allows better internal (in-memory) tracking, and avoid spurious
"In OStoreDB::ArchiveJob::~ArchiveJob(): will leave the job owned after destruction."
message in the logs.
parent 0c1382be
......@@ -701,11 +701,7 @@ void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*
// We can have a mixture of failed and successful jobs, so we will sort them before batch queue/discarding them.
// First, sort the jobs. Done jobs get deleted (no need to sort further) and failed jobs go to their per-VID queues/containers.
// Status gets updated on the fly on the latter case.
struct RequestToDelete {
ArchiveJob * job;
std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> deleter;
};
std::list<RequestToDelete> competeJobsToDelete;
std::list<ArchiveJob *> completeJobsToDelete;
struct FailedJobToQueue {
ArchiveJob * job;
};
......@@ -714,8 +710,7 @@ void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*
for (auto &j: jobsBatch) {
switch (j->reportType) {
case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport:
competeJobsToDelete.push_back(RequestToDelete());
competeJobsToDelete.back().job = castFromSchedDBJob(j);
completeJobsToDelete.push_back(castFromSchedDBJob(j));
break;
case SchedulerDatabase::ArchiveJob::ReportType::FailureReport:
failedQueues[j->tapeFile.vid].push_back(FailedJobToQueue());
......@@ -730,24 +725,23 @@ void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*
}
}
}
if (competeJobsToDelete.size()) {
if (completeJobsToDelete.size()) {
// Launch deletion.
std::list<std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter>> deleters;
for (auto &j: competeJobsToDelete) {
j.deleter.reset(j.job->m_archiveRequest.asyncDeleteRequest());
for (auto &j: completeJobsToDelete) {
j->asyncDeleteRequest();
}
timingList.insertAndReset("deleteLaunchTime", t);
for (auto &j: competeJobsToDelete) {
for (auto &j: completeJobsToDelete) {
try {
j.deleter->wait();
j->waitAsyncDelete();
log::ScopedParamContainer params(lc);
params.add("fileId", j.job->archiveFile.archiveFileID)
.add("objectAddress", j.job->m_archiveRequest.getAddressIfSet());
params.add("fileId", j->archiveFile.archiveFileID)
.add("objectAddress", j->m_archiveRequest.getAddressIfSet());
lc.log(log::INFO, "In OStoreDB::setJobBatchReported(): deleted ArchiveRequest after completion and reporting.");
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", j.job->archiveFile.archiveFileID)
.add("objectAddress", j.job->m_archiveRequest.getAddressIfSet())
params.add("fileId", j->archiveFile.archiveFileID)
.add("objectAddress", j->m_archiveRequest.getAddressIfSet())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In OStoreDB::setJobBatchReported(): failed to delete ArchiveRequest after completion and reporting.");
}
......@@ -2346,7 +2340,7 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct
// We will asynchronously report the archive jobs (which MUST be OStoreDBJobs).
// We let the exceptions through as failing to report is fatal.
for (auto & sDBJob: jobsBatch) {
castFromSchedDBJob(sDBJob.get())->asyncSucceed();
castFromSchedDBJob(sDBJob.get())->asyncSucceedTransfer();
}
timingList.insertAndReset("asyncSucceedLaunchTime", t);
// TODO : could be optimized with single copy requests: we know we have to requeue. (minor optimization)
......@@ -2610,29 +2604,54 @@ void OStoreDB::ArchiveJob::bumpUpTapeFileCount(uint64_t newFileCount) {
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::asyncSucceed()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveJob::asyncSucceed() {
void OStoreDB::ArchiveJob::asyncSucceedTransfer() {
log::LogContext lc(m_oStoreDB.m_logger);
log::ScopedParamContainer params(lc);
params.add("requestObject", m_archiveRequest.getAddressIfSet());
lc.log(log::DEBUG, "Will start async update archiveRequest for success");
m_jobUpdate.reset(m_archiveRequest.asyncUpdateTransferSuccessful(tapeFile.copyNb));
lc.log(log::DEBUG, "Will start async update archiveRequest for transfer success");
m_succesfulTransferUpdater.reset(m_archiveRequest.asyncUpdateTransferSuccessful(tapeFile.copyNb));
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::checkSucceed()
// OStoreDB::ArchiveJob::waitAsyncSucceed()
//------------------------------------------------------------------------------
bool OStoreDB::ArchiveJob::waitAsyncSucceed() {
m_jobUpdate->wait();
m_succesfulTransferUpdater->wait();
log::LogContext lc(m_oStoreDB.m_logger);
log::ScopedParamContainer params(lc);
params.add("requestObject", m_archiveRequest.getAddressIfSet());
lc.log(log::DEBUG, "Async update of archiveRequest for success complete");
lc.log(log::DEBUG, "Async update of archiveRequest for transfer success complete");
// We no more own the job (which could be gone)
m_jobOwned = false;
// Ownership removal will be done globally by the caller.
return m_jobUpdate->m_doReportTransferSuccess;
return m_succesfulTransferUpdater->m_doReportTransferSuccess;
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::asyncDeleteRequest()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveJob::asyncDeleteRequest() {
log::LogContext lc(m_oStoreDB.m_logger);
log::ScopedParamContainer params(lc);
params.add("requestObject", m_archiveRequest.getAddressIfSet());
lc.log(log::DEBUG, "Will start async delete archiveRequest");
m_requestDeleter.reset(m_archiveRequest.asyncDeleteRequest());
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::waitAsyncDelete()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveJob::waitAsyncDelete() {
m_requestDeleter->wait();
log::LogContext lc(m_oStoreDB.m_logger);
log::ScopedParamContainer params(lc);
params.add("requestObject", m_archiveRequest.getAddressIfSet());
lc.log(log::DEBUG, "Async delete of archiveRequest complete");
// We no more own the job (which could be gone)
m_jobOwned = false;
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::~ArchiveJob()
//------------------------------------------------------------------------------
......
......@@ -168,9 +168,11 @@ public:
void failTransfer(const std::string& failureReason, log::LogContext& lc) override;
void failReport(const std::string& failureReason, log::LogContext& lc) override;
private:
void asyncSucceed();
void asyncSucceedTransfer();
/** Returns true if the jobs was the last one and the request should be queued for report. */
bool waitAsyncSucceed();
void asyncDeleteRequest();
void waitAsyncDelete();
public:
void bumpUpTapeFileCount(uint64_t newFileCount) override;
~ArchiveJob() override;
......@@ -181,7 +183,8 @@ public:
std::string m_tapePool;
OStoreDB & m_oStoreDB;
objectstore::ArchiveRequest m_archiveRequest;
std::unique_ptr<objectstore::ArchiveRequest::AsyncTransferSuccessfulUpdater> m_jobUpdate;
std::unique_ptr<objectstore::ArchiveRequest::AsyncTransferSuccessfulUpdater> m_succesfulTransferUpdater;
std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> m_requestDeleter;
};
friend class ArchiveJob;
static ArchiveJob* castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment