Commit 1b43a520 authored by Steven Murray's avatar Steven Murray
Browse files

Implemented tapeserverd parent process side of the watchdog logic

parent cd091850
......@@ -635,6 +635,12 @@
# server
#TAPEGATEWAY PORT 62801
## Tapeserverd #################################################################
# The maximum time in seconds that the data-transfer session of tapeserverd may
# go without moving any data blocks before it is considered stuck and killed
#TAPESERVERD BLKMOVETIMEOUT 300
## Tape Bridge #################################################################
# The port on which the tapebridge will listen for RTCOPY jobs from the VDQM.
......
......@@ -299,7 +299,7 @@ void castor::tape::label::LabelCmd::writeTapeLabelRequest(const int timeout) {
char buf[REQBUFSZ];
const size_t len = castor::legacymsg::marshal(buf, sizeof(buf), body);
m_smartClientConnectionSock.reset(castor::io::connectWithTimeout("127.0.0.1",
castor::tape::tapeserver::daemon::TAPE_SERVER_LABELCMD_LISTENING_PORT, timeout));
tapeserver::daemon::TAPESERVER_LABELCMD_LISTENING_PORT, timeout));
try {
castor::io::writeBytes(m_smartClientConnectionSock.get(), timeout, len, buf);
......
......@@ -24,6 +24,7 @@
#include "castor/common/CastorConfiguration.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/CatalogueDrive.hpp"
#include "castor/tape/tapeserver/daemon/Constants.hpp"
#include "castor/utils/utils.hpp"
#include "h/Ctape_constants.h"
#include "h/rmc_constants.h"
......@@ -482,6 +483,9 @@ void castor::tape::tapeserver::daemon::CatalogueDrive::receivedVdqmJob(
const unsigned short rmcPort =
common::CastorConfiguration::getConfig().getConfEntInt("RMC", "PORT",
(unsigned short)RMC_PORT, &m_log);
const time_t blockMoveTimeoutInSecs =
common::CastorConfiguration::getConfig().getConfEntInt("TAPESERVERD",
"BLKMOVETIMEOUT", TAPESERVER_BLKMOVETIMEOUT_DEFAULT, &m_log);
CatalogueTransferSession *const transferSession =
CatalogueTransferSession::create(
m_log,
......@@ -492,6 +496,7 @@ void castor::tape::tapeserver::daemon::CatalogueDrive::receivedVdqmJob(
m_vmgr,
m_cupv,
m_hostName,
blockMoveTimeoutInSecs,
rmcPort,
m_processForker);
m_session = dynamic_cast<CatalogueSession *>(transferSession);
......
......@@ -31,6 +31,9 @@
#include "h/rmc_constants.h"
#include "h/vmgr_constants.h"
#include <sys/types.h>
#include <signal.h>
//------------------------------------------------------------------------------
// create
//------------------------------------------------------------------------------
......@@ -44,6 +47,7 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession*
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t blockMoveTimeoutInSecs,
const unsigned short rmcPort,
ProcessForkerProxy &processForker) {
......@@ -59,7 +63,8 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession*
vdqmJob,
vmgr,
cupv,
hostName);
hostName,
blockMoveTimeoutInSecs);
}
//------------------------------------------------------------------------------
......@@ -75,22 +80,44 @@ castor::tape::tapeserver::daemon::CatalogueTransferSession::
const legacymsg::RtcpJobRqstMsgBody &vdqmJob,
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName) throw():
const std::string &hostName,
const time_t blockMoveTimeoutInSecs) throw():
CatalogueSession(log, netTimeout, pid, driveConfig),
m_state(TRANSFERSTATE_WAIT_JOB),
m_mode(WRITE_DISABLE),
m_lastTimeSomeBlocksWereMoved(time(0)),
m_assignmentTime(time(0)),
m_dataTransferConfig(dataTransferConfig),
m_vdqmJob(vdqmJob),
m_vmgr(vmgr),
m_cupv(cupv),
m_hostName(hostName) {
m_hostName(hostName),
m_blockMoveTimeoutInSecs(blockMoveTimeoutInSecs) {
}
//------------------------------------------------------------------------------
// tick
//------------------------------------------------------------------------------
// Watchdog check for the data-transfer session.
//
// Compares the current time with the last time the session reported moving
// some data blocks.  If the gap exceeds m_blockMoveTimeoutInSecs the session
// is considered stuck and is sent SIGKILL.
//
// NOTE(review): nothing in this method resets m_lastTimeSomeBlocksWereMoved,
// so while the timeout remains exceeded every call logs and signals again.
// Presumably the session object is destroyed once the child has been reaped
// - confirm against the caller.
void castor::tape::tapeserver::daemon::CatalogueTransferSession::tick() {
  const time_t now = time(0);
  const time_t secsSinceSomeBlocksWereMoved = now -
    m_lastTimeSomeBlocksWereMoved;

  // Still within the allowed idle period: nothing to do
  if(secsSinceSomeBlocksWereMoved <= m_blockMoveTimeoutInSecs) {
    return;
  }

  std::list<log::Param> params;
  params.push_back(log::Param("transferSessionPid", m_pid));
  params.push_back(log::Param("secsSinceSomeBlocksWereMoved",
    secsSinceSomeBlocksWereMoved));
  params.push_back(log::Param("blockMoveTimeoutInSecs",
    m_blockMoveTimeoutInSecs));

  // kill() gives special meaning to non-positive process IDs: 0 signals the
  // caller's own process group, -1 signals every process the caller is
  // allowed to signal and other negative values signal a whole process
  // group.  Sending SIGKILL with such a value could take down this daemon, so
  // refuse to do it.
  if(m_pid <= 0) {
    params.push_back(log::Param("message",
      std::string("Refusing to signal a non-positive process ID")));
    m_log(LOG_ERR, "Failed to kill data-transfer session", params);
    return;
  }

  m_log(LOG_ERR, "Killing data-transfer session because it is stuck", params);

  if(kill(m_pid, SIGKILL)) {
    // Take a copy of errno before any further library call can overwrite it
    const int killErrno = errno;
    const std::string errnoStr = castor::utils::errnoToString(killErrno);
    params.push_back(log::Param("message", errnoStr));
    m_log(LOG_ERR, "Failed to kill data-transfer session", params);
  }
}
//------------------------------------------------------------------------------
......@@ -421,6 +448,7 @@ bool castor::tape::tapeserver::daemon::CatalogueTransferSession::
//-----------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::CatalogueTransferSession::
receivedHeartbeat(const uint64_t nbBlocksMoved) {
log::Param params[] = {log::Param("nbBlocksMoved", nbBlocksMoved)};
m_log(LOG_DEBUG, "CatalogueTransferSession received heartbeat", params);
if(nbBlocksMoved > 0) {
m_lastTimeSomeBlocksWereMoved = time(0);
}
}
......@@ -58,6 +58,8 @@ public:
* @param cupv Proxy object representing the cupvd daemon.
* @param hostName The host name to be used as the target host when
* communicating with the cupvd daemon.
* @param blockMoveTimeoutInSecs The maximum time in seconds that the
* data-transfer session can cease to move data blocks.
* @param rmcPort The TCP/IP port on which the rmcd daemon is listening.
* @param processForker Proxy object representing the ProcessForker.
* @return A newly created CatalogueTransferSession object.
......@@ -71,6 +73,7 @@ public:
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName,
const time_t blockMoveTimeoutInSecs,
const unsigned short rmcPort,
ProcessForkerProxy &processForker);
......@@ -229,6 +232,8 @@ protected:
* @param cupv Proxy object representing the cupvd daemon.
* @param hostName The host name to be used as the target host when
* communicating with the cupvd daemon.
* @param blockMoveTimeoutInSecs The maximum time in seconds that the
* data-transfer session can cease to move data blocks.
*/
CatalogueTransferSession(
log::Logger &log,
......@@ -239,7 +244,8 @@ protected:
const legacymsg::RtcpJobRqstMsgBody &vdqmJob,
legacymsg::VmgrProxy &vmgr,
legacymsg::CupvProxy &cupv,
const std::string &hostName) throw();
const std::string &hostName,
const time_t blockMoveTimeoutInSecs) throw();
private:
......@@ -274,6 +280,12 @@ private:
*/
uint16_t m_mode;
/**
* The last time at which some data blocks were moved by the data-transfer
* session.
*/
time_t m_lastTimeSomeBlocksWereMoved;
/**
* The time at which the tape drive was assigned a data transfer job.
*/
......@@ -305,6 +317,12 @@ private:
*/
const std::string m_hostName;
/**
* The maximum time in seconds that the data-transfer session can cease to
* move data blocks.
*/
const time_t m_blockMoveTimeoutInSecs;
}; // class CatalogueTransferSession
} // namespace daemon
......
......@@ -30,25 +30,32 @@ namespace daemon {
* The TCP/IP port on which the tape server daemon listens for incoming
* connections from the VDQM server.
*/
const unsigned short TAPE_SERVER_VDQM_LISTENING_PORT = 5070;
const unsigned short TAPESERVER_VDQM_LISTENING_PORT = 5070;
/**
* The TCP/IP port on which the tape server daemon listens for incoming
* connections from the tpconfig admin command.
*/
const unsigned short TAPE_SERVER_ADMIN_LISTENING_PORT = 5011;
const unsigned short TAPESERVER_ADMIN_LISTENING_PORT = 5011;
/**
* The TCP/IP port on which the tape server daemon listens for incoming
* connections from the label command.
*/
const unsigned short TAPE_SERVER_LABELCMD_LISTENING_PORT = 54321;
const unsigned short TAPESERVER_LABELCMD_LISTENING_PORT = 54321;
/**
* The port on which ZMQ sockets will bind for internal communication between
* motherforker and forked session
* forked sessions and the parent tapeserverd process.
*/
const unsigned short TAPE_SERVER_INTERNAL_LISTENING_PORT = 54322;
const unsigned short TAPESERVER_INTERNAL_LISTENING_PORT = 54322;
/**
* The compile-time default value for the maximum time in seconds the
* data-transfer session of tapeserverd can cease to move data blocks.
*/
const time_t TAPESERVER_BLKMOVETIMEOUT_DEFAULT = 300; // 5 minutes
} // namespace daemon
} // namespace tapeserver
} // namespace tape
......
......@@ -517,7 +517,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
messages::SmartZmqContext
zmqContext(instantiateZmqContext(sizeOfIOThreadPoolForZMQ));
messages::TapeserverProxyZmq tapeserver(m_log,
TAPE_SERVER_INTERNAL_LISTENING_PORT, zmqContext.get());
TAPESERVER_INTERNAL_LISTENING_PORT, zmqContext.get());
castor::tape::System::realWrapper sysWrapper;
......@@ -848,7 +848,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
messages::SmartZmqContext
zmqContext(instantiateZmqContext(sizeOfIOThreadPoolForZMQ));
messages::TapeserverProxyZmq tapeserver(m_log,
TAPE_SERVER_INTERNAL_LISTENING_PORT, zmqContext.get());
TAPESERVER_INTERNAL_LISTENING_PORT, zmqContext.get());
// The network timeout of rmc communications should be several minutes due
// to the time it takes to mount and unmount tapes
......
......@@ -668,7 +668,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
try {
castor::utils::SmartFd listenSock;
try {
listenSock.reset(io::createListenerSock(TAPE_SERVER_VDQM_LISTENING_PORT));
listenSock.reset(io::createListenerSock(TAPESERVER_VDQM_LISTENING_PORT));
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex(ne.code());
ex.getMessage() << "Failed to create socket to listen for vdqm connections"
......@@ -677,7 +677,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
}
{
log::Param params[] = {
log::Param("listeningPort", TAPE_SERVER_VDQM_LISTENING_PORT)};
log::Param("listeningPort", TAPESERVER_VDQM_LISTENING_PORT)};
m_log(LOG_INFO, "Listening for connections from the vdqmd daemon", params);
}
......@@ -712,7 +712,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
try {
castor::utils::SmartFd listenSock;
try {
listenSock.reset(io::createListenerSock(TAPE_SERVER_ADMIN_LISTENING_PORT));
listenSock.reset(io::createListenerSock(TAPESERVER_ADMIN_LISTENING_PORT));
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex(ne.code());
ex.getMessage() <<
......@@ -722,7 +722,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
}
{
log::Param params[] = {
log::Param("listeningPort", TAPE_SERVER_ADMIN_LISTENING_PORT)};
log::Param("listeningPort", TAPESERVER_ADMIN_LISTENING_PORT)};
m_log(LOG_INFO, "Listening for connections from the admin commands",
params);
}
......@@ -759,7 +759,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
castor::utils::SmartFd listenSock;
try {
listenSock.reset(
io::createListenerSock(TAPE_SERVER_LABELCMD_LISTENING_PORT));
io::createListenerSock(TAPESERVER_LABELCMD_LISTENING_PORT));
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex(ne.code());
ex.getMessage() <<
......@@ -769,7 +769,7 @@ void castor::tape::tapeserver::daemon::TapeDaemon::
}
{
log::Param params[] = {
log::Param("listeningPort", TAPE_SERVER_LABELCMD_LISTENING_PORT)};
log::Param("listeningPort", TAPESERVER_LABELCMD_LISTENING_PORT)};
m_log(LOG_INFO, "Listening for connections from label command",
params);
}
......
......@@ -63,7 +63,7 @@ castor::tape::tapeserver::daemon::TapeMessageHandler::TapeMessageHandler(
m_vmgr(vmgr) {
std::ostringstream endpoint;
endpoint << "tcp://127.0.0.1:" << TAPE_SERVER_INTERNAL_LISTENING_PORT;
endpoint << "tcp://127.0.0.1:" << TAPESERVER_INTERNAL_LISTENING_PORT;
try {
m_socket.bind(endpoint.str().c_str());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment