Commit 7651682d authored by Steven Murray's avatar Steven Murray
Browse files

Created the new but yet to be implemented 'selectLoop' function. This function

will deal with the following:

     * The incoming callback connection request from the tape I/O thread.
     * The incoming callback connection requests from the disk I/O threads.
     * Any incoming messages sent from RTCPD over any of the connected
       sockets, including the initial RTCPD connection.
parent 88f77112
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Thu Jan 29 17:48:08 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 13:44:21 CET 2009
*/
/******************************************************************************
......@@ -65,7 +65,10 @@ AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO=28, /* "Failed to give volume information
AGGREGATOR_GAVE_FILE_INFO=29, /* "Gave file information to RTCPD" */
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO=30, /* "Failed to give file information to RTCPD" */
AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS=31, /* "Signalled no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=32 /* "Failed to signal no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=32, /* "Failed to signal no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY=33, /* "Failed to coordinate remote copy" */
AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET=34, /* "Failed to create RTCPD callback socket" */
AGGREGATOR_SELECT_LOOP_FAILED=35 /* "Main select loop failed" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Thu Jan 29 17:48:08 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 13:44:21 CET 2009
*/
/******************************************************************************
......@@ -62,4 +62,7 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, "Failed to give file information to RTCPD"},
{AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS, "Signalled no more requests to RTCPD"},
{AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS, "Failed to signal no more requests to RTCPD"},
{AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, "Failed to coordinate remote copy"},
{AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET, "Failed to create RTCPD callback socket"},
{AGGREGATOR_SELECT_LOOP_FAILED, "Main select loop failed"},
{-1, ""}};
......@@ -31,3 +31,6 @@
30,AGGREGATOR_FAILED_TO_GIVE_FILE_INFO,"Failed to give file information to RTCPD"
31,AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS,"Signalled no more requests to RTCPD"
32,AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS,"Failed to signal no more requests to RTCPD"
33,AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY,"Failed to coordinate remote copy"
34,AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET,"Failed to create RTCPD callback socket"
35,AGGREGATOR_SELECT_LOOP_FAILED,"Main select loop failed"
......@@ -34,13 +34,14 @@ namespace castor {
namespace tape {
namespace aggregator {
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t HOSTNAMEBUFLEN = 256;
const int LISTENBACKLOG = 2;
const size_t MSGBUFSIZ = 1024;
const int NETRWTIMEOUT = 300;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t HOSTNAMEBUFLEN = 256;
const int LISTENBACKLOG = 2;
const size_t MSGBUFSIZ = 1024;
const int NETRWTIMEOUT = 300;
const int RTCPDCALLBACKTIMEOUT = 5;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
} // namespace aggregator
} // namespace tape
......
......@@ -358,7 +358,7 @@ size_t castor::tape::aggregator::Marshaller::marshallRcpJobRequestMessage(
marshallUint32(RTCOPY_MAGIC_OLD0 , p); // Magic number
marshallUint32(VDQM_CLIENTINFO , p); // Request type
marshallUint32(len , p); // Length of message body
marshallUint32(src.tapeRequestID , p);
marshallUint32(src.tapeRequestId , p);
marshallUint32(src.clientPort , p);
marshallUint32(src.clientEuid , p);
marshallUint32(src.clientEgid , p);
......@@ -395,7 +395,7 @@ void castor::tape::aggregator::Marshaller::unmarshallRcpJobRequestMessageBody(
const char * &src, size_t &srcLen, RcpJobRequestMessage &dst)
throw(castor::exception::Exception) {
unmarshallUint32(src, srcLen, dst.tapeRequestID);
unmarshallUint32(src, srcLen, dst.tapeRequestId);
unmarshallUint32(src, srcLen, dst.clientPort);
unmarshallUint32(src, srcLen, dst.clientEuid);
unmarshallUint32(src, srcLen, dst.clientEgid);
......
......@@ -38,7 +38,7 @@ namespace aggregator {
* An RCP job submission request message.
*/
struct RcpJobRequestMessage {
uint32_t tapeRequestID;
uint32_t tapeRequestId;
uint32_t clientPort;
uint32_t clientEuid;
uint32_t clientEgid;
......
......@@ -51,7 +51,7 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
const unsigned int port,
const int netReadWriteTimeout,
const char *remoteCopyType,
const u_signed64 tapeRequestID,
const u_signed64 tapeRequestId,
const std::string &clientUserName,
const std::string &clientHost,
const int clientPort,
......@@ -108,7 +108,7 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
// Prepare the job submission request information ready for marshalling
// The validity of the string length were check above
request.tapeRequestID = tapeRequestID;
request.tapeRequestId = tapeRequestId;
request.clientPort = clientPort;
request.clientEuid = clientEuid;
request.clientEgid = clientEgid;
......
......@@ -39,6 +39,8 @@
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include <list>
//-----------------------------------------------------------------------------
// constructor
......@@ -128,7 +130,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::init() throw() {
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::
processJobSubmissionRequest(const Cuuid_t &cuuid, const int vdqmSocketFd,
RcpJobRequestMessage &jobRequest, int &rtcpdCallbackSocketFd)
RcpJobRequestMessage &jobRequest, const int rtcpdCallbackSocketFd)
throw(castor::exception::Exception) {
// Log the new connection
......@@ -163,7 +165,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
Transceiver::receiveRcpJobRequest(vdqmSocketFd, NETRWTIMEOUT, jobRequest);
{
castor::dlf::Param params[] = {
castor::dlf::Param("tapeRequestID" , jobRequest.tapeRequestID ),
castor::dlf::Param("tapeRequestId" , jobRequest.tapeRequestId ),
castor::dlf::Param("clientPort" , jobRequest.clientPort ),
castor::dlf::Param("clientEuid" , jobRequest.clientEuid ),
castor::dlf::Param("clientEgid" , jobRequest.clientEgid ),
......@@ -175,9 +177,6 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
AGGREGATOR_HANDLE_JOB_MESSAGE, 8, params);
}
// Create, bind and mark a listener socket for RTCPD callback connections
rtcpdCallbackSocketFd = Net::createListenerSocket(0);
// Get the IP and port of the RTCPD callback socket
unsigned long rtcpdCallbackSocketIp = 0;
unsigned short rtcpdCallbackSocketPort = 0;
......@@ -197,7 +196,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
RTCOPY_PORT, // port
NETRWTIMEOUT, // netReadWriteTimeout
"RTCPD", // remoteCopyType
jobRequest.tapeRequestID,
jobRequest.tapeRequestId,
jobRequest.clientUserName,
rtcpdCallbackHostName,
rtcpdCallbackSocketPort,
......@@ -241,144 +240,261 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
//-----------------------------------------------------------------------------
// run
// selectLoop
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::run(void *param)
throw() {
Cuuid_t cuuid = nullCuuid;
void castor::tape::aggregator::VdqmRequestHandlerThread::selectLoop(
const Cuuid_t &cuuid, const RcpJobRequestMessage &vdqmJobRequest,
const int rtcpdCallbackSocketFd, const int rtcpdInitialSocketFd)
throw(castor::exception::Exception) {
// Give a Cuuid to the request
Cuuid_create(&cuuid);
std::list<int> connectedSockets;
fd_set fdSet;
if(param == NULL) {
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_VDQM_REQUEST_HANDLER_SOCKET_IS_NULL);
return;
try {
// Select loop
bool selectAgain = true;
while(selectAgain) {
//MISING largest fd logic!!!!!!
// Build the file descriptor set ready for the select call
FD_ZERO(&fdSet);
FD_SET(rtcpdInitialSocketFd, &fdSet);
for(
std::list<int>::iterator itor = connectedSockets.begin();
itor != connectedSockets.end();
itor++) {
FD_SET(*itor, &fdSet);
}
}
} catch(castor::exception::Exception &ex) {
// TBD
}
/*
// Give the volume ID from the VDQM job submission message to RTCPD
{
uint32_t tStartRequest = time(NULL); // CASTOR2/rtcopy/rtcpclientd.c:1494
RtcpTapeRequestMessage request;
RtcpTapeRequestMessage reply;
castor::io::AbstractTCPSocket *vdqmSocket =
(castor::io::AbstractTCPSocket*)param;
Utils::setBytes(request, '\0');
Utils::copyString(request.vid , "I10547");
Utils::copyString(request.vsn , "I10547");
Utils::copyString(request.label , "aul" );
Utils::copyString(request.density, "700GC" );
request.volReqId = vdqmJobRequest.tapeRequestId;
request.tStartRequest = tStartRequest;
// Process the job submission request message from the VDQM using the
// processJobSubmissionRequest function.
//
// If successfull then the function will have passed the request onto RTCPD
// and will have given as output the contents of the job submission request
// and the file descriptor of the listener socket which will be used to
// accept callback connections from RTCPD.
RcpJobRequestMessage vdqmJobRequest;
int rtcpdCallbackSocketFd = 0;
bool processedJobSubmissionRequest = false;
try {
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
request, reply);
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype" , request.devtype),
castor::dlf::Param("density" , request.density)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
}
}
// Give file information to RTCPD
try {
processJobSubmissionRequest(cuuid, vdqmSocket->socket(), vdqmJobRequest,
rtcpdCallbackSocketFd);
processedJobSubmissionRequest = true;
Transceiver::giveFileListToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
vdqmJobRequest.tapeRequestId, "lxc2disk07:/dev/null", 18, false);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PROCESS_RCP_JOB_SUBMISSION, 3, params);
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, 3, params);
}
*/
// Close and de-allocate the VDQM socket no matter if the processing of the
// job submission request succeeded or not
delete(vdqmSocket);
// Return if the processing of the job submission request failed
if(!processedJobSubmissionRequest) {
return;
// Close all connected sockets
for(
std::list<int>::iterator itor = connectedSockets.begin();
itor != connectedSockets.end();
itor++) {
close(*itor);
}
}
//-----------------------------------------------------------------------------
// coordinateRemoteCopy
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::coordinateRemoteCopy(
const Cuuid_t &cuuid, const RcpJobRequestMessage &vdqmJobRequest,
const int rtcpdCallbackSocketFd) throw(castor::exception::Exception) {
// Accept the initial incoming RTCPD callback connection
int rtcpdInitialSocketFd = 0;
try {
rtcpdInitialSocketFd = Net::acceptConnection(rtcpdCallbackSocketFd,
NETRWTIMEOUT);
RTCPDCALLBACKTIMEOUT);
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
char hostName[HOSTNAMEBUFLEN];
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
char hostName[HOSTNAMEBUFLEN];
Net::getPeerIpAndPort(rtcpdInitialSocketFd, ip, port);
Net::getPeerHostName(rtcpdInitialSocketFd, hostName);
Net::getPeerIpAndPort(rtcpdInitialSocketFd, ip, port);
Net::getPeerHostName(rtcpdInitialSocketFd, hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port),
castor::dlf::Param("HostName", hostName)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITH_INFO, 3, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, AGGREGATOR_NULL, 3, params);
}
// Give the volume ID from the VDQM job submission message to RTCPD
{
uint32_t tStartRequest = time(NULL); // CASTOR2/rtcopy/rtcpclientd.c:1494
RtcpTapeRequestMessage request;
RtcpTapeRequestMessage reply;
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port),
castor::dlf::Param("HostName", hostName)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITH_INFO, 3, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITHOUT_INFO);
}
Utils::setBytes(request, '\0');
try {
uint32_t tStartRequest = time(NULL); // CASTOR2/rtcopy/rtcpclientd.c:1494
RtcpTapeRequestMessage request;
RtcpTapeRequestMessage reply;
Utils::setBytes(request, '\0');
Utils::copyString(request.vid , "I10547");
Utils::copyString(request.vsn , "I10547");
Utils::copyString(request.label , "aul" );
Utils::copyString(request.density, "700GC" );
request.volReqId = vdqmJobRequest.tapeRequestId;
request.tStartRequest = tStartRequest;
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
request, reply);
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype", request.devtype),
castor::dlf::Param("density", request.density)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
Transceiver::giveRequestForMoreWorkToRtcpd(rtcpdInitialSocketFd,
NETRWTIMEOUT, vdqmJobRequest.tapeRequestId);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_NULL, 3, params);
return;
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
}
request.volReqId = vdqmJobRequest.tapeRequestID;
request.tStartRequest = tStartRequest;
try {
selectLoop(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd,
rtcpdInitialSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_SELECT_LOOP_FAILED, 3, params);
}
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
request, reply);
close(rtcpdInitialSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, AGGREGATOR_NULL, 3, params);
}
}
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype" , request.devtype),
castor::dlf::Param("density" , request.density)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
//-----------------------------------------------------------------------------
// run
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::run(void *param)
throw() {
Cuuid_t cuuid = nullCuuid;
// Give a Cuuid to the request
Cuuid_create(&cuuid);
if(param == NULL) {
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_VDQM_REQUEST_HANDLER_SOCKET_IS_NULL);
return;
}
castor::io::AbstractTCPSocket *vdqmSocket =
(castor::io::AbstractTCPSocket*)param;
int rtcpdCallbackSocketFd = 0;
bool processedJobSubmissionRequest = false;
RcpJobRequestMessage vdqmJobRequest;
try {
// Create, bind and mark a listener socket for RTCPD callback connections
rtcpdCallbackSocketFd = Net::createListenerSocket(0);
try {
processJobSubmissionRequest(cuuid, vdqmSocket->socket(), vdqmJobRequest,
rtcpdCallbackSocketFd);
processedJobSubmissionRequest = true;
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
AGGREGATOR_FAILED_TO_PROCESS_RCP_JOB_SUBMISSION, 3, params);
}
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET, 3, params);
}
// Give file information to RTCPD
// Close and de-allocate the VDQM socket no matter if the processing of the
// job submission request succeeded or not
delete(vdqmSocket);
// Close the RTCPD callback listener socket and return if the job submisison
// request was not processed
if(!processedJobSubmissionRequest) {
close(rtcpdCallbackSocketFd);
return;
}
// CoordinateRemoteCopy
try {
Transceiver::giveFileListToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
vdqmJobRequest.tapeRequestID, "lxc2disk07:/dev/null", 18, false);
coordinateRemoteCopy(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, 3, params);
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, 3, params);
}
// Select loop
// Close RTCPD callback listener socket
close(rtcpdCallbackSocketFd);
}
......
......@@ -96,21 +96,53 @@ namespace aggregator {
*
* If the processing of the request is successfull then this function will
* have passed the request onto RTCPD and will have given as output the
* contents of the job submission request and the file descriptor of the
* listener socket which will be used to accept callback connections from
* RTCPD.
* contents of the job submission request.
*
* @param cuuid The ccuid to be used for logging.
* @param vdqmSocketFd The socket file descriptor of the connection from
* which the request is to be processed.
* @param jobRequest Out parameter. The job request message structure
* which will be filled by this function.
* @rtcpdCallbackSocketFd The file descriptor of the listener socket which
* will be used to accept callback connections from RTCPD.
* @param rtcpdCallbackSocketFd The file descriptor of the listener socket
* to be used to accept callback connections from RTCPD.
*/
void processJobSubmissionRequest(const Cuuid_t &cuuid,
const int vdqmSocketFd, RcpJobRequestMessage &jobRequest,
int &rtcpdCallbackSocketFd) throw(castor::exception::Exception);
const int rtcpdCallbackSocketFd) throw(castor::exception::Exception);
/**
* The main select loop responsible for processing:
* <ul>
* <li>The incoming callback connection request from the tape I/O thread.
* <li>The incoming callback connection requests from the disk I/O threads.
* <li>Any incoming messages sent from RTCPD over any of the connected
* sockets, including the initial RTCPD connection.
* </ul>
*
* @param cuuid The ccuid to be used for logging.
* @param vdqmJobRequest The job request message received from the VDQM.
* @param rtcpdCallbackSocketFd The file descriptor of the listener socket
* to be used to accept callback connections from RTCPD.
* @param rtcpdInitialSocketFd The socket file descriptor of the initial
* connection made by RTCPD.
*/
void selectLoop(const Cuuid_t &cuuid,
const RcpJobRequestMessage &vdqmJobRequest,
const int rtcpdCallbackSocketFd, const int rtcpdInitialSocketFd)
throw(castor::exception::Exception);
/**
* Coordinates the remote copy operations by sending and recieving the
* necessary messages.
*
* @param cuuid The ccuid to be used for logging.
* @param jobRequest The job request message received from the VDQM.
* @param rtcpdCallbackSocketFd The file descriptor of the listener socket
* to be used to accept callback connections from RTCPD.
*/
void coordinateRemoteCopy(const Cuuid_t &cuuid,
const RcpJobRequestMessage &vdqmJobRequest,
const int rtcpdCallbackSocketFd) throw(castor::exception::Exception);
/**
* Throws an exception if the peer host associated with the specified
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment