/**
 * The CERN Tape Archive (CTA) project
 * Copyright © 2018 CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef SORTER_HPP
#define SORTER_HPP
#include <cstdint>
#include <future>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>

#include "JobQueueType.hpp"
#include "ArchiveRequest.hpp"
#include "RetrieveRequest.hpp"
#include "common/log/LogContext.hpp"
#include "Agent.hpp"
#include "common/threading/Mutex.hpp"
#include "GenericObject.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/dataStructures/ArchiveJob.hpp"
#include "RetrieveQueue.hpp"
#include "ArchiveQueue.hpp"
#include "Algorithms.hpp"
#include "ArchiveQueueAlgorithms.hpp"
#include "RetrieveQueueAlgorithms.hpp"

namespace cta { namespace objectstore {  
  
// Forward declarations
// The two *JobQueueInfo structs are used by Sorter's map types, while their
// own definitions embed Sorter::ArchiveJob / Sorter::RetrieveJob; they are
// therefore only declared here and defined after the Sorter class.
struct ArchiveJobQueueInfo;
struct RetrieveJobQueueInfo;
// Accessor classes used in Sorter's private retrieve helpers; defined at the
// end of this header.
class RetrieveRequestInfosAccessorInterface;
class OStoreRetrieveRequestAccessor;
class SorterRetrieveRequestAccessor;

class Sorter {
public:  
  CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
52
  
53
  Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue);
54
  ~Sorter();
55
  
56
57
58
  typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<ArchiveJobQueueInfo>>> MapArchive;
  typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<RetrieveJobQueueInfo>>> MapRetrieve;
  
59
60
61
  /**
   * This structure holds the necessary data to queue a job taken from the ArchiveRequest that needs to be queued.
   */
62
63
64
  struct ArchiveJob{
    std::shared_ptr<ArchiveRequest> archiveRequest;
    ArchiveRequest::JobDump jobDump;
65
66
    common::dataStructures::ArchiveFile archiveFile;
    AgentReferenceInterface * previousOwner;
67
    common::dataStructures::MountPolicy mountPolicy;
68
69
70
71
72
73
74
75
76
    cta::objectstore::JobQueueType jobQueueType;
  };
  
  /**
   * This structure holds the datas the user have to 
   * give to insert an ArchiveRequest without any fetch needed on the Request
   */
  struct SorterArchiveRequest{
    std::list<ArchiveJob> archiveJobs;
77
78
  };
  
79
  /* Archive-related methods */
80
81
82
83
  /**
   * This method allows to insert the ArchiveRequest passed in parameter into the sorter.
   * It will put all the ArchiveRequest's jobs in the correct list so that they will be queued in the correct
   * queue after a flush*() call
84
85
   * @param archiveRequest the ArchiveRequest containing the jobs to queue. This request should be locked and fetched before calling this method. You will have to unlock this
   * request after the execution of this method
86
87
88
89
   * @param previousOwner the previous Owner of the jobs of the ArchiveRequest. The new owner will be the agent of the sorter.
   * @param lc the log context for logging
   * @throws a cta::exception::Exception if the QueueType could not have been determined according to the status of the job
   */
90
  void insertArchiveRequest(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface& previousOwner, log::LogContext& lc);
91
  
92
93
94
95
96
97
98
99
  /**
   * This method allow to insert an ArchiveRequest without having to lock it before
   * User has to create a SorterArchiveRequest object that will hold a list<Sorter::ArchiveJob>.
   * The Sorter::ArchiveJob have to be created by the user with all fields filled
   * @param archiveRequest The SorterArchiveRequest to insert into the sorter
   */
  void insertArchiveRequest(const SorterArchiveRequest& archiveRequest,AgentReferenceInterface& previousOwner, log::LogContext& lc);
  
100
101
102
103
104
105
  /**
   * This method will take the first list<ArchiveJobQueueInfo> contained in the MapArchive, queue all Archive jobs contained in it and delete the list from the map
   * @param lc the LogContext for logging
   * @return true if a list have been flush, false otherwise
   * If an exception is thrown during the queueing, the promise of each failed queueing job will get the exception
   */
106
  bool flushOneArchive(log::LogContext &lc);
107
108
109
110
111
  
  /**
   * Returns the map of all ArchiveJobs that will be queued. This method could be use to save all the std::future of all jobs.
   * @return the map[tapePool,jobQueueType] = list<ArchiveJobQueueInfo>
   */
112
  MapArchive getAllArchive();
113
  
114
115
  /* End of Archive-related methods */
  
116
117
118
  /**
   * This structure holds the necessary data to queue a job taken from the RetrieveRequest that needs to be queued.
   */
119
120
121
  struct RetrieveJob{
    std::shared_ptr<RetrieveRequest> retrieveRequest;
    RetrieveRequest::JobDump jobDump;
122
    AgentReferenceInterface * previousOwner;
123
124
125
    uint64_t fileSize;
    uint64_t fSeq;
    common::dataStructures::MountPolicy mountPolicy;
126
127
128
129
130
131
132
133
134
135
    cta::objectstore::JobQueueType jobQueueType;
  };
  
  /**
   * This structure holds the datas the user have to 
   * give to insert a RetrieveRequest without any fetch needed on the Request
   */
  struct SorterRetrieveRequest{
    common::dataStructures::ArchiveFile archiveFile;
    std::map<uint32_t, RetrieveJob> retrieveJobs;
136
137
  };
  
138
  /* Retrieve-related methods */
139
140
141
142
143
144
145
  /**
   * This methods allows to insert the RetrieveRequest passed in parameter into the sorter.
   * It works as following :
   * 1. if copyNb is nullopt, then the best vid will be selected by the Helpers::getBestRetrieveQueue() method. The tapeFile (copyNb) corresponding to this vid
   * will be selected amongst the jobs of the RetrieveRequest. The job corresponding to the tapeFile will be inserted.
   * 2. if copyNb corresponds to a tapeFile, the corresponding job will be inserted according to its status (e.g : if jobStatus = ToReportToUser, the job will be inserted in
   * the list associated to the vid of the job and the ToReportToUser queueType.
146
147
   * @param retrieveRequest the RetrieveRequest that needs to be queued. This request should be locked and fetched before calling this method. You have to unlock the request 
   * after this method
148
149
150
151
152
153
154
155
156
   * @param previousOwner the previous owner of the RetrieveRequest. The new owner will be the agent of the Sorter.
   * @param copyNb : if copyNb is nullopt, then the job we want to queue is supposed to be a ToTransfer job (if no job ToTransfer are present in the RetrieveJobs, an exception
   * will be thrown). If not, this method will select the queueType according 
   * to the copyNb passed in parameter. If no queueType is found, an exception will be thrown.  
   * @param lc the LogContext for logging
   * @throws RetrieveRequestHasNoCopies exception if no tapeFile is found for the best vid (in the case where copyNb == nullopt)
   * @throws Exception if no ToTransfer jobs are found in the RetrieveRequest passed in parameter (in the case where copyNb == nullopt)
   * @throws Exception if the destination queue could not have been determined according to the copyNb passed in parameter
   */
157
  void insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc);
158
  
159
160
161
162
163
164
165
166
167
168
169
170
171
172
  /**
   * This method is the same as the one above. The difference is on the representation of a RetrieveRequest
   * @param retrieveRequest the SorterRetrieveRequest object created by the user before calling this method
   * @param previousOwner the previous owner of the retrieveRequest to insert
   * @param if copyNb is nullopt, then the job we want to queue is supposed to be a ToTransfer job (if no job ToTransfer are present in the RetrieveJobs, an exception
   * will be thrown). If not, this method will select the queueType according 
   * to the copyNb passed in parameter. If no queueType is found, an exception will be thrown.
   * @param lc the LogContext for logging
   * @throws RetrieveRequestHasNoCopies exception if no tapeFile is found for the best vid (in the case where copyNb == nullopt)
   * @throws Exception if no ToTransfer jobs are found in the RetrieveRequest passed in parameter (in the case where copyNb == nullopt)
   * @throws Exception if the destination queue could not have been determined according to the copyNb passed in parameter
   */
  void insertRetrieveRequest(SorterRetrieveRequest& retrieveRequest, AgentReferenceInterface &previousOwner,cta::optional<uint32_t> copyNb, log::LogContext & lc);
  
173
174
175
176
177
178
  /**
   * This method will take the first list<RetrieveJobQueueInfo> contained in the MapRetrieve, queue all Retrieve jobs contained in it and delete the list from the map
   * @param lc the LogContext for logging
   * @return true if a list have been flush, false otherwise
   * If an exception is thrown during the queueing, the promise of each failed queueing job will get the exception
   */
179
  bool flushOneRetrieve(log::LogContext &lc);
180
181
182
183
184
  
   /**
   * Returns the map of all RetrieveJobs that will be queued. This method could be use to save all the std::future of all jobs.
   * @return the map[vid,jobQueueType] = list<RetrieveJobQueueInfo>
   */
185
186
187
  MapRetrieve getAllRetrieve();
  /* End of Retrieve-related methods */
  
188
189
190
191
192
193
194
  /**
   * This method allows to queue all the jobs that are in the sorter's MapArchive and MapRetrieve maps;
   * @param lc the LogContext for logging
   * If an exception happens while queueing a job, the promise associated will get the exception.
   */
  void flushAll(log::LogContext& lc);
  
195
196
197
198
199
200
201
private:
  AgentReference &m_agentReference;
  Backend &m_objectstore;
  catalogue::Catalogue &m_catalogue;
  MapArchive m_archiveQueuesAndRequests;
  MapRetrieve m_retrieveQueuesAndRequests;
  threading::Mutex m_mutex;
202
203
204
  
  /* Retrieve-related methods */
  std::set<std::string> getCandidateVidsToTransfer(RetrieveRequest &request);
205
206
  std::set<std::string> getCandidateVidsToTransfer(const SorterRetrieveRequest& request);
  std::set<std::string> getCandidateVidsToTransfer(RetrieveRequestInfosAccessorInterface &requestAccessor);
207
  std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc);
208
209
  std::string getBestVidForQueueingRetrieveRequest(const SorterRetrieveRequest& request, std::set<std::string>& candidateVids, log::LogContext &lc);
  std::string getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc);
210
  void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc);
211
212
  void dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfos, log::LogContext &lc);
  Sorter::RetrieveJob createRetrieveJob(std::shared_ptr<RetrieveRequest> retrieveRequest, const cta::common::dataStructures::ArchiveFile archiveFile, const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface *previousOwner);
213
  void insertRetrieveRequest(RetrieveRequestInfosAccessorInterface &accessor,AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc);
214
215
216
217
218
219
220
  
  template<typename SpecificQueue>
  void executeRetrieveAlgorithm(const std::string vid, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& jobs, log::LogContext& lc);
  /* End of Retrieve-related methods */
  
  /* Archive-related methods */
  void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc);
221
222
223
  
  void insertArchiveJob(const ArchiveJob& job); 
  
224
225
  void dispatchArchiveAlgorithm(const std::string tapePool, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobsInfos, log::LogContext &lc);

226
227
  template<typename SpecificQueue>
  void executeArchiveAlgorithm(const std::string tapePool, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext& lc);
228
  /* End of Archive-related methods */
229
230
231
232
233
234
235
236
237
238
239
240
};

/**
 * Element type of the lists stored in Sorter::MapArchive: one archive job
 * paired with the promise associated with its queueing.
 * Per the Sorter flush documentation, the promise receives the exception when
 * queueing the job fails; callers can keep the matching std::future.
 */
struct ArchiveJobQueueInfo{
  // <0> the job to queue, <1> the promise reporting the outcome of the queueing
  std::tuple<Sorter::ArchiveJob,std::promise<void>> jobToQueue;
  //TODO : Job reporting
};

/**
 * Element type of the lists stored in Sorter::MapRetrieve: one retrieve job
 * paired with the promise associated with its queueing.
 * Per the Sorter flush documentation, the promise receives the exception when
 * queueing the job fails; callers can keep the matching std::future.
 */
struct RetrieveJobQueueInfo{
  // <0> the job to queue, <1> the promise reporting the outcome of the queueing
  std::tuple<Sorter::RetrieveJob,std::promise<void>> jobToQueue;
  //TODO : Job reporting
};

/**
 * Abstract accessor over the information of a retrieve request.
 * It lets the code that uses it work on either representation of a request:
 * the two implementations in this header wrap an object store RetrieveRequest
 * and a user-built Sorter::SorterRetrieveRequest respectively.
 */
class RetrieveRequestInfosAccessorInterface{
  public:
    RetrieveRequestInfosAccessorInterface();
    /** @return the job dumps of the underlying retrieve request */
    virtual std::list<RetrieveRequest::JobDump> getJobs() = 0;
    /** @return the archive file of the underlying retrieve request */
    virtual common::dataStructures::ArchiveFile getArchiveFile() = 0;
    /**
     * Builds a Sorter::RetrieveJob from the underlying request.
     * @param archiveFile the archive file the job refers to
     * @param copyNb the copy number of the job
     * @param fSeq the fSeq of the file
     * @param previousOwner the previous owner of the job
     * @return the Sorter::RetrieveJob built from these values
     */
    virtual Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile,
        const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner) = 0;
    // Virtual so that implementations can be destroyed through this interface.
    virtual ~RetrieveRequestInfosAccessorInterface();
};

class OStoreRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterface{
  public:
    OStoreRetrieveRequestAccessor(std::shared_ptr<RetrieveRequest> retrieveRequest);
    ~OStoreRetrieveRequestAccessor();
    std::list<RetrieveRequest::JobDump> getJobs();
    common::dataStructures::ArchiveFile getArchiveFile();
    Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile,
        const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner);
  private:
    std::shared_ptr<RetrieveRequest> m_retrieveRequest;
};

class SorterRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterface{
  public:
    SorterRetrieveRequestAccessor(Sorter::SorterRetrieveRequest& request);
    ~SorterRetrieveRequestAccessor();
    std::list<RetrieveRequest::JobDump> getJobs();
    common::dataStructures::ArchiveFile getArchiveFile();
    Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile,
        const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner);
  private:
    Sorter::SorterRetrieveRequest& m_retrieveRequest;
};

275
276
}}
#endif /* SORTER_HPP */