Commit e616b05d authored by Eric Cano's avatar Eric Cano
Browse files

Created a unit test for validating the propagation of end of tape if the error...

Created a unit test for validating the propagation of end of tape if the error happens during a flush.
Updated the FakeDrive to generate the error.
Added support for the propagation of ENOSPC error when it happens durring a flush.
Added support for end of session status recording by the FakeClient and checks in the related unit tests.
parent f608e4fe
......@@ -50,8 +50,9 @@ using namespace castor::tape;
ClientSimulator::ClientSimulator(uint32_t volReqId, const std::string & vid,
const std::string & density, tapegateway::ClientType clientType,
tapegateway::VolumeMode volumeMode):
TpcpCommand("clientSimulator::clientSimulator"), m_vid(vid),
m_density(density), m_clientType(clientType), m_volumeMode(volumeMode)
TpcpCommand("clientSimulator::clientSimulator"), m_sessionErrorCode(0),
m_vid(vid), m_density(density), m_clientType(clientType),
m_volumeMode(volumeMode)
{
m_volReqId = volReqId;
setupCallbackSock();
......@@ -247,7 +248,9 @@ bool ClientSimulator::processOneRequest()
(void)dynamic_cast<tapegateway::EndNotification &> (*obj);
} catch (std::bad_cast&) {
try {
(void)dynamic_cast<tapegateway::EndNotificationErrorReport &> (*obj);
tapegateway::EndNotificationErrorReport & enr =
dynamic_cast<tapegateway::EndNotificationErrorReport &> (*obj);
m_sessionErrorCode = enr.errorCode();
} catch (std::bad_cast&) {
std::stringstream oss;
oss <<
......
......@@ -91,6 +91,10 @@ namespace client {
* stored.
*/
std::map<uint64_t, uint64_t> m_receivedErrorCodes;
/**
* The resulting error code of the session.
*/
int m_sessionErrorCode;
protected:
// Place holders for pure virtual members of TpcpCommand we don't
// use in the simulator
......
......@@ -173,6 +173,7 @@ TEST(tapeServer, DataTransferSessionGooddayRecall) {
std::string temp = logger.getLog();
temp += "";
ASSERT_EQ("V12345", sess.getVid());
ASSERT_EQ(0, sim.m_sessionErrorCode);
}
TEST(tapeServer, DataTransferSessionWrongRecall) {
......@@ -277,6 +278,7 @@ TEST(tapeServer, DataTransferSessionWrongRecall) {
std::string temp = logger.getLog();
temp += "";
ASSERT_EQ("V12345", sess.getVid());
ASSERT_EQ(SEINTERNAL, sim.m_sessionErrorCode);
}
TEST(tapeServer, DataTransferSessionNoSuchDrive) {
......@@ -333,6 +335,7 @@ TEST(tapeServer, DataTransferSessionNoSuchDrive) {
std::string temp = logger.getLog();
temp += "";
ASSERT_NE(std::string::npos, logger.getLog().find("Drive not found on this path"));
ASSERT_EQ(SEINTERNAL, sim.m_sessionErrorCode);
}
class tempFile {
......@@ -480,6 +483,7 @@ TEST(tapeServer, DataTransferSessionGooddayMigration) {
i != expected.end(); i++) {
ASSERT_EQ(i->checksum, sim.m_receivedChecksums[i->fSeq]);
}
ASSERT_EQ(0, sim.m_sessionErrorCode);
}
/**
......@@ -565,6 +569,7 @@ TEST(tapeServer, DataTransferSessionMissingFilesMigration) {
driveConfig, rmc, initialProcess, capUtils, castorConf);
sess.execute();
simRun.wait();
ASSERT_EQ(SEINTERNAL, sim.m_sessionErrorCode);
}
/**
......@@ -674,6 +679,114 @@ TEST(tapeServer, DataTransferSessionTapeFullMigration) {
ASSERT_EQ(i->errorCode, sim.m_receivedErrorCodes[i->fSeq]);
}
}
ASSERT_EQ(ENOSPC, sim.m_sessionErrorCode);
}
TEST(tapeServer, DataTransferSessionTapeFullOnFlushMigration) {
// TpcpClients only supports 32 bits session number
// This number has to be less than 2^31 as in addition there is a mix
// of signed and unsigned numbers
// As the current ids in prod are ~30M, we are far from overflow (Feb 2013)
// 0) Prepare the logger for everyone
castor::log::StringLogger logger("tapeServerUnitTest");
// 1) prepare the client and run it in another thread
uint32_t volReq = 0xBEEF;
std::string vid = "V12345";
std::string density = "8000GC";
client::ClientSimulator sim(volReq, vid, density,
castor::tape::tapegateway::WRITE_TP, castor::tape::tapegateway::WRITE);
client::ClientSimulator::ipPort clientAddr = sim.getCallbackAddress();
clientRunner simRun(sim);
simRun.start();
// 2) Prepare the VDQM request
castor::legacymsg::RtcpJobRqstMsgBody VDQMjob;
snprintf(VDQMjob.clientHost, CA_MAXHOSTNAMELEN+1, "%d.%d.%d.%d",
clientAddr.a, clientAddr.b, clientAddr.c, clientAddr.d);
snprintf(VDQMjob.driveUnit, CA_MAXUNMLEN+1, "T10D6116");
snprintf(VDQMjob.dgn, CA_MAXDGNLEN+1, "LIBXX");
VDQMjob.clientPort = clientAddr.port;
VDQMjob.volReqId = volReq;
// 3) Prepare the necessary environment (logger, plus system wrapper),
// construct and run the session.
castor::tape::System::mockWrapper mockSys;
mockSys.delegateToFake();
mockSys.disableGMockCallsCounting();
mockSys.fake.setupForVirtualDriveSLC6();
//delete is unnecessary
//pointer with ownership will be passed to the application,
//which will do the delete
const uint64_t tapeSize = 5000;
mockSys.fake.m_pathToDrive["/dev/nst0"] =
new castor::tape::tapeserver::drives::FakeDrive(tapeSize,
castor::tape::tapeserver::drives::FakeDrive::OnFlush);
// Just label the tape
castor::tape::tapeFile::LabelSession ls(*mockSys.fake.m_pathToDrive["/dev/nst0"],
"V12345", true);
mockSys.fake.m_pathToDrive["/dev/nst0"]->rewind();
tempFileVector tempFiles;
std::vector<expectedResult> expected;
uint64_t remainingSpace = tapeSize;
bool failedFileDone = false;
// Prepare the files (in real filesystem as they will be opened by the rfio client)
for (int fseq=1; fseq<=10; fseq++) {
// Create the file from which we will recall
const size_t fileSize = 1000;
std::auto_ptr<tempFile> tf(new tempFile(fileSize));
// Prepare the migrationRequest
castor::tape::tapegateway::FileToMigrateStruct ftm;
ftm.setFileSize(tf->m_size);
ftm.setFileid(1000 + fseq);
ftm.setFseq(fseq);
ftm.setPath(tf->path());
sim.addFileToMigrate(ftm);
if (fileSize + 6 * 80 < remainingSpace) {
expected.push_back(expectedResult(fseq, tf->checksum()));
remainingSpace -= fileSize + 6 * 80;
} else if (!failedFileDone) {
// We expect no report for this file anymore (but the report for the
// session)
failedFileDone = true;
}
tempFiles.push_back(tf.release());
}
castor::tape::utils::DriveConfig driveConfig;
driveConfig.unitName = "T10D6116";
driveConfig.dgn = "T10KD6";
driveConfig.devFilename = "/dev/tape_T10D6116";
driveConfig.densities.push_back("8000GC");
driveConfig.densities.push_back("5000GC");
driveConfig.librarySlot = "manual";
driveConfig.devType = "T10000";
DataTransferSession::CastorConf castorConf;
castorConf.rtcopydBufsz = 1024*1024; // 1 MB memory buffers
castorConf.rtcopydNbBufs = 10;
castorConf.tapebridgeBulkRequestMigrationMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.tapebridgeBulkRequestMigrationMaxFiles = 1000;
castorConf.tapeserverdDiskThreads = 1;
castor::legacymsg::VmgrProxyDummy vmgr;
castor::legacymsg::VdqmProxyDummy vdqm(VDQMjob);
castor::legacymsg::RmcProxyDummy rmc;
castor::messages::TapeserverProxyDummy initialProcess;
castor::server::ProcessCapDummy capUtils;
DataTransferSession sess("tapeHost", VDQMjob, logger, mockSys,
driveConfig, rmc, initialProcess, capUtils, castorConf);
sess.execute();
simRun.wait();
for (std::vector<struct expectedResult>::iterator i = expected.begin();
i != expected.end(); i++) {
if (!i->errorCode) {
ASSERT_EQ(i->checksum, sim.m_receivedChecksums[i->fSeq]);
} else {
ASSERT_EQ(i->errorCode, sim.m_receivedErrorCodes[i->fSeq]);
}
}
ASSERT_EQ(ENOSPC, sim.m_sessionErrorCode);
}
} // namespace unitTest
......@@ -91,9 +91,9 @@ void MigrationReportPacker::reportEndOfSession() {
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int error_code){
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int errorCode){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
m_fifo.push(new ReportEndofSessionWithErrors(msg,errorCode));
}
//------------------------------------------------------------------------------
//ReportSuccessful::reportStuckOn
......@@ -263,17 +263,22 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
if(_this.m_errorHappened) {
reportFileErrors(_this);
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
_this.m_client.reportEndOfSessionWithError(m_message,m_errorCode,chrono);
log::ScopedParamContainer sp(_this.m_lc);
sp.add("errorMessage", m_message)
.add("errorCode", m_error_code)
.add("errorCode", m_errorCode)
.add("connectDuration", chrono.connectDuration)
.add("sendRecvDuration", chrono.sendRecvDuration)
.add("transactionId", chrono.transactionId);
_this.m_lc.log(LOG_INFO,"Reported end of session with error to client after sending file errors");
} else{
const std::string& msg ="Reported end of session with error to client";
_this.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
// As a measure of safety we censor any session error which is not ENOSPC into
// SEINTERNAL. ENOSPC is the only one interpreted by the tape gateway.
if (ENOSPC != m_errorCode) {
m_errorCode = SEINTERNAL;
}
_this.m_client.reportEndOfSessionWithError(msg,m_errorCode,chrono);
_this.m_lc.log(LOG_INFO,msg);
}
_this.m_continue=false;
......
......@@ -163,10 +163,10 @@ private:
};
class ReportEndofSessionWithErrors : public Report {
std::string m_message;
int m_error_code;
int m_errorCode;
public:
ReportEndofSessionWithErrors(std::string msg,int error_code):
m_message(msg),m_error_code(error_code){}
ReportEndofSessionWithErrors(std::string msg,int errorCode):
m_message(msg),m_errorCode(errorCode){}
virtual void execute(MigrationReportPacker& _this);
};
......
......@@ -212,13 +212,26 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
}
task->circulateMemBlocks();
}
//then log the end of write thread
// Prepare the standard error codes for the session
std::string errorMessage(e.getMessageValue());
int errorCode(e.code());
// Override if we got en ENOSPC error (end of tape)
// This is
try {
const castor::exception::Errnum & errnum =
dynamic_cast<const castor::exception::Errnum &> (e);
if (ENOSPC == errnum.errorNumber()) {
errorCode = ENOSPC;
errorMessage = "End of migration due to tape full";
}
} catch (...) {}
// then log the end of write thread
log::ScopedParamContainer params(m_logContext);
params.add("status", "error")
.add("ErrorMesage", e.getMessageValue());
.add("ErrorMesage", errorMessage);
logWithStats(LOG_ERR, "Tape thread complete",
params,totalTimer.secs());
m_reportPacker.reportEndOfSessionWithErrors(e.getMessageValue(),e.code());
m_reportPacker.reportEndOfSessionWithErrors(errorMessage,errorCode);
}
}
......
......@@ -28,13 +28,16 @@ namespace {
const char filemark[] = "";
}
castor::tape::tapeserver::drives::FakeDrive::FakeDrive(uint64_t capacity) throw():
m_currentPosition(0), m_tapeCapacity(capacity), beginOfCompressStats(0) {
castor::tape::tapeserver::drives::FakeDrive::FakeDrive(uint64_t capacity,
FailureMoment failureMoment) throw():
m_currentPosition(0), m_tapeCapacity(capacity), m_beginOfCompressStats(0),
m_failureMoment(failureMoment), m_tapeOverflow(false)
{
m_tape.reserve(max_fake_drive_record_length);
}
castor::tape::tapeserver::drives::compressionStats castor::tape::tapeserver::drives::FakeDrive::getCompression() {
castor::tape::tapeserver::drives::compressionStats stats;
for(unsigned int i=beginOfCompressStats;i<m_tape.size();++i){
for(unsigned int i=m_beginOfCompressStats;i<m_tape.size();++i){
stats.toTape += m_tape[i].data.length();
}
......@@ -43,7 +46,7 @@ castor::tape::tapeserver::drives::compressionStats castor::tape::tapeserver::dri
return stats;
}
void castor::tape::tapeserver::drives::FakeDrive::clearCompressionStats() {
beginOfCompressStats=m_tape.size();
m_beginOfCompressStats=m_tape.size();
}
castor::tape::tapeserver::drives::deviceInfo castor::tape::tapeserver::drives::FakeDrive::getDeviceInfo() {
deviceInfo devInfo;
......@@ -118,7 +121,11 @@ void castor::tape::tapeserver::drives::FakeDrive::spaceFileMarksForward(size_t c
void castor::tape::tapeserver::drives::FakeDrive::unloadTape(void) {
}
void castor::tape::tapeserver::drives::FakeDrive::flush(void) {
//already flushing
if (m_failureMoment == OnFlush) {
if (m_tapeOverflow) {
throw castor::exception::Errnum(ENOSPC, "Error in castor::tape::tapeserver::drives::FakeDrive::flush");
}
}
}
uint64_t castor::tape::tapeserver::drives::FakeDrive::getRemaingSpace(uint32_t currentPosition)
......@@ -142,13 +149,22 @@ void castor::tape::tapeserver::drives::FakeDrive::writeImmediateFileMarks(size_t
writeSyncFileMarks(count);
}
void castor::tape::tapeserver::drives::FakeDrive::writeBlock(const void * data, size_t count) {
// check that the next block will fit in the remaining space on the tape.
// check that the next block will fit in the remaining space on the tape
// and compute what will be left after
uint64_t remainingSpaceAfterBlock;
if (count > getRemaingSpace(m_currentPosition)) {
throw castor::exception::Errnum(ENOSPC, "Error in castor::tape::tapeserver::drives::FakeDrive::writeBlock");
if (m_failureMoment == OnWrite) {
throw castor::exception::Errnum(ENOSPC, "Error in castor::tape::tapeserver::drives::FakeDrive::writeBlock");
} else {
remainingSpaceAfterBlock = 0;
m_tapeOverflow = true;
}
} else {
remainingSpaceAfterBlock = getRemaingSpace(m_currentPosition) - count;
}
m_tape.resize(m_currentPosition+1);
m_tape[m_currentPosition].data.assign((const char *)data, count);
m_tape[m_currentPosition].remainingSpaceAfter = getRemaingSpace(m_currentPosition) - count;
m_tape[m_currentPosition].remainingSpaceAfter = remainingSpaceAfterBlock;
m_currentPosition++;
}
ssize_t castor::tape::tapeserver::drives::FakeDrive::readBlock(void *data, size_t count) {
......
......@@ -41,11 +41,18 @@ namespace drives {
std::vector<tapeBlock> m_tape;
uint32_t m_currentPosition;
uint64_t m_tapeCapacity;
int beginOfCompressStats;
int m_beginOfCompressStats;
uint64_t getRemaingSpace(uint32_t currentPosition);
public:
enum FailureMoment { OnWrite, OnFlush } ;
private:
const enum FailureMoment m_failureMoment;
bool m_tapeOverflow;
public:
std::string contentToString() throw();
FakeDrive(uint64_t capacity=std::numeric_limits<uint64_t>::max()) throw();
FakeDrive(uint64_t capacity=std::numeric_limits<uint64_t>::max(),
enum FailureMoment failureMoment=OnWrite) throw();
virtual ~FakeDrive() throw(){}
virtual compressionStats getCompression() ;
virtual void clearCompressionStats() ;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment