Commit cb7d2a10 authored by Eric Cano's avatar Eric Cano
Browse files

Refactored member names in diskWriteThreadPoll and Thread, and moved atomic...

Refactored member names in diskWriteThreadPoll and Thread, and moved atomic counters to th Thread pool.
parent e3881eea
......@@ -26,13 +26,13 @@ namespace daemon {
void DiskWriteThreadPool::startThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
(*i)->start();
}
}
void DiskWriteThreadPool::waitThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
(*i)->wait();
}
}
void DiskWriteThreadPool::push(DiskWriteTaskInterface *t) {
......@@ -98,33 +98,30 @@ namespace daemon {
std::auto_ptr<DiskWriteTaskInterface> task;
while(1) {
task.reset(_this. m_tasks.pop());
task.reset(m_parentThreadPool. m_tasks.pop());
if (NULL!=task.get()) {
if(false==task->execute(_this.m_reporter,lc)) {
++failledWritting;
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount;
}
} //end of task!=NULL
else {
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"Disk write thread finishing");
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"Disk write thread finishing");
break;
}
} //enf of while(1)
if(0 == --m_nbActiveThread){
if(0 == --m_parentThreadPool.m_nbActiveThread){
//Im the last Thread alive, report end of session
if(failledWritting==0){
_this.m_reporter.reportEndOfSession();
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
//TODO
// _this.m_jobInjector->end();
}
else{
_this.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
}
}
}
tape::threading::AtomicCounter<int> DiskWriteThreadPool::DiskWriteWorkerThread::m_nbActiveThread(0);
tape::threading::AtomicCounter<int> DiskWriteThreadPool::DiskWriteWorkerThread::failledWritting(0);
}}}}
......@@ -63,24 +63,26 @@ private:
*/
DiskWriteTaskInterface * popAndRequestMoreJobs() ;
tape::threading::AtomicCounter<int> m_nbActiveThread;
tape::threading::AtomicCounter<int> m_failedWriteCount;
class DiskWriteWorkerThread: private castor::tape::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager):
threadID(m_nbActiveThread++),_this(manager),lc(_this.m_lc)
m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),m_lc(m_parentThreadPool.m_lc)
{
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"DiskWrite Thread created");
// This thread Id will remain for the rest of the thread's lifetime (and
// also context's lifetime) so ne need for a scope.
m_lc.pushOrReplace(log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"DiskWrite Thread created");
}
void startThreads() { start(); }
void waitThreads() { wait(); }
void start() { castor::tape::threading::Thread::start(); }
void wait() { castor::tape::threading::Thread::wait(); }
private:
//counter to generate threadID and to know how many thread are still doing something
static tape::threading::AtomicCounter<int> m_nbActiveThread;
static tape::threading::AtomicCounter<int> failledWritting;
const int threadID;
DiskWriteThreadPool & _this;
castor::log::LogContext lc;
const int m_threadID;
DiskWriteThreadPool & m_parentThreadPool;
castor::log::LogContext m_lc;
virtual void run();
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment