Commit b9a116d5 authored by Eric Cano's avatar Eric Cano
Browse files

Created a MountSession class.

The MountSession will be the basis of the forked tape sessions.
ClientInterface is now a utility for it.
Added timing support to ClientInterface functions.
MountSession logs through the scoped log interface.
parent c0de2450
......@@ -3,7 +3,7 @@ add_executable(tapeserverd TapeDaemon.cpp TapeDaemonMain.cpp)
target_link_libraries(tapeserverd Exception SCSI System Utils File castorcommon
castorclient)
add_library(tapeserver ClientInterface.cpp)
add_library(tapeserver ClientInterface.cpp MountSession.cpp)
add_library(tapeserverdTest
ClientSimulator.cpp
......
......@@ -30,6 +30,7 @@
#include "castor/tape/tapegateway/EndNotificationErrorReport.hpp"
#include "castor/tape/tapegateway/EndNotification.hpp"
#include "castor/tape/tapegateway/NotificationAcknowledge.hpp"
#include "castor/tape/utils/Timer.hpp"
#include <cxxabi.h>
#include <memory>
#include <stdlib.h>
......@@ -37,17 +38,7 @@
castor::tape::server::
ClientInterface::ClientInterface(const legacymsg::RtcpJobRqstMsgBody& clientRequest)
throw (castor::tape::Exception): m_request(clientRequest) {
// 1) Placeholder: we might have to check with Cupv in the future.
// 2) Get the volume information from the client
try {
fetchVolumeId();
} catch (EndOfSession) {
reportEndOfSession();
}
// 3) Pre-retrieve the first file to work on in order to check we are not
// a blank mount.
}
throw (castor::tape::Exception): m_request(clientRequest) {}
castor::tape::server::ClientInterface::UnexpectedResponse::
UnexpectedResponse(const castor::IObject* resp, const std::string & w):
......@@ -63,16 +54,21 @@ castor::tape::Exception(w) {
tapegateway::GatewayMessage *
castor::tape::server::ClientInterface::requestResponseSession(
const tapegateway::GatewayMessage &req)
const tapegateway::GatewayMessage &req,
RequestReport & report)
throw (castor::tape::Exception)
{
// 0) Start the stopwatch
castor::tape::utils::Timer timer;
// 1) We re-open connection to client for each request-response exchange
castor::io::ClientSocket clientConnection(m_request.clientPort,
m_request.clientHost);
clientConnection.connect();
report.connectDuration = timer.secs();
// 2) The actual exchange over the network.
clientConnection.sendObject(const_cast<tapegateway::GatewayMessage &>(req));
std::auto_ptr<castor::IObject> resp (clientConnection.readObject());
std::auto_ptr<castor::IObject> resp (clientConnection.readObject());
report.sendRecvDuration = timer.secs();
// 3) Check response type
tapegateway::GatewayMessage * ret =
dynamic_cast<tapegateway::GatewayMessage*>(resp.get());
......@@ -91,29 +87,32 @@ throw (castor::tape::Exception)
return ret;
}
void castor::tape::server::ClientInterface::fetchVolumeId() throw (castor::tape::Exception) {
void castor::tape::server::ClientInterface::fetchVolumeId(
VolumeInfo & volInfo, RequestReport &report)
throw (castor::tape::Exception) {
// 1) Build the request
castor::tape::tapegateway::VolumeRequest request;
request.setMountTransactionId(m_request.volReqId);
// The transaction id is auto-incremented (through the cast operator)
// so we just need to use it (and save the value locally if we need to reuse
// it)
request.setAggregatorTransactionId(m_transactionId);
report.transactionId = m_transactionId;
request.setAggregatorTransactionId(report.transactionId);
request.setUnit(m_request.driveUnit);
// 2) get the reply from the client
std::auto_ptr<tapegateway::GatewayMessage>
response (requestResponseSession(request));
response (requestResponseSession(request, report));
// 3) Process the possible outputs
tapegateway::Volume * volume;
tapegateway::NoMoreFiles * noMore;
tapegateway::EndNotificationErrorReport * errorReport;
// 3) a) Volume: this is the good day answer
if (NULL != (volume = dynamic_cast<tapegateway::Volume *>(response.get()))) {
m_vid = volume->vid();
m_clientType = volume->clientType();
m_density = volume->density();
m_labelObsolete = volume->label();
m_volumeMode = volume->mode();
volInfo.vid = volume->vid();
volInfo.clientType = volume->clientType();
volInfo.density = volume->density();
volInfo.labelObsolete = volume->label();
volInfo.volumeMode = volume->mode();
// 3) b) Straight noMoreFiles answer: at least we know.
} else if (NULL != (noMore = dynamic_cast<tapegateway::NoMoreFiles *>(response.get()))) {
throw EndOfSession("Client replied noMoreFiles directly to volume request");
......@@ -130,15 +129,16 @@ void castor::tape::server::ClientInterface::fetchVolumeId() throw (castor::tape:
}
}
void castor::tape::server::ClientInterface::reportEndOfSession()
void castor::tape::server::ClientInterface::reportEndOfSession(
RequestReport &transactionReport)
throw (castor::tape::Exception) {
// 1) Build the report
castor::tape::tapegateway::EndNotification report;
report.setMountTransactionId(m_request.volReqId);
report.setAggregatorTransactionId(m_transactionId);
castor::tape::tapegateway::EndNotification endReport;
endReport.setMountTransactionId(m_request.volReqId);
endReport.setAggregatorTransactionId(m_transactionId);
// 2) Send the report
std::auto_ptr<tapegateway::GatewayMessage> ack(
requestResponseSession(report));
requestResponseSession(endReport, transactionReport));
// 3) If we did not get a ack, complain (not much more we can do)
// We could use the castor typing here, but we stick to case for homogeneity
// of the code.
......
......@@ -24,8 +24,8 @@
#pragma once
#include "../../legacymsg/RtcpJobRqstMsgBody.hpp"
#include "../exception/Exception.hpp"
#include "castor/tape/legacymsg/RtcpJobRqstMsgBody.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapegateway/GatewayMessage.hpp"
#include "castor/tape/tapegateway/ClientType.hpp"
#include "castor/tape/tapegateway/VolumeMode.hpp"
......@@ -53,12 +53,50 @@ namespace server {
throw (castor::tape::Exception);
/**
* Get the VID we requested from client at construction time.
* @return the VID we got from the client.
* Class holding the timing information for the request/reply,
* and the message sequence Id.
*/
std::string getVid() throw (castor::tape::Exception) {
return m_vid;
}
class RequestReport {
public:
RequestReport(): transactionId(0),
connectDuration(0), sendRecvDuration(0) {}
uint32_t transactionId;
double connectDuration;
double sendRecvDuration;
};
/**
* Class holding the result of a Volume request
*/
class VolumeInfo {
public:
VolumeInfo() {};
/** The VID we will work on */
std::string vid;
/** The type of the session */
tapegateway::ClientType clientType;
/** The density of the volume */
std::string density;
/** The label field seems to be in disuse */
std::string labelObsolete;
/** The read/write mode */
tapegateway::VolumeMode volumeMode;
};
/**
* Retrieves the volume Id from the client (with transfer direction)
* Throws an EndOfSession exception
* @param report report on timing and request Id. It will still be filled
* up and can be used when a exception is thrown.
* @return the transaction id
*/
void fetchVolumeId(VolumeInfo & volInfo, RequestReport &report) throw (castor::tape::Exception);
/**
* Reports end of session to the client. This should be the last call to
* the client.
*/
void reportEndOfSession(RequestReport &report) throw (Exception);
/**
* Exception thrown when the wrong response type was received from
......@@ -76,20 +114,11 @@ namespace server {
public:
EndOfSession(std::string w=""):castor::tape::Exception(w) {}
};
private:
/** The VDQM request that kickstarted the session */
legacymsg::RtcpJobRqstMsgBody m_request;
/** The VID we will work on */
std::string m_vid;
/** The type of the session */
tapegateway::ClientType m_clientType;
/** The density of the volume */
std::string m_density;
/** The label field seems to be in disuse */
std::string m_labelObsolete;
/** */
tapegateway::VolumeMode m_volumeMode;
/**
* A helper function managing a single request-response session with the
* client.
......@@ -97,17 +126,8 @@ namespace server {
* @return the response from the client
*/
tapegateway::GatewayMessage * requestResponseSession(
const tapegateway::GatewayMessage &req) throw (castor::tape::Exception);
/**
* A helper retrieving the volume Id from the client (with transfer direction)
* Throws an EndOfSession exception
*/
void fetchVolumeId() throw (Exception);
/**
* A helper sending an end of session to the client.
*/
void reportEndOfSession() throw (Exception);
const tapegateway::GatewayMessage &req,
RequestReport & report) throw (castor::tape::Exception);
/**
* A helper class managing a thread safe message counter (we need it thread
......
/******************************************************************************
* MountSession.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "MountSession.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "ClientInterface.hpp"
#include "log.h"
#include "stager_client_commandline.h"
using namespace castor::tape::server;
using namespace castor::log;
castor::tape::server::MountSession::MountSession(
const legacymsg::RtcpJobRqstMsgBody& clientRequest,
castor::log::Logger& logger):
m_request(clientRequest), m_logger(logger), m_clientIf(clientRequest) {}
void castor::tape::server::MountSession::execute()
throw (castor::tape::Exception) {
// 1) Prepare the logging environment
LogContext lc(m_logger);
LogContext::ScopedParam sp01(lc, Param("clientHost", m_request.clientHost));
LogContext::ScopedParam sp02(lc, Param("clientPort", m_request.clientPort));
LogContext::ScopedParam sp03(lc, Param("mountTransactionId", m_request.volReqId));
LogContext::ScopedParam sp04(lc, Param("volReqId", m_request.volReqId));
LogContext::ScopedParam sp05(lc, Param("driveUnit", m_request.driveUnit));
LogContext::ScopedParam sp06(lc, Param("dgn", m_request.dgn));
// 2) Get initial information from the client
try {
ClientInterface::RequestReport reqReport;
m_clientIf.fetchVolumeId(m_volInfo, reqReport);
LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp08(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp09(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp10(lc, Param("TPVID", m_volInfo.vid));
LogContext::ScopedParam sp11(lc, Param("density", m_volInfo.vid));
LogContext::ScopedParam sp12(lc, Param("label", m_volInfo.vid));
LogContext::ScopedParam sp13(lc, Param("clientType", m_volInfo.vid));
LogContext::ScopedParam sp14(lc, Param("mode", m_volInfo.vid));
lc.log(LOG_INFO, "Got volume from client");
} catch(ClientInterface::EndOfSession & eof) {
ClientInterface::RequestReport reqReport;
std::stringstream fullError("Failed to get volume from client");
fullError << eof.what();
lc.log(LOG_ERR, fullError.str());
m_clientIf.reportEndOfSession(reqReport);
LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp08(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp09(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp10(lc, Param("ErrorMsg", fullError.str()));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
}
// Make the TPVID parameter permanent.
LogContext::ScopedParam sp6(lc, Param("TPVID", m_request.dgn));
// Depending on the type of session, branch into the right execution
// TbC...
}
/******************************************************************************
* MountSession.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/legacymsg/RtcpJobRqstMsgBody.hpp"
#include "../../../../castor/log/Logger.hpp"
#include "ClientInterface.hpp"
using namespace castor::tape;
namespace castor {
namespace tape {
namespace server {
/**
* The main class handling a tape session. This is the main container started
* by the master process. It will drive a separate process. Only the sub
* process interface is not included here to allow testability.
*/
class MountSession {
public:
/** Constructor */
MountSession(const legacymsg::RtcpJobRqstMsgBody & clientRequest,
castor::log::Logger & logger);
/** The only method. It will execute (like a task, that it is) */
void execute() throw (Exception);
/** Temporary method used for debugging while building the session class */
std::string getVid() { return m_volInfo.vid; }
private:
legacymsg::RtcpJobRqstMsgBody m_request;
castor::log::Logger & m_logger;
ClientInterface m_clientIf;
ClientInterface::VolumeInfo m_volInfo;
};
}
}
}
/******************************************************************************
* tapeserverdTest.hpp
* tapeserverdTest.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
......@@ -26,6 +26,8 @@
#include "ClientSimulator.hpp"
#include "ClientInterface.hpp"
#include "../threading/Threading.hpp"
#include "castor/log/StringLogger.hpp"
#include "MountSession.hpp"
using namespace castor::tape::server;
......@@ -42,7 +44,7 @@ private:
};
TEST(tapeServer, ClientInterfaceGoodDay) {
TEST(tapeServer, MountSessionGoodday) {
// TpcpClients only supports 32 bits session number
// This number has to be less than 2^31 as in addition there is a mix
// of signed and unsigned numbers
......@@ -64,9 +66,14 @@ TEST(tapeServer, ClientInterfaceGoodDay) {
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// 3) Instantiate the client interface
ClientInterface sess(VDQMjob);
// 3) Prepare the necessary environment (logger), construct and
// run the session.
castor::log::StringLogger logger("tapeServerUnitTest");
MountSession sess(VDQMjob, logger);
sess.execute();
simRun.wait();
std::string temp = logger.getLog();
temp += "";
ASSERT_EQ("V12345", sess.getVid());
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment