Commit a4b918f0 authored by Steven Murray's avatar Steven Murray
Browse files

I have moved the code in RtcpdHandlerThread to VdqmRequestHandlerThread and

have removed RtcpdHandlerThread.  The code compiles, but probably does not
work.

The concurrency and communication model has been decided.  There will be a pool of VDQM threads with one thread per tape drive.  Today there is only one drive
per tape server at CERN, but we are trying to keep backwards compatibility.
The existing code we are trying to replace kept open the option of more than
one drive per tape server.  One VDQM thread will use a select loop to
communicate with the five or more incomming connections required by RTCPD.  By
default, RTCPD makes five connections to a client.  The initial connection to
signal the start of a remote tape copy and to listen for ping and abort
messages, one connection for the RTCPD tape IO thread, and three connections
for the default number of three RTCPD disk IO threads.

The next step is to code the select loop for the five or more connections.
parent 5dee46b7
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Mon Jan 19 09:17:28 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 23 11:26:28 CET 2009
*/
/******************************************************************************
......@@ -42,32 +42,28 @@ AGGREGATOR_VDQM_CONNECTION_WITH_INFO=5, /* "Received a connection from VDQM" */
AGGREGATOR_VDQM_CONNECTION_WITHOUT_INFO=6, /* "Received a connection from VDQM without peer host and port information" */
AGGREGATOR_HANDLE_VDQM_REQUEST_EXCEPT=7, /* "Exception raised when handling a request from the VDQM" */
AGGREGATOR_FAILED_TO_READ_MESSAGE_HEADER=8, /* "Failed to read message header" */
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_HEADER=9, /* "Failed to unmarshall message header" */
AGGREGATOR_UNKNOWN_MAGIC=10, /* "Unknown magic number" */
AGGREGATOR_UNKNOWN_REQUEST_TYPE=11, /* "Unknown request type" */
AGGREGATOR_MESSAGE_BODY_LENGTH_TOO_LARGE=12, /* "Length of message body is too large" */
AGGREGATOR_JOB_MESSAGE_BODY_LENGTH_TOO_SHORT=13, /* "Length of job submission message body is too short" */
AGGREGATOR_FAILED_TO_READ_MESSAGE_BODY=14, /* "Failed to read message body" */
AGGREGATOR_HANDLE_JOB_MESSAGE=15, /* "Handle job submission message VDQM_CLIENTINFO:RTCOPY_MAGIC_OLD0" */
AGGREGATOR_SUBMITTING_JOB_TO_RTCPD=16, /* "Submitting remote copy job to RTCPD" */
AGGREGATOR_FAILED_TO_SUBMIT_JOB_TO_RTCPD=17, /* "Failed to submit remote copy job to RTCPD" */
AGGREGATOR_FAILED_TO_MARSHALL_RTCP_ACKN=18, /* "Failed to marshall RTCP acknowledge message" */
AGGREGATOR_FAILED_TO_SEND_RCP_JOB_REPLY_TO_VDQM=19, /* "Failed to send RCP job reply to VDQM" */
AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST=20, /* "Received a remote copy job from an unauthorized host" */
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE=21, /* "Received error message from RTCPD" */
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL=22, /* "The RtcpdHandlerThread has been passed a NULL socket pointer" */
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO=23, /* "Received a connection from RTCPD" */
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO=24, /* "Received a connection from RTCPD without peer host and port information" */
AGGREGATOR_GOT_VOLREQID=25, /* "Got volume request ID from RTCPD" */
AGGREGATOR_FAILED_TO_GET_VOLREQID=26, /* "Failed to get volume request ID from RTCPD" */
AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY=27, /* "Failed to unmarshall message body" */
AGGREGATOR_FAILED_TO_MARSHALL_RCP_JOB_REPLY_MESSAGE=28, /* "Failed to marshall RCP job reply message" */
AGGREGATOR_GAVE_VOLUME_INFO=29, /* "Gave volume information to RTCPD" */
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO=30, /* "Failed to give volume information to RTCPD" */
AGGREGATOR_GAVE_FILE_INFO=31, /* "Gave file information to RTCPD" */
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO=32, /* "Failed to give file information to RTCPD" */
AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS=33, /* "Signalled no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=34 /* "Failed to signal no more requests to RTCPD" */
AGGREGATOR_UNKNOWN_MAGIC=9, /* "Unknown magic number" */
AGGREGATOR_UNKNOWN_REQUEST_TYPE=10, /* "Unknown request type" */
AGGREGATOR_FAILED_TO_READ_MESSAGE_BODY=11, /* "Failed to read message body" */
AGGREGATOR_HANDLE_JOB_MESSAGE=12, /* "Handle job submission message VDQM_CLIENTINFO:RTCOPY_MAGIC_OLD0" */
AGGREGATOR_SUBMITTING_JOB_TO_RTCPD=13, /* "Submitting remote copy job to RTCPD" */
AGGREGATOR_FAILED_TO_SUBMIT_JOB_TO_RTCPD=14, /* "Failed to submit remote copy job to RTCPD" */
AGGREGATOR_FAILED_TO_MARSHALL_RTCP_ACKN=15, /* "Failed to marshall RTCP acknowledge message" */
AGGREGATOR_FAILED_TO_SEND_RCP_JOB_REPLY_TO_VDQM=16, /* "Failed to send RCP job reply to VDQM" */
AGGREGATOR_FAILED_TO_PROCESS_RCP_JOB_SUBMISSION=17, /* "Failed to process RCP job submission message" */
AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE=18, /* "Received error message from RTCPD" */
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL=19, /* "The RtcpdHandlerThread has been passed a NULL socket pointer" */
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO=20, /* "Received a connection from RTCPD" */
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO=21, /* "Received a connection from RTCPD without peer host and port information" */
AGGREGATOR_GOT_VOLREQID=22, /* "Got volume request ID from RTCPD" */
AGGREGATOR_FAILED_TO_GET_VOLREQID=23, /* "Failed to get volume request ID from RTCPD" */
AGGREGATOR_FAILED_TO_MARSHALL_RCP_JOB_REPLY_MESSAGE=24, /* "Failed to marshall RCP job reply message" */
AGGREGATOR_GAVE_VOLUME_INFO=25, /* "Gave volume information to RTCPD" */
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO=26, /* "Failed to give volume information to RTCPD" */
AGGREGATOR_GAVE_FILE_INFO=27, /* "Gave file information to RTCPD" */
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO=28, /* "Failed to give file information to RTCPD" */
AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS=29, /* "Signalled no more requests to RTCPD" */
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS=30 /* "Failed to signal no more requests to RTCPD" */
}; // enum AggregatorDlfMessages
} // namespace aggregator
} // namespace tape
......
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Mon Jan 19 09:17:28 CET 2009
/* This file was generated by ./AggregatorDlfMessagesCodeGenerator on Fri Jan 23 11:26:28 CET 2009
*/
/******************************************************************************
......@@ -38,25 +38,21 @@ castor::dlf::Message castor::tape::aggregator::AggregatorDaemon::s_dlfMessages[]
{AGGREGATOR_VDQM_CONNECTION_WITHOUT_INFO, "Received a connection from VDQM without peer host and port information"},
{AGGREGATOR_HANDLE_VDQM_REQUEST_EXCEPT, "Exception raised when handling a request from the VDQM"},
{AGGREGATOR_FAILED_TO_READ_MESSAGE_HEADER, "Failed to read message header"},
{AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_HEADER, "Failed to unmarshall message header"},
{AGGREGATOR_UNKNOWN_MAGIC, "Unknown magic number"},
{AGGREGATOR_UNKNOWN_REQUEST_TYPE, "Unknown request type"},
{AGGREGATOR_MESSAGE_BODY_LENGTH_TOO_LARGE, "Length of message body is too large"},
{AGGREGATOR_JOB_MESSAGE_BODY_LENGTH_TOO_SHORT, "Length of job submission message body is too short"},
{AGGREGATOR_FAILED_TO_READ_MESSAGE_BODY, "Failed to read message body"},
{AGGREGATOR_HANDLE_JOB_MESSAGE, "Handle job submission message VDQM_CLIENTINFO:RTCOPY_MAGIC_OLD0"},
{AGGREGATOR_SUBMITTING_JOB_TO_RTCPD, "Submitting remote copy job to RTCPD"},
{AGGREGATOR_FAILED_TO_SUBMIT_JOB_TO_RTCPD, "Failed to submit remote copy job to RTCPD"},
{AGGREGATOR_FAILED_TO_MARSHALL_RTCP_ACKN, "Failed to marshall RTCP acknowledge message"},
{AGGREGATOR_FAILED_TO_SEND_RCP_JOB_REPLY_TO_VDQM, "Failed to send RCP job reply to VDQM"},
{AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST, "Received a remote copy job from an unauthorized host"},
{AGGREGATOR_FAILED_TO_PROCESS_RCP_JOB_SUBMISSION, "Failed to process RCP job submission message"},
{AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE, "Received error message from RTCPD"},
{AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL, "The RtcpdHandlerThread has been passed a NULL socket pointer"},
{AGGREGATOR_RTCPD_CONNECTION_WITH_INFO, "Received a connection from RTCPD"},
{AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, "Received a connection from RTCPD without peer host and port information"},
{AGGREGATOR_GOT_VOLREQID, "Got volume request ID from RTCPD"},
{AGGREGATOR_FAILED_TO_GET_VOLREQID, "Failed to get volume request ID from RTCPD"},
{AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY, "Failed to unmarshall message body"},
{AGGREGATOR_FAILED_TO_MARSHALL_RCP_JOB_REPLY_MESSAGE, "Failed to marshall RCP job reply message"},
{AGGREGATOR_GAVE_VOLUME_INFO, "Gave volume information to RTCPD"},
{AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, "Failed to give volume information to RTCPD"},
......
......@@ -7,29 +7,25 @@
6,AGGREGATOR_VDQM_CONNECTION_WITHOUT_INFO,"Received a connection from VDQM without peer host and port information"
7,AGGREGATOR_HANDLE_VDQM_REQUEST_EXCEPT,"Exception raised when handling a request from the VDQM"
8,AGGREGATOR_FAILED_TO_READ_MESSAGE_HEADER,"Failed to read message header"
9,AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_HEADER,"Failed to unmarshall message header"
10,AGGREGATOR_UNKNOWN_MAGIC,"Unknown magic number"
11,AGGREGATOR_UNKNOWN_REQUEST_TYPE,"Unknown request type"
12,AGGREGATOR_MESSAGE_BODY_LENGTH_TOO_LARGE,"Length of message body is too large"
13,AGGREGATOR_JOB_MESSAGE_BODY_LENGTH_TOO_SHORT,"Length of job submission message body is too short"
14,AGGREGATOR_FAILED_TO_READ_MESSAGE_BODY,"Failed to read message body"
15,AGGREGATOR_HANDLE_JOB_MESSAGE,"Handle job submission message VDQM_CLIENTINFO:RTCOPY_MAGIC_OLD0"
16,AGGREGATOR_SUBMITTING_JOB_TO_RTCPD,"Submitting remote copy job to RTCPD"
17,AGGREGATOR_FAILED_TO_SUBMIT_JOB_TO_RTCPD,"Failed to submit remote copy job to RTCPD"
18,AGGREGATOR_FAILED_TO_MARSHALL_RTCP_ACKN,"Failed to marshall RTCP acknowledge message"
19,AGGREGATOR_FAILED_TO_SEND_RCP_JOB_REPLY_TO_VDQM,"Failed to send RCP job reply to VDQM"
20,AGGREGATOR_RECEIVED_RCP_JOB_FROM_UNAUTHORIZED_HOST,"Received a remote copy job from an unauthorized host"
21,AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE,"Received error message from RTCPD"
22,AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL,"The RtcpdHandlerThread has been passed a NULL socket pointer"
23,AGGREGATOR_RTCPD_CONNECTION_WITH_INFO,"Received a connection from RTCPD"
24,AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO,"Received a connection from RTCPD without peer host and port information"
25,AGGREGATOR_GOT_VOLREQID,"Got volume request ID from RTCPD"
26,AGGREGATOR_FAILED_TO_GET_VOLREQID,"Failed to get volume request ID from RTCPD"
27,AGGREGATOR_FAILED_TO_UNMARSHALL_MESSAGE_BODY,"Failed to unmarshall message body"
28,AGGREGATOR_FAILED_TO_MARSHALL_RCP_JOB_REPLY_MESSAGE,"Failed to marshall RCP job reply message"
29,AGGREGATOR_GAVE_VOLUME_INFO,"Gave volume information to RTCPD"
30,AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO,"Failed to give volume information to RTCPD"
31,AGGREGATOR_GAVE_FILE_INFO,"Gave file information to RTCPD"
32,AGGREGATOR_FAILED_TO_GIVE_FILE_INFO,"Failed to give file information to RTCPD"
33,AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS,"Signalled no more requests to RTCPD"
34,AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS,"Failed to signal no more requests to RTCPD"
9,AGGREGATOR_UNKNOWN_MAGIC,"Unknown magic number"
10,AGGREGATOR_UNKNOWN_REQUEST_TYPE,"Unknown request type"
11,AGGREGATOR_FAILED_TO_READ_MESSAGE_BODY,"Failed to read message body"
12,AGGREGATOR_HANDLE_JOB_MESSAGE,"Handle job submission message VDQM_CLIENTINFO:RTCOPY_MAGIC_OLD0"
13,AGGREGATOR_SUBMITTING_JOB_TO_RTCPD,"Submitting remote copy job to RTCPD"
14,AGGREGATOR_FAILED_TO_SUBMIT_JOB_TO_RTCPD,"Failed to submit remote copy job to RTCPD"
15,AGGREGATOR_FAILED_TO_MARSHALL_RTCP_ACKN,"Failed to marshall RTCP acknowledge message"
16,AGGREGATOR_FAILED_TO_SEND_RCP_JOB_REPLY_TO_VDQM,"Failed to send RCP job reply to VDQM"
17,AGGREGATOR_FAILED_TO_PROCESS_RCP_JOB_SUBMISSION,"Failed to process RCP job submission message"
18,AGGREGATOR_RECEIVED_RTCPD_ERROR_MESSAGE,"Received error message from RTCPD"
19,AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL,"The RtcpdHandlerThread has been passed a NULL socket pointer"
20,AGGREGATOR_RTCPD_CONNECTION_WITH_INFO,"Received a connection from RTCPD"
21,AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO,"Received a connection from RTCPD without peer host and port information"
22,AGGREGATOR_GOT_VOLREQID,"Got volume request ID from RTCPD"
23,AGGREGATOR_FAILED_TO_GET_VOLREQID,"Failed to get volume request ID from RTCPD"
24,AGGREGATOR_FAILED_TO_MARSHALL_RCP_JOB_REPLY_MESSAGE,"Failed to marshall RCP job reply message"
25,AGGREGATOR_GAVE_VOLUME_INFO,"Gave volume information to RTCPD"
26,AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO,"Failed to give volume information to RTCPD"
27,AGGREGATOR_GAVE_FILE_INFO,"Gave file information to RTCPD"
28,AGGREGATOR_FAILED_TO_GIVE_FILE_INFO,"Failed to give file information to RTCPD"
29,AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS,"Signalled no more requests to RTCPD"
30,AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS,"Failed to signal no more requests to RTCPD"
......@@ -28,7 +28,6 @@
#include "castor/server/TCPListenerThreadPool.hpp"
#include "castor/tape/aggregator/AggregatorDaemon.hpp"
#include "castor/tape/aggregator/AggregatorDlfMessageConstants.hpp"
#include "castor/tape/aggregator/RtcpdHandlerThread.hpp"
#include "castor/tape/aggregator/VdqmRequestHandlerThread.hpp"
......@@ -95,29 +94,6 @@ int main(int argc, char *argv[]) {
}
//----------------------------------
// Create the RtcpdHandlerThreadPool
//----------------------------------
daemon.addThreadPool(
new castor::server::TCPListenerThreadPool("RtcpdHandlerThreadPool",
new castor::tape::aggregator::RtcpdHandlerThread(), rtcpdListenPort));
{
castor::server::BaseThreadPool *const rtcpdHandlerThreadPool =
daemon.getThreadPool('R');
if(rtcpdHandlerThreadPool == NULL) {
castor::exception::Internal ie;
ie.getMessage() << __PRETTY_FUNCTION__
<< ": Failed to get RtcpdHandlerThreadPool";
throw ie;
}
rtcpdHandlerThreadPool->setNbThreads(10);
}
//------------------
// Start the threads
//------------------
......
......@@ -27,16 +27,18 @@
#define CASTOR_TAPE_AGGREGATOR_CONSTANTS_HPP 1
#include <stdint.h>
#include <stdlib.h>
namespace castor {
namespace tape {
namespace aggregator {
const size_t STRERRORBUFLEN = 256;
const int NETRWTIMEOUT = 300;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t MSGBUFSIZ = 1024;
const int NETRWTIMEOUT = 300;
const size_t STRERRORBUFLEN = 256;
const int LISTENBACKLOG = 2;
} // namespace aggregator
} // namespace tape
......
......@@ -37,7 +37,6 @@ AGGREGATORBIN_SRCS = \
AggregatorMain.cpp \
Marshaller.cpp \
RcpJobSubmitter.cpp \
RtcpdHandlerThread.cpp \
SocketHelper.cpp \
Transceiver.cpp \
Utils.cpp \
......
/******************************************************************************
* castor/tape/aggregator/RtcpdHandlerThread.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Nicola.Bessone@cern.ch Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/exception/Exception.hpp"
#include "castor/exception/Internal.hpp"
#include "castor/tape/aggregator/AggregatorDlfMessageConstants.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/Marshaller.hpp"
#include "castor/tape/aggregator/RtcpdHandlerThread.hpp"
#include "castor/tape/aggregator/RcpJobSubmitter.hpp"
#include "castor/tape/aggregator/SocketHelper.hpp"
#include "castor/tape/aggregator/Transceiver.hpp"
#include "castor/tape/aggregator/Utils.hpp"
#include "h/common.h"
#include "h/rtcp.h"
#include <string.h>
#include <time.h>
//-----------------------------------------------------------------------------
// constructor
//-----------------------------------------------------------------------------
castor::tape::aggregator::RtcpdHandlerThread::RtcpdHandlerThread() throw() {
}
//-----------------------------------------------------------------------------
// destructor
//-----------------------------------------------------------------------------
castor::tape::aggregator::RtcpdHandlerThread::~RtcpdHandlerThread()
throw() {
}
//-----------------------------------------------------------------------------
// init
//-----------------------------------------------------------------------------
void castor::tape::aggregator::RtcpdHandlerThread::init()
throw() {
}
//-----------------------------------------------------------------------------
// run
//-----------------------------------------------------------------------------
void castor::tape::aggregator::RtcpdHandlerThread::run(void *param)
throw() {
Cuuid_t cuuid = nullCuuid;
// Gives a Cuuid to the request
Cuuid_create(&cuuid);
if(param == NULL) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_RTCPD_HANDLER_SOCKET_IS_NULL, 1, params);
return;
}
castor::io::ServerSocket *socket = (castor::io::ServerSocket*)param;
// Log the new connection
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
// Get client IP info
socket->getPeerIp(port, ip);
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port", port)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RTCPD_CONNECTION_WITH_INFO, 2, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_RTCPD_CONNECTION_WITHOUT_INFO, 3, params);
}
// Get the volume request ID associated with the remote copy job from RTCPD
uint32_t volReqId = 0;
{
RtcpTapeRequestMessage reply;
try {
Transceiver::getVolumeRequestIdFromRtcpd(cuuid, *socket, NETRWTIMEOUT,
reply);
volReqId = reply.volReqId;
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , reply.vid ),
castor::dlf::Param("vsn" , reply.vsn ),
castor::dlf::Param("label" , reply.label ),
castor::dlf::Param("devtype" , reply.devtype ),
castor::dlf::Param("density" , reply.density ),
castor::dlf::Param("unit" , reply.unit ),
castor::dlf::Param("volReqId" , reply.volReqId ),
castor::dlf::Param("jobId" , reply.jobId ),
castor::dlf::Param("mode" , reply.mode ),
castor::dlf::Param("start_file" , reply.start_file ),
castor::dlf::Param("end_file" , reply.end_file ),
castor::dlf::Param("side" , reply.side ),
castor::dlf::Param("tprc" , reply.tprc ),
castor::dlf::Param("tStartRequest" , reply.tStartRequest ),
castor::dlf::Param("tEndRequest" , reply.tEndRequest ),
castor::dlf::Param("tStartRtcpd" , reply.tStartRtcpd ),
castor::dlf::Param("tStartMount" , reply.tStartMount ),
castor::dlf::Param("tEndMount" , reply.tEndMount ),
castor::dlf::Param("tStartUnmount" , reply.tStartUnmount ),
castor::dlf::Param("tEndUnmount" , reply.tEndUnmount ),
castor::dlf::Param("rtcpReqId" , reply.rtcpReqId ),
castor::dlf::Param("err.errmsgtxt" , reply.err.errmsgtxt ),
castor::dlf::Param("err.severity" , reply.err.severity ),
castor::dlf::Param("err.errorcode" , reply.err.errorcode ),
castor::dlf::Param("err.maxTpRetry", reply.err.maxTpRetry),
castor::dlf::Param("err.maxCpRetry", reply.err.maxCpRetry)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GOT_VOLREQID, 26, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GET_VOLREQID, 3, params);
}
}
// Give volume ID to RTCPD
{
uint32_t tStartRequest = time(NULL); // CASTOR2/rtcopy/rtcpclientd.c:1494
RtcpTapeRequestMessage request;
RtcpTapeRequestMessage reply;
Utils::setBytes(request, '\0');
try {
Utils::copyString(request.vid , "I10547");
Utils::copyString(request.vsn , "I10547");
Utils::copyString(request.label , "aul" );
Utils::copyString(request.density, "700GC" );
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_NULL, 3, params);
return;
}
request.volReqId = volReqId;
request.tStartRequest = tStartRequest;
try {
Transceiver::giveVolumeIdToRtcpd(cuuid, *socket, NETRWTIMEOUT, request,
reply);
castor::dlf::Param params[] = {
castor::dlf::Param("vid" , reply.vid ),
castor::dlf::Param("vsn" , reply.vsn ),
castor::dlf::Param("label" , reply.label ),
castor::dlf::Param("devtype" , reply.devtype ),
castor::dlf::Param("density" , reply.density ),
castor::dlf::Param("unit" , reply.unit ),
castor::dlf::Param("volReqId" , reply.volReqId ),
castor::dlf::Param("jobId" , reply.jobId ),
castor::dlf::Param("mode" , reply.mode ),
castor::dlf::Param("start_file" , reply.start_file ),
castor::dlf::Param("end_file" , reply.end_file ),
castor::dlf::Param("side" , reply.side ),
castor::dlf::Param("tprc" , reply.tprc ),
castor::dlf::Param("tStartRequest" , reply.tStartRequest ),
castor::dlf::Param("tEndRequest" , reply.tEndRequest ),
castor::dlf::Param("tStartRtcpd" , reply.tStartRtcpd ),
castor::dlf::Param("tStartMount" , reply.tStartMount ),
castor::dlf::Param("tEndMount" , reply.tEndMount ),
castor::dlf::Param("tStartUnmount" , reply.tStartUnmount ),
castor::dlf::Param("tEndUnmount" , reply.tEndUnmount ),
castor::dlf::Param("rtcpReqId" , reply.rtcpReqId ),
castor::dlf::Param("err.errmsgtxt" , reply.err.errmsgtxt ),
castor::dlf::Param("err.severity" , reply.err.severity ),
castor::dlf::Param("err.errorcode" , reply.err.errorcode ),
castor::dlf::Param("err.maxTpRetry", reply.err.maxTpRetry),
castor::dlf::Param("err.maxCpRetry", reply.err.maxCpRetry)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_VOLUME_INFO, 26, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_VOLUME_INFO, 3, params);
}
}
// Give file information to RTCPD
{
RtcpFileRequestMessage request;
Utils::setBytes(request, '\0');
try {
Utils::copyString(request.filePath, "lxc2disk07:/dev/null");
Utils::copyString(request.recfm, "F");
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_NULL, 3, params);
return;
}
request.volReqId = volReqId;
request.jobId = -1;
request.stageSubReqId = -1;
request.umask = 18;
request.tapeFseq = 1;
request.diskFseq = 1;
request.blockSize = -1;
request.recordLength = -1;
request.retention = -1;
request.defAlloc = -1;
request.rtcpErrAction = -1;
request.tpErrAction = -1;
request.convert = -1;
request.checkFid = -1;
request.concat = 1;
request.procStatus = RTCP_WAITING;
request.err.severity = 1;
request.err.maxTpRetry = -1;
request.err.maxCpRetry = -1;
RtcpFileRequestMessage reply;
try {
Transceiver::giveFileInfoToRtcpd(cuuid, *socket, NETRWTIMEOUT, request,
reply);
castor::dlf::Param params[] = {
castor::dlf::Param("filePath", reply.filePath),
castor::dlf::Param("tapePath", reply.tapePath),
castor::dlf::Param("recfm", reply.recfm),
castor::dlf::Param("fid", reply.fid),
castor::dlf::Param("ifce", reply.ifce),
castor::dlf::Param("stageId", reply.stageId),
castor::dlf::Param("volReqId", reply.volReqId),
castor::dlf::Param("jobId", reply.jobId),
castor::dlf::Param("stageSubReqId", reply.stageSubReqId),
castor::dlf::Param("umask", reply.umask),
castor::dlf::Param("positionMethod", reply.positionMethod),
castor::dlf::Param("tapeFseq", reply.tapeFseq),
castor::dlf::Param("diskFseq", reply.diskFseq),
castor::dlf::Param("blockSize", reply.blockSize),
castor::dlf::Param("recordLength", reply.recordLength),
castor::dlf::Param("retention", reply.retention),
castor::dlf::Param("defAlloc", reply.defAlloc),
castor::dlf::Param("rtcpErrAction", reply.rtcpErrAction),
castor::dlf::Param("tpErrAction", reply.tpErrAction),
castor::dlf::Param("convert", reply.convert),
castor::dlf::Param("checkFid", reply.checkFid),
castor::dlf::Param("concat", reply.concat),
castor::dlf::Param("procStatus", reply.procStatus),
castor::dlf::Param("cprc", reply.cprc),
castor::dlf::Param("tStartPosition", reply.tStartPosition),
castor::dlf::Param("tEndPosition", reply.tEndPosition),
castor::dlf::Param("tStartTransferDisk", reply.tStartTransferDisk),
castor::dlf::Param("tEndTransferDisk", reply.tEndTransferDisk),
castor::dlf::Param("tStartTransferTape", reply.tStartTransferTape),
castor::dlf::Param("tEndTransferTape", reply.tEndTransferTape),
castor::dlf::Param("blockId[0]", reply.blockId[0]),
castor::dlf::Param("blockId[1]", reply.blockId[1]),
castor::dlf::Param("blockId[2]", reply.blockId[2]),
castor::dlf::Param("blockId[3]", reply.blockId[3]),
castor::dlf::Param("offset", reply.offset),
castor::dlf::Param("bytesIn", reply.bytesIn),
castor::dlf::Param("bytesOut", reply.bytesOut),
castor::dlf::Param("hostBytes", reply.hostBytes),
castor::dlf::Param("nbRecs", reply.nbRecs),
castor::dlf::Param("maxNbRec", reply.maxNbRec),
castor::dlf::Param("maxSize", reply.maxSize),
castor::dlf::Param("startSize", reply.startSize),
castor::dlf::Param("segAttr.nameServerHostName",
reply.segAttr.nameServerHostName),
castor::dlf::Param("segAttr.segmCksumAlgorithm",
reply.segAttr.segmCksumAlgorithm),
castor::dlf::Param("segAttr.segmCksum", reply.segAttr.segmCksum),
castor::dlf::Param("segAttr.castorFileId", reply.segAttr.castorFileId),
castor::dlf::Param("stgReqId.time_low", reply.stgReqId.time_low),
castor::dlf::Param("stgReqId.time_mid", reply.stgReqId.time_mid),
castor::dlf::Param("stgReqId.time_hi_and_version",
reply.stgReqId.time_hi_and_version),
castor::dlf::Param("stgReqId.clock_seq_hi_and_reserved",
reply.stgReqId.clock_seq_hi_and_reserved),
castor::dlf::Param("stgReqId.clock_seq_low",
reply.stgReqId.clock_seq_low),
castor::dlf::Param("stgReqId.node[0]", reply.stgReqId.node[0]),
castor::dlf::Param("stgReqId.node[1]", reply.stgReqId.node[1]),
castor::dlf::Param("stgReqId.node[2]", reply.stgReqId.node[2]),
castor::dlf::Param("stgReqId.node[3]", reply.stgReqId.node[3]),
castor::dlf::Param("stgReqId.node[4]", reply.stgReqId.node[4]),
castor::dlf::Param("stgReqId.node[5]", reply.stgReqId.node[5]),
castor::dlf::Param("err.errmsgtxt", reply.err.errmsgtxt),
castor::dlf::Param("err.severity", reply.err.severity),
castor::dlf::Param("err.errorcode", reply.err.errorcode),
castor::dlf::Param("err.maxTpRetry", reply.err.maxTpRetry),
castor::dlf::Param("err.maxCpRetry", reply.err.maxCpRetry)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_FILE_INFO, 62, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, 3, params);
}
}
// Signal the end of the file list to RTCPD
try {
Transceiver::signalNoMoreRequestsToRtcpd(cuuid, *socket, NETRWTIMEOUT);
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_SIGNALLED_NO_MORE_REQUESTS);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("Function", __PRETTY_FUNCTION__),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
castor::dlf::dlf_writep(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_SIGNAL_NO_MORE_REQUESTS, 3, params);
}