Commit 69aba7a4 authored by Steven Murray's avatar Steven Murray
Browse files

Started investigations into the error protocol that is observerd to start on

the initial connection from RTCPD.  The main select loop is now in place so
that we can see many messages and connections at the same time.  However we
still have not seen the incoming connections we expect for the tape and disk
I/O threads.
parent 1f69b4a2
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 13:44:21 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 18:49:46 CET 2009
*/
/******************************************************************************
......@@ -68,7 +68,11 @@ AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS=31, /* "Signalled no more requests to RTCP
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=32, /* "Failed to signal no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY=33, /* "Failed to coordinate remote copy" */
AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET=34, /* "Failed to create RTCPD callback socket" */
AGGREGATOR_SELECT_LOOP_FAILED=35 /* "Main select loop failed" */
AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED=35, /* "Failed to process RTCPD connections" */
AGGREGATOR_MAIN_SELECT_TIMEDOUT=36, /* "The select of the main select loop timed out" */
AGGREGATOR_MAIN_SELECT_FAILED=37, /* "The select of the main select loop encountered an error other than an interruption" */
AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK=38, /* "Gave request for more work to RTCPD" */
AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION=39 /* "Data has arrived on the initial RTCPD connection" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 13:44:21 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 30 18:49:46 CET 2009
*/
/******************************************************************************
......@@ -64,5 +64,9 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS, "Failed to signal no more requests to RTCPD"},
{AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, "Failed to coordinate remote copy"},
{AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET, "Failed to create RTCPD callback socket"},
{AGGREGATOR_SELECT_LOOP_FAILED, "Main select loop failed"},
{AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED, "Failed to process RTCPD connections"},
{AGGREGATOR_MAIN_SELECT_TIMEDOUT, "The select of the main select loop timed out"},
{AGGREGATOR_MAIN_SELECT_FAILED, "The select of the main select loop encountered an error other than an interruption"},
{AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK, "Gave request for more work to RTCPD"},
{AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION, "Data has arrived on the initial RTCPD connection"},
{-1, ""}};
......@@ -33,4 +33,8 @@
32,AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS,"Failed to signal no more requests to RTCPD"
33,AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY,"Failed to coordinate remote copy"
34,AGGREGATOR_FAILED_TO_CREATE_RTCPD_CALLBACK_SOCKET,"Failed to create RTCPD callback socket"
35,AGGREGATOR_SELECT_LOOP_FAILED,"Main select loop failed"
35,AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED,"Failed to process RTCPD connections"
36,AGGREGATOR_MAIN_SELECT_TIMEDOUT,"The select of the main select loop timed out"
37,AGGREGATOR_MAIN_SELECT_FAILED,"The select of the main select loop encountered an error other than an interruption"
38,AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK,"Gave request for more work to RTCPD"
39,AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION,"Data has arrived on the initial RTCPD connection"
......@@ -38,7 +38,7 @@ const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t HOSTNAMEBUFLEN = 256;
const int LISTENBACKLOG = 2;
const size_t MSGBUFSIZ = 1024;
const int NETRWTIMEOUT = 300;
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
......
......@@ -179,7 +179,8 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read in the message header
char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len
try {
Net::readBytes(socket.socket(), NETRWTIMEOUT, sizeof(headerBuf), headerBuf);
Net::readBytes(socket.socket(), RTCPDNETRWTIMEOUT, sizeof(headerBuf),
headerBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......@@ -267,7 +268,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read the message body
try {
Net::readBytes(socket.socket(), NETRWTIMEOUT, header.len, bodyBuf);
Net::readBytes(socket.socket(), RTCPDNETRWTIMEOUT, header.len, bodyBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......
......@@ -244,7 +244,7 @@ void castor::tape::aggregator::Transceiver::receiveRtcpTapeRequest(
// Read in the message header
char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len
try {
Net::readBytes(socketFd, NETRWTIMEOUT, sizeof(headerBuf), headerBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, sizeof(headerBuf), headerBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......@@ -314,7 +314,7 @@ void castor::tape::aggregator::Transceiver::receiveRtcpTapeRequest(
// Read the message body
try {
Net::readBytes(socketFd, NETRWTIMEOUT, header.len, bodyBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, header.len, bodyBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO);
......@@ -435,7 +435,7 @@ void castor::tape::aggregator::Transceiver::receiveRtcpFileRequest(
// Read in the message header
char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len
try {
Net::readBytes(socketFd, NETRWTIMEOUT, sizeof(headerBuf), headerBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, sizeof(headerBuf), headerBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......@@ -505,7 +505,7 @@ void castor::tape::aggregator::Transceiver::receiveRtcpFileRequest(
// Read the message body
try {
Net::readBytes(socketFd, NETRWTIMEOUT, header.len, bodyBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, header.len, bodyBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO);
......@@ -544,7 +544,7 @@ void castor::tape::aggregator::Transceiver::receiveRtcpAcknowledge(
// body)
char messageBuf[3 * sizeof(uint32_t)]; // magic + request type + status
try {
Net::readBytes(socketFd, NETRWTIMEOUT, sizeof(messageBuf),
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, sizeof(messageBuf),
messageBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......@@ -700,7 +700,7 @@ void castor::tape::aggregator::Transceiver::receiveRcpJobRequest(
// Read in the message header
char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len
try {
Net::readBytes(socketFd, NETRWTIMEOUT, sizeof(headerBuf), headerBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, sizeof(headerBuf), headerBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(SECOMERR);
......@@ -787,7 +787,7 @@ void castor::tape::aggregator::Transceiver::receiveRcpJobRequest(
// Read the message body
try {
Net::readBytes(socketFd, NETRWTIMEOUT, header.len, bodyBuf);
Net::readBytes(socketFd, RTCPDNETRWTIMEOUT, header.len, bodyBuf);
} catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO);
......@@ -847,12 +847,12 @@ void castor::tape::aggregator::Transceiver::giveRequestForMoreWorkToRtcpd(
request.err.maxTpRetry = -1;
request.err.maxCpRetry = -1;
giveFileInfoToRtcpd(socketFd, NETRWTIMEOUT, request, reply);
giveFileInfoToRtcpd(socketFd, RTCPDNETRWTIMEOUT, request, reply);
// TBD - process reply
// Signal the end of the file list to RTCPD
signalNoMoreRequestsToRtcpd(socketFd, NETRWTIMEOUT);
signalNoMoreRequestsToRtcpd(socketFd, RTCPDNETRWTIMEOUT);
}
......@@ -894,7 +894,7 @@ void castor::tape::aggregator::Transceiver::giveFileListToRtcpd(
request.err.maxTpRetry = -1;
request.err.maxCpRetry = -1;
giveFileInfoToRtcpd(socketFd, NETRWTIMEOUT, request, reply);
giveFileInfoToRtcpd(socketFd, RTCPDNETRWTIMEOUT, request, reply);
// TBD - process reply
}
......@@ -922,11 +922,11 @@ void castor::tape::aggregator::Transceiver::giveFileListToRtcpd(
request.err.maxTpRetry = -1;
request.err.maxCpRetry = -1;
giveFileInfoToRtcpd(socketFd, NETRWTIMEOUT, request, reply);
giveFileInfoToRtcpd(socketFd, RTCPDNETRWTIMEOUT, request, reply);
// TBD - process reply
}
// Signal the end of the file list to RTCPD
signalNoMoreRequestsToRtcpd(socketFd, NETRWTIMEOUT);
signalNoMoreRequestsToRtcpd(socketFd, RTCPDNETRWTIMEOUT);
}
......@@ -39,7 +39,7 @@
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include <list>
#include <sys/select.h>
//-----------------------------------------------------------------------------
......@@ -162,10 +162,11 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
checkRcpJobSubmitterIsAuthorised(vdqmSocketFd);
Transceiver::receiveRcpJobRequest(vdqmSocketFd, NETRWTIMEOUT, jobRequest);
Transceiver::receiveRcpJobRequest(vdqmSocketFd, RTCPDNETRWTIMEOUT,
jobRequest);
{
castor::dlf::Param params[] = {
castor::dlf::Param("tapeRequestId" , jobRequest.tapeRequestId ),
castor::dlf::Param("volReqId" , jobRequest.tapeRequestId ),
castor::dlf::Param("clientPort" , jobRequest.clientPort ),
castor::dlf::Param("clientEuid" , jobRequest.clientEuid ),
castor::dlf::Param("clientEgid" , jobRequest.clientEgid ),
......@@ -188,13 +189,17 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
// Pass a modified version of the job request through to RTCPD, setting the
// clientHost and clientPort parameters to identify the tape aggregator as
// being a proxy for RTCPClientD
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_SUBMITTING_JOB_TO_RTCPD);
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", jobRequest.tapeRequestId)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_SUBMITTING_JOB_TO_RTCPD, 1, params);
}
RcpJobReplyMessage rtcpdReply;
RcpJobSubmitter::submit(
"localhost", // host
RTCOPY_PORT, // port
NETRWTIMEOUT, // netReadWriteTimeout
RTCPDNETRWTIMEOUT, // netReadWriteTimeout
"RTCPD", // remoteCopyType
jobRequest.tapeRequestId,
jobRequest.clientUserName,
......@@ -216,11 +221,12 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
// even if there is no error
if(strlen(rtcpdReply.errorMessage) > 0) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , rtcpdReply.errorMessage),
castor::dlf::Param("Code" , rtcpdReply.status)};
castor::dlf::Param("volReqId", jobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , rtcpdReply.errorMessage ),
castor::dlf::Param("Code" , rtcpdReply.status )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE, 3, params);
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE, 4, params);
// Override positive response with the error message from RTCPD
errorStatusForVdqm = rtcpdReply.status;
......@@ -235,80 +241,156 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::
vdqmReplyLen = Marshaller::marshallRcpJobReplyMessage(vdqmReplyBuf,
rtcpdReply);
Net::writeBytes(vdqmSocketFd, NETRWTIMEOUT, vdqmReplyLen, vdqmReplyBuf);
Net::writeBytes(vdqmSocketFd, RTCPDNETRWTIMEOUT, vdqmReplyLen, vdqmReplyBuf);
}
//-----------------------------------------------------------------------------
// selectLoop
// processErrorOnInitialRtcpdConnection
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::selectLoop(
const Cuuid_t &cuuid, const RcpJobRequestMessage &vdqmJobRequest,
const int rtcpdCallbackSocketFd, const int rtcpdInitialSocketFd)
void castor::tape::aggregator::VdqmRequestHandlerThread::
processErrorOnInitialRtcpdConnection(const Cuuid_t &cuuid,
const RcpJobRequestMessage &vdqmJobRequest, const int rtcpdInitialSocketFd)
throw(castor::exception::Exception) {
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_DATA_ON_INITIAL_RTCPD_CONNECTION, 1, params);
}
RtcpTapeRequestMessage request;
Transceiver::receiveRtcpTapeRequest(rtcpdInitialSocketFd, RTCPDNETRWTIMEOUT,
request);
// TBD - Process incoming request?
}
//-----------------------------------------------------------------------------
// acceptRtcpdConnection
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::
acceptRtcpdConnection(const Cuuid_t &cuuid,
const RcpJobRequestMessage &vdqmJobRequest, const int rtcpdCallbackSocketFd,
std::list<int> &connectedSockets) throw(castor::exception::Exception) {
const int connectedSocketFd = Net::acceptConnection(rtcpdCallbackSocketFd,
RTCPDCALLBACKTIMEOUT);
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
char hostName[HOSTNAMEBUFLEN];
Net::getPeerIpAndPort(connectedSocketFd, ip, port);
Net::getPeerHostName(connectedSocketFd, hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port),
castor::dlf::Param("HostName", hostName)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RTCPD_CALLBACK_WITH_INFO, 4, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RTCPD_CALLBACK_WITHOUT_INFO, 1, params);
}
connectedSockets.push_back(connectedSocketFd);
}
//-----------------------------------------------------------------------------
// processRtcpdSockets
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandlerThread::
processRtcpdSockets(const Cuuid_t &cuuid,
const RcpJobRequestMessage &vdqmJobRequest, const int rtcpdCallbackSocketFd,
const int rtcpdInitialSocketFd) throw(castor::exception::Exception) {
std::list<int> connectedSockets;
fd_set fdSet;
int selectRc = 0;
int selectErrno = 0;
fd_set readFdSet;
int maxFd = 0;
timeval timeout;
try {
// Select loop
bool selectAgain = true;
while(selectAgain) {
//MISING largest fd logic!!!!!!
// Build the file descriptor set ready for the select call
FD_ZERO(&fdSet);
FD_SET(rtcpdInitialSocketFd, &fdSet);
for(
std::list<int>::iterator itor = connectedSockets.begin();
itor != connectedSockets.end();
itor++) {
FD_SET(*itor, &fdSet);
FD_ZERO(&readFdSet);
FD_SET(rtcpdInitialSocketFd, &readFdSet);
if(rtcpdCallbackSocketFd > rtcpdInitialSocketFd) {
maxFd = rtcpdCallbackSocketFd;
} else {
maxFd = rtcpdInitialSocketFd;
}
}
} catch(castor::exception::Exception &ex) {
// TBD
}
/*
// Give the volume ID from the VDQM job submission message to RTCPD
{
uint32_t tStartRequest = time(NULL); // CASTOR2/rtcopy/rtcpclientd.c:1494
RtcpTapeRequestMessage request;
RtcpTapeRequestMessage reply;
Utils::setBytes(request, '\0');
Utils::copyString(request.vid , "I10547");
Utils::copyString(request.vsn , "I10547");
Utils::copyString(request.label , "aul" );
Utils::copyString(request.density, "700GC" );
request.volReqId = vdqmJobRequest.tapeRequestId;
request.tStartRequest = tStartRequest;
try {
for(std::list<int>::iterator itor = connectedSockets.begin();
itor != connectedSockets.end(); itor++) {
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
request, reply);
FD_SET(*itor, &readFdSet);
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype" , request.devtype),
castor::dlf::Param("density" , request.density)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
}
}
if(*itor > maxFd) {
maxFd = *itor;
}
}
timeout.tv_sec = RTCPDNETRWTIMEOUT;
timeout.tv_usec = 0;
selectRc = select(maxFd + 1, &readFdSet, NULL, NULL, &timeout);
selectErrno = errno;
switch(selectRc) {
case 0: // Select timed out
castor::dlf::dlf_writep(cuuid, DLF_LVL_DEBUG,
AGGREGATOR_MAIN_SELECT_TIMEDOUT);
break;
case -1: // Select encountered an error
// If select encountered an error other than an interruption
if(selectErrno != EINTR) {
char strerrorBuf[STRERRORBUFLEN];
char *const errorStr = strerror_r(selectErrno, strerrorBuf,
sizeof(strerrorBuf));
castor::exception::Exception ex(selectErrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Select encountered an error other than an interruption"
": " << errorStr;
throw ex;
}
break;
default: // One or more select file descriptors require attention
// If there is an incoming message on the initial RTCPD connection
if(FD_ISSET(rtcpdInitialSocketFd, &readFdSet)) {
processErrorOnInitialRtcpdConnection(cuuid, vdqmJobRequest,
rtcpdInitialSocketFd);
// Else if there is a callback connection request from RTCPD
} else if(FD_ISSET(rtcpdCallbackSocketFd, &readFdSet)) {
acceptRtcpdConnection(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd,
connectedSockets);
// Else there are one or messages from the tape and disk I/O threads
} else {
// TBD
/*
// Give file information to RTCPD
try {
Transceiver::giveFileListToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
Transceiver::giveFileListToRtcpd(rtcpdInitialSocketFd, RTCPDNETRWTIMEOUT,
vdqmJobRequest.tapeRequestId, "lxc2disk07:/dev/null", 18, false);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
......@@ -319,6 +401,18 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::selectLoop(
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, 3, params);
}
*/
}
}
}
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_MAIN_SELECT_FAILED, 4, params);
}
// Close all connected sockets
for(
......@@ -352,11 +446,12 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::coordinateRemoteCopy(
Net::getPeerHostName(rtcpdInitialSocketFd, hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port),
castor::dlf::Param("HostName", hostName)};
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip) ),
castor::dlf::Param("Port" , port ),
castor::dlf::Param("HostName", hostName )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITH_INFO, 3, params);
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITH_INFO, 4, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_INITIAL_RTCPD_CALLBACK_WITHOUT_INFO);
......@@ -372,52 +467,68 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::coordinateRemoteCopy(
Utils::copyString(request.vsn , "I10547");
Utils::copyString(request.label , "aul" );
Utils::copyString(request.density, "700GC" );
request.volReqId = vdqmJobRequest.tapeRequestId;
request.tStartRequest = tStartRequest;
request.volReqId = vdqmJobRequest.tapeRequestId;
request.tStartRequest = tStartRequest;
request.err.severity = 1;
request.err.maxTpRetry = -1;
request.err.maxCpRetry = -1;
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, NETRWTIMEOUT,
Transceiver::giveVolumeIdToRtcpd(rtcpdInitialSocketFd, RTCPDNETRWTIMEOUT,
request, reply);
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype", request.devtype),
castor::dlf::Param("density", request.density)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("vid" , request.vid ),
castor::dlf::Param("vsn" , request.vsn ),
castor::dlf::Param("label" , request.label ),
castor::dlf::Param("devtype" , request.devtype ),
castor::dlf::Param("density" , request.density )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 6, params);
}
Transceiver::giveRequestForMoreWorkToRtcpd(rtcpdInitialSocketFd,
NETRWTIMEOUT, vdqmJobRequest.tapeRequestId);
RTCPDNETRWTIMEOUT, vdqmJobRequest.tapeRequestId);
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", request.volReqId)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_REQUEST_FOR_MORE_WORK, 1, params);
}
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 4, params);
}
try {
selectLoop(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd,
processRtcpdSockets(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd,
rtcpdInitialSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_SELECT_LOOP_FAILED, 3, params);
AGGREGATOR_PROCESS_RTCPD_CONNECTIONS_FAILED, 4, params);
}
close(rtcpdInitialSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, AGGREGATOR_NULL, 3, params);
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, AGGREGATOR_NULL, 4, params);
}
}
......@@ -486,11 +597,12 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::run(void *param)
coordinateRemoteCopy(cuuid, vdqmJobRequest, rtcpdCallbackSocketFd);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::Param("volReqId", vdqmJobRequest.tapeRequestId),
castor::dlf::Param("Function", __PRETTY_FUNCTION__ ),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, 3, params);
AGGREGATOR_FAILED_TO_COORDINATE_REMOTE_COPY, 4, params);
}