diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp
index af23c6069a9b82960deff18e6ab04c280ebea85a..ea8b52c02aa93195e49afcc9899c84e46aa89412 100644
--- a/scheduler/ArchiveJob.cpp
+++ b/scheduler/ArchiveJob.cpp
@@ -47,6 +47,28 @@ void cta::ArchiveJob::asyncSetJobSucceed() {
   m_dbJob->asyncSucceed();
 }
 
+//------------------------------------------------------------------------------
+// asyncSucceedAndWaitJobsBatch
+//------------------------------------------------------------------------------
+void cta::ArchiveJob::asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob> >& jobs) {
+  // Call succeed on all jobs
+  for (auto & j: jobs) {
+    j->asyncSetJobSucceed();
+  }
+}
+
+
+//------------------------------------------------------------------------------
+// asyncSetJobsBatchSucceed
+//------------------------------------------------------------------------------
+void cta::ArchiveJob::asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs) {
+  // We need a handle on the mount (all jobs are supposed to come from the same mount).
+  // It will be provided indirectly by a non-static member function of one job (if any).
+  if (jobs.size()) {
+    jobs.front()->asyncSucceedAndWaitJobsBatch(jobs);
+  }
+}
+
 //------------------------------------------------------------------------------
 // checkAndReportComplete
 //------------------------------------------------------------------------------
diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp
index 4c881959c8db0f3b30daed65cea432c9203f26ea..629cfb86bcbddba88a5ce6143d7456ba7de1fa99 100644
--- a/scheduler/ArchiveJob.hpp
+++ b/scheduler/ArchiveJob.hpp
@@ -78,6 +78,18 @@ public:
    */
   virtual void asyncSetJobSucceed();
 
