Commit 763314f8 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for migrations to the ClientSimualtor.

The session mode (and client type) is now construction parameter.
We now support migration requests as well.
A new facility records the checkums received in migration notifications.
parent 2547718e
......@@ -36,6 +36,8 @@
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapegateway/NoMoreFiles.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapegateway/FilesToMigrateListRequest.hpp"
#include "castor/tape/tapegateway/FileMigrationReportList.hpp"
namespace castor {
namespace tape {
......@@ -45,9 +47,10 @@ namespace client {
using namespace castor::tape;
ClientSimulator::ClientSimulator(uint32_t volReqId, const std::string & vid,
const std::string & density):
const std::string & density, tapegateway::ClientType clientType,
tapegateway::VolumeMode volumeMode):
TpcpCommand("clientSimulator::clientSimulator"), m_vid(vid),
m_density(density)
m_density(density), m_clientType(clientType), m_volumeMode(volumeMode)
{
m_volReqId = volReqId;
setupCallbackSock();
......@@ -93,8 +96,8 @@ throw (castor::exception::Exception) {
tapegateway::Volume vol;
vol.setAggregatorTransactionId(vReq.aggregatorTransactionId());
vol.setVid(m_vid);
vol.setClientType(tapegateway::READ_TP);
vol.setMode(castor::tape::tapegateway::READ);
vol.setClientType(m_clientType);
vol.setMode(m_volumeMode);
vol.setLabel(m_volLabel);
vol.setMountTransactionId(m_volReqId);
vol.setDensity(m_density);
......@@ -160,11 +163,11 @@ throw (castor::exception::Exception) {
clientConnection->sendObject(noMore);
return true; // The end of session is not signalled here
}
} catch (std::bad_cast) {}
} catch (std::bad_cast&) {}
// Process the recall reports
try {
// Check that we
// Check that we get a recall report and simply acknowledge
tapegateway::FileRecallReportList & req =
dynamic_cast<tapegateway::FileRecallReportList &> (*obj);
tapegateway::NotificationAcknowledge reply;
......@@ -172,9 +175,43 @@ throw (castor::exception::Exception) {
reply.setAggregatorTransactionId(req.aggregatorTransactionId());
clientConnection->sendObject(reply);
return true; // The end of session is not signalled here
} catch (std::bad_cast) {}
} catch (std::bad_cast&) {}
// TODO Same for migrations requests should go here....
// Handle request for more work (migration)
try {
tapegateway::FilesToMigrateListRequest & req =
dynamic_cast<tapegateway::FilesToMigrateListRequest &> (*obj);
uint32_t files = 0;
uint64_t bytes = 0;
tapegateway::FilesToMigrateList reply;
while (files < req.maxFiles() && bytes < req.maxFiles()) {
if (m_filesToMigrate.size()) {
files++;
std::auto_ptr<tapegateway::FileToMigrateStruct> ftm(new tapegateway::FileToMigrateStruct);
*ftm = m_filesToMigrate.front();
bytes += ftm->fileSize();
reply.filesToMigrate().push_back(ftm.release());
m_filesToMigrate.pop();
} else {
break;
}
}
// At this point, the reply should be populated with all the files asked for
// (or as much as we could provide).
// If not, we should send a NoMoreFiles instead of if.
if(reply.filesToMigrate().size()) {
reply.setAggregatorTransactionId(msg.aggregatorTransactionId());
reply.setMountTransactionId(m_volReqId);
clientConnection->sendObject(reply);
return true; // The end of session is not signalled here
} else {
tapegateway::NoMoreFiles noMore;
noMore.setAggregatorTransactionId(msg.aggregatorTransactionId());
noMore.setMountTransactionId(m_volReqId);
clientConnection->sendObject(noMore);
return true; // The end of session is not signalled here
}
} catch(std::bad_cast&) {}
// Process the migration reports
try {
......@@ -184,6 +221,13 @@ throw (castor::exception::Exception) {
reply.setMountTransactionId(m_volReqId);
reply.setAggregatorTransactionId(req.aggregatorTransactionId());
clientConnection->sendObject(reply);
req.successfulMigrations();
// We will now record the fseqs and checksums reported:
for(std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator i =
req.successfulMigrations().begin(); i!=req.successfulMigrations().end();
i++) {
m_receivedChecksums[(*i)->fseq()] = (*i)->checksum();
}
return true; // The end of session is not signalled here
} catch (std::bad_cast) {}
......
......@@ -26,6 +26,7 @@
#include "castor/tape/tpcp/TpcpCommand.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include <queue>
namespace castor {
......@@ -41,7 +42,8 @@ namespace client {
class ClientSimulator: public tpcp::TpcpCommand {
public:
ClientSimulator(uint32_t volReqId, const std::string & vid,
const std::string & density);
const std::string & density, tapegateway::ClientType clientType,
tapegateway::VolumeMode volumeMode);
virtual ~ClientSimulator() throw () {}
......@@ -76,7 +78,15 @@ namespace client {
m_filesToRecall.push(ftr);
m_recallSizes.push(size);
}
void addFileToMigrate(tapegateway::FileToMigrateStruct & ftm) {
m_filesToMigrate.push(ftm);
}
/**
* Container where the migration result pairs (fseq, checksum) are
* stored.
*/
std::map<uint64_t, uint64_t> m_receivedChecksums;
protected:
// Place holders for pure virtual members of TpcpCommand we don't
// use in the simulator
......@@ -116,11 +126,14 @@ namespace client {
const std::string &errorMessage,
castor::io::AbstractSocket &sock)
throw();
std::string m_vid;
std::string m_volLabel;
std::string m_density;
const std::string m_vid;
const std::string m_volLabel;
const std::string m_density;
std::queue<tapegateway::FileToRecallStruct> m_filesToRecall;
std::queue<uint64_t> m_recallSizes;
std::queue<tapegateway::FileToMigrateStruct> m_filesToMigrate;
const castor::tape::tapegateway::ClientType m_clientType;
const castor::tape::tapegateway::VolumeMode m_volumeMode;
};
}
}
......
......@@ -68,7 +68,8 @@ TEST(tapeServer, MountSessionGoodday) {
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
client::ClientSimulator sim(volReq, vid, density);
client::ClientSimulator sim(volReq, vid, density, tapegateway::READ_TP,
tapegateway::READ);
client::ClientSimulator::ipPort clientAddr = sim.getCallbackAddress();
clientRunner simRun(sim);
simRun.start();
......@@ -159,7 +160,8 @@ TEST(tapeServer, MountSessionNoSuchDrive) {
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
client::ClientSimulator sim(volReq, vid, density);
client::ClientSimulator sim(volReq, vid, density, tapegateway::READ_TP,
tapegateway::READ);
client::ClientSimulator::ipPort clientAddr = sim.getCallbackAddress();
clientRunner simRun(sim);
simRun.start();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment