OStoreDB.hpp 17.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "scheduler/SchedulerDatabase.hpp"
#include "objectstore/Agent.hpp"
#include "objectstore/AgentReference.hpp"
#include "objectstore/ArchiveRequest.hpp"
#include "objectstore/ArchiveQueue.hpp"
#include "objectstore/DriveRegister.hpp"
#include "objectstore/RetrieveRequest.hpp"
#include "objectstore/SchedulerGlobalLock.hpp"

#include "catalogue/Catalogue.hpp"
#include "common/log/Logger.hpp"

#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Thread.hpp"

#include <atomic>
#include <cstdint>
#include <ctime>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

namespace cta {
  
namespace objectstore {
  // Forward declarations of the object store types referenced by OStoreDB.
  class Backend;
  class Agent;
  class RootEntry;
}

namespace ostoredb {
  // Forward declaration of the in-memory queue template, which OStoreDB
  // declares as a friend.
  template <class, class>
  class MemQueue;
}

class OStoreDB: public SchedulerDatabase {
49
50
  template <class, class>
  friend class cta::ostoredb::MemQueue;
51
public:
52
  OStoreDB(objectstore::Backend & be, catalogue::Catalogue & catalogue, log::Logger &logger);
53
54
  virtual ~OStoreDB() throw();
  
55
  /* === Object store and agent handling ==================================== */
56
  void setAgentReference(objectstore::AgentReference *agentReference);
57
58
  CTA_GENERATE_EXCEPTION_CLASS(AgentNotSet);
private:
59
  void assertAgentAddressSet();
60
public:
61
62
  
  CTA_GENERATE_EXCEPTION_CLASS(NotImplemented);
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
  /*============ Thread pool for queueing bottom halfs ======================*/
private:
  typedef std::function<void()> EnqueueingTask;
  cta::threading::BlockingQueue<EnqueueingTask*> m_enqueueingTasksQueue;
  class EnqueueingWorkerThread: private cta::threading::Thread {
  public:
    EnqueueingWorkerThread(cta::threading::BlockingQueue<EnqueueingTask*> & etq): 
      m_enqueueingTasksQueue(etq) {}
    void start() { cta::threading::Thread::start(); }
    void wait() { cta::threading::Thread::wait(); }
  private:
    void run() override;
    cta::threading::BlockingQueue<EnqueueingTask*> & m_enqueueingTasksQueue;
  };
  std::vector<EnqueueingWorkerThread *> m_enqueueingWorkerThreads;
78
79
80
81
  std::atomic<uint64_t> m_taskQueueSize; ///< This counter ensures destruction happens after the last thread completed.
  /// Delay introduced before posting to the task queue when it becomes too long.
  void delayIfNecessary(log::LogContext &lc);
  cta::threading::Semaphore m_taskPostingSemaphore;
82
public:
83
  void waitSubthreadsComplete() override;
84
  void setThreadNumber(uint64_t threadNumber);
85
  void setBottomHalfQueueSize(uint64_t tasksNumber);
86
  /*============ Basic IO check: validate object store access ===============*/
87
  void ping() override;
88

89
  /* === Session handling =================================================== */
90
  class TapeMountDecisionInfo: public SchedulerDatabase::TapeMountDecisionInfo {
91
    friend class OStoreDB;
92
  public:
93
94
95
    CTA_GENERATE_EXCEPTION_CLASS(SchedulingLockNotHeld);
    CTA_GENERATE_EXCEPTION_CLASS(TapeNotWritable);
    CTA_GENERATE_EXCEPTION_CLASS(TapeIsBusy);
96
    std::unique_ptr<SchedulerDatabase::ArchiveMount> createArchiveMount(
Eric Cano's avatar
Eric Cano committed
97
      const catalogue::TapeForWriting & tape,
98
      const std::string driveName, const std::string& logicalLibrary, 
99
100
      const std::string & hostName, time_t startTime) override;
    std::unique_ptr<SchedulerDatabase::RetrieveMount> createRetrieveMount(
101
102
      const std::string & vid, const std::string & tapePool,
      const std::string driveName,
103
      const std::string& logicalLibrary, const std::string& hostName, 
104
      time_t startTime) override;
105
    virtual ~TapeMountDecisionInfo();
106
  private:
107
    TapeMountDecisionInfo (OStoreDB & oStoreDB);
108
109
110
    bool m_lockTaken;
    objectstore::ScopedExclusiveLock m_lockOnSchedulerGlobalLock;
    std::unique_ptr<objectstore::SchedulerGlobalLock> m_schedulerGlobalLock;
111
    OStoreDB & m_oStoreDB;
112
  };
113
  friend class TapeMountDecisionInfo;
114
115
116
117
118
119
120
121
122
123
124
125
126
127
  
  class TapeMountDecisionInfoNoLock: public SchedulerDatabase::TapeMountDecisionInfo {
  public:
    std::unique_ptr<SchedulerDatabase::ArchiveMount> createArchiveMount(
      const catalogue::TapeForWriting & tape,
      const std::string driveName, const std::string& logicalLibrary, 
      const std::string & hostName, time_t startTime) override;
    std::unique_ptr<SchedulerDatabase::RetrieveMount> createRetrieveMount(
      const std::string & vid, const std::string & tapePool,
      const std::string driveName,
      const std::string& logicalLibrary, const std::string& hostName, 
      time_t startTime) override;
    virtual ~TapeMountDecisionInfoNoLock();
  };
128

129
130
131
132
133
134
private:
  /**
   * An internal helper function with commonalities of both following functions
   * @param tmdi The TapeMountDecisionInfo where to store the data.
   * @param re A RootEntry object that should be locked and fetched.
   */
135
  void fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo &tmdi, objectstore::RootEntry &re, 
136
    log::LogContext & logContext);
137
public:
138
139
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> getMountInfo(log::LogContext& logContext) override;
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> getMountInfoNoLock(log::LogContext& logContext) override;
140
  void trimEmptyQueues(log::LogContext& lc) override;
141

142
  /* === Archive Mount handling ============================================= */
143
  class ArchiveJob;
144
145
146
  class ArchiveMount: public SchedulerDatabase::ArchiveMount {
    friend class TapeMountDecisionInfo;
  private:
147
148
    ArchiveMount(OStoreDB & oStoreDB);
    OStoreDB & m_oStoreDB;
149
  public:
150
    CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp);
151
    const MountInfo & getMountInfo() override;
152
    std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextJobBatch(uint64_t filesRequested, 
153
      uint64_t bytesRequested, log::LogContext& logContext) override;
154
    void complete(time_t completionTime) override;
155
    void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
156
    void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
157
158
159
160
161
  private:
    OStoreDB::ArchiveJob * castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job);
  public:
    std::set<cta::SchedulerDatabase::ArchiveJob*> setJobBatchSuccessful(
      std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext & lc) override;
162
  };
163
  friend class ArchiveMount;
164
165
166
  
  /* === Archive Job Handling =============================================== */
  class ArchiveJob: public SchedulerDatabase::ArchiveJob {
167
    friend class OStoreDB::ArchiveMount;
168
  public:
169
170
    CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
    CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
171
    bool fail(const std::string& failureReason, log::LogContext& lc) override;
172
173
174
175
  private:
    void asyncSucceed();
    bool waitAsyncSucceed();
  public:
176
177
    void bumpUpTapeFileCount(uint64_t newFileCount) override;
    ~ArchiveJob() override;
178
  private:
179
    ArchiveJob(const std::string &, OStoreDB &, ArchiveMount &);
180
    bool m_jobOwned;
181
182
    uint64_t m_mountId;
    std::string m_tapePool;
183
    OStoreDB & m_oStoreDB;
184
    objectstore::ArchiveRequest m_archiveRequest;
185
    ArchiveMount & m_archiveMount;
186
    std::unique_ptr<objectstore::ArchiveRequest::AsyncJobSuccessfulUpdater> m_jobUpdate;
187
  };
188
  friend class ArchiveJob;
189
190
  
  /* === Retrieve Mount handling ============================================ */
191
  class RetrieveJob;
192
193
194
  class RetrieveMount: public SchedulerDatabase::RetrieveMount {
    friend class TapeMountDecisionInfo;
  private:
195
196
    RetrieveMount(OStoreDB &oStoreDB);
    OStoreDB & m_oStoreDB;
197
  public:
198
    const MountInfo & getMountInfo() override;
199
    std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override;
200
    void complete(time_t completionTime) override;
201
    void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
202
    void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
203
204
205
206
  private:
    OStoreDB::RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job);
  public:
    std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override;
207
  };
208
  friend class RetrieveMount;
209
210
211
  
  /* === Retrieve Job handling ============================================== */
  class RetrieveJob: public SchedulerDatabase::RetrieveJob {
212
    friend class OStoreDB::RetrieveMount;
213
214
215
  public:
    CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
    CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
216
217
    virtual void asyncSucceed() override;
    virtual void checkSucceed() override;
218
    bool fail(const std::string& failureReason, log::LogContext&) override;
219
    virtual ~RetrieveJob() override;
220
  private:
221
    RetrieveJob(const std::string &, OStoreDB &, RetrieveMount &);
222
    bool m_jobOwned;
223
    uint64_t m_mountId;
224
    OStoreDB & m_oStoreDB;
225
    objectstore::RetrieveRequest m_retrieveRequest;
226
    OStoreDB::RetrieveMount & m_retrieveMount;
227
    std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete;
228
  };
229
  
230
  /* === Archive requests handling  ========================================= */
231
  CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestHasNoCopies);
232
  CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyCompleteOrCanceled);
233
  CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue);
234
  
235
  void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, 
236
    const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override;
237
  
238
239
240
241
  CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);
  class ArchiveToFileRequestCancelation:
    public SchedulerDatabase::ArchiveToFileRequestCancelation {
  public:
242
    ArchiveToFileRequestCancelation(objectstore::AgentReference & agentReference, 
243
244
      objectstore::Backend & be, catalogue::Catalogue & catalogue, log::Logger &logger): m_request(be), m_lock(), m_objectStore(be),
      m_catalogue(catalogue), m_logger(logger), m_agentReference(agentReference), m_closed(false) {} 
245
    virtual ~ArchiveToFileRequestCancelation();
246
    void complete(log::LogContext & lc) override;
247
  private:
248
    objectstore::ArchiveRequest m_request;
249
250
    objectstore::ScopedExclusiveLock m_lock;
    objectstore::Backend & m_objectStore;
251
252
    catalogue::Catalogue & m_catalogue;
    log::Logger & m_logger;
253
    objectstore::AgentReference &m_agentReference;
254
255
256
    bool m_closed;
    friend class OStoreDB;
  };
257

258
259
260
  std::map<std::string, std::list<common::dataStructures::ArchiveJob> > getArchiveJobs() const override;
  
  std::list<cta::common::dataStructures::ArchiveJob> getArchiveJobs(const std::string& tapePoolName) const override;
261
262

  /* === Retrieve requests handling  ======================================== */
263
264
  std::list<RetrieveQueueStatistics> getRetrieveQueueStatistics(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider) override;
  
265
  CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
266
  CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange);
267
  std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
268
    const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override;
269

270
271
272
  std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override;
  
  std::list<RetrieveRequestDump> getRetrieveRequestsByRequester(const std::string& vid) const override;
273

274
  std::map<std::string, std::list<RetrieveRequestDump> > getRetrieveRequests() const override;
275

276
  void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester, 
277
    const std::string& remoteFile) override;
278
279
280
281
282

  std::list<cta::common::dataStructures::RetrieveJob> getRetrieveJobs(const std::string& tapePoolName) const override;

  std::map<std::string, std::list<common::dataStructures::RetrieveJob> > getRetrieveJobs() const override;

283
  
284
  /* === Drive state handling  ============================================== */
285
286
287
288
  /**
   * Get states of all drives.
   * @return list of all known drive states
   */
289
  std::list<cta::common::dataStructures::DriveState> getDriveStates(log::LogContext & lc) const override;
290
  
291
292
  void setDesiredDriveState(const std::string& drive, const common::dataStructures::DesiredDriveState & desiredState, 
    log::LogContext & lc) override;
293
    
294
  void removeDrive(const std::string & drive, log::LogContext & logContext) override;
295
296
  
  void reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, cta::common::dataStructures::MountType mountType, 
297
    common::dataStructures::DriveStatus status, time_t reportTime, log::LogContext & lc, uint64_t mountSessionId, uint64_t byteTransfered, 
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
    uint64_t filesTransfered, double latestBandwidth, const std::string& vid, const std::string& tapepool) override;
  
  /* --- Private helper part implementing state transition logic -------------*/
  /*
   * The drive register should gracefully handle reports of status from the drive
   * in all conditions, including when the drive's entry is absent. This ensures
   * that the drive is the leader and register always yields incase of conflicting
   * information.
   */
private:
  /** Collection of smaller scale parts of reportDriveStatus */
  struct ReportDriveStatusInputs {
    common::dataStructures::DriveStatus status;
    cta::common::dataStructures::MountType mountType;
    time_t reportTime; 
    uint64_t mountSessionId;
314
315
    uint64_t byteTransferred;
    uint64_t filesTransferred;
316
317
318
319
    double latestBandwidth;
    std::string vid;
    std::string tapepool;
  };
320
321
322
  /** Collection of smaller scale parts of reportDriveStats */
  struct ReportDriveStatsInputs {
    time_t reportTime; 
323
    uint64_t bytesTransferred;
324
    uint64_t filesTransferred;
325
  };
326
  /**
327
328
   * Utility implementing the operation of finding/creating the drive state object and 
   * updating it.
329
   */
330
331
  void updateDriveStatus(const common::dataStructures::DriveInfo & driveInfo, 
    const ReportDriveStatusInputs & inputs, log::LogContext & lc);
332
  
333
334
335
336
337
338
339
340
  /**
   * Utility implementing the operation get drive state and update stats in it if present, set on an
   * already locked and fetched DriveRegistry. 
   *  
   * @param dr
   * @param driveInfo
   * @param inputs
   */
341
342
  void updateDriveStatistics(const common::dataStructures::DriveInfo & driveInfo, 
    const ReportDriveStatsInputs & inputs, log::LogContext & lc);
343
  
344
345
346
347
348
349
  /** Helper for setting a drive state in a specific case */
  static void setDriveDown(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case. Going UP means drive is ready. It will be instead marked as
   * down if this is the requested state */
  static void setDriveUpOrMaybeDown(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
350
351
352
 
  /** Helper for setting a drive state in a specific case */
  static void setDriveProbing(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
353
354
355
356
357
358
359
360
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveStarting(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveMounting(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case */
Eric Cano's avatar
Eric Cano committed
361
  static void setDriveTransferring(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
362
363
364
365
366
367
368
369
370
371
372
373
374
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveUnloading(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveUnmounting(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveDrainingToDisk(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
  /** Helper for setting a drive state in a specific case */
  static void setDriveCleaningUp(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs);
  
375
376
  /** Helper for setting a drive state in a specific case */
  static void setDriveShutdown(common::dataStructures::DriveState & driveState, const ReportDriveStatusInputs & inputs); 
377
378
private:
  objectstore::Backend & m_objectStore;
379
380
  catalogue::Catalogue & m_catalogue;
  log::Logger & m_logger;
381
  objectstore::AgentReference *m_agentReference = nullptr;
382
383
};
  
384
}