Algorithms.hpp 11.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
#pragma once

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * This file defines the containers, their traits and algorithms to add/remove
 * to/from them.
 */

#include <string>
#include "ArchiveRequest.hpp"
#include "Helpers.hpp"
#include "common/log/LogContext.hpp"
#include "common/exception/Exception.hpp"

namespace cta { namespace objectstore {

/**
 * Container traits definition. To be specialized class by class.
 * This is mostly a model.
 */
38
template <class C> 
39
40
class ContainerTraits{
public:
41
  typedef C                                   Container;
42
43
44
  typedef std::string                         ContainerAddress;
  typedef std::string                         ElementAddress;
  typedef std::string                         ContainerIdentifyer;
45
46
47
48
49
  struct InsertedElement {
    typedef std::list<InsertedElement> list;
  };
  typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
  typedef std::list <InsertedElement *>       ElementPointerContainer;
50
  class                                       ElementDescriptor {};
51
  typedef std::list<ElementDescriptor>        ElementDescriptorContainer;
52
53
54
  
  template <class Element>
  struct OpFailure {
55
56
    Element * element;
    std::exception_ptr failure;
57
    typedef std::list<OpFailure> list;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
  };
  
  class PoppedElementsSummary;
  class PopCriteria {
  public:
    PopCriteria();
    PopCriteria& operator-= (const PoppedElementsSummary &);
  };
  class PoppedElementsList {
  public:
    PoppedElementsList();
    void insertBack(PoppedElementsList &&);
  };
  class PoppedElementsSummary {
  public:
    PoppedElementsSummary();
    bool operator< (const PopCriteria &);
    PoppedElementsSummary& operator+= (const PoppedElementsSummary &);
  };
  class PoppedElementsBatch {
  public:
    PoppedElementsBatch();
    PoppedElementsList elements;
    PoppedElementsSummary summary;
  };
83
  typedef std::set<ElementAddress> ElementsToSkipSet;
84
85
  
  CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
86
87
  
  template <class Element>
88
  static ElementAddress getElementAddress(const Element & e);
89
  
90
91
  static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId,
    log::LogContext & lc);
92
93
  static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
    log::LogContext & lc);
94
  static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont);
95
  void removeReferencesAndCommit(Container & cont, typename OpFailure<InsertedElement>::list & elementsOpFailures);
96
97
98
  void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
  static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress,
      const ContainerAddress & previousOwnerAddress, log::LogContext & lc);
99
100
101
102
  template <class Element>
  static PoppedElementsSummary getElementSummary(const Element &);
  static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
      ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
103
104
105
106
107
108
109
110
};

template <class C>
class ContainerAlgorithms {
public:
  ContainerAlgorithms(Backend & backend, AgentReference & agentReference):
    m_backend(backend), m_agentReference(agentReference) {}
    
111
  typedef typename ContainerTraits<C>::InsertedElement InsertedElement;
112
113
114
115
116
117
    
  /** Reference objects in the container and then switch their ownership them. Objects 
   * are provided existing and owned by algorithm's agent. Returns a list of 
   * @returns list of elements for which the addition or ownership switch failed.
   * @throws */
  void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
118
      typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
119
120
121
122
    C cont(m_backend);
    ScopedExclusiveLock contLock;
    ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
    ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc);
123
124
    auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
        m_agentReference.getAgentAddress(), lc);
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
    // If ownership switching failed, remove failed object from queue to not leave stale pointers.
    if (failedOwnershipSwitchElements.size()) {
      ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements);
    }
    // We are now done with the container.
    contLock.release();
    if (failedOwnershipSwitchElements.empty()) {
      // The good case: all elements went through.
      std::list<std::string> transferedElements;
      for (const auto & e: elements) transferedElements.emplace_back(ContainerTraits<C>::getElementAddress(e));
      m_agentReference.removeBatchFromOwnership(transferedElements, m_backend);
      // That's it, we're done.
      return;
    } else {
      // Bad case: we have to filter the elements and remove ownership only for the successful ones.
      std::set<std::string> failedElementsSet;
      for (const auto & feos: failedOwnershipSwitchElements) failedElementsSet.insert(ContainerTraits<C>::getElementAddress(*feos.element));
      std::list<std::string> transferedElements;
      typename ContainerTraits<C>::OwnershipSwitchFailure failureEx(
          "In ContainerAlgorithms<>::referenceAndSwitchOwnership(): failed to switch ownership of some elements");
      for (const auto & e: elements) {
        if (!failedElementsSet.count(ContainerTraits<C>::getElementAddress(e))) {
          transferedElements.emplace_back(ContainerTraits<C>::getElementAddress(e));
        }
      }
      if (transferedElements.size()) m_agentReference.removeBatchFromOwnership(transferedElements, m_backend);
      failureEx.failedElements = failedOwnershipSwitchElements;
      throw failureEx;
    }
  }
155
  
156
  typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
157
158
159
160
161
      typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) {
    // Prepare the return value
    typename ContainerTraits<C>::PoppedElementsBatch ret;
    typename ContainerTraits<C>::PopCriteria unfulfilledCriteria = popCriteria;
    size_t iterationCount=0;
162
    typename ContainerTraits<C>::ElementsToSkipSet elementsToSkip;
163
164
165
166
    while (ret.summary < popCriteria) {
      // Get a container if it exists
      C cont(m_backend);
      iterationCount++;
167
      ScopedExclusiveLock contLock;
168
      try {
169
        ContainerTraits<C>::getLockedAndFetchedNoCreate(cont, contLock, contId, lc);
170
171
172
173
174
      } catch (typename ContainerTraits<C>::NoSuchContainer &) {
        // We could not find a container to pop from: return what we have.
        return ret;
      }
      // We have a container. Get candidate element list from it.
175
176
      typename ContainerTraits<C>::PoppedElementsBatch candidateElements = 
          ContainerTraits<C>::getPoppingElementsCandidates(cont, unfulfilledCriteria, elementsToSkip, lc);
177
178
      // Reference the candidates to our agent
      std::list<typename ContainerTraits<C>::ElementAddress> candidateElementsAddresses;
179
      for (auto & e: candidateElements.elements) {
180
181
182
183
184
        candidateElementsAddresses.emplace_back(ContainerTraits<C>::getElementAddress(e));
      }
      m_agentReference.addBatchToOwnership(candidateElementsAddresses, m_backend);
      // We can now attempt to switch ownership of elements
      auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(candidateElements,
185
          m_agentReference.getAgentAddress(), cont.getAddressIfSet(), lc);
186
      if (failedOwnershipSwitchElements.empty()) {
187
188
189
190
191
192
193
        // This is the easy case (and most common case). Everything went through fine.
        ContainerTraits<C>::removeReferencesAndCommit(cont, candidateElementsAddresses);
        contLock.release();
        // All jobs are validated
        ret.summary += candidateElements.summary;
        unfulfilledCriteria -= candidateElements.summary;
        ret.elements.insertBack(std::move(candidateElements.elements));
194
195
196
197
198
      } else {
        // For the failed files, we have to differentiate the not owned or not existing ones from other error cases.
        // For the not owned, not existing and those successfully switched, we have to de reference them form the container.
        // For other cases, we will leave the elements referenced in the container, as we cannot ensure de-referencing is safe.
        std::set<typename ContainerTraits<C>::ElementAddress> elementsNotToDereferenceFromContainer;
199
200
        std::set<typename ContainerTraits<C>::ElementAddress> elementsNotToReport;
        std::list<typename ContainerTraits<C>::ElementAddress> elementsToDereferenceFromAgent;
201
202
203
        for (auto &e: failedOwnershipSwitchElements) {
          try {
            throw e.failure;
204
205
206
207
208
209
210
211
212
          } catch (Backend::NoSuchObject &) {
            elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
            elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
          } catch (Backend::WrongPreviousOwner &) {
            elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
            elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
          } catch (Backend::CouldNotUnlock&) {
            // Do nothing, this element was indeed OK.
          }
213
214
215
          catch (...) {
            // This is a different error, so we will leave the reference to the element in the container
            elementsNotToDereferenceFromContainer.insert(ContainerTraits<C>::getElementAddress(*e.element));
216
217
218
            elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
            elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
            elementsToSkip.insert(ContainerTraits<C>::getElementAddress(*e.element));
219
          }
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
        }
        // We are done with the sorting. Apply the decisions...
        std::list<typename ContainerTraits<C>::ElementAddress> elementsToDereferenceFromContainer;
        for (auto & e: candidateElements.elements) {
          if (!elementsNotToDereferenceFromContainer.count(ContainerTraits<C>::getElementAddress(e))) {
            elementsToDereferenceFromContainer.push_back(ContainerTraits<C>::getElementAddress(e));
          }
        }
        ContainerTraits<C>::removeReferencesAndCommit(cont, elementsToDereferenceFromContainer);
        contLock.release();
        m_agentReference.removeBatchFromOwnership(elementsToDereferenceFromAgent, m_backend);
        for (auto & e: candidateElements.elements) {
          if (!elementsNotToReport.count(ContainerTraits<C>::getElementAddress(e))) {
            ret.summary += ContainerTraits<C>::getElementSummary(e);
            unfulfilledCriteria -= ContainerTraits<C>::getElementSummary(e);
            ret.elements.insertBack(std::move(e));
          }
        }
238
239
240
241
      }
    }
    return ret;
  }
242
243
244
245
246
247
private:
  Backend & m_backend;
  AgentReference & m_agentReference;
};

}} // namespace cta::objectstore