DiskWriteThreadPool.hpp 6.53 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

26
27
28
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/AtomicCounter.hpp"
29

30
31
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
32
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
33
34
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
35
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
36
#include "common/Timer.hpp"
37
#include <vector>
38
39
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
40

David COME's avatar
David COME committed
41
42
43
44
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
Eric Cano's avatar
Eric Cano committed
45
46
47
48
/**
 * Container for the threads that will execute the disk writes tasks in the 
 * migration.
 */
49
class DiskWriteThreadPool {
50
public:
Eric Cano's avatar
Eric Cano committed
51
52
53
54
55
56
57
58
59
  /**
   * Constructor: we create the thread structures here, but they do not get
   * started yet.
   * @param nbThread Fixed number of threads in the pool
   * @param reportPacker Reference to a previously created recall
   * report packer, to which the tasks will report their results.
   * @param lc reference to a log context object that will be copied at
   * construction time (and then copied further for each thread). There will
   * be no side effect on the caller's logs.
60
61
62
63
64
   * @param remoteFileProtocol a string describing how the bare (no URL)
   * pathes to files should be interpreted. Default is RFIO, unless the string
   * is "xroot" (with any case variation) in which case we would use Xroot as a
   * protocol.
   * @param xrootPrivateKeyPath the path to the xroot private key file.
Eric Cano's avatar
Eric Cano committed
65
66
   */
  DiskWriteThreadPool(int nbThread, 
67
          RecallReportPacker& reportPacker,
68
          RecallWatchDog& recallWatchDog,
69
          castor::log::LogContext lc,
70
          const std::string & remoteFileProtocol,
71
          const std::string & xrootPrivateKeyPath);
Eric Cano's avatar
Eric Cano committed
72
73
74
75
  /**
   * Destructor: we suppose the threads are no running (waitThreads() should
   * be called befor destruction unless the threads were not started.
   */
76
  virtual ~DiskWriteThreadPool();
77
  
Eric Cano's avatar
Eric Cano committed
78
79
80
81
  /**
   * Starts the thread created at construction time.
   */
  
82
  void startThreads();
Eric Cano's avatar
Eric Cano committed
83
84
85
  /**
   * Waits for completion of all the pool's threads.
   */
86
  void waitThreads();
Eric Cano's avatar
Eric Cano committed
87
88
89
90
91
92
  
  /**
   * Pushes a pointer to a task. The thread pool owns the task and will
   * de-allocate it.
   * @param t pointer to the task
   */
93
  void push(DiskWriteTask *t);
Eric Cano's avatar
Eric Cano committed
94
95
96
97
98
  
  /**
   * Signals to the thread pool that there will be no more tasks pushed to it,
   * and that the threads can therefore complete.
   */
99
  void finish();
100
101

private:
Eric Cano's avatar
Eric Cano committed
102
  /** Running counter active threads, used to determine which thread is the last. */
103
  cta::threading::AtomicCounter<int> m_nbActiveThread;
Eric Cano's avatar
Eric Cano committed
104
  /** Thread safe counter for failed tasks */
105
  cta::threading::AtomicCounter<int> m_failedWriteCount;
106
  
107
108
109
110
  /**
   * A disk file factory, that will create the proper type of file access class,
   * depending on the received path
   */
111
  diskFile::DiskFileFactory m_diskFileFactory;
112
  
Eric Cano's avatar
Eric Cano committed
113
114
115
  /**
   * Private class implementing the worker threads.
   */
116
  class DiskWriteWorkerThread: private cta::threading::Thread {
117
  public:
118
    DiskWriteWorkerThread(DiskWriteThreadPool & manager):
119
    m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),m_lc(m_parentThreadPool.m_lc)
120
    {
121
122
123
124
      // This thread Id will remain for the rest of the thread's lifetime (and 
      // also context's lifetime) so ne need for a scope.
      m_lc.pushOrReplace(log::Param("threadID", m_threadID));
      m_lc.log(LOG_INFO,"DiskWrite Thread created");
125
126
    }
      
127
128
    void start() { cta::threading::Thread::start(); }
    void wait() { cta::threading::Thread::wait(); }
129
  private:
130
131
132
133
134
    void logWithStat(int level, const std::string& message);
    /*
     * For measuring how long  are the the different steps 
     */
    DiskStats m_threadStat;
135
  
136
137
138
    /**
     * To identify the thread 
     */
139
    const int m_threadID;
140
141
142
143
    
    /**
     * The owning thread pool
     */
144
    DiskWriteThreadPool & m_parentThreadPool;
145
146
147
148
    
    /**
     * For logging the event
     */
149
    castor::log::LogContext m_lc;
150
151
    
    
152
    virtual void run();
153
  };
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
  /**
   * When a thread finishm it call this function to Add its stats to one one of the
   * Threadpool
   * @param threadStats
   */
  void addThreadStats(const DiskStats& threadStats);
  
  
  /**
   * When the last thread finish, we log all m_pooldStat members + message
   * at the given level
   * @param level
   * @param message
   */
  void logWithStat(int level, const std::string& message);
169
  
Eric Cano's avatar
Eric Cano committed
170
  /** The actual container for the thread objects */
171
  std::vector<DiskWriteWorkerThread *> m_threads;
Eric Cano's avatar
Eric Cano committed
172
173
  /** Mutex protecting the pushers of new tasks from having the object deleted
   * under their feet. */
174
  cta::threading::Mutex m_pusherProtection;
175
176
177
178
  
  /**
   To protect addThreadStats from concurrent calls
   */
179
  cta::threading::Mutex m_statAddingProtection;
180
protected:
Eric Cano's avatar
Eric Cano committed
181
  /** The (thread safe) queue of tasks */
182
  cta::threading::BlockingQueue<DiskWriteTask*> m_tasks;
183
private:
184
185
186
187
  /**
   * Aggregate all threads' stats 
   */
  DiskStats m_pooldStat;
188
189
190
191

  /**
   * Measure the thread pool's lifetime
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
192
  cta::utils::Timer m_totalTime;
193
    
Eric Cano's avatar
Eric Cano committed
194
195
  /** Reference to the report packer where tasks report the result of their 
   * individual files and the end of session (for the last thread) */
196
  RecallReportPacker& m_reporter;
197
  
198
199
200
201
  /** Reference to the session watchdog, allowing reporting of errors to it.
   */
  RecallWatchDog& m_watchdog;
  
Eric Cano's avatar
Eric Cano committed
202
  /** logging context that will be copied by each thread for individual context */
203
  castor::log::LogContext m_lc;
204
};
David COME's avatar
David COME committed
205
206

}}}}