Commit d4c83fac authored by Eric Cano's avatar Eric Cano
Browse files

CASTOR-4765: The heartbeat messages of the data-transfer session of...

CASTOR-4765: The heartbeat messages of the data-transfer session of tapeserverd should include valid drive unit-names

Partial implementation of the ticket:
- The reporting to client and session exit part has been dismantled.
- The template based implementation of the watchdog has been changed to a more classing inheritance schema.
- The reporting to mother process of the moved blocks has been re-enabled.
parent faaac3b8
......@@ -187,7 +187,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
if (tapegateway::READ_TP == m_volInfo.clientType) {
rrp.disableBulk();
}
TaskWatchDog<detail::Recall> watchdog(2,60*10,m_intialProcess,rrp,lc);
RecallWatchDog watchdog(2,60*10,m_intialProcess,lc);
RecallMemoryManager mm(m_castorConf.rtcopydNbBufs, m_castorConf.rtcopydBufsz,lc);
TapeServerReporter tsr(m_intialProcess, m_driveConfig,
......
......@@ -95,11 +95,7 @@ void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int err
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,errorCode));
}
//------------------------------------------------------------------------------
//ReportSuccessful::reportStuckOn
//------------------------------------------------------------------------------
void MigrationReportPacker::reportStuckOn(FileStruct& file){
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
......
......@@ -85,13 +85,6 @@ public:
* @param error_code The error code given by the drive
*/
void reportEndOfSessionWithErrors(const std::string msg,int error_code);
/**
* Create into the MigrationReportPacker a report signaling we have stuck on
* that particlar file without moving for to long
* @param file
*/
void reportStuckOn(FileStruct& file);
void startThreads() { m_workerThread.start(); }
void waitThread() { m_workerThread.wait(); }
......
......@@ -89,13 +89,7 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//ReportSuccessful::reportStuckOn
//------------------------------------------------------------------------------
void RecallReportPacker::reportStuckOn(FileStruct& file){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportStuck(file));
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
......@@ -197,25 +191,6 @@ void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
parent.m_errorHappened=true;
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void RecallReportPacker::ReportStuck::execute(RecallReportPacker& parent){
//we are stuck while recalling a file
const int errCode=SEINTERNAL;
const std::string msg="Stuck while reading that file";
//add the file to the list of error
RecallReportPacker::ReportError(m_recalledFile,msg,errCode).execute(parent);
//send the reports
parent.flush();
//finish the session
RecallReportPacker::ReportEndofSessionWithErrors(msg,errCode).execute(parent);
//suicide
kill(getpid(),SIGABRT);
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
......
......@@ -76,8 +76,6 @@ public:
* @param error_code The error code given by the drive
*/
virtual void reportEndOfSessionWithErrors(const std::string msg,int error_code);
void reportStuckOn(FileStruct& file);
/**
* Start the inner thread
......@@ -119,14 +117,6 @@ private:
virtual void execute(RecallReportPacker& reportPacker);
};
class ReportStuck : public Report {
const FileStruct m_recalledFile;
public:
ReportStuck(const FileStruct& file):
Report(false),m_recalledFile(file){}
virtual void execute(RecallReportPacker& reportPacker);
};
class ReportEndofSession : public Report {
public:
......
......@@ -171,12 +171,6 @@ template <class PlaceHolder> class ReportPackerInterface{
enum detail::ReportBatching m_reportBatching;
public:
/**
* Put a message to into the queue to notify we have been stuck on
* the given file
*/
virtual void reportStuckOn(FileStruct& file) =0;
/**
* Turn off the packing of the reports by the report packer.
* This is used for recalls driven by read_tp.
......
......@@ -32,7 +32,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread(
const client::ClientInterface::VolumeInfo& volInfo,
uint64_t maxFilesRequest,
castor::server::ProcessCap& capUtils,
TaskWatchDog<detail::Recall>& watchdog,
RecallWatchDog& watchdog,
castor::log::LogContext& lc) :
TapeSingleThreadInterface<TapeReadTask>(drive, rmc, initialProcess, volInfo,
capUtils, lc),
......
......@@ -62,7 +62,7 @@ public:
const client::ClientInterface::VolumeInfo& volInfo,
uint64_t maxFilesRequest,
castor::server::ProcessCap &capUtils,
TaskWatchDog<detail::Recall>& watchdog,
RecallWatchDog& watchdog,
castor::log::LogContext & lc);
/**
......@@ -123,7 +123,7 @@ private:
///a pointer to task injector, thus we can ask him for more tasks
castor::tape::tapeserver::daemon::RecallTaskInjector * m_taskInjector;
TaskWatchDog<detail::Recall>& m_watchdog;
RecallWatchDog& m_watchdog;
}; // class TapeReadSingleThread
......
......@@ -61,7 +61,7 @@ public:
* Acquire a free memory block from the memory manager , fill it, push it
*/
void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc,TaskWatchDog<detail::Recall>& watchdog,
castor::log::LogContext & lc,RecallWatchDog& watchdog,
TapeSessionStats & stats, utils::Timer & timer) {
using castor::log::Param;
......
......@@ -30,6 +30,7 @@
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "castor/server/BlockingQueue.hpp"
#include "castor/server/Threading.hpp"
......@@ -42,10 +43,6 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
// forward declaration
class TapeServerReporter;
class TapeWriteSingleThread : public TapeSingleThreadInterface<TapeWriteTask> {
public:
/**
......
......@@ -28,7 +28,7 @@
#include "castor/log/LogContext.hpp"
#include "castor/messages/TapeserverProxy.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
#include "castor/tape/utils/Timer.hpp"
#include "castor/utils/utils.hpp"
namespace castor {
......@@ -37,58 +37,60 @@ namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Templated class for watching tape read or write operation
* Virtual class for watching tape read or write operation (mostly complete)
*/
template <class placeHolder> class TaskWatchDog : private castor::server::Thread{
class TaskWatchDog : private castor::server::Thread{
protected:
/**
* The mutex protecting updates and the worker thread from each other
*/
castor::server::Mutex m_mutex;
typedef typename ReportPackerInterface<placeHolder>::FileStruct FileStruct;
/*
/**
* Number of blocks we moved since the last update. Has to be atomic because it is
* updated from the outside
*/
uint64_t m_nbOfMemblocksMoved;
/*
* Utility member to send heartbeat notifications at a given period.
/**
* Timer for regular heartbeat reports to parent process
*/
timeval m_previousReportTime;
castor::tape::utils::Timer m_reportTimer;
/*
* When was the last time we have been notified by the tape working thread
* How long since we last logged a warning?
*/
timeval m_previousNotifiedTime;
castor::tape::utils::Timer m_blockMovementReportTimer;
/*
* How often to we send heartbeat notifications (in second)
/**
* how long since the last block movement?
*/
const double m_periodToReport;
castor::tape::utils::Timer m_blockMovementTimer;
/*
* How long to we have to wait before saying we are stuck (in second)
/**
* How fast should we tick?
*/
const double m_stuckPeriod;
const double m_pollPeriod;
/*
* Atomic flag to stop the thread's loop
/**
* How often to we send heartbeat notifications (in second)
*/
castor::server::AtomicFlag m_stopFlag;
const double m_reportPeriod;
/*
* The proxy that will receive or heartbeat notifications
/**
* How long to we have to wait before saying we are stuck (in second)
*/
messages::TapeserverProxy& m_initialProcess;
const double m_stuckPeriod;
/*
* An interace on the report packer for sending a message to the client if we
* were to be stuck
* Atomic flag to stop the thread's loop
*/
ReportPackerInterface<placeHolder>& m_reportPacker;
castor::server::AtomicFlag m_stopFlag;
/*
* The file we are operating
* The proxy that will receive or heartbeat notifications
*/
FileStruct m_file;
messages::TapeserverProxy& m_initialProcess;
/*
* Is the system at _this very moment_ reading or writing on tape
......@@ -98,53 +100,50 @@ template <class placeHolder> class TaskWatchDog : private castor::server::Thread
* Logging system
*/
log::LogContext m_lc;
/*
* Member function actually logging the file.
*/
virtual void logStuckFile() = 0;
/**
* Thread;s loop
* Thread's loop
*/
void run(){
timeval currentTime;
// reset timers as we don't know how long it took before the thread started
m_reportTimer.reset();
m_blockMovementReportTimer.reset();
m_blockMovementTimer.reset();
while(!m_stopFlag) {
//CRITICAL SECTION
// Critical section block for internal watchdog
{
castor::server::MutexLocker locker(&m_mutex);
castor::utils::getTimeOfDay(&currentTime);
timeval diffTimeStuck = castor::utils::timevalAbsDiff(currentTime,m_previousNotifiedTime);
double diffTimeStuckd = castor::utils::timevalToDouble(diffTimeStuck);
//useful if we are stuck on a particular file
if(diffTimeStuckd>m_stuckPeriod && m_fileBeingMoved){
m_reportPacker.reportStuckOn(m_file);
if(m_fileBeingMoved &&
m_blockMovementTimer.secs()>m_stuckPeriod &&
m_blockMovementReportTimer.secs()>m_stuckPeriod){
// We are stuck while moving a file. We will log that.
logStuckFile();
m_blockMovementReportTimer.reset();
break;
}
}
timeval diffTimeReport = castor::utils::timevalAbsDiff(currentTime,m_previousReportTime);
double diffTimeReportd = castor::utils::timevalToDouble(diffTimeReport);
//heartbeat to notify activity to the mother
if(diffTimeReportd > m_periodToReport){
if(m_reportTimer.secs() > m_reportPeriod){
castor::server::MutexLocker locker(&m_mutex);
m_lc.log(LOG_DEBUG,"going to report");
m_previousReportTime=currentTime;
//m_initialProcess.notifyHeartbeat("PIPPO", m_nbOfMemblocksMoved);
m_reportTimer.reset();
m_initialProcess.notifyHeartbeat("Heartbeat report", m_nbOfMemblocksMoved);
m_nbOfMemblocksMoved=0;
}
else{
usleep(100000);
usleep(m_pollPeriod*1000*1000);
}
}
}
/*
*
*/
void updateStuckTime(){
timeval tmpTime;
castor::utils::getTimeOfDay(&tmpTime);
m_previousNotifiedTime=tmpTime;
}
public:
/**
......@@ -155,24 +154,21 @@ template <class placeHolder> class TaskWatchDog : private castor::server::Thread
* @param reportPacker
* @param lc To log the events
*/
TaskWatchDog(double periodToReport,double stuckPeriod,
TaskWatchDog(double reportPeriod,double stuckPeriod,
messages::TapeserverProxy& initialProcess,
ReportPackerInterface<placeHolder>& reportPacker,
log::LogContext lc):
m_nbOfMemblocksMoved(0), m_periodToReport(periodToReport),
m_stuckPeriod(stuckPeriod), m_initialProcess(initialProcess),
m_reportPacker(reportPacker), m_fileBeingMoved(false), m_lc(lc) {
log::LogContext lc, double pollPeriod = 0.1):
m_nbOfMemblocksMoved(0), m_pollPeriod(pollPeriod),
m_reportPeriod(reportPeriod), m_stuckPeriod(stuckPeriod),
m_initialProcess(initialProcess), m_fileBeingMoved(false), m_lc(lc) {
m_lc.pushOrReplace(log::Param("thread","Watchdog"));
castor::utils::getTimeOfDay(&m_previousReportTime);
updateStuckTime();
}
/**
* notify the wtachdog a mem block has been moved
* notify the watchdog a mem block has been moved
*/
void notify(){
castor::server::MutexLocker locker(&m_mutex);
updateStuckTime();
m_blockMovementTimer.reset();
m_nbOfMemblocksMoved++;
}
......@@ -184,31 +180,60 @@ template <class placeHolder> class TaskWatchDog : private castor::server::Thread
}
/**
* Ask tp stop the watchdog thread and join it
* Ask to stop the watchdog thread and join it
*/
void stopAndWaitThread(){
m_stopFlag.set();
wait();
}
};
/**
* Implementation of TaskWatchDog for recalls
*/
class RecallWatchDog: public TaskWatchDog {
private:
/** Our file type */
typedef castor::tape::tapegateway::FileToRecallStruct FileStruct;
/** The file we are working on */
FileStruct m_file;
virtual void logStuckFile() {
castor::log::ScopedParamContainer params(m_lc);
params.addTiming("TimeSinceLastBlockMove", m_blockMovementTimer.usecs())
.add("Path",m_file.path())
.add("FILEID",m_file.fileid())
.add("fSeq",m_file.fseq());
m_lc.log(LOG_WARNING, "No tape block movement for too long");
}
public:
/** Pass through constructor */
RecallWatchDog(double periodToReport,double stuckPeriod,
messages::TapeserverProxy& initialProcess,
log::LogContext lc, double pollPeriod = 0.1):
TaskWatchDog(periodToReport, stuckPeriod, initialProcess, lc, pollPeriod) {}
/**
* Notify the watchdog which file we are operating
* @param file
*/
void notifyBeginNewJob(const FileStruct& file){
castor::server::MutexLocker locker(&m_mutex);
m_file=file;
m_fileBeingMoved=true;
}
/**
* Notify the watchdog we have finished operating on the current file
*/
void fileFinished(){
castor::server::MutexLocker locker(&m_mutex);
m_fileBeingMoved=false;
m_file=FileStruct();
}
void notifyBeginNewJob(const FileStruct& file){
castor::server::MutexLocker locker(&m_mutex);
m_file=file;
m_fileBeingMoved=true;
}
/**
* Notify the watchdog we have finished operating on the current file
*/
void fileFinished(){
castor::server::MutexLocker locker(&m_mutex);
m_fileBeingMoved=false;
m_file=FileStruct();
}
};
}}}}
......@@ -34,7 +34,7 @@ namespace unitTests {
struct MockReportPacker :
public tapeserver::daemon::ReportPackerInterface<tapeserver::daemon::detail::Recall> {
MOCK_METHOD1(reportStuckOn, void(FileStruct& file));
// TODO MOCK_METHOD1(reportStuckOn, void(FileStruct& file));
MockReportPacker(tapeserver::client::ClientInterface & tg,castor::log::LogContext lc):
tapeserver::daemon::ReportPackerInterface<castor::tape::tapeserver::daemon::detail::Recall>(tg,lc){
......@@ -45,49 +45,50 @@ namespace unitTests {
TEST(castor_tape_tapeserver_daemon, WatchdogTestStuckWithNothing) {
const double periodToReport = 10;
const double stuckPeriod = 1;
const double periodToReport = 10; // We wont report in practice
const double stuckPeriod = 0.01;
const double pollPeriod = 0.01;
castor::log::StringLogger log("castor_tape_tapeserver_daemon_WatchdogTestStuck");
castor::log::LogContext lc(log);
MockClient mockClient;
MockReportPacker mockReportPacker(mockClient,lc);
castor::messages::TapeserverProxyDummy dummyInitialProcess;
//we dont tell the watchdog we are working on file,
//it should not report as being stuck
EXPECT_CALL(mockReportPacker,reportStuckOn(_)).Times(0);
tapeserver::daemon::TaskWatchDog<tapeserver::daemon::detail::Recall>
watchdog(periodToReport,stuckPeriod,dummyInitialProcess,mockReportPacker,lc);
tapeserver::daemon::RecallWatchDog watchdog(periodToReport,
stuckPeriod,dummyInitialProcess,lc,pollPeriod);
watchdog.startThread();
sleep(2);
usleep(100000);
watchdog.stopAndWaitThread();
//we dont tell the watchdog we are working on file,
//it should not report as being stuck
ASSERT_EQ(std::string::npos, log.getLog().find("No tape block movement for too long"));
}
TEST(castor_tape_tapeserver_daemon, WatchdogTestStuck) {
const double periodToReport = 10;
const double stuckPeriod = 1;
TEST(castor_tape_tapeserver_daemon, RecallWatchdogTestStuck) {
const double reportPeriod = 10; // We wont report in practice
const double stuckPeriod = 0.01;
const double pollPeriod = 0.01;
castor::log::StringLogger log("castor_tape_tapeserver_daemon_WatchdogTestStuck");
castor::log::LogContext lc(log);
MockClient mockClient;
MockReportPacker mockReportPacker(mockClient,lc);
castor::messages::TapeserverProxyDummy dummyInitialProcess;
//we dont tell the watchdog we are working on file,
//it should not report as being stuck
EXPECT_CALL(mockReportPacker,reportStuckOn(_)).Times(1);
tapeserver::daemon::TaskWatchDog<tapeserver::daemon::detail::Recall>
watchdog(periodToReport,stuckPeriod,dummyInitialProcess,mockReportPacker,lc);
// We will poll for a
tapeserver::daemon::RecallWatchDog watchdog(reportPeriod,stuckPeriod,
dummyInitialProcess,lc, pollPeriod);
watchdog.startThread();
tapeserver::daemon::ReportPackerInterface<castor::tape::tapeserver::daemon::detail::Recall>::FileStruct file;
watchdog.notifyBeginNewJob(file);
sleep(2);
usleep(100000);
watchdog.stopAndWaitThread();
// This time the internal watchdog should have triggered
ASSERT_NE(std::string::npos, log.getLog().find("No tape block movement for too long"));
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment