Commit 97c11d58 authored by Eric Cano

Added support for reporting placeholders (skipped files) to the catalogue.

This is validated in a unit test.
parent 32797cf8
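
For orientation, the gist of the change: when a file is skipped during migration, a cta::catalogue::TapeItemWritten placeholder (vid, fSeq, tapeDrive) is queued by the MigrationReportPacker, and at flush time it is merged with the entries for the successfully written files into the single batch handed to the catalogue. This is why the new test expects the tape's lastFSeq to keep advancing past skipped slots while dataOnTapeInBytes only counts files actually written. The sketch below is a simplified stand-in, not the CTA code: the struct names mirror cta::catalogue::TapeItemWritten / TapeFileWritten / TapeItemWrittenPointer, but the fields and the fSeq-based ordering are assumptions limited to what this commit actually touches.

// Minimal sketch only (not the CTA implementation). Simplified stand-ins for
// the catalogue types are assumed; the field names (vid, fSeq, tapeDrive) and
// the merge logic mirror what this commit adds to reportJobsBatchWritten().
#include <cstdint>
#include <memory>
#include <queue>
#include <set>
#include <string>

struct TapeItemWritten {                    // placeholder entry for a skipped fSeq
  std::string vid;
  std::uint64_t fSeq = 0;
  std::string tapeDrive;
  virtual ~TapeItemWritten() = default;
};

struct TapeFileWritten : TapeItemWritten {  // entry for a file actually written
  std::uint64_t archiveFileId = 0;
  std::uint64_t fileSize = 0;
};

// Ordering wrapper so entries can live in a std::set sorted by fSeq
// (the role played by cta::catalogue::TapeItemWrittenPointer).
struct TapeItemWrittenPointer {
  std::shared_ptr<TapeItemWritten> item;
  bool operator<(const TapeItemWrittenPointer &other) const {
    return item->fSeq < other.item->fSeq;
  }
};

// Merge successful files and skipped-file placeholders into one batch, the way
// reportJobsBatchWritten() now does before handing it to the catalogue.
inline std::set<TapeItemWrittenPointer> buildCatalogueBatch(
    std::queue<std::shared_ptr<TapeFileWritten>> &written,
    std::queue<TapeItemWritten> &skipped) {
  std::set<TapeItemWrittenPointer> batch;
  while (!written.empty()) {
    batch.insert(TapeItemWrittenPointer{written.front()});
    written.pop();
  }
  while (!skipped.empty()) {
    batch.insert(TapeItemWrittenPointer{std::make_shared<TapeItemWritten>(skipped.front())});
    skipped.pop();
  }
  return batch;  // in CTA this set goes to catalogue.filesWrittenToTape(...)
}

In the diff below, ReportSkipped::execute() fills in exactly these three fields for each skipped job, and ReportFlush::execute() passes the resulting queue alongside the successful jobs to reportJobsBatchWritten().
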
......@@ -17,6 +17,7 @@
*/
#include "scheduler/ArchiveMount.hpp"
#include "common/make_unique.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -124,8 +125,8 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(
// reportJobsBatchWritten
//------------------------------------------------------------------------------
void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs,
-    cta::log::LogContext& logContext) {
-  std::set<cta::catalogue::TapeItemWrittenPointer> tapeFilesWritten;
+    std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) {
+  std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten;
std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
std::unique_ptr<cta::ArchiveJob> job;
try{
......@@ -139,19 +140,25 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A
job = std::move(successfulArchiveJobs.front());
successfulArchiveJobs.pop();
if (!job.get()) continue;
-      tapeFilesWritten.emplace(job->validateAndGetTapeFileWritten().release());
+      tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release());
files++;
bytes+=job->archiveFile.fileSize;
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset();
}
+    while (!skippedFiles.empty()) {
+      auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>();
+      *tiwup = skippedFiles.front();
+      skippedFiles.pop();
+      tapeItemsWritten.emplace(tiwup.release());
+    }
utils::Timer t;
// Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
-    updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
+    updateCatalogueWithTapeFilesWritten(tapeItemsWritten);
catalogueTime=t.secs(utils::Timer::resetCounter);
{
cta::log::ScopedParamContainer params(logContext);
params.add("tapeFilesWritten", tapeFilesWritten.size())
params.add("tapeFilesWritten", tapeItemsWritten.size())
.add("files", files)
.add("bytes", bytes)
.add("catalogueTime", catalogueTime);
......
......@@ -131,7 +131,8 @@ namespace cta {
* @param successfulArchiveJobs the jobs to report
* @param logContext
*/
-    virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, cta::log::LogContext &logContext);
+    virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs,
+        std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext &logContext);
/**
* Returns the tape pool of the tape to be mounted.
......
......@@ -467,8 +467,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
archiveJob->tapeFile.copyNb = 1;
archiveJob->validate();
std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch;
+  std::queue<cta::catalogue::TapeItemWritten> sTapeItems;
sDBarchiveJobBatch.emplace(std::move(archiveJob));
-  archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, lc);
+  archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, sTapeItems, lc);
archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, archiveJobBatch.size());
archiveMount->complete();
......
......@@ -50,7 +50,7 @@ namespace cta {
}
void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs,
-        cta::log::LogContext& logContext) override {
+        std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, cta::log::LogContext& logContext) override {
try {
std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten;
std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
......@@ -64,6 +64,12 @@ namespace cta {
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset(nullptr);
}
+        while (!skippedFiles.empty()) {
+          auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>();
+          *tiwup = skippedFiles.front();
+          skippedFiles.pop();
+          tapeItemsWritten.emplace(tiwup.release());
+        }
m_catalogue.filesWrittenToTape(tapeItemsWritten);
for (auto &job: validatedSuccessfulArchiveJobs) {
MockArchiveJob * maj = dynamic_cast<MockArchiveJob *>(job.get());
......
......@@ -1434,6 +1434,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
ar.requester.group = "group";
ar.fileSize = 1000;
ar.diskFileID = "x";
+    ar.diskFileID += std::to_string(fseq);
ar.diskFileInfo.path = "y";
ar.diskFileInfo.owner = "z";
ar.diskFileInfo.group = "g";
......@@ -1442,7 +1443,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
archiveFileIds.push_back(archiveFileId);
scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext);
// Delete the even files: the migration will work for half of them.
-      if (fseq % 2) sourceFiles.pop_back();
+      if (!(fseq % 2)) sourceFiles.pop_back();
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
......@@ -1483,6 +1484,13 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
count++;
}
ASSERT_EQ(5, count);
+  cta::catalogue::TapeSearchCriteria tapeCriteria;
+  tapeCriteria.vid=s_vid;
+  auto tapeInfo = catalogue.getTapes(tapeCriteria);
+  ASSERT_EQ(1, tapeInfo.size());
+  ASSERT_EQ(10, tapeInfo.begin()->lastFSeq);
+  ASSERT_EQ(5*1000, tapeInfo.begin()->dataOnTapeInBytes);
// Check logs for drive statistics
std::string logToCheck = logger.getLog();
logToCheck += "";
......
......@@ -204,6 +204,11 @@ void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& report
reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed() threw an exception.");
reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace());
}
+  reportPacker.m_skippedFiles.push(cta::catalogue::TapeItemWritten());
+  auto & tapeItem = reportPacker.m_skippedFiles.back();
+  tapeItem.fSeq = m_skippedArchiveJob->tapeFile.fSeq;
+  tapeItem.tapeDrive = reportPacker.m_archiveMount->getDrive();
+  tapeItem.vid = m_skippedArchiveJob->tapeFile.vid;
}
//------------------------------------------------------------------------------
......@@ -236,11 +241,12 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
// We can receive double flushes when the periodic flush happens
// right before the end of session (which triggers also a flush)
// We refrain from sending an empty report to the client in this case.
-    if (reportPacker.m_successfulArchiveJobs.empty()) {
+    if (reportPacker.m_successfulArchiveJobs.empty() && reportPacker.m_skippedFiles.empty()) {
reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
return;
}
-    reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_lc);
+    reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles,
+        reportPacker.m_lc);
} else {
// This is an abnormal situation: we should never flush after an error!
reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
......
......@@ -254,6 +254,11 @@ private:
* The successful archive jobs to be reported when flushing
*/
std::queue<std::unique_ptr<cta::ArchiveJob> > m_successfulArchiveJobs;
+  /**
+   * The skipped files (or placeholders list)
+   */
+  std::queue<cta::catalogue::TapeItemWritten> m_skippedFiles;
};
}}}}
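
To tie the sketch above to the new assertions in DataTransferSessionMissingFilesMigration: ten fSeqs are queued, the even ones have their source files deleted and end up skipped, so the catalogue receives ten items of which only five carry data. The walk-through below reuses the hypothetical stand-in types from the sketch; the vid and drive name are illustrative, while the checked numbers (10 items, 5*1000 bytes) are the ones the test asserts.

// Illustrative only, reusing the stand-in types from the sketch above: five
// files written (odd fSeq), five placeholders (even fSeq, missing source files).
#include <cassert>

int main() {
  std::queue<std::shared_ptr<TapeFileWritten>> written;
  std::queue<TapeItemWritten> skipped;
  for (std::uint64_t fSeq = 1; fSeq <= 10; fSeq++) {
    if (fSeq % 2) {                      // odd fSeq: file really written
      auto f = std::make_shared<TapeFileWritten>();
      f->vid = "V12345"; f->fSeq = fSeq; f->fileSize = 1000;
      written.push(f);
    } else {                             // even fSeq: placeholder only
      TapeItemWritten t;
      t.vid = "V12345"; t.fSeq = fSeq; t.tapeDrive = "drive0";
      skipped.push(t);
    }
  }
  auto batch = buildCatalogueBatch(written, skipped);
  assert(batch.size() == 10);            // placeholders advance lastFSeq to 10
  std::uint64_t bytes = 0;
  for (const auto &p : batch)
    if (auto *f = dynamic_cast<const TapeFileWritten *>(p.item.get()))
      bytes += f->fileSize;
  assert(bytes == 5 * 1000);             // only real files count as data on tape
  return 0;
}
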