Commit 84c0382f authored by David COME's avatar David COME
Browse files

Moving MountSessionConnectionHandler to ZMQ.

Server side is done for messages from mount session
MountSession side done

All messages are now using zeroMQ. May be some race conditions that need toi be fixed
parent 9906bde9
......@@ -188,6 +188,7 @@ ELSE(DEFINED PackageOnly)
add_subdirectory (upv)
add_subdirectory (vdqm)
add_subdirectory (vmgr)
add_subdirectory (zmq)
if (${COMPILE_SERVER} STREQUAL "1")
add_subdirectory (xrootd)
endif (${COMPILE_SERVER} STREQUAL "1")
......
......@@ -86,6 +86,6 @@ set (TAPE_LEGACYMSG_LIB_SRC_FILES
add_library (castortapelegacymsg SHARED ${TAPE_LEGACYMSG_LIB_SRC_FILES})
CastorSetLibraryVersions (castortapelegacymsg)
target_link_libraries (castortapelegacymsg castorclient castorcommon
castortapeutils castorns)
castortapeutils castorns castorMessages castorZMQWrapper)
install (TARGETS castortapelegacymsg LIBRARY DESTINATION ${CASTOR_DEST_LIB_DIR}
NAMELINK_SKIP)
......@@ -27,5 +27,5 @@
//-----------------------------------------------------------------------------
// destructor
//-----------------------------------------------------------------------------
castor::legacymsg::TapeserverProxy::~TapeserverProxy() throw() {
castor::legacymsg::TapeserverProxy::~TapeserverProxy() {
}
......@@ -42,7 +42,7 @@ public:
* Closes the listening socket created in the constructor to listen for
* connections from the vdqmd daemon.
*/
virtual ~TapeserverProxy() throw() = 0;
virtual ~TapeserverProxy() = 0;
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
......
......@@ -32,6 +32,6 @@ castor::legacymsg::TapeserverProxyDummyFactory::~TapeserverProxyDummyFactory() t
//------------------------------------------------------------------------------
// create
//------------------------------------------------------------------------------
castor::legacymsg::TapeserverProxy *castor::legacymsg::TapeserverProxyDummyFactory::create() {
castor::legacymsg::TapeserverProxy *castor::legacymsg::TapeserverProxyDummyFactory::create(zmq::context_t& ctx) {
return new TapeserverProxyDummy();
}
......@@ -47,7 +47,7 @@ public:
*
* @return A pointer to the newly created object.
*/
TapeserverProxy *create();
TapeserverProxy *create(zmq::context_t& ctx);
}; // class TapeserverProxyDummyFactory
......
......@@ -23,7 +23,7 @@
#pragma once
#include "castor/legacymsg/TapeserverProxy.hpp"
#include "zmq/castorZmqWrapper.hpp"
namespace castor {
namespace legacymsg {
......@@ -47,7 +47,7 @@ public:
*
* @return A pointer to the newly created object.
*/
virtual TapeserverProxy *create() = 0;
virtual TapeserverProxy *create(zmq::context_t& ctx) = 0;
}; // class TapeserverProxyFactory
......
......@@ -34,99 +34,74 @@
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
#include "h/Ctape.h"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::legacymsg::TapeserverProxyTcpIp::TapeserverProxyTcpIp(log::Logger &log, const unsigned short tapeserverPort, const int netTimeout) throw():
m_log(log),
m_tapeserverHostName("localhost"),
m_tapeserverPort(tapeserverPort),
m_netTimeout(netTimeout) {
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::legacymsg::TapeserverProxyTcpIp::~TapeserverProxyTcpIp() throw() {
}
//------------------------------------------------------------------------------
// updateDrive
//------------------------------------------------------------------------------
void castor::legacymsg::TapeserverProxyTcpIp::updateDriveInfo(
const castor::legacymsg::TapeUpdateDriveRqstMsgBody::TapeEvent event,
const castor::tape::tapegateway::VolumeMode mode,
const castor::tape::tapegateway::ClientType clientType,
const std::string &unitName,
const std::string &vid)
{
legacymsg::TapeUpdateDriveRqstMsgBody body;
try {
switch(clientType) {
#include "castor/messages/Header.pb.h"
#include "castor/messages/NotifyDrive.pb.h"
#include "castor/messages/Constants.hpp"
#include "zmq/castorZmqUtils.hpp"
namespace {
castor::messages::NotifyDriveBeforeMountStarted_TapeClientType
convertClientType(castor::tape::tapegateway::ClientType val){
switch(val){
case castor::tape::tapegateway::TAPE_GATEWAY:
body.clientType=castor::legacymsg::TapeUpdateDriveRqstMsgBody::CLIENT_TYPE_GATEWAY;
break;
return castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_GATEWAY;
case castor::tape::tapegateway::READ_TP:
body.clientType=castor::legacymsg::TapeUpdateDriveRqstMsgBody::CLIENT_TYPE_READTP;
break;
return castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_READTP;
case castor::tape::tapegateway::WRITE_TP:
body.clientType=castor::legacymsg::TapeUpdateDriveRqstMsgBody::CLIENT_TYPE_WRITETP;
break;
return castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_WRITETP;
case castor::tape::tapegateway::DUMP_TP:
body.clientType=castor::legacymsg::TapeUpdateDriveRqstMsgBody::CLIENT_TYPE_DUMPTP;
break;
return castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_DUMPTP;
default:
body.clientType=castor::legacymsg::TapeUpdateDriveRqstMsgBody::CLIENT_TYPE_NONE;
break;
return castor::messages::NotifyDriveBeforeMountStarted::CLIENT_TYPE_DUMPTP;
}
switch(mode) {
}
castor::messages::TapeMode
convertVolumeMode(castor::tape::tapegateway::VolumeMode val){
switch(val){
case castor::tape::tapegateway::READ:
body.mode=castor::legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_MODE_READ;
break;
return castor::messages::TAPE_MODE_READ;
case castor::tape::tapegateway::WRITE:
body.mode=castor::legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_MODE_READWRITE;
break;
return castor::messages::TAPE_MODE_READWRITE;
case castor::tape::tapegateway::DUMP:
body.mode=castor::legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_MODE_DUMP;
break;
return castor::messages::TAPE_MODE_DUMP;
default:
body.mode=castor::legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_MODE_NONE;
break;
return castor::messages::TAPE_MODE_NONE;
}
body.event=event;
castor::utils::copyString(body.vid, vid.c_str());
castor::utils::copyString(body.drive, unitName.c_str());
castor::utils::SmartFd fd(connectToTapeserver());
writeTapeUpdateDriveRqstMsg(fd.get(), body);
readReplyMsg(fd.get());
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to notify tapeserverd of drive info update: "
<< " event = "<< body.event
<< " mode = " << body.mode
<< " clientType = " << body.clientType
<< " unitName = " << body.drive
<< " vid = " << body.vid
<< ": " << ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::legacymsg::TapeserverProxyTcpIp::TapeserverProxyTcpIp(log::Logger &log,
const unsigned short tapeserverPort, const int netTimeout,zmq::context_t& ctx) throw():
m_log(log),
m_tapeserverHostName("localhost"),
m_tapeserverPort(tapeserverPort),
m_netTimeout(netTimeout),m_socket(ctx,ZMQ_REQ) {
castor::utils::connectToLocalhost(m_socket);
}
//------------------------------------------------------------------------------
// gotReadMountDetailsFromClient
//------------------------------------------------------------------------------
void castor::legacymsg::TapeserverProxyTcpIp::gotReadMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_BEFORE_MOUNT_STARTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
castor::messages::NotifyDriveBeforeMountStarted body;
body.set_clienttype(convertClientType(volInfo.clientType));
body.set_mode(convertVolumeMode(volInfo.volumeMode));
body.set_unitname(unitName);
body.set_vid(volInfo.vid);
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//------------------------------------------------------------------------------
......@@ -136,13 +111,20 @@ uint64_t
castor::legacymsg::TapeserverProxyTcpIp::gotWriteMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_BEFORE_MOUNT_STARTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
castor::messages::NotifyDriveBeforeMountStarted body;
body.set_clienttype(convertClientType(volInfo.clientType));
body.set_mode(convertVolumeMode(volInfo.volumeMode));
body.set_unitname(unitName);
body.set_vid(volInfo.vid);
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
return 0; // TO BE DONE
}
......@@ -152,12 +134,21 @@ uint64_t
void castor::legacymsg::TapeserverProxyTcpIp::gotDumpMountDetailsFromClient(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_BEFORE_MOUNT_STARTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveBeforeMountStarted);
castor::messages::NotifyDriveBeforeMountStarted body;
body.set_clienttype(convertClientType(volInfo.clientType));
body.set_mode(convertVolumeMode(volInfo.volumeMode));
body.set_unitname(unitName);
body.set_vid(volInfo.vid);
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//------------------------------------------------------------------------------
......@@ -166,12 +157,20 @@ void castor::legacymsg::TapeserverProxyTcpIp::gotDumpMountDetailsFromClient(
void castor::legacymsg::TapeserverProxyTcpIp::tapeMountedForRead(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_MOUNTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeMounted);
castor::messages::NotifyDriveTapeMounted body;
body.set_mode(convertVolumeMode(volInfo.volumeMode));
body.set_unitname(unitName);
body.set_vid(volInfo.vid);
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//------------------------------------------------------------------------------
......@@ -180,12 +179,20 @@ void castor::legacymsg::TapeserverProxyTcpIp::tapeMountedForRead(
void castor::legacymsg::TapeserverProxyTcpIp::tapeMountedForWrite(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_MOUNTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeMounted);
castor::messages::NotifyDriveTapeMounted body;
body.set_mode(convertVolumeMode(volInfo.volumeMode));
body.set_unitname(unitName);
body.set_vid(volInfo.vid);
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//------------------------------------------------------------------------------
......@@ -194,12 +201,17 @@ void castor::legacymsg::TapeserverProxyTcpIp::tapeMountedForWrite(
void castor::legacymsg::TapeserverProxyTcpIp::tapeUnmounting(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_UNMOUNT_STARTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveUnmountStarted);
castor::messages::NotifyDriveUnmountStarted body;
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//------------------------------------------------------------------------------
......@@ -208,102 +220,22 @@ void castor::legacymsg::TapeserverProxyTcpIp::tapeUnmounting(
void castor::legacymsg::TapeserverProxyTcpIp::tapeUnmounted(
castor::tape::tapeserver::client::ClientProxy::VolumeInfo volInfo,
const std::string &unitName) {
updateDriveInfo(
legacymsg::TapeUpdateDriveRqstMsgBody::TAPE_STATUS_UNMOUNTED,
volInfo.volumeMode,
volInfo.clientType,
unitName,
volInfo.vid);
}
//-----------------------------------------------------------------------------
// connectToTapeserver
//-----------------------------------------------------------------------------
int castor::legacymsg::TapeserverProxyTcpIp::connectToTapeserver() const {
castor::utils::SmartFd smartConnectSock;
try {
smartConnectSock.reset(io::connectWithTimeout(m_tapeserverHostName,
m_tapeserverPort, m_netTimeout));
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to connect to tapeserver on host " <<
m_tapeserverHostName << " port " << m_tapeserverPort << ": " <<
ne.getMessage().str();
throw ex;
}
return smartConnectSock.release();
}
//-----------------------------------------------------------------------------
// writeTapeUpdateDriveRqstMsg
//-----------------------------------------------------------------------------
void castor::legacymsg::TapeserverProxyTcpIp::writeTapeUpdateDriveRqstMsg(
const int fd, const legacymsg::TapeUpdateDriveRqstMsgBody &body) {
char buf[3 * sizeof(uint32_t) + sizeof(body)]; // header + body
const size_t len = legacymsg::marshal(buf, sizeof(buf), body);
try {
io::writeBytes(fd, m_netTimeout, len, buf);
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to write drive status message: "
<< ne.getMessage().str();
throw ex;
}
castor::messages::Header header=castor::utils::preFilleHeader();
header.set_bodyhashvalue("PIPO");
header.set_bodysignature("PIPO");
header.set_reqtype(castor::messages::reqType::NotifyDriveTapeUnmounted);
castor::messages::NotifyDriveTapeUnmounted body;
castor::utils::sendMessage(m_socket,header,ZMQ_SNDMORE);
castor::utils::sendMessage(m_socket,body);
readReplyMsg();
}
//-----------------------------------------------------------------------------
// readReplyMsg
//-----------------------------------------------------------------------------
void castor::legacymsg::TapeserverProxyTcpIp::readReplyMsg(const int fd) {
size_t headerBufLen = 12; // 12 bytes of header
char headerBuf[12];
memset(headerBuf, '\0', headerBufLen);
try {
castor::io::readBytes(fd, m_netTimeout, headerBufLen, headerBuf);
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to read Tape Update Drive reply message: " <<
ne.getMessage().str();
throw ex;
}
void castor::legacymsg::TapeserverProxyTcpIp::readReplyMsg() {
const char *headerBufPtr = headerBuf;
castor::legacymsg::MessageHeader replyHeader;
memset(&replyHeader, '\0', sizeof(replyHeader));
castor::legacymsg::unmarshal(headerBufPtr, headerBufLen, replyHeader);
if(MSG_DATA != replyHeader.reqType || TPMAGIC != replyHeader.magic) {
castor::exception::Exception ex;
ex.getMessage() << "Wrong reply type or magic # in Tape Update Drive reply message. replymsg.reqType: " << replyHeader.reqType << " (expected: " << MSG_DATA << ") replymsg.magic: " << replyHeader.magic << " (expected: " << TPMAGIC << ")";
throw ex;
}
size_t bodyBufLen = 4+CA_MAXLINELEN+1; // 4 bytes of return code + max length of error message
char bodyBuf[4+CA_MAXLINELEN+1];
memset(bodyBuf, '\0', bodyBufLen);
int actualBodyLen = replyHeader.lenOrStatus;
try {
castor::io::readBytes(fd, m_netTimeout, actualBodyLen, bodyBuf);
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to read Tape Update Drive reply message: " <<
ne.getMessage().str();
throw ex;
}
const char *bodyBufPtr = bodyBuf;
castor::legacymsg::GenericReplyMsgBody replymsg;
memset(&replymsg, '\0', sizeof(replymsg));
castor::legacymsg::unmarshal(bodyBufPtr, bodyBufLen, replymsg);
if(0 != replymsg.status) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to update drive state: " << replymsg.errorMessage;
throw ex;
}
}
......@@ -27,7 +27,7 @@
#include "castor/legacymsg/TapeUpdateDriveRqstMsgBody.hpp"
#include "castor/legacymsg/TapeserverProxy.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "zmq/castorZmqWrapper.hpp"
namespace castor {
namespace legacymsg {
......@@ -49,15 +49,7 @@ public:
* network read and write operations.
*/
TapeserverProxyTcpIp(log::Logger &log, const unsigned short tapeserverPort,
const int netTimeout) throw();
/**
* Destructor.
*
* Closes the listening socket created in the constructor to listen for
* connections from the vdqmd daemon.
*/
~TapeserverProxyTcpIp() throw();
const int netTimeout,zmq::context_t& ctx) throw();
/**
* Informs the tapeserverd daemon that the mount-session child-process got
......@@ -145,31 +137,6 @@ public:
private:
/**
* Fills in the body of the update drive request and calls
* writeTapeUpdateDriveRqstMsg to send it to the tapeserver
*
* @param event The status of the tape with respect to the drive mount
* and unmount operations
* @param mode Read (read only), write (read/write), or dump
* @param clientType The client could be the gateway, readtp, writetp, or
* dumptp
* @param unitName The unit name of the tape drive.
* @param vid The Volume ID of the tape to be mounted.
*/
void updateDriveInfo(
const castor::legacymsg::TapeUpdateDriveRqstMsgBody::TapeEvent event,
const castor::tape::tapegateway::VolumeMode mode,
const castor::tape::tapegateway::ClientType clientType,
const std::string &unitName,
const std::string &vid);
/**
* Connects to the vdqmd daemon.
*
* @return The socket-descriptor of the connection with the vdqmd daemon.
*/
int connectToTapeserver() const;
/**
* Writes the specified message to the specified connection.
......@@ -184,7 +151,7 @@ private:
*
* @param fd The file-descriptor of the connection.
*/
void readReplyMsg(const int fd);
void readReplyMsg();
/**
* The object representing the API of the CASTOR logging system.
......@@ -206,6 +173,8 @@ private:
* write operations.
*/
const int m_netTimeout;
zmq::socket_t m_socket;
}; // class TapeserverProxyTcpIp
......
......@@ -41,6 +41,6 @@ castor::legacymsg::TapeserverProxyTcpIpFactory::~TapeserverProxyTcpIpFactory() t
//------------------------------------------------------------------------------
// create
//------------------------------------------------------------------------------
castor::legacymsg::TapeserverProxy *castor::legacymsg::TapeserverProxyTcpIpFactory::create() {
return new TapeserverProxyTcpIp(m_log, m_tapeserverPort, m_netTimeout);
castor::legacymsg::TapeserverProxy *castor::legacymsg::TapeserverProxyTcpIpFactory::create(zmq::context_t& ctx) {
return new TapeserverProxyTcpIp(m_log, m_tapeserverPort, m_netTimeout,ctx);
}
......@@ -56,7 +56,7 @@ public:
*
* @return A pointer to the newly created object.
*/
TapeserverProxy *create();
TapeserverProxy *create(zmq::context_t& ctx);
private:
......
add_library(castorMessages
add_library(castorMessages SHARED
Header.pb.cc
Heartbeat.pb.cc
NotifyDrive.pb.cc
)
target_link_libraries(castorMessages protobuf)
\ No newline at end of file
......@@ -21,6 +21,7 @@
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
namespace castor {
namespace messages{
......@@ -31,7 +32,17 @@ namespace castor {
};
struct reqType{
enum {
Heartbeat
NoReturnValue,
Heartbeat,
NotifyDriveBeforeMountStarted,
NotifyDriveTapeMounted,
NotifyDriveUnmountStarted,
NotifyDriveTapeUnmounted
};
};
struct protocolVersion{
enum {
prototype