Commit 9b52a981 authored by David COME's avatar David COME
Browse files

(WIP) first iteration of MigrationReportPacker

parent 638eaa4a
......@@ -21,14 +21,15 @@ add_executable(tapeserver-mm
tapeserver-mm.cpp
RecallTaskInjector.cpp
Exception.cpp
ClientProxy.cpp)
ClientProxy.cpp
MigrationReportPacker.cpp)
target_link_libraries(tapeserver-mm
TapeDrive Exception SCSI System Utils File
castorcommon castorclient castorTapeServerThreading castortapeutils
castortapegatewayprotocol)
add_library(tapeserver ClientProxy.cpp MountSession.cpp RecallTaskInjector.cpp)
add_library(tapeserver ClientProxy.cpp MountSession.cpp RecallTaskInjector.cpp MigrationReportPacker.cpp)
add_library(tapeserverdTest
ClientSimulator.cpp
......
/******************************************************************************
* MigrationReportPacker.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <memory>
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapegateway/FileErrorReportStruct.hpp"
#include "castor/tape/tapegateway/FileMigratedNotificationStruct.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
MigrationReportPacker::MigrationReportPacker(ClientInterface & tg):
m_workerThread(*this),m_client(tg) {
}
void MigrationReportPacker::reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
void MigrationReportPacker::reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,
const std::string& msg,int error_code){
std::auto_ptr<Report> rep(new ReportError(migratedFile,msg,error_code));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
void MigrationReportPacker::reportFlush() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportFlush());
}
void MigrationReportPacker::reportEndOfSession() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
void MigrationReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
successMigration->setFseq(m_migratedFile.fseq());
successMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
successMigration->setId(m_migratedFile.id());
successMigration->setNshost(m_migratedFile.nshost());
successMigration->setFileid(m_migratedFile.fileid());
_this.m_listReports.addSuccessfulMigrations(successMigration.release());
}
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportMigrationResults(_this.m_listReports,chrono);
}
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _this){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_client.reportEndOfSession(chrono);
}
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& _this){
tapeserver::daemon::ClientInterface::RequestReport chrono;
_this.m_listReports.successfulMigrations().clear();
_this.m_client.reportEndOfSessionWithError(m_message,m_error_code,chrono);
}
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
failedMigration->setErrorCode(m_error_code);
failedMigration->setErrorMessage(m_error_msg);
failedMigration->setFseq(m_migratedFile.fseq());
failedMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
failedMigration->setId(m_migratedFile.id());
failedMigration->setNshost(m_migratedFile.nshost());
_this.m_listReports.addFailedMigrations(failedMigration.release());
_this.reportEndOfSessionWithErrors(m_error_msg,m_error_code);
}
//------------------------------------------------------------------------------
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent):
m_parent(parent) {
}
void MigrationReportPacker::WorkerThread::run(){
while(1) {
std::auto_ptr<MigrationReportPacker::Report> rep (m_parent.m_fifo.pop());
rep->execute(m_parent);
}
}
}}}}
......@@ -24,68 +24,114 @@
#pragma once
#include "castor/tape/tapeserver/daemon/MockTapeGateway.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/MigrationJob.hpp"
#include "castor/tape/tapeserver/daemon/ClientInterface.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include <list>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationReportPacker {
public:
MigrationReportPacker(MockTapeGateway & tg): m_workerThread(*this),
m_tapeGateway(tg) {}
void reportCompletedJob(MigrationJob mj) {
Report rep;
rep.migrationJob = mj;
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
void reportFlush() {
Report rep;
rep.flush = true;
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
void reportEndOfSession() {
Report rep;
rep.EoSession = true;
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep);
}
MigrationReportPacker(ClientInterface & tg);
/**
* Create into the MigrationReportPacker a report for the successful migration
* of migratedFile
* @param migratedFile the file successfully migrated
*/
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile);
/**
* Create into the MigrationReportPacker a report for the failled migration
* of migratedFile
* @param migratedFile the file which failled
*/
void reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,const std::string& msg,int error_code);
/**
* Create into the MigrationReportPacker a report for the signaling a flusing on tape
*/
void reportFlush();
/**
* Create into the MigrationReportPacker a report for the nominal end of session
*/
void reportEndOfSession();
/**
* Create into the MigrationReportPacker a report for an erroneous end of session
* @param msg The error message
* @param error_code The error code given by the drive
*/
void reportEndOfSessionWithErrors(const std::string msg,int error_code);
void startThreads() { m_workerThread.start(); }
void waitThread() { m_workerThread.wait(); }
virtual ~MigrationReportPacker() { castor::tape::threading::MutexLocker ml(&m_producterProtection); }
private:
class Report {
public:
Report(): flush(false), EoSession(false),
migrationJob(-1, -1, -1) {}
bool flush;
bool EoSession;
MigrationJob migrationJob;
virtual ~Report(){}
virtual void execute(MigrationReportPacker& packer)=0;
};
class WorkerThread: public castor::tape::threading::Thread {
class ReportSuccessful : public Report {
const tapegateway::FileToMigrateStruct& m_migratedFile;
public:
ReportSuccessful(const tapegateway::FileToMigrateStruct& file):
m_migratedFile(file){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportFlush : public Report {
public:
void execute(MigrationReportPacker& _this);
};
class ReportError : public Report {
const tapegateway::FileToMigrateStruct& m_migratedFile;
const std::string m_error_msg;
const int m_error_code;
public:
ReportError(const tapegateway::FileToMigrateStruct& file,const std::string& msg,int error_code):
m_migratedFile(file),m_error_msg(msg),m_error_code(error_code){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportEndofSession : public Report {
public:
virtual void execute(MigrationReportPacker& _this);
};
class ReportEndofSessionWithErrors : public Report {
std::string m_message;
int m_error_code;
public:
WorkerThread(MigrationReportPacker& parent): m_parent(parent) {}
void run() {
while(1) {
MigrationReportPacker::Report rep = m_parent.m_fifo.pop();
if (rep.flush || rep.EoSession) {
m_parent.m_tapeGateway.reportMigratedFiles(m_parent.m_currentReport);
m_parent.m_currentReport.clear();
if (rep.EoSession) {
m_parent.m_tapeGateway.reportEndOfSession(false);
return;
}
} else {
m_parent.m_currentReport.push_back(rep.migrationJob);
}
}
}
ReportEndofSessionWithErrors(const std::string msg,int error_code):
m_message(msg),m_error_code(error_code){}
virtual void execute(MigrationReportPacker& _this);
};
class WorkerThread: public castor::tape::threading::Thread {
MigrationReportPacker & m_parent;
public:
WorkerThread(MigrationReportPacker& parent);
virtual void run();
} m_workerThread;
friend class WorkerThread;
MockTapeGateway & m_tapeGateway;
castor::tape::threading::BlockingQueue<Report> m_fifo;
std::list<MigrationJob> m_currentReport;
ClientInterface & m_client;
castor::tape::threading::BlockingQueue<Report*> m_fifo;
tapegateway::FileMigrationReportList m_listReports;
castor::tape::threading::Mutex m_producterProtection;
};
}}}}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment