MigrationTaskInjector.hpp 7.69 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
3
 * @copyright      Copyright(C) 2003-2021 CERN
4
5
6
7
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
David COME's avatar
David COME committed
8
 *
9
10
11
12
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
David COME's avatar
David COME committed
13
 *
14
15
16
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
David COME's avatar
David COME committed
17
18
19

#pragma once

20
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
David COME's avatar
David COME committed
21
22
23
24
#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
Victor Kotlyar's avatar
Victor Kotlyar committed
25
#include "common/log/LogContext.hpp"
26
#include "common/threading/AtomicCounter.hpp"
27
28
#include "scheduler/ArchiveMount.hpp"

David COME's avatar
David COME committed
29
30
31
32
33
34
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {

/**
35
 * This class is responsible for creating the tasks in case of a migration job
David COME's avatar
David COME committed
36
 */
37
class MigrationTaskInjector {  
David COME's avatar
David COME committed
38
39
public:

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  /**
   * Constructor
   * @param mm The memory manager for accessing memory blocks.
   * The newly created tape-write tasks will register themselves
   * as clients of it.
   * @param diskReader the one object that will hold all the threads which will be executing
   * disk-reading tasks
   * @param tapeWriter the one object that will hold the thread which will be executing
   * tape-writing tasks
   * @param archiveMount The one that will give us files to migrate
   * @param maxFiles maximal number of files we may request from the client at once
   * @param byteSizeThreshold maximal cumulated number of bytes
   * we may request from the client at once
   * @param lc log context, copied because of the threading mechanism
   */
55
  MigrationTaskInjector(MigrationMemoryManager & mm, 
56
        DiskReadThreadPool & diskReader,
57
        TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,cta::ArchiveMount &archiveMount,
Victor Kotlyar's avatar
Victor Kotlyar committed
58
        uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc);
David COME's avatar
David COME committed
59
60
61
62
63
64
65
66
67
68

  /**
   * Wait for the inner thread to finish
   */
  void waitThreads();
  
  /**
   * Start the inner thread 
   */
  void startThreads();
69
70
71
72
73
74
75
76
  /**
   * Function for a feed-back loop purpose between MigrationTaskInjector and
   * DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
   * it does not have enough jobs left to do, it calls this method to push a request
   * in order to (try to) fill up the queue.
   * @param lastCall true if we want the new request to be a last call.
   * See Request::lastCall
   */
77
  void requestInjection(bool lastCall);
78
79
80
81
82
  
  /**
   * Contact the client to make sure there is really something to do.
   * Something = a migration of at most maxFiles files or at least maxBytes bytes
   * 
83
   * @param noFilesToMigrate[out] will be true if it triggered an empty mount because of no files to migrate
84
85
   * @return true if there are jobs to be done, false otherwise 
   */
86
  bool synchronousInjection(bool & noFilesToMigrate);
87
88
89
90
91
92
  
  /**
   * Send an end token in the request queue. There should be no subsequent
   * calls to requestInjection.
   */
  void finish();
93
  
94
95
96
97
98
  /**
   * Return the fseq of the first file to be written
   * @return the fseq of the first file to be written
   */
  uint64_t firstFseqToWrite() const;
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
  
  /**
   * Raise the error flag. The tasks (disk and tape) and the tape thread
   * call this to indicate that the session cannot continue.
   */
  void setErrorFlag() { m_errorFlag.set(); }
  
  /**
   * Read the error flag. The disk and tape tasks use it to decide whether
   * they should carry on or just free memory.
   * @return value of the error flag
   */
  bool hasErrorFlag() {
    const bool errorRaised = m_errorFlag;
    return errorRaised;
  }
David COME's avatar
David COME committed
117
private:
118
119
120
121
  /**
   * Create all the disk-read and tape-write tasks for a set of files to migrate
   * @param jobs the list of archive jobs we have to transform into pairs of tasks
   */
122
  void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs);
123
124
125
126
127
128
  
  /* Number of blocks of blockCapacity bytes required to hold fileSize bytes
   * (integer division rounded up; a trailing partial block counts as one). */
  size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
    const size_t fullBlocks = fileSize / blockCapacity;
    const bool hasPartialBlock = (fileSize % blockCapacity) != 0;
    return hasPartialBlock ? fullBlocks + 1 : fullBlocks;
  }
  
129
130
131
132
133
134
  /**
   * It will signal to the disk read thread pool, the tape write single thread
   * and the memory manager that they have to stop their thread(s)
   */
  void signalEndDataMovement();

135
136
137
138
139
140
141
142
143
   /**
   * A request of files to migrate. We request EITHER
   * - a maximum of nbMaxFiles files
   * - OR at least byteSizeThreshold bytes. 
   * That means we stop as soon as we have nbMaxFiles files or the cumulated size 
   * is equal or above byteSizeThreshold. 
   */
  class Request {
  public:
    /**
     * Build a regular request.
     * @param mf number of files requested
     * @param mb number of bytes requested
     * @param lc value of the lastCall flag (see lastCall)
     */
    Request(uint64_t mf, uint64_t mb, bool lc):
      filesRequested(mf), bytesRequested(mb), lastCall(lc), end(false) {}

    /**
     * Build the end request: lastCall and end are both true. Both limits are
     * set to the largest uint64_t value; the conversion from -1 is spelled
     * out explicitly so that it is visibly intentional.
     */
    Request():
      filesRequested(static_cast<uint64_t>(-1)),
      bytesRequested(static_cast<uint64_t>(-1)),
      lastCall(true), end(true) {}

    /// Number of files requested
    const uint64_t filesRequested;
    /// Number of bytes requested
    const uint64_t bytesRequested;

    /**
     * True if it is the last call for the set of requests: it means
     * we don't need to try to get more files to migrate
     * and can send a signal to all the different threads.
     */
    const bool lastCall;

    /**
     * True indicates the task injector will not receive any more requests.
     */
    const bool end;
  };
  
166
  class WorkerThread: public cta::threading::Thread {
David COME's avatar
David COME committed
167
  public:
168
    WorkerThread(MigrationTaskInjector & rji): m_parent(rji) {}
David COME's avatar
David COME committed
169
170
    virtual void run();
  private:
171
    MigrationTaskInjector & m_parent;
David COME's avatar
David COME committed
172
  } m_thread;
173
  ///The memory manager for accessing memory blocks. 
174
  MigrationMemoryManager & m_memManager;
David COME's avatar
David COME committed
175
  
176
177
  ///the one object that will hold the thread which will be executing 
  ///tape-writing tasks
178
  TapeSingleThreadInterface<TapeWriteTask>& m_tapeWriter;
179
180
181
  
  ///the one object that will hold all the threads which will be executing 
  ///disk-reading tasks
182
  DiskReadThreadPool & m_diskReader;
183
184
  
  /// the client who is sending us jobs
185
  cta::ArchiveMount &m_archiveMount;
David COME's avatar
David COME committed
186
187
188
189
  
  /**
   * utility member to log some pieces of information
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
190
  cta::log::LogContext m_lc;
David COME's avatar
David COME committed
191
  
192
  cta::threading::Mutex m_producerProtection;
193
194
  
  ///all the requests for work we will forward to the client.
195
  cta::threading::BlockingQueue<Request> m_queue;
196
  
197
198
199
  /** A flag shared among all the tasks related to migration, set to true
   * as soon as a single task encounters a failure. That way we go into a degraded mode
   * where we only circulate memory without writing anything on tape
200
   */ 
201
  cta::threading::AtomicFlag m_errorFlag;
202

203
  /// The maximum number of files we ask per request. 
204
205
  const uint64_t m_maxFiles;
  
206
  /// Same as m_maxFiles, but for the size per request (in bytes).
207
  const uint64_t m_maxBytes;
208
209
210
211
212
  
  /**
   * The last fseq used on the tape. We should not see this directly, but
   * it is computed by subtracting 1 from the fSeq of the first file to migrate
   * we receive. That part is done by MigrationTaskInjector::synchronousInjection.
   * Thus, we compute it in that function and retrieve/set it within
   * DataTransferSession's executeWrite after we make sure synchronousInjection
   * returned true. To do so, we need to store it.
   * NOTE(review): the member name (and firstFseqToWrite()) suggest this holds
   * the fseq of the first file to write rather than the last fseq used —
   * confirm against synchronousInjection.
   */
217
  uint64_t m_firstFseqToWrite;
David COME's avatar
David COME committed
218
219
220
221
222
223
};

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor