Commit a77670bd authored by Eric Cano's avatar Eric Cano
Browse files

Continued development of the MountSession and corresponding test.

Added support for TPconfig, looking for drive and density.
Improved logging and error situations handling.
parent 425ba989
......@@ -31,6 +31,7 @@
#include "castor/tape/tapegateway/EndNotification.hpp"
#include "castor/tape/tapegateway/NotificationAcknowledge.hpp"
#include "castor/tape/utils/Timer.hpp"
#include "castor/tape/utils/utils.hpp"
#include <cxxabi.h>
#include <memory>
#include <stdlib.h>
......@@ -80,7 +81,17 @@ throw (castor::tape::Exception)
// 4) Check we get a response for the request we sent (sanity check)
if ((ret->mountTransactionId() != m_request.volReqId) ||
ret->aggregatorTransactionId() != req.aggregatorTransactionId()) {
std::stringstream mess;
if (ret->mountTransactionId() != m_request.volReqId) {
mess << "In castor::tape::server::clientInterface::requestResponseSession, "
"expected a information about mountSessionId=" << m_request.volReqId
<< " and received: " << ret->mountTransactionId();
} else {
mess << "In castor::tape::server::clientInterface::requestResponseSession, "
"expected a response for tapebridgeTransId=" << req.aggregatorTransactionId()
<< " and received for: " << ret->aggregatorTransactionId();
}
throw UnexpectedResponse(resp.get(), mess.str());
}
// Slightly ugly sequence in order to not duplicate the dynamic_cast.
resp.release();
......@@ -96,7 +107,7 @@ throw (castor::tape::Exception) {
// The transaction id is auto-incremented (through the cast operator)
// so we just need to use it (and save the value locally if we need to reuse
// it)
report.transactionId = m_transactionId;
report.transactionId = ++m_transactionId;
request.setAggregatorTransactionId(report.transactionId);
request.setUnit(m_request.driveUnit);
// 2) get the reply from the client
......@@ -135,7 +146,32 @@ throw (castor::tape::Exception) {
// 1) Build the report
castor::tape::tapegateway::EndNotification endReport;
endReport.setMountTransactionId(m_request.volReqId);
endReport.setAggregatorTransactionId(m_transactionId);
endReport.setAggregatorTransactionId(++m_transactionId);
// 2) Send the report
std::auto_ptr<tapegateway::GatewayMessage> ack(
requestResponseSession(endReport, transactionReport));
// 3) If we did not get a ack, complain (not much more we can do)
// We could use the castor typing here, but we stick to case for homogeneity
// of the code.
try {
dynamic_cast<tapegateway::NotificationAcknowledge &>(*ack.get());
} catch (std::bad_cast) {
throw UnexpectedResponse(ack.get(),
"Unexpected response when reporting end of session");
}
}
void castor::tape::server::ClientInterface::reportEndOfSessionWithError(
RequestReport &transactionReport, const std::string & errorMsg,
int errorCode)
throw (castor::tape::Exception) {
// 1) Build the report
castor::tape::tapegateway::EndNotificationErrorReport endReport;
endReport.setMountTransactionId(m_request.volReqId);
endReport.setAggregatorTransactionId(++m_transactionId);
endReport.setErrorMessage(errorMsg);
endReport.setErrorCode(errorCode);
// 2) Send the report
std::auto_ptr<tapegateway::GatewayMessage> ack(
requestResponseSession(endReport, transactionReport));
......
......@@ -98,6 +98,12 @@ namespace server {
*/
void reportEndOfSession(RequestReport &report) throw (Exception);
/**
* Reports end of session to the client. This should be the last call to
* the client.
*/
void reportEndOfSessionWithError(RequestReport &transactionReport,
const std::string & errorMsg, int errorCode) throw (castor::tape::Exception);
/**
* Exception thrown when the wrong response type was received from
* the client after a request. Extracts the type and prints it.
......@@ -138,7 +144,7 @@ namespace server {
class ThreadSafeCounter {
public:
ThreadSafeCounter(): m_val(0) {};
operator T () {
T operator ++ () {
threading::MutexLocker ml(&m_mutex);
return ++m_val;
}
......
......@@ -33,8 +33,10 @@
using namespace castor::tape::server;
using namespace castor::tape;
ClientSimulator::ClientSimulator(uint32_t volReqId, std::string vid):
TpcpCommand("clientSimulator::clientSimulator"), m_vid(vid)
ClientSimulator::ClientSimulator(uint32_t volReqId, const std::string & vid,
const std::string & density):
TpcpCommand("clientSimulator::clientSimulator"), m_vid(vid),
m_density(density)
{
m_volReqId = volReqId;
setupCallbackSock();
......
......@@ -37,7 +37,8 @@ namespace server {
*/
class ClientSimulator: public tpcp::TpcpCommand {
public:
ClientSimulator(uint32_t volReqId, std::string vid);
ClientSimulator(uint32_t volReqId, const std::string & vid,
const std::string & density);
virtual ~ClientSimulator() throw () {}
......
......@@ -28,16 +28,21 @@
#include "ClientInterface.hpp"
#include "log.h"
#include "stager_client_commandline.h"
#include "castor/tape/utils/utils.hpp"
#include "castor/System.hpp"
#include "h/serrno.h"
using namespace castor::tape::server;
using namespace castor::tape;
using namespace castor::log;
castor::tape::server::MountSession::MountSession(
const legacymsg::RtcpJobRqstMsgBody& clientRequest,
castor::log::Logger& logger):
m_request(clientRequest), m_logger(logger), m_clientIf(clientRequest) {}
castor::log::Logger& logger, System::virtualWrapper & sysWrapper,
const utils::TpconfigLines & tpConfig):
m_request(clientRequest), m_logger(logger), m_clientIf(clientRequest),
m_sysWrapper(sysWrapper), m_tpConfig(tpConfig) {}
void castor::tape::server::MountSession::execute()
void castor::tape::server::MountSession::execute()
throw (castor::tape::Exception) {
// 1) Prepare the logging environment
LogContext lc(m_logger);
......@@ -47,23 +52,24 @@ throw (castor::tape::Exception) {
LogContext::ScopedParam sp04(lc, Param("volReqId", m_request.volReqId));
LogContext::ScopedParam sp05(lc, Param("driveUnit", m_request.driveUnit));
LogContext::ScopedParam sp06(lc, Param("dgn", m_request.dgn));
// 2) Get initial information from the client
// 2a) Get initial information from the client
ClientInterface::RequestReport reqReport;
try {
ClientInterface::RequestReport reqReport;
m_clientIf.fetchVolumeId(m_volInfo, reqReport);
} catch(ClientInterface::EndOfSession & eof) {
std::stringstream fullError("Received end of session from client when requesting Volume");
fullError << eof.what();
lc.log(LOG_ERR, fullError.str());
m_clientIf.reportEndOfSession(reqReport);
LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp08(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp09(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp10(lc, Param("TPVID", m_volInfo.vid));
LogContext::ScopedParam sp11(lc, Param("density", m_volInfo.vid));
LogContext::ScopedParam sp12(lc, Param("label", m_volInfo.vid));
LogContext::ScopedParam sp13(lc, Param("clientType", m_volInfo.vid));
LogContext::ScopedParam sp14(lc, Param("mode", m_volInfo.vid));
lc.log(LOG_INFO, "Got volume from client");
} catch(ClientInterface::EndOfSession & eof) {
ClientInterface::RequestReport reqReport;
std::stringstream fullError("Failed to get volume from client");
fullError << eof.what();
LogContext::ScopedParam sp10(lc, Param("ErrorMsg", fullError.str()));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
} catch (ClientInterface::UnexpectedResponse & unexp) {
std::stringstream fullError("Received unexpected response from client when requesting Volume");
fullError << unexp.what();
lc.log(LOG_ERR, fullError.str());
m_clientIf.reportEndOfSession(reqReport);
LogContext::ScopedParam sp07(lc, Param("tapebridgeTransId", reqReport.transactionId));
......@@ -73,9 +79,71 @@ throw (castor::tape::Exception) {
lc.log(LOG_ERR, "Notified client of end session with error");
return;
}
// 2b) ... and log.
// Make the TPVID parameter permanent.
LogContext::ScopedParam sp6(lc, Param("TPVID", m_request.dgn));
LogContext::ScopedParam sp07(lc, Param("TPVID", m_request.dgn));
{
LogContext::ScopedParam sp08(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp09(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp00(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp11(lc, Param("TPVID", m_volInfo.vid));
LogContext::ScopedParam sp12(lc, Param("density", m_volInfo.density));
LogContext::ScopedParam sp13(lc, Param("label", m_volInfo.labelObsolete));
LogContext::ScopedParam sp14(lc, Param("clientType", utils::volumeClientTypeToString(m_volInfo.clientType)));
LogContext::ScopedParam sp15(lc, Param("mode", utils::volumeModeToString(m_volInfo.volumeMode)));
lc.log(LOG_INFO, "Got volume from client");
}
// Depending on the type of session, branch into the right execution
// TbC...
switch(m_volInfo.volumeMode) {
case tapegateway::READ:
executeRead(lc);
return;
case tapegateway::WRITE:
executeWrite(lc);
return;
case tapegateway::DUMP:
executeDump(lc);
return;
}
}
void castor::tape::server::MountSession::executeRead(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
utils::TpconfigLines::iterator i;
for (i = m_tpConfig.begin(); i != m_tpConfig.end(); i++) {
if (i->unitName == m_request.driveUnit && i->density == m_volInfo.density) {
break;
}
}
// If we did not find the drive in the tpConfig, we have a problem
if (i == m_tpConfig.end()) {
LogContext::ScopedParam sp08(lc, Param("density", m_volInfo.density));
lc.log(LOG_ERR, "Drive unit not found");
ClientInterface::RequestReport reqReport;
std::stringstream errMsg("Drive unit not found");
errMsg << lc;
m_clientIf.reportEndOfSessionWithError(reqReport, "Drive unit not found", SEINTERNAL);
LogContext::ScopedParam sp09(lc, Param("tapebridgeTransId", reqReport.transactionId));
LogContext::ScopedParam sp10(lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp11(lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp12(lc, Param("errorMessage", errMsg.str()));
LogContext::ScopedParam sp13(lc, Param("errorCode", SEINTERNAL));
lc.log(LOG_ERR, "Notified client of end session with error");
return;
}
}
void castor::tape::server::MountSession::executeWrite(LogContext & lc) {
}
void castor::tape::server::MountSession::executeDump(LogContext & lc) {
// We are ready to start the session. In case of read there is no interest in
// creating the machinery before getting the tape mounted, so do it now.
// 1) Get hold of the drive and check it.
}
......@@ -25,10 +25,14 @@
#pragma once
#include "castor/tape/legacymsg/RtcpJobRqstMsgBody.hpp"
#include "../../../../castor/log/Logger.hpp"
#include "castor/log/Logger.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/system/Wrapper.hpp"
#include "castor/tape/utils/utils.hpp"
#include "ClientInterface.hpp"
using namespace castor::tape;
using namespace castor::log;
namespace castor {
namespace tape {
......@@ -42,7 +46,8 @@ namespace server {
public:
/** Constructor */
MountSession(const legacymsg::RtcpJobRqstMsgBody & clientRequest,
castor::log::Logger & logger);
castor::log::Logger & logger, System::virtualWrapper & sysWrapper,
const utils::TpconfigLines & tpConfig);
/** The only method. It will execute (like a task, that it is) */
void execute() throw (Exception);
/** Temporary method used for debugging while building the session class */
......@@ -52,6 +57,14 @@ namespace server {
castor::log::Logger & m_logger;
ClientInterface m_clientIf;
ClientInterface::VolumeInfo m_volInfo;
System::virtualWrapper & m_sysWrapper;
utils::TpconfigLines m_tpConfig;
/** sub-part of execute for the read sessions */
void executeRead(LogContext & lc);
/** sub-part of execute for a write session */
void executeWrite(LogContext & lc);
/** sub-part of execute for a dump session */
void executeDump(LogContext & lc);
};
}
}
......
......@@ -28,6 +28,8 @@
#include "../threading/Threading.hpp"
#include "castor/log/StringLogger.hpp"
#include "MountSession.hpp"
#include "../system/Wrapper.hpp"
#include "Ctape.h"
using namespace castor::tape::server;
......@@ -52,7 +54,8 @@ TEST(tapeServer, MountSessionGoodday) {
// 1) prepare the client and run it in another thread
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
ClientSimulator sim(volReq, vid);
std::string density = "8000GC";
ClientSimulator sim(volReq, vid, density);
struct ClientSimulator::ipPort clientAddr = sim.getCallbackAddress();
clientRunner simRun(sim);
simRun.start();
......@@ -61,15 +64,23 @@ TEST(tapeServer, MountSessionGoodday) {
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "/dev/nstXXX");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "TAPELIBX");
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// 3) Prepare the necessary environment (logger), construct and
// run the session.
// 3) Prepare the necessary environment (logger, plus system wrapper),
// construct and run the session.
castor::log::StringLogger logger("tapeServerUnitTest");
MountSession sess(VDQMjob, logger);
castor::tape::System::mockWrapper mockSys;
mockSys.fake.setupForVirtualDriveSLC6();
utils::TpconfigLines tpConfig;
// Actual TPCONFIG lifted from prod
tpConfig.push_back(utils::TpconfigLine("T10D6116", "T10KD6",
"/dev/tape_T10D6116", "8000GC", "down", "acs0,1,1,6", "T10000"));
tpConfig.push_back(utils::TpconfigLine("T10D6116", "T10KD6",
"/dev/tape_T10D6116", "5000GC", "down", "acs0,1,1,6", "T10000"));
MountSession sess(VDQMjob, logger, mockSys, tpConfig);
sess.execute();
simRun.wait();
std::string temp = logger.getLog();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment