Commit 473540f2 authored by Eric Cano's avatar Eric Cano
Browse files

Code reformat:

Added Doxygen comments in DiskWriteTask.hpp.
Reformatted the code in DiskWriteTask.cpp, DiskWriteTask.cpp and added logging.
Removed unnecessary members m_maxFilesReq and m_maxBytesReq from DiskWriteThreadPool and adapted callers.
parent 48ac2127
......@@ -30,90 +30,96 @@ namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Constructor
* @param file: All we need to know about the file we are recalling
* @param mm: memory manager of the session
*/
DiskWriteTask::DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
}
/**
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
*/
bool DiskWriteTask::execute(ReportPackerInterface<detail::Recall>& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
unsigned long checksum = Payload::zeroAdler32();
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock<RecallMemoryManager> releaser(mb,m_memManager);
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failed))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
checksum = mb->m_payload.adler32(checksum);
blockId++;
}
else
break;
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile,checksum);
return true;
} //end of try
catch(const castor::exception::Exception& e){
/*
*We might end up there with some blocks into m_fifo
* We need to empty it
*/
releaseAllBlock();
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
return false;
}
}
/**
* Allows client code to return a reusable memory block. Should not been called
* @return the pointer to the memory block that can be reused
*/
MemBlock *DiskWriteTask::getFreeBlock() {
throw castor::tape::Exception("DiskWriteTask::getFreeBlock should mot be called");
}
void DiskWriteTask::pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.push(mb);
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskWriteTask::DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
DiskWriteTask::~DiskWriteTask() {
volatile castor::tape::threading::MutexLocker ml(&m_producerProtection);
}
}
void DiskWriteTask::releaseAllBlock(){
while(1){
if(MemBlock* mb=m_fifo.pop())
AutoReleaseBlock<RecallMemoryManager> release(mb,m_memManager);
//------------------------------------------------------------------------------
// DiskWriteTask::execute
//------------------------------------------------------------------------------
bool DiskWriteTask::execute(ReportPackerInterface<detail::Recall>& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
int blockId = 0;
unsigned long checksum = Payload::zeroAdler32();
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock<RecallMemoryManager> releaser(mb,m_memManager);
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("expected_NSFILEID",m_recallingFile->fileid())),
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->m_failed))
};
tape::utils::suppresUnusedVariable(sp);
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
mb->m_payload.write(ourFile);
checksum = mb->m_payload.adler32(checksum);
blockId++;
}
else
break;
}
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile,checksum);
return true;
} //end of try
catch(const castor::exception::Exception& e){
/*
*We might end up there with some blocks into m_fifo
* We need to empty it
*/
releaseAllBlock();
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
return false;
}
}
//------------------------------------------------------------------------------
// DiskWriteTask::getFreeBlock
//------------------------------------------------------------------------------
MemBlock *DiskWriteTask::getFreeBlock() {
throw castor::tape::Exception("DiskWriteTask::getFreeBlock should mot be called");
}
//------------------------------------------------------------------------------
// DiskWriteTask::pushDataBlock
//------------------------------------------------------------------------------
void DiskWriteTask::pushDataBlock(MemBlock *mb) {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_fifo.push(mb);
}
//------------------------------------------------------------------------------
// DiskWriteTask::~DiskWriteTask
//------------------------------------------------------------------------------
DiskWriteTask::~DiskWriteTask() {
volatile castor::tape::threading::MutexLocker ml(&m_producerProtection);
}
//------------------------------------------------------------------------------
// DiskWriteTask::releaseAllBlock
//------------------------------------------------------------------------------
void DiskWriteTask::releaseAllBlock(){
while(1){
if(MemBlock* mb=m_fifo.pop())
AutoReleaseBlock<RecallMemoryManager> release(mb,m_memManager);
else
break;
}
}
}}}}
......@@ -7,91 +7,119 @@ namespace tape {
namespace tapeserver {
namespace daemon {
DiskWriteThreadPool::DiskWriteThreadPool(int nbThread, int maxFilesReq, int maxBlocksReq,
ReportPackerInterface<detail::Recall>& report,castor::log::LogContext lc):
m_maxFilesReq(maxFilesReq), m_maxBytesReq(maxBlocksReq),
m_reporter(report),m_lc(lc)
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
m_lc.log(LOG_INFO, "Creating threads in DiskWriteThreadPool::DiskWriteThreadPool");
for(int i=0; i<nbThread; i++) {
DiskWriteWorkerThread * thr = new DiskWriteWorkerThread(*this);
m_threads.push_back(thr);
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskWriteThreadPool::DiskWriteThreadPool(int nbThread,
ReportPackerInterface<detail::Recall>& report,castor::log::LogContext lc):
m_reporter(report),m_lc(lc)
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
for(int i=0; i<nbThread; i++) {
DiskWriteWorkerThread * thr = new DiskWriteWorkerThread(*this);
m_threads.push_back(thr);
}
DiskWriteThreadPool::~DiskWriteThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void DiskWriteThreadPool::startThreads() {
m_lc.log(LOG_INFO, "Starting threads in DiskWriteThreadPool::DiskWriteThreadPool");
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->start();
}
m_lc.log(LOG_DEBUG, "Created threads in DiskWriteThreadPool::DiskWriteThreadPool");
}
//------------------------------------------------------------------------------
// DiskWriteThreadPool::~DiskWriteThreadPool
//------------------------------------------------------------------------------
DiskWriteThreadPool::~DiskWriteThreadPool() {
// A barrier preventing destruction of the object if a poster has still not
// returned yet from the push or finish function.
castor::tape::threading::MutexLocker ml(&m_pusherProtection);
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
void DiskWriteThreadPool::waitThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->wait();
}
m_lc.log(LOG_DEBUG, "Deleted threads in DiskWriteThreadPool::~DiskWriteThreadPool");
}
//------------------------------------------------------------------------------
// DiskWriteThreadPool::startThreads
//------------------------------------------------------------------------------
void DiskWriteThreadPool::startThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->start();
}
void DiskWriteThreadPool::push(DiskWriteTask *t) {
{//begin of critical section
castor::tape::threading::MutexLocker ml(&m_counterProtection);
if(NULL==t){
throw castor::tape::Exception("NULL task should not been directly pushed into DiskWriteThreadPool");
}
}
m_tasks.push(t);
m_lc.log(LOG_INFO, "Starting threads in DiskWriteThreadPool::DiskWriteThreadPool");
}
//------------------------------------------------------------------------------
// DiskWriteThreadPool::waitThreads
//------------------------------------------------------------------------------
void DiskWriteThreadPool::waitThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->wait();
}
void DiskWriteThreadPool::finish() {
castor::tape::threading::MutexLocker ml(&m_counterProtection);
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
m_lc.log(LOG_INFO, "All DiskWriteThreadPool threads are now complete");
}
//------------------------------------------------------------------------------
// DiskWriteThreadPool::push
//------------------------------------------------------------------------------
void DiskWriteThreadPool::push(DiskWriteTask *t) {
{
if(NULL==t){
throw castor::tape::Exception("NULL task should not been directly pushed into DiskWriteThreadPool");
}
}
castor::tape::threading::MutexLocker ml(&m_pusherProtection);
m_tasks.push(t);
}
bool DiskWriteThreadPool::belowMidFilesAfterPop(int filesPopped) const {
return m_tasks.size() -filesPopped < m_maxFilesReq/2;
//------------------------------------------------------------------------------
// DiskWriteThreadPool::finish
//------------------------------------------------------------------------------
void DiskWriteThreadPool::finish() {
castor::tape::threading::MutexLocker ml(&m_pusherProtection);
for (size_t i=0; i<m_threads.size(); i++) {
m_tasks.push(NULL);
}
}
bool DiskWriteThreadPool::crossingDownFileThreshod(int filesPopped) const {
return (m_tasks.size() >= m_maxFilesReq/2) && (m_tasks.size() - filesPopped < m_maxFilesReq/2);
}
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
std::auto_ptr<DiskWriteTask> task;
while(1) {
task.reset(m_parentThreadPool. m_tasks.pop());
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount;
}
} //end of task!=NULL
else {
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"Disk write thread finishing");
break;
}
} //enf of while(1)
if(0 == --m_parentThreadPool.m_nbActiveThread){
//Im the last Thread alive, report end of session
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
}
else{
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
typedef castor::log::LogContext::ScopedParam ScopedParam;
using castor::log::Param;
m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
m_lc.pushOrReplace(log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
std::auto_ptr<DiskWriteTask> task;
while(1) {
task.reset(m_parentThreadPool. m_tasks.pop());
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount;
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
m_lc.log(LOG_ERR, "Task failed: counting another error for this session");
}
} //end of task!=NULL
else {
m_lc.log(LOG_DEBUG,"DiskWriteWorkerThread exiting: no more work");
break;
}
} //enf of while(1)
if(0 == --m_parentThreadPool.m_nbActiveThread){
//Im the last Thread alive, report end of session
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
m_lc.log(LOG_INFO, "As last exiting DiskWriteWorkerThread, reported a successful end of session");
}
else{
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
m_lc.log(LOG_ERR, "As last exiting DiskWriteWorkerThread, reported an end of session with errors");
}
m_lc.log(LOG_INFO, "Finishing DiskWriteWorkerThread");
}
m_lc.log(LOG_INFO, "Finishing DiskWriteWorkerThread");
}
}}}}
......@@ -40,26 +40,63 @@ namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Container for the threads that will execute the disk writes tasks in the
* migration.
*/
class DiskWriteThreadPool {
public:
DiskWriteThreadPool(int nbThread, int maxFilesReq, int maxBlocksReq,
ReportPackerInterface<detail::Recall>& report,castor::log::LogContext lc);
/**
* Constructor: we create the thread structures here, but they do not get
* started yet.
* @param nbThread Fixed number of threads in the pool
* @param reportPacker Reference to a previously created recall
* report packer, to which the tasks will report their results.
* @param lc reference to a log context object that will be copied at
* construction time (and then copied further for each thread). There will
* be no side effect on the caller's logs.
*/
DiskWriteThreadPool(int nbThread,
ReportPackerInterface<detail::Recall>& reportPacker,
castor::log::LogContext lc);
/**
* Destructor: we suppose the threads are no running (waitThreads() should
* be called befor destruction unless the threads were not started.
*/
~DiskWriteThreadPool();
/**
* Starts the thread created at construction time.
*/
void startThreads();
/**
* Waits for completion of all the pool's threads.
*/
void waitThreads();
/**
* Pushes a pointer to a task. The thread pool owns the task and will
* de-allocate it.
* @param t pointer to the task
*/
void push(DiskWriteTask *t);
/**
* Signals to the thread pool that there will be no more tasks pushed to it,
* and that the threads can therefore complete.
*/
void finish();
private:
bool belowMidFilesAfterPop(int filesPopped) const ;
bool crossingDownFileThreshod(int filesPopped) const;
/** Running counter active threads, used to determine which thread is the last. */
tape::threading::AtomicCounter<int> m_nbActiveThread;
/** Thread safe counter for failed tasks */
tape::threading::AtomicCounter<int> m_failedWriteCount;
/**
* Private class implementing the worker threads.
*/
class DiskWriteWorkerThread: private castor::tape::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager):
......@@ -80,16 +117,20 @@ private:
virtual void run();
};
/** The actual container for the thread objects */
std::vector<DiskWriteWorkerThread *> m_threads;
castor::tape::threading::Mutex m_counterProtection;
/** Mutex protecting the pushers of new tasks from having the object deleted
* under their feet. */
castor::tape::threading::Mutex m_pusherProtection;
protected:
/** The (thread safe) queue of tasks */
castor::tape::threading::BlockingQueue<DiskWriteTask*> m_tasks;
private:
uint32_t m_maxFilesReq;
uint64_t m_maxBytesReq;
/** Reference to the report packer where tasks report the result of their
* individual files and the end of session (for the last thread) */
ReportPackerInterface<detail::Recall>& m_reporter;
//logging context that will copied by each thread
/** logging context that will be copied by each thread for individual context */
castor::log::LogContext m_lc;
};
......
......@@ -40,7 +40,7 @@ namespace unitTests{
RecallMemoryManager mm(10,100,lc);
DiskWriteThreadPool dwtp(2,5,500,report,lc);
DiskWriteThreadPool dwtp(2,report,lc);
dwtp.startThreads();
castor::tape::tapegateway::FileToRecallStruct file;
......
......@@ -141,8 +141,6 @@ void castor::tape::tapeserver::daemon::MountSession::executeRead(LogContext & lc
m_castorConf.tapebridgeBulkRequestMigrationMaxFiles,
lc);
DiskWriteThreadPool dwtp(m_castorConf.tapeserverdDiskThreads,
m_castorConf.tapebridgeBulkRequestRecallMaxFiles,
m_castorConf.tapebridgeBulkRequestRecallMaxBytes,
rrp,
lc);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy,
......
......@@ -15,7 +15,7 @@ class FakeDiskWriteThreadPool: public DiskWriteThreadPool
public:
using DiskWriteThreadPool::m_tasks;
FakeDiskWriteThreadPool(castor::log::LogContext & lc):
DiskWriteThreadPool(1,0,0,*((ReportPackerInterface<detail::Recall>*)NULL), lc){}
DiskWriteThreadPool(1,*((ReportPackerInterface<detail::Recall>*)NULL), lc){}
virtual ~FakeDiskWriteThreadPool() {};
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment