Commit 6c26de94 authored by Steven Murray's avatar Steven Murray
Browse files

Increased the number of data types that can be marshalled.

parent b5b361ba
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 10:42:34 CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 16:35:03 CET 2008
*/
/******************************************************************************
......@@ -55,8 +55,10 @@ AGGREGATOR_FAILED_TO_SEND_RTCP_ACKN_TO_VDQM=18, /* "Failed to send RTCP acknowle
AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST=19, /* "Received a remote copy job from an unauthorized host" */
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE=20, /* "Received error message from RTCPD" */
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL=21, /* "The RtcpdHandlerThread has been passed a NULL socket pointer" */
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO=22, /* "Recieved a connection from RTCPD with peer host and port information" */
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO=23 /* "Recieved a connection from RTCPD without peer host and port information" */
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO=22, /* "Recieved a connection from RTCPD" */
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO=23, /* "Recieved a connection from RTCPD without peer host and port information" */
AGGREGATOR_GOT_VID_FROM_RTCPD=24, /* "Got VID from RTCPD" */
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD=25 /* "Failed to VID from RTCPD" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 10:42:34 CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Tue Dec 2 16:35:03 CET 2008
*/
/******************************************************************************
......@@ -51,6 +51,8 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST, "Received a remote copy job from an unauthorized host"},
{AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE, "Received error message from RTCPD"},
{AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL, "The RtcpdHandlerThread has been passed a NULL socket pointer"},
{AGGREGATOR_RTCPD_CONNECTION_WITH_INFO, "Recieved a connection from RTCPD with peer host and port information"},
{AGGREGATOR_RTCPD_CONNECTION_WITH_INFO, "Recieved a connection from RTCPD"},
{AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, "Recieved a connection from RTCPD without peer host and port information"},
{AGGREGATOR_GOT_VID_FROM_RTCPD, "Got VID from RTCPD"},
{AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD, "Failed to VID from RTCPD"},
{-1, ""}};
......@@ -20,5 +20,7 @@ AGGREGATOR_FAILED_TO_SEND_RTCP_ACKN_TO_VDQM,18,"Failed to send RTCP acknowledge
AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST,19,"Received a remote copy job from an unauthorized host"
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE,20,"Received error message from RTCPD"
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL,21,"The RtcpdHandlerThread has been passed a NULL socket pointer"
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO,22,"Recieved a connection from RTCPD with peer host and port information"
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO,22,"Recieved a connection from RTCPD"
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO,23,"Recieved a connection from RTCPD without peer host and port information"
AGGREGATOR_GOT_VID_FROM_RTCPD,24,"Got VID from RTCPD"
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD,25,"Failed to VID from RTCPD"
......@@ -20,7 +20,7 @@
*
*
*
* @author Castor dev team
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#ifndef CASTOR_TAPE_AGGREGATOR_CONSTANTS_HPP
......
/******************************************************************************
* Marshaller.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
*
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/tape/aggregator/Marshaller.hpp"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include <arpa/inet.h>
#include <errno.h>
......@@ -6,6 +33,48 @@
#include <string.h>
//------------------------------------------------------------------------------
// marshallUint8
//------------------------------------------------------------------------------
void castor::tape::aggregator::Marshaller::marshallUint8(uint8_t src,
char * &dst) throw (castor::exception::Exception) {
if(dst == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to destination buffer is NULL";
throw ex;
}
src = htonl(src);
memcpy(dst, &src, sizeof(src));
dst += sizeof(src);
}
//------------------------------------------------------------------------------
// marshallUint16
//------------------------------------------------------------------------------
void castor::tape::aggregator::Marshaller::marshallUint16(uint16_t src,
char * &dst) throw (castor::exception::Exception) {
if(dst == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to destination buffer is NULL";
throw ex;
}
src = htonl(src);
memcpy(dst, &src, sizeof(src));
dst += sizeof(src);
}
//------------------------------------------------------------------------------
// marshallUint32
//------------------------------------------------------------------------------
......@@ -159,3 +228,130 @@ void castor::tape::aggregator::Marshaller::unmarshallString(char * &src,
throw ex;
}
}
//-----------------------------------------------------------------------------
// marshallRTCPAckn
//-----------------------------------------------------------------------------
size_t castor::tape::aggregator::Marshaller::marshallRTCPAckn(
char *dst, const size_t dstLen, const uint32_t status, const char *errorMsg)
throw (castor::exception::Exception) {
if(dst == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to destination buffer is NULL";
throw ex;
}
// Minimum buffer length = magic number + request type + length + status code
// + the string termination character '\0'
if(dstLen < 4*sizeof(uint32_t) + 1) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Destination buffer length is too small: "
"Expected minimum length: " << (4*sizeof(uint32_t) + 1)
<< ": Actual length: " << dstLen;
throw ex;
}
if(errorMsg == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to error message is NULL";
throw ex;
}
const size_t errorMsgLen = strlen(errorMsg);
const size_t maxErrorMsgLen = dstLen - 4*sizeof(uint32_t) - 1;
const size_t errorMsg2SendLen = errorMsgLen > maxErrorMsgLen ? maxErrorMsgLen
: errorMsgLen;
// Length of message body equals the size fo the status code plus the length
// of the error message string plus the size of the string termination
// characater '\0'
const uint32_t len = sizeof(uint32_t) + errorMsg2SendLen + 1;
marshallUint32(RTCOPY_MAGIC_OLD0, dst); // Magic number
marshallUint32(VDQM_CLIENTINFO , dst); // Request type
marshallUint32(len , dst); // Length
marshallUint32(status , dst); // status code
memcpy(dst, errorMsg, errorMsg2SendLen);
*(dst+errorMsg2SendLen) = '\0';
return 3 * sizeof(uint32_t) + len; // Header + body
}
//-----------------------------------------------------------------------------
// marshallRTCPTapeRequest
//-----------------------------------------------------------------------------
size_t castor::tape::aggregator::Marshaller::marshallRTCPTapeRequest(char *dst,
const size_t dstLen, const RtcpTapeRequest &request)
throw (castor::exception::Exception) {
// Calculate the length of the message body
const uint32_t len = 14 * sizeof(uint32_t) + strlen(request.vid) +
strlen(request.vsn) + strlen(request.label) + strlen(request.devtype) +
strlen(request.density) + strlen(request.unit) + 6 + sizeof(Cuuid_t) +
// Plus the size of the err member which is of type rtcpErrMsg_t
4 * sizeof(uint32_t) + strlen(request.err.errmsgtxt) + 1;
// Calculate the total length of the message
// Message header = magic + reqtype + len = 3 * sizeof(uint32_t)
const size_t totalLen = 3 * sizeof(uint32_t) + len;
// Check that the message buffer is big enough
if(sizeof(dstLen) < totalLen) {
castor::exception::Exception ex(EMSGSIZE);
ex.getMessage() << "Buffer too small for tape request message: "
"Required size: " << totalLen << " Actual size: " << dstLen;
throw ex;
}
char *p = dst;
marshallUint32(RTCOPY_MAGIC , p);
marshallUint32(RTCP_TAPEERR_REQ , p);
marshallUint32(len , p);
marshallString(request.vid , p);
marshallString(request.vsn , p);
marshallString(request.label , p);
marshallString(request.devtype , p);
marshallString(request.density , p);
marshallString(request.unit , p);
marshallUint32(request.VolReqID , p);
marshallUint32(request.jobID , p);
marshallUint32(request.mode , p);
marshallUint32(request.start_file , p);
marshallUint32(request.end_file , p);
marshallUint32(request.side , p);
marshallUint32(request.tprc , p);
marshallUint32(request.TStartRequest , p);
marshallUint32(request.TEndRequest , p);
marshallUint32(request.TStartRtcpd , p);
marshallUint32(request.TStartMount , p);
marshallUint32(request.TEndMount , p);
marshallUint32(request.TStartUnmount , p);
marshallUint32(request.TEndUnmount , p);
marshallUint32(request.rtcpReqId.time_low , p);
marshallUint16(request.rtcpReqId.time_mid , p);
marshallUint16(request.rtcpReqId.time_hi_and_version , p);
marshallUint8(request.rtcpReqId.clock_seq_hi_and_reserved, p);
marshallUint8(request.rtcpReqId.clock_seq_low , p);
marshallUint8(request.rtcpReqId.node[0] , p);
marshallUint8(request.rtcpReqId.node[1] , p);
marshallUint8(request.rtcpReqId.node[2] , p);
marshallUint8(request.rtcpReqId.node[3] , p);
marshallUint8(request.rtcpReqId.node[4] , p);
marshallUint8(request.rtcpReqId.node[5] , p);
marshallString(request.err.errmsgtxt , p);
marshallUint32(request.err.severity , p);
marshallUint32(request.err.errorcode , p);
marshallUint32(request.err.max_tpretry , p);
marshallUint32(request.err.max_cpretry , p);
return totalLen;
}
......@@ -20,13 +20,14 @@
*
*
*
* @author Castor dev team
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#ifndef CASTOR_TAPE_AGGREGATOR_MARSHALLER_HPP
#define CASTOR_TAPE_AGGREGATOR_MARSHALLER_HPP 1
#include "castor/exception/Exception.hpp"
#include "castor/tape/aggregator/RtcpTapeRequest.hpp"
#include <stdint.h>
#include <string>
......@@ -42,6 +43,32 @@ namespace aggregator {
class Marshaller {
public:
/**
* Marshalls the specified unsigned 8-bit integer into the specified
* destination buffer.
*
* @param src the unsigned 8-bit integer to be marshalled
* @param dst in/out parameter, before invocation points to the destination
* buffer where the unsigned 8-bit integer should be marshalled to and on
* return points to the byte in the destination buffer immediately after
* the marshalled unsigned 8-bit integer
*/
static void marshallUint8(uint8_t src, char * &dst)
throw(castor::exception::Exception);
/**
* Marshalls the specified unsigned 16-bit integer into the specified
* destination buffer.
*
* @param src the unsigned 16-bit integer to be marshalled
* @param dst in/out parameter, before invocation points to the destination
* buffer where the unsigned 16-bit integer should be marshalled to and on
* return points to the byte in the destination buffer immediately after
* the marshalled unsigned 16-bit integer
*/
static void marshallUint16(uint16_t src, char * &dst)
throw(castor::exception::Exception);
/**
* Marshalls the specified unsigned 32-bit integer into the specified
* destination buffer.
......@@ -116,6 +143,33 @@ namespace aggregator {
static void unmarshallString(char * &src, size_t &srcLen, char *dst,
const size_t dstLen) throw(castor::exception::Exception);
/**
* Marshalls the specified status code and possible error message into
* the specified destination buffer in order to create an RTCP acknowledge
* message.
*
* @param dst The destination message buffer
* @param dstLen The length of the destination buffer
* @param status The status code to be marshalled
* @param errorMsg The error message to be marshalled
* @return The total length of the message (header + body)
*/
static size_t marshallRTCPAckn(char *dst, const size_t dstLen,
const uint32_t status, const char *errorMsg)
throw (castor::exception::Exception);
/**
* Marshalls the specified tape request into the specified destination
* buffer in order to create an RTCP tape request message.
*
* @param dst The destination message buffer
* @param dstLen The length of the destination buffer
* @param request The tape request
* @return The total length of the message (header + body)
*/
static size_t marshallRTCPTapeRequest(char *dst, const size_t dstLen,
const RtcpTapeRequest &request) throw (castor::exception::Exception);
}; // class Utils
} // namespace aggregator
......
......@@ -20,7 +20,7 @@
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/exception/InvalidArgument.hpp"
......
......@@ -20,7 +20,7 @@
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#ifndef _CASTOR_TAPE_AGGREGATOR_RCPJOBSUBMITTER_HPP_
#define _CASTOR_TAPE_AGGREGATOR_RCPJOBSUBMITTER_HPP_
......
......@@ -32,8 +32,7 @@
#include "castor/tape/aggregator/SocketHelper.hpp"
#include "castor/tape/aggregator/exception/RTCPDErrorMessage.hpp"
#include "h/common.h"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include "h/rtcp.h"
//-----------------------------------------------------------------------------
......@@ -77,6 +76,7 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
castor::io::ServerSocket *socket = (castor::io::ServerSocket*)param;
// Log the new connection
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
......@@ -98,6 +98,37 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, 2, params);
}
std::string vid;
{
castor::dlf::Param params[] = {
castor::dlf::Param("Message", "Just before getVidFromRtcpd")};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_NULL, 1, params);
}
try {
vid = getVidFromRtcpd(*socket);
{
castor::dlf::Param params[] = {
castor::dlf::Param("Message", "Just after getVidFromRtcpd")};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_NULL, 1, params);
}
castor::dlf::Param params[] = {
castor::dlf::Param("VID", vid)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GOT_VID_FROM_RTCPD, 1, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GET_VID_FROM_RTCPD, 2, params);
}
// Close and de-allocate the socket
socket->close();
delete socket;
......@@ -110,3 +141,73 @@ void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
void castor::tape::aggregator::RtcpdHandlerThread::stop()
throw() {
}
//-----------------------------------------------------------------------------
// getVidFromRtcpd
//-----------------------------------------------------------------------------
std::string castor::tape::aggregator::RtcpdHandlerThread::getVidFromRtcpd(
castor::io::AbstractTCPSocket &socket) throw(castor::exception::Exception) {
return "NOTIMP";
/*
SOCKET s = socket.socket();
rtcpHdr_t hdr;
rtcpTapeRequest_t tapereq;
hdr.magic = RTCOPY_MAGIC;
hdr.reqtype = RTCP_TAPEERR_REQ;
hdr.len = -1;
tapereq.vid[0] = '\0';
if(rtcp_SendReq(&s, &hdr, NULL, &tapereq, NULL) == -1) {
const int se = serrno;
castor::exception::Exception ex(se);
ex.getMessage() << "Failed to send request for VID to RTCPD: "
<< sstrerror(se);
throw ex;
}
if(rtcp_RecvAckn(&s, hdr.reqtype) == -1) {
const int se = serrno;
castor::exception::Exception ex(se);
ex.getMessage() << "Failed to receive acknowldege from RTCPD after sending"
" request for VID: " << sstrerror(se);
throw ex;
}
if(rtcp_RecvReq(&s, &hdr, NULL, &tapereq, NULL) == -1) {
const int se = serrno;
castor::exception::Exception ex(se);
ex.getMessage() << "Failed to receive VID from RTCPD after receiving the"
" acknowledge: " << sstrerror(se);
throw ex;
}
if(rtcp_SendAckn(&s, hdr.reqtype) == -1) {
const int se = serrno;
castor::exception::Exception ex(se);
ex.getMessage() << "Failed to send acknowledge to RTCPD after receiving"
" the VID: " << sstrerror(se);
throw ex;
}
if(tapereq.VolReqID <= 0) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Failed to get VID from RTCPD: No volume request ID";
throw ex;
}
return tapereq.vid;
*/
}
......@@ -68,6 +68,17 @@ namespace aggregator {
*/
virtual void stop() throw();
private:
/**
* Get the VID associated with the remote copy job from RTCPD.
*
* @param socket The socket of the connection with RTCPD.
* @return The VID.
*/
std::string getVidFromRtcpd(castor::io::AbstractTCPSocket &socket)
throw(castor::exception::Exception);
}; // class RtcpdHandlerThread
} // namespace aggregator
......
......@@ -18,10 +18,9 @@
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
* @author castor dev team
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/exception/Internal.hpp"
#include "castor/io/AbstractTCPSocket.hpp"
#include "castor/tape/aggregator/Constants.hpp"
......
......@@ -19,7 +19,7 @@
*
*
*
* @author castor dev team
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#ifndef CASTOR_TAPE_AGGREGATOR_SOCKETHELPER_HPP
......
......@@ -357,8 +357,8 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::handleJobSubmission(
size_t ackMsgLen = 0;
try {
ackMsgLen = marshallRTCPAckn(ackMsg, sizeof(ackMsg), errorStatusForVdqm,
errorMessageForVdqm.c_str());
ackMsgLen = Marshaller::marshallRTCPAckn(ackMsg, sizeof(ackMsg),
errorStatusForVdqm, errorMessageForVdqm.c_str());
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Message", ex.getMessage().str()),
......@@ -390,57 +390,3 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::handleJobSubmission(
AGGREGATOR_FAILED_TO_SEND_RTCP_ACKN_TO_VDQM, 2, params);
}
}
//-----------------------------------------------------------------------------
// marshallRTCPAckn
//-----------------------------------------------------------------------------
size_t castor::tape::aggregator::VdqmRequestHandlerThread::marshallRTCPAckn(
char *dst, const size_t dstLen, const uint32_t status, const char *errorMsg)
throw (castor::exception::Exception) {
if(dst == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to destination buffer is NULL";
throw ex;
}
// Minimum buffer length = magic number + request type + length + status code
// + the string termination character '\0'
if(dstLen < 4*sizeof(uint32_t) + 1) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Destination buffer length is too small: "
"Expected minimum length: " << (4*sizeof(uint32_t) + 1)
<< ": Actual length: " << dstLen;
throw ex;
}
if(errorMsg == NULL) {
castor::exception::Exception ex(EINVAL);
ex.getMessage() << "Pointer to error message is NULL";
throw ex;
}
const size_t errorMsgLen = strlen(errorMsg);
const size_t maxErrorMsgLen = dstLen - 4*sizeof(uint32_t) - 1;
const size_t errorMsg2SendLen = errorMsgLen > maxErrorMsgLen ? maxErrorMsgLen
: errorMsgLen;
// Length of message body equals the size fo the status code plus the length
// of the error message string plus the size of the string termination
// characater '\0'
const uint32_t len = sizeof(uint32_t) + errorMsg2SendLen + 1;
Marshaller::marshallUint32(RTCOPY_MAGIC_OLD0, dst); // Magic number
Marshaller::marshallUint32(VDQM_CLIENTINFO , dst); // Request type
Marshaller::marshallUint32(len , dst); // Length
Marshaller::marshallUint32(status , dst); // status code
memcpy(dst, errorMsg, errorMsg2SendLen);
*(dst+errorMsg2SendLen) = '\0';