MigrationTaskInjector.hpp 7.26 KB
Newer Older
David COME's avatar
David COME committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/******************************************************************************
 *                      MigrationTaskInjector.hpp
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/

#pragma once

27
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
David COME's avatar
David COME committed
28
29
30
31
32
33
34
35
#include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp"
36

37
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
David COME's avatar
David COME committed
38
39
40
41
42
43
44
45
namespace castor{
namespace tape{
namespace tapeserver{
namespace daemon {

/**
 * This classis responsible for creating the tasks in case of a recall job
 */
46
class MigrationTaskInjector /*: public TaskInjector*/ {  
David COME's avatar
David COME committed
47
48
public:

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  /**
   * Constructor
   * @param mm The memory manager for accessing memory blocks. 
   * The Newly created tapeWriter Tasks will register themselves 
   * as a client to it. 
   * @param diskReader the one object that will hold all the threads which will be executing 
   * disk-reading tasks
   * @param tapeWriter the one object that will hold the thread which will be executing 
   * tape-writing tasks
   * @param client The one that will give us files to migrate 
   * @param maxFiles maximal number of files we may request to the client at once 
   * @param byteSizeThreshold maximal number of cumulated byte 
   * we may request to the client. at once
   * @param lc log context, copied because of the threading mechanism 
   */
64
  MigrationTaskInjector(MigrationMemoryManager & mm, 
65
        DiskReadThreadPool & diskReader,
66
        TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,client::ClientInterface& client,
67
        uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc);
David COME's avatar
David COME committed
68
69
70
71
72
73
74
75
76
77

  /**
   * Wait for the inner thread to finish
   */
  void waitThreads();
  
  /**
   * Start the inner thread 
   */
  void startThreads();
78
79
80
81
82
83
84
85
  /**
   * Function for a feed-back loop purpose between MigrationTaskInjector and 
   * DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects 
   * it has not enough jobs to do to, it is class to push a request 
   * in order to (try) fill up the queue. 
   * @param lastCall true if we want the new request to be a last call. 
   * See Request::lastCall 
   */
86
  void requestInjection(bool lastCall);
87
88
89
90
91
92
93
  
  /**
   * Contact the client to make sure there are really something to do
   * Something = migration at most  maxFiles or at least maxBytes
   * 
   * @return true if there are jobs to be done, false otherwise 
   */
94
  bool synchronousInjection();
95
96
97
98
99
100
  
  /**
   * Send an end token in the request queue. There should be no subsequent
   * calls to requestInjection.
   */
  void finish();
101
102
103
104
  
  uint64_t lastFSeq() const {
    return m_lastFseq;
  }
David COME's avatar
David COME committed
105
private:
106
107
108
109
110
  /**
   * Create all the tape-read and write-disk tasks for set of files to retrieve
   * @param jobs the list of FileToMigrateStructs we have to transform in a pair of task
   */
  void injectBulkMigrations(const std::vector<castor::tape::tapegateway::FileToMigrateStruct*>& jobs);
111
112
113
114
115
116
  
  /*Compute how many blocks are needed for a file of fileSize bytes*/
  size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
    return fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1); 
  }
  
117
118
119
120
121
122
123
124
125
   /**
   * A request of files to migrate. We request EITHER
   * - a maximum of nbMaxFiles files
   * - OR at least byteSizeThreshold bytes. 
   * That means we stop as soon as we have nbMaxFiles files or the cumulated size 
   * is equal or above byteSizeThreshold. 
   */
  class Request {
  public:
126
    Request(uint64_t mf, uint64_t mb, bool lc):
127
128
129
130
    nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc), end(false) {}
    
    Request():
    nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
131
    
132
133
    const uint64_t nbMaxFiles;
    const uint64_t byteSizeThreshold;
134
135
136
137
138
139
140
    
    /** 
     * True if it is the last call for the set of requests :it means
     *  we don't need to try to get more files to recall 
     *  and can send into all the different threads a signal  .
     */
    const bool lastCall;
141
142
143
144
145
    
    /**
     * True indicates the task injector will not receive any more request.
     */
    const bool end;
146
147
  };
  
David COME's avatar
David COME committed
148
149
  class WorkerThread: public castor::tape::threading::Thread {
  public:
150
    WorkerThread(MigrationTaskInjector & rji): m_parent(rji) {}
David COME's avatar
David COME committed
151
152
    virtual void run();
  private:
153
    MigrationTaskInjector & m_parent;
David COME's avatar
David COME committed
154
  } m_thread;
155
  ///The memory manager for accessing memory blocks. 
156
  MigrationMemoryManager & m_memManager;
David COME's avatar
David COME committed
157
  
158
159
  ///the one object that will hold the thread which will be executing 
  ///tape-writing tasks
160
  TapeSingleThreadInterface<TapeWriteTask>& m_tapeWriter;
161
162
163
  
  ///the one object that will hold all the threads which will be executing 
  ///disk-reading tasks
164
  DiskReadThreadPool & m_diskReader;
165
166
  
  /// the client who is sending us jobs
David COME's avatar
David COME committed
167
168
169
170
171
172
173
174
  client::ClientInterface& m_client;
  
  /**
   * utility member to log some pieces of information
   */
  castor::log::LogContext m_lc;
  
  castor::tape::threading::Mutex m_producerProtection;
175
176
  
  ///all the requests for work we will forward to the client.
David COME's avatar
David COME committed
177
  castor::tape::threading::BlockingQueue<Request> m_queue;
178
  
179
180
181
182
  /** a shared flag among the all tasks related to migration, set as true 
   * as soon a single task encounters a failure. That way we go into a degraded mode
   * where we only circulate memory without writing anything on tape
   */
183
  castor::tape::threading::AtomicFlag m_errorFlag;
184

185
  /// The maximum number of files we ask per request. 
186
187
  const uint64_t m_maxFiles;
  
188
  /// Same as m_maxFilesReq for size per request. (in bytes))
189
  const uint64_t m_maxBytes;
190
191
192
193
194
195
196
197
198
199
  
  /**The last fseq used on the tape. We should not see this but 
   * IT is computed by subtracting 1 to fSeg  of the first file to migrate we 
   * receive. That part is done by the 
   * MigrationTaskInjector.::synchronousInjection. Thus, we compute it into 
   * that function and retrieve/set it within MountSession executeWrite
   * after we make sure synchronousInjection returned true. To do so, we
   *  need to store it
   */
  uint64_t m_lastFseq;
David COME's avatar
David COME committed
200
201
202
203
204
205
206
};

} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor