MigrationTaskInjector.hpp 7.82 KB
Newer Older
David COME's avatar
David COME committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

26
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
David COME's avatar
David COME committed
27
28
29
30
#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
31
#include "common/log/LogContext.hpp"
32
#include "common/threading/AtomicCounter.hpp"
33
34
#include "scheduler/ArchiveMount.hpp"

David COME's avatar
David COME committed
35
36
37
38
39
40
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {

/**
41
 * This class is responsible for creating the tasks in case of a migration job
David COME's avatar
David COME committed
42
 */
43
class MigrationTaskInjector {  
David COME's avatar
David COME committed
44
45
public:

46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
  /**
   * Constructor
   * @param mm The memory manager for accessing memory blocks.
   * The newly created tape-write tasks will register themselves
   * as clients of it.
   * @param diskReader the one object that will hold all the threads which will be executing
   * disk-reading tasks
   * @param tapeWriter the one object that will hold the thread which will be executing
   * tape-writing tasks
   * @param archiveMount The client that will give us files to migrate
   * @param maxFiles maximal number of files we may request from the client at once
   * @param byteSizeThreshold maximal number of cumulated bytes
   * we may request from the client at once
   * @param lc log context, copied because of the threading mechanism
   */
61
  MigrationTaskInjector(MigrationMemoryManager & mm, 
62
        DiskReadThreadPool & diskReader,
63
        TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,cta::ArchiveMount &archiveMount,
Victor Kotlyar's avatar
Victor Kotlyar committed
64
        uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc);
David COME's avatar
David COME committed
65
66
67
68
69
70
71
72
73
74

  /**
   * Wait for the inner thread to finish
   */
  void waitThreads();
  
  /**
   * Start the inner thread 
   */
  void startThreads();
75
76
77
78
79
80
81
82
  /**
   * Function used for a feedback loop between MigrationTaskInjector and
   * DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
   * that it does not have enough jobs left to do, it calls this function to push
   * a request in order to (try to) fill up the queue.
   * @param lastCall true if we want the new request to be a last call.
   * See Request::lastCall
   */
83
  void requestInjection(bool lastCall);
84
85
86
87
88
  
  /**
   * Contact the client to make sure there is really something to do.
   * Something = a migration of at most maxFiles files or at least maxBytes bytes
   * 
89
   * @param noFilesToMigrate[out] will be true if it triggered an empty mount because of no files to migrate
90
91
   * @return true if there are jobs to be done, false otherwise 
   */
92
  bool synchronousInjection(bool & noFilesToMigrate);
93
94
95
96
97
98
  
  /**
   * Send an end token in the request queue. There should be no subsequent
   * calls to requestInjection.
   */
  void finish();
99
  
100
101
102
103
104
  /**
   * Return the fseq of the first file to be written
   * @return the fseq of the first file to write
   */
  uint64_t firstFseqToWrite() const;
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
  
  /**
   * Raise the shared error flag.
   * Exposed publicly so that the disk tasks, the tape tasks and the tape
   * thread can signal that the session cannot continue.
   */
  void setErrorFlag()
  {
    m_errorFlag.set();
  }
  
  /**
   * Read the shared error flag. The disk and tape tasks use it to decide
   * whether they should carry on or just free memory.
   * @return value of the error flag
   */
  bool hasErrorFlag()
  {
    const bool errorRaised = m_errorFlag;
    return errorRaised;
  }
David COME's avatar
David COME committed
123
private:
124
125
126
127
  /**
   * Create all the disk-read and tape-write tasks for a set of files to migrate
   * @param jobs the list of archive jobs we have to transform into pairs of tasks
   */
128
  void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs);
129
130
131
132
133
134
  
  /**
   * Compute how many blocks are needed for a file of fileSize bytes,
   * i.e. fileSize divided by blockCapacity, rounded up.
   * @param fileSize size of the file, in bytes
   * @param blockCapacity capacity of a single block (must not be 0, as it is
   * used as a divisor)
   * @return the number of blocks needed to hold the whole file
   */
  size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
    const size_t fullBlocks = fileSize / blockCapacity;
    const bool hasPartialBlock = (fileSize % blockCapacity) != 0;
    // A trailing partial block still occupies one whole block.
    return hasPartialBlock ? fullBlocks + 1 : fullBlocks;
  }
  
135
136
137
138
139
140
  /**
   * It will signal to the disk read thread pool, the tape write single thread
   * and the memory manager that they have to stop their thread(s)
   */
  void signalEndDataMovement();

141
142
143
144
145
146
147
148
149
   /**
    * A request of files to migrate. We request EITHER
    * - a maximum of filesRequested files
    * - OR at least bytesRequested bytes.
    * That means we stop as soon as we have filesRequested files or the
    * cumulated size is equal to or above bytesRequested.
    */
  class Request {
  public:
    /**
     * Build a regular (non-end) request.
     * @param mf maximum number of files requested
     * @param mb byte size threshold of the request
     * @param lc true if this request is a last call
     */
    Request(uint64_t mf, uint64_t mb, bool lc):
    filesRequested(mf), bytesRequested(mb), lastCall(lc), end(false) {}

    /**
     * Build the end token: a request flagged as both last call and end.
     * Both limits are set to the largest uint64_t value; the conversion
     * from -1 is spelled out explicitly to make that intent visible.
     */
    Request():
    filesRequested(static_cast<uint64_t>(-1)),
    bytesRequested(static_cast<uint64_t>(-1)),
    lastCall(true), end(true) {}

    /// Maximum number of files asked for by this request.
    const uint64_t filesRequested;
    /// Cumulated size, in bytes, asked for by this request.
    const uint64_t bytesRequested;

    /**
     * True if it is the last call for the set of requests: it means
     * we don't need to try to get more files
     * and can send a signal into all the different threads.
     */
    const bool lastCall;

    /**
     * True indicates the task injector will not receive any more requests.
     */
    const bool end;
  };
  
172
  class WorkerThread: public cta::threading::Thread {
David COME's avatar
David COME committed
173
  public:
174
    WorkerThread(MigrationTaskInjector & rji): m_parent(rji) {}
David COME's avatar
David COME committed
175
176
    virtual void run();
  private:
177
    MigrationTaskInjector & m_parent;
David COME's avatar
David COME committed
178
  } m_thread;
179
  ///The memory manager for accessing memory blocks. 
180
  MigrationMemoryManager & m_memManager;
David COME's avatar
David COME committed
181
  
182
183
  ///the one object that will hold the thread which will be executing 
  ///tape-writing tasks
184
  TapeSingleThreadInterface<TapeWriteTask>& m_tapeWriter;
185
186
187
  
  ///the one object that will hold all the threads which will be executing 
  ///disk-reading tasks
188
  DiskReadThreadPool & m_diskReader;
189
190
  
  /// the client who is sending us jobs
191
  cta::ArchiveMount &m_archiveMount;
David COME's avatar
David COME committed
192
193
194
195
  
  /**
   * utility member to log some pieces of information
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
196
  cta::log::LogContext m_lc;
David COME's avatar
David COME committed
197
  
198
  cta::threading::Mutex m_producerProtection;
199
200
  
  ///all the requests for work we will forward to the client.
201
  cta::threading::BlockingQueue<Request> m_queue;
202
  
203
204
205
  /** A flag shared among all the tasks related to migration, set to true
   * as soon as a single task encounters a failure. That way we go into a degraded mode
   * where we only circulate memory without writing anything on tape
206
   */ 
207
  cta::threading::AtomicFlag m_errorFlag;
208

209
  /// The maximum number of files we ask per request. 
210
211
  const uint64_t m_maxFiles;
  
212
  /// Same as m_maxFiles, but for the cumulated size per request (in bytes).
213
  const uint64_t m_maxBytes;
214
215
216
217
218
  
  /**The last fseq used on the tape. We should not see this but
   * it is computed by subtracting 1 from the fSeq of the first file to migrate we
   * receive. That part is done by
   * MigrationTaskInjector::synchronousInjection. Thus, we compute it in
219
   * that function and retrieve/set it within DataTransferSession executeWrite
220
221
222
   * after we make sure synchronousInjection returned true. To do so, we
   *  need to store it
   */
223
  uint64_t m_firstFseqToWrite;
David COME's avatar
David COME committed
224
225
226
227
228
229
};

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor