DiskReadThreadPool.cpp 9.6 KB
Newer Older
1
2
3
4
5
6
7
/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
8
 *
9
10
11
12
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
13
 *
14
15
16
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17
18

#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
19
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
20
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
21
22
#include <memory>
#include <sstream>
23

24
25
26
27
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
28
29
30
31

//------------------------------------------------------------------------------
// DiskReadThreadPool constructor
//------------------------------------------------------------------------------
32
33
DiskReadThreadPool::DiskReadThreadPool(int nbThread, uint64_t maxFilesReq,uint64_t maxBytesReq,
    castor::tape::tapeserver::daemon::MigrationWatchDog & migrationWatchDog,
34
    cta::log::LogContext lc, const std::string & xrootPrivateKeyPath, uint16_t xrootTimeout) : 
35
36
    m_xrootPrivateKeyPath(xrootPrivateKeyPath),
    m_xrootTimeout(xrootTimeout),
37
    m_watchdog(migrationWatchDog),
38
    m_lc(lc),m_maxFilesReq(maxFilesReq),
39
    m_maxBytesReq(maxBytesReq), m_nbActiveThread(0) {
40
41
42
  for(int i=0; i<nbThread; i++) {
    DiskReadWorkerThread * thr = new DiskReadWorkerThread(*this);
    m_threads.push_back(thr);
Victor Kotlyar's avatar
Victor Kotlyar committed
43
44
    m_lc.pushOrReplace(cta::log::Param("threadID",i));
    m_lc.log(cta::log::DEBUG, "DiskReadWorkerThread created");
45
  }
46
47
48
49
50
51
52
53
54
}

