Commit 07a07a4a authored by Steven Murray's avatar Steven Murray
Browse files

Aggregator <-> RTCPD protocol started. Got as far as receiving the

ackowledge of a request for a VID from RTCPD.  Still a long, long way to go,
but it's a start.
parent 7a93521d
...@@ -128,9 +128,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -128,9 +128,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason" , oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::InvalidArgument ex; castor::exception::InvalidArgument ex;
ex.getMessage() << oss.str(); ex.getMessage() << oss.str();
throw ex; throw ex;
...@@ -148,9 +149,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -148,9 +149,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason" , oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::InvalidArgument ex; castor::exception::InvalidArgument ex;
ex.getMessage() << oss.str(); ex.getMessage() << oss.str();
throw ex; throw ex;
...@@ -162,9 +164,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -162,9 +164,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason" , oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::InvalidArgument ex; castor::exception::InvalidArgument ex;
ex.getMessage() << oss.str(); ex.getMessage() << oss.str();
throw ex; throw ex;
...@@ -177,9 +180,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -177,9 +180,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason" , oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::Internal ex; castor::exception::Internal ex;
ex.getMessage() << oss.str(); ex.getMessage() << oss.str();
throw ex; throw ex;
...@@ -193,9 +197,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -193,9 +197,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason", oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::Internal ex; castor::exception::Internal ex;
ex.getMessage() << oss.str(); ex.getMessage() << oss.str();
throw ex; throw ex;
...@@ -210,9 +215,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine( ...@@ -210,9 +215,10 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
// Log and throw an exception // Log and throw an exception
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("reason", oss.str())}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Reason" , oss.str())};
castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(nullCuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 1, params); AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE, 2, params);
castor::exception::InvalidArgument e; castor::exception::InvalidArgument e;
e.getMessage() << oss.str(); e.getMessage() << oss.str();
throw e; throw e;
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 16:35:03 CET 2008 /* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Thu Dec 4 11:35:54 CET 2008
*/ */
/****************************************************************************** /******************************************************************************
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 16:35:03 CET 2008 /* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Thu Dec 4 11:35:54 CET 2008
*/ */
/****************************************************************************** /******************************************************************************
......
...@@ -34,7 +34,7 @@ namespace tape { ...@@ -34,7 +34,7 @@ namespace tape {
namespace aggregator { namespace aggregator {
const size_t STRERRORBUFLEN = 256; const size_t STRERRORBUFLEN = 256;
const int NETRW_TIMEOUT = 300; const int NETRWTIMEOUT = 300;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t); const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t MSGBUFSIZ = 1024; const size_t MSGBUFSIZ = 1024;
......
...@@ -259,7 +259,7 @@ size_t castor::tape::aggregator::Marshaller::marshallRcpJobRequest( ...@@ -259,7 +259,7 @@ size_t castor::tape::aggregator::Marshaller::marshallRcpJobRequest(
const size_t totalLen = 3 * sizeof(uint32_t) + len; const size_t totalLen = 3 * sizeof(uint32_t) + len;
// Check that the message buffer is big enough // Check that the message buffer is big enough
if(sizeof(dstLen) < totalLen) { if(totalLen > dstLen) {
castor::exception::Exception ex(EMSGSIZE); castor::exception::Exception ex(EMSGSIZE);
ex.getMessage() << "Buffer too small for job submission request message: " ex.getMessage() << "Buffer too small for job submission request message: "
...@@ -398,7 +398,7 @@ size_t castor::tape::aggregator::Marshaller::marshallRTCPTapeRequest(char *dst, ...@@ -398,7 +398,7 @@ size_t castor::tape::aggregator::Marshaller::marshallRTCPTapeRequest(char *dst,
const size_t totalLen = 3 * sizeof(uint32_t) + len; const size_t totalLen = 3 * sizeof(uint32_t) + len;
// Check that the message buffer is big enough // Check that the message buffer is big enough
if(sizeof(dstLen) < totalLen) { if(totalLen > dstLen) {
castor::exception::Exception ex(EMSGSIZE); castor::exception::Exception ex(EMSGSIZE);
ex.getMessage() << "Buffer too small for tape request message: " ex.getMessage() << "Buffer too small for tape request message: "
......
...@@ -175,7 +175,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply( ...@@ -175,7 +175,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read and unmarshall the magic number of the request // Read and unmarshall the magic number of the request
uint32_t magic = 0; uint32_t magic = 0;
try { try {
magic = SocketHelper::readUint32(socket, NETRW_TIMEOUT); magic = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) { } catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO); castor::exception::Exception ex2(EIO);
...@@ -202,7 +202,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply( ...@@ -202,7 +202,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read and unmarshall the type of the request // Read and unmarshall the type of the request
uint32_t reqtype = 0; uint32_t reqtype = 0;
try { try {
reqtype = SocketHelper::readUint32(socket, NETRW_TIMEOUT); reqtype = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) { } catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO); castor::exception::Exception ex2(EIO);
...@@ -229,7 +229,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply( ...@@ -229,7 +229,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read and unmarshall the type of the request // Read and unmarshall the type of the request
uint32_t len = 0; uint32_t len = 0;
try { try {
len = SocketHelper::readUint32(socket, NETRW_TIMEOUT); len = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) { } catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO); castor::exception::Exception ex2(EIO);
...@@ -272,7 +272,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply( ...@@ -272,7 +272,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
// Read the message body from the socket // Read the message body from the socket
try { try {
SocketHelper::readBytes(socket, NETRW_TIMEOUT, len, body); SocketHelper::readBytes(socket, NETRWTIMEOUT, len, body);
} catch (castor::exception::Exception &ex) { } catch (castor::exception::Exception &ex) {
castor::exception::Exception ex2(EIO); castor::exception::Exception ex2(EIO);
......
...@@ -34,6 +34,9 @@ ...@@ -34,6 +34,9 @@
#include "castor/tape/aggregator/exception/RTCPDErrorMessage.hpp" #include "castor/tape/aggregator/exception/RTCPDErrorMessage.hpp"
#include "h/common.h" #include "h/common.h"
#include "h/rtcp.h" #include "h/rtcp.h"
#include "h/rtcp_constants.h"
#include <string.h>
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
...@@ -47,7 +50,7 @@ castor::tape::aggregator::RtcpdHandlerThread::RtcpdHandlerThread() throw() { ...@@ -47,7 +50,7 @@ castor::tape::aggregator::RtcpdHandlerThread::RtcpdHandlerThread() throw() {
// destructor // destructor
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
castor::tape::aggregator::RtcpdHandlerThread::~RtcpdHandlerThread() castor::tape::aggregator::RtcpdHandlerThread::~RtcpdHandlerThread()
throw () { throw() {
} }
...@@ -70,8 +73,10 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param) ...@@ -70,8 +73,10 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
Cuuid_create(&cuuid); Cuuid_create(&cuuid);
if(param == NULL) { if(param == NULL) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL); AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL, 1, params);
return; return;
} }
...@@ -93,41 +98,45 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param) ...@@ -93,41 +98,45 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
} catch(castor::exception::Exception &ex) { } catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()), castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Code" , ex.code())}; castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, 2, params); AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, 3, params);
} }
std::string vid; std::string vid;
{ {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", "Just before getVidFromRtcpd")}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , "Just before getVidFromRtcpd")};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_NULL, 1, params); AGGREGATOR_NULL, 2, params);
} }
try { try {
vid = getVidFromRtcpd(*socket); vid = getVidFromRtcpd(*socket, NETRWTIMEOUT);
{ {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", "Just after getVidFromRtcpd")}; castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , "Just after getVidFromRtcpd")};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_NULL, 1, params); AGGREGATOR_NULL, 2, params);
} }
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("VID", vid)}; castor::dlf::Param("VID" , vid)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GOT_VID_FROM_RTCPD, 1, params); AGGREGATOR_GOT_VID_FROM_RTCPD, 1, params);
} catch(castor::exception::Exception &ex) { } catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()), castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Code" , ex.code())}; castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD, 2, params); AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD, 3, params);
} }
// Close and de-allocate the socket // Close and de-allocate the socket
...@@ -148,13 +157,20 @@ void castor::tape::aggregator::RtcpdHandlerThread::stop() ...@@ -148,13 +157,20 @@ void castor::tape::aggregator::RtcpdHandlerThread::stop()
// getVidFromRtcpd // getVidFromRtcpd
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd( std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
castor::io::AbstractTCPSocket &socket) throw(castor::exception::Exception) { castor::io::AbstractTCPSocket &socket, const int netReadWriteTimeout)
throw(castor::exception::Exception) {
char buffer[MSGBUFSIZ]; char buf[MSGBUFSIZ];
RtcpTapeRequest request; RtcpTapeRequest request;
memset(&request, '\0', sizeof(request));
// Marshall the request for VID message
size_t totalLen = 0;
try { try {
Marshaller::marshallRTCPTapeRequest(&buffer[0], sizeof(buffer), request); totalLen = Marshaller::marshallRTCPTapeRequest(&buf[0], sizeof(buf),
request);
} catch(castor::exception::Exception &ex) { } catch(castor::exception::Exception &ex) {
castor::exception::Internal ie; castor::exception::Internal ie;
...@@ -164,6 +180,42 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd( ...@@ -164,6 +180,42 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
throw ie; throw ie;
} }
// Send the request for a VID to RTCPD
const int rc = netwrite_timeout(socket.socket(), buf, totalLen,
netReadWriteTimeout);
if(rc == -1) {
castor::exception::Exception ex(SECOMERR);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Failed netwrite of request for VID to RTCPD: " << neterror();
throw ex;
} else if(rc == 0) {
castor::exception::Exception ex(SECONNDROP);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Failed netwrite of request for VID to RTCPD: Connection dropped";
throw ex;
}
// Receive acknowlege from RTCPD
uint32_t status = 0;
try {
status = receiveRtcpdAcknowledge(socket, netReadWriteTimeout);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ex2(EPROTO);
ex2.getMessage() << "Failed to receive acknowlege from RTCPD: "
<< ex.getMessage().str();
throw ex2;
}
if(status != 0) {
castor::exception::Exception ex(ECANCELED);
ex.getMessage() << "Recieved negative acknowledge from RTCPD";
throw ex;
}
return "NOTIMP"; return "NOTIMP";
/* /*
SOCKET s = socket.socket(); SOCKET s = socket.socket();
...@@ -226,3 +278,76 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd( ...@@ -226,3 +278,76 @@ std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
return tapereq.vid; return tapereq.vid;
*/ */
} }
//-----------------------------------------------------------------------------
// receiveRtcpdAcknowledge
//-----------------------------------------------------------------------------
uint32_t castor::tape::aggregator::RtcpdHandlerThread::receiveRtcpdAcknowledge(
castor::io::AbstractTCPSocket &socket, const int netReadWriteTimeout)
throw(castor::exception::Exception) {
// Read and unmarshall the magic number
uint32_t magic = 0;
try {
magic = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) {
castor::exception::Internal ie;
ie.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to read and unmarshall magic number: "
<< ex.getMessage().str();
throw ie;
}
if(magic != RTCOPY_MAGIC) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Wrong magic number: Expected: " << std::hex << RTCOPY_MAGIC
<< ": Received: " << magic;
throw ex;
}
// Read and unmarshall the request type
uint32_t reqtype = 0;
try {
reqtype = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) {
castor::exception::Internal ie;
ie.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to read and unmarshall request type: "
<< ex.getMessage().str();
throw ie;
}
if(reqtype != RTCP_TAPEERR_REQ) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": Wrong request type: Expected: " << std::hex << RTCP_TAPEERR_REQ
<< ": Received: " << reqtype;
throw ex;
}
// Read and unmarshall the status
uint32_t status = 0;
try {
status = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) {
castor::exception::Internal ie;
ie.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to read and unmarshall status: "
<< ex.getMessage().str();
throw ie;
}
return status;
}
...@@ -76,8 +76,18 @@ namespace aggregator { ...@@ -76,8 +76,18 @@ namespace aggregator {
* @param socket The socket of the connection with RTCPD. * @param socket The socket of the connection with RTCPD.
* @return The VID. * @return The VID.
*/ */
std::string getVidFromRtcpd(castor::io::AbstractTCPSocket &socket) std::string getVidFromRtcpd(castor::io::AbstractTCPSocket &socket,
throw(castor::exception::Exception); const int netReadWriteTimeout) throw(castor::exception::Exception);
/**
* Receives an acknowledge message from RTCPD and returns the status code
* embedded within the message.
*
* @param socket The socket of the connection with RTCPD.
* @return The status code embedded within the message.
*/
uint32_t receiveRtcpdAcknowledge(castor::io::AbstractTCPSocket &socket,
const int netReadWriteTimeout) throw(castor::exception::Exception);
}; // class RtcpdHandlerThread }; // class RtcpdHandlerThread
......
...@@ -93,10 +93,11 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::run(void *param) ...@@ -93,10 +93,11 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::run(void *param)
} catch(castor::exception::Exception &ex) { } catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()), castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Code" , ex.code())}; castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_HANDLE_VDQM_REQUEST_EXCEPT, 2, params); AGGREGATOR_HANDLE_VDQM_REQUEST_EXCEPT, 3, params);
} }
// Close and de-allocate the socket // Close and de-allocate the socket
...@@ -123,13 +124,14 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::dispatchRequest( ...@@ -123,13 +124,14 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::dispatchRequest(
// Read and unmarshall the magic number of the request // Read and unmarshall the magic number of the request
uint32_t magic = 0; uint32_t magic = 0;
try { try {
magic = SocketHelper::readUint32(socket, NETRW_TIMEOUT); magic = SocketHelper::readUint32(socket, NETRWTIMEOUT);
} catch (castor::exception::Exception &ex) { } catch (castor::exception::Exception &ex) {
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()), castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Code" , ex.code())}; castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_READ_MAGIC, 2, params); AGGREGATOR_FAILED_TO_READ_MAGIC, 3, params);
return; return;
} }
...@@ -139,8 +141,9 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::dispatchRequest( ...@@ -139,8 +141,9 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::dispatchRequest(
if(handlerMapItor == m_magicToHandlers.end()) { if(handlerMapItor == m_magicToHandlers.end()) {
// Unknown magic number // Unknown magic number
castor::dlf::Param params[] = { castor::dlf::Param params[] = {
castor::dlf::Param("Function" , __PRETTY_FUNCTION__),
castor::dlf::Param("Magic Number", magic)}; castor::dlf::Param("Magic Number", magic)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, AGGREGATOR_UNKNOWN_MAGIC, 1, castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR, AGGREGATOR_UNKNOWN_MAGIC, 2,
params); params);