Commit 0c1de64e authored by Eric Cano's avatar Eric Cano
Browse files

Improved comments and indentation DiskReadTask, DiskReadTaskInterface, DiskReadThreadPool

Removed interface classes DiskReadTaskInterface, DiskWriteTaskInterface, DiskThreadPoolInterface
parent d2e1d6d0
......@@ -25,7 +25,7 @@
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/log/LogContext.hpp"
namespace{
/*Use RAII to make sure the memory block is released
/** Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
*/
class AutoPushBlock{
......@@ -47,81 +47,90 @@ namespace tape {
namespace tapeserver {
namespace daemon {
DiskReadTask::DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,
size_t numberOfBlock,castor::tape::threading::AtomicFlag& errorFlag):
m_nextTask(destination),m_migratedFile(file),
m_numberOfBlock(numberOfBlock),m_errorFlag(errorFlag)
{}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskReadTask::DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,
size_t numberOfBlock,castor::tape::threading::AtomicFlag& errorFlag):
m_nextTask(destination),m_migratedFile(file),
m_numberOfBlock(numberOfBlock),m_errorFlag(errorFlag)
{}
//------------------------------------------------------------------------------
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(log::LogContext& lc) {
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
//we first check here to not even try to open the disk if a previous task has failed
//because the disk could the very reason why the previous one failed,
//so dont do the same mistake twice !
hasAnotherTaskTailed();
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
while(migratingFileSize>0){
void DiskReadTask::execute(log::LogContext& lc) {
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
//we first check here to not even try to open the disk if a previous task has failed
//because the disk could the very reason why the previous one failed,
//so dont do the same mistake twice !
hasAnotherTaskTailed();
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
while(migratingFileSize>0){
hasAnotherTaskTailed();
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
migratingFileSize -= mb->m_payload.read(sourceFile);
//we either read at full capacity (ie size=capacity) or if there different,
//it should be the end => migratingFileSize should be 0. If it not, error
if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize>0){
mb->m_failed=true;
throw castor::tape::Exception("Error while reading a file. Did not read at full capacity but the file is not fully read");
}
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::exceptions::ErrorFlag&){
lc.log(LOG_INFO,"DiskReadTask: a previous file has failed for migration "
"Do nothing except circulating blocks");
circulateAllBlocks(blockId);
}
catch(const castor::tape::Exception& e){
//signal to all others task that this session is screwed
m_errorFlag.set();
//we have to pump the blocks anyway, mark them failed and then pass them back to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
migratingFileSize -= mb->m_payload.read(sourceFile);
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
LogContext::ScopedParam sp0(lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
lc.log(LOG_ERR,"Exception while reading a file");
//deal here the number of mem block
circulateAllBlocks(blockId);
} //end of catch
//we either read at full capacity (ie size=capacity) or if there different,
//it should be the end => migratingFileSize should be 0. If it not, error
if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize>0){
mb->m_failed=true;
throw castor::tape::Exception("Error while reading a file. Did not read at full capacity but the file is not fully read");
}
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::exceptions::ErrorFlag&){
lc.log(LOG_INFO,"DiskReadTask: a previous file has failed for migration "
"Do nothing except circulating blocks");
circulateAllBlocks(blockId);
}
catch(const castor::tape::Exception& e){
//signal to all others task that this session is screwed
m_errorFlag.set();
//we have to pump the blocks anyway, mark them failed and then pass them back to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
LogContext::ScopedParam sp0(lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
lc.log(LOG_ERR,"Exception while reading a file");
//deal here the number of mem block
circulateAllBlocks(blockId);
} //end of catch
}
void DiskReadTask::circulateAllBlocks(size_t fromBlockId){
size_t blockId = fromBlockId;
while(blockId<m_numberOfBlock) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failed=true;
m_nextTask.pushDataBlock(mb);
++blockId;
} //end of while
}
//------------------------------------------------------------------------------
// DiskReadTask::circulateAllBlocks
//------------------------------------------------------------------------------
void DiskReadTask::circulateAllBlocks(size_t fromBlockId){
size_t blockId = fromBlockId;
while(blockId<m_numberOfBlock) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failed=true;
m_nextTask.pushDataBlock(mb);
++blockId;
} //end of while
}
}}}}
......@@ -24,17 +24,17 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTaskInterface.hpp"
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class DiskReadTask :public DiskReadTaskInterface {
class DiskReadTask {
public:
/**
* @param destination The task that will consume data block we fill up
......
/******************************************************************************
* DiskReadTask.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
#include "castor/tape/tapeserver/daemon/Exception.hpp"
namespace castor {
namespace log{
class LogContext;
}
namespace tape {
namespace tapeserver {
namespace daemon {
class DiskReadTaskInterface {
public:
virtual void execute(log::LogContext& lc) =0;
virtual ~DiskReadTaskInterface() {}
};
}}}}
......@@ -31,83 +31,114 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
DiskReadThreadPool::DiskReadThreadPool(int nbThread, unsigned int maxFilesReq,unsigned int maxBytesReq,
castor::log::LogContext lc) : m_lc(lc),m_maxFilesReq(maxFilesReq),m_maxBytesReq(maxBytesReq),m_nbActiveThread(0){
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
m_lc.pushOrReplace(log::Param("threadID",i));
m_lc.log(LOG_INFO, "DiskReadWorkerThread created");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool constructor
//------------------------------------------------------------------------------
DiskReadThreadPool::DiskReadThreadPool(int nbThread, unsigned int maxFilesReq,unsigned int maxBytesReq,
castor::log::LogContext lc) : m_lc(lc),m_maxFilesReq(maxFilesReq),m_maxBytesReq(maxBytesReq),m_nbActiveThread(0){
for(int i=0; i<nbThread; i++) {
DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
m_threads.push_back(thr);
m_lc.pushOrReplace(log::Param("threadID",i));
m_lc.log(LOG_INFO, "DiskReadWorkerThread created");
}
DiskReadThreadPool::~DiskReadThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads have been destroyed");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool destructor
//------------------------------------------------------------------------------
DiskReadThreadPool::~DiskReadThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
void DiskReadThreadPool::startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
}
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads are started");
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads have been destroyed");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::startThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::startThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->start();
}
void DiskReadThreadPool::waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
}
m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads are started");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::waitThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::waitThreads() {
for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->wait();
}
void DiskReadThreadPool::push(DiskReadTaskInterface *t) {
m_tasks.push(t);
m_lc.log(LOG_INFO, "Push a task into the DiskReadThreadPool");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::push
//------------------------------------------------------------------------------
void DiskReadThreadPool::push(DiskReadTask *t) {
m_tasks.push(t);
m_lc.log(LOG_INFO, "Push a task into the DiskReadThreadPool");
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::finish
//------------------------------------------------------------------------------
void DiskReadThreadPool::finish() {
/* Insert one endOfSession per thread */
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
}
void DiskReadThreadPool::finish() {
/* Insert one endOfSession per thread */
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
}
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::popAndRequestMore
//------------------------------------------------------------------------------
DiskReadTask* DiskReadThreadPool::popAndRequestMore(castor::log::LogContext &lc){
castor::tape::threading::BlockingQueue<DiskReadTask*>::valueRemainingPair
vrp = m_tasks.popGetSize();
log::LogContext::ScopedParam sp(lc, log::Param("m_maxFilesReq", m_maxFilesReq));
log::LogContext::ScopedParam sp0(lc, log::Param("m_maxBytesReq", m_maxBytesReq));
if(0==vrp.remaining){
m_injector->requestInjection(true);
lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
}else if(vrp.remaining + 1 == m_maxFilesReq/2){
m_injector->requestInjection(false);
lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
}
DiskReadTaskInterface* DiskReadThreadPool::popAndRequestMore(castor::log::LogContext &lc){
castor::tape::threading::BlockingQueue<DiskReadTaskInterface*>::valueRemainingPair
vrp = m_tasks.popGetSize();
log::LogContext::ScopedParam sp(lc, log::Param("m_maxFilesReq", m_maxFilesReq));
log::LogContext::ScopedParam sp0(lc, log::Param("m_maxBytesReq", m_maxBytesReq));
return vrp.value;
}
if(0==vrp.remaining){
m_injector->requestInjection(true);
lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
}else if(vrp.remaining + 1 == m_maxFilesReq/2){
m_injector->requestInjection(false);
lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
//------------------------------------------------------------------------------
// DiskReadThreadPool::DiskReadWorkerThread::run
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.pushOrReplace(log::Param("threadID",m_threadID));
m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
std::auto_ptr<DiskReadTask> task;
while(1) {
task.reset( m_parent.popAndRequestMore(m_lc));
if (NULL!=task.get()) {
task->execute(m_lc);
}
return vrp.value;
}
void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.pushOrReplace(log::Param("threadID",m_threadID));
m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
std::auto_ptr<DiskReadTaskInterface> task;
while(1) {
task.reset( m_parent.popAndRequestMore(m_lc));
if (NULL!=task.get()) {
task->execute(m_lc);
}
else {
break;
}
} //end of while(1)
// We now acknowledge to the task injector that read reached the end. There
// will hence be no more requests for more. (last thread turns off the light)
if (0 == --m_parent.m_nbActiveThread) {
m_parent.m_injector->finish();
m_lc.log(LOG_INFO, "Signaled to task injector the end of disk read threads");
else {
break;
}
m_lc.log(LOG_INFO, "Finishing of DiskReadWorkerThread");
} //end of while(1)
// We now acknowledge to the task injector that read reached the end. There
// will hence be no more requests for more. (last thread turns off the light)
if (0 == --m_parent.m_nbActiveThread) {
m_parent.m_injector->finish();
m_lc.log(LOG_INFO, "Signaled to task injector the end of disk read threads");
}
m_lc.log(LOG_INFO, "Finishing of DiskReadWorkerThread");
}
}}}}
......@@ -24,23 +24,24 @@
#pragma once
#include "castor/tape/tapeserver/daemon/DiskReadTaskInterface.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/tape/tapeserver/daemon/DiskThreadPoolInterface.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/tape/tapeserver/threading/Threading.hpp"
#include "castor/log/LogContext.hpp"
#include <vector>
#include <stdint.h>
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool : public DiskThreadPoolInterface<DiskReadTaskInterface> {
class DiskReadThreadPool {
public:
/**
* Constructor. The constructor creates the threads for the pool, but does not
* start them.
* @param nbThread Number of thread for reading files
* @param maxFilesReq maximal number of files we might require
* within a single request to the task injectore
......@@ -51,45 +52,106 @@ public:
DiskReadThreadPool(int nbThread, unsigned int maxFilesReq,unsigned int maxBytesReq,
castor::log::LogContext lc);
/**
* Destructor.
* Simply destroys the thread, which should have been joined by the caller
* before using waitThreads()
*/
~DiskReadThreadPool();
/**
* Starts the threads which were created at construction time.
*/
void startThreads();
/**
* Waits for threads completion of all threads. Should be called before
* destructor
*/
void waitThreads();
virtual void push(DiskReadTaskInterface *t);
/**
* Adds a DiskReadTask to the tape pool's queue of tasks.
* @param task pointer to the new task. The thread pool takes ownership of the
* task and will delete it after execution. Push() is not protected against races
* with finish() as the task injection is done from a single thread (the task
* injector)
*/
virtual void push(DiskReadTask *task);
/**
* Injects as many "end" tasks as there are threads in the thread pool.
* A thread getting such an end task will exit. This method is called by the
* task injector at the end of the tape session. It is not race protected.
* See push()
*/
void finish();
/**
* Sets up the pointer to the task injector. This cannot be done at
* construction time as both task injector and read thread pool refer to
* each other. This function should be called before starting the threads.
* This is used for the feedback loop where the injector is requested to
* fetch more work by the read thread pool when the task queue of the thread
* pool starts to run low.
*/
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
private:
/** Get the next task to execute and if there is not enough tasks in queue,
* it will ask the TaskInjector to get more job
/**
* Get the next task to execute and if there is not enough tasks in queue,
* it will ask the TaskInjector to get more jobs.
* @return the next task to execute
*/
DiskReadTaskInterface* popAndRequestMore(castor::log::LogContext & lc);
DiskReadTask* popAndRequestMore(castor::log::LogContext & lc);
/**
* Subclass of the thread pool's worker thread.
*/
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(m_parent.m_lc) {
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc) {
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"DiskWrite Thread created");
}
void startThreads() { start(); }
void waitThreads() { wait(); }
void start() { castor::tape::threading::Thread::start(); }
void wait() { castor::tape::threading::Thread::wait(); }
private:
/** Pointer to the thread pool, allowing calls to popAndRequestMore,
* and calling finish() on the task injector when the last thread
* is finishing (thanks to the actomic counter m_parent.m_nbActiveThread) */
DiskReadThreadPool & m_parent;
/** The sequential ID of the thread, used in logs */
const int m_threadID;
/** The local copy of the log context, allowing race-free logging with context
between threads. */
castor::log::LogContext m_lc;
/** The execution thread: pops and executes tasks (potentially asking for
more) and calls task injector's finish() on exit of the last thread. */
virtual void run();
};
/** Container for the threads */
std::vector<DiskReadWorkerThread *> m_threads;
/** The queue of pointer to tasks to be executed. We own the tasks (they are
* deleted by the threads after execution) */
castor::tape::threading::BlockingQueue<DiskReadTask *> m_tasks;
/** The log context. This is copied on construction to prevent interferences
* between threads.
*/
castor::log::LogContext m_lc;
/** Pointer to the task injector allowing request for more work, and
* termination signalling */
MigrationTaskInjector* m_injector;
const unsigned int m_maxFilesReq;
const unsigned int m_maxBytesReq;
/** The maximum number of files we ask per request. This value is also used as
* a threashold (half of it, indeed) to trigger the request for more work.
* Another request for more work is also triggered when the task FIFO gets empty.*/
const uint64_t m_maxFilesReq;
/** Same as m_maxFilesReq for size per request. */
const uint64_t m_maxBytesReq;
/** An atomic (i.e. thread safe) counter of the current number of thread (they
are counted up at creation time and down at completion time) */
tape::threading::AtomicCounter<int> m_nbActiveThread;
};
......
/*
* File: DiskThreadPoolInterface.h
* Author: dcome
*
* Created on March 18, 2014, 3:53 PM
*/
#pragma once
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
template <class Task> class DiskThreadPoolInterface
{
protected :
castor::tape::threading::BlockingQueue<Task*> m_tasks;
public :
virtual ~DiskThreadPoolInterface(){}
virtual void push(Task *t)=0;
virtual void finish()=0;
};
}}}}