Commit e3207666 authored by Eric Cano

Moved reportJobsBatchWritten from MigrationReportPacker::ReportFlush to cta::ArchiveMount.

This will enable better integration and bulk removal of archive job ownership from the agent.
parent 4510aa36
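
The visible effect at the call site (see the MigrationReportPacker hunks below) is that ReportFlush stops driving the reporting itself and hands its queue of successful jobs to the mount. A minimal sketch of the change, with the surrounding error and empty-batch checks elided:

// In MigrationReportPacker::ReportFlush::execute(), the old self-contained call
//   proceedJobsBatch(reportPacker, std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
// is replaced by a delegation to the mount, which now owns the whole reporting sequence:
reportPacker.m_archiveMount->reportJobsBatchWritten(std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);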
......@@ -47,6 +47,28 @@ void cta::ArchiveJob::asyncSetJobSucceed() {
m_dbJob->asyncSucceed();
}
//------------------------------------------------------------------------------
// asyncSucceedAndWaitJobsBatch
//------------------------------------------------------------------------------
void cta::ArchiveJob::asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob> >& jobs) {
// Call succeed on all jobs
for (auto & j: jobs) {
j->asyncSetJobSucceed();
}
}
//------------------------------------------------------------------------------
// asyncSetJobsBatchSucceed
//------------------------------------------------------------------------------
void cta::ArchiveJob::asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs) {
// We need a handle on the mount (all jobs are supposed to come from the same mount).
// It will be provided indirectly by a non-static member function of one job (if any).
if (!jobs.empty()) {
jobs.front()->asyncSucceedAndWaitJobsBatch(jobs);
}
}
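
The static helper only exists to reach a virtual function: since asyncSucceedAndWaitJobsBatch is a non-static virtual, dispatching through jobs.front() lets a derived job type replace the default per-job loop with a single bulk operation. A hypothetical override, purely for illustration (BulkCapableArchiveJob and m_backend.bulkSucceedAll are assumed names, not CTA API):

// Hypothetical subclass sketch: a job type whose backend supports true bulk
// updates can override the default per-job loop with one round trip.
class BulkCapableArchiveJob: public cta::ArchiveJob {
protected:
  void asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs) override {
    // Assumed backend call: update the whole batch at once instead of per job.
    m_backend.bulkSucceedAll(jobs);
  }
};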
//------------------------------------------------------------------------------
// checkAndReportComplete
//------------------------------------------------------------------------------
......
......@@ -78,6 +78,18 @@ public:
*/
virtual void asyncSetJobSucceed();
/**
* Start an asynchronous update for a batch of jobs and then make sure they complete.
*/
static void asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
protected:
/**
* Backend implementation of the asynchronous batch job update. The default implementation
* can be overridden for efficiency.
*/
virtual void asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
public:
/**
* Wait if the job was updated in the backend store asynchronously.
* @return true if the archive was also sent to client asynchronously.
......
......@@ -120,6 +120,110 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(
return ret;
}
//------------------------------------------------------------------------------
// reportJobsBatchWritten
//------------------------------------------------------------------------------
void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
cta::log::LogContext& logContext) {
std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
std::unique_ptr<cta::ArchiveJob> job;
try{
while(!successfulArchiveJobs.empty()) {
// Get the next job to report and make sure we will not attempt to process it twice.
job = std::move(successfulArchiveJobs.front());
successfulArchiveJobs.pop();
if (!job.get()) continue;
tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset(nullptr);
}
// Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
{
cta::log::ScopedParamContainer params(logContext);
params.add("tapeFilesWritten", tapeFilesWritten.size());
logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");
}
for (auto &job: validatedSuccessfulArchiveJobs) {
job->asyncSetJobSucceed();
}
// Note: former content of ReportFlush::checkAndAsyncReportCompletedJobs
std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
for (auto &job: validatedSuccessfulArchiveJobs){
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
logContext.log(cta::log::DEBUG,
"In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()"
" check for async backend update finished");
if(job->checkAndAsyncReportComplete()) {
params.add("reportURL", job->reportURL());
reportedArchiveJobs.emplace_back(std::move(job));
logContext.log(cta::log::INFO,"Sent to the client a full file archival");
} else {
logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
}
}
for (const auto &job: reportedArchiveJobs){
try {
job->waitForReporting();
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL())
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportTime", job->reportTime());
logContext.log(cta::log::INFO,"Reported to the client a full file archival");
} catch(cta::exception::Exception &ex) {
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
.add("errorMessage", ex.getMessage().str());
logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
}
}
logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");
} catch(const cta::exception::Exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionMessageValue", e.getMessageValue());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL());
}
const std::string msg_error="An exception was caught trying to call reportMigrationResults";
logContext.log(cta::log::ERR, msg_error);
throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
} catch(const std::exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionWhat", e.what());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
logContext.log(cta::log::ERR, msg_error);
throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
}
}
//------------------------------------------------------------------------------
// complete
//------------------------------------------------------------------------------
......
......@@ -27,6 +27,7 @@
#include <memory>
#include <atomic>
#include <queue>
namespace cta {
/**
......@@ -54,6 +55,7 @@ namespace cta {
CTA_GENERATE_EXCEPTION_CLASS(WrongMountType);
CTA_GENERATE_EXCEPTION_CLASS(NotImplemented);
CTA_GENERATE_EXCEPTION_CLASS(FailedMigrationRecallResult);
/**
* Returns The type of this tape mount.
......@@ -122,6 +124,15 @@ namespace cta {
std::list<std::unique_ptr<ArchiveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext &logContext);
/**
* Report a batch of job successes. The reporting will be asynchronous behind
* the scenes.
*
* @param successfulArchiveJobs the jobs to report
* @param logContext
*/
void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
  cta::log::LogContext &logContext);
/**
* Returns the tape pool of the tape to be mounted.
*
......
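
A minimal usage sketch of the new mount-level entry point, assuming the queue was filled by the tape-write session and that archiveMount and logContext come from that session:

// Ownership of the queue moves into the mount; on failure a
// cta::ArchiveMount::FailedMigrationRecallResult is thrown after the
// offending job's parameters have been logged.
std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs;
// ... filled with jobs whose payload was written to tape ...
archiveMount.reportJobsBatchWritten(std::move(successfulArchiveJobs), logContext);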
......@@ -30,11 +30,6 @@
#include <numeric>
#include <cstdio>
namespace{
struct failedMigrationRecallResult : public cta::exception::Exception{
failedMigrationRecallResult(const std::string& s): Exception(s){}
};
}
using cta::log::LogContext;
using cta::log::Param;
......@@ -206,139 +201,13 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
return;
}
proceedJobsBatch(reportPacker,std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
reportPacker.m_archiveMount->reportJobsBatchWritten(std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
} else {
// This is an abnormal situation: we should never flush after an error!
reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
}
}
//------------------------------------------------------------------------------
//ReportFlush::proceedJobsBatch
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::proceedJobsBatch(const MigrationReportPacker& reportPacker, std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext){
std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
std::unique_ptr<cta::ArchiveJob> job;
try{
while(!successfulArchiveJobs.empty()) {
// Get the next job to report and make sure we will not attempt to process it twice.
job = std::move(successfulArchiveJobs.front());
successfulArchiveJobs.pop();
if (!job.get()) continue;
tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset(nullptr);
}
updateCatalogueWithTapeFilesWritten(reportPacker, tapeFilesWritten, logContext);
asyncUpdateBackendWithJobsSucceeded(validatedSuccessfulArchiveJobs);
checkAndAsyncReportCompletedJobs(validatedSuccessfulArchiveJobs, logContext);
logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");
} catch(const cta::exception::Exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionMessageValue", e.getMessageValue());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL());
}
const std::string msg_error="An exception was caught trying to call reportMigrationResults";
logContext.log(cta::log::ERR, msg_error);
throw failedMigrationRecallResult(msg_error);
} catch(const std::exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionWhat", e.what());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
logContext.log(cta::log::ERR, msg_error);
throw failedMigrationRecallResult(msg_error);
}
}
//------------------------------------------------------------------------------
//ReportFlush::asyncUpdateBackendWithJobsSucceeded
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::asyncUpdateBackendWithJobsSucceeded(
const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs) {
for (const auto &job: validatedSuccessfulArchiveJobs){
job->asyncSetJobSucceed();
}
}
//------------------------------------------------------------------------------
//ReportFlush::checkAndAsyncReportCompletedJobs
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs(
std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
cta::log::LogContext &logContext) {
std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
for (auto &job: validatedSuccessfulArchiveJobs){
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
logContext.log(cta::log::DEBUG,
"In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()"
" check for async backend update finished");
if(job->checkAndAsyncReportComplete()) {
params.add("reportURL", job->reportURL());
reportedArchiveJobs.emplace_back(std::move(job));
logContext.log(cta::log::INFO,"Sent to the client a full file archival");
} else {
logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
}
}
for (const auto &job: reportedArchiveJobs){
try {
job->waitForReporting(); // should not be a deadWait as soon as we have a timeout on the xroot query
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL())
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportTime", job->reportTime());
logContext.log(cta::log::INFO,"Reported to the client a full file archival");
} catch(cta::exception::Exception &ex) {
cta::log::ScopedParamContainer params(logContext);
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
.add("errorMessage", ex.getMessage().str());
logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
} catch(...) {
throw;
}
}
}
//------------------------------------------------------------------------------
//ReportFlush::updateCatalogueWithTapeFilesWritten
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::updateCatalogueWithTapeFilesWritten(
const MigrationReportPacker &reportPacker,
const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
cta::log::LogContext &logContext) {
reportPacker.m_archiveMount->updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
cta::log::ScopedParamContainer params(logContext);
params.add("tapeFilesWritten", tapeFilesWritten.size());
logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");
}
//------------------------------------------------------------------------------
//reportTapeFull()::execute
//------------------------------------------------------------------------------
......@@ -461,7 +330,7 @@ void MigrationReportPacker::WorkerThread::run(){
try{
rep->execute(m_parent);
}
catch(const failedMigrationRecallResult& e){
catch(const cta::ArchiveMount::FailedMigrationRecallResult& e){
// Here we catch a failed MigrationResult report. We try to close the session;
// if that fails too, we end up in the catch below.
lc.log(cta::log::INFO,"Successfully closed client's session after the failed report MigrationResult");
......
......@@ -165,18 +165,6 @@ private:
ReportFlush(drive::compressionStats compressStats):m_compressStats(compressStats){}
void execute(MigrationReportPacker& reportPacker) override;
void proceedJobsBatch(const MigrationReportPacker& reportPacker,
std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
cta::log::LogContext &log);
void asyncUpdateBackendWithJobsSucceeded(
const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs);
void checkAndAsyncReportCompletedJobs(
std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
cta::log::LogContext &logContext);
void updateCatalogueWithTapeFilesWritten(
const MigrationReportPacker &reportPacker,
const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
cta::log::LogContext &logContext);
};
class ReportTapeFull: public Report {
public:
......