Skip to content
Snippets Groups Projects
Commit cd94751e authored by Steven Murray's avatar Steven Murray
Browse files

Fixed file descriptor leak in net::createListenSocket logic

parent a86370c7
Branches
Tags
No related merge requests found
......@@ -109,65 +109,10 @@ void castor::tape::aggregator::VdqmRequestHandler::run(void *param)
"VDQM request handler socket is NULL");
}
// Wrap the VDQM connection socket within an auto pointer. When the auto
// pointer goes out of scope it will delete the socket. The destructor of
// the socket will in turn close the connection.
std::auto_ptr<castor::io::AbstractTCPSocket>
vdqmSock((castor::io::AbstractTCPSocket*)param);
// Log the new connection
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
char hostName[net::HOSTNAMEBUFLEN];
net::getPeerIpPort(vdqmSock->socket(), ip, port);
net::getPeerHostName(vdqmSock->socket(), hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port ),
castor::dlf::Param("HostName", hostName ),
castor::dlf::Param("socketFd", vdqmSock->socket() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RECEIVED_VDQM_CONNECTION, params);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ex2(ex.code());
ex2.getMessage() <<
"Failed to log new connection"
": " << ex.getMessage().str();
throw(ex2);
}
// Check the VDQM (an RTCP job submitter) is authorised
checkRtcpJobSubmitterIsAuthorised(vdqmSock->socket());
// Receive the RTCOPY job request from the VDQM
legacymsg::RtcpJobRqstMsgBody jobRequest;
utils::setBytes(jobRequest, '\0');
RtcpTxRx::receiveRtcpJobRqst(cuuid, vdqmSock->socket(), RTCPDNETRWTIMEOUT,
getRtcpJobAndCloseConn(cuuid, (castor::io::ServerSocket*)param,
jobRequest);
// Send a positive acknowledge to the VDQM
{
legacymsg::RtcpJobReplyMsgBody rtcpdReply;
utils::setBytes(rtcpdReply, '\0');
rtcpdReply.status = VDQM_CLIENTINFO; // Strange status code
char vdqmReplyBuf[RTCPMSGBUFSIZE];
size_t vdqmReplyLen = 0;
vdqmReplyLen = legacymsg::marshal(vdqmReplyBuf, rtcpdReply);
net::writeBytes(vdqmSock->socket(), RTCPDNETRWTIMEOUT, vdqmReplyLen,
vdqmReplyBuf);
}
// Close the connection to the VDQM
// Please note that the destructor of AbstractTCPSocket will not close the
// socket a second time
vdqmSock->close();
// Check the drive unit name given by the VDQM exists in the TPCONFIG file
{
utils::TpconfigLines tpconfigLines;
......@@ -328,6 +273,69 @@ void castor::tape::aggregator::VdqmRequestHandler::run(void *param)
}
//-----------------------------------------------------------------------------
// getRtcpJobFromVdqmAndCloseConn
//-----------------------------------------------------------------------------
void castor::tape::aggregator::VdqmRequestHandler::getRtcpJobAndCloseConn(
const Cuuid_t &cuuid,
io::ServerSocket* sock,
legacymsg::RtcpJobRqstMsgBody &jobRequest)
throw(castor::exception::Exception) {
// Wrap the VDQM connection socket within an auto pointer. When the auto
// pointer goes out of scope it will delete the socket. The destructor of
// the socket will in turn close the connection.
std::auto_ptr<castor::io::ServerSocket> vdqmSock(sock);
// Log the new connection
try {
unsigned short port = 0; // Client port
unsigned long ip = 0; // Client IP
char hostName[net::HOSTNAMEBUFLEN];
net::getPeerIpPort(vdqmSock->socket(), ip, port);
net::getPeerHostName(vdqmSock->socket(), hostName);
castor::dlf::Param params[] = {
castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)),
castor::dlf::Param("Port" , port ),
castor::dlf::Param("HostName", hostName ),
castor::dlf::Param("socketFd", vdqmSock->socket() )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_RECEIVED_VDQM_CONNECTION, params);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ex2(ex.code());
ex2.getMessage() <<
"Failed to log new connection"
": " << ex.getMessage().str();
throw(ex2);
}
// Check the VDQM (an RTCP job submitter) is authorised
checkRtcpJobSubmitterIsAuthorised(vdqmSock->socket());
// Receive the RTCOPY job request from the VDQM
utils::setBytes(jobRequest, '\0');
RtcpTxRx::receiveRtcpJobRqst(cuuid, vdqmSock->socket(), RTCPDNETRWTIMEOUT,
jobRequest);
// Send a positive acknowledge to the VDQM
{
legacymsg::RtcpJobReplyMsgBody rtcpdReply;
utils::setBytes(rtcpdReply, '\0');
rtcpdReply.status = VDQM_CLIENTINFO; // Strange status code
char vdqmReplyBuf[RTCPMSGBUFSIZE];
size_t vdqmReplyLen = 0;
vdqmReplyLen = legacymsg::marshal(vdqmReplyBuf, rtcpdReply);
net::writeBytes(vdqmSock->socket(), RTCPDNETRWTIMEOUT, vdqmReplyLen,
vdqmReplyBuf);
}
}
//-----------------------------------------------------------------------------
// exceptionThrowingRun
//-----------------------------------------------------------------------------
......
......@@ -111,6 +111,20 @@ private:
*/
static StoppingGracefullyFunctor s_stoppingGracefullyFunctor;
/**
* Gets the RTCOPY job from the specified VDQM connection socket and then
* deletes the socket object causing it to close the underlying connection.
*
* @param cuuid The ccuid to be used for logging.
* @param sock The VDQM connection socket.
* @param jobRequest Outpur parameter: The RTCOPY job request from the VDQM.
*/
void getRtcpJobAndCloseConn(
const Cuuid_t &cuuid,
io::ServerSocket* sock,
legacymsg::RtcpJobRqstMsgBody &jobRequest)
throw(castor::exception::Exception);
/**
* The entry point of this thread delegates its work to this method with a
* try and catch around the call so that we can throw exceptions.
......
......@@ -28,6 +28,7 @@
#include "castor/exception/InvalidArgument.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/net/net.hpp"
#include "castor/tape/utils/SmartFd.hpp"
#include "castor/tape/utils/utils.hpp"
#include "h/serrno.h"
#include "h/socket_timeout.h"
......@@ -40,48 +41,6 @@
#include <time.h>
//-----------------------------------------------------------------------------
// createListenerSock
//-----------------------------------------------------------------------------
int castor::tape::net::createListenerSock(const char *addr,
const unsigned short port) throw(castor::exception::Exception) {
int socketFd = 0;
struct sockaddr_in address;
if ((socketFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
const int savedErrno = errno;
TAPE_THROW_EX(castor::exception::Internal,
": Failed to create listener socket"
": " << sstrerror(savedErrno));
}
utils::setBytes(address, '\0');
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr(addr);
address.sin_port = htons(port);
if(bind(socketFd, (struct sockaddr *) &address, sizeof(address)) < 0) {
const int savedErrno = errno;
TAPE_THROW_CODE(savedErrno,
": Failed to bind listener socket"
": " << sstrerror(savedErrno));
}
if(listen(socketFd, LISTENBACKLOG) < 0) {
const int savedErrno = errno;
TAPE_THROW_EX(castor::exception::Internal,
": Failed to mark socket as being a listener"
": " << sstrerror(savedErrno));
}
return socketFd;
}
//-----------------------------------------------------------------------------
// createListenerSock
//-----------------------------------------------------------------------------
......@@ -109,24 +68,59 @@ int castor::tape::net::createListenerSock(
": lowPort=" << lowPort << " highPort=" << highPort);
}
// Create a socket
utils::SmartFd sock(socket(PF_INET, SOCK_STREAM, IPPROTO_TCP));
if(sock.get() < 0) {
const int savedErrno = errno;
TAPE_THROW_EX(castor::exception::Internal,
": Failed to create socket"
": " << sstrerror(savedErrno));
}
// Address structure to be used to bind the socket
struct sockaddr_in address;
// For each port in the range
for(unsigned short p=lowPort; p<=highPort; ++p) {
try{
const int rc = createListenerSock(addr, p);
chosenPort = p;
return rc;
} catch(castor::exception::Exception &ex) {
// Rethow all exceptions except for address in use
if(ex.code() != EADDRINUSE) {
throw(ex);
for(unsigned short port=lowPort; port<=highPort; ++port) {
// Try to bind the socket to the port
utils::setBytes(address, '\0');
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr(addr);
address.sin_port = htons(port);
const int rc = bind(sock.get(), (struct sockaddr *) &address,
sizeof(address));
const int savedErrno = errno;
// If the bind was successful, then release and return the socket descriptor
if(rc == 0) {
return(sock.release());
// Else the bind failed
} else {
// If the bind failed because the address was in use, then continue
if(savedErrno == EADDRINUSE) {
continue;
// Else throw an exception
} else {
TAPE_THROW_CODE(savedErrno,
": Failed to bind listener socket"
": " << sstrerror(savedErrno));
}
}
}
// Failed to bind to a port within the specified range
castor::exception::NoPortInRange ex(lowPort, highPort);
// If this line is reached then all ports in the specified range are in use
ex.getMessage() << "Failed to bind a port within the specified range"
// Throw an exception
castor::exception::NoPortInRange ex(lowPort, highPort);
ex.getMessage() << "All ports within the specified range are in use"
": lowPort=" << lowPort << " highPort=" << highPort;
throw(ex);
......
......@@ -42,17 +42,6 @@ namespace castor {
namespace tape {
namespace net {
/**
* Creates a listener socket including, creation, binding and marking as a
* listener socket.
*
* @param addr The IP address in dotted quad notation to be used by
* inet_addr().
* @param port The port number to listen on or 0 if one should be allocated.
* @return The socket file descriptor.
*/
int createListenerSock(const char *addr, const unsigned short port)
throw(castor::exception::Exception);
/**
* Creates a listener socket with a port number within the specified range.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment