DiskReadThreadPool.cpp 5.82 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/******************************************************************************
 *                      DiskReadThreadPool.cpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include <memory>
#include <sstream>
28
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
29
#include "log.h"
30
31
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"

32
33
34
35
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
36
37
38
39
40
41
42
43
44
45
46

//------------------------------------------------------------------------------
// DiskReadThreadPool constructor
//------------------------------------------------------------------------------
DiskReadThreadPool::DiskReadThreadPool(int nbThread, unsigned int maxFilesReq,unsigned int maxBytesReq, 
        castor::log::LogContext lc) : m_lc(lc),m_maxFilesReq(maxFilesReq),m_maxBytesReq(maxBytesReq),m_nbActiveThread(0){
  for(int i=0; i<nbThread; i++) {
    DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
    m_threads.push_back(thr);
    m_lc.pushOrReplace(log::Param("threadID",i));
    m_lc.log(LOG_INFO, "DiskReadWorkerThread created");
47
  }
48
49
50
51
52
53
54
55
56
}

//------------------------------------------------------------------------------
// DiskReadThreadPool destructor
//------------------------------------------------------------------------------
DiskReadThreadPool::~DiskReadThreadPool() { 
  while (m_threads.size()) {
    delete m_threads.back();
    m_threads.pop_back();
57
  }
58
59
60
61
62
63
64
65
66
67
  m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads have been destroyed");
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::startThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::startThreads() {
  for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
          i != m_threads.end(); i++) {
    (*i)->start();
68
  }
69
70
71
72
73
74
75
76
77
78
  m_lc.log(LOG_INFO, "All the DiskReadWorkerThreads are started");
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::waitThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::waitThreads() {
  for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
          i != m_threads.end(); i++) {
    (*i)->wait();
79
  }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::push
//------------------------------------------------------------------------------
void DiskReadThreadPool::push(DiskReadTask *t) { 
  m_tasks.push(t); 
  m_lc.log(LOG_INFO, "Push a task into the DiskReadThreadPool");
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::finish
//------------------------------------------------------------------------------
void DiskReadThreadPool::finish() {
  /* Insert one endOfSession per thread */
  for (size_t i=0; i<m_threads.size(); i++) {
    m_tasks.push(NULL);
97
  }
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::popAndRequestMore
//------------------------------------------------------------------------------
DiskReadTask* DiskReadThreadPool::popAndRequestMore(castor::log::LogContext &lc){
  castor::tape::threading::BlockingQueue<DiskReadTask*>::valueRemainingPair 
  vrp = m_tasks.popGetSize();
  log::LogContext::ScopedParam sp(lc, log::Param("m_maxFilesReq", m_maxFilesReq));
  log::LogContext::ScopedParam sp0(lc, log::Param("m_maxBytesReq", m_maxBytesReq));

  if(0==vrp.remaining){
    m_injector->requestInjection(true);
    lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
  }else if(vrp.remaining + 1 ==  m_maxFilesReq/2){
    m_injector->requestInjection(false);
    lc.log(LOG_DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
115
  }
116
117
  return vrp.value;
}
118

119
120
121
122
123
124
125
126
127
128
129
130
//------------------------------------------------------------------------------
// DiskReadThreadPool::DiskReadWorkerThread::run
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::run() {
  m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
  m_lc.pushOrReplace(log::Param("threadID",m_threadID));
  m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
  std::auto_ptr<DiskReadTask> task;
  while(1) {
    task.reset( m_parent.popAndRequestMore(m_lc));
    if (NULL!=task.get()) {
      task->execute(m_lc);
131
    }
132
133
    else {
      break;
134
    }
135
136
137
138
139
140
  } //end of while(1)
  // We now acknowledge to the task injector that read reached the end. There
  // will hence be no more requests for more. (last thread turns off the light)
  if (0 == --m_parent.m_nbActiveThread) {
    m_parent.m_injector->finish();
    m_lc.log(LOG_INFO, "Signaled to task injector the end of disk read threads");
141
  }
142
143
  m_lc.log(LOG_INFO, "Finishing of DiskReadWorkerThread");
}
144
145
  
}}}}
146