Commit 66e8313d authored by Steven Murray's avatar Steven Murray
Browse files

The DriveAllocationProtocolEngine now sends a tapegateway...

The DriveAllocationProtocolEngine now sends a tapegateway EndNotificationErrorReport if it detects an error.  Tests have shown that RTCPD may not make the initial callback under certain circumstances.  This is an error and should therefore be report to the TapeGateway
parent 28d81b26
......@@ -45,7 +45,7 @@ namespace aggregator {
const char *const RECORDFORMAT = "F";
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const int RTCPDPINGTIMEOUT = 10;
const int RTCPDPINGTIMEOUT = 30;
} // namespace aggregator
} // namespace tape
......
......@@ -43,68 +43,26 @@
#include <iostream>
//-----------------------------------------------------------------------------
// checkRcpJobSubmitterIsAuthorised
//-----------------------------------------------------------------------------
void castor::tape::aggregator::DriveAllocationProtocolEngine::
checkRcpJobSubmitterIsAuthorised(const int socketFd)
throw(castor::exception::Exception) {
char peerHost[CA_MAXHOSTNAMELEN+1];
// isadminhost fills in peerHost
const int rc = isadminhost(socketFd, peerHost);
if(rc == -1 && serrno != SENOTADMIN) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to lookup connection"
<< ": Peer Host: " << peerHost);
}
if(*peerHost == '\0' ) {
TAPE_THROW_CODE(EINVAL,
": Peer host name is an empty string");
}
if(rc != 0) {
TAPE_THROW_CODE(SENOTADMIN,
": Unauthorized host"
<< ": Peer Host: " << peerHost);
}
}
//-----------------------------------------------------------------------------
// run
//-----------------------------------------------------------------------------
bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
const Cuuid_t &cuuid, castor::io::AbstractTCPSocket &vdqmSock,
const int rtcpdCallbackSockFd, const char *rtcpdCallbackHost,
const unsigned short rtcpdCallbackPort, uint32_t &volReqId,
char (&gatewayHost)[CA_MAXHOSTNAMELEN+1], unsigned short &gatewayPort,
SmartFd &rtcpdInitialSockFd, char (&unit)[CA_MAXUNMLEN+1],
tapegateway::Volume &volume) throw(castor::exception::Exception) {
RcpJobRqstMsgBody jobRequest;
utils::setBytes(jobRequest, '\0');
checkRcpJobSubmitterIsAuthorised(vdqmSock.socket());
RtcpTxRx::receiveRcpJobRqst(cuuid, vdqmSock.socket(), RTCPDNETRWTIMEOUT,
jobRequest);
// Extract the volume request ID, gateway host and gateway port
volReqId = jobRequest.tapeRequestId;
utils::copyString(gatewayHost, jobRequest.clientHost);
gatewayPort = jobRequest.clientPort;
const Cuuid_t &cuuid,
castor::io::AbstractTCPSocket &vdqmSock,
const int rtcpdCallbackSockFd,
const char *rtcpdCallbackHost,
const unsigned short rtcpdCallbackPort,
SmartFd &rtcpdInitialSockFd,
const RcpJobRqstMsgBody &jobRequest,
tapegateway::Volume &volume)
throw(castor::exception::Exception) {
// Pass a modified version of the job request through to RTCPD, setting the
// clientHost and clientPort parameters to identify the tape aggregator as
// being a proxy for RTCPClientD
{
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId" , jobRequest.tapeRequestId ),
castor::dlf::Param("volReqId" , jobRequest.volReqId ),
castor::dlf::Param("Port" , rtcpdCallbackPort ),
castor::dlf::Param("HostName" , rtcpdCallbackHost ),
castor::dlf::Param("clientHost" , jobRequest.clientHost ),
......@@ -113,7 +71,7 @@ bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
castor::dlf::Param("clientEuid" , jobRequest.clientEuid ),
castor::dlf::Param("clientEgid" , jobRequest.clientEgid ),
castor::dlf::Param("deviceGroupName", jobRequest.deviceGroupName),
castor::dlf::Param("driveName" , jobRequest.driveName )};
castor::dlf::Param("driveUnit" , jobRequest.driveUnit )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_SUBMITTING_JOB_TO_RTCPD, params);
}
......@@ -124,14 +82,14 @@ bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
RTCOPY_PORT, // port
RTCPDNETRWTIMEOUT, // netReadWriteTimeout
"RTCPD", // remoteCopyType
jobRequest.tapeRequestId,
jobRequest.volReqId,
jobRequest.clientUserName,
rtcpdCallbackHost,
rtcpdCallbackPort,
jobRequest.clientEuid,
jobRequest.clientEgid,
jobRequest.deviceGroupName,
jobRequest.driveName,
jobRequest.driveUnit,
rtcpdReply);
// Prepare a positive response for the VDQM which will be overwritten if
......@@ -144,7 +102,7 @@ bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
// even if there is no error
if(strlen(rtcpdReply.errorMessage) > 0) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", jobRequest.tapeRequestId),
castor::dlf::Param("volReqId", jobRequest.volReqId),
castor::dlf::Param("Message" , rtcpdReply.errorMessage ),
castor::dlf::Param("Code" , rtcpdReply.status )};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR,
......@@ -191,7 +149,7 @@ bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
net::getPeerHostName(rtcpdInitialSockFd.get(), hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", volReqId),
castor::dlf::Param("volReqId", jobRequest.volReqId ),
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip) ),
castor::dlf::Param("Port" , port ),
castor::dlf::Param("HostName", hostName ),
......@@ -205,22 +163,31 @@ bool castor::tape::aggregator::DriveAllocationProtocolEngine::run(
// Get the request informatiom and the drive unit from RTCPD
RtcpTapeRqstErrMsgBody rtcpdRequestInfoReply;
RtcpTxRx::getRequestInfoFromRtcpd(cuuid, volReqId, rtcpdInitialSockFd.get(),
RTCPDNETRWTIMEOUT, rtcpdRequestInfoReply);
utils::copyString(unit, rtcpdRequestInfoReply.unit);
RtcpTxRx::getRequestInfoFromRtcpd(cuuid, jobRequest.volReqId,
rtcpdInitialSockFd.get(), RTCPDNETRWTIMEOUT, rtcpdRequestInfoReply);
// If the VDQM and RTCPD drive units do not match
if(strcmp(jobRequest.driveUnit, rtcpdRequestInfoReply.unit) != 0) {
TAPE_THROW_CODE(EBADMSG,
": VDQM and RTCPD drive units do not match"
": VDQM drive unit='" << jobRequest.driveUnit << "'"
" RTCPD drive unit='" << rtcpdRequestInfoReply.volReqId << "'");
}
// If the VDQM and RTCPD volume request IDs do not match
if(rtcpdRequestInfoReply.volReqId != volReqId) {
if(jobRequest.volReqId != rtcpdRequestInfoReply.volReqId) {
TAPE_THROW_CODE(EBADMSG,
": VDQM and RTCPD volume request Ids do not match"
": VDQM volume request ID: " << volReqId
<< ": RTCPD volume request ID: " << rtcpdRequestInfoReply.volReqId);
": VDQM and RTCPD volume request Ids do not match"
": VDQM volume request ID=" << jobRequest.volReqId <<
" RTCPD volume request ID=" << rtcpdRequestInfoReply.volReqId);
}
// Get the volume from the tape gateway
const bool thereIsAVolume = GatewayTxRx::getVolumeFromGateway(cuuid,
volReqId, gatewayHost, gatewayPort, unit, volume);
jobRequest.volReqId, jobRequest.clientHost, jobRequest.clientPort,
jobRequest.driveUnit, volume);
return thereIsAVolume;
}
......
......@@ -28,6 +28,7 @@
#include "castor/dlf/Dlf.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/io/AbstractTCPSocket.hpp"
#include "castor/tape/aggregator/RcpJobRqstMsgBody.hpp"
#include "castor/tape/aggregator/SmartFd.hpp"
#include "castor/tape/fsm/StateMachine.hpp"
#include "castor/tape/tapegateway/Volume.hpp"
......@@ -51,6 +52,9 @@ public:
* Execute the drive allocation protocol which will result in the volume
* information being received from the tape gateway.
*
* This method will send an EndNotificationErrorReport message to the tape
* gateway if an error is detected.
*
* @param cuuid The ccuid to be used for logging.
* @param vdqmSock The socket of the VDQM connection.
* @param rtcpdCallbackSockFd The file descriptor of the listener socket to
......@@ -60,22 +64,22 @@ public:
* to accept callback connections from RTCPD.
* @param rtcpdCallbackPort The port number of the listener socket to be
* used to accept callback connections from RTCPD.
* @param volReqId Out parameter: The volume request ID.
* @param gatewayHost Out parameter: The tape gateway host name.
* @param gatewayPort Out parameter: The tape gateway port number.
* @param rtcpdInitialSockFd Out parameter: The socket file descriptor of
* the initial RTCPD connection.
* @param unit Out parameter: The drive unit returned by RTCPD.
* @param jobRequest The RTCOPY job requext from the VDQM.
* @param volume Out parameter: The volume message received from
* the tape gateway.
* @return True if there is a volume to mount.
*/
bool run(const Cuuid_t &cuuid, castor::io::AbstractTCPSocket &vdqmSock,
const int rtcpdCallbackSockFd, const char *rtcpdCallbackHost,
const unsigned short rtcpdCallbackPort, uint32_t &volReqId,
char (&gatewayHost)[CA_MAXHOSTNAMELEN+1], unsigned short &gatewayPort,
SmartFd &rtcpdInitialSockFd, char (&unit)[CA_MAXUNMLEN+1],
tapegateway::Volume &volume)
bool run(
const Cuuid_t &cuuid,
castor::io::AbstractTCPSocket &vdqmSock,
const int rtcpdCallbackSockFd,
const char *rtcpdCallbackHost,
const unsigned short rtcpdCallbackPort,
SmartFd &rtcpdInitialSockFd,
const RcpJobRqstMsgBody &jobRequest,
tapegateway::Volume &volume)
throw(castor::exception::Exception);
/**
......@@ -92,15 +96,6 @@ private:
*/
fsm::StateMachine m_fsm;
/**
* Throws an exception if the peer host associated with the specified
* socket is not an authorised RCP job submitter.
*
* @param socketFd The socket file descriptor.
*/
void checkRcpJobSubmitterIsAuthorised(const int socketFd)
throw(castor::exception::Exception);
const char *getReqFromRtcpd();
const char *getVolFromTGate();
......
......@@ -38,13 +38,13 @@ void castor::tape::aggregator::LogHelper::logMsgBody(const Cuuid_t &cuuid,
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId" , volReqId ),
castor::dlf::Param("socketFd" , socketFd ),
castor::dlf::Param("tapeRequestId" , body.tapeRequestId ),
castor::dlf::Param("volReqId" , body.volReqId ),
castor::dlf::Param("clientPort" , body.clientPort ),
castor::dlf::Param("clientEuid" , body.clientEuid ),
castor::dlf::Param("clientEgid" , body.clientEgid ),
castor::dlf::Param("clientHost" , body.clientHost ),
castor::dlf::Param("deviceGroupName", body.deviceGroupName),
castor::dlf::Param("driveName" , body.driveName ),
castor::dlf::Param("driveUnit" , body.driveUnit ),
castor::dlf::Param("clientUserName" , body.clientUserName )};
castor::dlf::dlf_writep(cuuid, severity, message_no, params);
}
......
......@@ -38,14 +38,14 @@ namespace aggregator {
* An RCP job submission request message.
*/
struct RcpJobRqstMsgBody {
uint32_t tapeRequestId;
uint32_t volReqId;
uint32_t clientPort;
uint32_t clientEuid;
uint32_t clientEgid;
char clientHost[CA_MAXHOSTNAMELEN+1];
char deviceGroupName[CA_MAXDGNLEN+1];
char driveName[CA_MAXUNMLEN+1];
char clientUserName[CA_MAXUSRNAMELEN+1];
char clientHost[CA_MAXHOSTNAMELEN+1];
char deviceGroupName[CA_MAXDGNLEN+1];
char driveUnit[CA_MAXUNMLEN+1];
char clientUserName[CA_MAXUSRNAMELEN+1];
};
} // namespace aggregator
......
......@@ -52,14 +52,14 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
const unsigned int port,
const int netReadWriteTimeout,
const char *remoteCopyType,
const u_signed64 tapeRequestId,
const u_signed64 volReqId,
const std::string &clientUserName,
const std::string &clientHost,
const int clientPort,
const int clientEuid,
const int clientEgid,
const std::string &deviceGroupName,
const std::string &driveName,
const std::string &driveUnit,
RcpJobReplyMsgBody &reply)
throw(castor::exception::Exception) {
......@@ -80,11 +80,11 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
": Maximum: " << (sizeof(request.deviceGroupName) - 1) <<
": Actual: " << deviceGroupName.length());
}
if(driveName.length() > sizeof(request.driveName) - 1) {
if(driveUnit.length() > sizeof(request.driveUnit) - 1) {
TAPE_THROW_CODE(EINVAL,
": Length of driveName string is too large"
": Maximum: " << (sizeof(request.driveName) - 1) <<
": Actual: " << driveName.length());
": Length of driveUnit string is too large"
": Maximum: " << (sizeof(request.driveUnit) - 1) <<
": Actual: " << driveUnit.length());
}
if(clientUserName.length() > sizeof(request.clientUserName) - 1) {
TAPE_THROW_CODE(EINVAL,
......@@ -95,13 +95,13 @@ void castor::tape::aggregator::RcpJobSubmitter::submit(
// Prepare the job submission request information ready for marshalling
// The validity of the string length were check above
request.tapeRequestId = tapeRequestId;
request.volReqId = volReqId;
request.clientPort = clientPort;
request.clientEuid = clientEuid;
request.clientEgid = clientEgid;
strcpy(request.clientHost , clientHost.c_str());
strcpy(request.deviceGroupName, deviceGroupName.c_str());
strcpy(request.driveName , driveName.c_str());
strcpy(request.driveUnit , driveUnit.c_str());
strcpy(request.clientUserName , clientUserName.c_str());
// Marshall the job submission request message
......
......@@ -66,7 +66,7 @@ namespace aggregator {
* @param clientEuid The client user ID.
* @param clientEgid The client group ID.
* @param deviceGroupName The device group name.
* @param driveName The tape drive name.
* @param driveUnit The tape drive name.
* @param reply The reply from RTCPD which may be positive or negative.
*/
static void submit(
......@@ -81,7 +81,7 @@ namespace aggregator {
const int clientEuid,
const int clientEgid,
const std::string &deviceGroupName,
const std::string &driveName,
const std::string &driveUnit,
RcpJobReplyMsgBody &reply)
throw(castor::exception::Exception);
......
......@@ -299,7 +299,7 @@ size_t castor::tape::aggregator::RtcpMarshaller::marshall(char *const dst,
4*sizeof(uint32_t) +
strlen(src.clientHost) +
strlen(src.deviceGroupName) +
strlen(src.driveName) +
strlen(src.driveUnit) +
strlen(src.clientUserName) +
4; // 4 = the number of string termination characters
......@@ -320,13 +320,13 @@ size_t castor::tape::aggregator::RtcpMarshaller::marshall(char *const dst,
marshallUint32(RTCOPY_MAGIC_OLD0 , p); // Magic number
marshallUint32(VDQM_CLIENTINFO , p); // Request type
marshallUint32(len , p); // Length of message body
marshallUint32(src.tapeRequestId , p);
marshallUint32(src.volReqId , p);
marshallUint32(src.clientPort , p);
marshallUint32(src.clientEuid , p);
marshallUint32(src.clientEgid , p);
marshallString(src.clientHost , p);
marshallString(src.deviceGroupName, p);
marshallString(src.driveName , p);
marshallString(src.driveUnit , p);
marshallString(src.clientUserName , p);
// Calculate the number of bytes actually marshalled
......@@ -352,13 +352,13 @@ size_t castor::tape::aggregator::RtcpMarshaller::marshall(char *const dst,
void castor::tape::aggregator::RtcpMarshaller::unmarshall(const char * &src,
size_t &srcLen, RcpJobRqstMsgBody &dst) throw(castor::exception::Exception) {
unmarshallUint32(src, srcLen, dst.tapeRequestId);
unmarshallUint32(src, srcLen, dst.volReqId);
unmarshallUint32(src, srcLen, dst.clientPort);
unmarshallUint32(src, srcLen, dst.clientEuid);
unmarshallUint32(src, srcLen, dst.clientEgid);
unmarshallString(src, srcLen, dst.clientHost);
unmarshallString(src, srcLen, dst.deviceGroupName);
unmarshallString(src, srcLen, dst.driveName);
unmarshallString(src, srcLen, dst.driveUnit);
unmarshallString(src, srcLen, dst.clientUserName);
}
......
......@@ -658,7 +658,7 @@ void castor::tape::aggregator::RtcpTxRx::receiveRcpJobRqst(const Cuuid_t &cuuid,
}
LogHelper::logMsgBody(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RECEIVED_RCP_JOB_RQST, request.tapeRequestId, socketFd, request);
AGGREGATOR_RECEIVED_RCP_JOB_RQST, request.volReqId, socketFd, request);
}
//-----------------------------------------------------------------------------
......
......@@ -145,31 +145,31 @@ void castor::tape::aggregator::VdqmRequestHandler::run(void *param)
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_CREATED_RTCPD_CALLBACK_PORT, params);
uint32_t volReqId = 0;
char gatewayHost[CA_MAXHOSTNAMELEN+1];
utils::setBytes(gatewayHost, '\0');
unsigned short gatewayPort = 0;
SmartFd rtcpdInitialSockFd;
char unit[CA_MAXUNMLEN+1];
utils::setBytes(unit, '\0');
tapegateway::Volume volume;
// Receive the RTCOPY job request from the VDQM
RcpJobRqstMsgBody jobRequest;
utils::setBytes(jobRequest, '\0');
checkRcpJobSubmitterIsAuthorised(vdqmSock->socket());
RtcpTxRx::receiveRcpJobRqst(cuuid, vdqmSock->socket(), RTCPDNETRWTIMEOUT,
jobRequest);
DriveAllocationProtocolEngine driveAllocationProtocolEngine;
const bool thereIsAVolume = driveAllocationProtocolEngine.run(cuuid,
*(vdqmSock.get()), rtcpdCallbackSockFd.get(), rtcpdCallbackHost,
rtcpdCallbackPort, volReqId, gatewayHost, gatewayPort,
rtcpdInitialSockFd, unit, volume);
rtcpdCallbackPort, rtcpdInitialSockFd, jobRequest, volume);
// If there is no volume to mount, then notify tape gatway of end of session
// and return
// If there is no volume to mount, then notify tape gatway of end of
// session and return
if(!thereIsAVolume) {
try {
GatewayTxRx::notifyGatewayEndOfSession(cuuid, volReqId, gatewayHost,
gatewayPort);
GatewayTxRx::notifyGatewayEndOfSession(cuuid, jobRequest.volReqId,
jobRequest.clientHost, jobRequest.clientPort);
} catch(castor::exception::Exception &ex) {
// Don't rethrow, just log the exception
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", volReqId ),
castor::dlf::Param("volReqId", jobRequest.volReqId ),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
......@@ -201,9 +201,9 @@ void castor::tape::aggregator::VdqmRequestHandler::run(void *param)
// Note the call to run() sets the vsn to ""
char vsn[CA_MAXVSNLEN+1];
utils::setBytes(vsn, '\0');
BridgeProtocolEngine bridgeProtocolEngine(cuuid, volReqId, gatewayHost,
gatewayPort, rtcpdCallbackSockFd.get(), rtcpdInitialSockFd.get(), unit,
volume, vsn, m_stoppingGracefullyFunctor);
BridgeProtocolEngine bridgeProtocolEngine(cuuid, jobRequest.volReqId,
jobRequest.clientHost, jobRequest.clientPort, rtcpdCallbackSockFd.get(), rtcpdInitialSockFd.get(), jobRequest.driveUnit, volume, vsn,
m_stoppingGracefullyFunctor);
bridgeProtocolEngine.run();
}
} catch(castor::exception::Exception &ex) {
......@@ -237,3 +237,34 @@ bool castor::tape::aggregator::VdqmRequestHandler::stoppingGracefully()
throw() {
return m_stoppingGracefully;
}
//-----------------------------------------------------------------------------
// checkRcpJobSubmitterIsAuthorised
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandler::
checkRcpJobSubmitterIsAuthorised(const int socketFd)
throw(castor::exception::Exception) {
char peerHost[CA_MAXHOSTNAMELEN+1];
// isadminhost fills in peerHost
const int rc = isadminhost(socketFd, peerHost);
if(rc == -1 && serrno != SENOTADMIN) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to lookup connection"
<< ": Peer Host: " << peerHost);
}
if(*peerHost == '\0' ) {
TAPE_THROW_CODE(EINVAL,
": Peer host name is an empty string");
}
if(rc != 0) {
TAPE_THROW_CODE(SENOTADMIN,
": Unauthorized host"
<< ": Peer Host: " << peerHost);
}
}
......@@ -122,6 +122,14 @@ private:
*/
StoppingGracefullyFunctor m_stoppingGracefullyFunctor;
/**
* Throws an exception if the peer host associated with the specified
* socket is not an authorised RCP job submitter.
*
* @param socketFd The socket file descriptor.
*/
void checkRcpJobSubmitterIsAuthorised(const int socketFd)
throw(castor::exception::Exception);
}; // class VdqmRequestHandler
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment