Commit 08e744a3 authored by Eric Cano
Browse files

CASTOR-4832: tapeserverd should report error counts in the end of session statistics

Propagated references to the watchdog to disk and tape threads and tasks.
Added a map for storing the count of all errors that occurred during the session.
Added propagation of the counts to the initial process.
Added error reporting in disk and tape threads and tasks by using a string marker
that records which step went wrong when a high-level exception is caught. Exceptions then
bump up the count in the watchdog (synchronously), and the watchdog sends the
new count to the initial thread later (in the watchdog's thread).
Also added some logs that were missing when an exception is thrown in a disk write task.

Added interface to reference the recall watchdog to the recall disk thread pool.
Next: Do the same for migration. Actually store the reference. Add storage of the new error map in the watchdog. Add error reporting in tape and disk threads.

WIP: Added missing file ID for disk write thread. Added missing error log when an exception is thrown in a disk write task.

WIP: switching to CASTOR-4839 tapeserverd: task injector should decide on closing the session earlier

WIP. Next: log unmount errors (in the RAII)

Finished error counting in data threads.
parent d2f56f72
......@@ -84,7 +84,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
// 1) Prepare the logging environment
log::LogContext lc(m_log);
// Create a sticky thread name, which will be overridden by the other threads
lc.pushOrReplace(log::Param("thread", "mainThread"));
lc.pushOrReplace(log::Param("thread", "MainThread"));
log::LogContext::ScopedParam sp01(lc, log::Param("clientHost", m_request.clientHost));
log::LogContext::ScopedParam sp02(lc, log::Param("clientPort", m_request.clientPort));
log::LogContext::ScopedParam sp03(lc, log::Param("mountTransactionId", m_request.volReqId));
......@@ -212,6 +212,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
DiskWriteThreadPool dwtp(m_castorConf.nbDiskThreads,
rrp,
rwd,
lc,
m_castorConf.remoteFileProtocol,
m_castorConf.xrootPrivateKey);
......@@ -308,6 +309,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
DiskReadThreadPool drtp(m_castorConf.nbDiskThreads,
m_castorConf.bulkRequestMigrationMaxFiles,
m_castorConf.bulkRequestMigrationMaxBytes,
mwd,
lc,
m_castorConf.remoteFileProtocol,
m_castorConf.xrootPrivateKey);
......
......@@ -43,7 +43,8 @@ m_nextTask(destination),m_migratedFile(file),
//------------------------------------------------------------------------------
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & fileFactory) {
void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & fileFactory,
MigrationWatchDog & watchdog) {
using log::LogContext;
using log::Param;
......@@ -52,22 +53,28 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
MemBlock* mb=NULL;
// This variable, declared outside of the try-catch block, allows us to record
// the stage of the process we're in, and to count the error if one occurs.
// We will not record errors for an empty string. This allows us to
// avoid counting errors that happened upstream.
std::string currentErrorToCount = "";
try{
//we first check here so as not to even try to open the disk if a previous task has failed,
//because the disk could be the very reason why the previous one failed,
//so don't make the same mistake twice!
checkMigrationFailing();
currentErrorToCount = "diskOpenForReadErrorCount";
std::auto_ptr<tape::diskFile::ReadFile> sourceFile(
fileFactory.createReadFile(m_migratedFile->path()));
log::ScopedParamContainer URLcontext(lc);
URLcontext.add("path", m_migratedFile->path())
.add("actualURL", sourceFile->URL());
currentErrorToCount = "diskFileToReadSizeMismatchCount";
if(migratingFileSize != sourceFile->size()){
throw castor::exception::Exception("Mismtach between size given by the client "
"and the real one");
}
currentErrorToCount = "";
m_stats.openingTime+=localTime.secs(castor::utils::Timer::resetCounter);
......@@ -84,7 +91,8 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
currentErrorToCount = "diskReadErrorCount";
migratingFileSize -= mb->m_payload.read(*sourceFile);
m_stats.readWriteTime+=localTime.secs(castor::utils::Timer::resetCounter);
......@@ -92,11 +100,13 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
//we either read at full capacity (i.e. size=capacity) or, if they differ,
//it should be the end => migratingFileSize should be 0. If it is not, it is an error
currentErrorToCount = "diskUnexpectedSizeWhenReadingCount";
if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize>0){
std::string erroMsg = "Error while reading a file. Did not read at full capacity but the file is not fully read";
mb->markAsFailed(erroMsg,SEINTERNAL);
throw castor::exception::Exception(erroMsg);
}
currentErrorToCount = "";
m_stats.checkingErrorTime += localTime.secs(castor::utils::Timer::resetCounter);
// We are done with the block, push it to the write task
......@@ -118,6 +128,10 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
circulateAllBlocks(blockId,mb);
}
catch(const castor::exception::Exception& e){
// Send the error for counting to the watchdog
if (currentErrorToCount.size()) {
watchdog.addToErrorCount(currentErrorToCount);
}
// We have to pump the blocks anyway, mark them failed and then pass them back
// to TapeWriteTask
// Otherwise they would be stuck into TapeWriteTask free block fifo
......
......@@ -27,6 +27,7 @@
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/server/AtomicFlag.hpp"
#include "castor/log/LogContext.hpp"
......@@ -48,7 +49,8 @@ public:
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock,
castor::server::AtomicFlag& errorFlag);
void execute(log::LogContext& lc, diskFile::DiskFileFactory & fileFactory);
void execute(log::LogContext& lc, diskFile::DiskFileFactory & fileFactory,
MigrationWatchDog & watchdog);
/**
* Return the stats of the tasks. Should be call after execute
* (otherwise, it is pointless)
......
......@@ -36,10 +36,12 @@ namespace daemon {
//------------------------------------------------------------------------------
// DiskReadThreadPool constructor
//------------------------------------------------------------------------------
DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog,
castor::log::LogContext lc, const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath) :
m_diskFileFactory(remoteFileProtocol, xrootPrivateKeyPath),
m_watchdog(migrationWatchDog),
m_lc(lc),m_maxFilesReq(maxFilesReq),
m_maxBytesReq(maxBytesReq), m_nbActiveThread(0) {
for(int i=0; i<nbThread; i++) {
......@@ -151,10 +153,10 @@ void DiskReadThreadPool::logWithStat(int level, const std::string& message){
// DiskReadWorkerThread::run
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.pushOrReplace(log::Param("threadID",m_threadID));
m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
castor::log::ScopedParamContainer logParams(m_lc);
logParams.add("thread", "DiskRead")
.add("threadID", m_threadID);
m_lc.log(LOG_DEBUG, "Starting DiskReadWorkerThread");
std::auto_ptr<DiskReadTask> task;
castor::utils::Timer localTime;
......@@ -164,7 +166,7 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() {
task.reset( m_parent.popAndRequestMore(m_lc));
m_threadStat.waitInstructionsTime += localTime.secs(castor::utils::Timer::resetCounter);
if (NULL!=task.get()) {
task->execute(m_lc, m_parent.m_diskFileFactory);
task->execute(m_lc, m_parent.m_diskFileFactory,m_parent.m_watchdog);
m_threadStat += task->getTaskStats();
}
else {
......
......@@ -25,6 +25,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/server/BlockingQueue.hpp"
#include "castor/server/Threading.hpp"
#include "castor/server/AtomicCounter.hpp"
......@@ -51,6 +52,7 @@ public:
* @param lc log context for logging purpose
*/
DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog,
castor::log::LogContext lc, const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath);
......@@ -186,6 +188,11 @@ private:
* deleted by the threads after execution) */
castor::server::BlockingQueue<DiskReadTask *> m_tasks;
/**
* Reference to the watchdog, for error reporting.
*/
castor::tape::tapeserver::daemon::MigrationWatchDog & m_watchdog;
/** The log context. This is copied on construction to prevent interferences
* between threads.
*/
......
......@@ -44,19 +44,27 @@ m_recallingFile(file),m_memManager(mm){
// DiskWriteTask::execute
//------------------------------------------------------------------------------
bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
diskFile::DiskFileFactory & fileFactory) {
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog) {
using log::LogContext;
using log::Param;
castor::utils::Timer localTime;
castor::utils::Timer totalTime(localTime);
castor::utils::Timer transferTime(localTime);
log::ScopedParamContainer URLcontext(lc);
URLcontext.add("NSFILEID",m_recallingFile->fileid())
.add("path", m_recallingFile->path())
.add("fileTransactionId",m_recallingFile->fileTransactionId())
.add("fSeq",m_recallingFile->fseq());
// This variable, declared outside of the try-catch block, allows us to record
// the stage of the process we're in, and to count the error if one occurs.
// We will not record errors for an empty string. This allows us to
// avoid counting errors that happened upstream.
std::string currentErrorToCount = "";
try{
currentErrorToCount = "";
// Placeholder for the disk file. We will open it only
// after getting a first correct memory block.
std::auto_ptr<tape::diskFile::WriteFile> writeFile;
log::ScopedParamContainer URLcontext(lc);
URLcontext.add("NSFILEID",m_recallingFile->fileid())
.add("path", m_recallingFile->path());
int blockId = 0;
unsigned long checksum = Payload::zeroAdler32();
......@@ -77,8 +85,9 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
// If we got that far on the first pass, it's now good enough to open
// the disk file for writing...
if (!writeFile.get()) {
lc.log(LOG_INFO, "About to open disk file for writing");
lc.log(LOG_DEBUG, "About to open disk file for writing");
// Synchronise the counter with the open time counter.
currentErrorToCount = "diskOpenForWriteErrorCount";
transferTime = localTime;
writeFile.reset(fileFactory.createWriteFile(m_recallingFile->path()));
URLcontext.add("actualURL", writeFile->URL());
......@@ -87,12 +96,14 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
}
// Write the data.
currentErrorToCount = "diskWriteErrorCount";
m_stats.dataVolume+=mb->m_payload.size();
mb->m_payload.write(*writeFile);
m_stats.readWriteTime+=localTime.secs(castor::utils::Timer::resetCounter);
checksum = mb->m_payload.adler32(checksum);
m_stats.checksumingTime+=localTime.secs(castor::utils::Timer::resetCounter);
currentErrorToCount = "";
blockId++;
} //end if block non NULL
......@@ -100,10 +111,12 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
//close has to be explicit, because it may throw.
//A close is done in WriteFile's destructor, but it may lead to some
//silent data loss
currentErrorToCount = "diskCloseAfterWriteErrorCount";
writeFile->close();
m_stats.closingTime +=localTime.secs(castor::utils::Timer::resetCounter);
m_stats.filesCount++;
break;
currentErrorToCount = "";
}
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile,checksum,m_stats.dataVolume);
......@@ -128,8 +141,19 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
// We need to empty it
releaseAllBlock();
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
// Propagate the error to the watchdog
if (currentErrorToCount.size()) {
watchdog.addToErrorCount(currentErrorToCount);
}
m_stats.waitReportingTime+=localTime.secs(castor::utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
params.add("errorMessage", e.getMessageValue())
.add("errorCode", e.code());
logWithStat(LOG_ERR, "File writing to disk failed.", lc);
lc.logBacktrace(LOG_ERR, e.backtrace());
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
//got an exception, return false
return false;
......
......@@ -30,6 +30,7 @@
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include <memory>
namespace {
......@@ -63,7 +64,7 @@ public:
* @return true if the file has been successfully written false otherwise.
*/
virtual bool execute(RecallReportPacker& reporter,log::LogContext& lc,
diskFile::DiskFileFactory & fileFactory) ;
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog) ;
/**
* Allows client code to return a reusable memory block. Should not been called
......
......@@ -76,7 +76,7 @@ namespace unitTests{
t.pushDataBlock(mb);
t.pushDataBlock(NULL);
t.execute(report,lc,fileFactory);
t.execute(report,lc,fileFactory,*((RecallWatchDog*)NULL));
}
}
......@@ -38,11 +38,13 @@ namespace daemon {
// constructor
//------------------------------------------------------------------------------
DiskWriteThreadPool::DiskWriteThreadPool(int nbThread,
RecallReportPacker& report,castor::log::LogContext lc,
RecallReportPacker& report,
RecallWatchDog& recallWatchDog,
castor::log::LogContext lc,
const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath):
m_diskFileFactory(remoteFileProtocol,xrootPrivateKeyPath),
m_reporter(report),m_lc(lc)
m_reporter(report),m_watchdog(recallWatchDog),m_lc(lc)
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
for(int i=0; i<nbThread; i++) {
......@@ -145,10 +147,9 @@ void DiskWriteThreadPool::logWithStat(int level, const std::string& message){
// DiskWriteWorkerThread::run
//------------------------------------------------------------------------------
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
typedef castor::log::LogContext::ScopedParam ScopedParam;
using castor::log::Param;
m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
m_lc.pushOrReplace(log::Param("threadID", m_threadID));
castor::log::ScopedParamContainer logParams(m_lc);
logParams.add("thread", "DiskWrite")
.add("threadID", m_threadID);
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
std::auto_ptr<DiskWriteTask> task;
......@@ -160,9 +161,10 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_threadStat.waitInstructionsTime+=localTime.secs(castor::utils::Timer::resetCounter);
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc,
m_parentThreadPool.m_diskFileFactory)) {
m_parentThreadPool.m_diskFileFactory, m_parentThreadPool.m_watchdog)) {
++m_parentThreadPool.m_failedWriteCount;
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
castor::log::ScopedParamContainer logParams(m_lc);
logParams.add("errorCount", m_parentThreadPool.m_failedWriteCount);
m_lc.log(LOG_ERR, "Task failed: counting another error for this session");
}
m_threadStat+=task->getTaskStats();
......@@ -184,7 +186,8 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
}
else{
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("End of recall session with error(s)",SEINTERNAL);
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
castor::log::ScopedParamContainer logParams(m_lc);
logParams.add("errorCount", m_parentThreadPool.m_failedWriteCount);
m_parentThreadPool.logWithStat(LOG_INFO, "As last exiting DiskWriteWorkerThread, reported an end of session with errors");
}
}
......
......@@ -32,6 +32,7 @@
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/utils/Timer.hpp"
#include <vector>
#define __STDC_FORMAT_MACROS
......@@ -64,6 +65,7 @@ public:
*/
DiskWriteThreadPool(int nbThread,
RecallReportPacker& reportPacker,
RecallWatchDog& recallWatchDog,
castor::log::LogContext lc,
const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyPath);
......@@ -193,6 +195,10 @@ private:
* individual files and the end of session (for the last thread) */
RecallReportPacker& m_reporter;
/** Reference to the session watchdog, allowing reporting of errors to it.
*/
RecallWatchDog& m_watchdog;
/** logging context that will be copied by each thread for individual context */
castor::log::LogContext m_lc;
};
......
......@@ -63,7 +63,7 @@ namespace unitTests{
RecallMemoryManager mm(10,100,lc);
DiskWriteThreadPool dwtp(2,report,lc,"RFIO","/dev/null");
DiskWriteThreadPool dwtp(2,report,*((RecallWatchDog*)NULL),lc,"RFIO","/dev/null");
dwtp.startThreads();
castor::tape::tapegateway::FileToRecallStruct file;
......
......@@ -136,6 +136,7 @@ void RecallTaskInjector::injectBulkRecalls(const std::vector<castor::tape::tapeg
m_diskWriter.push(dwt);
m_tapeReader.push(trt);
m_lc.log(LOG_INFO, "Created tasks for recalling a file");
}
LogContext::ScopedParam sp03(m_lc, Param("nbFile", jobs.size()));
m_lc.log(LOG_INFO, "Finished processing batch of recall tasks from client");
......@@ -199,7 +200,7 @@ bool RecallTaskInjector::synchronousInjection()
void RecallTaskInjector::WorkerThread::run()
{
using castor::log::LogContext;
m_parent.m_lc.pushOrReplace(Param("thread", "recallTaskInjector"));
m_parent.m_lc.pushOrReplace(Param("thread", "RecallTaskInjector"));
m_parent.m_lc.log(LOG_DEBUG, "Starting RecallTaskInjector thread");
try{
......
......@@ -53,7 +53,8 @@ class FakeDiskWriteThreadPool: public DiskWriteThreadPool
public:
using DiskWriteThreadPool::m_tasks;
FakeDiskWriteThreadPool(castor::log::LogContext & lc):
DiskWriteThreadPool(1,*((RecallReportPacker*)NULL), lc, "RFIO","/dev/null"){}
DiskWriteThreadPool(1,*((RecallReportPacker*)NULL),
*((RecallWatchDog*)NULL),lc, "RFIO","/dev/null"){}
virtual ~FakeDiskWriteThreadPool() {};
};
......
......@@ -50,6 +50,11 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
//then we log/notify
m_this.m_logContext.log(LOG_DEBUG, "Starting session cleanup. Signaled end of session to task injector.");
m_this.m_stats.waitReportingTime += m_timer.secs(castor::utils::Timer::resetCounter);
// This variable, declared outside of the try-catch block, allows us to record
// the stage of the process we're in, and to count the error if one occurs.
// We will not record errors for an empty string. This allows us to
// avoid counting errors that happened upstream.
std::string currentErrorToCount = "tapeUnloadErrorCount";
try {
// Do the final cleanup
// in the special case of a "manual" mode tape, we should skip the unload too.
......@@ -63,6 +68,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
// And return the tape to the library
// In case of manual mode, this will be filtered by the rmc daemon
// (which will do nothing)
currentErrorToCount = "tapeDismountErrorCount";
m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.librarySlot.str());
m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.librarySlot.getLibraryType() ?
......@@ -76,10 +82,20 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code());
m_this.m_logContext.log(LOG_ERR, "Exception in TapeReadSingleThread-TapeCleaning when unmounting the tape");
try {
if (currentErrorToCount.size()) {
m_this.m_watchdog.addToErrorCount(currentErrorToCount);
}
} catch (...) {}
} catch (...) {
// Notify something failed during the cleaning
m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeReadSingleThread-TapeCleaning when unmounting the tape");
try {
if (currentErrorToCount.size()) {
m_this.m_watchdog.addToErrorCount(currentErrorToCount);
}
} catch (...) {}
}
//then we terminate the global status reporter
m_this.m_initialProcess.finish();
......@@ -129,8 +145,9 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::openReadSession() {
//TapeReadSingleThread::run()
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
m_logContext.pushOrReplace(log::Param("thread", "tapeRead"));
m_logContext.pushOrReplace(log::Param("thread", "TapeRead"));
castor::utils::Timer timer, totalTimer;
std::string currentErrorToCount = "failedToSetCapabilitiesCount";
try{
// Set capabilities allowing rawio (and hence arbitrary SCSI commands)
// through the st driver file descriptor.
......@@ -154,7 +171,9 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
// will also take care of the TapeServerReporter and of RecallTaskInjector
TapeCleaning tapeCleaner(*this, timer);
// Before anything, the tape should be mounted
currentErrorToCount = "tapeFailedToMountForWriteCount";
mountTapeReadOnly();
currentErrorToCount = "tapeFailedToLoadCount";
waitForDrive();
m_stats.mountTime += timer.secs(castor::utils::Timer::resetCounter);
{
......@@ -163,7 +182,10 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
m_logContext.log(LOG_INFO, "Tape mounted and drive ready");
}
// Then we have to initialise the tape read session
currentErrorToCount = "tapeFailedToCheckLabelBeforeReadingCount";
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs(openReadSession());
// From now on, the tasks will identify problems when executed.
currentErrorToCount = "";
m_stats.positionTime += timer.secs(castor::utils::Timer::resetCounter);
//and then report
{
......@@ -217,6 +239,10 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
m_stats.totalTime = totalTimer.secs();
logWithStat(LOG_INFO, "Tape thread complete",
params);
// Also transmit the error step to the watchdog
if (currentErrorToCount.size()) {
m_watchdog.addToErrorCount(currentErrorToCount);
}
// Flush the remaining tasks to cleanly exit.
while(1){
TapeReadTask* task=m_tasks.pop();
......
......@@ -88,9 +88,14 @@ public:
//(because one mem block can hold several tape blocks
int fileBlock = 0;
int tapeBlock = 0;
// This variable, declared outside of the try-catch block, allows us to record
// the stage of the process we're in, and to count the error if one occurs.
// We will not record errors for an empty string. This allows us to
// avoid counting errors that happened upstream.
std::string currentErrorToCount = "";
MemBlock* mb=NULL;
try {
currentErrorToCount = "tapeReadPositionErrorCount";
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
// At that point we already read the header.
localStats.headerVolume += TapeSessionStats::headerVolumePerFile;
......@@ -99,6 +104,7 @@ public:
localStats.positionTime += timer.secs(castor::utils::Timer::resetCounter);
watchdog.notifyBeginNewJob(*m_fileToRecall);
localStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
currentErrorToCount = "tapeReadDataError";
while (stillReading) {
// Get a memory block and add information to its metadata
mb=m_mm.getFreeBlock();
......@@ -157,6 +163,10 @@ public:
//we end up there because :
//-- openReadFile brought us here (cant position to the file)
//-- m_payload.append brought us here (error while reading the file)
// Record the error in the watchdog
if (currentErrorToCount.size()) {
watchdog.addToErrorCount(currentErrorToCount);
}
// This is an error case. Log and signal to the disk write task
{
castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
......
......@@ -123,6 +123,11 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
castor::log::ScopedParamContainer threadGlobalParams(m_logContext);
threadGlobalParams.add("thread", "TapeWrite");
castor::utils::Timer timer, totalTimer;
// This variable, declared outside of the try-catch block, allows us to record
// the stage of the process we're in, and to count the error if one occurs.
// We will not record errors for an empty string. This allows us to
// avoid counting errors that happened upstream.
std::string currentErrorToCount = "failedToSetCapabilitiesCount";
try
{
// Set capabilities allowing rawio (and hence arbitrary SCSI commands)
......@@ -148,13 +153,15 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
// The tape will be loaded
// it has to be unloaded, unmounted at all cost -> RAII
// will also take care of the TapeServerReporter
//
TapeCleaning cleaner(*this, timer);
currentErrorToCount = "tapeFailedToMountForWriteCount";
// Before anything, the tape should be mounted
// This call does the logging of the mount
mountTapeReadWrite();
currentErrorToCount = "tapeFailedToLoadCount";
waitForDrive();
currentErrorToCount = "tapeNotWriteableCount";
isTapeWritable();
m_stats.mountTime += timer.secs(castor::utils::Timer::resetCounter);
......@@ -163,7 +170,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
scoped.add("mountTime", m_stats.mountTime);
m_logContext.log(LOG_INFO, "Tape mounted and drive ready");
}
currentErrorToCount = "tapeFailedToPositionForWriteCount";
// Then we have to initialize the tape write session
std::auto_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
m_stats.positionTime += timer.secs(castor::utils::Timer::resetCounter);
......@@ -177,7 +184,8 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
uint64_t bytes=0;
uint64_t files=0;
m_stats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
// Tasks handle their error logging themselves.
currentErrorToCount = "";
std::auto_ptr<TapeWriteTask> task;
while(1) {
//get a task
......@@ -203,9 +211,11 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
bytes+=task->fileSize();
//if one flush counter is above a threshold, then we flush
if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
currentErrorToCount = "tapeFailedToFlushCount";
tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);