Commit 26d119ac authored by Eric Cano's avatar Eric Cano
Browse files

Implemented the ClientInterface.

The client interface now manages all requests and reports for the mountSession and
sub classes. Most request reply sequences are unit tested.
parent db41f6bf
......@@ -232,17 +232,107 @@ throw (castor::tape::Exception) {
}
void castor::tape::tapeserver::daemon::ClientInterface::reportMigrationResults(
RequestReport& report) throw (castor::tape::Exception) {
throw castor::tape::Exception("reportMigrationResults to be implemented");
tapegateway::FileMigrationReportList & migrationReport,
RequestReport& report) throw (castor::tape::Exception) {
// 1) The request is provided already fleshed out by the user. We have to
// add the administrative numbering
migrationReport.setMountTransactionId(m_request.volReqId);
report.transactionId = ++m_transactionId;
migrationReport.setAggregatorTransactionId(report.transactionId);
// The next 2 parameters are currently set to hardcoded defaults (as were in
// the tape bridge). They were created in prevision of an evolution where
// mode responsibility of central servers updates was to be pushed to the
// tape server.
migrationReport.setFseqSet(false);
migrationReport.setFseq(0);
// 2) Exchange messages with the server
std::auto_ptr<tapegateway::GatewayMessage> resp(
requestResponseSession(migrationReport, report));
// 3) We expect 2 types of return messages: NotificationAcknowledge and
// EndNotificationErrorReport.
// 3a) Handle the NotificationAcknowledge
try {
dynamic_cast<tapegateway::NotificationAcknowledge &>(*resp);
return;
} catch (std::bad_cast) {}
// 3b) Handle the end notification error report and turn it into a bare
// tape::exception
try {
tapegateway::EndNotificationErrorReport & err =
dynamic_cast<tapegateway::EndNotificationErrorReport &> (*resp);
std::stringstream mess("End notification report: errorMessage=\"");
mess << err.errorMessage() << "\" errorCode=" << err.errorCode();
throw castor::tape::Exception(mess.str());
} catch (std::bad_cast) {
throw UnexpectedResponse(resp.get(),
"Unexpected response to FileMigrationReportList in reportMigrationResults");
}
}
void castor::tape::tapeserver::daemon::ClientInterface::getFilesToRecall(
tapegateway::FilesToRecallList *
castor::tape::tapeserver::daemon::ClientInterface::getFilesToRecall(
uint64_t files, uint64_t bytes, RequestReport& report)
throw (castor::tape::Exception) {
throw castor::tape::Exception("getFilesToRecall to be implemented");
// 1) Build the request
castor::tape::tapegateway::FilesToRecallListRequest ftrReq;
report.transactionId = ++m_transactionId;
ftrReq.setMountTransactionId(m_request.volReqId);
ftrReq.setAggregatorTransactionId(report.transactionId);
ftrReq.setMaxBytes(files);
ftrReq.setMaxBytes(bytes);
// 2) Exchange messages with the server
std::auto_ptr<tapegateway::GatewayMessage> resp(
requestResponseSession(ftrReq, report));
// 3) We expect either a NoMoreFiles response or FilesToRecallList
// 3a) Handle the FilesToRecallListl
try {
tapegateway::FilesToRecallList & ftr =
dynamic_cast <tapegateway::FilesToRecallList &>(*resp);
if (ftr.filesToRecall().size()) {
resp.release();
return &ftr;
} else {
return NULL;
}
} catch (std::bad_cast) {}
// 3b) Try again with NoMoreFiles (and this time failure is fatal)
try {
dynamic_cast<tapegateway::NoMoreFiles &>(*resp);
return NULL;
} catch (std::bad_cast) {
throw UnexpectedResponse(resp.get(),
"Unexpected response to FilesToRecallListRequest in getFilesToRecall");
}
}
void castor::tape::tapeserver::daemon::ClientInterface::reportRecallResults(
tapegateway::FileRecallReportList & recallReport,
RequestReport& report) throw (castor::tape::Exception) {
throw castor::tape::Exception("reportRecallResult to be implemented");
// 1) The request is provided already fleshed out by the user. We have to
// add the administrative numbering
recallReport.setMountTransactionId(m_request.volReqId);
report.transactionId = ++m_transactionId;
recallReport.setAggregatorTransactionId(report.transactionId);
// 2) Exchange messages with the server
std::auto_ptr<tapegateway::GatewayMessage> resp(
requestResponseSession(recallReport, report));
// 3) We expect 2 types of return messages: NotificationAcknowledge and
// EndNotificationErrorReport.
// 3a) Handle the NotificationAcknowledge
try {
dynamic_cast<tapegateway::NotificationAcknowledge &>(*resp);
return;
} catch (std::bad_cast) {}
// 3b) Handle the end notification error report and turn it into a bare
// tape::exception
try {
tapegateway::EndNotificationErrorReport & err =
dynamic_cast<tapegateway::EndNotificationErrorReport &> (*resp);
std::stringstream mess("End notification report: errorMessage=\"");
mess << err.errorMessage() << "\" errorCode=" << err.errorCode();
throw castor::tape::Exception(mess.str());
} catch (std::bad_cast) {
throw UnexpectedResponse(resp.get(),
"Unexpected response to FileRecallReportList in reportRecallResults");
}
}
......@@ -30,6 +30,9 @@
#include "castor/tape/tapegateway/ClientType.hpp"
#include "castor/tape/tapegateway/VolumeMode.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapegateway/FileMigrationReportList.hpp"
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapegateway/FileRecallReportList.hpp"
#include "../threading/Threading.hpp"
using namespace castor::tape;
......@@ -134,7 +137,8 @@ namespace daemon {
* Detailed interface is still TBD.
* @param report Placeholder to network timing information
*/
void reportMigrationResults(RequestReport &report) throw (castor::tape::Exception);
void reportMigrationResults(tapegateway::FileMigrationReportList & migrationReport,
RequestReport &report) throw (castor::tape::Exception);
/**
* Asks the the client for files to recall, with at least files files, or
......@@ -146,7 +150,8 @@ namespace daemon {
* populated during the call and used by the caller to log performance
* and context information
*/
void getFilesToRecall(uint64_t files, uint64_t bytes, RequestReport &report)
tapegateway::FilesToRecallList * getFilesToRecall(uint64_t files,
uint64_t bytes, RequestReport &report)
throw (castor::tape::Exception);
/**
......@@ -154,7 +159,8 @@ namespace daemon {
* Detailed interface is still TBD.
* @param report Placeholder to network timing information
*/
void reportRecallResults(RequestReport &report) throw (castor::tape::Exception);
void reportRecallResults(tapegateway::FileRecallReportList & recallReport,
RequestReport &report) throw (castor::tape::Exception);
/**
* Exception thrown when the wrong response type was received from
* the client after a request. Extracts the type and prints it.
......
......@@ -28,6 +28,8 @@
#include <memory>
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
namespace castor {
namespace tape {
......@@ -159,6 +161,30 @@ namespace daemon {
repl.filesToMigrate().push_back(new FileToMigrateStruct());
clientConnection->sendObject(repl);
}
/**
* Specific version for the FilesToRecallList: we want
* to pass a non empty list, so it has to be a little bit less trivial.
*/
template<>
void ClientSimSingleReply<castor::tape::tapegateway::FilesToRecallList>::processFirstRequest()
throw(castor::exception::Exception) {
using namespace castor::tape::tapegateway;
// Accept the next connection
std::auto_ptr<castor::io::ServerSocket> clientConnection(m_callbackSock.accept());
// Read in the message sent by the tapebridge
std::auto_ptr<castor::IObject> obj(clientConnection->readObject());
// Convert to a gateway message (for transactionId)
GatewayMessage & gm =
dynamic_cast<tapegateway::GatewayMessage &>(*obj);
// Reply with our own type and transaction id
FilesToRecallList repl;
repl.setAggregatorTransactionId(gm.aggregatorTransactionId() ^
(m_breakTransaction?666:0));
repl.setMountTransactionId(m_volReqId);
repl.filesToRecall().push_back(new FileToRecallStruct());
clientConnection->sendObject(repl);
}
}
}
}
......
......@@ -505,4 +505,279 @@ TEST(tapeServerClientInterface, FilesToMigrateReqFilesToMigrateSeqBreak) {
csVolRun.wait();
}
TEST(tapeServerClientInterface, MigrationReportNotifAck) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<NotificationAcknowledge> csVol(volReq, vid, density);
struct ClientSimSingleReply<NotificationAcknowledge>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<NotificationAcknowledge> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileMigrationReportList migRep;
ASSERT_NO_THROW(cInterf.reportMigrationResults(migRep, reqRep));
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, MigrationReportEndNotifError) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<EndNotificationErrorReport> csVol(volReq, vid, density);
struct ClientSimSingleReply<EndNotificationErrorReport>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<EndNotificationErrorReport> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileMigrationReportList migRep;
ASSERT_THROW(cInterf.reportMigrationResults(migRep, reqRep), castor::tape::Exception);
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, MigrationReportNotifAckSeqBreak) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<NotificationAcknowledge> csVol(volReq, vid, density, true);
struct ClientSimSingleReply<NotificationAcknowledge>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<NotificationAcknowledge> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileMigrationReportList migRep;
ASSERT_THROW(cInterf.reportMigrationResults(migRep, reqRep),
ClientInterface::UnexpectedResponse);
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, FilesToRecallReqFilesToMigrate) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<FilesToRecallList> csVol(volReq, vid, density);
struct ClientSimSingleReply<FilesToRecallList>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<FilesToRecallList> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We are responsible for the freeing of the result: chuck it into an auto_ptr.
std::auto_ptr<FilesToRecallList> resp;
ASSERT_NO_THROW(resp.reset(cInterf.getFilesToRecall(10, 10, reqRep)));
ASSERT_NE((FilesToRecallList*)NULL, resp.get());
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, FilesToRecallReqNoMore) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<NoMoreFiles> csVol(volReq, vid, density);
struct ClientSimSingleReply<NoMoreFiles>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<NoMoreFiles> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We are responsible for the freeing of the result: chuck it into an auto_ptr.
std::auto_ptr<FilesToRecallList> resp;
ASSERT_NO_THROW(resp.reset(cInterf.getFilesToRecall(10, 10, reqRep)));
ASSERT_EQ((FilesToRecallList*)NULL, resp.get());
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, FilesToRecallReqFilesToMigrateSeqBreak) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<FilesToRecallList> csVol(volReq, vid, density, true);
struct ClientSimSingleReply<FilesToRecallList>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<FilesToRecallList> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We are responsible for the freeing of the result: chuck it into an auto_ptr.
std::auto_ptr<FilesToRecallList> resp;
ASSERT_THROW(resp.reset(cInterf.getFilesToRecall(10, 10, reqRep)),ClientInterface::UnexpectedResponse);
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, RecallReportNotifAck) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<NotificationAcknowledge> csVol(volReq, vid, density);
struct ClientSimSingleReply<NotificationAcknowledge>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<NotificationAcknowledge> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileRecallReportList recallRep;
ASSERT_NO_THROW(cInterf.reportRecallResults(recallRep, reqRep));
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, RecallReportEndNotifError) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<EndNotificationErrorReport> csVol(volReq, vid, density);
struct ClientSimSingleReply<EndNotificationErrorReport>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<EndNotificationErrorReport> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileRecallReportList recallRep;
ASSERT_THROW(cInterf.reportRecallResults(recallRep, reqRep), castor::tape::Exception);
// Cleanup
csVolRun.wait();
}
TEST(tapeServerClientInterface, RecallReportNotifAckSeqBreak) {
using namespace castor::tape::tapegateway;
// Create the "client" that will just reply a scripted reply
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
ClientSimSingleReply<NotificationAcknowledge> csVol(volReq, vid, density, true);
struct ClientSimSingleReply<NotificationAcknowledge>::ipPort clientAddr =
csVol.getCallbackAddress();
clientSingleReplRunner<NotificationAcknowledge> csVolRun(csVol);
csVolRun.start();
// Setup a clientInterface to talk to it
castor::tape::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// Setup an interface to it.
ClientInterface cInterf(VDQMjob);
ClientInterface::VolumeInfo volInfo;
ClientInterface::RequestReport reqRep;
// We have to create the migration report
FileRecallReportList recallRep;
ASSERT_THROW(cInterf.reportRecallResults(recallRep, reqRep),
ClientInterface::UnexpectedResponse);
// Cleanup
csVolRun.wait();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment