Commit b2f1c4e5 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for placeholder writing on tape.

When no data is received for a given file, the tape server will
leave a placeholder on tape and carry on with the writing of the
following files, avoiding a tape unmount.

Updated unit test.
parent d0b47ecd
......@@ -52,8 +52,10 @@ namespace unitTests{
struct MockMigrationReportPacker : public MigrationReportPacker {
void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) override {}
void reportSkippedJob(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string& failure,
cta::log::LogContext& lc) override {}
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
const cta::exception::Exception& ex, cta::log::LogContext & lc) override {}
const cta::exception::Exception& ex, cta::log::LogContext & lc) override {}
void reportEndOfSession(cta::log::LogContext & lc) override {}
void reportEndOfSessionWithErrors(const std::string msg, int error_code, cta::log::LogContext & lc) override {}
void disableBulk() override {}
......
......@@ -65,6 +65,20 @@ std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportSkippedJob
//------------------------------------------------------------------------------
// Queues a ReportSkipped for an archive job whose file was skipped.
// @param skippedArchiveJob the job being reported; ownership moves into the report.
// @param failure the reason for the skip; it is stored prefixed with the current
//        local time and the short host name.
// @param lc log context provided by the calling thread.
void MigrationReportPacker::reportSkippedJob(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string& failure,
    cta::log::LogContext& lc) {
  // Build the failure log line: "<local time> <short hostname> <failure>".
  std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() +
      " " + failure;
  std::unique_ptr<Report> rep(new ReportSkipped(std::move(skippedArchiveJob), failureLog));
  cta::log::ScopedParamContainer params(lc);
  // The "type" parameter carries the report class name (was misspelled "ReporSkipped").
  params.add("type", "ReportSkipped");
  lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportSkippedJob(), pushing a report.");
  // The producer mutex is held while the report is handed over to the fifo.
  cta::threading::MutexLocker ml(m_producterProtection);
  m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
......@@ -170,6 +184,28 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& rep
reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob));
}
//------------------------------------------------------------------------------
//reportSkipped:execute
//------------------------------------------------------------------------------
// Logs the skip at ERR level, then marks the skipped archive job as failed with
// the stored failure log. An exception thrown by failed() is logged (message and
// backtrace) and not propagated.
// @param reportPacker the packer whose log context (m_lc) is used for logging.
void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& reportPacker) {
  // We have no successful file to add, but we should report the failure for the file.
  {
    // Scoped so that the extra log parameters are dropped before the next step.
    cta::log::ScopedParamContainer params(reportPacker.m_lc);
    params.add("failureLog", m_failureLog)
          .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID);
    reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): skipping archive job after exception.");
  }
  try {
    m_skippedArchiveJob->failed(m_failureLog, reportPacker.m_lc);
  } catch (cta::exception::Exception & ex) {
    cta::log::ScopedParamContainer params(reportPacker.m_lc);
    params.add("ExceptionMSG", ex.getMessageValue())
          .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID);
    // The message names the member actually called (previously said m_failedArchiveJob).
    reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): call to m_skippedArchiveJob->failed() threw an exception.");
    reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace());
  }
}
//------------------------------------------------------------------------------
//reportDriveStatus
//------------------------------------------------------------------------------
......
......@@ -58,6 +58,14 @@ public:
*/
virtual void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc);
/**
* Create into the MigrationReportPacker a report for a skipped file. We left a placeholder on tape, so
* writing can carry on, but this fSeq holds no data. Meanwhile, the job has to count a failure.
* @param skippedArchiveJob the failed file
* @param failure the reason for the failure (stored prefixed with the local time and the short host name)
* @param lc log context provided by the calling thread.
*/
virtual void reportSkippedJob(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string& failure, cta::log::LogContext & lc);
/**
* Create into the MigrationReportPacker a report for the failed migration
* of migratedFile
......@@ -136,6 +144,19 @@ private:
m_successfulArchiveJob(std::move(successfulArchiveJob)) {}
void execute(MigrationReportPacker& reportPacker) override;
};
/**
 * Report for an archive job that was skipped. It owns the job together with
 * the failure log describing why it was skipped; execute() consumes both.
 */
class ReportSkipped : public Report{
  /**
   * The failure description recorded for the skipped job
   */
  const std::string m_failureLog;
  /**
   * The failed archive job we skipped
   */
  std::unique_ptr<cta::ArchiveJob> m_skippedArchiveJob;
public:
  /**
   * @param skippedArchiveJob the skipped job; ownership is taken over.
   * @param failureLog the failure description; it is copied, so it is taken by
   *        const reference and const strings or temporaries are accepted too.
   */
  ReportSkipped(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string &failureLog):
    m_failureLog(failureLog), m_skippedArchiveJob(std::move(skippedArchiveJob)) {}
  void execute(MigrationReportPacker& reportPacker) override;
};
class ReportTestGoingToEnd : public Report {
public:
ReportTestGoingToEnd() {}
......
......@@ -97,11 +97,29 @@ namespace daemon {
m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile;
// We are not error sources here until we actually write.
currentErrorToCount = "";
bool firstBlock = true;
while(!m_fifo.finished()) {
MemBlock* const mb = m_fifo.popDataBlock();
m_taskStats.waitDataTime += timer.secs(cta::utils::Timer::resetCounter);
AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager);
// Special treatment for 1st block. If disk failed to provide anything, we can skip the file
// by leaving a placeholder on the tape (at minimal tape space cost), so we can continue
// the tape session (and save a tape mount!).
if (firstBlock && mb->isFailed()) {
currentErrorToCount = "Error_tapeWriteData";
const char blank[]="This file intentionally left blank: leaving placeholder after failing to read from disk.";
output->write(blank, sizeof(blank));
m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
watchdog.notify(sizeof(blank));
currentErrorToCount = "Error_tapeWriteTrailer";
output->close();
currentErrorToCount = "";
// Possibly failing writes are finished. Processing continues in the catch (Skip) handler, outside of the loop.
throw Skip(mb->errorMsg());
}
firstBlock = false;
//will throw (thus exiting the loop) if something is wrong
checkErrors(mb,memBlockId,lc);
......@@ -130,7 +148,8 @@ namespace daemon {
m_archiveJob->tapeFile.checksumType = "ADLER32";
{
std::stringstream cs;
cs << "0X" << std::hex << std::noshowbase << std::uppercase << std::setfill('0') << std::setw(8) << (uint32_t)ckSum;
cs << "0X" << std::hex << std::noshowbase << std::uppercase
<< std::setfill('0') << std::setw(8) << (uint32_t)ckSum;
m_archiveJob->tapeFile.checksumValue = cs.str();
}
m_archiveJob->tapeFile.compressedSize = m_taskStats.dataVolume;
......@@ -152,7 +171,23 @@ namespace daemon {
// and go into a degraded mode operation.
throw;
}
catch(const cta::exception::Exception& e){
catch(const Skip& s) {
// We failed to read anything from the file. We can get rid of any block from the queue to
// recycle them, and pass the report to the report packer. After that, we can carry on with
// the write session.
circulateMemBlocks();
watchdog.addToErrorCount("Info_fileSkipped");
m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile;
m_taskStats.filesCount ++;
// Record the fSeq in the tape session
session.reportWrittenFSeq(m_archiveJob->tapeFile.fSeq);
reportPacker.reportSkippedJob(std::move(m_archiveJob), s, lc);
m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
m_taskStats.totalTime = localTime.secs();
// Log the successful transfer
logWithStats(cta::log::INFO, "Left placeholder on tape after skipping unreadable file.", lc);
} catch(const cta::exception::Exception& e){
//we can end up there because
//we failed to open the WriteFile
//we received a bad block or a block written failed
......
......@@ -78,6 +78,14 @@ public:
MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog,
cta::log::LogContext& lc, cta::utils::Timer & timer);
private:
/**
 * Utility class used in execute()'s implementation: a string carrying the
 * reason why a file is skipped. It is a distinct type so that it can be thrown
 * and caught separately from other exceptions.
 */
class Skip: public std::string {
public:
  /**
   * Builds the message from anything a std::string can be constructed from.
   * Explicit, so that this unconstrained template cannot act as an implicit
   * conversion from arbitrary types.
   */
  template<typename T> explicit Skip(const T&t): std::string(t) {}
};
public:
/**
* Used to reclaim used memory blocks
* @return the recyclable memory block
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment