DiskWriteThreadPool.hpp 6.57 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
3
 * @copyright      Copyright(C) 2003-2021 CERN
4
5
6
7
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
8
 *
9
10
11
12
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
13
 *
14
15
16
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17
18
19

#pragma once

20
#include "common/threading/BlockingQueue.hpp"
21
#include "common/threading/Thread.hpp"
22
#include "common/threading/AtomicCounter.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
23
#include "common/log/LogContext.hpp"
24
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
25
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
26
27
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
28
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
29
#include "disk/RadosStriperPool.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
30
#include "common/Timer.hpp"
31
#include <vector>
32
33
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
34

David COME's avatar
David COME committed
35
36
37
38
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
Eric Cano's avatar
Eric Cano committed
39
40
41
42
/**
 * Container for the threads that will execute the disk writes tasks in the 
 * migration.
 */
43
class DiskWriteThreadPool {
44
public:
Eric Cano's avatar
Eric Cano committed
45
46
47
48
49
50
51
52
53
  /**
   * Constructor: we create the thread structures here, but they do not get
   * started yet.
   * @param nbThread Fixed number of threads in the pool
   * @param reportPacker Reference to a previously created recall
   * report packer, to which the tasks will report their results.
   * @param lc reference to a log context object that will be copied at
   * construction time (and then copied further for each thread). There will
   * be no side effect on the caller's logs.
54
   * @param xrootPrivateKeyPath the path to the xroot private key file.
Eric Cano's avatar
Eric Cano committed
55
56
   */
  DiskWriteThreadPool(int nbThread, 
57
          RecallReportPacker& reportPacker,
58
          RecallWatchDog& recallWatchDog,
Victor Kotlyar's avatar
Victor Kotlyar committed
59
          cta::log::LogContext lc,
60
61
          const std::string & xrootPrivateKeyPath,
          uint16_t xrootTimeout);
Eric Cano's avatar
Eric Cano committed
62
63
64
65
  /**
   * Destructor: we suppose the threads are no running (waitThreads() should
   * be called befor destruction unless the threads were not started.
   */
66
  virtual ~DiskWriteThreadPool();
67
  
Eric Cano's avatar
Eric Cano committed
68
69
70
71
  /**
   * Starts the thread created at construction time.
   */
  
72
  void startThreads();
Eric Cano's avatar
Eric Cano committed
73
74
75
  /**
   * Waits for completion of all the pool's threads.
   */
76
  void waitThreads();
Eric Cano's avatar
Eric Cano committed
77
78
79
80
81
82
  
  /**
   * Pushes a pointer to a task. The thread pool owns the task and will
   * de-allocate it.
   * @param t pointer to the task
   */
83
  void push(DiskWriteTask *t);
Eric Cano's avatar
Eric Cano committed
84
85
86
87
88
  
  /**
   * Signals to the thread pool that there will be no more tasks pushed to it,
   * and that the threads can therefore complete.
   */
89
  void finish();
90
91

private:
Eric Cano's avatar
Eric Cano committed
92
  /** Running counter active threads, used to determine which thread is the last. */
93
  cta::threading::AtomicCounter<int> m_nbActiveThread;
Eric Cano's avatar
Eric Cano committed
94
  /** Thread safe counter for failed tasks */
95
  cta::threading::AtomicCounter<int> m_failedWriteCount;
96
  
Eric Cano's avatar
Eric Cano committed
97
98
99
  /**
   * Private class implementing the worker threads.
   */
100
  class DiskWriteWorkerThread: private cta::threading::Thread {
101
  public:
102
    DiskWriteWorkerThread(DiskWriteThreadPool & manager):
103
104
    m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),
    m_lc(m_parentThreadPool.m_lc), 
105
    m_diskFileFactory(manager.m_xrootPrivateKeyPath, 
106
      manager.m_xrootTimeout, manager.m_striperPool)
107
    {
108
109
      // This thread Id will remain for the rest of the thread's lifetime (and 
      // also context's lifetime) so ne need for a scope.
Victor Kotlyar's avatar
Victor Kotlyar committed
110
111
      m_lc.pushOrReplace(cta::log::Param("threadID", m_threadID));
      m_lc.log(cta::log::INFO,"DiskWrite Thread created");
112
113
    }
      
114
115
    void start() { cta::threading::Thread::start(); }
    void wait() { cta::threading::Thread::wait(); }
116
  private:
117
118
119
120
121
    void logWithStat(int level, const std::string& message);
    /*
     * For measuring how long  are the the different steps 
     */
    DiskStats m_threadStat;
122
  
123
124
125
    /**
     * To identify the thread 
     */
126
    const int m_threadID;
127
128
129
130
    
    /**
     * The owning thread pool
     */
131
    DiskWriteThreadPool & m_parentThreadPool;
132
133
134
135
    
    /**
     * For logging the event
     */
Victor Kotlyar's avatar
Victor Kotlyar committed
136
    cta::log::LogContext m_lc;
137
138
139
140
141

    /**
     * A disk file factory, that will create the proper type of file access class,
     * depending on the received path
     */
142
    cta::disk::DiskFileFactory m_diskFileFactory;
143

144
    virtual void run();
145
  };
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
  /**
   * When a thread finishm it call this function to Add its stats to one one of the
   * Threadpool
   * @param threadStats
   */
  void addThreadStats(const DiskStats& threadStats);
  
  
  /**
   * When the last thread finish, we log all m_pooldStat members + message
   * at the given level
   * @param level
   * @param message
   */
  void logWithStat(int level, const std::string& message);
161
  
Eric Cano's avatar
Eric Cano committed
162
  /** The actual container for the thread objects */
163
  std::vector<DiskWriteWorkerThread *> m_threads;
Eric Cano's avatar
Eric Cano committed
164
165
  /** Mutex protecting the pushers of new tasks from having the object deleted
   * under their feet. */
166
  cta::threading::Mutex m_pusherProtection;
167
168
169
170
  
  /**
   To protect addThreadStats from concurrent calls
   */
171
  cta::threading::Mutex m_statAddingProtection;
172
protected:
Eric Cano's avatar
Eric Cano committed
173
  /** The (thread safe) queue of tasks */
174
  cta::threading::BlockingQueue<DiskWriteTask*> m_tasks;
175
176
177
178
179
180
181
182
183
184
185
  
  /**
   * Parameter: path to xroot private key
   */
  std::string m_xrootPrivateKeyPath;
  
  /**
   * Parameter: xroot timeout
   */
  uint16_t m_xrootTimeout;

186
187
188
  /**
   * A pool of rados striper connections, to be shared by all threads
   */
189
  cta::disk::RadosStriperPool m_striperPool;
190

191
private:
192
193
194
195
  /**
   * Aggregate all threads' stats 
   */
  DiskStats m_pooldStat;
196
197
198
199

  /**
   * Measure the thread pool's lifetime
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
200
  cta::utils::Timer m_totalTime;
201
    
Eric Cano's avatar
Eric Cano committed
202
203
  /** Reference to the report packer where tasks report the result of their 
   * individual files and the end of session (for the last thread) */
204
  RecallReportPacker& m_reporter;
205
  
206
207
208
209
  /** Reference to the session watchdog, allowing reporting of errors to it.
   */
  RecallWatchDog& m_watchdog;
  
Eric Cano's avatar
Eric Cano committed
210
  /** logging context that will be copied by each thread for individual context */
Victor Kotlyar's avatar
Victor Kotlyar committed
211
  cta::log::LogContext m_lc;
212
};
David COME's avatar
David COME committed
213
214

}}}}