ArchiveQueueAlgorithms.hpp 5.27 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#pragma once
20
21
22
23
24
25
26
27
28
29
30
31
#include "Algorithms.hpp"
#include "ArchiveQueue.hpp"

namespace cta { namespace objectstore {

template <>
class ContainerTraits<ArchiveQueue> {
public:
  typedef ArchiveQueue                                           Container;
  typedef std::string                                            ContainerAddress;
  typedef std::string                                            ElementAddress;
  typedef std::string                                            ContainerIdentifyer;
32
  struct InsertedElement {
33
34
35
36
    std::unique_ptr<ArchiveRequest> archiveRequest;
    uint16_t copyNb;
    cta::common::dataStructures::ArchiveFile archiveFile;
    cta::common::dataStructures::MountPolicy mountPolicy;
37
    typedef std::list<InsertedElement> list;
38
  };
39
40
41
  
  template <class Element>
  struct OpFailure {
42
43
    Element * element;
    std::exception_ptr failure;
44
    typedef std::list<OpFailure> list;
45
  };
46
  
47
48
49
  typedef ArchiveRequest::JobDump                                ElementDescriptor;
  typedef std::list<ElementDescriptor>                           ElementDescriptorContainer;
  
50
  template <class Element>
51
52
  static ElementAddress getElementAddress(const Element & e) { return e.archiveRequest->getAddressIfSet(); }
  
53
  static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & contId,
54
    log::LogContext & lc);
55
  
56
57
  static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
    log::LogContext & lc);
58
  
59
60
  static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont,
      AgentReference & agentRef, log::LogContext & lc);
61
  
62
63
64
65
66
67
  static void removeReferencesAndCommit(Container & cont, OpFailure<InsertedElement>::list & elementsOpFailures);
  
  static void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
  
  static OpFailure<InsertedElement>::list switchElementsOwnership(InsertedElement::list & elemMemCont,
      const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::LogContext & lc);
68
  
69
70
71
  class OwnershipSwitchFailure: public cta::exception::Exception {
  public:
    OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
72
    OpFailure<InsertedElement>::list failedElements;
73
  };
74
  
75
76
77
78
79
80
  class PoppedElement {
  public:
    std::unique_ptr<ArchiveRequest> archiveRequest;
    uint16_t copyNb;
    uint64_t bytes;
  };
81
82
83
84
  class PoppedElementsSummary;
  class PopCriteria {
  public:
    PopCriteria& operator-= (const PoppedElementsSummary &);
85
86
    uint64_t bytes;
    uint64_t files;
87
  };
88
  class PoppedElementsList: public std::list<PoppedElement> {
89
90
  public:
    void insertBack(PoppedElementsList &&);
91
    void insertBack(PoppedElement &&);
92
  };
93
  
94
95
  class PoppedElementsSummary {
  public:
96
97
98
99
100
101
102
103
104
105
    uint64_t bytes;
    uint64_t files;
    bool operator< (const PopCriteria & pc) {
      return bytes < pc.bytes && files < pc.files;
    }
    PoppedElementsSummary& operator+= (const PoppedElementsSummary & other) {
      bytes += other.bytes;
      files += other.files;
      return *this;
    }
106
107
108
109
110
111
112
  };
  class PoppedElementsBatch {
  public:
    PoppedElementsList elements;
    PoppedElementsSummary summary;
  };
  
113
114
115
116
117
118
  typedef std::set<ElementAddress> ElementsToSkipSet;
  
  static PoppedElementsSummary getElementSummary(const PoppedElement &);
  
  static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
      ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
119
120
  CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
  static OpFailure<PoppedElement>::list switchElementsOwnership(PoppedElementsBatch & popedElementBatch, const ContainerAddress & contAddress,
      const ContainerAddress & previousOwnerAddress, log::LogContext & lc) {
    std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
    for (auto & e: popedElementBatch.elements) {
      ArchiveRequest & ar = *e.archiveRequest;
      auto copyNb = e.copyNb;
      updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
    }
    auto u = updaters.begin();
    auto e = popedElementBatch.elements.begin();
    OpFailure<PoppedElement>::list ret;
    while (e != popedElementBatch.elements.end()) {
      try {
        u->get()->wait();
      } catch (...) {
        ret.push_back(OpFailure<PoppedElement>());
        ret.back().element = &(*e);
        ret.back().failure = std::current_exception();
      }
      u++;
      e++;
    }
    return ret;
  }
  
146
147
148
};

}} // namespace cta::objectstore