Commit e25badf5 authored by Eric Cano's avatar Eric Cano
Browse files

Created a ClientInterface class for the tape server.

This class will handle all communication with the tape gateway of the command line clients.
It replaces the development code in the tapeSession class (removed).
parent 13698e5a
......@@ -3,7 +3,7 @@ add_executable(tapeserverd tapeserverd.cpp)
target_link_libraries(tapeserverd Exception SCSI System Utils File castorcommon
castorclient)
add_library(tapeserver tapeSession.cpp)
add_library(tapeserver clientInterface.cpp)
add_library(tapeserverdTest
clientSimulator.cpp
......
/******************************************************************************
* clientInterface.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "clientInterface.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/tapegateway/VolumeRequest.hpp"
#include "castor/tape/tapegateway/Volume.hpp"
#include "castor/tape/tapegateway/NoMoreFiles.hpp"
#include "castor/tape/tapegateway/EndNotificationErrorReport.hpp"
#include "castor/tape/tapegateway/EndNotification.hpp"
#include "castor/tape/tapegateway/NotificationAcknowledge.hpp"
#include <cxxabi.h>
#include <memory>
#include <stdlib.h>
#include <typeinfo>
castor::tape::server::
ClientInterface::ClientInterface(const legacymsg::RtcpJobRqstMsgBody& clientRequest)
throw (castor::tape::Exception): m_request(clientRequest) {
// 1) Placeholder: we might have to check with Cupv in the future.
// 2) Get the volume information from the client
try {
fetchVolumeId();
} catch (EndOfSession) {
reportEndOfSession();
}
// 3) Pre-retrieve the first file to work on in order to check we are not
// a blank mount.
}
castor::tape::server::ClientInterface::UnexpectedResponse::
UnexpectedResponse(const castor::IObject* resp, const std::string & w):
castor::tape::Exception(w) {
std::string responseType = typeid(*resp).name();
int status = -1;
char * demangled = abi::__cxa_demangle(responseType.c_str(), NULL, NULL, &status);
if (!status)
responseType = demangled;
free(demangled);
getMessage() << " Response type was: " << responseType;
}
tapegateway::GatewayMessage *
castor::tape::server::ClientInterface::requestResponseSession(
const tapegateway::GatewayMessage &req)
throw (castor::tape::Exception)
{
// 1) We re-open connection to client for each request-response exchange
castor::io::ClientSocket clientConnection(m_request.clientPort,
m_request.clientHost);
clientConnection.connect();
// 2) The actual exchange over the network.
clientConnection.sendObject(const_cast<tapegateway::GatewayMessage &>(req));
std::auto_ptr<castor::IObject> resp (clientConnection.readObject());
// 3) Check response type
tapegateway::GatewayMessage * ret =
dynamic_cast<tapegateway::GatewayMessage*>(resp.get());
if (NULL == ret) {
throw UnexpectedResponse(resp.get(),
"In castor::tape::server::clientInterface::requestResponseSession, "
"expected a tapegateway::GatewayMessage response. ");
}
// 4) Check we get a response for the request we sent (sanity check)
if ((ret->mountTransactionId() != m_request.volReqId) ||
ret->aggregatorTransactionId() != req.aggregatorTransactionId()) {
}
// Slightly ugly sequence in order to not duplicate the dynamic_cast.
resp.release();
return ret;
}
void castor::tape::server::ClientInterface::fetchVolumeId() throw (castor::tape::Exception) {
// 1) Build the request
castor::tape::tapegateway::VolumeRequest request;
request.setMountTransactionId(m_request.volReqId);
// The transaction id is auto-incremented (through the cast operator)
// so we just need to use it (and save the value locally if we need to reuse
// it)
request.setAggregatorTransactionId(m_transactionId);
request.setUnit(m_request.driveUnit);
// 2) get the reply from the client
std::auto_ptr<tapegateway::GatewayMessage>
response (requestResponseSession(request));
// 3) Process the possible outputs
tapegateway::Volume * volume;
tapegateway::NoMoreFiles * noMore;
tapegateway::EndNotificationErrorReport * errorReport;
// 3) a) Volume: this is the good day answer
if (NULL != (volume = dynamic_cast<tapegateway::Volume *>(response.get()))) {
m_vid = volume->vid();
m_clientType = volume->clientType();
m_density = volume->density();
m_labelObsolete = volume->label();
m_volumeMode = volume->mode();
// 3) b) Straight noMoreFiles answer: at least we know.
} else if (NULL != (noMore = dynamic_cast<tapegateway::NoMoreFiles *>(response.get()))) {
throw EndOfSession("Client replied noMoreFiles directly to volume request");
// 3) c) End notification error report.
} else if (NULL != (errorReport = dynamic_cast<tapegateway::EndNotificationErrorReport *>(
response.get()))) {
EndOfSession eos("Client replied noMoreFiles directly to volume request: ");
eos.getMessage() << "errorCode=" << errorReport->errorCode()
<< "errorReport=\"" << errorReport->errorMessage() << "\"";
// Unexpected response type
} else {
throw UnexpectedResponse(response.get(), "Unexpected response from client in response "
"to a volume request");
}
}
void castor::tape::server::ClientInterface::reportEndOfSession()
throw (castor::tape::Exception) {
// 1) Build the report
castor::tape::tapegateway::EndNotification report;
report.setMountTransactionId(m_request.volReqId);
report.setAggregatorTransactionId(m_transactionId);
// 2) Send the report
std::auto_ptr<tapegateway::GatewayMessage> ack(
requestResponseSession(report));
// 3) If we did not get a ack, complain (not much more we can do)
// We could use the castor typing here, but we stick to case for homogeneity
// of the code.
try {
dynamic_cast<tapegateway::NotificationAcknowledge &>(*ack.get());
} catch (std::bad_cast) {
throw UnexpectedResponse(ack.get(),
"Unexpected response when reporting end of session");
}
}
\ No newline at end of file
/******************************************************************************
* clientInterface.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "../../legacymsg/RtcpJobRqstMsgBody.hpp"
#include "../exception/Exception.hpp"
#include "castor/tape/tapegateway/GatewayMessage.hpp"
#include "castor/tape/tapegateway/ClientType.hpp"
#include "castor/tape/tapegateway/VolumeMode.hpp"
#include "../threading/Threading.hpp"
using namespace castor::tape;
namespace castor {
namespace tape {
namespace server {
/**
* A class managing the communications with the tape server's client.
* The client address will have been received from the VDQM by the main
* process.
*/
class ClientInterface {
public:
/**
* Constructor: contact client, gather initial information about the
* session and decide get that information ready for th user of the class
* (read/write session, first file information, etc...)
* @param clientRequest the client information as sent by VDQM.
*/
ClientInterface(const legacymsg::RtcpJobRqstMsgBody & clientRequest)
throw (castor::tape::Exception);
/**
* Get the VID we requested from client at construction time.
* @return the VID we got from the client.
*/
std::string getVid() throw (castor::tape::Exception) {
return m_vid;
}
/**
* Exception thrown when the wrong response type was received from
* the client after a request. Extracts the type and prints it.
*/
class UnexpectedResponse: public castor::tape::Exception {
public:
UnexpectedResponse(const castor::IObject * resp, const std::string & w="");
};
/**
* Exception marking end of session
*/
class EndOfSession: public castor::tape::Exception {
public:
EndOfSession(std::string w=""):castor::tape::Exception(w) {}
};
private:
/** The VDQM request that kickstarted the session */
legacymsg::RtcpJobRqstMsgBody m_request;
/** The VID we will work on */
std::string m_vid;
/** The type of the session */
tapegateway::ClientType m_clientType;
/** The density of the volume */
std::string m_density;
/** The label field seems to be in disuse */
std::string m_labelObsolete;
/** */
tapegateway::VolumeMode m_volumeMode;
/**
* A helper function managing a single request-response session with the
* client.
* @param req the request to send to the client
* @return the response from the client
*/
tapegateway::GatewayMessage * requestResponseSession(
const tapegateway::GatewayMessage &req) throw (castor::tape::Exception);
/**
* A helper retrieving the volume Id from the client (with transfer direction)
* Throws an EndOfSession exception
*/
void fetchVolumeId() throw (Exception);
/**
* A helper sending an end of session to the client.
*/
void reportEndOfSession() throw (Exception);
/**
* A helper class managing a thread safe message counter (we need it thread
* safe as the ClientInterface class will be used by both the getting of
* the work to be done and the reporting of the completed work, in parallel
*/
template <typename T>
class ThreadSafeCounter {
public:
ThreadSafeCounter(): m_val(0) {};
operator T () {
threading::MutexLocker ml(&m_mutex);
return ++m_val;
}
private:
T m_val;
threading::Mutex m_mutex;
};
/** The file transaction id a.k.a. aggregator transaction id. This is the
* serial number of the message in the session */
ThreadSafeCounter<uint32_t> m_transactionId;
};
}
}
}
\ No newline at end of file
......@@ -43,7 +43,15 @@ namespace server {
struct ipPort {
ipPort(uint32_t i, uint16_t p): ip(i), port(p) {}
uint32_t ip;
union {
uint32_t ip;
struct {
uint8_t a;
uint8_t b;
uint8_t c;
uint8_t d;
};
};
uint16_t port;
};
struct ipPort getCallbackAddress() {
......
/******************************************************************************
* tapeSession.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <memory>
#include "tapeSession.hpp"
#include "castor/io/ClientSocket.hpp"
#include "castor/tape/tapegateway/VolumeRequest.hpp"
#include "castor/tape/tapegateway/Volume.hpp"
using namespace castor::tape;
castor::IObject *
castor::tape::server::tapeSession::requestReplySession(
tapegateway::GatewayMessage &req) {
// Open connection to client
castor::io::ClientSocket clientConnection(m_client.port, m_client.ip);
clientConnection.connect();
clientConnection.sendObject(req);
castor::IObject * ret = clientConnection.readObject();
return ret;
}
void castor::tape::server::tapeSession::getVolume() {
tapegateway::VolumeRequest volReq;
volReq.setMountTransactionId(m_mountTransactionId);
std::auto_ptr<castor::IObject> reply(requestReplySession(volReq));
// Get a Volume object or die
tapegateway::Volume & vol = dynamic_cast<tapegateway::Volume &>(*reply);
m_vid = vol.vid();
}
/******************************************************************************
* tapeSession.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include <sys/socket.h>
#include <stdint.h>
#include <string>
#include "../../tapegateway/GatewayMessage.hpp"
namespace castor {
namespace tape {
namespace server {
class tapeSession {
public:
tapeSession(uint32_t client_ip, uint16_t client_port,
uint64_t mountTransactionId):
m_client(client_ip, client_port),
m_mountTransactionId(mountTransactionId) {}
std::string m_vid;
void getVolume();
private:
castor::IObject *
requestReplySession(castor::tape::tapegateway::GatewayMessage &req);
// Address of the client (in terms of tape server, as it's a TCP SERVER)
struct clientAddr {
clientAddr(uint32_t cip, uint16_t cport):
ip(cip), port(cport) {}
uint32_t ip;
uint16_t port;
} m_client;
uint64_t m_mountTransactionId;
};
}
}
}
......@@ -24,7 +24,7 @@
#include <gtest/gtest.h>
#include "clientSimulator.hpp"
#include "tapeSession.hpp"
#include "clientInterface.hpp"
#include "../threading/Threading.hpp"
using namespace castor::tape::server;
......@@ -42,21 +42,32 @@ private:
};
TEST(tapeServer, simpleGetVol) {
TEST(tapeServer, ClientInterfaceGoodDay) {
// TpcpClients only supports 32 bits session number
// This number has to be less than 2^31 as in addition there is a mix
// of signed and unsigned numbers
// As the current ids in prod are ~30M, we are far from overflow (Feb 2013)
// 1) prepare the client and run it in another thread
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
clientSimulator sim(volReq, vid);
struct clientSimulator::ipPort clientAddr = sim.getCallbackAddress();
clientRunner simRun(sim);
simRun.start();
tapeSession sess(clientAddr.ip, clientAddr.port, volReq);
sess.getVolume();
// 2) Prepare the VDQM request
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "/dev/nstXXX");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "TAPELIBX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// 3) Instantiate the client interface
ClientInterface sess(VDQMjob);
simRun.wait();
ASSERT_EQ("V12345", sess.m_vid);
ASSERT_EQ("V12345", sess.getVid());
}
}
......@@ -22,6 +22,8 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include <pthread.h>
#include <semaphore.h>
#include "castor/exception/Errnum.hpp"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment