diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index ea8b52c02aa93195e49afcc9899c84e46aa89412..595b08a028aafe7826a5baa330462271c30706a8 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -41,45 +41,12 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount, tapeFile(tapeFile) {} //------------------------------------------------------------------------------ -// asyncSetJobSucceed +// asyncReportComplete //------------------------------------------------------------------------------ -void cta::ArchiveJob::asyncSetJobSucceed() { - m_dbJob->asyncSucceed(); -} - -//------------------------------------------------------------------------------ -// asyncSetJobSucceed -//------------------------------------------------------------------------------ -void cta::ArchiveJob::asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob> >& jobs) { - // Call succeed on all jobs - for (auto & j: jobs) { - j->asyncSetJobSucceed(); - } -} - - -//------------------------------------------------------------------------------ -// asyncSetJobsSucceed -//------------------------------------------------------------------------------ -void cta::ArchiveJob::asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs) { - // We need a handle on the mount (all jobs are supposed to come from the same mount. - // It will be provided indirectly by a non-static member function of one job (if any). - if (jobs.size()) { - jobs.front()->asyncSucceedAndWaitJobsBatch(jobs); - } -} - -//------------------------------------------------------------------------------ -// checkAndReportComplete -//------------------------------------------------------------------------------ -bool cta::ArchiveJob::checkAndAsyncReportComplete() { - if (m_dbJob->checkSucceed()) { - m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState)); - m_reporter->asyncReportArchiveFullyComplete(); - m_reporterTimer.reset(); - return true; - } - return false; +void cta::ArchiveJob::asyncReportComplete() { + m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState)); + m_reporter->asyncReportArchiveFullyComplete(); + m_reporterTimer.reset(); } //------------------------------------------------------------------------------ @@ -89,30 +56,6 @@ double cta::ArchiveJob::reportTime() { return m_reporterTimer.secs(); } -//------------------------------------------------------------------------------ -// ArchiveJob::writeToCatalogue -//------------------------------------------------------------------------------ -void cta::ArchiveJob::writeToCatalogue() { - catalogue::TapeFileWritten fileReport; - fileReport.archiveFileId = archiveFile.archiveFileID; - fileReport.blockId = tapeFile.blockId; - fileReport.checksumType = tapeFile.checksumType; - fileReport.checksumValue = tapeFile.checksumValue; - fileReport.compressedSize = tapeFile.compressedSize; - fileReport.copyNb = tapeFile.copyNb; - fileReport.diskFileId = archiveFile.diskFileId; - fileReport.diskFileUser = archiveFile.diskFileInfo.owner; - fileReport.diskFileGroup = archiveFile.diskFileInfo.group; - fileReport.diskFilePath = archiveFile.diskFileInfo.path; - fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob; - fileReport.diskInstance = archiveFile.diskInstance; - fileReport.fSeq = tapeFile.fSeq; - fileReport.size = archiveFile.fileSize; - fileReport.storageClassName = archiveFile.storageClass; - fileReport.tapeDrive = m_mount.getDrive(); - fileReport.vid = tapeFile.vid; - m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport}); -} //------------------------------------------------------------------------------ // ArchiveJob::validateAndGetTapeFileWritten //------------------------------------------------------------------------------ diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp index 629cfb86bcbddba88a5ce6143d7456ba7de1fa99..525598360098e8d17a72161fc8aa5e35f9821510 100644 --- a/scheduler/ArchiveJob.hpp +++ b/scheduler/ArchiveJob.hpp @@ -72,29 +72,15 @@ public: CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet); CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch); - /** - * Indicates that the job was successful and updates the backend store - * asynchronously. - */ - virtual void asyncSetJobSucceed(); - /** * Start an asynchronous update for a batch of jobs and then make sure they complete. */ static void asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs); - -protected: - /** - * Backend asynchronous batch of jobs update implementation. The default implementation - * can be overridden for efficiency. - */ - virtual void asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs); public: /** - * Wait if the job was updated in the backend store asynchronously. - * @return true if the archive was also sent to client asynchronously. + * Launch a report to the user. */ - virtual bool checkAndAsyncReportComplete(); + virtual void asyncReportComplete(); /** * Get the report time (in seconds). @@ -108,11 +94,6 @@ public: */ virtual void validate(); - /** - * Update the catalog with the archive request. - */ - virtual void writeToCatalogue(); - /** * Validate that archiveFile and tapeFile fields are set correctly for archive * request. diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 9f254e958604d58cce7cb3b5115da1a9a70e54be..1c27486fbdadec33a21dde754ab323484f6a69bb 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -123,78 +123,106 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch( //------------------------------------------------------------------------------ // reportJobsBatchWritten //------------------------------------------------------------------------------ -void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, +void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, cta::log::LogContext& logContext) { std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; std::unique_ptr<cta::ArchiveJob> job; try{ + uint64_t files=0; + uint64_t bytes=0; + double catalogueTime=0; + double schedulerDbTime=0; + double clientReportingTime=0; while(!successfulArchiveJobs.empty()) { // Get the next job to report and make sure we will not attempt to process it twice. job = std::move(successfulArchiveJobs.front()); successfulArchiveJobs.pop(); if (!job.get()) continue; tapeFilesWritten.insert(job->validateAndGetTapeFileWritten()); + files++; + bytes+=job->archiveFile.fileSize; validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); job.reset(nullptr); } - + utils::Timer t; // Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten updateCatalogueWithTapeFilesWritten(tapeFilesWritten); + catalogueTime=t.secs(utils::Timer::resetCounter); { cta::log::ScopedParamContainer params(logContext); - params.add("tapeFilesWritten", tapeFilesWritten.size()); - logContext.log(cta::log::INFO,"Catalog updated for batch of jobs"); + params.add("tapeFilesWritten", tapeFilesWritten.size()) + .add("files", files) + .add("bytes", bytes) + .add("catalogueTime", catalogueTime); + logContext.log(cta::log::DEBUG,"Catalog updated for batch of jobs"); } - for (auto &job: validatedSuccessfulArchiveJobs) { - job->asyncSetJobSucceed(); + + // Now get the db mount to mark the jobs as successful. + // Extract the db jobs from the scheduler jobs. + std::list<cta::SchedulerDatabase::ArchiveJob *> validatedSuccessfulDBArchiveJobs; + for (auto &schJob: validatedSuccessfulArchiveJobs) { + validatedSuccessfulDBArchiveJobs.emplace_back(schJob->m_dbJob.get()); } - // Note: former content of ReportFlush::checkAndAsyncReportCompletedJobs - std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs; - - for (auto &job: validatedSuccessfulArchiveJobs){ + // We can now pass this list for the dbMount to process. + // The dbMount will indicate the list of jobs that need to the reported to + // the client (the complete ones) in the report set. + std::set<cta::SchedulerDatabase::ArchiveJob *> jobsToReport = + m_dbMount->setJobBatchSuccessful(validatedSuccessfulDBArchiveJobs, logContext); + schedulerDbTime=t.secs(utils::Timer::resetCounter); + // We have the list of files to report to the user and the that just needed + // an update. + for (auto &job: validatedSuccessfulArchiveJobs) { cta::log::ScopedParamContainer params(logContext); params.add("fileId", job->archiveFile.archiveFileID) .add("diskInstance", job->archiveFile.diskInstance) .add("diskFileId", job->archiveFile.diskFileId) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); - logContext.log(cta::log::DEBUG, - "In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()" - " check for async backend update finished"); - if(job->checkAndAsyncReportComplete()) { - params.add("reportURL", job->reportURL()); - reportedArchiveJobs.emplace_back(std::move(job)); - logContext.log(cta::log::INFO,"Sent to the client a full file archival"); + if (jobsToReport.count(job->m_dbJob.get())) { + logContext.log(cta::log::DEBUG, + "In ArchiveMount::reportJobsBatchWritten(): archive request complete. Will launch async report to user."); + job->asyncReportComplete(); } else { - logContext.log(cta::log::INFO, "Recorded the partial migration of a file"); + logContext.log(cta::log::DEBUG, + "In ArchiveMount::reportJobsBatchWritten(): Recorded the partial migration of a file."); } } - - for (const auto &job: reportedArchiveJobs){ - try { - job->waitForReporting(); - cta::log::ScopedParamContainer params(logContext); - params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) - .add("reportURL", job->reportURL()) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) - .add("reportTime", job->reportTime()); - logContext.log(cta::log::INFO,"Reported to the client a full file archival"); - } catch(cta::exception::Exception &ex) { - cta::log::ScopedParamContainer params(logContext); + + // Now gather the result of the reporting to client. + for (auto &job: validatedSuccessfulArchiveJobs) { + if (jobsToReport.count(job->m_dbJob.get())) { + try { + job->waitForReporting(); + cta::log::ScopedParamContainer params(logContext); params.add("fileId", job->archiveFile.archiveFileID) .add("diskInstance", job->archiveFile.diskInstance) .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL()) - .add("errorMessage", ex.getMessage().str()); - logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:"); + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) + .add("reportURL", job->reportURL()) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) + .add("reportTime", job->reportTime()); + logContext.log(cta::log::INFO,"Reported to the client a full file archival"); + } catch(cta::exception::Exception &ex) { + cta::log::ScopedParamContainer params(logContext); + params.add("fileId", job->archiveFile.archiveFileID) + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL()) + .add("errorMessage", ex.getMessage().str()); + logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:"); + } } } - - logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape"); + clientReportingTime=t.secs(); + cta::log::ScopedParamContainer params(logContext); + params.add("files", files) + .add("bytes", bytes) + .add("catalogueTime", catalogueTime) + .add("schedulerDbTime", schedulerDbTime) + .add("clientReportingTime", clientReportingTime) + .add("totalTime", catalogueTime + schedulerDbTime + clientReportingTime); + logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchWritten(): recorded a batch of archive jobs in metadata."); } catch(const cta::exception::Exception& e){ cta::log::ScopedParamContainer params(logContext); params.add("exceptionMessageValue", e.getMessageValue()); @@ -205,7 +233,7 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) .add("reportURL", job->reportURL()); } - const std::string msg_error="An exception was caught trying to call reportMigrationResults"; + const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; logContext.log(cta::log::ERR, msg_error); throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); } catch(const std::exception& e){ @@ -217,7 +245,7 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A .add("diskFileId", job->archiveFile.diskFileId) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); } - const std::string msg_error="An std::exception was caught trying to call reportMigrationResults"; + const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception"; logContext.log(cta::log::ERR, msg_error); throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); } @@ -267,4 +295,3 @@ void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daem void cta::ArchiveMount::setTapeFull() { m_catalogue.noSpaceLeftOnTape(m_dbMount->getMountInfo().vid); } - diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 0af070f07fa9027ed4d13a2b81e1726794f9da95..81e53a4b2070650c7a2d68883be5b81a4969739f 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -32,7 +32,7 @@ namespace cta { /** * The class driving a retrieve mount. - * The class only has private constructors as it is instanciated by + * The class only has private constructors as it is instantiated by * the Scheduler class. */ class ArchiveMount: public TapeMount { @@ -131,7 +131,7 @@ namespace cta { * @param successfulArchiveJobs the jobs to report * @param logContext */ - void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext); + virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, cta::log::LogContext &logContext); /** * Returns the tape pool of the tape to be mounted. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 9146f6e8e6dfc44ff8ef26eba5aac38967694a05..f8209895589e7a9f0707da04042959873514aada 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2547,6 +2547,41 @@ void OStoreDB::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver: m_oStoreDB.updateDriveStatistics(driveInfo, inputs, lc); } +//------------------------------------------------------------------------------ +// OStoreDB::ArchiveMount::castFromSchedDBJob() +//------------------------------------------------------------------------------ +OStoreDB::ArchiveJob * OStoreDB::ArchiveMount::castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job) { + OStoreDB::ArchiveJob * ret = dynamic_cast<OStoreDB::ArchiveJob *> (job); + if (!ret) { + std::string unexpectedType = typeid(*job).name(); + throw cta::exception::Exception(std::string("In OStoreDB::ArchiveMount::castFromSchedDBJob(): unexpected archive job type while casting: ")+ + unexpectedType); + } + return ret; +} + +//------------------------------------------------------------------------------ +// OStoreDB::ArchiveMount::asyncSetJobBatchSuccessful() +//------------------------------------------------------------------------------ +std::set<cta::SchedulerDatabase::ArchiveJob*> OStoreDB::ArchiveMount::setJobBatchSuccessful( + std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext & lc) { + std::set<cta::SchedulerDatabase::ArchiveJob*> ret; + std::list<std::string> ajToUnown; + // We will asynchronously report the archive jobs (which MUST be OStoreDBJobs). + // We let the exceptions through as failing to report is fatal. + for (auto & sDBJob: jobsBatch) { + castFromSchedDBJob(sDBJob)->asyncSucceed(); + } + for (auto & sDBJob: jobsBatch) { + if (castFromSchedDBJob(sDBJob)->waitAsyncSucceed()) + ret.insert(sDBJob); + ajToUnown.push_back(castFromSchedDBJob(sDBJob)->m_archiveRequest.getAddressIfSet()); + } + m_oStoreDB.m_agentReference->removeBatchFromOwnership(ajToUnown, m_oStoreDB.m_objectStore); + return ret; +} + + //------------------------------------------------------------------------------ // OStoreDB::ArchiveJob::fail() //------------------------------------------------------------------------------ @@ -2612,7 +2647,7 @@ void OStoreDB::ArchiveJob::asyncSucceed() { //------------------------------------------------------------------------------ // OStoreDB::ArchiveJob::checkSucceed() //------------------------------------------------------------------------------ -bool OStoreDB::ArchiveJob::checkSucceed() { +bool OStoreDB::ArchiveJob::waitAsyncSucceed() { m_jobUpdate->wait(); log::LogContext lc(m_oStoreDB.m_logger); log::ScopedParamContainer params(lc); @@ -2623,11 +2658,7 @@ bool OStoreDB::ArchiveJob::checkSucceed() { } // We no more own the job (which could be gone) m_jobOwned = false; - // Remove ownership from agent - const std::string atfrAddress = m_archiveRequest.getAddressIfSet(); - m_oStoreDB.m_agentReference->removeFromOwnership(atfrAddress, m_oStoreDB.m_objectStore); - params.add("agentObject", m_oStoreDB.m_agentReference->getAgentAddress()); - lc.log(log::DEBUG, "Removed job from ownership"); + // Ownership removal will be done globally by the caller. return m_jobUpdate->m_isLastJob; } diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 925f798e09ec78ab2bd3664f26780e708b651324..992f6d8db4d6ace16e9d8258f1efe054d2cb1956 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -115,6 +115,7 @@ public: void trimEmptyQueues(log::LogContext& lc) override; /* === Archive Mount handling ============================================= */ + class ArchiveJob; class ArchiveMount: public SchedulerDatabase::ArchiveMount { friend class TapeMountDecisionInfo; private: @@ -123,11 +124,16 @@ public: public: CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp); const MountInfo & getMountInfo() override; - std::list<std::unique_ptr<ArchiveJob> > getNextJobBatch(uint64_t filesRequested, + std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override; void complete(time_t completionTime) override; void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; + private: + OStoreDB::ArchiveJob * castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job); + public: + std::set<cta::SchedulerDatabase::ArchiveJob*> setJobBatchSuccessful( + std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext & lc) override; }; friend class ArchiveMount; @@ -138,8 +144,10 @@ public: CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); void fail(log::LogContext & lc) override; - void asyncSucceed() override; - bool checkSucceed() override; + private: + void asyncSucceed(); + bool waitAsyncSucceed(); + public: void bumpUpTapeFileCount(uint64_t newFileCount) override; ~ArchiveJob() override; private: diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index bac987e63207585da78c650b07cd8e743eff5c36..f9e3062ed72225dc2edaee6264fd629a3e9697d4 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -172,6 +172,8 @@ public: virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; + virtual std::set<cta::SchedulerDatabase::ArchiveJob *> setJobBatchSuccessful( + std::list<cta::SchedulerDatabase::ArchiveJob *> & jobsBatch, log::LogContext & lc) = 0; virtual ~ArchiveMount() {} uint32_t nbFilesCurrentlyOnTape; }; @@ -180,16 +182,13 @@ public: * The class to handle the DB-side of a tape job. */ class ArchiveJob { + friend class ArchiveMount; public: std::string srcURL; std::string archiveReportURL; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::TapeFile tapeFile; virtual void fail(log::LogContext & lc) = 0; - /// Indicates a success to the DB. - virtual void asyncSucceed() = 0; - /// Check a succeed job status. If this is the last job, return true. - virtual bool checkSucceed() = 0; virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0; virtual ~ArchiveJob() {} }; diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp index d2d73c510d6a9040cf27b0e779d428438624ed0d..57fc412ea052e3cf63fbdd9b97fd15c542531bcc 100644 --- a/scheduler/SchedulerDatabaseTest.cpp +++ b/scheduler/SchedulerDatabaseTest.cpp @@ -203,10 +203,10 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { while (!done) { auto aj = am->getNextJobBatch(1,1,lc); if (aj.size()) { + std::list <cta::SchedulerDatabase::ArchiveJob *> jobBatch; + jobBatch.emplace_back(std::move(aj.front()).get()); count++; - //std::cout << aj->archiveFile.diskFileInfo.path << std::endl; - aj.front()->asyncSucceed(); - aj.front()->checkSucceed(); + am->setJobBatchSuccessful(jobBatch, lc); } else done = true; @@ -290,10 +290,10 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { while (!done) { auto aj = am->getNextJobBatch(1,1,lc); if (aj.size()) { + std::list <cta::SchedulerDatabase::ArchiveJob *> jobBatch; + jobBatch.emplace_back(aj.front().get()); count++; - //std::cout << aj->archiveFile.diskFileInfo.path << std::endl; - aj.front()->asyncSucceed(); - aj.front()->checkSucceed(); + am->setJobBatchSuccessful(jobBatch, lc); } else done = true; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index a9070717b6dd7b635afef30cd331dcab3783f8cc..fe45504f62be2440f6242c14cf7811fb813c5ad5 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -452,7 +452,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get()); std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_NE((cta::ArchiveJob*)NULL, archiveJobBatch.front().get()); - auto * archiveJob = archiveJobBatch.front().get(); + std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front()); archiveJob->tapeFile.blockId = 1; archiveJob->tapeFile.fSeq = 1; archiveJob->tapeFile.checksumType = "ADLER32"; @@ -460,9 +460,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize; archiveJob->tapeFile.copyNb = 1; archiveJob->validate(); - archiveJob->writeToCatalogue(); - archiveJob->asyncSetJobSucceed(); - archiveJob->checkAndAsyncReportComplete(); + std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; + sDBarchiveJobBatch.emplace(std::move(archiveJob)); + archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, lc); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); diff --git a/scheduler/testingMocks/MockArchiveJob.hpp b/scheduler/testingMocks/MockArchiveJob.hpp index 53df1ac01bf2e807945c6fb93ced91301aee72d9..0f57a2e3129176c33dbd5be14e883b2dbe6e0b45 100644 --- a/scheduler/testingMocks/MockArchiveJob.hpp +++ b/scheduler/testingMocks/MockArchiveJob.hpp @@ -38,14 +38,11 @@ namespace cta { failures++; } - virtual void asyncSetJobSucceed() override { + virtual void reportJobSucceeded() { completes++; } - virtual bool checkAndAsyncReportComplete() override { - return false; - } + virtual void validate() override {} - virtual void writeToCatalogue() override {} virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten() override { catalogue::TapeFileWritten fileReport; fileReport.archiveFileId = archiveFile.archiveFileID; diff --git a/scheduler/testingMocks/MockArchiveMount.hpp b/scheduler/testingMocks/MockArchiveMount.hpp index cc9aa79900470ff58120f614f7327f4726728b09..b059ca436c13ee8585caf73082c39b6b7cb4a097 100644 --- a/scheduler/testingMocks/MockArchiveMount.hpp +++ b/scheduler/testingMocks/MockArchiveMount.hpp @@ -49,6 +49,38 @@ namespace cta { } } + void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, + cta::log::LogContext& logContext) override { + try { + std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten; + std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; + std::unique_ptr<cta::ArchiveJob> job; + while(!successfulArchiveJobs.empty()) { + // Get the next job to report and make sure we will not attempt to process it twice. + job = std::move(successfulArchiveJobs.front()); + successfulArchiveJobs.pop(); + if (!job.get()) continue; + tapeFilesWritten.insert(job->validateAndGetTapeFileWritten()); + validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); + job.reset(nullptr); + } + m_catalogue.filesWrittenToTape(tapeFilesWritten); + for (auto &job: validatedSuccessfulArchiveJobs) { + MockArchiveJob * maj = dynamic_cast<MockArchiveJob *>(job.get()); + if (!maj) throw cta::exception::Exception("Wrong job type."); + maj->reportJobSucceeded(); + logContext.log(log::INFO, "Reported to the client a full file archival."); + } + logContext.log(log::INFO, "Reported to the client that a batch of files was written on tape"); + } catch(const cta::exception::Exception& e){ + cta::log::ScopedParamContainer params(logContext); + params.add("exceptionMessageValue", e.getMessageValue()); + const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; + logContext.log(cta::log::ERR, msg_error); + throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); + } + } + void complete() override { completes++; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index fbc4f9e734c0606eeb3ed6e54d189d7003ae2a20..fcc187c6a8cabcfc377581a9ce8beb36d70e6114 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -201,7 +201,7 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing."); return; } - reportPacker.m_archiveMount->reportJobsBatchWritten(std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc); + reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_lc); } else { // This is an abnormal situation: we should never flush after an error! reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client"); diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp index 908f5e8c7f5578a55bc23da328a63d95b0e1f05a..db831632cd81f11804ce67a1f612499a7817542c 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp @@ -69,16 +69,7 @@ namespace unitTests { int & completes, int &failures): MockArchiveJob(am, catalogue), completesRef(completes), failuresRef(failures) {} - virtual void asyncSetJobSucceed() override { - completesRef++; - } - - virtual bool checkAndAsyncReportComplete() override { - return false; - } - virtual void validate() override {} - virtual void writeToCatalogue() override {} virtual cta::catalogue::TapeFileWritten validateAndGetTapeFileWritten() override { cta::catalogue::TapeFileWritten fileReport; fileReport.archiveFileId = archiveFile.archiveFileID; @@ -105,6 +96,11 @@ namespace unitTests { void failed(const cta::exception::Exception& ex, cta::log::LogContext & lc) override { failuresRef++; } + + void reportJobSucceeded() override { + completesRef++; + } + private: int & completesRef; int & failuresRef;