DiskWriteThreadPool.cpp 5.04 KB
Newer Older
1
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
2
#include "log.h"
3
#include <memory>
4
#include <sstream>
5
6
7
8
9
10
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {


11
  DiskWriteThreadPool::DiskWriteThreadPool(int nbThread, int maxFilesReq, int maxBlocksReq,
12
          ReportPackerInterface<detail::Recall>& report,castor::log::LogContext lc):
13
          m_maxFilesReq(maxFilesReq), m_maxBytesReq(maxBlocksReq),
14
          m_reporter(report),m_lc(lc)
15
16
17
  {
    m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
    m_lc.log(LOG_INFO, "Creating threads in DiskWriteThreadPool::DiskWriteThreadPool");
18
19
20
21
22
    for(int i=0; i<nbThread; i++) {
      DiskWriteWorkerThread * thr = new DiskWriteWorkerThread(*this);
      m_threads.push_back(thr);
    }
  }
23
  DiskWriteThreadPool::~DiskWriteThreadPool() {
24
25
26
27
28
29
    while (m_threads.size()) {
      delete m_threads.back();
      m_threads.pop_back();
    }
  }
  void DiskWriteThreadPool::startThreads() {
30
    m_lc.log(LOG_INFO, "Starting threads in DiskWriteThreadPool::DiskWriteThreadPool");
31
32
    for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
            i != m_threads.end(); i++) {
33
      (*i)->start();
34
35
    }
  }
36
  
37
38
39
  void DiskWriteThreadPool::waitThreads() {
    for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
            i != m_threads.end(); i++) {
40
      (*i)->wait();
41
42
43
    }
  }
   void DiskWriteThreadPool::push(DiskWriteTaskInterface *t) { 
David COME's avatar
David COME committed
44
45
46
47
48
49
50
     {//begin of critical section
       castor::tape::threading::MutexLocker ml(&m_counterProtection);
       if(NULL==t){
         throw castor::tape::Exception("NULL task should not been directly pushed into DiskWriteThreadPool");
       }
     }
      m_tasks.push(t);
51
52
  }
  void DiskWriteThreadPool::finish() {
David COME's avatar
David COME committed
53
    castor::tape::threading::MutexLocker ml(&m_counterProtection);
54
55
56
57
58
    for (size_t i=0; i<m_threads.size(); i++) {
      m_tasks.push(NULL);
    }
  }

59
60
  bool DiskWriteThreadPool::belowMidFilesAfterPop(int filesPopped) const {
    return m_tasks.size() -filesPopped < m_maxFilesReq/2;
61
  }
62

63
64
  bool DiskWriteThreadPool::crossingDownFileThreshod(int filesPopped) const {
    return (m_tasks.size() >= m_maxFilesReq/2) && (m_tasks.size() - filesPopped < m_maxFilesReq/2);
65
66
67
68
69
70
  }
  DiskWriteTaskInterface * DiskWriteThreadPool::popAndRequestMoreJobs() {
    using castor::log::LogContext;
    using castor::log::Param;
    
    DiskWriteTaskInterface * ret = m_tasks.pop();
71
    // TODO: completely remove task injection in writers, move it to readers
72
    if(ret)
73
74
75
    {
      castor::tape::threading::MutexLocker ml(&m_counterProtection);
      // We are about to go to empty: request a last call job injection 
76
      if(m_tasks.size() == 1) {
77
78
        
        LogContext::ScopedParam sp[]={
79
          LogContext::ScopedParam(m_lc, Param("files",m_tasks.size())),
80
          LogContext::ScopedParam(m_lc, Param("ret->files", 1)),
81
82
83
84
85
86
87
88
          LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFilesReq)),
          LogContext::ScopedParam(m_lc, Param("maxBlocks", m_maxBytesReq))
        };
        tape::utils::suppresUnusedVariable(sp);
    
        m_lc.log(LOG_INFO, "In DiskWriteTaskInterface::popAndRequestMoreJobs(), requesting last call");
        //if we are below mid on both block and files and we are crossing a threshold 
        //on either files of blocks, then request more jobs
89
      } else if ( belowMidFilesAfterPop(1) && crossingDownFileThreshod(1)) {
90
        LogContext::ScopedParam sp[]={
91
          LogContext::ScopedParam(m_lc, Param("files",m_tasks.size())),
92
          LogContext::ScopedParam(m_lc, Param("ret->files", 1)),
93
94
95
96
97
          LogContext::ScopedParam(m_lc, Param("maxFiles", m_maxFilesReq)),
          LogContext::ScopedParam(m_lc, Param("maxBlocks", m_maxBytesReq))
        };
        tape::utils::suppresUnusedVariable(sp);
        m_lc.log(LOG_INFO, "In DiskWriteTaskInterface::popAndRequestMoreJobs(), requesting: files");
98
      }      
99
100
101
    }
    return ret;
  }
102
  void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
103
104
    m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
    m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
105
106
    std::auto_ptr<DiskWriteTaskInterface>  task;
    while(1) {
107
      task.reset(m_parentThreadPool. m_tasks.pop());
108
      if (NULL!=task.get()) {
109
110
        if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
          ++m_parentThreadPool.m_failedWriteCount; 
111
112
        }
      } //end of task!=NULL
David COME's avatar
David COME committed
113
      else {
114
115
        log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
        m_lc.log(LOG_INFO,"Disk write thread finishing");
David COME's avatar
David COME committed
116
117
118
        break;
      }
    } //enf of while(1)
119
    
120
    if(0 == --m_parentThreadPool.m_nbActiveThread){
121
      //Im the last Thread alive, report end of session
122
123
      if(m_parentThreadPool.m_failedWriteCount==0){
        m_parentThreadPool.m_reporter.reportEndOfSession();
124
125
        //TODO
//        _this.m_jobInjector->end();
126
127
      }
      else{
128
        m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
129
      }
130
    }
131
    m_lc.log(LOG_INFO, "Finishing DiskWriteWorkerThread");
132
  }
133
134
}}}}