Commit 6697e2ce authored by Steven Murray's avatar Steven Murray
Browse files

Added RTCPD ping.

Implemented the position, file transfered and end of recall parts of the
RTCOPY recall protocol.
parent 0541830b
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Mon Feb 2 17:47:21 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Wed Feb 4 18:11:21 CET 2009
*/
/******************************************************************************
......@@ -69,11 +69,14 @@ AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=32, /* "Failed to signal no more re
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY=33, /* "Failed to coordinate remote copy" */
AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET=34, /* "Failed to create RTCPD callback socket" */
AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED=35, /* "Failed to process RTCPD connections" */
AGGREGATOR_MAIN_SELECT_TIMEDOUT=36, /* "The select of the main select loop timed out" */
AGGREGATOR_PINGED_RTCPD=36, /* "Pinged RTCPD" */
AGGREGATOR_MAIN_SELECT_FAILED=37, /* "The select of the main select loop encountered an error other than an interruption" */
AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK=38, /* "Gave request for more work to RTCPD" */
AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION=39, /* "Data has arrived on the initial RTCPD connection" */
AGGREGATOR_RECEIVED_RTCP_ENDOF_REQ=40 /* "Received RTCP_ENDOF_REQ" */
AGGREGATOR_RECEIVED_RTCP_ENDOF_REQ=40, /* "Received RTCP_ENDOF_REQ" */
AGGREGATOR_TAPE_POSITIONED_FILE_REQ=41, /* "Tape positioned (file request)" */
AGGREGATOR_TAPE_POSITIONED_TAPE_REQ=42, /* "Tape positioned (tape request)" */
AGGREGATOR_FILE_TRANSFERED=43 /* "File transfered" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Mon Feb 2 17:47:21 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Wed Feb 4 18:11:21 CET 2009
*/
/******************************************************************************
......@@ -65,9 +65,12 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, "Failed to coordinate remote copy"},
{AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET, "Failed to create RTCPD callback socket"},
{AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED, "Failed to process RTCPD connections"},
{AGGREGATOR_MAIN_SELECT_TIMEDOUT, "The select of the main select loop timed out"},
{AGGREGATOR_PINGED_RTCPD, "Pinged RTCPD"},
{AGGREGATOR_MAIN_SELECT_FAILED, "The select of the main select loop encountered an error other than an interruption"},
{AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK, "Gave request for more work to RTCPD"},
{AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION, "Data has arrived on the initial RTCPD connection"},
{AGGREGATOR_RECEIVED_RTCP_ENDOF_REQ, "Received RTCP_ENDOF_REQ"},
{AGGREGATOR_TAPE_POSITIONED_FILE_REQ, "Tape positioned (file request)"},
{AGGREGATOR_TAPE_POSITIONED_TAPE_REQ, "Tape positioned (tape request)"},
{AGGREGATOR_FILE_TRANSFERED, "File transfered"},
{-1, ""}};
......@@ -34,8 +34,11 @@
33,AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY,"Failed to coordinate remote copy"
34,AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET,"Failed to create RTCPD callback socket"
35,AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED,"Failed to process RTCPD connections"
36,AGGREGATOR_MAIN_SELECT_TIMEDOUT,"The select of the main select loop timed out"
36,AGGREGATOR_PINGED_RTCPD,"Pinged RTCPD"
37,AGGREGATOR_MAIN_SELECT_FAILED,"The select of the main select loop encountered an error other than an interruption"
38,AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK,"Gave request for more work to RTCPD"
39,AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION,"Data has arrived on the initial RTCPD connection"
40,AGGREGATOR_RECEIVED_RTCP_ENDOF_REQ,"Received RTCP_ENDOF_REQ"
41,AGGREGATOR_TAPE_POSITIONED_FILE_REQ,"Tape positioned (file request)"
42,AGGREGATOR_TAPE_POSITIONED_TAPE_REQ,"Tape positioned (tape request)"
43,AGGREGATOR_FILE_TRANSFERED,"File transfered"
......@@ -40,6 +40,7 @@ const int LISTENBACKLOG = 2;
const size_t MSGBUFSIZ = 1024;
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const int RTCPDPINGTIMEOUT = 10;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
......
......@@ -332,6 +332,48 @@ void castor::tape::aggregator::Transceiver::sendRtcpAcknowledge(
}
//-----------------------------------------------------------------------------
// pingRtcpd
//-----------------------------------------------------------------------------
void castor::tape::aggregator::Transceiver::pingRtcpd(
const int socketFd, const int netReadWriteTimeout)
throw(castor::exception::Exception) {
char buf[MSGBUFSIZ];
size_t totalLen = 0;
MessageHeader header;
header.magic = RTCOPY_MAGIC;
header.reqType = RTCP_PING_REQ;
header.len = 0;
try {
// The RTCPD message is a bodiless RTCP message
totalLen = Marshaller::marshallMessageHeader(buf, header);
} catch(castor::exception::Exception &ex) {
castor::exception::Internal ie;
ie.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to marshall RCPD ping message : "
<< ex.getMessage().str();
throw ie;
}
try {
Net::writeBytes(socketFd, netReadWriteTimeout, totalLen, buf);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
ex2.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to send the RCPD ping message to RTCPD"
": " << ex.getMessage().str();
throw ex2;
}
}
//-----------------------------------------------------------------------------
// signalNoMoreRequestsToRtcpd
//-----------------------------------------------------------------------------
......@@ -540,8 +582,8 @@ void castor::tape::aggregator::Transceiver::giveRequestForMoreWorkToRtcpd(
//-----------------------------------------------------------------------------
void castor::tape::aggregator::Transceiver::giveFileListToRtcpd(
const int socketFd, const int netReadWriteTimeout, const uint32_t volReqId,
const char *const filePath, const uint32_t umask, const bool requestMoreWork)
throw(castor::exception::Exception) {
const char *const filePath, const char *const tapePath, const uint32_t umask,
const bool requestMoreWork) throw(castor::exception::Exception) {
RtcpFileRqstErrMsgBody request;
RtcpFileRqstErrMsgBody reply;
......@@ -551,6 +593,7 @@ void castor::tape::aggregator::Transceiver::giveFileListToRtcpd(
{
Utils::setBytes(request, '\0');
Utils::copyString(request.filePath, filePath);
Utils::copyString(request.tapePath, tapePath);
Utils::copyString(request.recfm, "F");
request.volReqId = volReqId;
......
......@@ -128,6 +128,16 @@ public:
const int netReadWriteTimeout, const RtcpAcknowledgeMsg &message)
throw(castor::exception::Exception);
/**
* Pings RTCPD using the specified socket.
*
* @param socketFd The socket file descriptor of the connection with RTCPD.
* @param netReadWriteTimeout The timeout to be applied when performing
* network read and write operations.
*/
static void pingRtcpd(const int socketFd,
const int netReadWriteTimeout) throw(castor::exception::Exception);
/**
* Receives an RTCP job submission message.
*
......@@ -160,6 +170,7 @@ public:
* @param socketFd The socket file descriptor of the connection with RTCPD.
* @param netReadWriteTimeout The timeout to be applied when performing
* @param filePath The file path.
* @param tapePath The tape path.
* @param umask The umask of the file.
* @param requestMoreWork Set to true if RTCPD should be told that there is
* a possibility of more work in the future.
......@@ -167,8 +178,9 @@ public:
*/
static void giveFileListToRtcpd(const int socketFd,
const int netReadWriteTimeout, const uint32_t volReqId,
const char *const filePath, const uint32_t umask,
const bool requestMoreWork) throw(castor::exception::Exception);
const char *const filePath, const char *const tapePath,
const uint32_t umask, const bool requestMoreWork)
throw(castor::exception::Exception);
/**
* Receives a message header.
......
......@@ -252,11 +252,12 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
//-----------------------------------------------------------------------------
// processTapeDiskIoConnection
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::
bool castor::tape::aggregator::VdqmRequestHandlerThread::
processTapeDiskIoConnection(const Cuuid_t &cuuid,
const RcpJobRqstMsgBody &vdqmJobRequest, const int socketFd)
throw(castor::exception::Exception) {
bool continueMainSelectLoop = true;
MessageHeader header;
Transceiver::receiveRtcpMsgHeader(socketFd, RTCPDNETRWTIMEOUT,
......@@ -267,38 +268,113 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
{
RtcpFileRqstMsgBody body;
// Receive what should be a request for more work
Transceiver::receiveRtcpFileRqstBody(socketFd, RTCPDNETRWTIMEOUT,
header, body);
if(body.procStatus == RTCP_REQUEST_MORE_WORK) {
switch(body.procStatus) {
case RTCP_REQUEST_MORE_WORK:
{
// Give file information to RTCPD
try {
Transceiver::giveFileListToRtcpd(socketFd, RTCPDNETRWTIMEOUT,
vdqmJobRequest.tapeRequestId, "lxc2disk07:/dev/null",
body.tapePath, 18, false);
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("filePath","lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_FILE_INFO, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("filePath","lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, params);
}
// Give file information to RTCPD
try {
Transceiver::giveFileListToRtcpd(socketFd, RTCPDNETRWTIMEOUT,
vdqmJobRequest.tapeRequestId, "lxc2disk07:/dev/null", 18, false);
} catch(castor::exception::Exception &ex) {
// Acknowledge request for more work from RTCPD
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILE_REQ;
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
}
break;
case RTCP_POSITIONED:
{
castor::dlf::Param params[] = {
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, params);
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("filePath", "lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_TAPE_POSITIONED_FILE_REQ, params);
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILE_REQ;
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
}
break;
case RTCP_FINISHED:
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("filePath", "lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_FILE_TRANSFERED, params);
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILE_REQ;
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
}
break;
default:
{
castor::exception::Exception ex(EBADMSG);
// Acknowledge request for more work from RTCPD
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILE_REQ;
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
} else {
castor::dlf::Param params[] = {
castor::dlf::Param("Message" , "Pandora's box"),
castor::dlf::Param("RtcpFileRqstErrMsgBody.proc_status",
body.procStatus)};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR, AGGREGATOR_NULL, params);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Received unexpected file request process status 0x"
<< std::hex << body.procStatus
<< "(" << Utils::procStatusToStr(body.procStatus) << ")";
throw ex;
}
}
}
break;
case RTCP_FILEERR_REQ:
{
RtcpFileRqstErrMsgBody body;
Transceiver::receiveRtcpFileRqstErrBody(socketFd, RTCPDNETRWTIMEOUT,
header, body);
castor::dlf::Param params[] = {
castor::dlf::Param("Message" , "Think we got an error"),
castor::dlf::Param("RtcpFileRqstErrMsgBody.proc_status",
body.procStatus),
castor::dlf::Param("RtcpFileRqstErrMsgBody.err.errmsgtxt",
body.err.errmsgtxt)};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR, AGGREGATOR_NULL, params);
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILEERR_REQ;
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
}
break;
case RTCP_TAPE_REQ:
{
RtcpTapeRqstMsgBody body;
......@@ -327,8 +403,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
ackMsg.status = 0;
Transceiver::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
// TBD
// Do something to end this madness
continueMainSelectLoop = false;
break;
}
default:
......@@ -342,6 +417,8 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
throw ex;
}
}
return continueMainSelectLoop;
}
......@@ -362,8 +439,8 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
try {
// Select loop
bool selectAgain = true;
while(selectAgain) {
bool continueMainSelectLoop = true;
while(continueMainSelectLoop) {
// Build the file descriptor set ready for the select call
FD_ZERO(&readFdSet);
......@@ -384,7 +461,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
}
}
timeout.tv_sec = RTCPDNETRWTIMEOUT;
timeout.tv_sec = RTCPDPINGTIMEOUT;
timeout.tv_usec = 0;
selectRc = select(maxFd + 1, &readFdSet, NULL, NULL, &timeout);
......@@ -393,8 +470,8 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
switch(selectRc) {
case 0: // Select timed out
castor::dlf::dlf_writep(cuuid, DLF_LVL_DEBUG,
AGGREGATOR_MAIN_SELECT_TIMEDOUT);
Transceiver::pingRtcpd(rtcpdInitialSocketFd, RTCPDNETRWTIMEOUT);
castor::dlf::dlf_writep(cuuid, DLF_LVL_DEBUG, AGGREGATOR_PINGED_RTCPD);
break;
case -1: // Select encountered an error
......@@ -442,7 +519,8 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
if(FD_ISSET(*itor, &readFdSet)) {
processTapeDiskIoConnection(cuuid, vdqmJobRequest, *itor);
continueMainSelectLoop = processTapeDiskIoConnection(cuuid,
vdqmJobRequest, *itor);
FD_CLR(*itor, &readFdSet);
}
......
......@@ -144,12 +144,22 @@ namespace aggregator {
/**
* Processes the connected socket of a tape and disk I/O thread.
*
* Along with other request types, this function has to process the
* RTCP_ENDOF_REQ request type. A message of this type is sent by RTCPD
* at the end of a migration. The reception of such a message should end
* the current thread's current migration, in other words it should cause
* the main select loop to exit. This method returns true if the main
* select loop should continue or false if it has received an
* RTCP_ENDOF_REQ request.
*
* @param cuuid The ccuid to be used for logging.
* @param vdqmJobRequest The job request message received from the VDQM.
* @param socketFd The file descriptor of the connected socket of a tape
* or disk I/O thread.
* @return true if the main select loop should continue or false if a
* RTCP_ENDOF_REQ request has been received.
*/
void processTapeDiskIoConnection(const Cuuid_t &cuuid,
bool processTapeDiskIoConnection(const Cuuid_t &cuuid,
const RcpJobRqstMsgBody &vdqmJobRequest, const int socketFd)
throw(castor::exception::Exception);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment