Commit 2cee1cca authored by Steven Murray's avatar Steven Murray
Browse files

Aggregator server and clients now use utils::getPortFromConfig. Plus the...

Aggregator server and clients now use utils::getPortFromConfig.  Plus the manual pages have been updated to describe the port numbers which can be set as CASTOR configuration parameters
parent e602fe22
......@@ -33,14 +33,9 @@
namespace castor {
namespace tape {
const char *const CLIENT_CONF = "CLIENT";
const char *const HIGHPORT_CONF = "HIGHPORT";
const int HIGH_CLIENT_PORT_RANGE = 30100;
const char *const LOWPORT_CONF = "LOWPORT";
const int LOW_CLIENT_PORT_RANGE = 30000;
const size_t RTCPDNETRWTIMEOUT = 5;
const size_t STRERRORBUFLEN = 256;
const size_t VMGRERRORBUFLEN = 512;
const size_t RTCPDNETRWTIMEOUT = 5;
const size_t STRERRORBUFLEN = 256;
const size_t VMGRERRORBUFLEN = 512;
} // namespace tape
} // namespace castor
......
......@@ -259,44 +259,14 @@ void castor::tape::aggregator::AggregatorDaemon::parseCommandLine(
}
//------------------------------------------------------------------------------
// getVdqmListenPort()
//------------------------------------------------------------------------------
int castor::tape::aggregator::AggregatorDaemon::getVdqmListenPort()
throw(castor::exception::InvalidConfigEntry) {
int port = AGGREGATOR_VDQMPORT; // Initialise to default value
const char *const configEntry = getconfent("TAPEAGGREGATOR", "VDQMPORT", 0);
if(configEntry != NULL) {
if(utils::isValidUInt(configEntry)) {
port = atoi(configEntry);
} else {
castor::exception::InvalidConfigEntry ex("TAPEAGGREGATOR", "VDQMPORT",
configEntry);
ex.getMessage()
<< "File=" << __FILE__
<< " Line=" << __LINE__
<< " Function=" << __FUNCTION__
<< ": Invalid configuration entry: "
<< ex.getEntryCategory() << " " << ex.getEntryName() << " "
<< ex.getEntryValue();
throw ex;
}
}
return port;
}
//------------------------------------------------------------------------------
// createVdqmRequestHandlerPool
//------------------------------------------------------------------------------
void castor::tape::aggregator::AggregatorDaemon::
createVdqmRequestHandlerPool() throw(castor::exception::Exception) {
const int vdqmListenPort = getVdqmListenPort();
const int vdqmListenPort = utils::getPortFromConfig("AGGREGATOR", "VDQMPORT",
AGGREGATOR_VDQMPORT);
server::IThread *iThread =
new castor::tape::aggregator::VdqmRequestHandler();
server::BaseThreadPool* threadPool =
......
......@@ -121,12 +121,6 @@ private:
*/
static castor::dlf::Message s_dlfMessages[];
/**
* Returns the port on which the server will listen for connections from the
* VDQM.
*/
int getVdqmListenPort() throw(castor::exception::InvalidConfigEntry);
}; // class AggregatorDaemon
} // namespace aggregator
......
......@@ -34,19 +34,67 @@ namespace castor {
namespace tape {
namespace aggregator {
const int CLIENTPINGTIMEOUT = 5;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const int MAXDRIVES = 4;
const int MAXINITMIGFILES = 2;
const int MAXPENDINGTRANSFERS = 100;
const uint32_t MIGRATEUMASK = 022;
const size_t MSGBUFSIZ = 1024;
/**
* The minumim timeout in seconds between pings to the aggregator clients.
* Please note that the tape gateway is not pinged.
*/
const int CLIENTPINGTIMEOUT = 5;
/**
* The size of an RTCOPY message header. The header contains 3 32-bit integer
* fields: magic number, request type and length or status.
*/
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
/**
* The maximum number of tape drives a single aggregator can cope with.
*/
const int MAXDRIVES = 4;
/**
* The maximum number of pending transfers between the aggregator and RTCPD.
* The actual number of outstanding pending transfers is equal to the number
* of disk IO when all of the disk IO threads are busy transfering files.
*/
const int MAXPENDINGTRANSFERS = 100;
const uint32_t MIGRATEUMASK = 022;
/**
* The maximum size of an RTCOPY message in bytes including both message
* header and message body.
*/
const size_t RTCPMSGBUFSIZE = 4096;
/**
* The program name of the aggregator daemon.
*/
const char *const AGGREGATORPROGRAMNAME = "aggregatord";
const uint32_t RECALLUMASK = 077;
const char *const RECORDFORMAT = "F";
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const int RTCPDPINGTIMEOUT = 30;
/**
* The tape record format.
*/
const char *const RECORDFORMAT = "F";
/**
* The timeout in seconds to be used when the network reads and writes of the
* RTCOPY protocol.
*/
const int RTCPDNETRWTIMEOUT = 5;
/**
* The timeout in seconds for which the aggregator waits for RTCPD to call it
* back.
*/
const int RTCPDCALLBACKTIMEOUT = 5;
/**
* The minimum timeout in seconds between sending a ping message from the
* aggregator to RTCPD.
*/
const int RTCPDPINGTIMEOUT = 30;
} // namespace aggregator
} // namespace tape
......
......@@ -115,7 +115,7 @@ castor::tape::tapegateway::Volume
// Acknowledge the VDQM - maybe positive or negative depending on reply
// from RTCPD
char vdqmReplyBuf[MSGBUFSIZ];
char vdqmReplyBuf[RTCPMSGBUFSIZE];
size_t vdqmReplyLen = 0;
vdqmReplyLen = RtcpMarshaller::marshall(vdqmReplyBuf, rtcpdReply);
net::writeBytes(vdqmSock.socket(), RTCPDNETRWTIMEOUT, vdqmReplyLen,
......
......@@ -105,7 +105,7 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
strcpy(request.clientUserName , clientUserName.c_str());
// Marshall the job submission request message
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
......@@ -208,7 +208,7 @@ void castor::tape::aggregator::RcpJobSubmitter::readReply(
}
// Length of message header = 3 * sizeof(uint32_t)
char bodyBuf[MSGBUFSIZ - 3 * sizeof(uint32_t)];
char bodyBuf[RTCPMSGBUFSIZE - 3 * sizeof(uint32_t)];
// If the message body is larger than the message body buffer
if(header.lenOrStatus > sizeof(bodyBuf)) {
......
......@@ -67,7 +67,7 @@ void castor::tape::aggregator::RtcpTxRx::getRequestInfoFromRtcpd(
utils::setBytes(request, '\0');
// Marshall the request
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, request);
......@@ -148,7 +148,7 @@ void castor::tape::aggregator::RtcpTxRx::giveVolumeToRtcpd(
AGGREGATOR_GIVE_VOLUME_TO_RTCPD, volReqId, socketFd, request);
// Marshall the message
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, request);
......@@ -211,7 +211,7 @@ void castor::tape::aggregator::RtcpTxRx::giveFileToRtcpd(
}
// Marshall the message
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, request);
......@@ -274,7 +274,7 @@ void castor::tape::aggregator::RtcpTxRx::tellRtcpdDumpTape(
volReqId, socketFd, request);
// Marshall the message
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, request);
......@@ -330,7 +330,7 @@ void castor::tape::aggregator::RtcpTxRx::sendMessageHeader(
const int netReadWriteTimeout, const MessageHeader &header)
throw(castor::exception::Exception) {
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
......@@ -376,7 +376,7 @@ void castor::tape::aggregator::RtcpTxRx::pingRtcpd(const Cuuid_t &cuuid,
const uint32_t volReqId, const int socketFd, const int netReadWriteTimeout)
throw(castor::exception::Exception) {
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
MessageHeader header;
......@@ -438,7 +438,7 @@ void castor::tape::aggregator::RtcpTxRx::tellRtcpdEndOfFileList(
// Marshall the message
RtcpNoMoreRequestsMsgBody body;
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, body);
......@@ -525,7 +525,7 @@ void castor::tape::aggregator::RtcpTxRx::tellRtcpdToAbort(const Cuuid_t &cuuid,
// Marshall the message
RtcpAbortMsgBody body;
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, body);
......@@ -614,7 +614,7 @@ void castor::tape::aggregator::RtcpTxRx::receiveRcpJobRqst(const Cuuid_t &cuuid,
checkRtcopyReqType(VDQM_CLIENTINFO, header.reqType, __FUNCTION__);
// Length of body buffer = Length of message buffer - length of header
char bodyBuf[MSGBUFSIZ - 3 * sizeof(uint32_t)];
char bodyBuf[RTCPMSGBUFSIZE - 3 * sizeof(uint32_t)];
// If the message body is too large
if(header.lenOrStatus > sizeof(bodyBuf)) {
......@@ -708,7 +708,7 @@ void castor::tape::aggregator::RtcpTxRx::askRtcpdToRequestMoreWork(
request.err.maxCpRetry = -1;
// Marshall the message
char buf[MSGBUFSIZ];
char buf[RTCPMSGBUFSIZE];
size_t totalLen = 0;
try {
totalLen = RtcpMarshaller::marshall(buf, request);
......
......@@ -27,6 +27,7 @@
#include "castor/exception/Exception.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/LogHelper.hpp"
#include "castor/tape/aggregator/MessageHeader.hpp"
#include "castor/tape/aggregator/RtcpDumpTapeRqstMsgBody.hpp"
......@@ -240,7 +241,7 @@ public:
T &body) throw(castor::exception::Exception) {
// Length of body buffer = Length of message buffer - length of header
char bodyBuf[MSGBUFSIZ - 3 * sizeof(uint32_t)];
char bodyBuf[RTCPMSGBUFSIZE - 3 * sizeof(uint32_t)];
// If the message body is too large
if(header.lenOrStatus > sizeof(bodyBuf)) {
......
......@@ -22,6 +22,7 @@
* @author Nicola.Bessone@cern.ch Steven.Murray@cern.ch
*****************************************************************************/
#include "castor/PortNumbers.hpp"
#include "castor/dlf/Dlf.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/exception/Internal.hpp"
......@@ -128,7 +129,13 @@ void castor::tape::aggregator::VdqmRequestHandler::run(void *param)
// Create, bind and mark a listen socket for RTCPD callback connections
// Wrap the socket file descriptor in a smart file descriptor so that it is
// guaranteed to be closed when it goes out of scope.
SmartFd rtcpdCallbackSockFd(net::createListenerSock("127.0.0.1",0));
const unsigned short lowPort = utils::getPortFromConfig(
"AGGREGATORRTCPD", "LOWPORT", AGGREGATORRTCPD_LOWPORT);
const unsigned short highPort = utils::getPortFromConfig(
"AGGREGATORRTCPD", "HIGHPORT", AGGREGATORRTCPD_HIGHPORT);
unsigned short chosenPort = 0;
SmartFd rtcpdCallbackSockFd(net::createListenerSock("127.0.0.1", lowPort,
highPort, chosenPort));
// Get the IP, host name and port of the callback port
unsigned long rtcpdCallbackIp = 0;
......
......@@ -10,7 +10,7 @@
.\" You should have received a copy of the GNU General Public License
.\" along with this program; if not, write to the Free Software
.\" Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
.TH AGGREGATORD 8 "$Date: 2009/08/06 21:17:48 $" CASTOR "CASTOR"
.TH AGGREGATORD 8 "$Date: 2009/08/07 15:56:38 $" CASTOR "CASTOR"
.SH NAME
aggregatord \- Tape aggregator daemon
.SH SYNOPSIS
......@@ -42,8 +42,21 @@ Prints the usage message
.SH CASTOR CONFIGURATION PARAMETERS
The tape aggregator daemon reads and uses the following CASTOR configuration
parameters which are specified within the CASTOR configuration file (the
default location is /etc/castor/castor.conf).
parameters which are specified using environment variables or within the CASTOR
configuration file (the default location is /etc/castor/castor.conf).
.TP
\fBAGGREGATOR VDQMPORT
The port on which the aggregator will listen for RTCOPY jobs from the VDQM.
The default is 5070. This configuration parameter should \fBNOT\fP normally be
set and the default should be used.
.TP
\fBAGGREGATORRTCPD LOWPORT
The inclusive low port of the aggregator's RTCPD callback port number range.
The default is 300101.
.TP
\fBAGGREGATORRTCPD HIGHPORT
The inclusive high port of the aggregator's RTCPD callback port number range.
The default is 300200.
.TP
\fBaggregatord LOGSTANDARD XXXX
Sets the location of the \fBaggregatord\fP log file to be \fBXXXX\fP.
......
......@@ -561,30 +561,6 @@ int castor::tape::tpcp::TpcpCommand::main(const char *const programName,
}
//------------------------------------------------------------------------------
// getVdqmListenPort()
//------------------------------------------------------------------------------
int castor::tape::tpcp::TpcpCommand::getVdqmListenPort()
throw(castor::exception::Exception) {
int port = AGGREGATOR_VDQMPORT; // Initialise to default value
const char *const configEntry = getconfent("TAPEAGGREGATOR", "VDQMPORT", 0);
if(configEntry != NULL) {
if(utils::isValidUInt(configEntry)) {
port = atoi(configEntry);
} else {
TAPE_THROW_EX(castor::exception::Internal,
": Invalid configuration entry:"
<< configEntry);
}
}
return port;
}
//------------------------------------------------------------------------------
// calculateMinNbOfFiles
//------------------------------------------------------------------------------
......@@ -640,16 +616,10 @@ void castor::tape::tpcp::TpcpCommand::vmgrQueryTape(
void castor::tape::tpcp::TpcpCommand::setupCallbackSock()
throw(castor::exception::Exception) {
// Get the port range to be used by the aggregator callback socket
int lowPort = LOW_CLIENT_PORT_RANGE;
int highPort = HIGH_CLIENT_PORT_RANGE;
char* sport = NULL;
if((sport = getconfent((char *)CLIENT_CONF,(char *)LOWPORT_CONF,0)) != 0) {
lowPort = castor::System::porttoi(sport);
}
if((sport = getconfent((char *)CLIENT_CONF,(char *)HIGHPORT_CONF,0)) != 0) {
highPort = castor::System::porttoi(sport);
}
const unsigned short lowPort = utils::getPortFromConfig(
"AGGREGATORCLIENT", "LOWPORT", AGGREGATORCLIENT_LOWPORT);
const unsigned short highPort = utils::getPortFromConfig(
"AGGREGATORCLIENT", "HIGHPORT", AGGREGATORCLIENT_HIGHPORT);
// Bind the aggregator callback socket
m_callbackSock.bind(lowPort, highPort);
......@@ -1067,6 +1037,8 @@ void castor::tape::tpcp::TpcpCommand::deleteVdqmVolumeRequest()
": " << errorStr);
}
}
//------------------------------------------------------------------------------
// checkFilenameFormat
//------------------------------------------------------------------------------
......@@ -1092,7 +1064,7 @@ void castor::tape::tpcp::TpcpCommand::checkFilenameFormat()
castor::exception::Exception ex(ECANCELED);
ex.getMessage() <<
": Invalid RFIO filename syntax"
": Filename must identiry a regular file"
": Filename must identify a regular file"
": filename=\"" << line <<"\"";
throw ex;
......@@ -1119,7 +1091,7 @@ void castor::tape::tpcp::TpcpCommand::checkFilenameFormat()
castor::exception::Exception ex(ECANCELED);
ex.getMessage() <<
": Invalid RFIO filename syntax"
": Found ':/' character whith not hostname specified"
": Found ':/' character with no hostname specified"
": filename=\"" << line <<"\"";
throw ex;
......
......@@ -33,8 +33,8 @@
#include "castor/tape/tpcp/FilenameList.hpp"
#include "castor/tape/tpcp/ParsedCommandLine.hpp"
#include "castor/tape/utils/utils.hpp"
#include "h/vmgr_api.h"
#include "h/Castor_limits.h"
#include "h/vmgr_api.h"
#include <iostream>
#include <list>
......@@ -186,12 +186,6 @@ protected:
*/
uint64_t m_fileTransactionId;
/**
* Returns the port on which the server will listen for connections from the
* VDQM.
*/
int getVdqmListenPort() throw(castor::exception::Exception);
/**
* Calculate the minimum number of files specified in the tape file
* ranges provided as a command-line parameter.
......
......@@ -10,7 +10,7 @@
.\" You should have received a copy of the GNU General Public License
.\" along with this program; if not, write to the Free Software
.\" Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
.TH DUMPTP 1 "$Date: 2009/07/31 14:07:48 $" CASTOR "CASTOR"
.TH DUMPTP 1 "$Date: 2009/08/07 15:56:38 $" CASTOR "CASTOR"
.SH NAME
dumptp \- dump the contents of a tape
.SH SYNOPSIS
......@@ -61,6 +61,25 @@ default is 1.
End block per file. This should be an unsigned integer greater than 0. The
default is 1.
.SH CASTOR CONFIGURATION PARAMETERS
The \fBdumptp\fP command reads and uses the following CASTOR configuration
parameters which are specified using environment variables or within the CASTOR
configuration file (the default location is /etc/castor/castor.conf).
.TP
\fBAGGREGATORCLIENT LOWPORT
The inclusive low port of the aggregator client callback port number range.
The default is 300201.
.TP
\fBAGGREGATORCLIENT HIGHPORT
The inclusive high port of the aggregator client callback port number range.
The default is 300300.
.TP
\fBVDQM HOST
The name of the host on which the VDQM daemon is running.
.TP
\fBVMGR HOST
The name of the host on which the VMGR daemon is running.
.SH EXAMPLE
The following example dumps the contents of the tape with the VID \fBI10553\fP.
.P
......
......@@ -10,7 +10,7 @@
.\" You should have received a copy of the GNU General Public License
.\" along with this program; if not, write to the Free Software
.\" Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
.TH READTP 1 "$Date: 2009/08/05 09:34:14 $" CASTOR "CASTOR"
.TH READTP 1 "$Date: 2009/08/07 15:56:38 $" CASTOR "CASTOR"
.SH NAME
readtp \- read from a tape
.SH SYNOPSIS
......@@ -71,6 +71,25 @@ ignored. Whitespace is allowed between the beginning of the line and a '#'
character. Please note that this option is mutually exclusive with the method
of providing filenames on the command-line.
.SH CASTOR CONFIGURATION PARAMETERS
The \fBreadtp\fP command reads and uses the following CASTOR configuration
parameters which are specified using environment variables or within the CASTOR
configuration file (the default location is /etc/castor/castor.conf).
.TP
\fBAGGREGATORCLIENT LOWPORT
The inclusive low port of the aggregator client callback port number range.
The default is 300201.
.TP
\fBAGGREGATORCLIENT HIGHPORT
The inclusive high port of the aggregator client callback port number range.
The default is 300300.
.TP
\fBVDQM HOST
The name of the host on which the VDQM daemon is running.
.TP
\fBVMGR HOST
The name of the host on which the VMGR daemon is running.
.SH EXAMPLE
The follwing example reads the first and fourth files from the tape with the
VID \fBI10553\fP and writes them to the files \fBlxc2disk07:/tmp/test_a\fP and
......
......@@ -10,7 +10,7 @@
.\" You should have received a copy of the GNU General Public License
.\" along with this program; if not, write to the Free Software
.\" Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
.TH WRITETP 1 "$Date: 2009/08/05 09:34:14 $" CASTOR "CASTOR"
.TH WRITETP 1 "$Date: 2009/08/07 15:56:38 $" CASTOR "CASTOR"
.SH NAME
writetp \- write to a tape
.SH SYNOPSIS
......@@ -54,6 +54,25 @@ ignored. Whitespace is allowed between the beginning of the line and a '#'
character. Please note that this option is mutually exclusive with the method
of providing filenames on the command-line.
.SH CASTOR CONFIGURATION PARAMETERS
The \fBwritetp\fP command reads and uses the following CASTOR configuration
parameters which are specified using environment variables or within the CASTOR
configuration file (the default location is /etc/castor/castor.conf).
.TP
\fBAGGREGATORCLIENT LOWPORT
The inclusive low port of the aggregator client callback port number range.
The default is 300201.
.TP
\fBAGGREGATORCLIENT HIGHPORT
The inclusive high port of the aggregator client callback port number range.
The default is 300300.
.TP
\fBVDQM HOST
The name of the host on which the VDQM daemon is running.
.TP
\fBVMGR HOST
The name of the host on which the VMGR daemon is running.
.SH EXAMPLE
The following example appends all of the files specified in the file
\fBfile_list\fP to the end of the tape with the VID \fBI02024\fP.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment