// ArchiveQueue.hpp
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include <cstdint>
#include <ctime>
#include <list>
#include <set>
#include <string>

#include "Backend.hpp"
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "common/CreationLog.hpp"
#include "common/MountControl.hpp"
#include "ArchiveRequest.hpp"
#include "EntryLogSerDeser.hpp"
#include "Agent.hpp"

namespace cta { namespace objectstore {
  
class GenericObject;

36
class ArchiveQueue: public ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t> {
37
38
  // TODO: rename tapepoolname field to tapepool (including in probuf)
  
39
40
public:
  // Constructor
41
  ArchiveQueue(const std::string & address, Backend & os);
42
  
43
44
45
  // Undefined object constructor
  ArchiveQueue(Backend & os);
  
46
  // Upgrader form generic object
47
  ArchiveQueue(GenericObject & go);
48
49
50
51

  // In memory initialiser
  void initialize(const std::string & name);
  
52
53
  // Commit with sanity checks (override from ObjectOps
  void commit();
54
55
56
private:
  // Validates all summaries are in accordance with each other.
  bool checkMapsAndShardsCoherency();
57
  
58
59
60
61
  // Rebuild from shards if something goes wrong.
  void rebuild();
  
public:
62
63
64
  // Set/get tape pool
  void setTapePool(const std::string & name);
  std::string getTapePool();
65
  
66
67
  // Archive jobs management ===================================================
  struct JobToAdd {
68
    ArchiveRequest::JobDump job;
69
70
71
72
73
74
    const std::string archiveRequestAddress;
    uint64_t archiveFileId;
    uint64_t fileSize;
    const cta::common::dataStructures::MountPolicy policy;
    time_t startTime;
  };
75
76
77
78
79
80
81
82
  /** Add the jobs to the queue. 
   * The lock will be used to mark the shards as locked (the lock is the same for 
   * the main object and the shard, the is no shared access.
   * As we potentially have to create new shard(s), we need access to the agent 
   * reference (to generate a non-colliding object name).
   * We will also log the shard creation (hence the context)
   */ 
  void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc);
83
  /// This version will check for existence of the job in the queue before
84
85
86
87
88
  // returns the count and sizes of actually added jobs (if any).
  struct AdditionSummary {
    uint64_t files = 0;
    uint64_t bytes = 0;
  };
89
90
  AdditionSummary addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd,
    AgentReference & agentReference, log::LogContext & lc);
91
92
  
  struct JobsSummary {
93
    uint64_t jobs;
94
95
96
    uint64_t bytes;
    time_t oldestJobStartTime;
    uint64_t priority;
97
98
    uint64_t minArchiveRequestAge;
    uint64_t maxDrivesAllowed;
99
100
101
  };
  JobsSummary getJobsSummary();
  
102
  void removeJobsAndCommit(const std::list<std::string> & jobsToRemove);
103
  struct JobDump {
104
105
106
107
108
    uint64_t size;
    std::string address;
    uint16_t copyNb;
  };
  std::list<JobDump> dumpJobs();
109
110
111
112
113
114
115
116
117
118
119
  struct CandidateJobList {
    uint64_t remainingFilesAfterCandidates = 0;
    uint64_t remainingBytesAfterCandidates = 0;
    uint64_t candidateFiles = 0;
    uint64_t candidateBytes = 0;
    std::list<JobDump> candidates;
  };
  // The set of archive requests to skip are requests previously identified by the caller as bad,
  // which still should be removed from the queue. They will be disregarded from  listing.
  CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip);
   
120
121
122
123
124
  // Check that the tape pool is empty (of both tapes and jobs)
  bool isEmpty();
 
  CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
  // Garbage collection
125
126
  void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) override;
127
128
  
  std::string dump();
129
130
131
132
133
134
135
  
  // The shard size. From experience, 100k is where we start to see performance difference,
  // but nothing prevents us from using a smaller size.
  // The performance will be roughly flat until the queue size reaches the square of this limit
  // (meaning the queue object updates start to take too much time).
  // with this current value of 25k, the performance should be roughly flat until 25k^2=625M.
  static const uint64_t c_maxShardSize = 25000;
136
137
138
};
  
}}