diff --git a/castor/tape/net/Imakefile b/castor/tape/net/Imakefile index e0bdc49bc65d98957439634398501607583a6fe8..7926980eaa9f2a25479f94a7e79b90eba2000477 100644 --- a/castor/tape/net/Imakefile +++ b/castor/tape/net/Imakefile @@ -24,6 +24,7 @@ COMM AddLdFlags(-lstdc++) TAPENETLIB_SRCS = \ + IpAndPort.cpp \ net.cpp TAPENETLIB_OBJS = $(TAPENETLIB_SRCS:.cpp=.o) diff --git a/castor/tape/net/IpAndPort.cpp b/castor/tape/net/IpAndPort.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a26354a3af34bfe723e80ef1932dcde3b0020858 --- /dev/null +++ b/castor/tape/net/IpAndPort.cpp @@ -0,0 +1,70 @@ +/****************************************************************************** + * castor/tape/net/IpAndPort.cpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Steven.Murray@cern.ch + *****************************************************************************/ + +#include "castor/tape/net/IpAndPort.hpp" + + +//----------------------------------------------------------------------------- +// constructor +//----------------------------------------------------------------------------- +castor::tape::net::IpAndPort::IpAndPort( + const unsigned long ip, + const unsigned short port) throw(): + m_ip(ip), + m_port(port) { + // Do nothing +} + + +//----------------------------------------------------------------------------- +// setIp +//----------------------------------------------------------------------------- +void castor::tape::net::IpAndPort::setIp(const unsigned long ip) throw() { + m_ip = ip; +} + + +//----------------------------------------------------------------------------- +// setPort +//----------------------------------------------------------------------------- +void castor::tape::net::IpAndPort::setPort(const unsigned short port) throw() { + m_port = port; +} + + +//----------------------------------------------------------------------------- +// getIp +//----------------------------------------------------------------------------- +unsigned long castor::tape::net::IpAndPort::getIp() const throw() { + return m_ip; +} + + +//----------------------------------------------------------------------------- +// getPort +//----------------------------------------------------------------------------- +unsigned short castor::tape::net::IpAndPort::getPort() const throw() { + return m_port; +} diff --git a/castor/tape/net/IpAndPort.hpp b/castor/tape/net/IpAndPort.hpp new file mode 100644 index 0000000000000000000000000000000000000000..da10b832059c06cec71bbd34e8b0db3c71b2cd5b --- /dev/null +++ b/castor/tape/net/IpAndPort.hpp @@ -0,0 +1,89 @@ +/****************************************************************************** + * castor/tape/net/IpAndPort.hpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * + * @author Steven.Murray@cern.ch + *****************************************************************************/ + +#ifndef CASTOR_TAPE_NET_IPANDPORT_HPP +#define CASTOR_TAPE_NET_IPANDPORT_HPP 1 + +#include <stdint.h> +#include <stdlib.h> + + +namespace castor { +namespace tape { +namespace net { + +/** + * Class used to store the IP number and the port number of a TCP/IP address. + */ +class IpAndPort { +public: + + /** + * Constructor. + * + * @param ip The IP number of the TCP/IP address. + * @param port The port number of the TCP/IP address. + */ + IpAndPort(const unsigned long ip, const unsigned short port) throw(); + + /** + * Sets the IP number of the TCP/IP address. + */ + void setIp(const unsigned long ip) throw(); + + /** + * Sets the port number of the TCP/IP address. + */ + void setPort(const unsigned short port) throw(); + + /** + * Gets the IP number of the TCP/IP address. + */ + unsigned long getIp() const throw(); + + /** + * Gets the port number of the TCP/IP address. + */ + unsigned short getPort() const throw(); + +private: + + /** + * The IP number of the TCP/IP address. + */ + unsigned long m_ip; + + /** + * The port number of the TCP/IP address. + */ + unsigned short m_port; +}; // class IpAndPort + +} // namespace net +} // namespace tape +} // namespace castor + + +#endif // CASTOR_TAPE_NET_IPANDPORT_HPP diff --git a/castor/tape/net/net.cpp b/castor/tape/net/net.cpp index e7b02f3866224178d0e74a469817e683cbe460cc..5e9d81f74f4c2db61c0ad4042a30420d961586b7 100644 --- a/castor/tape/net/net.cpp +++ b/castor/tape/net/net.cpp @@ -36,6 +36,7 @@ #include <fcntl.h> #include <netdb.h> #include <sstream> +#include <string.h> #include <sys/select.h> #include <sys/socket.h> #include <sys/types.h> @@ -46,7 +47,7 @@ // createListenerSock //----------------------------------------------------------------------------- int castor::tape::net::createListenerSock( - const char *addr, + const char *const addr, const unsigned short lowPort, const unsigned short highPort, unsigned short &chosenPort) @@ -207,9 +208,11 @@ int castor::tape::net::acceptConnection(const int listenSocketFd) //----------------------------------------------------------------------------- // acceptConnection //----------------------------------------------------------------------------- -int castor::tape::net::acceptConnection(const int listenSocketFd, - const time_t timeout) throw(castor::exception::TimeOut, - castor::exception::TapeNetAcceptInterrupted, castor::exception::Exception) { +int castor::tape::net::acceptConnection( + const int listenSocketFd, + const time_t timeout) + throw(castor::exception::TimeOut, + castor::exception::TapeNetAcceptInterrupted, castor::exception::Exception) { // Throw an exception if listenSocketFd is invalid if(listenSocketFd < 0) { @@ -293,9 +296,8 @@ int castor::tape::net::acceptConnection(const int listenSocketFd, //----------------------------------------------------------------------------- // getSockIpPort //----------------------------------------------------------------------------- -void castor::tape::net::getSockIpPort(const int socketFd, - unsigned long& ip, unsigned short& port) - throw(castor::exception::Exception) { +castor::tape::net::IpAndPort castor::tape::net::getSockIpPort( + const int socketFd) throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -305,6 +307,7 @@ void castor::tape::net::getSockIpPort(const int socketFd, } struct sockaddr_in address; + memset(&address, '\0', sizeof(address)); socklen_t addressLen = sizeof(address); if(getsockname(socketFd, (struct sockaddr*)&address, &addressLen) < 0) { @@ -316,17 +319,15 @@ void castor::tape::net::getSockIpPort(const int socketFd, ": " << sstrerror(savedErrno)); } - ip = ntohl(address.sin_addr.s_addr); - port = ntohs(address.sin_port); + return IpAndPort(ntohl(address.sin_addr.s_addr), ntohs(address.sin_port)); } //----------------------------------------------------------------------------- // getPeerIpPort //----------------------------------------------------------------------------- -void castor::tape::net::getPeerIpPort(const int socketFd, - unsigned long& ip, unsigned short& port) - throw(castor::exception::Exception) { +castor::tape::net::IpAndPort castor::tape::net::getPeerIpPort( + const int socketFd) throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -336,6 +337,7 @@ void castor::tape::net::getPeerIpPort(const int socketFd, } struct sockaddr_in address; + memset(&address, '\0', sizeof(address)); socklen_t addressLen = sizeof(address); if(getpeername(socketFd, (struct sockaddr*)&address, &addressLen) < 0) { @@ -347,16 +349,18 @@ void castor::tape::net::getPeerIpPort(const int socketFd, ": " << sstrerror(savedErrno)); } - ip = ntohl(address.sin_addr.s_addr); - port = ntohs(address.sin_port); + return IpAndPort(ntohl(address.sin_addr.s_addr), ntohs(address.sin_port)); } //------------------------------------------------------------------------------ // getSockHostName //------------------------------------------------------------------------------ -void castor::tape::net::getSockHostName(const int socketFd, - char *buf, size_t len) throw(castor::exception::Exception) { +void castor::tape::net::getSockHostName( + const int socketFd, + char *const buf, + const size_t len) + throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -396,9 +400,13 @@ void castor::tape::net::getSockHostName(const int socketFd, //------------------------------------------------------------------------------ // getSockIpHostnamePort //------------------------------------------------------------------------------ -void castor::tape::net::getSockIpHostnamePort(const int socketFd, - unsigned long& ip, char *hostName, size_t hostNameLen, - unsigned short& port) throw(castor::exception::Exception) { +void castor::tape::net::getSockIpHostnamePort( + const int socketFd, + unsigned long &ip, + char *const hostName, + const size_t hostNameLen, + unsigned short &port) + throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -440,8 +448,11 @@ void castor::tape::net::getSockIpHostnamePort(const int socketFd, //------------------------------------------------------------------------------ // getPeerHostName //------------------------------------------------------------------------------ -void castor::tape::net::getPeerHostName(const int socketFd, - char *buf, size_t len) throw(castor::exception::Exception) { +void castor::tape::net::getPeerHostName( + const int socketFd, + char *const buf, + const size_t len) + throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -483,8 +494,10 @@ void castor::tape::net::getPeerHostName(const int socketFd, //------------------------------------------------------------------------------ // writeIp //------------------------------------------------------------------------------ -void castor::tape::net::writeIp(std::ostream &os, - const unsigned long ip) throw() { +void castor::tape::net::writeIp( + std::ostream &os, + const unsigned long ip) + throw() { os << ((ip >> 24) & 0x000000FF) << "." << ((ip >> 16) & 0x000000FF) << "." << ((ip >> 8) & 0x000000FF) << "." @@ -495,8 +508,10 @@ void castor::tape::net::writeIp(std::ostream &os, //------------------------------------------------------------------------------ // writeSockDescription //------------------------------------------------------------------------------ -void castor::tape::net::writeSockDescription(std::ostream &os, - const int socketFd) throw() { +void castor::tape::net::writeSockDescription( + std::ostream &os, + const int socketFd) + throw() { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -505,31 +520,28 @@ void castor::tape::net::writeSockDescription(std::ostream &os, ": socketFd=" << socketFd); } - unsigned long localIp = 0; - unsigned short localPort = 0; - unsigned long peerIp = 0; - unsigned short peerPort = 0; - + IpAndPort localIpAndPort(0, 0); try { - getSockIpPort(socketFd, localIp, localPort); + localIpAndPort = getSockIpPort(socketFd); } catch(castor::exception::Exception &e) { - localIp = 0; - localPort = 0; + localIpAndPort.setIp(0); + localIpAndPort.setPort(0); } + IpAndPort peerIpAndPort(0, 0); try { - getPeerIpPort(socketFd, peerIp, peerPort); + peerIpAndPort = getPeerIpPort(socketFd); } catch(castor::exception::Exception &e) { - peerIp = 0; - peerPort = 0; + peerIpAndPort.setIp(0); + peerIpAndPort.setPort(0); } os << "{local="; - writeIp(os, localIp); - os << ":" << localPort; + writeIp(os, localIpAndPort.getIp()); + os << ":" << localIpAndPort.getPort(); os << ",peer="; - writeIp(os, peerIp); - os << ":" << peerPort; + writeIp(os, peerIpAndPort.getIp()); + os << ":" << peerIpAndPort.getPort(); os << "}"; } @@ -537,8 +549,12 @@ void castor::tape::net::writeSockDescription(std::ostream &os, //------------------------------------------------------------------------------ // readBytes //------------------------------------------------------------------------------ -void castor::tape::net::readBytes(const int socketFd, const int timeout, - const int nbBytes, char *buf) throw(castor::exception::Exception) { +void castor::tape::net::readBytes( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) + throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { @@ -547,9 +563,8 @@ void castor::tape::net::readBytes(const int socketFd, const int timeout, ": socketFd=" << socketFd); } - bool connClosed = false; - - readBytesFromCloseable(connClosed, socketFd, timeout, nbBytes, buf); + const bool connClosed = readBytesFromCloseable(socketFd, timeout, nbBytes, + buf); if(connClosed) { std::stringstream oss; @@ -565,8 +580,11 @@ void castor::tape::net::readBytes(const int socketFd, const int timeout, //------------------------------------------------------------------------------ // readBytesFromCloseable //------------------------------------------------------------------------------ -void castor::tape::net::readBytesFromCloseable(bool &connClosed, - const int socketFd, const int timeout, const int nbBytes, char *buf) +bool castor::tape::net::readBytesFromCloseable( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid @@ -576,7 +594,7 @@ void castor::tape::net::readBytesFromCloseable(bool &connClosed, ": socketFd=" << socketFd); } - connClosed = false; + bool connClosed = false; const int rc = netread_timeout(socketFd, buf, nbBytes, timeout); int savedSerrno = serrno; @@ -620,14 +638,20 @@ void castor::tape::net::readBytesFromCloseable(bool &connClosed, TAPE_THROW_CODE(SECOMERR, oss.str()); } } + + return connClosed; } //------------------------------------------------------------------------------ // writeBytes //------------------------------------------------------------------------------ -void castor::tape::net::writeBytes(const int socketFd, const int timeout, - const int nbBytes, char *const buf) throw(castor::exception::Exception) { +void castor::tape::net::writeBytes( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) + throw(castor::exception::Exception) { // Throw an exception if socketFd is invalid if(socketFd < 0) { diff --git a/castor/tape/net/net.hpp b/castor/tape/net/net.hpp index 38630b97c93e2fea2cda1e15367fe253571cc462..3589c7e79243ae794b07c6a37913fec2dacec9ec 100644 --- a/castor/tape/net/net.hpp +++ b/castor/tape/net/net.hpp @@ -32,6 +32,7 @@ #include "castor/exception/TapeNetAcceptInterrupted.hpp" #include "castor/exception/TimeOut.hpp" #include "castor/tape/net/Constants.hpp" +#include "castor/tape/net/IpAndPort.hpp" #include <errno.h> #include <string.h> @@ -62,24 +63,29 @@ namespace net { * parameter must be an unsigned integer greater than 0. * @param chosenPort Out parameter: The actual port that this method binds the * socket to. - * @return The socket file descriptor. + * @return The socket descriptor. */ -int createListenerSock(const char *addr, const unsigned short lowPort, - const unsigned short highPort, unsigned short &chosenPort) +int createListenerSock( + const char *const addr, + const unsigned short lowPort, + const unsigned short highPort, + unsigned short &chosenPort) throw(castor::exception::Exception); /** * Accepts a connection on the specified listener socket and returns the - * socket file descriptor of the newly created connected socket. + * socket descriptor of the newly created and connected socket. * * @param listenSockFd The file descriptor of the listener socket. + * @return The socket descriptor of the newly created and connected + * socket. */ int acceptConnection(const int listenSockFd) throw(castor::exception::Exception); /** * Accepts a connection on the specified listener socket and returns the - * socket file descriptor of the newly created connected socket. + * socket descriptor of the newly created and connected socket. * * This method accepts a timeout parameter. If the timeout is exceeded, then * this method raises a castor::exception::TimeOut exception. If this method @@ -96,30 +102,32 @@ int acceptConnection(const int listenSockFd) * @param listenSockFd The file descriptor of the listener socket. * @param timeout The timeout in seconds to be used when waiting for a * connection. + * @return The socket descriptor of the newly created and connected + * socket. */ -int acceptConnection(const int listenSockFd, - const time_t timeout) throw(castor::exception::TimeOut, - castor::exception::TapeNetAcceptInterrupted, castor::exception::Exception); +int acceptConnection( + const int listenSockFd, + const time_t timeout) + throw(castor::exception::TimeOut, + castor::exception::TapeNetAcceptInterrupted, castor::exception::Exception); /** * Gets the locally-bound IP and port number of the specified socket. * * @param socketFd The socket file descriptor. - * @param ip The IP to be filled. - * @param port The port to be filled. + * @return The IP and port number of the specified socket. */ -void getSockIpPort(const int socketFd, unsigned long& ip, - unsigned short& port) throw(castor::exception::Exception); +IpAndPort getSockIpPort(const int socketFd) + throw(castor::exception::Exception); /** * Gets the peer IP and port number of the specified socket. * * @param socketFd The socket file descriptor. - * @param ip The IP to be filled. - * @param port The port to be filled. + * @return The IP and port number of the specified socket. */ -void getPeerIpPort(const int socketFd, unsigned long& ip, - unsigned short& port) throw(castor::exception::Exception); +IpAndPort getPeerIpPort(const int socketFd) + throw(castor::exception::Exception); /** * Gets the locally-bound host name of the specified socket. @@ -129,7 +137,10 @@ void getPeerIpPort(const int socketFd, unsigned long& ip, * @param len The length of the buffer into which the host name should be * written to. */ -void getSockHostName(const int socketFd, char *buf, size_t len) +void getSockHostName( + const int socketFd, + char *const buf, + const size_t len) throw(castor::exception::Exception); /** @@ -138,12 +149,13 @@ void getSockHostName(const int socketFd, char *buf, size_t len) * @param socketFd The socket file descriptor. * @param buf The buffer into which the hostname should written to. */ -template<int n> static void getSockHostName(const int socketFd, - char (&buf)[n]) throw(castor::exception::Exception) { +template<int n> static void getSockHostName( + const int socketFd, + char (&buf)[n]) + throw(castor::exception::Exception) { getSockHostName(socketFd, buf, n); } - /** * Gets the locally-bound IP, host name and port of the specified socket. * @@ -154,9 +166,12 @@ template<int n> static void getSockHostName(const int socketFd, * should be written to. * @param port The port to be filled. */ -void getSockIpHostnamePort(const int socketFd, - unsigned long& ip, char *hostName, size_t hostNameLen, - unsigned short& port) throw(castor::exception::Exception); +void getSockIpHostnamePort( + const int socketFd, + unsigned long &ip, + char *const hostName, + const size_t hostNameLen, + unsigned short &port) throw(castor::exception::Exception); /** * Gets the locally-bound IP, host name and port of the specified socket. @@ -166,8 +181,11 @@ void getSockIpHostnamePort(const int socketFd, * @param hostName The buffer into which the hostname should written to. * @param port The port to be filled. */ -template<int n> static void getSockIpHostnamePort(const int socketFd, - unsigned long& ip, char (&hostName)[n], unsigned short& port) +template<int n> static void getSockIpHostnamePort( + const int socketFd, + unsigned long &ip, + char (&hostName)[n], + unsigned short &port) throw(castor::exception::Exception) { getSockIpHostnamePort(socketFd, ip, hostName, n, port); } @@ -180,7 +198,10 @@ template<int n> static void getSockIpHostnamePort(const int socketFd, * @param len The length of the buffer into which the host name should be * written to. */ -void getPeerHostName(const int socketFd, char *buf, size_t len) +void getPeerHostName( + const int socketFd, + char *const buf, + const size_t len) throw(castor::exception::Exception); /** @@ -189,8 +210,10 @@ void getPeerHostName(const int socketFd, char *buf, size_t len) * @param socketFd The socket file descriptor of the connection. * @param buf The buffer into which the hostname should written to. */ -template<int n> static void getPeerHostName(const int socketFd, - char (&buf)[n]) throw(castor::exception::Exception) { +template<int n> static void getPeerHostName( + const int socketFd, + char (&buf)[n]) + throw(castor::exception::Exception) { getPeerHostName(socketFd, buf, n); } @@ -201,7 +224,10 @@ template<int n> static void getPeerHostName(const int socketFd, * @param os The output stream. * @param ip The IP address in host byte order. */ -void writeIp(std::ostream &os, const unsigned long ip) throw(); +void writeIp( + std::ostream &os, + const unsigned long ip) + throw(); /** * Writes a textual description of the specified socket to the specified @@ -211,7 +237,9 @@ void writeIp(std::ostream &os, const unsigned long ip) throw(); * @param socketFd The file descriptor of the socket whose textual * description is to be printed to the stream. */ -void writeSockDescription(std::ostream &os, const int socketFd) +void writeSockDescription( + std::ostream &os, + const int socketFd) throw(); /** @@ -231,22 +259,28 @@ void writeSockDescription(std::ostream &os, const int socketFd) * @param nbBytes The number of bytes to be read. * @param buf The buffer into which the bytes will be written. */ -void readBytes(const int socketFd, const int timeout, const int nbBytes, - char *buf) throw(castor::exception::Exception); +void readBytes( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) + throw(castor::exception::Exception); /** * Reads the specified number of bytes from the specified closable socket * and writes the result into the specified buffer. * - * @param connClosed Output parameter: True if the connection was closed - * by the peer. - * @param socketFd The file descriptor of the socket to be read from. - * @param timeout The timeout in seconds. - * @param nbBytes The number of bytes to be read. - * @param buf The buffer into which the bytes will be written. + * @param socketFd The file descriptor of the socket to be read from. + * @param timeout The timeout in seconds. + * @param nbBytes The number of bytes to be read. + * @param buf The buffer into which the bytes will be written. + * @return True if the connection was closed by the peer, else false. */ -void readBytesFromCloseable(bool &connClosed, const int socketFd, - const int timeout, const int nbBytes, char *buf) +bool readBytesFromCloseable( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) throw(castor::exception::Exception); /** @@ -258,8 +292,12 @@ void readBytesFromCloseable(bool &connClosed, const int socketFd, * @param nbBytes The number of bytes to be written. * @param buf The buffer of bytes to be written to the socket. */ -void writeBytes(const int socketFd, const int timeout, const int nbBytes, - char *const buf) throw(castor::exception::Exception); +void writeBytes( + const int socketFd, + const int timeout, + const int nbBytes, + char *const buf) + throw(castor::exception::Exception); /** * Creates the specified socket and uses it to connects to the specified diff --git a/castor/tape/tapebridge/BridgeProtocolEngine.cpp b/castor/tape/tapebridge/BridgeProtocolEngine.cpp index 3a7e725cc410e8e1f99cbf655552a0473b8cb7eb..3f68e08a5a950c97a3b01760021a6a840416aa60 100644 --- a/castor/tape/tapebridge/BridgeProtocolEngine.cpp +++ b/castor/tape/tapebridge/BridgeProtocolEngine.cpp @@ -90,7 +90,8 @@ castor::tape::tapebridge::BridgeProtocolEngine::BridgeProtocolEngine( Counter<uint64_t> &tapebridgeTransactionCounter, const bool logPeerOfCallbackConnectionsFromRtcpd, const bool checkRtcpdIsConnectingFromLocalHost, - IClientProxy &clientProxy) + IClientProxy &clientProxy, + ILegacyTxRx &legacyTxRx) throw(castor::exception::Exception) : m_fileCloser(fileCloser), m_bulkRequestConfigParams(bulkRequestConfigParams), @@ -114,6 +115,7 @@ castor::tape::tapebridge::BridgeProtocolEngine::BridgeProtocolEngine( logPeerOfCallbackConnectionsFromRtcpd), m_checkRtcpdIsConnectingFromLocalHost(checkRtcpdIsConnectingFromLocalHost), m_clientProxy(clientProxy), + m_legacyTxRx(legacyTxRx), m_sessionErrors(cuuid, jobRequest, volume) { // Store the listen socket and initial rtcpd connection in the socket @@ -177,11 +179,9 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: const int connectedSock) throw(castor::exception::Exception) { try { - unsigned short port = 0; // Client port - unsigned long ip = 0; // Client IP - char hostName[net::HOSTNAMEBUFLEN]; + char hostName[net::HOSTNAMEBUFLEN]; - net::getPeerIpPort(connectedSock, ip, port); + const net::IpAndPort peerIpAndPort = net::getPeerIpPort(connectedSock); net::getPeerHostName(connectedSock, hostName); castor::dlf::Param params[] = { @@ -194,8 +194,9 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: castor::dlf::Param("clientPort" , m_jobRequest.clientPort ), castor::dlf::Param("clientType", utils::volumeClientTypeToString(m_volume.clientType())), - castor::dlf::Param("IP" , castor::dlf::IPAddress(ip) ), - castor::dlf::Param("Port" , port ), + castor::dlf::Param("IP" , + castor::dlf::IPAddress(peerIpAndPort.getIp())), + castor::dlf::Param("Port" , peerIpAndPort.getPort() ), castor::dlf::Param("HostName" , hostName ), castor::dlf::Param("socketFd" , connectedSock ), castor::dlf::Param("nbDiskTapeConns" , @@ -472,20 +473,17 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: void castor::tape::tapebridge::BridgeProtocolEngine::checkPeerIsLocalhost( const int socketFd) throw(castor::exception::Exception) { - unsigned long ip; - unsigned short port; - - net::getPeerIpPort(socketFd, ip, port); + const net::IpAndPort peerIpAndPort = net::getPeerIpPort(socketFd); // localhost = 127.0.0.1 = 0x7F000001 - if(ip != 0x7F000001) { + if(peerIpAndPort.getIp() != 0x7F000001) { castor::exception::PermissionDenied ex; std::ostream &os = ex.getMessage(); os << "Peer is not local host" ": expected=127.0.0.1" ": actual="; - net::writeIp(os, ip); + net::writeIp(os, peerIpAndPort.getIp()); throw ex; } @@ -511,7 +509,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: bool rtcpdClosedConnection = false; try { char dummyBuf[1]; - net::readBytesFromCloseable(rtcpdClosedConnection, pendingSock, + rtcpdClosedConnection = net::readBytesFromCloseable(pendingSock, RTCPDNETRWTIMEOUT, sizeof(dummyBuf), dummyBuf); } catch(castor::exception::Exception &ex) { TAPE_THROW_EX(castor::exception::Internal, @@ -551,9 +549,8 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: // Try to receive the message header which may not be possible; The file // descriptor may be ready because rtcpd has closed the connection { - bool peerClosed = false; - LegacyTxRx::receiveMsgHeaderFromCloseable(m_cuuid, peerClosed, - m_jobRequest.volReqId, pendingSock, RTCPDNETRWTIMEOUT, header); + const bool peerClosed = m_legacyTxRx.receiveMsgHeaderFromCloseable( + pendingSock, header); // If the peer closed its side of the connection, then close this side // of the connection and return in order to continue the RTCOPY session @@ -839,6 +836,16 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::startRtcpdSession() castor::dlf::dlf_writep(m_cuuid, DLF_LVL_ERROR, TAPEBRIDGE_FAILED_TO_START_RTCPD_SESSION, params); + // Gather the error information into an SessionError object + SessionError sessionError; + sessionError.setErrorCode(ex.code()); + sessionError.setErrorMessage(ex.getMessage().str()); + sessionError.setErrorScope(SessionError::SESSION_SCOPE); + + // Push the error onto the back of the list of errors generated during the + // sesion with the rtcpd daemon + m_sessionErrors.push_back(sessionError); + rtcpdSessionStarted = false; } @@ -1215,6 +1222,16 @@ void castor::tape::tapebridge::BridgeProtocolEngine::endRtcpdSession() throw() { castor::dlf::Param("errorMessage" , ex.getMessage().str() )}; castor::dlf::dlf_writep(m_cuuid, DLF_LVL_ERROR, TAPEBRIDGE_FAILED_TO_END_RTCPD_SESSION, params); + + // Gather the error information into an SessionError object + SessionError sessionError; + sessionError.setErrorCode(ex.code()); + sessionError.setErrorMessage(ex.getMessage().str()); + sessionError.setErrorScope(SessionError::SESSION_SCOPE); + + // Push the error onto the back of the list of errors generated during the + // sesion with the rtcpd daemon + m_sessionErrors.push_back(sessionError); } // Close the initial rtcpd-connection @@ -1436,8 +1453,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::processRtcpFileErrReqDump( ackMsg.magic = header.magic; ackMsg.reqType = header.reqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, rtcpdSock, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); } @@ -1454,8 +1470,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::processRtcpWaiting( ackMsg.magic = header.magic; ackMsg.reqType = header.reqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, rtcpdSock, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); // Determine the error code and message if there is an error embedded in the // message @@ -1916,12 +1931,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::sendFileToRecallToRtcpd( ackMsg.magic = rtcpdReqMagic; ackMsg.reqType = rtcpdReqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader( - m_cuuid, - m_jobRequest.volReqId, - rtcpdSock, - RTCPDNETRWTIMEOUT, - ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); { castor::dlf::Param params[] = { @@ -1954,8 +1964,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: ackMsg.magic = header.magic; ackMsg.reqType = header.reqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, rtcpdSock, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); // Determine the error code and message if there is an error embedded in the // message @@ -2054,8 +2063,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: ackMsg.magic = header.magic; ackMsg.reqType = header.reqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, rtcpdSock, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); // Drop the message from rtcpd and return if the session is being shutdown if(shuttingDownRtcpdSession()) { @@ -2354,8 +2362,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::rtcpTapeReqRtcpdCallback( ackMsg.magic = RTCOPY_MAGIC; ackMsg.reqType = RTCP_TAPE_REQ; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, socketFd, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(socketFd, ackMsg); } @@ -2377,8 +2384,7 @@ void ackMsg.magic = RTCOPY_MAGIC; ackMsg.reqType = RTCP_TAPE_REQ; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, socketFd, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(socketFd, ackMsg); if(body.err.errorCode != 0) { TAPE_THROW_CODE(body.err.errorCode, @@ -2401,8 +2407,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::rtcpEndOfReqRtcpdCallback( ackMsg.magic = RTCOPY_MAGIC; ackMsg.reqType = RTCP_ENDOF_REQ; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, socketFd, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(socketFd, ackMsg); m_nbReceivedENDOF_REQs++; @@ -2508,8 +2513,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: ackMsg.magic = RTCOPY_MAGIC; ackMsg.reqType = TAPEBRIDGE_FLUSHEDTOTAPE; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, socketFd, - RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.sendMsgHeader(socketFd, ackMsg); // Drop the message from rtcpd and return if the session is being shutdown if(shuttingDownRtcpdSession()) { @@ -2981,11 +2985,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::sendFileToMigrateToRtcpd( ackMsg.magic = rtcpdReqMagic; ackMsg.reqType = rtcpdReqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader( - m_cuuid, - m_jobRequest.volReqId, - rtcpdSock, - RTCPDNETRWTIMEOUT,ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); { castor::dlf::Param params[] = { @@ -3197,8 +3197,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine:: ackMsg.magic = rtcpdReqMagic; ackMsg.reqType = rtcpdReqType; ackMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, rtcpdSock, - RTCPDNETRWTIMEOUT,ackMsg); + m_legacyTxRx.sendMsgHeader(rtcpdSock, ackMsg); { castor::dlf::Param params[] = { @@ -3274,8 +3273,7 @@ void castor::tape::tapebridge::BridgeProtocolEngine::notifyRtcpdEndOfSession() endofReqMsg.magic = RTCOPY_MAGIC; endofReqMsg.reqType = RTCP_ENDOF_REQ; endofReqMsg.lenOrStatus = 0; - LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId, - m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT, + m_legacyTxRx.sendMsgHeader(m_sockCatalogue.getInitialRtcpdConn(), endofReqMsg); { @@ -3299,8 +3297,8 @@ void castor::tape::tapebridge::BridgeProtocolEngine::notifyRtcpdEndOfSession() // Receive the acknowledge of the RTCP_ENDOF_REQ message legacymsg::MessageHeader ackMsg; try { - LegacyTxRx::receiveMsgHeader(m_cuuid, m_jobRequest.volReqId, - m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT, ackMsg); + m_legacyTxRx.receiveMsgHeader(m_sockCatalogue.getInitialRtcpdConn(), + ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge of RTCP_ENDOF_REQ from rtcpd: " @@ -3551,13 +3549,14 @@ void castor::tape::tapebridge::BridgeProtocolEngine::notifyClientEndOfSession() if(!m_sessionErrors.empty()) { // Get the oldest and first error to have occurred - const SessionError sessionError = m_sessionErrors.front(); + const SessionError firstSessionError = m_sessionErrors.front(); // Notify the client the session ended due to the error const uint64_t tapebridgeTransId = m_tapebridgeTransactionCounter.next(); m_clientProxy.notifyEndOfFailedSession(tapebridgeTransId, - sessionError.getErrorCode(), sessionError.getErrorMessage()); + firstSessionError.getErrorCode(), + firstSessionError.getErrorMessage()); } else { // Notify the client the session has ended with success const uint64_t tapebridgeTransId = diff --git a/castor/tape/tapebridge/BridgeProtocolEngine.hpp b/castor/tape/tapebridge/BridgeProtocolEngine.hpp index 8f87d73cab14d14da48314be4da8bec77d4d96a5..622b3a2e637c9579d4012d46e9233ddd0c2f708b 100644 --- a/castor/tape/tapebridge/BridgeProtocolEngine.hpp +++ b/castor/tape/tapebridge/BridgeProtocolEngine.hpp @@ -36,6 +36,7 @@ #include "castor/tape/tapebridge/GetMoreWorkConnection.hpp" #include "castor/tape/tapebridge/IClientProxy.hpp" #include "castor/tape/tapebridge/IFileCloser.hpp" +#include "castor/tape/tapebridge/ILegacyTxRx.hpp" #include "castor/tape/tapebridge/PendingMigrationsStore.hpp" #include "castor/tape/tapebridge/SessionErrorList.hpp" #include "castor/tape/tapebridge/TapeFlushConfigParams.hpp" @@ -63,12 +64,11 @@ namespace tapebridge { * Acts as a bridge between the tapegatewayd and rtcpd daemons. * * The BridgeProtocolEngine behaves like a smart pointer for the initital - * rtcpd-connection, the rtcpd disk/tape IO control-connections and the client - * connections. This means the destructor of the BridgeSocketCatalogue will - * close them if they are still open. + * rtcpd-connection. This means the destructor of the BridgeSocketCatalogue + * will close the inititalrtcpd-connection if it is still open. * * The BridgeProtocolEngine will not close the listen socket used to accept - * rtcpd connections. This is the responsibility of the VdqmRequestHandler. + * rtcpd connections. */ class BridgeProtocolEngine { @@ -114,6 +114,9 @@ public: * connecting from the local host. * @param clientProxy Object representing the client of the * tapebridged daemon. + * @param legacyTxRx Object reponsible for sending and + * receiving the headers of messages + * belonging to the legacy RTCOPY protocol. */ BridgeProtocolEngine( IFileCloser &fileCloser, @@ -129,7 +132,8 @@ public: Counter<uint64_t> &tapebridgeTransactionCounter, const bool logPeerOfCallbackConnectionsFromRtcpd, const bool checkRtcpdIsConnectingFromLocalHost, - IClientProxy &clientProxy) + IClientProxy &clientProxy, + ILegacyTxRx &legacyTxRx) throw(castor::exception::Exception); /** @@ -142,7 +146,7 @@ private: /** * The object used to close file-descriptors. * - * The main goal of this object to facilitate in unit-testing the + * The main goal of this object is to facilitate in unit-testing the * BridgeProtocolEngine. */ IFileCloser &m_fileCloser; @@ -267,6 +271,12 @@ private: */ IClientProxy &m_clientProxy; + /** + * Object reponsible for sending and receiving the headers of messages + * belonging to the legacy RTCOPY protocol. + */ + ILegacyTxRx &m_legacyTxRx; + /** * The files to be migrated to tape that have been returned by the * tapegatewayd daemon but have not yet been consumed by the rtcpd daemon. diff --git a/castor/tape/tapebridge/ClientProxy.cpp b/castor/tape/tapebridge/ClientProxy.cpp index 2d3144cc35b5e8ca8e8ae2f75af1b0019ba26b28..7d207ac6a7b04b1e710df916fa1306540eae63e9 100644 --- a/castor/tape/tapebridge/ClientProxy.cpp +++ b/castor/tape/tapebridge/ClientProxy.cpp @@ -306,11 +306,19 @@ castor::tape::tapegateway::FilesToMigrateList const int clientSock) const throw(castor::exception::Exception) { - std::auto_ptr<castor::IObject> obj(receiveReplyAndClose(clientSock)); - const char *const task = "receive reply to FilesToMigrateListRequest and" " close connection"; + std::auto_ptr<castor::IObject> obj; + try { + obj.reset(receiveReplyAndClose(clientSock)); + } catch(castor::exception::Exception &ex) { + // Add context and rethrow + TAPE_THROW_CODE(ex.code(), + ": Failed to " << task << + ": " << ex.getMessage().str()); + } + switch(obj->type()) { case OBJ_FilesToMigrateList: { @@ -437,11 +445,19 @@ castor::tape::tapegateway::FilesToRecallList const int clientSock) const throw(castor::exception::Exception) { - std::auto_ptr<castor::IObject> obj(receiveReplyAndClose(clientSock)); - const char *const task = "receive reply to FilesToRecallListRequest and" " close connection"; + std::auto_ptr<castor::IObject> obj; + try { + obj.reset(receiveReplyAndClose(clientSock)); + } catch(castor::exception::Exception &ex) { + // Add context and rethrow + TAPE_THROW_CODE(ex.code(), + ": Failed to " << task << + ": " << ex.getMessage().str()); + } + switch(obj->type()) { case OBJ_FilesToRecallList: { @@ -687,6 +703,8 @@ int castor::tape::tapebridge::ClientProxy::connectAndSendMessage( castor::io::ClientSocket sock(m_clientAddress.connectToClient(m_netTimeout, connectDuration)); + sock.setTimeout(m_netTimeout); + sock.setConnTimeout(m_netTimeout); // Send the message try { @@ -765,6 +783,8 @@ castor::IObject castor::io::ClientSocket sock(m_clientAddress.connectToClient(m_netTimeout, connectDuration)); + sock.setTimeout(m_netTimeout); + sock.setConnTimeout(m_netTimeout); // Send the request timeval sendAndReceiveStartTime = {0, 0}; @@ -824,7 +844,18 @@ void castor::tape::tapebridge::ClientProxy::receiveNotificationReplyAndClose( const int clientSock) const throw(castor::exception::Exception) { - std::auto_ptr<castor::IObject> obj(receiveReplyAndClose(clientSock)); + const char *const task = "receive notification reply and close connection"; + + std::auto_ptr<castor::IObject> obj; + + try { + obj.reset(receiveReplyAndClose(clientSock)); + } catch(castor::exception::Exception &ex) { + // Add context and rethrow + TAPE_THROW_CODE(ex.code(), + ": Failed to " << task << + ": " << ex.getMessage().str()); + } switch(obj->type()) { case OBJ_NotificationAcknowledge: @@ -835,7 +866,7 @@ void castor::tape::tapebridge::ClientProxy::receiveNotificationReplyAndClose( if(reply == NULL) { TAPE_THROW_EX(castor::exception::Internal, - ": Failed to receive notification reply and close connection" + ": Failed to " << task << ": Failed to down cast reply object to " "tapegateway::NotificationAcknowledge"); } @@ -846,7 +877,7 @@ void castor::tape::tapebridge::ClientProxy::receiveNotificationReplyAndClose( reply->aggregatorTransactionId()); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(ex.code(), - ": Failed to receive notification reply and close connection" + ": Failed to " << task << ": " << ex.getMessage().str()); } } @@ -859,7 +890,7 @@ void castor::tape::tapebridge::ClientProxy::receiveNotificationReplyAndClose( default: { TAPE_THROW_CODE(EBADMSG, - ": Failed to receive notification reply and close connection" + ": Failed to " << task << ": Unknown reply type " ": Reply type = " << obj->type()); } diff --git a/castor/tape/tapebridge/IClientProxy.hpp b/castor/tape/tapebridge/IClientProxy.hpp index 3631196e36552d328f09f9bcc12cfcda6a707c58..5a9a7db2681e1f4e706fcdb38e9cfe6d57597c1a 100644 --- a/castor/tape/tapebridge/IClientProxy.hpp +++ b/castor/tape/tapebridge/IClientProxy.hpp @@ -44,8 +44,8 @@ namespace tape { namespace tapebridge { /** - * The interface to implemented by an object acting as a proxy for the client - * of the tapebridged daemon. + * Abstract class specifiying the interface to be implemented by an object + * acting as a proxy for the client of the tapebridged daemon. * * A client of the tapebridged daemon may be dumptp, readtp, tapegatewayd, or * writetp. diff --git a/castor/tape/tapebridge/ILegacyTxRx.cpp b/castor/tape/tapebridge/ILegacyTxRx.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c14eb249be1c142fa369d3210d2df2f867a00d10 --- /dev/null +++ b/castor/tape/tapebridge/ILegacyTxRx.cpp @@ -0,0 +1,32 @@ +/****************************************************************************** + * castor/tape/tapebridge/ILegacyTxRx.cpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Steven.Murray@cern.ch + *****************************************************************************/ + +#include "castor/tape/tapebridge/ILegacyTxRx.hpp" + +//----------------------------------------------------------------------------- +// destructor +//----------------------------------------------------------------------------- +castor::tape::tapebridge::ILegacyTxRx::~ILegacyTxRx() throw() { + // Do nothing +} diff --git a/castor/tape/tapebridge/ILegacyTxRx.hpp b/castor/tape/tapebridge/ILegacyTxRx.hpp new file mode 100644 index 0000000000000000000000000000000000000000..06f5ceb78a44da0f3af2217b67602e9b069a8eb7 --- /dev/null +++ b/castor/tape/tapebridge/ILegacyTxRx.hpp @@ -0,0 +1,99 @@ +/****************************************************************************** + * castor/tape/tapebridge/ILegacyTxRx.hpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Steven.Murray@cern.ch + *****************************************************************************/ + +#ifndef CASTOR_TAPE_TAPEBRIDGE_ILEGACYTXRX_HPP +#define CASTOR_TAPE_TAPEBRIDGE_ILEGACYTXRX_HPP 1 + +#include "castor/exception/Exception.hpp" +#include "castor/tape/legacymsg/MessageHeader.hpp" + +namespace castor { +namespace tape { +namespace tapebridge { + +/** + * Abstract class specifying the interface to be implemented by an object + * repsonsible for sending and receiving the headers of messages belonging to + * the legacy RTCOPY protocol. + */ +class ILegacyTxRx { + +public: + + /** + * Virtual destructor. + */ + virtual ~ILegacyTxRx() throw(); + + /** + * Sends the specified message header to RTCPD using the specified socket. + * + * @param socketFd The socket file descriptor of the connection with RTCPD. + * @param header The message header to be sent. + */ + virtual void sendMsgHeader( + const int socketFd, + const legacymsg::MessageHeader &header) + throw(castor::exception::Exception) = 0; + + /** + * Receives a message header. + * + * This operation assumes that all of the bytes can be read in. Failure + * to read in all the bytes or a closed connection will result in an + * exception being thrown. + * + * If it is normal that the connection can be closed by the peer, for + * example you are using select, then please use + * receiveMessageHeaderFromCloseableConn(). + * + * @param socketFd The socket file descriptor of the connection with RTCPD. + * @param request The request which will be filled with the contents of the + * received message. + */ + virtual void receiveMsgHeader( + const int socketFd, + legacymsg::MessageHeader &header) + throw(castor::exception::Exception) = 0; + + /** + * Receives a message header or a connection close message. + * + * @param socketFd The socket file-descriptor of the connection with RTCPD. + * @param request The request which will be filled with the contents of the + * received message. + * @return True if the connection was closed by the peer, else false. + */ + virtual bool receiveMsgHeaderFromCloseable( + const int socketFd, + legacymsg::MessageHeader &header) + throw(castor::exception::Exception) = 0; + +}; // class LegacyTxRx + +} // namespace tapebridge +} // namespace tape +} // namespace castor + +#endif // CASTOR_TAPE_TAPEBRIDGE_ILEGACYTXRX_HPP diff --git a/castor/tape/tapebridge/Imakefile b/castor/tape/tapebridge/Imakefile index a641d85fd6e357768516c3f6b9d50780368cde4d..50885781da43e64a756b50319d97dc8f5954251b 100644 --- a/castor/tape/tapebridge/Imakefile +++ b/castor/tape/tapebridge/Imakefile @@ -49,6 +49,7 @@ TAPEBRIDGEBIN_SRCS = \ DlfMessageStrings.cpp \ IClientProxy.cpp \ IFileCloser.cpp \ + ILegacyTxRx.cpp \ LegacyTxRx.cpp \ LogHelper.cpp \ MigrationReportConnection.cpp \ diff --git a/castor/tape/tapebridge/LegacyTxRx.cpp b/castor/tape/tapebridge/LegacyTxRx.cpp index 33ee4b2e35acdc3daf2b18f9aa781fd80f5f2dc5..a87d8fe361fb7713d0f3a1f38fedc6f7031f8385 100644 --- a/castor/tape/tapebridge/LegacyTxRx.cpp +++ b/castor/tape/tapebridge/LegacyTxRx.cpp @@ -25,10 +25,11 @@ #include "castor/Constants.hpp" #include "castor/dlf/Dlf.hpp" #include "castor/exception/Internal.hpp" +#include "castor/tape/legacymsg/CommonMarshal.hpp" +#include "castor/tape/net/net.hpp" #include "castor/tape/tapebridge/DlfMessageConstants.hpp" #include "castor/tape/tapebridge/Constants.hpp" #include "castor/tape/tapebridge/LegacyTxRx.hpp" -#include "castor/tape/net/net.hpp" #include "castor/tape/utils/utils.hpp" #include "h/common.h" @@ -36,12 +37,38 @@ #include <string.h> +//----------------------------------------------------------------------------- +// constructor +//----------------------------------------------------------------------------- +castor::tape::tapebridge::LegacyTxRx::LegacyTxRx(const int netReadWriteTimeout) + throw(): m_netReadWriteTimeout(netReadWriteTimeout) { + // Do nothing +} + + +//----------------------------------------------------------------------------- +// destructor +//----------------------------------------------------------------------------- +castor::tape::tapebridge::LegacyTxRx::~LegacyTxRx() throw() { + // Do nothing +} + + +//----------------------------------------------------------------------------- +// getNetReadWriteTimeout +//----------------------------------------------------------------------------- +int castor::tape::tapebridge::LegacyTxRx::getNetReadWriteTimeout() const + throw() { + return m_netReadWriteTimeout; +} + + //----------------------------------------------------------------------------- // sendMsgHeader //----------------------------------------------------------------------------- void castor::tape::tapebridge::LegacyTxRx::sendMsgHeader( - const Cuuid_t &cuuid, const uint32_t volReqId, const int socketFd, - const int netReadWriteTimeout, const legacymsg::MessageHeader &header) + const int socketFd, + const legacymsg::MessageHeader &header) throw(castor::exception::Exception) { char buf[RTCPMSGBUFSIZE]; @@ -55,31 +82,13 @@ void castor::tape::tapebridge::LegacyTxRx::sendMsgHeader( << ex.getMessage().str()); } - { - castor::dlf::Param params[] = { - castor::dlf::Param("volReqId", volReqId), - castor::dlf::Param("socketFd", socketFd)}; - - castor::dlf::dlf_writep(cuuid, DLF_LVL_DEBUG, - TAPEBRIDGE_SEND_HEADER_TO_RTCPD, params); - } - try { - net::writeBytes(socketFd, netReadWriteTimeout, totalLen, buf); + net::writeBytes(socketFd, m_netReadWriteTimeout, totalLen, buf); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(SECOMERR, ": Failed to send message header to RTCPD" ": " << ex.getMessage().str()); } - - { - castor::dlf::Param params[] = { - castor::dlf::Param("volReqId", volReqId), - castor::dlf::Param("socketFd", socketFd)}; - - castor::dlf::dlf_writep(cuuid, DLF_LVL_DEBUG, - TAPEBRIDGE_SENT_HEADER_TO_RTCPD, params); - } } @@ -87,14 +96,15 @@ void castor::tape::tapebridge::LegacyTxRx::sendMsgHeader( // receiveMsgHeader //----------------------------------------------------------------------------- void castor::tape::tapebridge::LegacyTxRx::receiveMsgHeader( - const Cuuid_t&, const uint32_t, const int socketFd, - const int, legacymsg::MessageHeader &header) + const int socketFd, + legacymsg::MessageHeader &header) throw(castor::exception::Exception) { // Read in the message header char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len try { - net::readBytes(socketFd, RTCPDNETRWTIMEOUT, sizeof(headerBuf), headerBuf); + net::readBytes(socketFd, m_netReadWriteTimeout, sizeof(headerBuf), + headerBuf); } catch (castor::exception::Exception &ex) { TAPE_THROW_CODE(SECOMERR, ": Failed to read message header from RTCPD" @@ -117,15 +127,16 @@ void castor::tape::tapebridge::LegacyTxRx::receiveMsgHeader( //----------------------------------------------------------------------------- // receiveMsgHeaderFromCloseableConn //----------------------------------------------------------------------------- -void castor::tape::tapebridge::LegacyTxRx::receiveMsgHeaderFromCloseable( - const Cuuid_t&, bool &connClosed, const uint32_t, - const int socketFd, const int, +bool castor::tape::tapebridge::LegacyTxRx::receiveMsgHeaderFromCloseable( + const int socketFd, legacymsg::MessageHeader &header) throw(castor::exception::Exception) { + bool connClosed = false; + // Read in the message header char headerBuf[3 * sizeof(uint32_t)]; // magic + request type + len try { - net::readBytesFromCloseable(connClosed, socketFd, RTCPDNETRWTIMEOUT, + connClosed = net::readBytesFromCloseable(socketFd, m_netReadWriteTimeout, sizeof(headerBuf), headerBuf); } catch (castor::exception::Exception &ex) { TAPE_THROW_CODE(SECOMERR, @@ -143,4 +154,6 @@ void castor::tape::tapebridge::LegacyTxRx::receiveMsgHeaderFromCloseable( ": Failed to unmarshal message header from RTCPD" ": " << ex.getMessage().str()); } + + return connClosed; } diff --git a/castor/tape/tapebridge/LegacyTxRx.hpp b/castor/tape/tapebridge/LegacyTxRx.hpp index b0634c517f7c502a8483b6cea01d45301a14e8d1..12fac76130d91d061ee0dd1ff883900be7d9bd20 100644 --- a/castor/tape/tapebridge/LegacyTxRx.hpp +++ b/castor/tape/tapebridge/LegacyTxRx.hpp @@ -25,21 +25,7 @@ #ifndef CASTOR_TAPE_TAPEBRIDGE_LEGACYTXRX_HPP #define CASTOR_TAPE_TAPEBRIDGE_LEGACYTXRX_HPP 1 -#include "castor/exception/Exception.hpp" -#include "castor/io/ClientSocket.hpp" -#include "castor/tape/tapebridge/Constants.hpp" -#include "castor/tape/tapebridge/LogHelper.hpp" -#include "castor/tape/legacymsg/RtcpDumpTapeRqstMsgBody.hpp" -#include "castor/tape/legacymsg/CommonMarshal.hpp" -#include "castor/tape/legacymsg/RtcpMarshal.hpp" -#include "castor/tape/legacymsg/MessageHeader.hpp" -#include "castor/tape/net/net.hpp" -#include "castor/tape/utils/utils.hpp" -#include "h/Cuuid.h" - -#include <iostream> -#include <stdint.h> - +#include "castor/tape/tapebridge/ILegacyTxRx.hpp" namespace castor { namespace tape { @@ -49,23 +35,42 @@ namespace tapebridge { * Provides common functions for sending and receiving the messages of the * legacy protocols: RTCOPY and VMGR. */ -class LegacyTxRx { +class LegacyTxRx: public ILegacyTxRx { public: + /** + * Constructor. + * + * @param netReadWriteTimeout The timeout in seconds to be applied when + * performing network read and write operations. + */ + LegacyTxRx(const int netReadWriteTimeout) throw(); + + /** + * Destructor. + */ + ~LegacyTxRx() throw(); + + /** + * Returns the timeout in seconds to be applied when performing network read + * and write operations. + * + * @return The timeout in seconds to be applied when performing network read + * and write operations. + */ + int getNetReadWriteTimeout() const throw(); + /** * Sends the specified message header to RTCPD using the specified socket. * - * @param cuuid The ccuid to be used for logging. - * @param volReqId The volume request ID to be used for logging. * @param socketFd The socket file descriptor of the connection with RTCPD. - * @param netReadWriteTimeout The timeout to be applied when performing - * network read and write operations. - * @param header The message header to be sent. + * @param header The message header to be sent. */ - static void sendMsgHeader(const Cuuid_t &cuuid, - const uint32_t volReqId, const int socketFd, const int netReadWriteTimeout, - const legacymsg::MessageHeader &header) throw(castor::exception::Exception); + void sendMsgHeader( + const int socketFd, + const legacymsg::MessageHeader &header) + throw(castor::exception::Exception); /** * Receives a message header. @@ -78,44 +83,35 @@ public: * example you are using select, then please use * receiveMessageHeaderFromCloseableConn(). * - * @param cuuid The ccuid to be used for logging. - * @param volReqId The volume request ID to be used for logging. * @param socketFd The socket file descriptor of the connection with RTCPD. - * @param netReadWriteTimeout The timeout to be applied when performing - * network read and write operations. - * @param request The request which will be filled with the contents of the - * received message. + * @param request The request which will be filled with the contents of the + * received message. */ - static void receiveMsgHeader(const Cuuid_t &cuuid, - const uint32_t volReqId, const int socketFd, const int netReadWriteTimeout, - legacymsg::MessageHeader &header) throw(castor::exception::Exception); + void receiveMsgHeader( + const int socketFd, + legacymsg::MessageHeader &header) + throw(castor::exception::Exception); /** * Receives a message header or a connection close message. * - * @param cuuid The ccuid to be used for logging. - * @param connClosed Output parameter: True if the connection was closed by - * the peer. - * @param volReqId The volume request ID to be used for logging. - * @param socketFd The socket file descriptor of the connection with RTCPD. - * @param netReadWriteTimeout The timeout to be applied when performing - * network read and write operations. - * @param request The request which will be filled with the contents of the - * received message. + * @param socketFd The socket file-descriptor of the connection with RTCPD. + * @param request The request which will be filled with the contents of the + * received message. + * @return True if the connection was closed by the peer, else false. */ - static void receiveMsgHeaderFromCloseable(const Cuuid_t &cuuid, - bool &connClosed, const uint32_t volReqId, const int socketFd, - const int netReadWriteTimeout, legacymsg::MessageHeader &header) + bool receiveMsgHeaderFromCloseable( + const int socketFd, + legacymsg::MessageHeader &header) throw(castor::exception::Exception); - private: /** - * Private constructor to inhibit instances of this class from being - * instantiated. + * The timeout in seconds to be applied when performing network read and + * write operations. */ - LegacyTxRx() {} + int m_netReadWriteTimeout; }; // class LegacyTxRx diff --git a/castor/tape/tapebridge/RtcpTxRx.cpp b/castor/tape/tapebridge/RtcpTxRx.cpp index bb34064e82bf167d88430291db15b8f022a4d142..f8edf8e6ca3c4803d218dad7d70f223f47c88030 100644 --- a/castor/tape/tapebridge/RtcpTxRx.cpp +++ b/castor/tape/tapebridge/RtcpTxRx.cpp @@ -84,10 +84,10 @@ void castor::tape::tapebridge::RtcpTxRx::getRequestInfoFromRtcpd( } // Receive acknowledge from RTCPD + LegacyTxRx legacyTxRx(netReadWriteTimeout); legacymsg::MessageHeader ackMsg; try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD: " @@ -108,8 +108,7 @@ void castor::tape::tapebridge::RtcpTxRx::getRequestInfoFromRtcpd( utils::setBytes(reply, '\0'); try { legacymsg::MessageHeader header; - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - header); + legacyTxRx.receiveMsgHeader(socketFd, header); RtcpTxRx::receiveMsgBody(cuuid, volReqId, socketFd, netReadWriteTimeout, header, reply); } catch(castor::exception::Exception &ex) { @@ -120,8 +119,7 @@ void castor::tape::tapebridge::RtcpTxRx::getRequestInfoFromRtcpd( // Send acknowledge to RTCPD try { - LegacyTxRx::sendMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.sendMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to send acknowledge to RTCPD" @@ -165,10 +163,10 @@ void castor::tape::tapebridge::RtcpTxRx::giveVolumeToRtcpd( } // Receive acknowledge from RTCPD + LegacyTxRx legacyTxRx(netReadWriteTimeout); legacymsg::MessageHeader ackMsg; try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD: " @@ -230,9 +228,9 @@ void castor::tape::tapebridge::RtcpTxRx::giveFileToRtcpd( // Receive acknowledge from RTCPD legacymsg::MessageHeader ackMsg; + LegacyTxRx legacyTxRx(netReadWriteTimeout); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD: " @@ -293,9 +291,9 @@ void castor::tape::tapebridge::RtcpTxRx::tellRtcpdDumpTape( // Receive acknowledge from RTCPD legacymsg::MessageHeader ackMsg; + LegacyTxRx legacyTxRx(netReadWriteTimeout); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD: " @@ -410,9 +408,9 @@ void castor::tape::tapebridge::RtcpTxRx::tellRtcpdEndOfFileList( // Receive acknowledge from RTCPD legacymsg::MessageHeader ackMsg; + LegacyTxRx legacyTxRx(netReadWriteTimeout); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD" @@ -505,9 +503,9 @@ void castor::tape::tapebridge::RtcpTxRx::tellRtcpdToAbort(const Cuuid_t &cuuid, // Receive acknowledge from RTCPD legacymsg::MessageHeader ackMsg; + LegacyTxRx legacyTxRx(netReadWriteTimeout); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD: " @@ -691,9 +689,9 @@ void castor::tape::tapebridge::RtcpTxRx::askRtcpdToRequestMoreWork( // Receive acknowledge from RTCPD legacymsg::MessageHeader ackMsg; + LegacyTxRx legacyTxRx(netReadWriteTimeout); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, socketFd, netReadWriteTimeout, - ackMsg); + legacyTxRx.receiveMsgHeader(socketFd, ackMsg); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive acknowledge from RTCPD" diff --git a/castor/tape/tapebridge/VdqmRequestHandler.cpp b/castor/tape/tapebridge/VdqmRequestHandler.cpp index d6855a7dd7c4a579fffbe6945532d12a3be004b5..6ae277cdcd6862b25a558b604a1d1ee131b3ef47 100644 --- a/castor/tape/tapebridge/VdqmRequestHandler.cpp +++ b/castor/tape/tapebridge/VdqmRequestHandler.cpp @@ -32,6 +32,7 @@ #include "castor/tape/tapebridge/ClientAddressTcpIp.hpp" #include "castor/tape/tapebridge/ClientProxy.hpp" #include "castor/tape/tapebridge/Constants.hpp" +#include "castor/tape/tapebridge/LegacyTxRx.hpp" #include "castor/tape/tapebridge/RtcpJobSubmitter.hpp" #include "castor/tape/tapebridge/RtcpTxRx.hpp" #include "castor/tape/tapebridge/SystemFileCloser.hpp" @@ -147,18 +148,16 @@ void castor::tape::tapebridge::VdqmRequestHandler::run(void *param) // Log the connection from the VDQM { - unsigned short port = 0; // Client port - unsigned long ip = 0; // Client IP - char hostName[net::HOSTNAMEBUFLEN]; - - net::getPeerIpPort(vdqmSock.get(), ip, port); + char hostName[net::HOSTNAMEBUFLEN]; + const net::IpAndPort peerIpAndPort = net::getPeerIpPort(vdqmSock.get()); net::getPeerHostName(vdqmSock.get(), hostName); castor::dlf::Param params[] = { - castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)), - castor::dlf::Param("Port" , port ), - castor::dlf::Param("HostName", hostName ), - castor::dlf::Param("socketFd", vdqmSock.get() )}; + castor::dlf::Param("IP" , + castor::dlf::IPAddress(peerIpAndPort.getIp())), + castor::dlf::Param("Port" , peerIpAndPort.getPort()), + castor::dlf::Param("HostName", hostName ), + castor::dlf::Param("socketFd", vdqmSock.get() )}; castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, TAPEBRIDGE_RECEIVED_VDQM_CONNECTION, params); } @@ -391,17 +390,6 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( const int bridgeCallbackSockFd) throw(castor::exception::Exception) { - ClientAddressTcpIp - clientAddress(jobRequest.clientHost, jobRequest.clientPort); - - ClientProxy clientProxy( - cuuid, - jobRequest.volReqId, - CLIENTNETRWTIMEOUT, - clientAddress, - jobRequest.dgn, - jobRequest.driveUnit); - // Accept the initial incoming RTCPD callback connection. // Wrap the socket file descriptor in a smart file descriptor so that it is // guaranteed to be closed if it goes out of scope. @@ -410,17 +398,17 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( // Log the initial callback connection from RTCPD try { - unsigned short port = 0; // Client port - unsigned long ip = 0; // Client IP - char hostName[net::HOSTNAMEBUFLEN]; + char hostName[net::HOSTNAMEBUFLEN]; - net::getPeerIpPort(rtcpdInitialSock.get(), ip, port); + const net::IpAndPort peerIpAndPort = + net::getPeerIpPort(rtcpdInitialSock.get()); net::getPeerHostName(rtcpdInitialSock.get(), hostName); castor::dlf::Param params[] = { castor::dlf::Param("volReqId", jobRequest.volReqId ), - castor::dlf::Param("IP" , castor::dlf::IPAddress(ip)), - castor::dlf::Param("Port" , port ), + castor::dlf::Param("IP" , + castor::dlf::IPAddress(peerIpAndPort.getIp())), + castor::dlf::Param("Port" , peerIpAndPort.getPort() ), castor::dlf::Param("HostName", hostName ), castor::dlf::Param("socketFd", rtcpdInitialSock.get() )}; castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM, @@ -452,6 +440,17 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( " RTCPD volume request ID=" << rtcpdRequestInfoReply.volReqId); } + // Create the client proxy + ClientAddressTcpIp + clientAddress(jobRequest.clientHost, jobRequest.clientPort); + ClientProxy clientProxy( + cuuid, + jobRequest.volReqId, + CLIENTNETRWTIMEOUT, + clientAddress, + jobRequest.dgn, + jobRequest.driveUnit); + // Get the volume from the client of the tape-bridge std::auto_ptr<tapegateway::Volume> volume(clientProxy.getVolume( tapebridgeTransactionCounter.next())); @@ -478,6 +477,10 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( return; } + // Create the object reponsible for sending and receiving the headers of + // messages belonging to the legacy RTCOPY protocol + LegacyTxRx legacyTxRx(RTCPDNETRWTIMEOUT); + // If migrating if(volume->mode() == tapegateway::WRITE) { @@ -534,7 +537,8 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( tapebridgeTransactionCounter, logPeerOfCallbackConnectionsFromRtcpd, checkRtcpdIsConnectingFromLocalHost, - clientProxy); + clientProxy, + legacyTxRx); bridgeProtocolEngine.run(); // Else recalling @@ -558,7 +562,8 @@ void castor::tape::tapebridge::VdqmRequestHandler::exceptionThrowingRun( tapebridgeTransactionCounter, logPeerOfCallbackConnectionsFromRtcpd, checkRtcpdIsConnectingFromLocalHost, - clientProxy); + clientProxy, + legacyTxRx); bridgeProtocolEngine.run(); } } diff --git a/castor/tape/tapebridge/VmgrTxRx.cpp b/castor/tape/tapebridge/VmgrTxRx.cpp index 3f40fb569d074216de1024d1092187137411ff2f..2ef6beaf1a42bebe38a4653339178ac1e9b973f1 100644 --- a/castor/tape/tapebridge/VmgrTxRx.cpp +++ b/castor/tape/tapebridge/VmgrTxRx.cpp @@ -140,8 +140,8 @@ void castor::tape::tapebridge::VmgrTxRx::getTapeInfoFromVmgr( legacymsg::MessageHeader header; utils::setBytes(header, '\0'); try { - LegacyTxRx::receiveMsgHeader(cuuid, volReqId, sock.socket(), - netReadWriteTimeout, header); + LegacyTxRx legacyTxRx(netReadWriteTimeout); + legacyTxRx.receiveMsgHeader(sock.socket(), header); } catch(castor::exception::Exception &ex) { TAPE_THROW_CODE(EPROTO, ": Failed to receive tape request from RTCPD" diff --git a/test/unittest/Makefile b/test/unittest/Makefile index a489228568eec4f0f438a4596bc3c5d1b7f279ad..b8f87dd1fdd89659b815f5124f639e1f33d72a11 100644 --- a/test/unittest/Makefile +++ b/test/unittest/Makefile @@ -243,6 +243,7 @@ rununittests: \ tape_findpgrp.o \ tape_initlabel.o \ tape_net.o \ + tape_net_IpAndPort.o \ tape_send2tpd.o \ tape_usrmsg.o \ tape_utils.o \ @@ -262,6 +263,7 @@ rununittests: \ tapebridge_GetMoreWorkConnection.o \ tapebridge_IClientProxy.o \ tapebridge_IFileCloser.o \ + tapebridge_ILegacyTxRx.o \ tapebridge_LegacyTxRx.o \ tapebridge_LogHelper.o \ tapebridge_marshall.o \ @@ -341,6 +343,7 @@ runUnitTestsMain.cpp: \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/FileToMigrateTest.hpp \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/FileToRecallTest.hpp \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/GetMoreWorkConnectionTest.hpp \ + $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/MigrationReportConnectionTest.hpp \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/TapeBridgeDaemonTest.hpp \ $(ROOT_DIR)/test/unittest/castor/tape/tapebridge/TapeFlushConfigParamsTest.hpp \ @@ -1132,6 +1135,9 @@ tape_getcompstat.o: $(ROOT_DIR)/tape/getcompstat.c tape_net.o: $(ROOT_DIR)/castor/tape/net/net.cpp g++ $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ +tape_net_IpAndPort.o: $(ROOT_DIR)/castor/tape/net/IpAndPort.cpp + g++ $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ + tape_initlabel.o: $(ROOT_DIR)/tape/initlabel.c gcc $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ @@ -1248,6 +1254,10 @@ tapebridge_IFileCloser.o: \ $(ROOT_DIR)/castor/tape/tapebridge/IFileCloser.cpp g++ $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ +tapebridge_ILegacyTxRx.o: \ + $(ROOT_DIR)/castor/tape/tapebridge/ILegacyTxRx.cpp + g++ $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ + tapebridge_LegacyTxRx.o: \ $(ROOT_DIR)/castor/tape/tapebridge/LegacyTxRx.cpp g++ $(COMMON_OPS) -c $(INCLUDE_OPS) -o $@ $^ diff --git a/test/unittest/castor/tape/tapebridge/BridgeClientInfo2SenderTest.hpp b/test/unittest/castor/tape/tapebridge/BridgeClientInfo2SenderTest.hpp index c131f7e7b56a6d3b154883fe575a9ef9a800e4b5..2fa5a89151786194003300c913592796655b98fd 100644 --- a/test/unittest/castor/tape/tapebridge/BridgeClientInfo2SenderTest.hpp +++ b/test/unittest/castor/tape/tapebridge/BridgeClientInfo2SenderTest.hpp @@ -56,81 +56,99 @@ namespace castor { namespace tape { namespace tapebridge { -int createListenerSock_stdException(const char *addr, - const unsigned short lowPort, const unsigned short highPort, - unsigned short &chosenPort) { - try { - return net::createListenerSock(addr, lowPort, highPort, - chosenPort); - } catch(castor::exception::Exception &ex) { - test_exception te(ex.getMessage().str()); - - throw te; - } -} - -typedef struct { - int inListenSocketFd; - int32_t inMarshalledMsgBodyLen; - int outGetClientInfo2Success; - tapeBridgeClientInfo2MsgBody_t outMsgBody; -} rtcpd_thread_params; - -void *rtcpd_thread(void *arg) { - try { - rtcpd_thread_params *threadParams = - (rtcpd_thread_params*)arg; - utils::SmartFd listenSock(threadParams->inListenSocketFd); - - const time_t acceptTimeout = 10; // Timeout is in seconds - utils::SmartFd connectionSockFd( - net::acceptConnection(listenSock.get(), acceptTimeout)); - - const int netReadWriteTimeout = 10; // Timeout is in seconds - rtcpClientInfo_t client; - rtcpTapeRequest_t tapeReq; - rtcpFileRequest_t fileReq; - int clientIsTapeBridge = 0; - char errBuf[1024]; - rtcpd_GetClientInfo( - connectionSockFd.get(), - netReadWriteTimeout, - &tapeReq, - &fileReq, - &client, - &clientIsTapeBridge, - &(threadParams->outMsgBody), - errBuf, - sizeof(errBuf)); - - threadParams->outGetClientInfo2Success = 1; - } catch(castor::exception::Exception &ce) { - std::cerr << - "ERROR" - ": rtcpd_thread" - ": Caught a castor::exception::Exception" - ": " << ce.getMessage().str() << std::endl; - } catch(std::exception &se) { - std::cerr << - "ERROR" - ": rtcpd_thread" - ": Caught an std::exception" - ": " << se.what() << std::endl; - } catch(...) { - std::cerr << - "ERROR" - ": rtcpd_thread" - ": Caught an unknown exception"; - } - - return arg; -} - class BridgeClientInfo2SenderTest: public CppUnit::TestFixture { private: tapeBridgeClientInfo2MsgBody_t m_clientInfoMsgBody; + int createListenerSock_stdException(const char *addr, + const unsigned short lowPort, const unsigned short highPort, + unsigned short &chosenPort) { + try { + return net::createListenerSock(addr, lowPort, highPort, + chosenPort); + } catch(castor::exception::Exception &ex) { + test_exception te(ex.getMessage().str()); + + throw te; + } + } + + struct rtcpd_thread_params { + int inListenSocketFd; + int32_t inMarshalledMsgBodyLen; + int outGetClientInfo2Success; + tapeBridgeClientInfo2MsgBody_t outMsgBody; + bool outAnErrorOccurred; + std::ostringstream outErrorStream; + + rtcpd_thread_params(): + inListenSocketFd(-1), + inMarshalledMsgBodyLen(0), + outGetClientInfo2Success(0), + outAnErrorOccurred(false) { + memset(&outMsgBody, '\0', sizeof(outMsgBody)); + } + }; + + static void *rtcpd_thread(void *arg) { + rtcpd_thread_params *const threadParams = (rtcpd_thread_params*)arg; + + try { + if(NULL == threadParams) { + test_exception te("Pointer to the thread-parameters is NULL"); + throw te; + } + + utils::SmartFd listenSock(threadParams->inListenSocketFd); + + const time_t acceptTimeout = 10; // Timeout is in seconds + utils::SmartFd connectionSockFd( + net::acceptConnection(listenSock.get(), acceptTimeout)); + + const int netReadWriteTimeout = 10; // Timeout is in seconds + rtcpClientInfo_t client; + rtcpTapeRequest_t tapeReq; + rtcpFileRequest_t fileReq; + int clientIsTapeBridge = 0; + char errBuf[1024]; + rtcpd_GetClientInfo( + connectionSockFd.get(), + netReadWriteTimeout, + &tapeReq, + &fileReq, + &client, + &clientIsTapeBridge, + &(threadParams->outMsgBody), + errBuf, + sizeof(errBuf)); + + threadParams->outGetClientInfo2Success = 1; + } catch(castor::exception::Exception &ce) { + threadParams->outAnErrorOccurred = true; + threadParams->outErrorStream << + "ERROR" + ": " << __FUNCTION__ << + ": Caught a castor::exception::Exception" + ": " << ce.getMessage().str() << std::endl; + } catch(std::exception &se) { + threadParams->outAnErrorOccurred = true; + threadParams->outErrorStream << + "ERROR" + ": " << __FUNCTION__ << + ": Caught an std::exception" + ": " << se.what() << std::endl; + } catch(...) { + threadParams->outAnErrorOccurred = true; + threadParams->outErrorStream << + "ERROR" + ": " << __FUNCTION__ << + ": Caught an unknown exception"; + } + + return arg; + } + public: BridgeClientInfo2SenderTest() { @@ -202,7 +220,6 @@ public: void testSubmit() { rtcpd_thread_params threadParams; - memset(&threadParams, '\0', sizeof(threadParams)); threadParams.inMarshalledMsgBodyLen = tapebridge_tapeBridgeClientInfo2MsgBodyMarshalledSize( &m_clientInfoMsgBody); @@ -256,6 +273,13 @@ public: CPPUNIT_ASSERT_EQUAL_MESSAGE("Thread results are same structure", (void *)&threadParams, rtcpd_thread_result); + if(threadParams.outAnErrorOccurred) { + test_exception te(threadParams.outErrorStream.str()); + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "The rtcpd_thread encountered an error", + throw te); + } + CPPUNIT_ASSERT_MESSAGE("Check rtcpd_GetClientInfo2", threadParams.outGetClientInfo2Success); CPPUNIT_ASSERT_EQUAL_MESSAGE("volReqId", diff --git a/test/unittest/castor/tape/tapebridge/BridgeProtocolEngineTest.hpp b/test/unittest/castor/tape/tapebridge/BridgeProtocolEngineTest.hpp index d916985a559c355096f1fa3a871a168ba302c4c9..239da20a249056c03d37029812a50768a0842995 100644 --- a/test/unittest/castor/tape/tapebridge/BridgeProtocolEngineTest.hpp +++ b/test/unittest/castor/tape/tapebridge/BridgeProtocolEngineTest.hpp @@ -225,6 +225,12 @@ private: */ AlwaysFalseBoolFunctor m_stoppingGracefully; + /** + * Object responisble for sending and receiving the header of messages + * belonging to the legacy RTCOPY-protocol. + */ + LegacyTxRx m_legacyTxRx; + /** * Pointer to the BridgeProtocolEngine. */ @@ -347,7 +353,7 @@ public: m_driveUnit("unit"), m_tapebridgeTransactionCounter(0), m_mountTransactionId(5678), - m_netTimeout(1), + m_netTimeout(2), m_clientAddress(m_clientListenSockPath), m_clientProxy( m_cuuid, @@ -357,6 +363,7 @@ public: m_volumeDgn, m_driveUnit), m_nbFilesOnDestinationTape(2), + m_legacyTxRx(m_netTimeout), m_volReqId(m_mountTransactionId) { // Do nothing } @@ -449,7 +456,8 @@ public: m_tapebridgeTransactionCounter, logPeerOfCallbackConnectionsFromRtcpd, checkRtcpdIsConnectingFromLocalHost, - m_clientProxy); + m_clientProxy, + m_legacyTxRx); // Clear the list of threads to join with at tearDown m_threadsToJoinWithAtTearDown.clear(); @@ -500,7 +508,8 @@ public: Counter<uint64_t> &tapebridgeTransactionCounter, const bool logPeerOfCallbackConnectionsFromRtcpd, const bool checkRtcpdIsConnectingFromLocalHost, - IClientProxy &clientProxy) + IClientProxy &clientProxy, + ILegacyTxRx &legacyTxRx) throw(std::exception) { TestingBridgeProtocolEngine *engine = NULL; @@ -519,7 +528,8 @@ public: tapebridgeTransactionCounter, logPeerOfCallbackConnectionsFromRtcpd, checkRtcpdIsConnectingFromLocalHost, - clientProxy); + clientProxy, + legacyTxRx); } catch(castor::exception::Exception &ce) { test_exception te(ce.getMessage().str()); throw te; @@ -754,12 +764,11 @@ public: // Act as the client and accept the connection for more work from the // BridgeProtocolEngine - const int acceptTimeout = 10; int clientConnection1Fd = -1; CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Check accept of first client-connection from the BridgeProtcolEngine", clientConnection1Fd = unittest::netAcceptConnection( - m_clientListenSock, acceptTimeout)); + m_clientListenSock, m_netTimeout)); castor::io::AbstractTCPSocket clientMarshallingSock1(clientConnection1Fd); clientMarshallingSock1.setTimeout(1); @@ -822,8 +831,8 @@ public: legacymsg::MessageHeader rtcpEndOfReqMsg; CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Receive header of RTCP_ENDOF_REQ message from BridgeProtocolEngine", - LegacyTxRx::receiveMsgHeader(m_cuuid, m_volReqId, - m_initialRtcpdSockRtcpdSide, m_netTimeout, rtcpEndOfReqMsg)); + m_legacyTxRx.receiveMsgHeader(m_initialRtcpdSockRtcpdSide, + rtcpEndOfReqMsg)); CPPUNIT_ASSERT_EQUAL_MESSAGE( "Check magic number of RTCP_ENDOF_REQ message from" " BridgeProtocolEngine", @@ -851,8 +860,7 @@ public: CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Send ack of RTCP_ENDOF_REQ message to BridgeProtocolEngine", - LegacyTxRx::sendMsgHeader(m_cuuid, m_volReqId, - m_initialRtcpdSockRtcpdSide, m_netTimeout, ackMsg)); + m_legacyTxRx.sendMsgHeader(m_initialRtcpdSockRtcpdSide, ackMsg)); } // Act as the client and accept the second connection from the @@ -864,7 +872,7 @@ public: CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Check accept of second client-connection from the BridgeProtcolEngine", clientConnection2Fd = unittest::netAcceptConnection( - m_clientListenSock, acceptTimeout)); + m_clientListenSock, m_netTimeout)); castor::io::AbstractTCPSocket clientMarshallingSock2(clientConnection2Fd); clientMarshallingSock2.setTimeout(1); @@ -900,14 +908,6 @@ public: errorReport->errorCode()); } -/* - // Act as the client a send back a NotificationAcknowledge message - { - tapegateway::NotificationAcknowledge ack; - ack.setMountTransactionId(m_mountTransactionId); - ack.setAggregatorTransactionId(); - } -*/ // Join with the start rtcpd session thread void *startRtcpdSessionThreadResult = NULL; CPPUNIT_ASSERT_EQUAL_MESSAGE("pthread_join", 0, @@ -927,196 +927,6 @@ public: "The startRtcpdSessionThread thread encountered an error", throw te); } -return; -/* - - // Act as the rtcpd daemon and read back the ACK from the - // BridgeProtocolEngine - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check read back of ACK of RTCP_FINISHED message", - unittest::readAck(smartIoControlConnectionSock.get(), RTCOPY_MAGIC, - RTCP_FILEERR_REQ, 0)); - - // Act as the rtcpd daemon and send the BridgeProtocolEngine a - // flushed-to-tape message - { - tapeBridgeFlushedToTapeMsgBody_t flushedMsgBody; - memset(&flushedMsgBody, '\0', sizeof(flushedMsgBody)); - flushedMsgBody.volReqId = volReqId; - flushedMsgBody.tapeFseq = tapeFSeqOfFirstFileToMigrate; - - CPPUNIT_ASSERT_MESSAGE( - "Check tapebridge_sendTapeBridgeFlushedToTape()", - 0 < tapebridge_sendTapeBridgeFlushedToTape( - smartIoControlConnectionSock.get(), netTimeout, &flushedMsgBody)); - } - - // Act as the BridgeProtocolEngine and handle the flushed-to-tape message - // sent by the rtcpd daemon - { - struct timeval selectTimeout = {0, 0}; - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check handling of the first flushed-to-tape message", - m_engine->handleSelectEvents(selectTimeout)); - } - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the there is still only one I/O control-connection", - 1, - m_engine->getNbDiskTapeIOControlConns()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check no RTCP_ENDOF_REQ messages have been received", - (uint32_t)0, - m_engine->getNbReceivedENDOF_REQs()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the session with the rtcpd daemon is not being shutdown", - false, - m_engine->shuttingDownRtcpdSession()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check rtcpd session has not finished", - false, - m_engine->sessionWithRtcpdIsFinished()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check BridgeProtocolEngine should continue processing sockets", - true, - m_engine->continueProcessingSocks()); - - // Act as the rtcpd daemon and read in the ACK from the - // BridgeProtocolEngine of the flushed-to-tape message - { - tapeBridgeFlushedToTapeAckMsg_t flushedAckMsg; - memset(&flushedAckMsg, '\0', sizeof(flushedAckMsg)); - - CPPUNIT_ASSERT_MESSAGE( - "Check tapebridge_recvTapeBridgeFlushedToTapeAck()", - 0 <= tapebridge_recvTapeBridgeFlushedToTapeAck( - smartIoControlConnectionSock.get(), netTimeout, &flushedAckMsg)); - } - - // Act as the client and accept the second connection from the - // BridgeProtocolEngine. This connection will be used by the - // BridgeProtocolEngine to send the FileMigrationReportList message of - // the first migrated file. - int clientConnection2Fd = -1; - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check accept of second client-connection from the BridgeProtcolEngine", - clientConnection2Fd = unittest::netAcceptConnection( - m_clientListenSock, acceptTimeout)); - castor::io::AbstractTCPSocket clientMarshallingSock2(clientConnection2Fd); - clientMarshallingSock2.setTimeout(1); - - // The client is now slow to process the second connection from the - // BridgeProtocolEngine so does nothing at this very moment in time. - - // Act as the rtcpd daemon and send a request for more work using the - // the already existant disk/tape I/O control-connection - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check writeRTCP_REQUEST_MORE_WORK()", - unittest::writeRTCP_REQUEST_MORE_WORK( - smartIoControlConnectionSock.get(), - volReqId, - tapePath)); - - // Act as the BridgeProtocolEngine and handle the second - // RTCP_REQUEST_MORE_WORK message from the rtcpd daemon - { - struct timeval selectTimeout = {0, 0}; - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check handling of the second RTCP_REQUEST_MORE_WORK message", - m_engine->handleSelectEvents(selectTimeout)); - } - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the there is still only one I/O control-connection", - 1, - m_engine->getNbDiskTapeIOControlConns()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check no RTCP_ENDOF_REQ messages have been received", - (uint32_t)0, - m_engine->getNbReceivedENDOF_REQs()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the session with the rtcpd daemon is not being shutdown", - false, - m_engine->shuttingDownRtcpdSession()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check rtcpd session has not finished", - false, - m_engine->sessionWithRtcpdIsFinished()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check BridgeProtocolEngine should continue processing sockets", - true, - m_engine->continueProcessingSocks()); - - // The client now catches up - - // Act as the client and read in the FileMigrationReportList message of - // the first file to be migrated from the seond connection made with the - // client by the BridgeProtocolEngine - std::auto_ptr<IObject> fileMigrationReportListObj; - tapegateway::FileMigrationReportList *fileMigrationReportList = NULL; - { - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Read in FileMigrationReportList the BridgeProtocolEngine", - fileMigrationReportListObj.reset( - clientMarshallingSock2.readObject())); - CPPUNIT_ASSERT_MESSAGE( - "Check FileMigrationReportList was read in from the" - " BridgeProtocolEngine", - NULL != fileMigrationReportListObj.get()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check FileMigrationReportList is of the correct type", - (int)castor::OBJ_FileMigrationReportList, - fileMigrationReportListObj->type()); - fileMigrationReportList = - dynamic_cast<tapegateway::FileMigrationReportList*> - (fileMigrationReportListObj.get()); - CPPUNIT_ASSERT_MESSAGE( - "Check dynamic_cast to FileMigrationReportList", - NULL != fileMigrationReportList); - } - - // Act as the client and reply to the BridgeProtocolEngine with an error - // report - { - tapegateway::EndNotificationErrorReport errorReport; - errorReport.setErrorCode(ECANCELED); - errorReport.setErrorMessage("Error message"); - - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Send error report in response to first file migrated", - clientMarshallingSock2.sendObject(errorReport)); - - clientMarshallingSock2.close(); - } - - // Act as the BridgeProtocolEngine and handle the error report from the - // client about the first migrated-file - { - struct timeval selectTimeout = {0, 0}; - CPPUNIT_ASSERT_NO_THROW_MESSAGE( - "Check handling of the first file's error report from the client", - m_engine->handleSelectEvents(selectTimeout)); - } - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the there is still only one I/O control-connection", - 1, - m_engine->getNbDiskTapeIOControlConns()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check no RTCP_ENDOF_REQ messages have been received", - (uint32_t)0, - m_engine->getNbReceivedENDOF_REQs()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check the session with the rtcpd daemon is being gracefully shutdown", - true, - m_engine->shuttingDownRtcpdSession()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check rtcpd session has not finished", - false, - m_engine->sessionWithRtcpdIsFinished()); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "Check BridgeProtocolEngine should continue processing sockets", - true, - m_engine->continueProcessingSocks()); - } - */ } /** @@ -1336,12 +1146,11 @@ return; // Act as the client and accept the connection for more work from the // BridgeProtocolEngine - const int acceptTimeout = 1; int clientConnection1Fd = -1; CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Check accept of first client-connection from the BridgeProtcolEngine", clientConnection1Fd = unittest::netAcceptConnection( - m_clientListenSock, acceptTimeout)); + m_clientListenSock, m_netTimeout)); castor::io::AbstractTCPSocket clientMarshallingSock1(clientConnection1Fd); clientMarshallingSock1.setTimeout(1); @@ -1611,7 +1420,7 @@ return; CPPUNIT_ASSERT_NO_THROW_MESSAGE( "Check accept of second client-connection from the BridgeProtcolEngine", clientConnection2Fd = unittest::netAcceptConnection( - m_clientListenSock, acceptTimeout)); + m_clientListenSock, m_netTimeout)); castor::io::AbstractTCPSocket clientMarshallingSock2(clientConnection2Fd); clientMarshallingSock2.setTimeout(1); diff --git a/test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp b/test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp new file mode 100644 index 0000000000000000000000000000000000000000..afdb442db76a032f571ef92ae65aa833038416bc --- /dev/null +++ b/test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp @@ -0,0 +1,239 @@ +/****************************************************************************** + * test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * + * + * @author Steven.Murray@cern.ch + *****************************************************************************/ + +#ifndef TEST_UNITTEST_CASTOR_TAPE_TAPEBRIDGE_LEGACYTXRXTEST_HPP +#define TEST_UNITTEST_CASTOR_TAPE_TAPEBRIDGE_LEGACYTXRXTEST_HPP 1 + +#include "castor/tape/tapebridge/LegacyTxRx.hpp" +#include "castor/tape/utils/SmartFd.hpp" +#include "h/serrno.h" + +#include <cppunit/extensions/HelperMacros.h> +#include <exception> +#include <memory> +#include <stdint.h> +#include <stdlib.h> +#include <sys/socket.h> + +namespace castor { +namespace tape { +namespace tapebridge { + +class LegacyTxRxTest: public CppUnit::TestFixture { +public: + + LegacyTxRxTest() { + } + + void setUp() { + } + + void tearDown() { + } + + void testConstructor() { + const int netReadWriteTimeout = 12345; + std::auto_ptr<LegacyTxRx> smartLegacyTxRx; + + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check new LegacyTxRx()", + smartLegacyTxRx.reset(new LegacyTxRx(netReadWriteTimeout))); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check value of netReadWriteTimeout", + netReadWriteTimeout, + smartLegacyTxRx->getNetReadWriteTimeout()); + } + + void testSendAndRecieveMsgHeader() { + legacymsg::MessageHeader msgHeaderToSend; + msgHeaderToSend.magic = 12; + msgHeaderToSend.reqType = 34; + msgHeaderToSend.lenOrStatus = 56; + + int sockPair[2] = {-1, -1}; + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Create socket pair", + 0, + socketpair(PF_LOCAL, SOCK_STREAM, 0, sockPair)); + utils::SmartFd sendSock(sockPair[0]); + utils::SmartFd recvSock(sockPair[1]); + + const int netReadWriteTimeout = 1; + std::auto_ptr<LegacyTxRx> smartLegacyTxRx; + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check new LegacyTxRx()", + smartLegacyTxRx.reset(new LegacyTxRx(netReadWriteTimeout))); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check value of netReadWriteTimeout", + netReadWriteTimeout, + smartLegacyTxRx->getNetReadWriteTimeout()); + + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check sendMsgHeader()", + smartLegacyTxRx->sendMsgHeader(sendSock.get(), msgHeaderToSend)); + + legacymsg::MessageHeader receivedMsgHeader; + receivedMsgHeader.magic = 0; + receivedMsgHeader.reqType = 0; + receivedMsgHeader.lenOrStatus = 0; + + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check receiveMsgHeader()", + smartLegacyTxRx->receiveMsgHeader(recvSock.get(), receivedMsgHeader)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check magic", + msgHeaderToSend.magic, + receivedMsgHeader.magic); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check reqType", + msgHeaderToSend.reqType, + receivedMsgHeader.reqType); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check lenOrStatus", + msgHeaderToSend.lenOrStatus, + receivedMsgHeader.lenOrStatus); + } + + void testSendAndRecieveMsgHeaderFromCloseableWithNoClose() { + legacymsg::MessageHeader msgHeaderToSend; + msgHeaderToSend.magic = 12; + msgHeaderToSend.reqType = 34; + msgHeaderToSend.lenOrStatus = 56; + + int sockPair[2] = {-1, -1}; + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Create socket pair", + 0, + socketpair(PF_LOCAL, SOCK_STREAM, 0, sockPair)); + utils::SmartFd sendSock(sockPair[0]); + utils::SmartFd recvSock(sockPair[1]); + + const int netReadWriteTimeout = 1; + std::auto_ptr<LegacyTxRx> smartLegacyTxRx; + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check new LegacyTxRx()", + smartLegacyTxRx.reset(new LegacyTxRx(netReadWriteTimeout))); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check value of netReadWriteTimeout", + netReadWriteTimeout, + smartLegacyTxRx->getNetReadWriteTimeout()); + + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check sendMsgHeader()", + smartLegacyTxRx->sendMsgHeader(sendSock.get(), msgHeaderToSend)); + + legacymsg::MessageHeader receivedMsgHeader; + receivedMsgHeader.magic = 0; + receivedMsgHeader.reqType = 0; + receivedMsgHeader.lenOrStatus = 0; + + bool connectionClosed = false; + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check receiveMsgHeaderFromCloseable()", + connectionClosed = smartLegacyTxRx->receiveMsgHeaderFromCloseable( + recvSock.get(), receivedMsgHeader)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check connection not closed", + false, + connectionClosed); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check magic", + msgHeaderToSend.magic, + receivedMsgHeader.magic); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check reqType", + msgHeaderToSend.reqType, + receivedMsgHeader.reqType); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check lenOrStatus", + msgHeaderToSend.lenOrStatus, + receivedMsgHeader.lenOrStatus); + } + + void testSendAndRecieveMsgHeaderFromCloseableWithClose() { + int sockPair[2] = {-1, -1}; + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Create socket pair", + 0, + socketpair(PF_LOCAL, SOCK_STREAM, 0, sockPair)); + utils::SmartFd sendSock(sockPair[0]); + utils::SmartFd recvSock(sockPair[1]); + + const int netReadWriteTimeout = 1; + std::auto_ptr<LegacyTxRx> smartLegacyTxRx; + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check new LegacyTxRx()", + smartLegacyTxRx.reset(new LegacyTxRx(netReadWriteTimeout))); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check value of netReadWriteTimeout", + netReadWriteTimeout, + smartLegacyTxRx->getNetReadWriteTimeout()); + + close(sendSock.release()); + + legacymsg::MessageHeader receivedMsgHeader; + receivedMsgHeader.magic = 0; + receivedMsgHeader.reqType = 0; + receivedMsgHeader.lenOrStatus = 0; + + bool connectionClosed = false; + CPPUNIT_ASSERT_NO_THROW_MESSAGE( + "Check receiveMsgHeaderFromCloseable()", + connectionClosed = smartLegacyTxRx->receiveMsgHeaderFromCloseable( + recvSock.get(), receivedMsgHeader)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Check connection is closed", + true, + connectionClosed); + } + + CPPUNIT_TEST_SUITE(LegacyTxRxTest); + + CPPUNIT_TEST(testConstructor); + CPPUNIT_TEST(testSendAndRecieveMsgHeader); + CPPUNIT_TEST(testSendAndRecieveMsgHeaderFromCloseableWithNoClose); + CPPUNIT_TEST(testSendAndRecieveMsgHeaderFromCloseableWithClose); + + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(LegacyTxRxTest); + +} // namespace tapebridge +} // namespace tape +} // namespace castor + +#endif // TEST_UNITTEST_CASTOR_TAPE_TAPEBRIDGE_LEGACYTXRXTEST_HPP diff --git a/test/unittest/castor/tape/tapebridge/SessionErrorTest.hpp b/test/unittest/castor/tape/tapebridge/SessionErrorTest.hpp index f28a760ac39bba9fb3248db4bdd1868dfbd73c5d..4851c8d4ec10ea40f4538867db208c1eb71e0b0d 100644 --- a/test/unittest/castor/tape/tapebridge/SessionErrorTest.hpp +++ b/test/unittest/castor/tape/tapebridge/SessionErrorTest.hpp @@ -50,10 +50,10 @@ public: } void testConstructor() { - castor::tape::tapebridge::SessionError rtcpdError; + SessionError rtcpdError; CPPUNIT_ASSERT_EQUAL( - castor::tape::tapebridge::SessionError::UNKNOWN_SCOPE, + SessionError::UNKNOWN_SCOPE, rtcpdError.getErrorScope()); CPPUNIT_ASSERT_EQUAL( @@ -82,28 +82,28 @@ public: } void testAttributes() { - castor::tape::tapebridge::SessionError rtcpdError; + SessionError rtcpdError; - rtcpdError.setErrorScope(castor::tape::tapebridge::SessionError::FILE_SCOPE); + rtcpdError.setErrorScope(SessionError::FILE_SCOPE); CPPUNIT_ASSERT_EQUAL( - castor::tape::tapebridge::SessionError::FILE_SCOPE, + SessionError::FILE_SCOPE, rtcpdError.getErrorScope()); rtcpdError.setErrorScope( - castor::tape::tapebridge::SessionError::SESSION_SCOPE); + SessionError::SESSION_SCOPE); CPPUNIT_ASSERT_EQUAL( - castor::tape::tapebridge::SessionError::SESSION_SCOPE, + SessionError::SESSION_SCOPE, rtcpdError.getErrorScope()); rtcpdError.setErrorScope( - castor::tape::tapebridge::SessionError::UNKNOWN_SCOPE); + SessionError::UNKNOWN_SCOPE); CPPUNIT_ASSERT_EQUAL( - castor::tape::tapebridge::SessionError::UNKNOWN_SCOPE, + SessionError::UNKNOWN_SCOPE, rtcpdError.getErrorScope()); { - const castor::tape::tapebridge::SessionError::ErrorScope crazyScopeValue = - (castor::tape::tapebridge::SessionError::ErrorScope)12345; + const SessionError::ErrorScope crazyScopeValue = + (SessionError::ErrorScope)12345; CPPUNIT_ASSERT_THROW( rtcpdError.setErrorScope(crazyScopeValue), diff --git a/test/unittest/castor/tape/tapebridge/TestingBridgeProtocolEngine.hpp b/test/unittest/castor/tape/tapebridge/TestingBridgeProtocolEngine.hpp index db294043f8ec9b57e2eb05d88b8c614ceab073d3..c167992a195e7546a05e5117fe69e51a277e4fd7 100644 --- a/test/unittest/castor/tape/tapebridge/TestingBridgeProtocolEngine.hpp +++ b/test/unittest/castor/tape/tapebridge/TestingBridgeProtocolEngine.hpp @@ -50,7 +50,8 @@ public: Counter<uint64_t> &tapebridgeTransactionCounter, const bool logPeerOfCallbackConnectionsFromRtcpd, const bool checkRtcpdIsConnectingFromLocalHost, - IClientProxy &clientProxy) + IClientProxy &clientProxy, + ILegacyTxRx &legacyTxRx) throw(): BridgeProtocolEngine( fileCloser, @@ -66,7 +67,8 @@ public: tapebridgeTransactionCounter, logPeerOfCallbackConnectionsFromRtcpd, checkRtcpdIsConnectingFromLocalHost, - clientProxy) { + clientProxy, + legacyTxRx) { // Do nothing } diff --git a/test/unittest/runUnitTestsMain.cpp b/test/unittest/runUnitTestsMain.cpp index 5e9179efb5dbfc59d364a96c927dd291ec9739d2..4a772f978a2e65e3832eb138e89879f031b97833 100644 --- a/test/unittest/runUnitTestsMain.cpp +++ b/test/unittest/runUnitTestsMain.cpp @@ -34,6 +34,7 @@ #include "test/unittest/castor/tape/tapebridge/FileToMigrateTest.hpp" #include "test/unittest/castor/tape/tapebridge/FileToRecallTest.hpp" #include "test/unittest/castor/tape/tapebridge/GetMoreWorkConnectionTest.hpp" +#include "test/unittest/castor/tape/tapebridge/LegacyTxRxTest.hpp" #include "test/unittest/castor/tape/tapebridge/MigrationReportConnectionTest.hpp" #include "test/unittest/castor/tape/tapebridge/TapeBridgeDaemonTest.hpp" #include "test/unittest/castor/tape/tapebridge/TapeFlushConfigParamsTest.hpp" diff --git a/test/unittest/tapebridge/RecvTapeBridgeFlushedToTapeAckTest.hpp b/test/unittest/tapebridge/RecvTapeBridgeFlushedToTapeAckTest.hpp index e399d35921607294271fa248816d13cb545d3d0e..932b39e0e628e6cfe7711bed72e9edbdbeabffeb 100644 --- a/test/unittest/tapebridge/RecvTapeBridgeFlushedToTapeAckTest.hpp +++ b/test/unittest/tapebridge/RecvTapeBridgeFlushedToTapeAckTest.hpp @@ -50,6 +50,10 @@ #define FAKE_TAPEBRIDGE_LISTEN_PORT 64000 +namespace castor { +namespace tape { +namespace tapebridge { + class RecvTapeBridgeFlushedToTapeAckTest: public CppUnit::TestFixture { private: @@ -185,8 +189,8 @@ public: &threadParams)); // Create client connection - castor::tape::utils::SmartFd clientConnectionSock(socket(PF_INET, - SOCK_STREAM, IPPROTO_TCP)); + utils::SmartFd clientConnectionSock(socket(PF_INET, SOCK_STREAM, + IPPROTO_TCP)); CPPUNIT_ASSERT_MESSAGE("create client connection socket", 0 <= clientConnectionSock.get()); { @@ -204,14 +208,13 @@ public: // Send TAPEBRIDGE_FLUSHEDTOTAPE acknowledgement message using client // connection - const uint32_t dummyVolReqId = 7777; - castor::tape::legacymsg::MessageHeader ackMsg; + legacymsg::MessageHeader ackMsg; memset(&ackMsg, '\0', sizeof(ackMsg)); ackMsg.magic = RTCOPY_MAGIC; ackMsg.reqType = TAPEBRIDGE_FLUSHEDTOTAPE; ackMsg.lenOrStatus = 0; - castor::tape::tapebridge::LegacyTxRx::sendMsgHeader(nullCuuid, - dummyVolReqId, clientConnectionSock.get(), netReadWriteTimeout, ackMsg); + LegacyTxRx legacyTxRx(netReadWriteTimeout); + legacyTxRx.sendMsgHeader(clientConnectionSock.get(), ackMsg); void *tapebridged_thread_result = NULL; CPPUNIT_ASSERT_EQUAL_MESSAGE("pthread_join", 0, @@ -238,4 +241,8 @@ public: CPPUNIT_TEST_SUITE_REGISTRATION(RecvTapeBridgeFlushedToTapeAckTest); +} // namespace tapebridge +} // namespace tape +} // namespace castor + #endif // TEST_UNITTEST_TAPEBRIDGE_RECVTAPEBRIDGEFLUSHEDTOTAPEACKTEST_HPP