//------------------------------------------------------------------------------
// DiskReadThreadPool destructor
//------------------------------------------------------------------------------
DiskReadThreadPool::~DiskReadThreadPool() { 
  while (m_threads.size()) {
    delete m_threads.back();
    m_threads.pop_back();
55
  }
Victor Kotlyar's avatar
Victor Kotlyar committed
56
  m_lc.log(cta::log::DEBUG, "Deleted threads in DiskReadThreadPool::~DiskReadThreadPool");
57
58
59
60
61
62
63
64
65
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::startThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::startThreads() {
  for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
          i != m_threads.end(); i++) {
    (*i)->start();
66
  }
Victor Kotlyar's avatar
Victor Kotlyar committed
67
  m_lc.log(cta::log::INFO, "All the DiskReadWorkerThreads are started");
68
69
70
71
72
73
74
75
76
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::waitThreads
//------------------------------------------------------------------------------
void DiskReadThreadPool::waitThreads() {
  for (std::vector<DiskReadWorkerThread *>::iterator i=m_threads.begin();
          i != m_threads.end(); i++) {
    (*i)->wait();
77
  }
78
79
80
81
82
83
84
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::push
//------------------------------------------------------------------------------
void DiskReadThreadPool::push(DiskReadTask *t) { 
  m_tasks.push(t); 
Victor Kotlyar's avatar
Victor Kotlyar committed
85
  m_lc.log(cta::log::INFO, "Push a task into the DiskReadThreadPool");
86
87
88
89
90
91
92
93
94
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::finish
//------------------------------------------------------------------------------
void DiskReadThreadPool::finish() {
  /* Insert one endOfSession per thread */
  for (size_t i=0; i<m_threads.size(); i++) {
    m_tasks.push(NULL);
95
  }
96
97
98
99
100
}

//------------------------------------------------------------------------------
// DiskReadThreadPool::popAndRequestMore
//------------------------------------------------------------------------------
Victor Kotlyar's avatar
Victor Kotlyar committed
101
DiskReadTask* DiskReadThreadPool::popAndRequestMore(cta::log::LogContext &lc){
102
  cta::threading::BlockingQueue<DiskReadTask*>::valueRemainingPair 
103
  vrp = m_tasks.popGetSize();
Victor Kotlyar's avatar
Victor Kotlyar committed
104
105
  cta::log::LogContext::ScopedParam sp(lc, cta::log::Param("m_maxFilesReq", m_maxFilesReq));
  cta::log::LogContext::ScopedParam sp0(lc, cta::log::Param("m_maxBytesReq", m_maxBytesReq));
106
107
108

  if(0==vrp.remaining){
    m_injector->requestInjection(true);
Victor Kotlyar's avatar
Victor Kotlyar committed
109
    lc.log(cta::log::DEBUG, "Requested injection from MigrationTaskInjector (with last call)");
110
111
  }else if(vrp.remaining + 1 ==  m_maxFilesReq/2){
    m_injector->requestInjection(false);
Victor Kotlyar's avatar
Victor Kotlyar committed
112
    lc.log(cta::log::DEBUG, "Requested injection from MigrationTaskInjector (without last call)");
113
  }
114
115
  return vrp.value;
}
116
117
118
//------------------------------------------------------------------------------
//addThreadStats
//------------------------------------------------------------------------------
119
void DiskReadThreadPool::addThreadStats(const DiskStats& other){
120
  cta::threading::MutexLocker lock(m_statAddingProtection);
121
122
  m_pooldStat+=other;
}
123
//------------------------------------------------------------------------------
124
125
126
//logWithStat
//------------------------------------------------------------------------------
void DiskReadThreadPool::logWithStat(int level, const std::string& message){
127
  m_pooldStat.totalTime = m_totalTime.secs();
Victor Kotlyar's avatar
Victor Kotlyar committed
128
  cta::log::ScopedParamContainer params(m_lc);
129
130
131
132
133
134
  params.add("poolReadWriteTime", m_pooldStat.readWriteTime)
        .add("poolWaitFreeMemoryTime",m_pooldStat.waitFreeMemoryTime)
        .add("poolCheckingErrorTime",m_pooldStat.checkingErrorTime)
        .add("poolOpeningTime",m_pooldStat.openingTime)
        .add("poolTransferTime", m_pooldStat.transferTime)
        .add("poolRealTime",m_pooldStat.totalTime)
135
        .add("poolFileCount",m_pooldStat.filesCount)
Eric Cano's avatar
Eric Cano committed
136
        .add("poolDataVolume", m_pooldStat.dataVolume)
137
        .add("poolGlobalPayloadTransferSpeedMBps",
138
           m_pooldStat.totalTime?1.0*m_pooldStat.dataVolume/1000/1000/m_pooldStat.totalTime:0)
139
        .add("poolAverageDiskPerformanceMBps",
140
           m_pooldStat.transferTime?1.0*m_pooldStat.dataVolume/1000/1000/m_pooldStat.transferTime:0.0)
141
        .add("poolOpenRWCloseToTransferTimeRatio",
142
           m_pooldStat.transferTime?(m_pooldStat.openingTime+m_pooldStat.readWriteTime+m_pooldStat.closingTime)/m_pooldStat.transferTime:0.0);
143
  m_lc.log(level,message);
144
145
}
//------------------------------------------------------------------------------
146
// DiskReadWorkerThread::run
147
148
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::run() {
Victor Kotlyar's avatar
Victor Kotlyar committed
149
  cta::log::ScopedParamContainer logParams(m_lc);
150
151
  logParams.add("thread", "DiskRead")
           .add("threadID", m_threadID);
Victor Kotlyar's avatar
Victor Kotlyar committed
152
  m_lc.log(cta::log::DEBUG, "Starting DiskReadWorkerThread");
153
  
154
  std::unique_ptr<DiskReadTask> task;
Victor Kotlyar's avatar
Victor Kotlyar committed
155
156
  cta::utils::Timer localTime;
  cta::utils::Timer totalTime;
157
  
158
159
  while(1) {
    task.reset( m_parent.popAndRequestMore(m_lc));
Victor Kotlyar's avatar
Victor Kotlyar committed
160
    m_threadStat.waitInstructionsTime += localTime.secs(cta::utils::Timer::resetCounter);
161
    if (NULL!=task.get()) {
162
      task->execute(m_lc, m_diskFileFactory,m_parent.m_watchdog, m_threadID);
163
      m_threadStat += task->getTaskStats();
164
    }
165
166
    else {
      break;
167
    }
168
  } //end of while(1)
169
170
  m_threadStat.totalTime = totalTime.secs();
  m_parent.addThreadStats(m_threadStat);
Victor Kotlyar's avatar
Victor Kotlyar committed
171
  logWithStat(cta::log::INFO, "Finishing of DiskReadWorkerThread");
172
173
  // We now acknowledge to the task injector that read reached the end. There
  // will hence be no more requests for more. (last thread turns off the light)
174
175
  int remainingThreads = --m_parent.m_nbActiveThread;
  if (!remainingThreads) {
176
    m_parent.m_injector->finish();
Victor Kotlyar's avatar
Victor Kotlyar committed
177
178
    m_lc.log(cta::log::INFO, "Signalled to task injector the end of disk read threads");
    m_parent.logWithStat(cta::log::INFO, "All the DiskReadWorkerThreads have completed");
179
  } else {
Victor Kotlyar's avatar
Victor Kotlyar committed
180
    cta::log::ScopedParamContainer params(m_lc);
181
    params.add("remainingThreads", remainingThreads);
Victor Kotlyar's avatar
Victor Kotlyar committed
182
    m_lc.log(cta::log::DEBUG, "Will not signal the end to task injector yet");
183
  }
184
185
186
187
188
189
190
}

//------------------------------------------------------------------------------
// DiskReadWorkerThread::logWithStat
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::
logWithStat(int level, const std::string& message){
Victor Kotlyar's avatar
Victor Kotlyar committed
191
  cta::log::ScopedParamContainer params(m_lc);
192
193
194
195
196
197
     params.add("threadReadWriteTime", m_threadStat.readWriteTime)
           .add("threadWaitFreeMemoryTime",m_threadStat.waitFreeMemoryTime)
           .add("threadCheckingErrorTime",m_threadStat.checkingErrorTime)
           .add("threadOpeningTime",m_threadStat.openingTime)
           .add("threadTransferTime",m_threadStat.transferTime)
           .add("threadTotalTime",m_threadStat.totalTime)
198
199
           .add("threadDataVolume",m_threadStat.dataVolume)
           .add("threadFileCount",m_threadStat.filesCount)
200
           .add("threadGlobalPayloadTransferSpeedMBps",
201
              m_threadStat.totalTime?1.0*m_threadStat.dataVolume/1000/1000/m_threadStat.totalTime:0)
202
           .add("threadAverageDiskPerformanceMBps",
203
              m_threadStat.transferTime?1.0*m_threadStat.dataVolume/1000/1000/m_threadStat.transferTime:0.0)
204
           .add("threadOpenRWCloseToTransferTimeRatio",
205
              m_threadStat.transferTime?(m_threadStat.openingTime+m_threadStat.readWriteTime+m_threadStat.closingTime)/m_threadStat.transferTime:0.0);
206
    m_lc.log(level,message);
207
}
208
}}}}
209