Commit 2e87bb5d authored by David COME's avatar David COME
Browse files

(WIP) First implementation of RecallReportPacker. Tests missing

parent 1c0ac0a6
......@@ -22,7 +22,8 @@ add_executable(tapeserver-mm
RecallTaskInjector.cpp
Exception.cpp
ClientProxy.cpp
MigrationReportPacker.cpp)
MigrationReportPacker.cpp
RecallReportPacker.cpp)
target_link_libraries(tapeserver-mm
TapeDrive Exception SCSI System Utils File
......
/******************************************************************************
* RecallReportPacker.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapegateway/FileRecalledNotificationStruct.hpp"
#include "castor/tape/tapegateway/FileRecalledNotificationStruct.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
RecallReportPacker::RecallReportPacker(ClientInterface & tg,unsigned int reportFilePeriod,log::LogContext lc):
ReportPackerInterface<detail::Recall>(tg,lc),
m_workerThread(*this),m_reportFilePeriod(reportFilePeriod),m_errorHappened(false),m_continue(true){
}
RecallReportPacker::~RecallReportPacker(){
}
void RecallReportPacker::reportCompletedJob(const tapegateway::FileToRecallStruct& recalledFile){
std::auto_ptr<Report> rep(new ReportSuccessful(recalledFile));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
void RecallReportPacker::reportFailedJob(const tapegateway::FileToRecallStruct & recalledFile
,const std::string& msg,int error_code){
std::auto_ptr<Report> rep(new ReportError(recalledFile,msg,error_code));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
void RecallReportPacker::reportEndOfSession(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& _this){
std::auto_ptr<FileSuccessStruct> successRecall(new FileSuccessStruct);
successRecall->setFseq(m_migratedFile.fseq());
successRecall->setFileTransactionId(m_migratedFile.fileTransactionId());
successRecall->setId(m_migratedFile.id());
successRecall->setNshost(m_migratedFile.nshost());
successRecall->setFileid(m_migratedFile.fileid());
_this.m_listReports->addSuccessfulRecalls(successRecall.release());
}
void RecallReportPacker::flush(){
unsigned int totalSize = m_listReports->failedRecalls().size() +
m_listReports->successfulRecalls().size();
if(totalSize > 0){
ClientInterface::RequestReport chrono;
m_client.reportRecallResults(*m_listReports,chrono);
}
}
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& _this){
if(!_this.m_errorHappened){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportEndOfSession(chrono);
}
else {
const std::string& msg ="Nominal EndofSession has been reported but an writing error on the tape happened before";
_this.m_lc.log(LOG_ERR,msg);
//throw castor::exception::Exception(msg);
}
_this.m_continue=false;
}
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& _this){
if(_this.m_errorHappened) {
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
}
else{
const std::string& msg ="EndofSessionWithErrors has been reported but NO writing error on the tape was detected";
_this.m_lc.log(LOG_ERR,msg);
//throw castor::exception::Exception(msg);
}
_this.m_continue=false;
}
void RecallReportPacker::ReportError::execute(RecallReportPacker& _this){
std::auto_ptr<FileErrorStruct> failed(new FileErrorStruct);
//failedMigration->setFileMigrationReportList(_this.m_listReports.get());
failed->setErrorCode(m_error_code);
failed->setErrorMessage(m_error_msg);
failed->setFseq(m_migratedFile.fseq());
failed->setFileTransactionId(m_migratedFile.fileTransactionId());
failed->setId(m_migratedFile.id());
failed->setNshost(m_migratedFile.nshost());
_this.m_listReports->addFailedRecalls(failed.release());
_this.m_errorHappened=true;
}
//------------------------------------------------------------------------------
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
m_parent(parent) {
}
void RecallReportPacker::WorkerThread::run(){
while(m_parent.m_continue) {
std::auto_ptr<Report> rep (m_parent.m_fifo.pop());
rep->execute(m_parent);
unsigned int totalSize = m_parent.m_listReports->failedRecalls().size() +
m_parent.m_listReports->successfulRecalls().size();
if(totalSize>m_parent.m_reportFilePeriod)
{
m_parent.flush();
}
}
/*
* we might have stored into m_parent.m_listReports some reports between
* the last flush and the End of session but not reached the critical mass
* required to flush. So we unconditionally flush
*/
m_parent.flush();
}
}}}}
/******************************************************************************
* RecallReportPacker.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#ifndef RECALLREPORTPACKER_HPP
#define RECALLREPORTPACKER_HPP
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/ClientInterface.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class RecallReportPacker : private ReportPackerInterface<detail::Recall> {
public:
RecallReportPacker(ClientInterface & tg,unsigned int reportFilePeriod,log::LogContext lc);
~RecallReportPacker();
/**
* Create into the MigrationReportPacker a report for the successful migration
* of migratedFile
* @param migratedFile the file successfully migrated
*/
void reportCompletedJob(const tapegateway::FileToRecallStruct& recalledFile);
/**
* Create into the MigrationReportPacker a report for the failled migration
* of migratedFile
* @param migratedFile the file which failled
*/
void reportFailedJob(const tapegateway::FileToRecallStruct & recalledFile,const std::string& msg,int error_code);
/**
* Create into the MigrationReportPacker a report for the nominal end of session
*/
void reportEndOfSession();
/**
* Create into the MigrationReportPacker a report for an erroneous end of session
* @param msg The error message
* @param error_code The error code given by the drive
*/
void reportEndOfSessionWithErrors(const std::string msg,int error_code);
//void startThreads() { m_workerThread.start(); }
//void waitThread() { m_workerThread.wait(); }
private:
class Report {
public:
virtual ~Report(){}
virtual void execute(RecallReportPacker& packer)=0;
};
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
public:
ReportSuccessful(const FileStruct& file):
m_migratedFile(file){}
virtual void execute(RecallReportPacker& _this);
};
class ReportError : public Report {
const FileStruct m_migratedFile;
const std::string m_error_msg;
const int m_error_code;
public:
ReportError(const FileStruct& file,std::string msg,int error_code):
m_migratedFile(file),m_error_msg(msg),m_error_code(error_code){}
virtual void execute(RecallReportPacker& _this);
};
class ReportEndofSession : public Report {
public:
virtual void execute(RecallReportPacker& _this);
};
class ReportEndofSessionWithErrors : public Report {
std::string m_message;
int m_error_code;
public:
ReportEndofSessionWithErrors(std::string msg,int error_code):
m_message(msg),m_error_code(error_code){}
virtual void execute(RecallReportPacker& _this);
};
class WorkerThread: public castor::tape::threading::Thread {
RecallReportPacker & m_parent;
public:
WorkerThread(RecallReportPacker& parent);
virtual void run();
} m_workerThread;
void flush();
castor::tape::threading::Mutex m_producterProtection;
/**
* m_fifo is holding all the report waiting to be processed
*/
castor::tape::threading::BlockingQueue<Report*> m_fifo;
unsigned int m_reportFilePeriod;
bool m_errorHappened;
bool m_continue;
};
}}}}
#endif /* RECALLREPORTPACKER_HPP */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment