RecallTaskInjector.hpp 7.86 KB
Newer Older
1
2
/*
 * @project        The CERN Tape Archive (CTA)
3
 * @copyright      Copyright(C) 2003-2021 CERN
4
5
6
7
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
8
 *
9
10
11
12
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *                 GNU General Public License for more details.
13
 *
14
15
16
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
17
18
19

#pragma once

20
#include <stdint.h>

#include <future>
#include <memory>
#include <vector>

#include "common/log/LogContext.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Thread.hpp"
#include "scheduler/RetrieveJob.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "castor/tape/tapeserver/RAO/RAOParams.hpp"
#include "castor/tape/tapeserver/RAO/RAOManager.hpp"
31

32
33
namespace castor{
namespace tape{
34
35
36
37
  //forward declarations
  namespace tapegateway {
    class FileToRecallStruct;
  }
38
namespace tapeserver{
39
40
41
  namespace client {
    class ClientInterface;
  }
42
namespace daemon {
43
44

  //forward declaration
45
46
  class RecallMemoryManager;
  class DiskWriteThreadPool;
47
  class TapeReadTask;
48
49
50
  //forward declaration of template class
  template <class T> class TapeSingleThreadInterface;
  
51
/**
 * This class is responsible for creating the tasks in case of a recall job
 */
54
class RecallTaskInjector {  
55
public:
56
57
58
59
60
61
62
63
 /**
  * Constructor
  * @param mm the memory manager from whom the TRT will be pulling blocks
  * @param tapeReader the one object that will hold the thread which will be executing 
  * tape-reading tasks
  * @param diskWriter the one object that will hold all the threads which will be executing 
  * disk-writing tasks
  * @param client The one that will give us files to recall 
64
65
  * @param filesPerRequest number of files we request from the client at once 
  * @param bytesPerRequest number of bytes we request from the client at once 
66
67
68
  * we may request to the client. at once
  * @param lc  copied because of the threading mechanism 
  */
69
  RecallTaskInjector(RecallMemoryManager & mm, 
70
        TapeSingleThreadInterface<TapeReadTask> & tapeReader,
71
        DiskWriteThreadPool & diskWriter,cta::RetrieveMount &retrieveMount,
Victor Kotlyar's avatar
Victor Kotlyar committed
72
        uint64_t filesPerRequest, uint64_t bytesPerRequest,cta::log::LogContext lc);
73

74
  virtual ~RecallTaskInjector();
David COME's avatar
David COME committed
75
76
77
  
  /**
   * Function for a feed-back loop purpose between RecallTaskInjector and 
78
   * TapeReadSingleThread. When TapeReadSingleThread::popAndRequestMoreJobs detects 
79
80
   * it has not enough jobs to do to, it will push a request, that when executed 
   * will ask the client to try to fill up the queue. 
81

David COME's avatar
David COME committed
82
83
84
   * @param lastCall true if we want the new request to be a last call. 
   * See Request::lastCall 
   */
85
  virtual void requestInjection(bool lastCall);
David COME's avatar
David COME committed
86

87
88
89
90
91
92
  /**
   * Send an end token in the request queue. There should be no subsequent
   * calls to requestInjection.
   */
  void finish();
  
David COME's avatar
David COME committed
93
94
95
96
    /**
     * Contact the client to make sure there are really something to do
     * Something = recall at most  maxFiles or at least maxBytes
     * 
97
     * @param noFilesToRecall will be true if noFilesWere popped from the queue.
David COME's avatar
David COME committed
98
99
     * @return true if there are jobs to be done, false otherwise 
     */
100
  bool synchronousFetch(bool & noFilesToRecall);
101

David COME's avatar
David COME committed
102
103
104
  /**
   * Wait for the inner thread to finish
   */
105
  void waitThreads();
David COME's avatar
David COME committed
106
107
108
109
  
  /**
   * Start the inner thread 
   */
110
  void startThreads();
111
112
113
114
115
116
117
118
119
120

  /**
   * Set the drive interface in use
   * @param di - Drive interface
   */
  void setDriveInterface(castor::tape::tapeserver::drive::DriveInterface *di);

  /**
   * Initialize Recommended Access Order parameters
   */
121
  void initRAO(const castor::tape::tapeserver::rao::RAOParams & dataConfig, cta::catalogue::Catalogue * catalogue);
122

123
  void waitForPromise();
124
125

  void setPromise();
126
127
128
129
130
131
132
133
134
135
136
137
138
139
  
  /**
   * This method will tell the TapeReadSingleThread that the
   * first batch of tasks has been injected
   * by the RecallTaskInjector
   */
  void setFirstTasksInjectedPromise();
  
  /**
   * This method will be called by the TapeReadSingleThread
   * so that TapeReadSingleThread will wait the first batch
   * of tasks to be injected by the RecallTaskInjector
   */
  void waitForFirstTasksInjectedPromise();
140

141
private:
142
143
144
145
146
147
148
149
150
151
  /**
   * It will signal to the disk read thread  pool, tape write single thread
   * and to the mem manager they have to stop their threads(s)
   */
  void signalEndDataMovement();

  /**
   * It will delete all remaining tasks 
   */
  void deleteAllTasks();
David COME's avatar
David COME committed
152
153
154
155
  
  /**
   * Create all the tape-read and write-disk tasks for set of files to retrieve
   */
156
  void injectBulkRecalls();
157
158

  /**
David COME's avatar
David COME committed
159
160
161
162
163
   * A request of files to recall. We request EITHER
   * - a maximum of nbMaxFiles files
   * - OR at least byteSizeThreshold bytes. 
   * That means we stop as soon as we have nbMaxFiles files or the cumulated size 
   * is equal or above byteSizeThreshold. 
164
   */
165
166
  class Request {
  public:
167
    Request(uint64_t mf, uint64_t mb, bool lc):
168
    filesRequested(mf), bytesRequested(mb), lastCall(lc),end(false) {}
David COME's avatar
David COME committed
169
    
170
    Request():
171
172
173
    filesRequested(0), bytesRequested(0), lastCall(true),end(true) {}
    const uint64_t filesRequested;
    const uint64_t bytesRequested;
David COME's avatar
David COME committed
174
175
176
177
178
179
    
    /** 
     * True if it is the last call for the set of requests :it means
     *  we don't need to try to get more files to recall 
     *  and can send into all the different threads a signal  .
     */
180
    const bool lastCall;
181
    
182
183
184
    /**
     * Set as true if the loop back process notified to us the end 
     */
185
    const bool end;
186
  };
187
  
188
  class WorkerThread: public cta::threading::Thread {
189
  public:
190
    WorkerThread(RecallTaskInjector & rji): m_parent(rji) {}
191
    virtual void run();
192
  private:
193
    RecallTaskInjector & m_parent;
194
  } m_thread;
195
  ///The memory manager for accessing memory blocks. 
196
  RecallMemoryManager & m_memManager;
David COME's avatar
David COME committed
197
  
198
199
  ///the one object that will hold the thread which will be executing 
  ///tape-reading tasks
200
  TapeSingleThreadInterface<TapeReadTask> & m_tapeReader;
201
202
203
  
  ///the one object that will hold all the threads which will be executing 
  ///disk-writing tasks
204
  DiskWriteThreadPool & m_diskWriter;
205
206
  
  /// the client who is sending us jobs
207
  cta::RetrieveMount &m_retrieveMount;
David COME's avatar
David COME committed
208
  
209
210
211
212
213
  /// Drive interface needed for performing Recommended Access Order query
  castor::tape::tapeserver::drive::DriveInterface * m_drive;

  std::vector<std::unique_ptr<cta::RetrieveJob>> m_jobs;

David COME's avatar
David COME committed
214
215
216
  /**
   * utility member to log some pieces of information
   */
Victor Kotlyar's avatar
Victor Kotlyar committed
217
  cta::log::LogContext m_lc;
David COME's avatar
David COME committed
218
  
219
220
  cta::threading::Mutex m_producerProtection;
  cta::threading::BlockingQueue<Request> m_queue;
221
222
223
224
225
226
  

  //maximal number of files requested. at once
  const uint64_t m_maxFiles;
  
  //maximal number of cumulated byte requested. at once
227
  const uint64_t m_maxBytes;
228
229
  
  /**
230
   * The RAO manager to perofrm RAO operations
231
   */
232
  castor::tape::tapeserver::rao::RAOManager m_raoManager;
233
234
235
236
237
238

  /** Number of jobs to be fetched before the tape is mounted.
   *  The desired number is m_raoLimits.maxSupported
   */
  unsigned int m_fetched;

239
240
241
242
  /**
   * The promise for reordering the read tasks according to RAO by the
   * RecallTaskInjector. The tasks to be run are placed in the m_tasks queue
   */
Cristina Moraru's avatar
Cristina Moraru committed
243
  std::promise<void> m_raoPromise;
244
  std::future<void> m_raoFuture;
245
246
247
248
249

  std::promise<void> m_firstTasksInjectedPromise;
  std::future<void> m_firstTasksInjectedFuture;
  
  bool m_promiseFirstTaskInjectedSet = false;
250
};
251
252
253
254
255

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor