Commit 5b1a2a27 authored by David COME

Added a bunch of comments for MigrationTaskInjector,MigrationReportPacker,...

Added a bunch of comments for MigrationTaskInjector, MigrationReportPacker, RecallTaskInjector and DataFifo
parent 734c6e9d
......@@ -35,19 +35,40 @@ namespace tape {
namespace tapeserver {
namespace daemon {
/* A fixed payload FIFO: at creation time, we know how many blocks will go through the FIFO.
The provide block method return true as long as it still needs more block. False when last
block is provided (and throws an exception after that).
/* A fixed payload FIFO: at creation time, we know how many blocks will go
* through the FIFO (its size). The provideBlock method returns true as long
* as it still needs more blocks, and false when the last block is provided
* (it throws an exception after that).
* Summary:
*                  +------------------+
*     getFreeBlock |                  | provideBlock
*    <-------------+                  +<--------------
*                  |     DataFifo     |
*    ------------->+                  +-------------->
*    pushDataBlock |                  | popDataBlock
*                  +------------------+
*/
class DataFifo : public MemoryManagerClient {
public:
DataFifo(int bn) throw() : m_blocksNeeded(bn), m_freeBlocksProvided(0),
/**
* Constructor
* @param bn how many memory blocks we want in the FIFO (its size)
*/
DataFifo(int bn) : m_blocksNeeded(bn), m_freeBlocksProvided(0),
m_dataBlocksPushed(0), m_dataBlocksPopped(0) {};
~DataFifo() throw() { castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection); }
~DataFifo() throw() {
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
}
/* Memory manager client interface implementation */
/**
* Return a memory block to the object
* @param mb the memory block to be returned
* @return true if not all the needed blocks have been provided yet
*/
virtual bool provideBlock(MemBlock *mb) {
bool ret;
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
......@@ -62,12 +83,20 @@ public:
return ret;
}
/* Rest of the data Fifo interface. */
MemBlock * getFreeBlock() throw(castor::exception::Exception) {
/**
* Get a free block
* @return a free block
*/
MemBlock * getFreeBlock() {
return m_freeBlocks.pop();
}
void pushDataBlock(MemBlock *mb) throw(castor::exception::Exception) {
/**
* Push into the object a memory block that has been filled somehow
* (by tape or disk reading)
* @param mb the block we want to push
*/
void pushDataBlock(MemBlock *mb) {
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_dataBlocksPushed >= m_blocksNeeded)
......@@ -80,7 +109,12 @@ public:
}
}
MemBlock * popDataBlock() throw(castor::exception::Exception) {
/**
* Pop from the object a memory block that has been filled with data
* (by tape or disk reading)
* @return the filled block
*/
MemBlock * popDataBlock() {
MemBlock *ret = m_dataBlocks.pop();
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
......@@ -89,7 +123,11 @@ public:
return ret;
}
bool finished() throw() {
/**
* Check whether we have finished
* @return true once as many data blocks as the FIFO's size have been popped
*/
bool finished() {
// No need to lock because only one int variable is read.
//TODO: are we sure the operation is atomic? It is platform dependent
castor::tape::threading::MutexLocker ml(&m_countersMutex);
......@@ -99,11 +137,23 @@ public:
private:
castor::tape::threading::Mutex m_countersMutex;
castor::tape::threading::Mutex m_freeBlockProviderProtection;
///the number of memory blocks we want to be provided to the object (its size).
const int m_blocksNeeded;
///how many free blocks have been provided so far
volatile int m_freeBlocksProvided;
///how many data blocks have been pushed so far
volatile int m_dataBlocksPushed;
///how many data blocks have been popped so far
volatile int m_dataBlocksPopped;
///thread-safe storage of all free blocks
castor::tape::threading::BlockingQueue<MemBlock *> m_freeBlocks;
///thread-safe storage of all blocks filled with data
castor::tape::threading::BlockingQueue<MemBlock *> m_dataBlocks;
};
......
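For orientation, here is a minimal single-threaded sketch of the contract described by the DataFifo comment above. ToyBlock and ToyDataFifo (and the plain std::queue storage) are assumptions for illustration only; the real class uses thread-safe BlockingQueues and mutexes:

#include <cassert>
#include <queue>
#include <stdexcept>
#include <string>

struct ToyBlock { std::string payload; };

class ToyDataFifo {
public:
  explicit ToyDataFifo(int blocksNeeded)
    : m_blocksNeeded(blocksNeeded), m_freeBlocksProvided(0), m_dataBlocksPopped(0) {}

  // Returns true while more free blocks are still needed, false on the last one,
  // and throws if a block is provided after that.
  bool provideBlock(ToyBlock* b) {
    if (m_freeBlocksProvided >= m_blocksNeeded)
      throw std::logic_error("too many blocks provided");
    m_free.push(b);
    return ++m_freeBlocksProvided < m_blocksNeeded;
  }

  ToyBlock* getFreeBlock()             { return pop(m_free); }
  void      pushDataBlock(ToyBlock* b) { m_data.push(b); }
  ToyBlock* popDataBlock()             { ++m_dataBlocksPopped; return pop(m_data); }
  bool      finished() const           { return m_dataBlocksPopped >= m_blocksNeeded; }

private:
  static ToyBlock* pop(std::queue<ToyBlock*>& q) {
    ToyBlock* b = q.front();
    q.pop();
    return b;
  }
  const int m_blocksNeeded;       // the FIFO's size, fixed at construction
  int m_freeBlocksProvided;
  int m_dataBlocksPopped;
  std::queue<ToyBlock*> m_free, m_data;
};

int main() {
  ToyBlock pool[2];
  ToyDataFifo fifo(2);
  assert(fifo.provideBlock(&pool[0]) == true);   // still needs one more block
  assert(fifo.provideBlock(&pool[1]) == false);  // last needed block provided

  // Producer side: take free blocks, fill them, push them back as data.
  for (int i = 0; i < 2; ++i) {
    ToyBlock* b = fifo.getFreeBlock();
    b->payload = "data read from tape or disk";
    fifo.pushDataBlock(b);
  }

  // Consumer side: drain the data; finished() once everything has been popped.
  while (!fifo.finished())
    fifo.popDataBlock();
  return 0;
}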
......@@ -41,43 +41,61 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
MigrationReportPacker::MigrationReportPacker(client::ClientInterface & tg,castor::log::LogContext lc):
ReportPackerInterface<detail::Migration>(tg,lc),
m_workerThread(*this),m_errorHappened(false),m_continue(true) {
}
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
MigrationReportPacker::~MigrationReportPacker(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
}
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportCompletedJob(
const tapegateway::FileToMigrateStruct& migratedFile,unsigned long checksum) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,
const std::string& msg,int error_code){
std::auto_ptr<Report> rep(new ReportError(migratedFile,msg,error_code));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFlush
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFlush() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportFlush());
}
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSession() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
......@@ -93,6 +111,9 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _th
_this.m_listReports->addSuccessfulMigrations(successMigration.release());
}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
if(!_this.m_errorHappened){
_this.logReport(_this.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
......@@ -122,6 +143,10 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
//Thus all current reports are deleted; otherwise they would have been sent again at the next flush
_this.m_listReports.reset(new tapegateway::FileMigrationReportList);
}
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _this){
client::ClientInterface::RequestReport chrono;
if(!_this.m_errorHappened){
......@@ -135,7 +160,9 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _
}
_this.m_continue=false;
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& _this){
client::ClientInterface::RequestReport chrono;
......@@ -151,7 +178,9 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
}
_this.m_continue=false;
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
......@@ -166,12 +195,16 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
_this.m_listReports->addFailedMigrations(failedMigration.release());
_this.m_errorHappened=true;
}
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent):
m_parent(parent) {
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void MigrationReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(log::Param("thread", "ReportPacker"));
client::ClientInterface::RequestReport chrono;
......
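The report packers follow a classic command-queue pattern: producer threads wrap each event in a small Report object and push it into a FIFO, and a single worker thread pops the objects and calls execute() on them until an end-of-session report arrives. A minimal single-threaded sketch of that pattern; ToyReport and ToyPacker are simplified stand-ins, not the castor classes (which use std::auto_ptr, a BlockingQueue and a dedicated worker thread):

#include <iostream>
#include <queue>
#include <string>

class ToyPacker;

struct ToyReport {
  virtual ~ToyReport() {}
  virtual void execute(ToyPacker& packer) = 0;
  virtual bool goingToEnd() const { return false; }
};

class ToyPacker {
public:
  // Producer side: called by the data-transfer threads.
  void reportCompletedJob(const std::string& file) {
    m_fifo.push(new ReportSuccessful(file));
  }
  void reportEndOfSession() {
    m_fifo.push(new ReportEndOfSession());
  }

  // Consumer side: what the worker thread's run() loop boils down to.
  void run() {
    while (true) {
      ToyReport* rep = m_fifo.front();
      m_fifo.pop();
      rep->execute(*this);
      const bool end = rep->goingToEnd();
      delete rep;            // the real code holds each report in a std::auto_ptr
      if (end) break;        // the end-of-session report terminates the loop
    }
  }

private:
  struct ReportSuccessful : public ToyReport {
    explicit ReportSuccessful(const std::string& f) : file(f) {}
    virtual void execute(ToyPacker&) { std::cout << "migrated " << file << "\n"; }
    std::string file;
  };
  struct ReportEndOfSession : public ToyReport {
    virtual void execute(ToyPacker&) { std::cout << "end of session\n"; }
    virtual bool goingToEnd() const { return true; }
  };
  // The real packers use a thread-safe BlockingQueue here.
  std::queue<ToyReport*> m_fifo;
};

int main() {
  ToyPacker packer;
  packer.reportCompletedJob("file0001");
  packer.reportEndOfSession();
  packer.run();              // in the real code this runs in its own thread
  return 0;
}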
......@@ -27,7 +27,14 @@
#include "log.h"
namespace{
/*
* Function to set to NULL the owning FilesToMigrateList of a FileToMigrateStruct.
* Indeed, cloning a structure only performs a shallow copy (sic).
* Otherwise, at the second destruction the object would try to remove itself
* from the owning list and then boom!
* @param ptr a pointer to the object to change
* @return the parameter ptr
*/
castor::tape::tapegateway::FileToMigrateStruct* removeOwningList(castor::tape::tapegateway::FileToMigrateStruct* ptr){
ptr->setFilesToMigrateList(0);
return ptr;
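To make the pitfall concrete, here is a small self-contained sketch of the pattern: an object that removes itself from an owning list on destruction, a shallow clone that still carries the back-pointer, and the detach step that removeOwningList() performs. ToyFile and ToyList are assumptions for illustration only, not the tapegateway classes:

#include <algorithm>
#include <vector>

struct ToyList;

struct ToyFile {
  ToyList* owningList;                   // back-pointer set by the owning list
  ToyFile() : owningList(0) {}
  ~ToyFile();                            // removes itself from owningList
  ToyFile* clone() const { return new ToyFile(*this); }  // shallow copy: the back-pointer is shared
  void setOwningList(ToyList* l) { owningList = l; }
};

struct ToyList {
  std::vector<ToyFile*> files;
  void add(ToyFile* f) { files.push_back(f); f->setOwningList(this); }
  void remove(ToyFile* f) {
    files.erase(std::remove(files.begin(), files.end(), f), files.end());
  }
};

ToyFile::~ToyFile() { if (owningList) owningList->remove(this); }

int main() {
  ToyList list;
  ToyFile* original = new ToyFile;
  list.add(original);

  // A clone still points at the owning list. Detach the copy first, exactly
  // as removeOwningList() does, so destroying it cannot touch the list a
  // second time behind the original's back.
  ToyFile* copy = original->clone();
  copy->setOwningList(0);
  delete copy;          // safe: does not try to remove itself from the list

  delete original;      // removes itself from the list as designed
  return 0;
}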
......@@ -42,7 +49,9 @@ namespace tape{
namespace tapeserver{
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm,
DiskReadThreadPool & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
......@@ -55,11 +64,11 @@ namespace daemon {
}
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs to transform into tasks
*/
void MigrationTaskInjector::injectBulkMigrations(const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
//------------------------------------------------------------------------------
//injectBulkMigrations
//------------------------------------------------------------------------------
void MigrationTaskInjector::injectBulkMigrations(
const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
const u_signed64 blockCapacity = m_memManager.blockCapacity();
for(std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
......@@ -88,27 +97,32 @@ namespace daemon {
}
}
/**
* Wait for the inner thread to finish
*/
//------------------------------------------------------------------------------
//waitThreads
//------------------------------------------------------------------------------
void MigrationTaskInjector::waitThreads(){
m_thread.wait();
}
/**
* Start the inner thread
*/
//------------------------------------------------------------------------------
//startThreads
//------------------------------------------------------------------------------
void MigrationTaskInjector::startThreads(){
m_thread.start();
}
//------------------------------------------------------------------------------
//requestInjection
//------------------------------------------------------------------------------
void MigrationTaskInjector::requestInjection( bool lastCall) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
if(!m_errorFlag) {
m_queue.push(Request(m_maxFiles, m_maxByte, lastCall));
}
}
//------------------------------------------------------------------------------
//synchronousInjection
//------------------------------------------------------------------------------
bool MigrationTaskInjector::synchronousInjection() {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
......@@ -123,13 +137,17 @@ namespace daemon {
return true;
}
}
//------------------------------------------------------------------------------
//finish
//------------------------------------------------------------------------------
void MigrationTaskInjector::finish(){
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector"));
m_parent.m_lc.log(LOG_INFO, "Starting MigrationTaskInjector thread");
......
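Work flows to the injector's worker thread through a similar producer/consumer queue: requestInjection() pushes a Request describing how much more work is wanted, the worker thread pops it and asks the client for more files to migrate, and finish() pushes a default-constructed Request as an end-of-work marker. A minimal single-threaded sketch of that protocol; the Request fields and the numeric values are assumptions inferred from the calls above, not the real class:

#include <iostream>
#include <queue>
#include <stdint.h>

// A default-constructed Request is the end-of-work marker pushed by finish().
struct Request {
  Request() : maxFiles(0), maxBytes(0), lastCall(false), end(true) {}
  Request(uint64_t files, uint64_t bytes, bool last)
    : maxFiles(files), maxBytes(bytes), lastCall(last), end(false) {}
  uint64_t maxFiles;   // at most this many files to ask the client for
  uint64_t maxBytes;   // at least this many bytes to ask the client for
  bool lastCall;       // if the client has nothing left, the session can wind down
  bool end;            // stop the worker thread
};

int main() {
  std::queue<Request> queue;   // the real injector uses a thread-safe BlockingQueue

  // Producer side: DiskReadThreadPool noticing it is running low on work.
  queue.push(Request(10, 80 * 1024 * 1024, false));   // ~ requestInjection(false)
  queue.push(Request(10, 80 * 1024 * 1024, true));    // ~ requestInjection(true)
  queue.push(Request());                              // ~ finish()

  // Consumer side: what the injector's worker thread does with each Request.
  while (!queue.empty()) {
    Request req = queue.front();
    queue.pop();
    if (req.end) {
      std::cout << "worker thread exits" << std::endl;
      break;
    }
    std::cout << "ask the client for up to " << req.maxFiles << " files / "
              << req.maxBytes << " bytes (lastCall=" << req.lastCall << ")"
              << std::endl;
  }
  return 0;
}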
......@@ -46,18 +46,26 @@ namespace daemon {
class MigrationTaskInjector: public TaskInjector {
public:
/**
* Constructor
* @param mm The memory manager for accessing memory blocks.
* The newly created tape-write tasks will register themselves
* as clients to it.
* @param diskReader the one object that will hold all the threads which will be executing
* disk-reading tasks
* @param tapeWriter the one object that will hold the thread which will be executing
* tape-writing tasks
* @param client the client that will give us the files to migrate
* @param maxFiles maximal number of files we may request from the client at once
* @param byteSizeThreshold maximal number of cumulated bytes
* we may request from the client at once
* @param lc log context, copied because of the threading mechanism
*/
MigrationTaskInjector(MigrationMemoryManager & mm,
DiskReadThreadPool & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs
*/
void injectBulkMigrations(const std::vector<castor::tape::tapegateway::FileToMigrateStruct*>& jobs);
/**
* Wait for the inner thread to finish
*/
......@@ -72,8 +80,6 @@ public:
* DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
* it does not have enough jobs to do, it asks this class to push a request
* in order to (try to) fill up the queue.
* @param maxFiles files count requested.
* @param maxBlocks total bytes count at least requested
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
......@@ -83,8 +89,6 @@ public:
* Contact the client to make sure there is really something to do:
* migrate at most maxFiles files or at least maxBytes bytes
*
* @param maxFiles files count requested.
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection();
......@@ -95,6 +99,11 @@ public:
*/
void finish();
private:
/**
* Create all the disk-read and tape-write tasks for a set of files to migrate
* @param jobs the list of FileToMigrateStructs we have to transform into pairs of tasks
*/
void injectBulkMigrations(const std::vector<castor::tape::tapegateway::FileToMigrateStruct*>& jobs);
/*Compute how many blocks are needed for a file of fileSize bytes*/
size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
......@@ -139,12 +148,18 @@ private:
private:
MigrationTaskInjector & m_parent;
} m_thread;
///The memory manager for accessing memory blocks.
MigrationMemoryManager & m_memManager;
///the one object that will hold the thread which will be executing
///tape-writing tasks
TapeSingleThreadInterface<TapeWriteTaskInterface>& m_tapeWriter;
///the one object that will hold all the threads which will be executing
///disk-reading tasks
DiskReadThreadPool & m_diskReader;
/// the client who is sending us jobs
client::ClientInterface& m_client;
/**
......@@ -153,14 +168,20 @@ private:
castor::log::LogContext m_lc;
castor::tape::threading::Mutex m_producerProtection;
///all the requests for work we will forward to the client.
castor::tape::threading::BlockingQueue<Request> m_queue;
/** a shared flag among all the tasks related to migration, set to true
* as soon as a single task encounters a failure. That way we go into a degraded
* mode where we only circulate memory without writing anything on tape
*/
castor::tape::threading::AtomicFlag m_errorFlag;
//maximal number of files requested. at once
/// maximal number of files requested at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
/// maximal number of cumulated bytes requested at once
const uint64_t m_maxByte;
};
......
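The body of the howManyBlocksNeeded() helper declared above is not shown in the hunks; one straightforward way to write such a computation is a ceiling division, sketched here as an assumption rather than the actual implementation:

#include <cassert>
#include <cstddef>

// A file of fileSize bytes needs ceil(fileSize / blockCapacity) memory blocks;
// the last block may be only partially used.
static size_t blocksNeededSketch(size_t fileSize, size_t blockCapacity) {
  return fileSize / blockCapacity + (fileSize % blockCapacity ? 1 : 0);
}

int main() {
  const size_t blockCapacity = 8 * 1024 * 1024;   // assumed block size, not castor's
  assert(blocksNeededSketch(1, blockCapacity) == 1);
  assert(blocksNeededSketch(blockCapacity, blockCapacity) == 1);
  assert(blocksNeededSketch(blockCapacity + 1, blockCapacity) == 2);
  return 0;
}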
/******************************************************************************
* RecallReportPacker.hpp
* RecallReportPacker.cpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
......@@ -41,39 +41,56 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
RecallReportPacker::RecallReportPacker(client::ClientInterface & tg,unsigned int reportFilePeriod,log::LogContext lc):
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker::RecallReportPacker(client::ClientInterface & tg,
unsigned int reportFilePeriod,log::LogContext lc):
ReportPackerInterface<detail::Recall>(tg,lc),
m_workerThread(*this),m_reportFilePeriod(reportFilePeriod),m_errorHappened(false){
}
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
RecallReportPacker::~RecallReportPacker(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
}
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportCompletedJob(const FileStruct& recalledFile,unsigned long checksum){
std::auto_ptr<Report> rep(new ReportSuccessful(recalledFile,checksum));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportFailedJob(const FileStruct & recalledFile
,const std::string& msg,int error_code){
std::auto_ptr<Report> rep(new ReportError(recalledFile,msg,error_code));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSession(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& _this){
std::auto_ptr<FileSuccessStruct> successRecall(new FileSuccessStruct);
......@@ -89,11 +106,10 @@ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& _this){
successRecall->setChecksumName("adler32");
_this.m_listReports->addSuccessfulRecalls(successRecall.release());
}
//------------------------------------------------------------------------------
//flush
//------------------------------------------------------------------------------
void RecallReportPacker::flush(){
unsigned int totalSize = m_listReports->failedRecalls().size() +
m_listReports->successfulRecalls().size();
if(totalSize > 0){
client::ClientInterface::RequestReport chrono;
try{
......@@ -111,12 +127,13 @@ void RecallReportPacker::flush(){
m_lc.log(LOG_ERR,msg_error);
throw failedReportRecallResult(msg_error);
}
}
//delete the old report list and replace it with a newly created one;
//that way, all the reports that have been sent are deleted (by FileReportList's destructor)
m_listReports.reset(new FileReportList);
}
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& _this){
client::ClientInterface::RequestReport chrono;
if(!_this.m_errorHappened){
......@@ -130,7 +147,9 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& _this){
_this.logRequestReport(chrono,"reporting EndOfSessionWithError done",LOG_ERR);
}
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& _this){
client::ClientInterface::RequestReport chrono;
if(_this.m_errorHappened) {
......@@ -144,7 +163,9 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
_this.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
}
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportError::execute(RecallReportPacker& _this){
std::auto_ptr<FileErrorStruct> failed(new FileErrorStruct);
......@@ -160,11 +181,14 @@ void RecallReportPacker::ReportError::execute(RecallReportPacker& _this){
_this.m_errorHappened=true;
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
RecallReportPacker::WorkerThread::WorkerThread(RecallReportPacker& parent):
m_parent(parent) {
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void RecallReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
m_parent.m_lc.log(LOG_DEBUG, "Starting RecallReportPacker thread");
......@@ -173,9 +197,11 @@ void RecallReportPacker::WorkerThread::run(){
while(1) {
std::auto_ptr<Report> rep (m_parent.m_fifo.pop());
//how many files we have treated so far
unsigned int totalSize = m_parent.m_listReports->failedRecalls().size() +
m_parent.m_listReports->successfulRecalls().size();
//If we have enough reports or we are going to end the loop
//then we flush
if(totalSize >= m_parent.m_reportFilePeriod || rep->goingToEnd() )
{
......@@ -183,8 +209,9 @@ void RecallReportPacker::WorkerThread::run(){
m_parent.flush();