Commit d0f56936 authored by Steven Murray's avatar Steven Murray
Browse files

Added first attempt at remote copy job submitter (RCPJobSubmitter).

parent fadecf55
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Nov 7 18:42:23 CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Wed Nov 12 13:48:41 CET 2008
*/
/******************************************************************************
......@@ -39,7 +39,10 @@ AGGREGATOR_FAILED_TO_PARSE_COMMAND_LINE=2, /* "Failed to parse the command line"
AGGREGATOR_FAILED_TO_INIT_DB_SERVICE=3, /* "Failed to initialise database service" */
AGGREGATOR_REQUEST_HANDLER_SOCKET_IS_NULL=4, /* "The RequestHandlerThread has been passed a NULL socket pointer" */
AGGREGATOR_HANDLE_REQUEST_EXCEPT=5, /* "Exception raised by castor::tape::aggregator::RequestHandlerThread::handleRequest" */
AGGREGATOR_FAILED_TO_READ_MAGIC=6 /* "Failed to read magic number from socket" */
AGGREGATOR_FAILED_TO_READ_MAGIC=6, /* "Failed to read magic number from socket" */
AGGREGATOR_UNKNOWN_MAGIC=7, /* "Unknown magic number" */
AGGREGATOR_FAILED_TO_READ_REQUEST_TYPE=8, /* "Failed to read request type from socket" */
AGGREGATOR_UNKNOWN_REQUEST_TYPE=9 /* "Unknown request type" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Nov 7 18:42:23 CET 2008
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Wed Nov 12 13:48:41 CET 2008
*/
/******************************************************************************
......@@ -36,4 +36,7 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_REQUEST_HANDLER_SOCKET_IS_NULL, "The RequestHandlerThread has been passed a NULL socket pointer"},
{AGGREGATOR_HANDLE_REQUEST_EXCEPT, "Exception raised by castor::tape::aggregator::RequestHandlerThread::handleRequest"},
{AGGREGATOR_FAILED_TO_READ_MAGIC, "Failed to read magic number from socket"},
{AGGREGATOR_UNKNOWN_MAGIC, "Unknown magic number"},
{AGGREGATOR_FAILED_TO_READ_REQUEST_TYPE, "Failed to read request type from socket"},
{AGGREGATOR_UNKNOWN_REQUEST_TYPE, "Unknown request type"},
{-1, ""}};
......@@ -5,3 +5,6 @@ AGGREGATOR_FAILED_TO_INIT_DB_SERVICE,3,"Failed to initialise database service"
AGGREGATOR_REQUEST_HANDLER_SOCKET_IS_NULL,4,"The RequestHandlerThread has been passed a NULL socket pointer"
AGGREGATOR_HANDLE_REQUEST_EXCEPT,5,"Exception raised by castor::tape::aggregator::RequestHandlerThread::handleRequest"
AGGREGATOR_FAILED_TO_READ_MAGIC,6,"Failed to read magic number from socket"
AGGREGATOR_UNKNOWN_MAGIC,7,"Unknown magic number"
AGGREGATOR_FAILED_TO_READ_REQUEST_TYPE,8,"Failed to read request type from socket"
AGGREGATOR_UNKNOWN_REQUEST_TYPE,9,"Unknown request type"
/******************************************************************************
* Constants.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
*
* @author Castor dev team
*****************************************************************************/
#ifndef CASTOR_TAPE_AGGREGATOR_CONSTANTS_HPP
#define CASTOR_TAPE_AGGREGATOR_CONSTANTS_HPP 1
#include <stdint.h>
namespace castor {
namespace tape {
namespace aggregator {
const size_t STRERRORBUFLEN = 256;
const int NETRW_TIMEOUT = 300;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t MSGBUFSIZ = 1024;
} // namespace aggregator
} // namespace tape
} // namespace castor
#endif // CASTOR_TAPE_AGGREGATOR_CONSTANTS_HPP
......@@ -42,6 +42,7 @@ AGGREGATORBIN_SRCS = \
AggregatorDaemon.cpp \
AggregatorMain.cpp \
Marshaller.cpp \
RCPJobSubmitter.cpp \
RequestHandlerThread.cpp \
SocketHelper.cpp
......
......@@ -4,6 +4,20 @@
#include <string.h>
//------------------------------------------------------------------------------
// marshallUint32
//------------------------------------------------------------------------------
char* castor::tape::aggregator::Marshaller::marshallUint32(char *const ptr,
uint32_t value) throw() {
value = htonl(value);
memcpy(ptr, &value, sizeof(value));
return ptr + sizeof(value);
}
//------------------------------------------------------------------------------
// marshallString
//------------------------------------------------------------------------------
......@@ -16,15 +30,26 @@ char* castor::tape::aggregator::Marshaller::marshallString(char *const ptr,
}
//------------------------------------------------------------------------------
// marshallString
//------------------------------------------------------------------------------
char* castor::tape::aggregator::Marshaller::marshallString(char *const ptr,
const std::string value) throw() {
strcpy(ptr, value.c_str());
return ptr + strlen(value.c_str()) + 1;
}
//------------------------------------------------------------------------------
// unmarshallInt32
//------------------------------------------------------------------------------
char* castor::tape::aggregator::Marshaller::unmarshallUint32(char *const ptr,
uint32_t &value) throw() {
value = htonl(value);
memcpy(ptr, &value, sizeof(value));
memcpy(&value, ptr, sizeof(value));
value = ntohl(value);
return ptr + sizeof(value);
}
......@@ -27,6 +27,7 @@
#define CASTOR_TAPE_AGGREGATOR_MARSHALLER_HPP 1
#include <stdint.h>
#include <string>
namespace castor {
......@@ -39,6 +40,16 @@ namespace aggregator {
class Marshaller {
public:
/**
* Marshalls the specified unsigned 32-bit integer into the specified
* message buffer.
*
* @param ptr pointer to where the value should be marshalled
* @param value the value to be marshalled
* @return pointer to the first byte after the marshalled value
*/
static char* marshallUint32(char *const ptr, uint32_t value) throw();
/**
* Marshalls the specified string into the specified message buffer.
*
......@@ -49,6 +60,16 @@ namespace aggregator {
static char* marshallString(char *const ptr, const char *const value)
throw();
/**
* Marshalls the specified string into the specified message buffer.
*
* @param ptr pointer to where the value should be marshalled
* @param value the value to be marshalled
* @return pointer to the first byte after the marshalled value
*/
static char* marshallString(char *const ptr, const std::string value)
throw();
/**
* Unmarshalls an unsigned 32 bit integer from the specified buffer.
*
......
/******************************************************************************
* RCPJobSubmitter.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/exception/InvalidArgument.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/Marshaller.hpp"
#include "castor/tape/aggregator/RCPJobSubmitter.hpp"
#include "h/net.h"
#include "h/osdep.h"
#include "h/rtcp_constants.h"
#include "h/serrno.h"
#include "h/vdqm_constants.h"
#include <errno.h>
#include <unistd.h>
//------------------------------------------------------------------------------
// submit
//------------------------------------------------------------------------------
void castor::tape::aggregator::RCPJobSubmitter::submit(
const std::string &host,
const unsigned port,
const char *remoteCopyType,
const u_signed64 tapeRequestID,
const std::string &clientUserName,
const std::string &clientHost,
const int clientPort,
const int clientEuid,
const int clientEgid,
const std::string &deviceGroupName,
const std::string &tapeDriveName)
throw (castor::exception::Exception) {
castor::io::ClientSocket socket(port, host);
try {
socket.connect();
} catch(castor::exception::Exception &connectex) {
castor::exception::Exception ex(ECONNABORTED);
ex.getMessage() << "Failed to connect to " << remoteCopyType
<< ": host: " << host << ": port: " << port << ": "
<< connectex.getMessage().str();
}
// Let's count the length of the message for the header.
// Please notice: Normally the ID is a 64Bit number!!
// But for historical reasons, we will do a downcast of the
// id, which is then still unique, because a tapeRequest has
// a very short lifetime, compared to the number of new IDs
int len =
4*sizeof(uint32_t) +
clientHost.length() +
deviceGroupName.length() +
tapeDriveName.length() +
clientUserName.length() +
4; // 4 = the number of string termination characters
char buf[MSGBUFSIZ];
char *p = buf;
p = Marshaller::marshallUint32(p, RTCOPY_MAGIC_OLD0); // Magic number
p = Marshaller::marshallUint32(p, VDQM_CLIENTINFO); // Request type
p = Marshaller::marshallUint32(p, len);
p = Marshaller::marshallUint32(p, tapeRequestID); // Downcast to 32-bits
p = Marshaller::marshallUint32(p, clientPort);
p = Marshaller::marshallUint32(p, clientEuid);
p = Marshaller::marshallUint32(p, clientEgid);
p = Marshaller::marshallString(p, clientHost);
p = Marshaller::marshallString(p, deviceGroupName);
p = Marshaller::marshallString(p, tapeDriveName);
p = Marshaller::marshallString(p, clientUserName);
// Calculate the length of header + body ready for netwrite
len += 3 * sizeof(uint32_t);
// After marshalling we can send the information to the RTCPD or tape
// aggregator daemon
int rc = netwrite_timeout(socket.socket(), buf, len, NETRW_TIMEOUT);
if (rc == -1) {
serrno = SECOMERR;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netwrite(REQ) to " << remoteCopyType << ": " << neterror();
throw ex;
}
else if (rc == 0) {
serrno = SECONNDROP;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netwrite(REQ) to " << remoteCopyType << ": connection dropped";
throw ex;
}
readReply(socket, remoteCopyType);
}
//------------------------------------------------------------------------------
// readReply
//------------------------------------------------------------------------------
void castor::tape::aggregator::RCPJobSubmitter::readReply(
castor::io::AbstractTCPSocket &socket, const char *remoteCopyType)
throw (castor::exception::Exception) {
char buf[MSGBUFSIZ];
int rc = netread_timeout(socket.socket(), buf, sizeof(uint32_t)*3,
NETRW_TIMEOUT);
switch (rc) {
case -1:
{
serrno = SECOMERR;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netread(HDR) from " << remoteCopyType << ": " << neterror();
throw ex;
}
case 0:
{
serrno = SECONNDROP;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netread(HDR) from " << remoteCopyType << ": connection dropped";
throw ex;
}
}
char *p = buf;
// Unmarshall the magic number
uint32_t magic = 0;
p = Marshaller::unmarshallUint32(p, magic);
// If the magic number is invalid
if(magic != RTCOPY_MAGIC && magic != RTCOPY_MAGIC_OLD0) {
castor::exception::Exception ex(EBADMSG);
ex.getMessage() << __PRETTY_FUNCTION__
<< std::hex
<< ": invalid header from " << remoteCopyType
<< ": invalid magic number: expected: 0x" << RTCOPY_MAGIC << " or 0x"
<< RTCOPY_MAGIC_OLD0 << ": received: 0x" << magic;
throw ex;
}
// Unmarshall the request type
uint32_t reqtype = 0;
p = Marshaller::unmarshallUint32(p, reqtype);
// If the request type is invalid
if(reqtype != VDQM_CLIENTINFO) {
castor::exception::Exception ex(EBADMSG);
ex.getMessage() << __PRETTY_FUNCTION__
<< std::hex
<< ": invalid header from " << remoteCopyType
<< ": invalid request type: expected: 0x" << VDQM_CLIENTINFO
<< ": received 0x" << reqtype;
throw ex;
}
// Unmarshall the length of the message body
uint32_t len = 0;
p = Marshaller::unmarshallUint32(p, len);
// If the message body is not large enough for a status number and an empty
// error string
if(len < sizeof(uint32_t) + 1) {
castor::exception::Exception ex(EMSGSIZE);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": invalid header from " << remoteCopyType
<< ": message body not large enough for a status number and an empty "
"string: minimum size expected: " << (sizeof(uint32_t) + 1)
<< ": received: " << len;
throw ex;
}
// If the message body is too large
if(len > MSGBUFSIZ - 3*sizeof(uint32_t) ) {
castor::exception::Exception ex(EMSGSIZE);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": invalid header from " << remoteCopyType
<< ": message body too large: maximum length: "
<< (MSGBUFSIZ-3*sizeof(uint32_t)) << " actual length: " << len;
throw ex;
}
rc = netread_timeout(socket.socket(), p, len, NETRW_TIMEOUT);
switch (rc) {
case -1:
{
serrno = SECOMERR;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netread(HDR) from " << remoteCopyType << ": " << neterror();
throw ex;
}
case 0:
{
serrno = SECONNDROP;
castor::exception::Exception ex(serrno);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": netread(HDR) from " << remoteCopyType << ": connection dropped";
throw ex;
}
}
// Unmarshall the status number
uint32_t status = 0;
p = Marshaller::unmarshallUint32(p, status);
// The size of the received error message is the size of the mesaage body
// minus the size of the status number
const size_t receivedErrMsgSize = len - sizeof(uint32_t);
// Unmarshall the error message
char errMsg[1024];
const size_t errMsgSize = receivedErrMsgSize < sizeof(errMsg) ?
receivedErrMsgSize : sizeof(errMsg);
errMsg[0] = '\0';
strncpy(errMsg, p, errMsgSize);
errMsg[errMsgSize - 1] = '\0';
// If RTCOPY or tape aggregator daemon returned an error message
if(status != 0 || errMsgSize > 1) {
castor::exception::Exception ex(ECANCELED);
ex.getMessage() << __PRETTY_FUNCTION__
<< ": " << remoteCopyType << " returned an error message: status: "
<< status << ": error message: " << errMsg;
throw ex;
}
}
/******************************************************************************
* RCPJobSubmitter.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#ifndef _CASTOR_TAPE_AGGREGATOR_RCPJOBSUBMITTER_HPP_
#define _CASTOR_TAPE_AGGREGATOR_RCPJOBSUBMITTER_HPP_
#include "h/net.h"
#include <string>
namespace castor {
namespace io {
class AbstractTCPSocket;
}
namespace tape {
namespace aggregator {
/**
* Remote copy job submitter.
*
* A helper class for submitting remote copy jobs to either RTCPD or tape
* aggregator daemons.
*/
class RCPJobSubmitter {
public:
/**
* Submits a remote copy job to either a RTCPD or tape aggregator daemon.
*
* @param host the hostname of the RTCPD or tape aggregator daemon
* @param port the port number of the RTCPD or tape aggregator daemon
* @param remoteCopyType remote copy type to be used for exception messages
* @param tapeRequestID tape request ID
* @param clientUserName client user name
* @param clientHost client host
* @param clientPort client port
* @param clientEuid client user ID
* @param clientEgid client group ID
* @param deviceGroupName device group name
* @param tapeDriveName tape drive name
*/
static void submit(
const std::string &host,
const unsigned port,
const char *remoteCopyType,
const u_signed64 tapeRequestID,
const std::string &clientUserName,
const std::string &clientHost,
const int clientPort,
const int clientEuid,
const int clientEgid,
const std::string &deviceGroupName,
const std::string &tapeDriveName)
throw (castor::exception::Exception);
private:
/**
* This is a helper function for sendJob() to read the reply of the RTCOPY
* or tape aggregator daemon
*
* @param remoteCopyType remote copy type to be used for exception messages
*/
static void readReply(castor::io::AbstractTCPSocket &socket,
const char *remoteCopyType) throw (castor::exception::Exception);
}; // class RCPJobSubmitter
} // namespace aggregator
} // namespace tape
} // namespace castor
#endif // _CASTOR_TAPE_AGGREGATOR_RCPJOBSUBMITTER_HPP_
......@@ -22,10 +22,14 @@
* @author Steven Murray Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/exception/Exception.hpp"
#include "castor/exception/Internal.hpp"
#include "castor/tape/aggregator/AggregatorDlfMessageConstants.hpp"
#include "castor/tape/aggregator/RequestHandlerThread.hpp"
#include "castor/tape/aggregator/RCPJobSubmitter.hpp"
#include "castor/tape/aggregator/SocketHelper.hpp"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
//-----------------------------------------------------------------------------
......@@ -33,8 +37,14 @@
//-----------------------------------------------------------------------------
castor::tape::aggregator::RequestHandlerThread::RequestHandlerThread()
throw () : m_jobQueue(1) {
m_rtcopyMagicOld0Handlers[VDQM_CLIENTINFO] =
&RequestHandlerThread::handleJobSubmission;
m_magicToHandlers[RTCOPY_MAGIC_OLD0] = &m_rtcopyMagicOld0Handlers;
}
//-----------------------------------------------------------------------------
// destructor
//-----------------------------------------------------------------------------
......@@ -42,6 +52,7 @@ castor::tape::aggregator::RequestHandlerThread::~RequestHandlerThread()
throw () {
}
//-----------------------------------------------------------------------------
// init
//-----------------------------------------------------------------------------
......@@ -70,7 +81,7 @@ void castor::tape::aggregator::RequestHandlerThread::run(void *param)
try {
handleRequest(cuuid, *socket);
dispatchRequest(cuuid, *socket);
} catch(castor::exception::Exception &e) {
......@@ -82,7 +93,8 @@ void castor::tape::aggregator::RequestHandlerThread::run(void *param)
AGGREGATOR_HANDLE_REQUEST_EXCEPT, 2, params);
}
// De-allocate the socket
// Close and de-allocate the socket
socket->close();
delete socket;
}
......@@ -96,28 +108,75 @@ void castor::tape::aggregator::RequestHandlerThread::stop()