Commit 991b5fc5 authored by David COME's avatar David COME
Browse files

TaskWatchDog is now template, can detect when the drive is stuck and ask the...

TaskWatchDog is now a template. It can detect when the drive is stuck, ask the right report packer to report the current file as failed, close the session and kill the whole process.
parent d3a6c76b
......@@ -186,7 +186,11 @@ int castor::tape::tapeserver::daemon::DataTransferSession::executeRead(LogContex
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
TaskWatchDog watchdog(m_intialProcess,m_logger);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
TaskWatchDog<detail::Recall> watchdog(m_intialProcess,rrp,m_logger);
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz,lc);
TapeServerReporter tsr(m_intialProcess, m_driveConfig,
m_hostname, m_volInfo, lc);
......@@ -196,9 +200,7 @@ int castor::tape::tapeserver::daemon::DataTransferSession::executeRead(LogContex
TapeReadSingleThread trst(*drive, m_rmc, tsr, m_volInfo,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,m_capUtils,watchdog,lc);
RecallReportPacker rrp(m_clientProxy,
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
rrp,
lc);
......
......@@ -23,11 +23,10 @@
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include <memory>
#include <sstream>
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
#include "log.h"
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include <memory>
#include <sstream>
namespace castor {
namespace tape {
......
......@@ -94,6 +94,11 @@ void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int err
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//MigrationReportPacker::reportStuckOn
//------------------------------------------------------------------------------
// Migration-side implementation of the pure virtual
// ReportPackerInterface<detail::Migration>::reportStuckOn().
// NOTE(review): the body is empty, so a "stuck" notification is silently
// dropped on the migration path and `file` is unused. The recall packer
// queues a report instead -- TODO confirm whether this stub is intentional.
void MigrationReportPacker::reportStuckOn(FileStruct& file){
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _this){
......
......@@ -37,7 +37,7 @@ namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationReportPacker : private ReportPackerInterface<detail::Migration> {
class MigrationReportPacker : public ReportPackerInterface<detail::Migration> {
public:
/**
* @param tg The client who is asking for a migration of his files
......@@ -82,6 +82,9 @@ public:
*/
void reportEndOfSessionWithErrors(const std::string msg,int error_code);
void reportStuckOn(FileStruct& file);
void startThreads() { m_workerThread.start(); }
void waitThread() { m_workerThread.wait(); }
......
......@@ -27,6 +27,7 @@
#include "castor/tape/tapegateway/FileRecalledNotificationStruct.hpp"
#include "castor/log/Logger.hpp"
#include "log.h"
#include <signal.h>
namespace{
struct failedReportRecallResult : public castor::tape::Exception{
......@@ -88,23 +89,29 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//RecallReportPacker::reportStuckOn
//------------------------------------------------------------------------------
// Queues a ReportStuck for `file` on the report fifo. The producer mutex is
// held for the duration of the push, like the other report* entry points.
void RecallReportPacker::reportStuckOn(FileStruct& file){
  castor::tape::threading::MutexLocker producerGuard(&m_producterProtection);
  ReportStuck* const stuckReport = new ReportStuck(file);
  m_fifo.push(stuckReport);
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
// Builds a FileSuccessStruct describing the recalled file (fseq, file
// transaction id, id, nshost, fileid, checksum and checksum name) and hands
// the released pointer to the packer's m_listReports.
// NOTE(review): this hunk appears to show both sides of a rename; the
// statements using `_this` / `m_migratedFile` and those using `parent` /
// `m_recalledFile` are duplicates of each other -- confirm against the
// committed file before editing.
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& _this){
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
std::auto_ptr<FileSuccessStruct> successRecall(new FileSuccessStruct);
successRecall->setFseq(m_migratedFile.fseq());
successRecall->setFileTransactionId(m_migratedFile.fileTransactionId());
successRecall->setId(m_migratedFile.id());
successRecall->setNshost(m_migratedFile.nshost());
successRecall->setFileid(m_migratedFile.fileid());
successRecall->setFseq(m_recalledFile.fseq());
successRecall->setFileTransactionId(m_recalledFile.fileTransactionId());
successRecall->setId(m_recalledFile.id());
successRecall->setNshost(m_recalledFile.nshost());
successRecall->setFileid(m_recalledFile.fileid());
successRecall->setChecksum(m_checksum);
//WARNING : ad hoc name of checksum algorithm
successRecall->setChecksumName("adler32");
_this.m_listReports->addSuccessfulRecalls(successRecall.release());
parent.m_listReports->addSuccessfulRecalls(successRecall.release());
}
//------------------------------------------------------------------------------
//flush
......@@ -134,51 +141,62 @@ void RecallReportPacker::flush(){
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
// Reports the end of the session to the client.
// If no error was flagged on the packer (m_errorHappened is false) a normal
// end of session is reported and logged at LOG_INFO. Otherwise the session is
// reported as ended with error code SEINTERNAL and logged at LOG_ERR.
// NOTE(review): this hunk appears to show both sides of the `_this` ->
// `parent` rename; the `_this` and `parent` statements are duplicates of each
// other -- confirm against the committed file before editing.
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& _this){
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
client::ClientInterface::RequestReport chrono;
if(!_this.m_errorHappened){
_this.m_client.reportEndOfSession(chrono);
_this.logRequestReport(chrono,"Nominal RecallReportPacker::EndofSession has been reported",LOG_INFO);
if(!parent.m_errorHappened){
parent.m_client.reportEndOfSession(chrono);
parent.logRequestReport(chrono,"Nominal RecallReportPacker::EndofSession has been reported",LOG_INFO);
}
else {
const std::string& msg ="RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
_this.m_lc.log(LOG_ERR,msg);
_this.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
_this.logRequestReport(chrono,"reporting EndOfSessionWithError done",LOG_ERR);
parent.m_lc.log(LOG_ERR,msg);
parent.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
parent.logRequestReport(chrono,"reporting EndOfSessionWithError done",LOG_ERR);
}
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& _this){
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
client::ClientInterface::RequestReport chrono;
if(_this.m_errorHappened) {
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
LogContext::ScopedParam(_this.m_lc,Param("errorCode",m_error_code));
_this.m_lc.log(LOG_ERR,m_message);
if(parent.m_errorHappened) {
parent.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
parent.m_lc.log(LOG_ERR,m_message);
}
else{
const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported but NO error was detected during the process";
_this.m_lc.log(LOG_ERR,msg);
_this.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
parent.m_lc.log(LOG_ERR,msg);
parent.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
}
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
// Builds a FileErrorStruct for the failed file (error code, error message,
// fseq, file transaction id, id, nshost), hands the released pointer to the
// packer's m_listReports and sets the packer's m_errorHappened flag.
// NOTE(review): unlike ReportSuccessful::execute, no fileid is copied into
// the report -- confirm whether FileErrorStruct should carry it.
// NOTE(review): this hunk appears to show both sides of a rename; the
// statements using `_this` / `m_migratedFile` and those using `parent` /
// `m_recalledFile` are duplicates of each other -- confirm against the
// committed file before editing.
void RecallReportPacker::ReportError::execute(RecallReportPacker& _this){
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
std::auto_ptr<FileErrorStruct> failed(new FileErrorStruct);
//failedMigration->setFileMigrationReportList(_this.m_listReports.get());
//failedMigration->setFileMigrationReportList(parent.m_listReports.get());
failed->setErrorCode(m_error_code);
failed->setErrorMessage(m_error_msg);
failed->setFseq(m_migratedFile.fseq());
failed->setFileTransactionId(m_migratedFile.fileTransactionId());
failed->setId(m_migratedFile.id());
failed->setNshost(m_migratedFile.nshost());
failed->setFseq(m_recalledFile.fseq());
failed->setFileTransactionId(m_recalledFile.fileTransactionId());
failed->setId(m_recalledFile.id());
failed->setNshost(m_recalledFile.nshost());
_this.m_listReports->addFailedRecalls(failed.release());
_this.m_errorHappened=true;
parent.m_listReports->addFailedRecalls(failed.release());
parent.m_errorHappened=true;
}
//------------------------------------------------------------------------------
//ReportStuck::execute
//------------------------------------------------------------------------------
// Handles a "stuck on file" report, in this order:
//   1. records the file as failed (ReportError::execute also sets the
//      packer's m_errorHappened flag),
//   2. flushes the packer,
//   3. reports the end of session with errors,
//   4. sends SIGABRT to the current process.
// Both reports use error code SEINTERNAL and the same message.
void RecallReportPacker::ReportStuck::execute(RecallReportPacker& parent){
  const std::string stuckMessage="Stuck while reading that file";
  const int stuckErrorCode=SEINTERNAL;

  {
    ReportError failedFile(m_recalledFile,stuckMessage,stuckErrorCode);
    failedFile.execute(parent);
  }

  parent.flush();

  {
    ReportEndofSessionWithErrors sessionFailure(stuckMessage,stuckErrorCode);
    sessionFailure.execute(parent);
  }

  kill(getpid(),SIGABRT);
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
......
......@@ -35,7 +35,7 @@ namespace tape {
namespace tapeserver {
namespace daemon {
class RecallReportPacker : protected ReportPackerInterface<detail::Recall> {
class RecallReportPacker : public ReportPackerInterface<detail::Recall> {
public:
/**
* Constructor
......@@ -76,7 +76,10 @@ public:
* @param msg The error message
* @param error_code The error code given by the drive
*/
virtual void reportEndOfSessionWithErrors(const std::string msg,int error_code);
virtual void reportEndOfSessionWithErrors(const std::string msg,int error_code);
void reportStuckOn(FileStruct& file);
/**
* Start the inner thread
*/
......@@ -98,23 +101,32 @@ private:
bool goingToEnd() const {return m_endNear;};
};
// Report carrying a copy of a file's FileStruct and its checksum; execute()
// is declared here and defined out of line.
// NOTE(review): this hunk appears to show both sides of the
// m_migratedFile -> m_recalledFile rename (member and initializer list each
// appear twice) -- confirm against the committed file before editing.
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
const FileStruct m_recalledFile;
unsigned long m_checksum;
public:
ReportSuccessful(const FileStruct& file,unsigned long checksum):
Report(false),m_migratedFile(file),m_checksum(checksum){}
Report(false),m_recalledFile(file),m_checksum(checksum){}
virtual void execute(RecallReportPacker& _this);
};
// Report carrying a copy of a file's FileStruct together with an error
// message and error code; execute() is declared here and defined out of line.
// NOTE(review): this hunk appears to show both sides of the
// m_migratedFile -> m_recalledFile rename (member and initializer list each
// appear twice) -- confirm against the committed file before editing.
class ReportError : public Report {
const FileStruct m_migratedFile;
const FileStruct m_recalledFile;
const std::string m_error_msg;
const int m_error_code;
public:
ReportError(const FileStruct& file,std::string msg,int error_code):
Report(false),m_migratedFile(file),m_error_msg(msg),m_error_code(error_code){}
Report(false),m_recalledFile(file),m_error_msg(msg),m_error_code(error_code){}
virtual void execute(RecallReportPacker& _this);
};
/**
 * Report queued by RecallReportPacker::reportStuckOn(). It keeps its own copy
 * of the FileStruct it was created with; the handling itself lives in the
 * out-of-line execute().
 */
class ReportStuck : public Report {
  const FileStruct m_recalledFile;
public:
  /**
   * @param file the file to report; copied into the report. The constructor
   * is explicit so that a FileStruct is never silently converted into a
   * ReportStuck.
   */
  explicit ReportStuck(const FileStruct& file):
    Report(false),m_recalledFile(file){}
  /**
   * @param parent the packer this report is executed against
   */
  virtual void execute(RecallReportPacker& parent);
};
class ReportEndofSession : public Report {
public:
ReportEndofSession():Report(true){}
......
......@@ -74,14 +74,14 @@ namespace detail{
* the type PlaceHolder is either detail::Recall or detail::Migration
*/
template <class PlaceHolder> class ReportPackerInterface{
protected :
public :
//some inner typedef to have shorter (and unified) types inside the class
typedef typename detail::HelperTrait<PlaceHolder>::FileReportList FileReportList;
typedef typename detail::HelperTrait<PlaceHolder>::FileStruct FileStruct;
typedef typename detail::HelperTrait<PlaceHolder>::FileSuccessStruct FileSuccessStruct;
typedef typename detail::HelperTrait<PlaceHolder>::FileErrorStruct FileErrorStruct;
protected:
ReportPackerInterface(client::ClientInterface & tg,log::LogContext lc):
m_client(tg),m_lc(lc),m_listReports(new FileReportList)
{}
......@@ -140,6 +140,8 @@ template <class PlaceHolder> class ReportPackerInterface{
* m_listReports is holding all the report waiting to be processed
*/
std::auto_ptr<FileReportList> m_listReports;
public:
virtual void reportStuckOn(FileStruct& file) =0;
};
}}}}
......
......@@ -59,8 +59,11 @@ public:
// Constructor: forwards drive, rmc, gsr, volInfo, capUtils and lc to
// TapeSingleThreadInterface<TapeReadTask>, and stores maxFilesRequest and a
// reference to the watchdog.
// NOTE(review): this hunk appears to show both the old parameter list
// (TaskWatchDog&) and the new one (TaskWatchDog<detail::Recall>&) -- confirm
// against the committed file before editing.
TapeReadSingleThread(castor::tape::drives::DriveInterface & drive,
castor::legacymsg::RmcProxy & rmc,
TapeServerReporter & gsr,
const client::ClientInterface::VolumeInfo& volInfo, uint64_t maxFilesRequest,
CapabilityUtils &capUtils,TaskWatchDog& watchdog,castor::log::LogContext & lc):
const client::ClientInterface::VolumeInfo& volInfo,
uint64_t maxFilesRequest,
CapabilityUtils &capUtils,
TaskWatchDog<detail::Recall>& watchdog,
castor::log::LogContext & lc):
TapeSingleThreadInterface<TapeReadTask>(drive, rmc, gsr,volInfo,capUtils,lc),
m_maxFilesRequest(maxFilesRequest),m_watchdog(watchdog) {
}
......@@ -231,7 +234,7 @@ private:
///a pointer to task injector, thus we can ask him for more tasks
castor::tape::tapeserver::daemon::RecallTaskInjector * m_taskInjector;
TaskWatchDog& m_watchdog;
TaskWatchDog<detail::Recall>& m_watchdog;
};
}
}
......
......@@ -60,7 +60,7 @@ public:
* Acquire a free memory block from the memory manager , fill it, push it
*/
void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc,TaskWatchDog& watchdog) {
castor::log::LogContext & lc,TaskWatchDog<detail::Recall>& watchdog) {
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
......@@ -84,6 +84,7 @@ public:
MemBlock* mb=NULL;
try {
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
watchdog.notifyBeginNewJob(m_fileToRecall);
while (stillReading) {
// Get a memory block and add information to its metadata
mb=m_mm.getFreeBlock();
......@@ -142,6 +143,7 @@ public:
reportErrorToDiskTask(mb);
}
} //end of catch
watchdog.fileFinished();
}
/**
* Get a valid block and ask to to do the report to the disk write task
......
......@@ -40,7 +40,7 @@ namespace messages{
namespace tape {
namespace tapeserver {
namespace daemon {
class TaskWatchDog;
class TapeServerReporter : private castor::tape::threading::Thread {
public:
......
......@@ -37,37 +37,55 @@ namespace tape {
namespace tapeserver {
namespace daemon {
void TaskWatchDog::run(){
timeval currentTime;
while(!m_stopFlag) {
castor::utils::getTimeOfDay(&currentTime);
timeval diffTime = castor::utils::timevalAbsDiff(currentTime,previousTime);
double diffTimed = castor::utils::timevalToDouble(diffTime);
if(diffTimed > periodToReport){
m_lc.log(LOG_DEBUG,"going to report");
previousTime=currentTime;
m_initialProcess.notifyHeartbeat(nbOfMemblocksMoved.getAndReset());
}else{
usleep(100000);
}
}
}
TaskWatchDog::TaskWatchDog(messages::TapeserverProxy& initialProcess,log::LogContext lc):
nbOfMemblocksMoved(0),periodToReport(2),
m_initialProcess(initialProcess),m_lc(lc){
m_lc.pushOrReplace(log::Param("thread","Watchdog"));
castor::utils::getTimeOfDay(&previousTime);
}
void TaskWatchDog::notify(){
nbOfMemblocksMoved++;
}
void TaskWatchDog::startThread(){
start();
}
void TaskWatchDog::stopThread(){
m_stopFlag.set();
wait();
}
//void TaskWatchDog::run(){
// timeval currentTime;
// while(!m_stopFlag) {
// castor::utils::getTimeOfDay(&currentTime);
//
// timeval diffTimeStuck = castor::utils::timevalAbsDiff(currentTime,m_previousNotifiedTime);
// double diffTimeStuckd = castor::utils::timevalToDouble(diffTimeStuck);
// if(diffTimeStuckd>m_stuckPeriod){
//
// m_reportPacker.reportStuckOn(file);
// }
//
// timeval diffTimeReport = castor::utils::timevalAbsDiff(currentTime,m_previousReportTime);
// double diffTimeReportd = castor::utils::timevalToDouble(diffTimeReport);
// if(diffTimeReportd > m_periodToReport){
// m_lc.log(LOG_DEBUG,"going to report");
// m_previousReportTime=currentTime;
// m_initialProcess.notifyHeartbeat(m_nbOfMemblocksMoved.getAndReset());
// }
// else{
// usleep(100000);
// }
// }
//}
//
//TaskWatchDog::TaskWatchDog(messages::TapeserverProxy& initialProcess,
// ReportPackerInterface<placeHolder>& reportPacker ,log::LogContext lc):
//m_nbOfMemblocksMoved(0),m_periodToReport(2),m_stuckPeriod(60*10),
// m_initialProcess(initialProcess),m_reportPacker(reportPacker),m_lc(lc){
// m_lc.pushOrReplace(log::Param("thread","Watchdog"));
// castor::utils::getTimeOfDay(&m_previousReportTime);
//}
//void TaskWatchDog::notify(){
// timeval tmpTime;
// castor::utils::getTimeOfDay(&tmpTime);
// m_previousNotifiedTime=tmpTime;
// m_nbOfMemblocksMoved++;
//}
//void TaskWatchDog::startThread(){
// start();
//}
//void TaskWatchDog::stopThread(){
// m_stopFlag.set();
// wait();
//}
// void TaskWatchDog::notifyBeginNewJob(const FileStruct& file){
// m_file=file;
// }
// void TaskWatchDog::fileFinished(){
// m_file=file;
// }
}}}}
......@@ -25,35 +25,91 @@
#pragma once
#include "zmq/ZmqWrapper.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/messages/TapeserverProxy.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
#include "castor/utils/utils.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class TaskWatchDog : private castor::tape::threading::Thread{
protected:
castor::tape::threading::AtomicCounter<uint64_t> nbOfMemblocksMoved;
timeval previousTime;
const double periodToReport; //in second
castor::tape::threading::AtomicFlag m_stopFlag;
messages::TapeserverProxy& m_initialProcess;
log::LogContext m_lc;
void report(zmq::Socket& m_socket);
virtual void run();
template <class placeHolder> class TaskWatchDog : private castor::tape::threading::Thread{
typedef typename ReportPackerInterface<placeHolder>::FileStruct FileStruct;
castor::tape::threading::AtomicCounter<uint64_t> m_nbOfMemblocksMoved;
timeval m_previousReportTime;
castor::tape::threading::AtomicVariable<timeval> m_previousNotifiedTime;
const double m_periodToReport; //in second
const double m_stuckPeriod; //in second
castor::tape::threading::AtomicFlag m_stopFlag;
messages::TapeserverProxy& m_initialProcess;
ReportPackerInterface<placeHolder>& m_reportPacker;
FileStruct m_file;
bool m_fileBeingMoved;
log::LogContext m_lc;
/**
 * Thread body. Loops until m_stopFlag is set and, on every pass:
 *  - if a file is flagged as being moved and more than m_stuckPeriod seconds
 *    have elapsed since the last recorded notification, asks the report
 *    packer to report that file as stuck and leaves the loop;
 *  - otherwise, if more than m_periodToReport seconds have elapsed since the
 *    last heartbeat, sends a heartbeat carrying the number of memory blocks
 *    moved since the previous one (the counter is reset);
 *  - otherwise sleeps for 100000 microseconds.
 * NOTE(review): m_file and m_fileBeingMoved are plain members read here and
 * written by notifyBeginNewJob()/fileFinished() -- confirm those are never
 * called concurrently with this loop, or protect them.
 */
void run(){
  timeval now;
  while(!m_stopFlag) {
    castor::utils::getTimeOfDay(&now);

    const double idleSeconds = castor::utils::timevalToDouble(
      castor::utils::timevalAbsDiff(now,m_previousNotifiedTime));
    if(idleSeconds>m_stuckPeriod && m_fileBeingMoved){
      m_reportPacker.reportStuckOn(m_file);
      break;
    }

    const double secondsSinceHeartbeat = castor::utils::timevalToDouble(
      castor::utils::timevalAbsDiff(now,m_previousReportTime));
    if(!(secondsSinceHeartbeat > m_periodToReport)){
      usleep(100000);
      continue;
    }

    m_lc.log(LOG_DEBUG,"going to report");
    m_previousReportTime=now;
    m_initialProcess.notifyHeartbeat(m_nbOfMemblocksMoved.getAndReset());
  }
}
/**
 * Stores the current time of day in m_previousNotifiedTime, the reference
 * point run() uses for stuck detection.
 */
void updateStuckTime(){
  timeval now;
  castor::utils::getTimeOfDay(&now);
  m_previousNotifiedTime=now;
}
public:
TaskWatchDog(messages::TapeserverProxy& initialProcess,log::LogContext lc);
void notify();
void startThread();
void stopThread();
};
/**
 * Constructor.
 * @param initialProcess proxy that receives the heartbeats
 * @param reportPacker report packer asked to report a stuck file
 * @param lc log context, copied; its "thread" parameter is set to "Watchdog"
 *
 * The heartbeat period is 2 seconds and the stuck period 60*10 seconds.
 * No file is flagged as being moved, and both the heartbeat reference time
 * and the stuck-detection reference time start at the current time.
 */
TaskWatchDog(messages::TapeserverProxy& initialProcess,
             ReportPackerInterface<placeHolder>& reportPacker ,log::LogContext lc):
  m_nbOfMemblocksMoved(0),
  m_periodToReport(2),
  m_stuckPeriod(60*10),
  m_initialProcess(initialProcess),
  m_reportPacker(reportPacker),
  m_fileBeingMoved(false),
  m_lc(lc)
{
  m_lc.pushOrReplace(log::Param("thread","Watchdog"));
  castor::utils::getTimeOfDay(&m_previousReportTime);
  this->updateStuckTime();
}
void notify(){
updateStuckTime();
m_nbOfMemblocksMoved++;
}
void startThread(){
start();
}
void stopThread(){
m_stopFlag.set();
wait();
}
void notifyBeginNewJob(const FileStruct& file){
m_file=file;
m_fileBeingMoved=true;
}
void fileFinished(){
m_fileBeingMoved=false;
}
};
}}}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment