Commit 2c47340e authored by Steven Murray's avatar Steven Murray
Browse files

Watchdog of tapeserverd parent-process now covers jobs and mounts

parent 404cfa37
......@@ -604,6 +604,14 @@
## Tapeserverd #################################################################
# The maximum time in seconds that the data-transfer session can take to get the
# transfer job from the client.
#TAPESERVERD WAITJOBTIMEOUT 60
# The maximum time in seconds that the data-transfer session can take to mount a
# tape.
#TAPESERVERD MOUNTTIMEOUT 900
# The maximum time in seconds the data-transfer session of tapeserverd can
# cease to move data blocks
#TAPESERVERD BLKMOVETIMEOUT 300
......
......@@ -21,7 +21,9 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/common/CastorConfiguration.hpp"
#include "castor/tape/tapeserver/daemon/Catalogue.hpp"
#include "castor/tape/tapeserver/daemon/Constants.hpp"
#include "castor/utils/utils.hpp"
#include <string.h>
......@@ -107,6 +109,16 @@ void castor::tape::tapeserver::daemon::Catalogue::populate(
void castor::tape::tapeserver::daemon::Catalogue::enterDriveConfig(
const utils::DriveConfig &driveConfig) {
common::CastorConfiguration &castorConf =
common::CastorConfiguration::getConfig();
const time_t waitJobTimeoutInSecs = castorConf.getConfEntInt("TAPESERVERD",
"WAITJOBTIMEOUT", (time_t)TAPESERVER_WAITJOBTIMEOUT_DEFAULT, &m_log);
const time_t mountTimeoutInSecs = castorConf.getConfEntInt("TAPESERVERD",
"MOUNTTIMEOUT", (time_t)TAPESERVER_MOUNTTIMEOUT_DEFAULT, &m_log);
const time_t blockMoveTimeoutInSec = castorConf.getConfEntInt("TAPESERVERD",
"BLKMOVETIMEOUT", (time_t)TAPESERVER_BLKMOVETIMEOUT_DEFAULT, &m_log);
DriveMap::iterator itor = m_drives.find(driveConfig.unitName);
// If the drive is not in the catalogue
......@@ -114,7 +126,8 @@ void castor::tape::tapeserver::daemon::Catalogue::enterDriveConfig(
// Insert it
m_drives[driveConfig.unitName] = new CatalogueDrive(m_netTimeout,
m_log, m_processForker, m_cupv, m_vdqm, m_vmgr, m_hostName, driveConfig,
CatalogueDrive::DRIVE_STATE_DOWN);
CatalogueDrive::DRIVE_STATE_DOWN, waitJobTimeoutInSecs,
mountTimeoutInSecs, blockMoveTimeoutInSec);
// Else the drive is already in the catalogue
} else {
castor::exception::Exception ex;
......
......@@ -21,7 +21,6 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/common/CastorConfiguration.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/CatalogueDrive.hpp"
#include "castor/tape/tapeserver/daemon/Constants.hpp"
......@@ -43,7 +42,10 @@ castor::tape::tapeserver::daemon::CatalogueDrive::CatalogueDrive(
legacymsg::VmgrProxy &vmgr,
const std::string &hostName,
const utils::DriveConfig &config,
const DriveState state)
const DriveState state,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs)
throw():
m_netTimeout(netTimeout),
m_log(log),
......@@ -54,6 +56,9 @@ castor::tape::tapeserver::daemon::CatalogueDrive::CatalogueDrive(
m_hostName(hostName),
m_config(config),
m_state(state),
m_waitJobTimeoutInSecs(waitJobTimeoutInSecs),
m_mountTimeoutInSecs(mountTimeoutInSecs),
m_blockMoveTimeoutInSecs(blockMoveTimeoutInSecs),
m_sessionType(SESSION_TYPE_NONE),
m_session(NULL) {
}
......@@ -479,12 +484,6 @@ void castor::tape::tapeserver::daemon::CatalogueDrive::receivedVdqmJob(
m_state = DRIVE_STATE_RUNNING;
m_sessionType = SESSION_TYPE_DATATRANSFER;
{
const unsigned short rmcPort =
common::CastorConfiguration::getConfig().getConfEntInt("RMC", "PORT",
(unsigned short)RMC_PORT, &m_log);
const time_t blockMoveTimeoutInSecs =
common::CastorConfiguration::getConfig().getConfEntInt("TAPESERVERD",
"BLKMOVETIMEOUT", TAPESERVER_BLKMOVETIMEOUT_DEFAULT, &m_log);
CatalogueTransferSession *const transferSession =
CatalogueTransferSession::create(
m_log,
......@@ -494,8 +493,9 @@ void castor::tape::tapeserver::daemon::CatalogueDrive::receivedVdqmJob(
m_vmgr,
m_cupv,
m_hostName,
blockMoveTimeoutInSecs,
rmcPort,
m_waitJobTimeoutInSecs,
m_mountTimeoutInSecs,
m_blockMoveTimeoutInSecs,
m_processForker);
m_session = dynamic_cast<CatalogueSession *>(transferSession);
m_vdqm.assignDrive(m_hostName, m_config.unitName, job.dgn,
......
......@@ -175,6 +175,12 @@ public:
* name is needed to fill in messages to be sent to the vdqmd daemon.
* @param config The configuration of the tape drive.
* @param state The initial state of the tape drive.
* @param waitJobTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to get the transfer job from the client.
* @param mountTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to mount a tape.
* @param blockMoveTimeoutInSecs The maximum time in seconds that the
* data-transfer session can cease to move data blocks.
*/
CatalogueDrive(
const int netTimeout,
......@@ -185,7 +191,10 @@ public:
legacymsg::VmgrProxy &vmgr,
const std::string &hostName,
const utils::DriveConfig &config,
const DriveState state) throw();
const DriveState state,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs) throw();
/**
* Destructor
......@@ -469,6 +478,24 @@ private:
* The current state of the tape drive.
*/
DriveState m_state;
/**
* The maximum time in seconds that the data-transfer session can take to get
* the transfer job from the client.
*/
const time_t m_waitJobTimeoutInSecs;
/**
* The maximum time in seconds that the data-transfer session can take to
* mount a tape.
*/
const time_t m_mountTimeoutInSecs;
/**
* The maximum time in seconds that the data-transfer session can cease to
* move data blocks.
*/
const time_t m_blockMoveTimeoutInSecs;
/**
* The type of data-transfer session.
......
......@@ -46,8 +46,9 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession*
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs,
const unsigned short rmcPort,
ProcessForkerProxy &processForker) {
const pid_t pid = processForker.forkDataTransfer(driveConfig, vdqmJob);
......@@ -61,6 +62,8 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession*
vmgr,
cupv,
hostName,
waitJobTimeoutInSecs,
mountTimeoutInSecs,
blockMoveTimeoutInSecs);
}
......@@ -77,16 +80,21 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession::
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs) throw():
CatalogueSession(log, netTimeout, pid, driveConfig),
m_state(TRANSFERSTATE_WAIT_JOB),
m_mode(WRITE_DISABLE),
m_lastTimeSomeBlocksWereMoved(time(0)),
m_assignmentTime(time(0)),
m_mountStartTime(0),
m_lastTimeSomeBlocksWereMoved(0),
m_vdqmJob(vdqmJob),
m_vmgr(vmgr),
m_cupv(cupv),
m_hostName(hostName),
m_waitJobTimeoutInSecs(waitJobTimeoutInSecs),
m_mountTimeoutInSecs(mountTimeoutInSecs),
m_blockMoveTimeoutInSecs(blockMoveTimeoutInSecs) {
}
......@@ -94,23 +102,84 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession::
// tick
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::tick() {
  // Hand the tick over to the watchdog routine that matches the current state
  // of the catalogue transfer-session.  States without a dedicated routine are
  // deliberately ignored.
  if(TRANSFERSTATE_WAIT_JOB == m_state) {
    waitJobTick();
  } else if(TRANSFERSTATE_WAIT_MOUNTED == m_state) {
    waitMountedTick();
  } else if(TRANSFERSTATE_RUNNING == m_state) {
    runningTick();
  }
}
//------------------------------------------------------------------------------
// waitJobTick
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::waitJobTick() {
  // Time elapsed since the drive was assigned the data-transfer job
  const time_t secsWaiting = time(0) - m_assignmentTime;

  // Still within the allowed waiting period: nothing to do on this tick
  if(secsWaiting <= m_waitJobTimeoutInSecs) {
    return;
  }

  std::list<log::Param> params;
  params.push_back(log::Param("transferSessionPid", m_pid));
  params.push_back(log::Param("secsWaiting", secsWaiting));
  params.push_back(log::Param("waitJobTimeoutInSecs",
    m_waitJobTimeoutInSecs));
  m_log(LOG_ERR,
    "Killing data-transfer session because transfer job is too late",
    params);

  // kill() returns non-zero on failure, in which case errno tells why
  const int killRc = kill(m_pid, SIGKILL);
  if(0 != killRc) {
    const std::string errnoStr = castor::utils::errnoToString(errno);
    params.push_back(log::Param("message", errnoStr));
    m_log(LOG_ERR, "Failed to kill data-transfer session", params);
  }
}
//------------------------------------------------------------------------------
// waitMountedTick
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
waitMountedTick() {
  // Watchdog for the TRANSFERSTATE_WAIT_MOUNTED state: kills the data-transfer
  // session if the tape mount has been pending for longer than
  // m_mountTimeoutInSecs.
  //
  // The elapsed time is measured from m_mountStartTime.  No check of m_state is
  // made here because tick() only dispatches to this method whilst the session
  // is in the TRANSFERSTATE_WAIT_MOUNTED state; the block-move timeout is the
  // concern of runningTick() and must not gate the mount timeout.
  const time_t now = time(0);
  const time_t secsWaiting = now - m_mountStartTime;
  const bool timeOutExceeded = secsWaiting > m_mountTimeoutInSecs;

  if(timeOutExceeded) {
    std::list<log::Param> params;
    params.push_back(log::Param("transferSessionPid", m_pid));
    params.push_back(log::Param("secsWaiting", secsWaiting));
    params.push_back(log::Param("mountTimeoutInSecs",
      m_mountTimeoutInSecs));
    m_log(LOG_ERR,
      "Killing data-transfer session because tape mount is taking too long",
      params);

    // kill() returns non-zero on failure; report the reason taken from errno
    if(kill(m_pid, SIGKILL)) {
      const std::string errnoStr = castor::utils::errnoToString(errno);
      params.push_back(log::Param("message", errnoStr));
      m_log(LOG_ERR, "Failed to kill data-transfer session", params);
    }
  }
}
//------------------------------------------------------------------------------
// runningTick
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::runningTick() {
const time_t now = time(0);
const time_t secsWaiting = now - m_lastTimeSomeBlocksWereMoved;
const bool timeOutExceeded = secsWaiting > m_blockMoveTimeoutInSecs;
if(timeOutExceeded) {
std::list<log::Param> params;
params.push_back(log::Param("transferSessionPid", m_pid));
params.push_back(log::Param("secsSinceSomeBlocksWereMoved",
secsSinceSomeBlocksWereMoved));
params.push_back(log::Param("secsWaiting", secsWaiting));
params.push_back(log::Param("blockMoveTimeoutInSecs",
m_blockMoveTimeoutInSecs));
m_log(LOG_ERR, "Killing data-transfer session because it is stuck", params);
m_log(LOG_ERR,
"Killing data-transfer session because data blocks are not being moved",
params);
if(kill(m_pid, SIGKILL)) {
const std::string errnoStr = castor::utils::errnoToString(errno);
......@@ -168,6 +237,7 @@ void castor::tape::tapeserver::daemon::CatalogueTransferSession::
checkUserCanRecallFromTape(vid);
m_mountStartTime = time(0);
m_state = TRANSFERSTATE_WAIT_MOUNTED;
m_mode = WRITE_DISABLE;
......@@ -241,6 +311,7 @@ void castor::tape::tapeserver::daemon::CatalogueTransferSession::
checkUserCanMigrateToTape(vid);
m_mountStartTime = time(0);
m_state = TRANSFERSTATE_WAIT_MOUNTED;
m_mode = WRITE_ENABLE;
......
......@@ -57,6 +57,10 @@ public:
* @param cupv Proxy object representing the cupvd daemon.
* @param hostName The host name to be used as the target host when
* communicating with the cupvd daemon.
* @param waitJobTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to get the transfer job from the client.
* @param mountTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to mount a tape.
* @param blockMoveTimeoutInSecs The maximum time in seconds that the
* data-transfer session can cease to move data blocks.
* @param rmcPort The TCP/IP port on which the rmcd daemon is listening.
......@@ -71,8 +75,9 @@ public:
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs,
const unsigned short rmcPort,
ProcessForkerProxy &processForker);
/**
......@@ -229,6 +234,10 @@ protected:
* @param cupv Proxy object representing the cupvd daemon.
* @param hostName The host name to be used as the target host when
* communicating with the cupvd daemon.
* @param waitJobTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to get the transfer job from the client.
* @param mountTimeoutInSecs The maximum time in seconds that the
* data-transfer session can take to mount a tape.
* @param blockMoveTimeoutInSecs The maximum time in seconds that the
* data-transfer session can cease to move data blocks.
*/
......@@ -241,6 +250,8 @@ protected:
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t waitJobTimeoutInSecs,
const time_t mountTimeoutInSecs,
const time_t blockMoveTimeoutInSecs) throw();
private:
......@@ -277,15 +288,21 @@ private:
uint16_t m_mode;
/**
* The last time at which some data blocks were moved by the data-transfer
* session.
* The time at which the tape drive was assigned a data transfer job.
*/
time_t m_lastTimeSomeBlocksWereMoved;
const time_t m_assignmentTime;
/**
* The time at which the tape drive was assigned a data transfer job.
* The time at which this catalogue session started waiting for the data
* transfer-session to mount the tape.
*/
const time_t m_assignmentTime;
time_t m_mountStartTime;
/**
* The last time at which some data blocks were moved by the data-transfer
* session.
*/
time_t m_lastTimeSomeBlocksWereMoved;
/**
* The job received from the vdqmd daemon.
......@@ -308,12 +325,42 @@ private:
*/
const std::string m_hostName;
/**
* The maximum time in seconds that the data-transfer session can take to get
* the transfer job from the client.
*/
const time_t m_waitJobTimeoutInSecs;
/**
* The maximum time in seconds that the data-transfer session can take to
* mount a tape.
*/
const time_t m_mountTimeoutInSecs;
/**
* The maximum time in seconds that the data-transfer session can cease to
* move data blocks.
*/
const time_t m_blockMoveTimeoutInSecs;
/**
* A tick has occurred whilst the catalogue transfer-session is in the
* TRANSFERSTATE_WAIT_JOB state.
*/
void waitJobTick();
/**
* A tick has occurred whilst the catalogue transfer-session is in the
* TRANSFERSTATE_WAIT_MOUNTED state.
*/
void waitMountedTick();
/**
* A tick has occurred whilst the catalogue transfer-session is in the
* TRANSFERSTATE_RUNNING state.
*/
void runningTick();
}; // class CatalogueTransferSession
} // namespace daemon
......
......@@ -50,6 +50,18 @@ const unsigned short TAPESERVER_LABELCMD_LISTENING_PORT = 54321;
*/
const unsigned short TAPESERVER_INTERNAL_LISTENING_PORT = 54322;
/**
* The compile-time default value for the maximum time in seconds that the
* data-transfer session can take to get the transfer job from the client.
*
* This value is supplied as the default when the "TAPESERVERD"
* "WAITJOBTIMEOUT" configuration entry is looked up.
*/
const time_t TAPESERVER_WAITJOBTIMEOUT_DEFAULT = 60; // 1 minute
/**
* The compile-time default value for the maximum time in seconds that the
* data-transfer session can take to mount a tape.
*
* This value is supplied as the default when the "TAPESERVERD"
* "MOUNTTIMEOUT" configuration entry is looked up.
*/
const time_t TAPESERVER_MOUNTTIMEOUT_DEFAULT = 900; // 15 minutes
/**
* The compile-time default value for the maximum time in seconds the
* data-transfer session of tapeserverd can cease to move data blocks.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment