Commit 3e0dfac5 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Fixed the MigrationReportPackerTest and changed the MigrationReportPacker to...

Fixed the MigrationReportPackerTest and changed the MigrationReportPacker to be more similar to the RecallReportPacker
parent dab54286
......@@ -18,6 +18,12 @@
#include "scheduler/ArchiveMount.hpp"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveMount::ArchiveMount() {
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
......
......@@ -37,6 +37,11 @@ namespace cta {
friend class Scheduler;
protected:
/**
* Constructor.
*/
ArchiveMount();
/**
* Constructor.
*
......
......@@ -70,8 +70,8 @@ std::unique_ptr<cta::ArchiveJob> successfulArchiveJob,u_int32_t checksum,
//reportFailedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
const std::string& msg,int error_code){
std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob),msg,error_code));
const castor::exception::Exception &ex){
std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob),ex));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
......@@ -254,7 +254,8 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker){
reportPacker.m_errorHappened=true;
m_failedArchiveJob->failed(cta::exception::Exception("Error happened somewhere during the migration process"));
reportPacker.m_lc.log(LOG_ERR,m_ex.getMessageValue());
m_failedArchiveJob->failed(cta::exception::Exception(m_ex.getMessageValue()));
}
//------------------------------------------------------------------------------
......
......@@ -62,10 +62,9 @@ public:
* Create into the MigrationReportPacker a report for the failled migration
* of migratedFile
* @param migratedFile the file which failled
* @param msg the error message to the failure
* @param error_code the error code related to the failure
* @param ex the reason for the failure
*/
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,const std::string& msg,int error_code);
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, const castor::exception::Exception& ex);
/**
* Create into the MigrationReportPacker a report for the signaling a flusing on tape
......@@ -118,20 +117,6 @@ private:
class ReportFlush : public Report {
drive::compressionStats m_compressStats;
/**
* This function will approximate the compressed size of the files which
* have been migrated. The idea is to compute the average ration
* logicalSize/nbByteWritenWithCompression for the whole batch
* and apply that ratio to the whole set of files
* We currently computing it only to the file that have been successfully
* migrated
* @param beg Beginning of the upper class' successfulMigrations()
* @param end End of upper class' successfulMigrations()
*/
// void computeCompressedSize(
// std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator beg,
// std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end);
public:
/* We only can compute the compressed size once we have flushed on the drive
* We can get from the drive the number of byte it really wrote to tape
......@@ -143,16 +128,15 @@ private:
void execute(MigrationReportPacker& reportPacker);
};
class ReportError : public Report {
const std::string m_error_msg;
const int m_error_code;
const castor::exception::Exception m_ex;
/**
* The failed archive job to be reported immediately
*/
std::unique_ptr<cta::ArchiveJob> m_failedArchiveJob;
public:
ReportError(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, std::string msg,int error_code):
m_error_msg(msg), m_error_code(error_code), m_failedArchiveJob(std::move(failedArchiveJob)){}
ReportError(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, const castor::exception::Exception &ex):
m_ex(ex), m_failedArchiveJob(std::move(failedArchiveJob)){}
virtual void execute(MigrationReportPacker& reportPacker);
};
......
......@@ -29,166 +29,193 @@
#include <gtest/gtest.h>
using ::testing::_;
using ::testing::Invoke;
using namespace castor::tape;
namespace unitTests {
class TestingArchiveMount: public cta::ArchiveMount {
public:
TestingArchiveMount(std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam): ArchiveMount(std::move(dbam)) {
}
~TestingArchiveMount() throw() {}
void complete() {
complete_();
class castor_tape_tapeserver_daemonTest: public ::testing::Test {
protected:
void SetUp() {
}
void failed(const std::exception &ex) {
failed_(ex);
void TearDown() {
}
MOCK_METHOD0(complete_, void());
MOCK_METHOD1(failed_, void(const std::exception &ex));
};
class MockArchiveJob: public cta::ArchiveJob {
public:
MockArchiveJob() {
}
~MockArchiveJob() throw() {
}
MOCK_METHOD0(complete, void());
MOCK_METHOD1(failed, void(const cta::exception::Exception &ex));
}; // class MockArchiveJob
class MockArchiveMount: public cta::ArchiveMount {
public:
MockArchiveMount() {
const unsigned int nbArchiveJobs = 2;
createArchiveJobs(nbArchiveJobs);
}
~MockArchiveMount() throw() {
}
std::unique_ptr<cta::ArchiveJob> getNextJob() {
internalGetNextJob();
if(m_jobs.empty()) {
return std::unique_ptr<cta::ArchiveJob>();
} else {
std::unique_ptr<cta::ArchiveJob> job = std::move(m_jobs.front());
m_jobs.pop_front();
return job;
}
}
MOCK_METHOD0(internalGetNextJob, cta::ArchiveJob*());
MOCK_METHOD0(complete, void());
private:
std::list<std::unique_ptr<cta::ArchiveJob>> m_jobs;
void createArchiveJobs(const unsigned int nbJobs) {
for(unsigned int i = 0; i < nbJobs; i++) {
m_jobs.push_back(std::unique_ptr<cta::ArchiveJob>(
new MockArchiveJob()));
}
}
}; // class MockArchiveMount
}; // class castor_tape_tapeserver_daemonTest
class TestingArchiveJob: public cta::ArchiveJob {
public:
TestingArchiveJob() {
TEST_F(castor_tape_tapeserver_daemonTest, MigrationReportPackerNominal) {
MockArchiveMount tam;
::testing::InSequence dummy;
std::unique_ptr<cta::ArchiveJob> job1;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, complete()).Times(1);
job1.reset(mockJob.release());
}
~TestingArchiveJob() throw() {}
void complete() {
complete_();
std::unique_ptr<cta::ArchiveJob> job2;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, complete()).Times(1);
job2.reset(mockJob.release());
}
void failed(const cta::exception::Exception &ex) {
failed_(ex);
EXPECT_CALL(tam, complete()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerNominal");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
mrp.reportCompletedJob(std::move(job1),0,0);
mrp.reportCompletedJob(std::move(job2),0,0);
const tapeserver::drive::compressionStats statsCompress;
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread(); //here
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported to the client that a batch of files was written on tape"));
}
TEST_F(castor_tape_tapeserver_daemonTest, MigrationReportPackerFailure) {
MockArchiveMount tam;
::testing::InSequence dummy;
std::unique_ptr<cta::ArchiveJob> job1;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
job1.reset(mockJob.release());
}
MOCK_METHOD0(complete_, void());
MOCK_METHOD1(failed_, void(const cta::exception::Exception &ex));
};
class TestingDBArchiveJob: public cta::SchedulerDatabase::ArchiveMount {
virtual const MountInfo & getMountInfo() {
return mountInfo;
std::unique_ptr<cta::ArchiveJob> job2;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
job2.reset(mockJob.release());
}
};
const std::string error="ERROR_TEST";
using namespace castor::tape;
const tapeserver::drive::compressionStats statsCompress;
using ::testing::_;
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam(new TestingDBArchiveJob);
TestingArchiveMount tam(std::move(dbam));
::testing::InSequence dummy;
EXPECT_CALL(tam, complete_()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerNominal");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
std::unique_ptr<TestingArchiveJob> migratedFile(new TestingArchiveJob());
mrp.reportCompletedJob(std::move(migratedFile),0,0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread(); //here
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported to the client that a batch of files was written on tape"));
}
std::unique_ptr<cta::ArchiveJob> job3;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, failed(_)).Times(1);
job3.reset(mockJob.release());
}
EXPECT_CALL(tam, complete()).Times(1);
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFailure) {
std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam(new TestingDBArchiveJob);
TestingArchiveMount tam(std::move(dbam));
::testing::InSequence dummy;
EXPECT_CALL(tam, complete_()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerFailure");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
std::unique_ptr<TestingArchiveJob> migratedFile(new TestingArchiveJob());
std::unique_ptr<TestingArchiveJob> failed(new TestingArchiveJob());
mrp.reportCompletedJob(std::move(migratedFile),0,0);
mrp.reportFailedJob(std::move(failed),error,-1);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
mrp.waitThread();
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported end of session with error to client after sending file errors"));
}
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFailureGoodEnd) {
std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam(new TestingDBArchiveJob);
TestingArchiveMount tam(std::move(dbam));
::testing::InSequence dummy;
EXPECT_CALL(tam, complete_()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerFailureGoodEnd");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
std::unique_ptr<TestingArchiveJob> migratedFile(new TestingArchiveJob());
std::unique_ptr<TestingArchiveJob> failed(new TestingArchiveJob());
mrp.reportCompletedJob(std::move(migratedFile),0,0);
mrp.reportFailedJob(std::move(failed),error,-1);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread();
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported end of session with error to client due to previous file errors"));
}
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerGoodBadEnd) {
std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam(new TestingDBArchiveJob);
TestingArchiveMount tam(std::move(dbam));
::testing::InSequence dummy;
EXPECT_CALL(tam, complete_()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerGoodBadEnd");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
std::unique_ptr<TestingArchiveJob> migratedFile(new TestingArchiveJob());
mrp.reportCompletedJob(std::move(migratedFile),0,0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
mrp.waitThread();
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported end of session with error to client"));
}
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerFailure");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
TEST(castor_tape_tapeserver_daemon, MigrationReportPackerOneByteFile) {
std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbam(new TestingDBArchiveJob);
TestingArchiveMount tam(std::move(dbam));
::testing::InSequence dummy;
EXPECT_CALL(tam, complete_()).Times(1);
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerOneByteFile");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
std::unique_ptr<TestingArchiveJob> migratedBigFile(new TestingArchiveJob());
std::unique_ptr<TestingArchiveJob> migratedFileSmall(new TestingArchiveJob());
std::unique_ptr<TestingArchiveJob> migrateNullFile(new TestingArchiveJob());
migratedBigFile->archiveFile.size=100000;
migratedFileSmall->archiveFile.size=1;
migrateNullFile->archiveFile.size=0;
mrp.reportCompletedJob(std::move(migratedBigFile),0,0);
mrp.reportCompletedJob(std::move(migratedFileSmall),0,0);
mrp.reportCompletedJob(std::move(migrateNullFile),0,0);
tapeserver::drive::compressionStats stats;
stats.toTape=(100000+1)/3;
mrp.reportFlush(stats);
mrp.reportEndOfSession();
mrp.waitThread();
}
mrp.reportCompletedJob(std::move(job1),0,0);
mrp.reportCompletedJob(std::move(job2),0,0);
const std::string error_msg = "ERROR_TEST_MSG";
const castor::exception::Exception ex(error_msg);
mrp.reportFailedJob(std::move(job3),ex);
const tapeserver::drive::compressionStats statsCompress;
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread();
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find(error_msg));
}
TEST_F(castor_tape_tapeserver_daemonTest, MigrationReportPackerOneByteFile) {
MockArchiveMount tam;
::testing::InSequence dummy;
std::unique_ptr<cta::ArchiveJob> migratedBigFile;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, complete()).Times(1);
migratedBigFile.reset(mockJob.release());
}
std::unique_ptr<cta::ArchiveJob> migratedFileSmall;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, complete()).Times(1);
migratedFileSmall.reset(mockJob.release());
}
std::unique_ptr<cta::ArchiveJob> migratedNullFile;
{
std::unique_ptr<MockArchiveJob> mockJob(new MockArchiveJob());
EXPECT_CALL(*mockJob, complete()).Times(1);
migratedNullFile.reset(mockJob.release());
}
EXPECT_CALL(tam, complete()).Times(1);
migratedBigFile->archiveFile.size=100000;
migratedFileSmall->archiveFile.size=1;
migratedNullFile->archiveFile.size=0;
castor::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerOneByteFile");
castor::log::LogContext lc(log);
tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
mrp.startThreads();
mrp.reportCompletedJob(std::move(migratedBigFile),0,0);
mrp.reportCompletedJob(std::move(migratedFileSmall),0,0);
mrp.reportCompletedJob(std::move(migratedNullFile),0,0);
tapeserver::drive::compressionStats stats;
stats.toTape=(100000+1)/3;
mrp.reportFlush(stats);
mrp.reportEndOfSession();
mrp.waitThread();
std::string temp = log.getLog();
ASSERT_NE(std::string::npos, temp.find("Reported to the client that a batch of files was written on tape"));
}
}
......@@ -197,7 +197,7 @@ namespace daemon {
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
lc.log(errorLevel,"An error occurred for this file. End of migrations.");
circulateMemBlocks();
reportPacker.reportFailedJob(std::move(m_archiveJob),e.getMessageValue(),errorCode);
reportPacker.reportFailedJob(std::move(m_archiveJob),e);
//we throw again because we want TWST to stop all tasks from execution
//and go into a degraded mode operation.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment