Commit 75dd7503 authored by David COME's avatar David COME
Browse files

MigrationReportPacker is pretty much finished

parent e784be4f
......@@ -27,16 +27,23 @@
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapegateway/FileErrorReportStruct.hpp"
#include "castor/tape/tapegateway/FileMigratedNotificationStruct.hpp"
#include <cstdio>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
MigrationReportPacker::MigrationReportPacker(ClientInterface & tg):
m_workerThread(*this),m_client(tg) {
MigrationReportPacker::MigrationReportPacker(ClientInterface & tg,castor::log::LogContext& lc):
m_workerThread(*this),m_client(tg),m_lc(lc),
m_listReports(new tapegateway::FileMigrationReportList),m_errorHappened(false),m_continue(true) {
}
MigrationReportPacker::~MigrationReportPacker(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
}
void MigrationReportPacker::reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
......@@ -58,7 +65,7 @@ void MigrationReportPacker::reportEndOfSession() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
void MigrationReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
......@@ -72,29 +79,55 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _th
successMigration->setNshost(m_migratedFile.nshost());
successMigration->setFileid(m_migratedFile.fileid());
_this.m_listReports.addSuccessfulMigrations(successMigration.release());
_this.m_listReports->addSuccessfulMigrations(successMigration.release());
}
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
if(!_this.m_errorHappened){
_this.logReport(_this.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportMigrationResults(_this.m_listReports,chrono);
_this.m_client.reportMigrationResults(*(_this.m_listReports),chrono);
}
else {
const std::string& msg ="A flush on tape has been reported but a writing error happened before";
_this.logReport(_this.m_listReports->failedMigrations(),msg);
//throw castor::exception::Exception(msg);
}
//reset (ie delete and replace) the current m_listReports.
//Thus all current reports are deleted otherwise they would have been sent again at the next flush
_this.m_listReports.reset(new tapegateway::FileMigrationReportList);
}
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _this){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportEndOfSession(chrono);
if(!_this.m_errorHappened){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportEndOfSession(chrono);
}
else {
const std::string& msg ="Nominal EndofSession has been reported but an writing error on the tape happened before";
_this.m_lc.log(LOG_ERR,msg);
//throw castor::exception::Exception(msg);
}
_this.m_continue=false;
}
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& _this){
if(_this.m_errorHappened) {
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_listReports.successfulMigrations().clear();
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
}
else{
const std::string& msg ="EndofSessionWithErrors has been reported but NO writing error on the tape was detected";
_this.m_lc.log(LOG_ERR,msg);
//throw castor::exception::Exception(msg);
}
_this.m_continue=false;
}
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
//failedMigration->setFileMigrationReportList(_this.m_listReports.get());
failedMigration->setErrorCode(m_error_code);
failedMigration->setErrorMessage(m_error_msg);
failedMigration->setFseq(m_migratedFile.fseq());
......@@ -102,8 +135,8 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
failedMigration->setId(m_migratedFile.id());
failedMigration->setNshost(m_migratedFile.nshost());
_this.m_listReports.addFailedMigrations(failedMigration.release());
_this.reportEndOfSessionWithErrors(m_error_msg,m_error_code);
_this.m_listReports->addFailedMigrations(failedMigration.release());
_this.m_errorHappened=true;
}
//------------------------------------------------------------------------------
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent):
......@@ -111,8 +144,8 @@ m_parent(parent) {
}
void MigrationReportPacker::WorkerThread::run(){
while(1) {
std::auto_ptr<MigrationReportPacker::Report> rep (m_parent.m_fifo.pop());
while(m_parent.m_continue) {
std::auto_ptr<Report> rep (m_parent.m_fifo.pop());
rep->execute(m_parent);
}
}
......
......@@ -28,7 +28,10 @@
#include "castor/tape/tapeserver/daemon/MigrationJob.hpp"
#include "castor/tape/tapeserver/daemon/ClientInterface.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include <list>
#include <memory>
namespace castor {
namespace tape {
......@@ -37,8 +40,14 @@ namespace daemon {
class MigrationReportPacker {
public:
MigrationReportPacker(ClientInterface & tg);
/**
* @param tg The client who is asking for a migration of his files
* and to whom we have to report to the status of the operations.
*/
MigrationReportPacker(ClientInterface & tg,log::LogContext& lc);
~MigrationReportPacker();
/**
* Create into the MigrationReportPacker a report for the successful migration
* of migratedFile
......@@ -75,46 +84,63 @@ public:
private:
/**
* Log a set of files independently of the success/failure
* @param c The set of files to log
* @param msg The message to be append at the end.
*/
template <class C> void logReport(const C& c,const std::string& msg){
using castor::log::LogContext;
using castor::log::Param;
for(typename C::const_iterator it=c.begin();it!=c.end();++it)
{
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("ID", (*it)->id())),
LogContext::ScopedParam(m_lc, Param("FILEID",(*it)->fileid())),
LogContext::ScopedParam(m_lc, Param("FSEQ", (*it)->fseq())),
LogContext::ScopedParam(m_lc, Param("NSHOST", (*it)->nshost())),
LogContext::ScopedParam(m_lc, Param("FILETRANSACTIONID", (*it)->fileTransactionId()))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO,msg);
}
}
class Report {
public:
virtual ~Report(){}
virtual void execute(MigrationReportPacker& packer)=0;
};
class ReportSuccessful : public Report {
const tapegateway::FileToMigrateStruct& m_migratedFile;
const tapegateway::FileToMigrateStruct m_migratedFile;
public:
ReportSuccessful(const tapegateway::FileToMigrateStruct& file):
m_migratedFile(file){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportFlush : public Report {
public:
void execute(MigrationReportPacker& _this);
};
class ReportError : public Report {
const tapegateway::FileToMigrateStruct& m_migratedFile;
class ReportError : public Report {
const tapegateway::FileToMigrateStruct m_migratedFile;
const std::string m_error_msg;
const int m_error_code;
public:
ReportError(const tapegateway::FileToMigrateStruct& file,const std::string& msg,int error_code):
ReportError(const tapegateway::FileToMigrateStruct& file,std::string msg,int error_code):
m_migratedFile(file),m_error_msg(msg),m_error_code(error_code){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportEndofSession : public Report {
class ReportEndofSession : public Report {
public:
virtual void execute(MigrationReportPacker& _this);
};
class ReportEndofSessionWithErrors : public Report {
std::string m_message;
int m_error_code;
public:
ReportEndofSessionWithErrors(const std::string msg,int error_code):
ReportEndofSessionWithErrors(std::string msg,int error_code):
m_message(msg),m_error_code(error_code){}
virtual void execute(MigrationReportPacker& _this);
......@@ -128,10 +154,28 @@ private:
} m_workerThread;
ClientInterface & m_client;
castor::log::LogContext& m_lc;
/**
* m_fifo is holding all the report waiting to be processed
*/
castor::tape::threading::BlockingQueue<Report*> m_fifo;
tapegateway::FileMigrationReportList m_listReports;
/**
* m_listReports is holding all the report waiting to be processed
*/
std::auto_ptr<tapegateway::FileMigrationReportList> m_listReports;
castor::tape::threading::Mutex m_producterProtection;
/**
* Sanity check variable to register if an error has happened
* Is set at true as soon as a ReportError has been processed.
*/
bool m_errorHappened;
bool m_continue;
};
}}}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment