Commit 0ca18445 authored by Steven Murray's avatar Steven Murray
Browse files

Added initial structure of AcsProxy

parent 54e7d8ed
// This file is part of the Castor project.
// See http://castor.web.cern.ch/castor
//
// Copyright (C) 2003 CERN
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// @author Castor Dev team, castor-dev@cern.ch
package castor.messages;
message AcsMountTapeForRecall {
required string vid = 1;
required uint32 acs = 2;
required uint32 lsm = 3;
required uint32 panel = 4;
required uint32 drive = 5;
}
......@@ -16,13 +16,15 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/messages/TapeserverProxyFactory.hpp"
#include "castor/messages/AcsProxy.hpp"
//------------------------------------------------------------------------------
//-----------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyFactory::~TapeserverProxyFactory() throw() {
//-----------------------------------------------------------------------------
castor::messages::AcsProxy::~AcsProxy() {
}
......@@ -21,35 +21,37 @@
#pragma once
#include "castor/messages/TapeserverProxy.hpp"
#include <stdint.h>
#include <string>
namespace castor {
namespace messages {
/**
* Abstract factory for creating objects of type TapeserverProxy.
* Abstract class defining the interface to a proxy object representing the
* CASTOR ACS daemon.
*/
class TapeserverProxyFactory {
class AcsProxy {
public:
/**
* Destructor.
*/
virtual ~TapeserverProxyFactory() throw() = 0;
virtual ~AcsProxy() = 0;
/**
* Creates an object of type TapeserverProxy on the heap and returns a pointer to
* it.
* Request the CASTOR ACS daemon to mount the specifed tape for recall.
*
* Please note that it is the responsibility of the caller to deallocate the
* proxy object from the heap.
*
* @param zmqContext The ZMQ context.
* @return A pointer to the newly created object.
* @param vid The tape to be mounted.
* @param acs The ACS identifier.
* @param lsm The LSM identifier.
* @param panel The panel identifier.
* @param drive The drive identifier.
*/
virtual TapeserverProxy *create(void *const zmqContext) = 0;
virtual void mountTapeForRecall(const std::string &vid, const uint32_t acs,
const uint32_t lsm, const uint32_t panel, const uint32_t drive) = 0;
}; // class TapeserverProxyFactory
}; // class AcsProxy
} // namespace messages
} // namespace castor
......
......@@ -19,18 +19,12 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/messages/TapeserverProxyDummy.hpp"
#include "castor/messages/TapeserverProxyDummyFactory.hpp"
#include "castor/log/LogContext.hpp"
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyDummyFactory::~TapeserverProxyDummyFactory() throw() {
}
#include "castor/messages/AcsProxyDummy.hpp"
//------------------------------------------------------------------------------
// create
// mountTapeForRecall
//------------------------------------------------------------------------------
castor::messages::TapeserverProxy *castor::messages::TapeserverProxyDummyFactory::create(void *const zmqContext) {
return new TapeserverProxyDummy();
void castor::messages::AcsProxyDummy::mountTapeForRecall(const std::string &vid,
const uint32_t acs, const uint32_t lsm, const uint32_t panel,
const uint32_t drive) {
}
......@@ -21,35 +21,30 @@
#pragma once
#include "castor/messages/TapeserverProxyFactory.hpp"
#include "castor/messages/AcsProxy.hpp"
namespace castor {
namespace messages {
/**
* Concrete factory for creating objects of type TapeserverProxyDummy.
* Concrete class implementimg a dummy AcsProxy.
*/
class TapeserverProxyDummyFactory: public TapeserverProxyFactory {
class AcsProxyDummy: public AcsProxy {
public:
/**
* Destructor.
*/
~TapeserverProxyDummyFactory() throw();
/**
* Creates an object of type TapeserverProxyDummy on the heap and returns a pointer
* to it.
*
* Please note that it is the responsibility of the caller to deallocate the
* proxy object from the heap.
* Request the CASTOR ACS daemon to mount the specifed tape for recall.
*
* @param zmqContext The ZMQ context.
* @return A pointer to the newly created object.
* @param vid The tape to be mounted.
* @param acs The ACS identifier.
* @param lsm The LSM identifier.
* @param panel The panel identifier.
* @param drive The drive identifier.
*/
TapeserverProxy *create(void *const zmqContext);
void mountTapeForRecall(const std::string &vid, const uint32_t acs,
const uint32_t lsm, const uint32_t panel, const uint32_t drive);
}; // class TapeserverProxyDummyFactory
}; // class AcsProxyDummy
} // namespace messages
} // namespace castor
......
......@@ -19,27 +19,79 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/messages/TapeserverProxyZmqFactory.hpp"
#include "castor/messages/AcsMountTapeForRecall.pb.h"
#include "castor/messages/AcsProxyZmq.hpp"
#include "castor/messages/Constants.hpp"
#include "castor/messages/messages.hpp"
#include "castor/messages/ReturnValue.pb.h"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyZmqFactory::TapeserverProxyZmqFactory(log::Logger &log, const unsigned short tapeserverPort, const int netTimeout) throw():
castor::messages::AcsProxyZmq::AcsProxyZmq(log::Logger &log,
const unsigned short serverPort, void *const zmqContext) throw():
m_log(log),
m_tapeserverPort(tapeserverPort),
m_netTimeout(netTimeout) {
m_serverPort(serverPort),
m_serverSocket(zmqContext, ZMQ_REQ) {
connectZmqSocketToLocalhost(m_serverSocket, serverPort);
}
//------------------------------------------------------------------------------
// destructor
// mountTapeForRecall
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyZmqFactory::~TapeserverProxyZmqFactory() throw() {
void castor::messages::AcsProxyZmq::mountTapeForRecall(const std::string &vid,
const uint32_t acs, const uint32_t lsm, const uint32_t panel,
const uint32_t drive) {
try {
const Frame rqst = createAcsMountTapeForRecallFrame(vid, acs, lsm, panel,
drive);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to request CASTOR ACS daemon to mount tape for recall: " <<
"vid=" << vid << " acs=" << acs << " lsm=" << lsm << " panel=" << panel <<
" drive=" << drive << ": " << ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// create
// createAcsMountTapeForRecallFrame
//------------------------------------------------------------------------------
castor::messages::TapeserverProxy *castor::messages::TapeserverProxyZmqFactory::create(void *const zmqContext) {
return new TapeserverProxyZmq(m_log, m_tapeserverPort, m_netTimeout, zmqContext);
castor::messages::Frame castor::messages::AcsProxyZmq::
createAcsMountTapeForRecallFrame(const std::string &vid, const uint32_t acs,
const uint32_t lsm, const uint32_t panel, const uint32_t drive) {
try {
Frame frame;
frame.header = messages::protoTapePreFillHeader();
frame.header.set_msgtype(messages::MSG_TYPE_ACSMOUNTTAPEFORRECALL);
frame.header.set_bodysignature("PIPO");
AcsMountTapeForRecall body;
body.set_vid(vid);
body.set_acs(acs);
body.set_lsm(lsm);
body.set_panel(panel);
body.set_drive(drive);
frame.serializeProtocolBufferIntoBody(body);
return frame;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to create AcsMountTapeForRecall frame: " <<
ne.getMessage().str();
throw ex;
}
}
......@@ -21,42 +21,42 @@
#pragma once
#include "castor/messages/TapeserverProxyFactory.hpp"
#include "castor/log/Logger.hpp"
#include "castor/messages/AcsProxy.hpp"
#include "castor/messages/Frame.hpp"
#include "castor/messages/ZmqSocketMT.hpp"
namespace castor {
namespace messages {
/**
* Concrete factory for creating objects of type TapeserverProxyZmq.
* Concrete class providing a ZMQ implementation of an AcsProxy.
*/
class TapeserverProxyZmqFactory: public TapeserverProxyFactory {
class AcsProxyZmq: public AcsProxy {
public:
/**
* Constructor.
*
* @param log The object representing the API of the CASTOR logging system.
* @param netTimeout The timeout in seconds to be applied when performing
* network read and write operations.
*/
TapeserverProxyZmqFactory(log::Logger &log, const unsigned short tapeserverPort, const int netTimeout) throw();
/**
* Destructor.
* @param serverPort The TCP/IP port on which the CASTOR ACS daemon is listening
* for ZMQ messages.
* @param zmqContext The ZMQ context.
*/
~TapeserverProxyZmqFactory() throw();
AcsProxyZmq(log::Logger &log, const unsigned short serverPort,
void *const zmqContext) throw();
/**
* Creates an object of type TapeserverProxyZmq on the heap and returns a pointer
* to it.
*
* Please note that it is the responsibility of the caller to deallocate the
* proxy object from the heap.
* Request the CASTOR ACS daemon to mount the specifed tape for recall.
*
* @param zmqContext The ZMQ context.
* @return A pointer to the newly created object.
* @param vid The tape to be mounted.
* @param acs The ACS identifier.
* @param lsm The LSM identifier.
* @param panel The panel identifier.
* @param drive The drive identifier.
*/
TapeserverProxy *create(void *const zmqContext);
void mountTapeForRecall(const std::string &vid, const uint32_t acs,
const uint32_t lsm, const uint32_t panel, const uint32_t drive);
private:
......@@ -66,17 +66,31 @@ private:
log::Logger &m_log;
/**
* The TCP/IP port on which the vdqmd daemon is listening.
* The TCP/IP port on which the CASTOR ACS daemon is listening for ZMQ
* messages.
*/
const unsigned short m_tapeserverPort;
const unsigned short m_serverPort;
/**
* The timeout in seconds to be applied when performing network read and
* write operations.
* Socket connecting this proxy the daemon it represents.
*/
ZmqSocketMT m_serverSocket;
/**
* Creates a frame containing a AcsMountTapeForRecall message.
*
* @param vid The tape to be mounted.
* @param acs The ACS identifier.
* @param lsm The LSM identifier.
* @param panel The panel identifier.
* @param drive The drive identifier.
* @return The frame.
*/
const int m_netTimeout;
Frame createAcsMountTapeForRecallFrame(const std::string &vid,
const uint32_t acs, const uint32_t lsm, const uint32_t panel,
const uint32_t drive);
}; // class TapeserverProxyZmqFactory
}; // class AcsProxyZmq
} // namespace messages
} // namespace castor
......
......@@ -5,16 +5,16 @@ PROTOBUF_GENERATE_CPP(ProtoSources ProtoHeaders ${ProtoFiles})
add_library(castormessages SHARED
${ProtoSources}
AcsProxy.cpp
AcsProxyDummy.cpp
AcsProxyZmq.cpp
Constants.cpp
Frame.cpp
messages.cpp
SmartZmqContext.cpp
TapeserverProxy.cpp
TapeserverProxyDummy.cpp
TapeserverProxyDummyFactory.cpp
TapeserverProxyFactory.cpp
TapeserverProxyZmq.cpp
TapeserverProxyZmqFactory.cpp
ZmqMsg.cpp
ZmqSocket.cpp
ZmqSocketMT.cpp
......
......@@ -70,6 +70,8 @@ const char *castor::messages::msgTypeToString(const MsgType msgType) throw() {
return "TapeUnmounted";
case MSG_TYPE_LABELERROR:
return "LabelError";
case MSG_TYPE_ACSMOUNTTAPEFORRECALL:
return "AcsMountTapeForRecall";
default:
return "Unknown";
}
......
......@@ -52,7 +52,8 @@ enum MsgType {
MSG_TYPE_TAPEMOUNTEDFORRECALL,
MSG_TYPE_TAPEUNMOUNTSTARTED,
MSG_TYPE_TAPEUNMOUNTED,
MSG_TYPE_LABELERROR
MSG_TYPE_LABELERROR,
MSG_TYPE_ACSMOUNTTAPEFORRECALL
};
enum ProtocolVersion {
......
//
// castor/messages/TapeserverProxyFactory.cpp
//
// This file is part of the Castor project.
// See http://castor.web.cern.ch/castor
//
......
......@@ -21,9 +21,8 @@
#pragma once
#include "castor/log/Logger.hpp"
#include "castor/exception/Exception.hpp"
#include <memory>
#include <stdint.h>
#include <string>
......@@ -36,11 +35,9 @@ namespace messages {
*/
class TapeserverProxy {
public:
/**
* Destructor.
*
* Closes the listening socket created in the constructor to listen for
* connections from the vdqmd daemon.
*/
virtual ~TapeserverProxy() = 0;
......
......@@ -21,18 +21,6 @@
#include "castor/messages/TapeserverProxyDummy.hpp"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyDummy::TapeserverProxyDummy() throw() {
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyDummy::~TapeserverProxyDummy() throw() {
}
//------------------------------------------------------------------------------
// gotRecallJobFromTapeGateway
//------------------------------------------------------------------------------
......
......@@ -32,16 +32,6 @@ namespace messages {
class TapeserverProxyDummy: public TapeserverProxy {
public:
/**
* Constructor.
*/
TapeserverProxyDummy() throw();
/**
* Destructor.
*/
~TapeserverProxyDummy() throw();
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a recall job from the tapegatewayd daemon.
......
......@@ -19,7 +19,6 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/io/io.hpp"
#include "castor/messages/Heartbeat.pb.h"
#include "castor/messages/Header.pb.h"
#include "castor/messages/Constants.hpp"
......@@ -36,26 +35,16 @@
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/messages/TapeUnmounted.pb.h"
#include "castor/messages/TapeUnmountStarted.pb.h"
#include "castor/tape/tapegateway/ClientType.hpp"
#include "castor/tape/tapegateway/VolumeMode.hpp"
#include "castor/utils/SmartFd.hpp"
#include "castor/utils/utils.hpp"
#include "h/Ctape.h"
#include "h/rtcp_constants.h"
#include "h/vdqm_constants.h"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyZmq::TapeserverProxyZmq(log::Logger &log,
const unsigned short tapeserverPort, const int netTimeout,
void *const zmqContext) throw():
const unsigned short serverPort, void *const zmqContext) throw():
m_log(log),
m_tapeserverHostName("localhost"),
m_tapeserverPort(tapeserverPort),
m_netTimeout(netTimeout),
m_tapeserverSocket(zmqContext, ZMQ_REQ) {
connectZmqSocketToLocalhost(m_tapeserverSocket, tapeserverPort);
m_serverPort(serverPort),
m_serverSocket(zmqContext, ZMQ_REQ) {
connectZmqSocketToLocalhost(m_serverSocket, serverPort);
}
//------------------------------------------------------------------------------
......@@ -65,10 +54,10 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createRecallJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
......@@ -121,10 +110,10 @@ void castor::messages::TapeserverProxyZmq::gotRecallJobFromReadTp(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createRecallJobFromReadTpFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
......@@ -176,10 +165,10 @@ uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createMigrationJobFromTapeGatewayFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
NbFilesOnTape reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
return reply.nbfiles();
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
......@@ -225,10 +214,10 @@ uint32_t castor::messages::TapeserverProxyZmq::gotMigrationJobFromWriteTp(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createMigrationJobFromWriteTpFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
NbFilesOnTape reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
return reply.nbfiles();
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
......@@ -274,10 +263,10 @@ void castor::messages::TapeserverProxyZmq::tapeMountedForRecall(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createTapeMountedForRecallFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
......@@ -329,10 +318,10 @@ void castor::messages::TapeserverProxyZmq::tapeMountedForMigration(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createTapeMountedForMigrationFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
......@@ -384,10 +373,10 @@ void castor::messages::TapeserverProxyZmq::tapeUnmountStarted(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createTapeUnmountStartedFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
......@@ -439,10 +428,10 @@ void castor::messages::TapeserverProxyZmq::tapeUnmounted(
const std::string &vid, const std::string &unitName) {
try {
const Frame rqst = createTapeUnmountedFrame(vid, unitName);
sendFrame(m_tapeserverSocket, rqst);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_tapeserverSocket, reply);
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {