Commit 942a4cd8 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for heartbeat and moved bytes to drive handler and process (cta/CTA#69).

parent dd6d1764
......@@ -127,11 +127,12 @@ public:
stillReading = false;
}
localStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
localStats.dataVolume += mb->m_payload.size();
auto blockSize = mb->m_payload.size();
localStats.dataVolume += blockSize;
// Pass the block to the disk write task
m_fifo.pushDataBlock(mb);
mb=NULL;
watchdog.notify();
watchdog.notify(blockSize);
localStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
} //end of while(stillReading)
// we have to signal the end of the tape read to the disk write task.
......
......@@ -113,7 +113,7 @@ namespace daemon {
m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter);
m_taskStats.dataVolume += mb->m_payload.size();
watchdog.notify();
watchdog.notify(mb->m_payload.size());
++memBlockId;
}
......
......@@ -54,7 +54,7 @@ protected:
* Number of blocks we moved since the last update. Has to be atomic because it is
* updated from the outside
*/
uint64_t m_nbOfMemblocksMoved;
uint64_t m_TapeBytesMovedMoved;
/**
* Statistics of the current session
......@@ -252,9 +252,8 @@ protected:
cta::threading::MutexLocker locker(m_mutex);
m_lc.log(cta::log::DEBUG,"going to report");
m_reportTimer.reset();
m_initialProcess.reportHeartbeat(m_nbOfMemblocksMoved, 0);
m_initialProcess.reportHeartbeat(m_TapeBytesMovedMoved, 0);
reportStats();
m_nbOfMemblocksMoved=0;
}
else{
usleep(m_pollPeriod*1000*1000);
......@@ -304,7 +303,7 @@ protected:
cta::tape::daemon::TapedProxy& initialProcess,
const std::string & driveUnitName,
cta::log::LogContext& lc, double pollPeriod = 0.1):
m_nbOfMemblocksMoved(0), m_statsSet(false), m_pollPeriod(pollPeriod),
m_TapeBytesMovedMoved(0), m_statsSet(false), m_pollPeriod(pollPeriod),
m_reportPeriod(reportPeriod), m_stuckPeriod(stuckPeriod),
m_initialProcess(initialProcess), m_driveUnitName(driveUnitName),
m_fileBeingMoved(false), m_lc(lc) {
......@@ -314,10 +313,10 @@ protected:
/**
* notify the watchdog a mem block has been moved
*/
void notify(){
void notify(uint64_t movedBytes){
cta::threading::MutexLocker locker(m_mutex);
m_blockMovementTimer.reset();
m_nbOfMemblocksMoved++;
m_TapeBytesMovedMoved+=movedBytes;
}
/**
......
......@@ -247,7 +247,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() {
message.ParseFromString(m_socketPair->receive());
// Logs are processed in all cases
processLogs(message);
// If we report bytes, process the report
// If we report bytes, process the report (this is a heartbeat)
if (message.reportingbytes()) {
processBytes(message);
}
......@@ -499,29 +499,12 @@ SubprocessHandler::ProcessingStatus DriveHandler::processRunning(serializers::Wa
if (m_sessionState!=(SessionState)message.sessionstate()) {
m_lastStateChangeTime=std::chrono::steady_clock::now();
m_lastDataMovementTime=std::chrono::steady_clock::now();
// heartbeat is update further down.
}
// In all cases, this is a heartbeat.
m_lastHeartBeatTime=std::chrono::steady_clock::now();
// Record the state
}
// Record the state in all cases. Child process knows better.
m_sessionState=(SessionState)message.sessionstate();
m_sessionType=(SessionType)message.sessiontype();
m_sessionVid=message.vid();
// Record data moved totals if needed.
if (m_totalTapeBytesMoved != message.totaltapebytesmoved()||
m_totalDiskBytesMoved != message.totaldiskbytesmoved()) {
if (message.totaltapebytesmoved()<m_totalTapeBytesMoved||
message.totaldiskbytesmoved()<m_totalDiskBytesMoved) {
params.add("PreviousTapeBytesMoved", m_totalTapeBytesMoved)
.add("PreviousDiskBytesMoved", m_totalDiskBytesMoved)
.add("NewTapeBytesMoved", message.totaltapebytesmoved())
.add("NewDiskBytesMoved", message.totaldiskbytesmoved());
m_processManager.logContext().log(log::WARNING, "In DriveHandler::processRunning(): total bytes moved going backwards");
}
m_totalTapeBytesMoved=message.totaltapebytesmoved();
m_totalDiskBytesMoved=message.totaldiskbytesmoved();
m_lastDataMovementTime=std::chrono::steady_clock::now();
}
// We can now compute the timeout and check for potential exceeding of timeout
m_processingStatus.nextTimeout = nextTimeout();
return m_processingStatus;
......@@ -649,7 +632,25 @@ void DriveHandler::processLogs(serializers::WatchdogMessage& message) {
// DriveHandler::processBytes
//------------------------------------------------------------------------------
void DriveHandler::processBytes(serializers::WatchdogMessage& message) {
throw cta::exception::Exception("In DriveHandler::processBytes(): not implemented");
// In all cases, this is a heartbeat.
m_lastHeartBeatTime=std::chrono::steady_clock::now();
// Record data moved totals if needed.
if (m_totalTapeBytesMoved != message.totaltapebytesmoved()||
m_totalDiskBytesMoved != message.totaldiskbytesmoved()) {
if (message.totaltapebytesmoved()<m_totalTapeBytesMoved||
message.totaldiskbytesmoved()<m_totalDiskBytesMoved) {
log::ScopedParamContainer params(m_processManager.logContext());
params.add("PreviousTapeBytesMoved", m_totalTapeBytesMoved)
.add("PreviousDiskBytesMoved", m_totalDiskBytesMoved)
.add("NewTapeBytesMoved", message.totaltapebytesmoved())
.add("NewDiskBytesMoved", message.totaldiskbytesmoved());
m_processManager.logContext().log(log::WARNING, "In DriveHandler::processRunning(): total bytes moved going backwards");
}
m_totalTapeBytesMoved=message.totaltapebytesmoved();
m_totalDiskBytesMoved=message.totaldiskbytesmoved();
m_lastDataMovementTime=std::chrono::steady_clock::now();
}
}
//------------------------------------------------------------------------------
......@@ -768,7 +769,7 @@ SubprocessHandler::ProcessingStatus DriveHandler::processTimeout() {
} catch (...) {}
try {
decltype (SubprocessHandler::ProcessingStatus::nextTimeout) nextTimeout =
m_lastStateChangeTime + m_heartbeatTimeouts.at(m_sessionState);
m_lastHeartBeatTime + m_heartbeatTimeouts.at(m_sessionState);
std::chrono::duration<double> timeToTimeout = nextTimeout - now;
params.add("BeforeHeartbeatTimeout_s", timeToTimeout.count());
} catch (...) {}
......
......@@ -60,8 +60,14 @@ void DriveHandlerProxy::labelError(const std::string& unitName, const std::strin
}
void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) {
// TODO
throw cta::exception::Exception("In DriveHandlerProxy::reportHeartbeat(): not implemented");
serializers::WatchdogMessage watchdogMessage;
watchdogMessage.set_reportingstate(false);
watchdogMessage.set_reportingbytes(true);
watchdogMessage.set_totaltapebytesmoved(totalTapeBytesMoved);
watchdogMessage.set_totaldiskbytesmoved(totalDiskBytesMoved);
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
m_socketPair.send(buffer);
}
void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state, const cta::tape::session::SessionType type, const std::string& vid) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment