Commit e5b1f960 authored by Eric Cano's avatar Eric Cano

Catch up with parallel development.

parents 473540f2 3c310375
......@@ -25,21 +25,42 @@
#pragma once
#include <memory>
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/*
 * Use RAII to make sure the memory block is released
 * (i.e. pushed back to the memory manager) in any case (exception or not).
 * Example of use:
 * {
 *   MemBlock* block = getItFromSomewhere();
 *   {Recall/Migration}MemoryManager mm;
 *   AutoReleaseBlock releaser(block, mm);
 * }
 */
template <class MemManagerT> class AutoReleaseBlock {
/**
* The block to release
*/
MemBlock* const m_block;
/**
* To whom it should be given back
*/
MemManagerT& memManager;
public:
/**
 * Constructor
 * @param mb the block to release
 * @param mm to whom it should be given back
 */
AutoReleaseBlock(MemBlock* const mb,MemManagerT& mm):
m_block(mb),memManager(mm){}
//let the magic begin: give the block back to the memory manager
~AutoReleaseBlock(){
memManager.releaseBlock(m_block);
}
......
......@@ -35,19 +35,40 @@ namespace tape {
namespace tapeserver {
namespace daemon {
/* A fixed payload FIFO: at creation time, we know how many blocks will go
 * through the FIFO (its size). The provideBlock method returns true as long
 * as it still needs more blocks, and false when the last block is provided
 * (it throws an exception after that).
 * Summary:
              +------------------------------+
 getFreeBlock |                              | provideBlock
<-------------+                              +<---------------
              |           DataFifo           |
              |                              |
------------->+                              +--------------->
pushDataBlock +------------------------------+ getDataBlock
*/
class DataFifo : public MemoryManagerClient {
public:
/**
 * Constructor
 * @param bn how many memory blocks we want in the FIFO (its size)
 */
DataFifo(int bn) : m_blocksNeeded(bn), m_freeBlocksProvided(0),
m_dataBlocksPushed(0), m_dataBlocksPopped(0) {};
~DataFifo() throw() {
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
}
/* Memory manager client interface implementation */
/*
 * Return a memory block to the object
 * @param mb the memory block to be returned
 * @return true if not all the needed blocks have been provided yet
 */
virtual bool provideBlock(MemBlock *mb) {
bool ret;
castor::tape::threading::MutexLocker ml(&m_freeBlockProviderProtection);
......@@ -62,12 +83,20 @@ public:
return ret;
}
/* Rest of the data Fifo interface. */
/*
* Get a free block
* @return a free block
*/
MemBlock * getFreeBlock() {
return m_freeBlocks.pop();
}
/**
 * Push into the object a memory block that has been filled somehow
 * (e.g. by tape or disk reading)
 * @param mb the block we want to push back
 */
void pushDataBlock(MemBlock *mb) {
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
if (m_dataBlocksPushed >= m_blocksNeeded)
......@@ -80,7 +109,12 @@ public:
}
}
/**
 * Pop from the object a memory block that has been filled somehow
 * (e.g. by tape or disk reading)
 * @return the popped block
 */
MemBlock * popDataBlock() {
MemBlock *ret = m_dataBlocks.pop();
{
castor::tape::threading::MutexLocker ml(&m_countersMutex);
......@@ -89,7 +123,11 @@ public:
return ret;
}
/**
 * Check if we have finished
 * @return true if we have popped as many data blocks as the FIFO's size
 */
bool finished() {
// No need to lock because only one int variable is read.
//TODO: are we sure the operation is atomic? It is platform dependent
castor::tape::threading::MutexLocker ml(&m_countersMutex);
......@@ -99,11 +137,23 @@ public:
private:
castor::tape::threading::Mutex m_countersMutex;
castor::tape::threading::Mutex m_freeBlockProviderProtection;
///the number of memory blocks we want to be provided to the object (its size).
const int m_blocksNeeded;
///how many blocks have been currently provided
volatile int m_freeBlocksProvided;
///how many data blocks have been currently pushed
volatile int m_dataBlocksPushed;
///how many data blocks have been currently taken
volatile int m_dataBlocksPopped;
///thread-safe storage of all free blocks
castor::tape::threading::BlockingQueue<MemBlock *> m_freeBlocks;
///thread-safe storage of all blocks filled with data
castor::tape::threading::BlockingQueue<MemBlock *> m_dataBlocks;
};
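/*
 * A minimal usage sketch (not from the original source; readBlockFromTape and
 * writeBlockToDisk are hypothetical helpers) showing how a producer and a
 * consumer would typically drive a DataFifo:
 *
 *   DataFifo fifo(nbBlocks);            // expects nbBlocks in total
 *   // memory manager side: fifo.provideBlock(mb); for each free block
 *   // producer thread:
 *   MemBlock* b = fifo.getFreeBlock();  // take an empty block
 *   readBlockFromTape(b);               // fill it
 *   fifo.pushDataBlock(b);              // hand it to the consumer
 *   // consumer thread:
 *   MemBlock* d = fifo.popDataBlock();  // blocks until data is available
 *   writeBlockToDisk(d);                // drain it, then recycle the block
 *   // once fifo.finished() is true, all nbBlocks went through
 */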
......
......@@ -41,7 +41,7 @@ m_recallingFile(file),m_memManager(mm){
//------------------------------------------------------------------------------
// DiskWriteTask::execute
//------------------------------------------------------------------------------
bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
try{
......
......@@ -29,7 +29,7 @@
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include <memory>
namespace {
......@@ -63,7 +63,7 @@ public:
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
*/
virtual bool execute(RecallReportPacker& reporter,log::LogContext& lc) ;
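/*
 * A hedged sketch of what execute() is expected to do (the real body lives
 * in DiskWriteTask.cpp and is mostly elided from this diff; the fifo member
 * name is an assumption):
 *   while (!m_fifo.finished()) {
 *     MemBlock* mb = m_fifo.popDataBlock();
 *     // write the block's payload to the destination disk file,
 *     // then give the block back to the memory manager
 *   }
 *   // report the outcome via reporter.reportCompletedJob() or
 *   // reporter.reportFailedJob()
 */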
/**
* Allows client code to return a reusable memory block. Should not be called
......
......@@ -14,13 +14,13 @@
namespace unitTests{
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
struct MockRecallReportPacker : public RecallReportPacker {
MOCK_METHOD2(reportCompletedJob,void(const FileStruct&,unsigned long));
MOCK_METHOD3(reportFailedJob, void(const FileStruct& ,const std::string&,int));
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
MockRecallReportPacker(ClientInterface& client,castor::log::LogContext lc):
RecallReportPacker(client,1,lc){}
};
TEST(castor_tape_tapeserver_daemon, DiskWriteTaskFailledBlock){
......
......@@ -11,7 +11,7 @@ namespace daemon {
// constructor
//------------------------------------------------------------------------------
DiskWriteThreadPool::DiskWriteThreadPool(int nbThread,
RecallReportPacker& report,castor::log::LogContext lc):
m_reporter(report),m_lc(lc)
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
......
......@@ -57,7 +57,7 @@ public:
* be no side effect on the caller's logs.
*/
DiskWriteThreadPool(int nbThread,
RecallReportPacker& reportPacker,
castor::log::LogContext lc);
/**
* Destructor: we suppose the threads are no running (waitThreads() should
......@@ -128,7 +128,7 @@ protected:
private:
/** Reference to the report packer where tasks report the result of their
* individual files and the end of session (for the last thread) */
RecallReportPacker& m_reporter;
/** logging context that will be copied by each thread for individual context */
castor::log::LogContext m_lc;
......
......@@ -14,13 +14,13 @@
namespace unitTests{
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
struct MockRecallReportPacker : public RecallReportPacker {
MOCK_METHOD2(reportCompletedJob,void(const FileStruct&,unsigned long));
MOCK_METHOD3(reportFailedJob, void(const FileStruct& ,const std::string&,int));
MOCK_METHOD0(reportEndOfSession, void());
MOCK_METHOD2(reportEndOfSessionWithErrors, void(const std::string,int));
MockRecallReportPacker(ClientInterface& client,castor::log::LogContext lc):
RecallReportPacker(client,1,lc){}
};
struct MockTaskInjector : public TaskInjector{
......
......@@ -41,43 +41,61 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
MigrationReportPacker::MigrationReportPacker(client::ClientInterface & tg,castor::log::LogContext lc):
ReportPackerInterface<detail::Migration>(tg,lc),
m_workerThread(*this),m_errorHappened(false),m_continue(true) {
}
//------------------------------------------------------------------------------
//Destructor
//------------------------------------------------------------------------------
MigrationReportPacker::~MigrationReportPacker(){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
}
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportCompletedJob(
const tapegateway::FileToMigrateStruct& migratedFile,unsigned long checksum) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,
const std::string& msg,int error_code){
std::auto_ptr<Report> rep(new ReportError(migratedFile,msg,error_code));
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFlush
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFlush() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportFlush());
}
//------------------------------------------------------------------------------
//reportEndOfSession
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSession() {
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
//------------------------------------------------------------------------------
void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int error_code){
castor::tape::threading::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
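//------------------------------------------------------------------------------
// Illustrative call sequence (a sketch, not from the original source): the
// tape-write side acts as the producer of reports and the inner WorkerThread
// is the consumer, e.g.:
//   MigrationReportPacker mrp(client, lc);
//   mrp.reportCompletedJob(file, checksum);  // queues a ReportSuccessful
//   mrp.reportFlush();                       // queues a ReportFlush
//   mrp.reportEndOfSession();                // queues the final report
// Each producer call locks m_producterProtection and pushes into m_fifo,
// from which WorkerThread::run() pops and executes the reports.
//------------------------------------------------------------------------------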
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
......@@ -93,6 +111,9 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _th
_this.m_listReports->addSuccessfulMigrations(successMigration.release());
}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
if(!_this.m_errorHappened){
_this.logReport(_this.m_listReports->successfulMigrations(),"A file was successfully written on the tape");
......@@ -122,6 +143,10 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& _this){
//Thus all current reports are deleted, otherwise they would be sent again at the next flush
_this.m_listReports.reset(new tapegateway::FileMigrationReportList);
}
//------------------------------------------------------------------------------
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _this){
client::ClientInterface::RequestReport chrono;
if(!_this.m_errorHappened){
......@@ -135,7 +160,9 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& _
}
_this.m_continue=false;
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& _this){
client::ClientInterface::RequestReport chrono;
......@@ -151,7 +178,9 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
}
_this.m_continue=false;
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
......@@ -166,12 +195,16 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& _this){
_this.m_listReports->addFailedMigrations(failedMigration.release());
_this.m_errorHappened=true;
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent):
m_parent(parent) {
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void MigrationReportPacker::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(log::Param("thread", "ReportPacker"));
client::ClientInterface::RequestReport chrono;
......
......@@ -27,7 +27,14 @@
#include "log.h"
namespace{
/*
 * Function to set to NULL the owning FilesToMigrateList of a
 * FileToMigrateStruct. This is needed because cloning a structure only does
 * a shallow copy (sic); otherwise, at the second destruction, the object
 * would try to remove itself from the owning list and crash.
 * @param ptr a pointer to the object to change
 * @return the parameter ptr
 */
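// A hedged usage sketch (the calling code is elided from this diff; the
// iterator and the cast are assumptions): wrap a shallow clone before
// storing it, e.g.
//   std::auto_ptr<castor::tape::tapegateway::FileToMigrateStruct> copy(
//     removeOwningList(
//       dynamic_cast<castor::tape::tapegateway::FileToMigrateStruct*>(
//         (*it)->clone())));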
castor::tape::tapegateway::FileToMigrateStruct* removeOwningList(castor::tape::tapegateway::FileToMigrateStruct* ptr){
ptr->setFilesToMigrateList(0);
return ptr;
......@@ -42,7 +49,9 @@ namespace tape{
namespace tapeserver{
namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm,
DiskReadThreadPool & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
......@@ -55,11 +64,11 @@ namespace daemon {
}
//------------------------------------------------------------------------------
//injectBulkMigrations
//------------------------------------------------------------------------------
void MigrationTaskInjector::injectBulkMigrations(
const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
const u_signed64 blockCapacity = m_memManager.blockCapacity();
for(std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
......@@ -88,27 +97,32 @@ namespace daemon {
}
}
//------------------------------------------------------------------------------
//waitThreads
//------------------------------------------------------------------------------
void MigrationTaskInjector::waitThreads(){
m_thread.wait();
}
//------------------------------------------------------------------------------
//startThreads
//------------------------------------------------------------------------------
void MigrationTaskInjector::startThreads(){
m_thread.start();
}
//------------------------------------------------------------------------------
//requestInjection
//------------------------------------------------------------------------------
void MigrationTaskInjector::requestInjection( bool lastCall) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
if(!m_errorFlag) {
m_queue.push(Request(m_maxFiles, m_maxByte, lastCall));
}
}
//------------------------------------------------------------------------------
//synchronousInjection
//------------------------------------------------------------------------------
bool MigrationTaskInjector::synchronousInjection() {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
......@@ -123,13 +137,17 @@ namespace daemon {
return true;
}
}
//------------------------------------------------------------------------------
//finish
//------------------------------------------------------------------------------
void MigrationTaskInjector::finish(){
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector"));
m_parent.m_lc.log(LOG_INFO, "Starting MigrationTaskInjector thread");
......
......@@ -46,18 +46,26 @@ namespace daemon {
class MigrationTaskInjector: public TaskInjector {
public:
/**
 * Constructor
 * @param mm The memory manager for accessing memory blocks.
 * The newly created tape-writer tasks will register themselves
 * as clients to it.
 * @param diskReader the one object that will hold all the threads which will
 * be executing disk-reading tasks
 * @param tapeWriter the one object that will hold the thread which will be
 * executing tape-writing tasks
 * @param client the one that will give us files to migrate
 * @param maxFiles maximal number of files we may request from the client at once
 * @param byteSizeThreshold maximal number of cumulated bytes we may request
 * from the client at once
 * @param lc log context, copied because of the threading mechanism
 */
MigrationTaskInjector(MigrationMemoryManager & mm,
DiskReadThreadPool & diskReader,
TapeSingleThreadInterface<TapeWriteTaskInterface> & tapeWriter,client::ClientInterface& client,
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
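/**
 * Illustrative lifecycle (a sketch pieced together from the methods declared
 * below, not documentation from the original header):
 *   MigrationTaskInjector mti(mm, diskReader, tapeWriter, client,
 *                             maxFiles, byteSizeThreshold, lc);
 *   if (mti.synchronousInjection()) { // get a first batch of jobs
 *     mti.startThreads();             // inner thread serves later requests
 *     // ...tasks call requestInjection() as their queues drain...
 *     mti.finish();                   // push the end-of-work request
 *     mti.waitThreads();              // join the inner thread
 *   }
 */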
/**
* Wait for the inner thread to finish
*/
......@@ -72,8 +80,6 @@ public:
* DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
* it does not have enough jobs to do, this class is asked to push a request
* in order to (try to) fill up the queue.
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
......@@ -83,8 +89,6 @@ public:
* Contact the client to make sure there is really something to do:
* a migration of at most maxFiles files or at least maxBytes bytes
*
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection();
......@@ -95,6 +99,11 @@ public:
*/
void finish();
private:
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs the list of FileToMigrateStructs we have to transform into pairs of tasks
*/
void injectBulkMigrations(const std::vector<castor::tape::tapegateway::FileToMigrateStruct*>& jobs);
/*Compute how many blocks are needed for a file of fileSize bytes*/
size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
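// The body is elided from this diff; presumably a ceiling division, e.g.
//   return fileSize / blockCapacity + (fileSize % blockCapacity ? 1 : 0);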
......@@ -139,12 +148,18 @@ private:
private:
MigrationTaskInjector & m_parent;
} m_thread;
///The memory manager for accessing memory blocks.
MigrationMemoryManager & m_memManager;
///the one object that will hold the thread which will be executing
///tape-writing tasks
TapeSingleThreadInterface<TapeWriteTaskInterface>& m_tapeWriter;
///the one object that will hold all the threads which will be executing
///disk-reading tasks
DiskReadThreadPool & m_diskReader;
/// the client who is sending us jobs
client::ClientInterface& m_client;
/**
......@@ -153,14 +168,20 @@ private:
castor::log::LogContext m_lc;
castor::tape::threading::Mutex m_producerProtection;
///all the requests for work we will forward to the client.
castor::tape::threading::BlockingQueue<Request> m_queue;
/** a flag shared among all the tasks related to migration, set to true
 * as soon as a single task encounters a failure. That way we go into a
 * degraded mode where we only circulate memory without writing anything
 * on tape
 */
castor::tape::threading::AtomicFlag m_errorFlag;
/// maximal number of files requested at once
const uint64_t m_maxFiles;
/// maximal number of cumulated bytes requested at once
const uint64_t m_maxByte;
};
......
/******************************************************************************
* RecallReportPacker.cpp
*
* This file is part