diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 0076d4ae947fa1fa891fd72feff78cf25e101c1a..8653b35c48a91636376ed347c361b1f245b94a1f 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -17,6 +17,7 @@ */ #include "scheduler/ArchiveMount.hpp" +#include "common/make_unique.hpp" //------------------------------------------------------------------------------ // constructor @@ -124,8 +125,8 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch( // reportJobsBatchWritten //------------------------------------------------------------------------------ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, - cta::log::LogContext& logContext) { - std::set<cta::catalogue::TapeItemWrittenPointer> tapeFilesWritten; + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) { + std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; std::unique_ptr<cta::ArchiveJob> job; try{ @@ -139,19 +140,25 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A job = std::move(successfulArchiveJobs.front()); successfulArchiveJobs.pop(); if (!job.get()) continue; - tapeFilesWritten.emplace(job->validateAndGetTapeFileWritten().release()); + tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release()); files++; bytes+=job->archiveFile.fileSize; validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); job.reset(); } + while (!skippedFiles.empty()) { + auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>(); + *tiwup = skippedFiles.front(); + skippedFiles.pop(); + tapeItemsWritten.emplace(tiwup.release()); + } utils::Timer t; // Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten - updateCatalogueWithTapeFilesWritten(tapeFilesWritten); + updateCatalogueWithTapeFilesWritten(tapeItemsWritten); catalogueTime=t.secs(utils::Timer::resetCounter); { cta::log::ScopedParamContainer params(logContext); - params.add("tapeFilesWritten", tapeFilesWritten.size()) + params.add("tapeFilesWritten", tapeItemsWritten.size()) .add("files", files) .add("bytes", bytes) .add("catalogueTime", catalogueTime); diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 2e655f16301337f7ce361099e9a498883f54e576..a74d1a62e6cdc9f673bba9427c5beec24fbb46a0 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -131,7 +131,8 @@ namespace cta { * @param successfulArchiveJobs the jobs to report * @param logContext */ - virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, cta::log::LogContext &logContext); + virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext &logContext); /** * Returns the tape pool of the tape to be mounted. diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 001449294d317d77cb09363323c2f70840636d90..6315ed90715422ca3dfa9937c5c26b9afb4191bf 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -467,8 +467,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { archiveJob->tapeFile.copyNb = 1; archiveJob->validate(); std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; + std::queue<cta::catalogue::TapeItemWritten> sTapeItems; sDBarchiveJobBatch.emplace(std::move(archiveJob)); - archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, lc); + archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, sTapeItems, lc); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); diff --git a/scheduler/testingMocks/MockArchiveMount.hpp b/scheduler/testingMocks/MockArchiveMount.hpp index d373adb68df0a1e3b790cc59bfae0e0fd2b0e195..6fe771e328a4a97a7f26da5813d96bfb7a387c97 100644 --- a/scheduler/testingMocks/MockArchiveMount.hpp +++ b/scheduler/testingMocks/MockArchiveMount.hpp @@ -50,7 +50,7 @@ namespace cta { } void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, - cta::log::LogContext& logContext) override { + std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) override { try { std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; @@ -64,6 +64,12 @@ namespace cta { validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); job.reset(nullptr); } + while (!skippedFiles.empty()) { + auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>(); + *tiwup = skippedFiles.front(); + skippedFiles.pop(); + tapeItemsWritten.emplace(tiwup.release()); + } m_catalogue.filesWrittenToTape(tapeItemsWritten); for (auto &job: validatedSuccessfulArchiveJobs) { MockArchiveJob * maj = dynamic_cast<MockArchiveJob *>(job.get()); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index a13be67ac2336822ad58de73a20a20ae45ff615d..6a20b6e63b5ccc606f6f10d046c3da88770b47d6 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -1434,6 +1434,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) { ar.requester.group = "group"; ar.fileSize = 1000; ar.diskFileID = "x"; + ar.diskFileID += std::to_string(fseq); ar.diskFileInfo.path = "y"; ar.diskFileInfo.owner = "z"; ar.diskFileInfo.group = "g"; @@ -1442,7 +1443,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) { archiveFileIds.push_back(archiveFileId); scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext); // Delete the even files: the migration will work for half of them. - if (fseq % 2) sourceFiles.pop_back(); + if (!(fseq % 2)) sourceFiles.pop_back(); } } scheduler.waitSchedulerDbSubthreadsComplete(); @@ -1483,6 +1484,13 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) { count++; } ASSERT_EQ(5, count); + cta::catalogue::TapeSearchCriteria tapeCriteria; + tapeCriteria.vid=s_vid; + auto tapeInfo = catalogue.getTapes(tapeCriteria); + ASSERT_EQ(1, tapeInfo.size()); + ASSERT_EQ(10, tapeInfo.begin()->lastFSeq); + ASSERT_EQ(5*1000, tapeInfo.begin()->dataOnTapeInBytes); + // Check logs for drive statistics std::string logToCheck = logger.getLog(); logToCheck += ""; diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index 5f0ec534b99d7fa9c16342a8ca31f370e792123f..0faaa9a830e93fb159a9ebd65ac18c5df63d7c3c 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -204,6 +204,11 @@ void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& report reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed() threw an exception."); reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace()); } + reportPacker.m_skippedFiles.push(cta::catalogue::TapeItemWritten()); + auto & tapeItem = reportPacker.m_skippedFiles.back(); + tapeItem.fSeq = m_skippedArchiveJob->tapeFile.fSeq; + tapeItem.tapeDrive = reportPacker.m_archiveMount->getDrive(); + tapeItem.vid = m_skippedArchiveJob->tapeFile.vid; } //------------------------------------------------------------------------------ @@ -236,11 +241,12 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa // We can receive double flushes when the periodic flush happens // right before the end of session (which triggers also a flush) // We refrain from sending an empty report to the client in this case. - if (reportPacker.m_successfulArchiveJobs.empty()) { + if (reportPacker.m_successfulArchiveJobs.empty() && reportPacker.m_skippedFiles.empty()) { reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing."); return; } - reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_lc); + reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles, + reportPacker.m_lc); } else { // This is an abnormal situation: we should never flush after an error! reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client"); diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp index 375529eecf69bfae4131c120555ffa38b5d64135..2e85b7bad40ef9ec8ccf3c1e95d11edef96ac888 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp @@ -254,6 +254,11 @@ private: * The successful archive jobs to be reported when flushing */ std::queue<std::unique_ptr<cta::ArchiveJob> > m_successfulArchiveJobs; + + /** + * The skipped files (or placeholders list) + */ + std::queue<cta::catalogue::TapeItemWritten> m_skippedFiles; }; }}}}