+  /**
+   * Start an asynchronous update for a batch of jobs and then make sure they complete.
+   */
+  static void asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
+
+protected:
+  /**
+   * Backend asynchronous batch of jobs update implementation. The default implementation
+   * can be overridden for efficiency.
+   */
+  virtual void asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
+public:
   /**
    * Wait if the job was updated in the backend store asynchronously.
    * @return true if the archive was also sent to client asynchronously.
diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp
index 32e7a9dfe9ad33306a3330a1f87d7eecdb1b6311..9f254e958604d58cce7cb3b5115da1a9a70e54be 100644
--- a/scheduler/ArchiveMount.cpp
+++ b/scheduler/ArchiveMount.cpp
@@ -120,6 +120,109 @@
   return ret;
 }
 
+//------------------------------------------------------------------------------
+// reportJobsBatchWritten
+//------------------------------------------------------------------------------
+void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
+    cta::log::LogContext& logContext) {
+  std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
+  std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
+  std::unique_ptr<cta::ArchiveJob> job;
+  try{
+    while(!successfulArchiveJobs.empty()) {
+      // Get the next job to report and make sure we will not attempt to process it twice.
+      job = std::move(successfulArchiveJobs.front());
+      successfulArchiveJobs.pop();
+      if (!job.get()) continue;
+      tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
+      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
+      job.reset(nullptr);
+    }
+
+    // Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
+    updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
+    {
+      cta::log::ScopedParamContainer params(logContext);
+      params.add("tapeFilesWritten", tapeFilesWritten.size());
+      logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");
+    }
+    for (auto &job: validatedSuccessfulArchiveJobs) {
+      job->asyncSetJobSucceed();
+    }
+
+    // Note: former content of ReportFlush::checkAndAsyncReportCompletedJobs
+    std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
+
+    for (auto &job: validatedSuccessfulArchiveJobs){
+      cta::log::ScopedParamContainer params(logContext);
+      params.add("fileId", job->archiveFile.archiveFileID)
+            .add("diskInstance", job->archiveFile.diskInstance)
+            .add("diskFileId", job->archiveFile.diskFileId)
+            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
+      logContext.log(cta::log::DEBUG,
+          "In ArchiveMount::reportJobsBatchWritten()"
+          " check for async backend update finished");
+      if(job->checkAndAsyncReportComplete()) {
+        params.add("reportURL", job->reportURL());
+        reportedArchiveJobs.emplace_back(std::move(job));
+        logContext.log(cta::log::INFO,"Sent to the client a full file archival");
+      } else {
+        logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
+      }
+    }
+
+    for (const auto &job: reportedArchiveJobs){
+      try {
+        job->waitForReporting();
+        cta::log::ScopedParamContainer params(logContext);
+        params.add("fileId", job->archiveFile.archiveFileID)
+              .add("diskInstance", job->archiveFile.diskInstance)
+              .add("diskFileId", job->archiveFile.diskFileId)
+              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
+              .add("reportURL", job->reportURL())
+              .add("reportTime", job->reportTime());
+        logContext.log(cta::log::INFO,"Reported to the client a full file archival");
+      } catch(cta::exception::Exception &ex) {
+        cta::log::ScopedParamContainer params(logContext);
+        params.add("fileId", job->archiveFile.archiveFileID)
+              .add("diskInstance", job->archiveFile.diskInstance)
+              .add("diskFileId", job->archiveFile.diskFileId)
+              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
+              .add("errorMessage", ex.getMessage().str());
+        logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
+      }
+    }
+
+    logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");
+  } catch(const cta::exception::Exception& e){
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("exceptionMessageValue", e.getMessageValue());
+    if (job.get()) {
+      params.add("fileId", job->archiveFile.archiveFileID)
+            .add("diskInstance", job->archiveFile.diskInstance)
+            .add("diskFileId", job->archiveFile.diskFileId)
+            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
+            .add("reportURL", job->reportURL());
+    }
+    const std::string msg_error="An exception was caught trying to call reportMigrationResults";
+    logContext.log(cta::log::ERR, msg_error);
+    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
+  } catch(const std::exception& e){
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("exceptionWhat", e.what());
+    if (job.get()) {
+      params.add("fileId", job->archiveFile.archiveFileID)
+            .add("diskInstance", job->archiveFile.diskInstance)
+            .add("diskFileId", job->archiveFile.diskFileId)
+            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
+    }
+    const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
+    logContext.log(cta::log::ERR, msg_error);
+    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
+  }
+}
+
+
 //------------------------------------------------------------------------------
 // complete
 //------------------------------------------------------------------------------
diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp
index f6584249d153294ded5d2d712535c731167071f0..0af070f07fa9027ed4d13a2b81e1726794f9da95 100644
--- a/scheduler/ArchiveMount.hpp
+++ b/scheduler/ArchiveMount.hpp
@@ -27,6 +27,7 @@
 
 #include <memory>
 #include <atomic>
+#include <queue>
 
 namespace cta {
   /**
@@ -54,6 +55,7 @@ namespace cta {
 
     CTA_GENERATE_EXCEPTION_CLASS(WrongMountType);
     CTA_GENERATE_EXCEPTION_CLASS(NotImplemented);
+    CTA_GENERATE_EXCEPTION_CLASS(FailedMigrationRecallResult);
 
     /**
      * Returns The type of this tape mount.
@@ -122,6 +124,15 @@ namespace cta {
     std::list<std::unique_ptr<ArchiveJob>> getNextJobBatch(uint64_t filesRequested,
       uint64_t bytesRequested, log::LogContext &logContext);
 
+    /**
+     * Report a batch of job successes. The reporting will be asynchronous behind
+     * the scenes.
+     *
+     * @param successfulArchiveJobs the jobs to report
+     * @param logContext the context for logging
+     */
+    void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext);
+
     /**
      * Returns the tape pool of the tape to be mounted.
      *
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
index fb74754d9449498823fbb0cf477fce1ebfb064fc..fbc4f9e734c0606eeb3ed6e54d189d7003ae2a20 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
@@ -30,11 +30,6 @@
 #include <numeric>
 #include <cstdio>
 
-namespace{
-  struct failedMigrationRecallResult : public cta::exception::Exception{
-    failedMigrationRecallResult(const std::string& s): Exception(s){}
-  };
-}
 
 using cta::log::LogContext;
 using cta::log::Param;
@@ -206,139 +201,13 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
       reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
       return;
     }
-    proceedJobsBatch(reportPacker,std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
+    reportPacker.m_archiveMount->reportJobsBatchWritten(std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
   } else {
     // This is an abnormal situation: we should never flush after an error!
     reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
   }
 }
 
-//------------------------------------------------------------------------------
-//ReportFlush::proceedJobsBatch
-//------------------------------------------------------------------------------
-void MigrationReportPacker::ReportFlush::proceedJobsBatch(const MigrationReportPacker& reportPacker, std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext){
-  std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
-  std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
-  std::unique_ptr<cta::ArchiveJob> job;
-  try{
-    while(!successfulArchiveJobs.empty()) {
-      // Get the next job to report and make sure we will not attempt to process it twice.
-      job = std::move(successfulArchiveJobs.front());
-      successfulArchiveJobs.pop();
-      if (!job.get()) continue;
-      tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
-      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
-      job.reset(nullptr);
-    }
-
-    updateCatalogueWithTapeFilesWritten(reportPacker, tapeFilesWritten, logContext);
-    asyncUpdateBackendWithJobsSucceeded(validatedSuccessfulArchiveJobs);
-    checkAndAsyncReportCompletedJobs(validatedSuccessfulArchiveJobs, logContext);
-
-    logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");
-  } catch(const cta::exception::Exception& e){
-    cta::log::ScopedParamContainer params(logContext);
-    params.add("exceptionMessageValue", e.getMessageValue());
-    if (job.get()) {
-      params.add("fileId", job->archiveFile.archiveFileID)
-            .add("diskInstance", job->archiveFile.diskInstance)
-            .add("diskFileId", job->archiveFile.diskFileId)
-            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-            .add("reportURL", job->reportURL());
-    }
-    const std::string msg_error="An exception was caught trying to call reportMigrationResults";
-    logContext.log(cta::log::ERR, msg_error);
-    throw failedMigrationRecallResult(msg_error);
-  } catch(const std::exception& e){
-    cta::log::ScopedParamContainer params(logContext);
-    params.add("exceptionWhat", e.what());
-    if (job.get()) {
-      params.add("fileId", job->archiveFile.archiveFileID)
-            .add("diskInstance", job->archiveFile.diskInstance)
-            .add("diskFileId", job->archiveFile.diskFileId)
-            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
-    }
-    const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
-    logContext.log(cta::log::ERR, msg_error);
-    throw failedMigrationRecallResult(msg_error);
-  }
-}
-
-//------------------------------------------------------------------------------
-//ReportFlush::asyncUpdateBackendWithJobsSucceeded
-//------------------------------------------------------------------------------
-void MigrationReportPacker::ReportFlush::asyncUpdateBackendWithJobsSucceeded(
-  const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs) {
-  for (const auto &job: validatedSuccessfulArchiveJobs){
-    job->asyncSetJobSucceed();
-  }
-}
-
-//------------------------------------------------------------------------------
-//ReportFlush::checkAndAsyncReportCompletedJobs
-//------------------------------------------------------------------------------
-void MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs(
-  std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
-  cta::log::LogContext &logContext) {
-  std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
-
-  for (auto &job: validatedSuccessfulArchiveJobs){
-    cta::log::ScopedParamContainer params(logContext);
-    params.add("fileId", job->archiveFile.archiveFileID)
-          .add("diskInstance", job->archiveFile.diskInstance)
-          .add("diskFileId", job->archiveFile.diskFileId)
-          .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
-    logContext.log(cta::log::DEBUG,
-        "In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()"
-        " check for async backend update finished");
-    if(job->checkAndAsyncReportComplete()) {
-      params.add("reportURL", job->reportURL());
-      reportedArchiveJobs.emplace_back(std::move(job));
-      logContext.log(cta::log::INFO,"Sent to the client a full file archival");
-    } else {
-      logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
-    }
-  }
-
-  for (const auto &job: reportedArchiveJobs){
-    try {
-      job->waitForReporting(); // should not be a deadWait as soon as we have a timeout on the xroot query
-      cta::log::ScopedParamContainer params(logContext);
-      params.add("fileId", job->archiveFile.archiveFileID)
-            .add("diskInstance", job->archiveFile.diskInstance)
-            .add("diskFileId", job->archiveFile.diskFileId)
-            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-            .add("reportURL", job->reportURL())
-            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-            .add("reportTime", job->reportTime());
-      logContext.log(cta::log::INFO,"Reported to the client a full file archival");
-    } catch(cta::exception::Exception &ex) {
-      cta::log::ScopedParamContainer params(logContext);
-      params.add("fileId", job->archiveFile.archiveFileID)
-            .add("diskInstance", job->archiveFile.diskInstance)
-            .add("diskFileId", job->archiveFile.diskFileId)
-            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
-            .add("errorMessage", ex.getMessage().str());
-      logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
-    } catch(...) {
-      throw;
-    }
-  }
-}
-
-//------------------------------------------------------------------------------
-//ReportFlush::updateCatalogueWithTapeFilesWritten
-//------------------------------------------------------------------------------
-void MigrationReportPacker::ReportFlush::updateCatalogueWithTapeFilesWritten(
-    const MigrationReportPacker &reportPacker,
-    const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
-    cta::log::LogContext &logContext) {
-  reportPacker.m_archiveMount->updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
-  cta::log::ScopedParamContainer params(logContext);
-  params.add("tapeFilesWritten", tapeFilesWritten.size());
-  logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");
-}
-
 //------------------------------------------------------------------------------
 //reportTapeFull()::execute
 //------------------------------------------------------------------------------
@@ -461,7 +330,7 @@ void MigrationReportPacker::WorkerThread::run(){
     try{
       rep->execute(m_parent);
     }
-    catch(const failedMigrationRecallResult& e){
+    catch(const cta::ArchiveMount::FailedMigrationRecallResult& e){
      //here we catch a failed report MigrationResult. We try to close and if that fails too
      //we end up in the catch below
       lc.log(cta::log::INFO,"Successfully closed client's session after the failed report MigrationResult");
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
index 1bd7a6f2e051a7591bb4b1c5fca61a4b5457f381..dd014536c7a2e4ff5e068f3294ef558c2311164c 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
@@ -165,18 +165,6 @@ private:
     ReportFlush(drive::compressionStats compressStats):m_compressStats(compressStats){}
     
     void execute(MigrationReportPacker& reportPacker) override;
-    void proceedJobsBatch(const MigrationReportPacker& reportPacker,
-      std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
-      cta::log::LogContext &log);
-    void asyncUpdateBackendWithJobsSucceeded(
-      const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs);
-    void checkAndAsyncReportCompletedJobs(
-      std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
-      cta::log::LogContext &logContext);
-    void updateCatalogueWithTapeFilesWritten(
-      const MigrationReportPacker &reportPacker,
-      const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
-      cta::log::LogContext &logContext);
   };
   class ReportTapeFull: public Report {
   public: