ObjectOps.hpp 12.4 KB
Newer Older
1
/*
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
#pragma once

21
#include "Backend.hpp"
22
#include "common/exception/Exception.hpp"
23
#include "objectstore/cta.pb.h"
24
25
#include "common/log/LogContext.hpp"
#include "catalogue/Catalogue.hpp"
26
#include <memory>
27
#include <stdint.h>
28

29
30
namespace cta { namespace objectstore {

31
32
class AgentReference;

33
34
35
36
class ObjectOpsBase {
  friend class ScopedLock;
  friend class ScopedSharedLock;
  friend class ScopedExclusiveLock;
37
  friend class GenericObject;
38
  friend class Helpers;
39
40
protected:
  ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os), 
41
42
43
    m_headerInterpreted(false), m_payloadInterpreted(false),
    m_existingObject(false), m_locksCount(0),
    m_locksForWriteCount(0) {}
44
public:
45
46
47
48
49
50
51
52
  CTA_GENERATE_EXCEPTION_CLASS(AddressNotSet);
  CTA_GENERATE_EXCEPTION_CLASS(NotLocked);  
  CTA_GENERATE_EXCEPTION_CLASS(WrongType);
  CTA_GENERATE_EXCEPTION_CLASS(NotNewObject);
  CTA_GENERATE_EXCEPTION_CLASS(NewObject);
  CTA_GENERATE_EXCEPTION_CLASS(NotFetched);
  CTA_GENERATE_EXCEPTION_CLASS(NotInitialized);  
  CTA_GENERATE_EXCEPTION_CLASS(AddressAlreadySet);
53
  CTA_GENERATE_EXCEPTION_CLASS(InvalidAddress);
54
  CTA_GENERATE_EXCEPTION_CLASS(FailedToSerialize);
55
  CTA_GENERATE_EXCEPTION_CLASS(StillLocked);
56
protected:
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
  void checkHeaderWritable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderWritable: header not yet fetched or initialized");
    checkWritable();
  }
  
  void checkHeaderReadable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderReadable: header not yet fetched or initialized");
    checkReadable();
  }
  
  void checkPayloadWritable() {
    if (!m_payloadInterpreted) 
      throw NotFetched("In ObjectOps::checkPayloadWritable: header not yet fetched or initialized");
    checkWritable();
  }
  
  void checkPayloadReadable() {
    if (!m_payloadInterpreted) 
      throw NotFetched("In ObjectOps::checkPayloadReadable: header not yet fetched or initialized");
    checkReadable();
  }
  
  void checkWritable() {
    if (m_existingObject && !m_locksForWriteCount)
      throw NotLocked("In ObjectOps::checkWritable: object not locked for write");
  }
  
  void checkReadable() {
    if (!m_locksCount)
88
     throw NotLocked("In ObjectOps::checkReadable: object not locked");
89
90
91
92
  }
  
public:
  
93
  void setAddress(const std::string & name) {
94
    if (m_nameSet)
95
      throw AddressAlreadySet("In ObjectOps::setAddress(): trying to overwrite an already set name");
96
    if (name.empty())
97
      throw InvalidAddress("In ObjectOps::setAddress(): empty name");
98
99
100
101
    m_name = name;
    m_nameSet = true;
  }
  
102
103
104
105
106
107
108
109
110
111
112
  void resetAddress() {
    if (m_locksCount || m_locksForWriteCount) {
      throw StillLocked("In ObjectOps::resetAddress: reset the address of a locked object");
    }
    m_nameSet = false;
    m_name = "";
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
    m_existingObject = false;
  }
  
113
  std::string & getAddressIfSet() {
114
    if (!m_nameSet) {
115
      throw AddressNotSet("In ObjectOpsBase::getNameIfSet: name not set yet");
116
117
118
    }
    return m_name;
  }
119
120
121
  
  void remove () {
    checkWritable();
122
    m_objectStore.remove(getAddressIfSet());
123
124
125
126
127
    m_existingObject = false;
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
  }
  
128
129
130
131
132
133
  void resetValues () {
    m_existingObject = false;
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
  }
  
134
  void setOwner(const std::string & owner) {
135
    checkHeaderWritable();
136
137
138
139
    m_header.set_owner(owner);
  }
  
  std::string getOwner() {
140
    checkHeaderReadable();
141
142
143
144
    return m_header.owner();
  }
  
  void setBackupOwner(const std::string & owner) {
145
    checkHeaderWritable();
146
147
148
149
    m_header.set_backupowner(owner);
  }
  
  std::string getBackupOwner() {
150
    checkHeaderReadable();
151
152
153
    return m_header.backupowner();
  }

154
155
156
157
158
159
160
protected:
  bool m_nameSet;
  std::string m_name;
  Backend & m_objectStore;
  serializers::ObjectHeader m_header;
  bool m_headerInterpreted;
  bool m_payloadInterpreted;
161
162
163
  bool m_existingObject;
  int m_locksCount;
  int m_locksForWriteCount;
164
165
166
167
168
};

class ScopedLock {
public:
  void release() {
169
170
    checkLocked();
    releaseIfNeeded();
171
  }
172
  
173
174
175
176
  bool isLocked() {
    return m_locked;
  }
  
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
  /** Move the locked object reference to a new one. This is done when the locked
   * object is a GenericObject and the caller instantiated a derived object from
   * it. The lock follows the move.
   * We check we move the lock from a Generic object (this is the only allowed
   * use case).
   * New object's locks are moved from the old one (referenced in the lock)
   */
  void transfer(ObjectOpsBase & newObject) {
    decltype(m_objectOps) oldObj(m_objectOps);
    m_objectOps = & newObject;
    // Transfer the locks from old to new object
    m_objectOps->m_locksCount = oldObj->m_locksCount;
    m_objectOps->m_locksForWriteCount = oldObj->m_locksForWriteCount;
    // The old object is not considered locked anymore and should be
    // discarded. A previous call the the new object's constructor should
    oldObj->m_locksCount =  0;
    oldObj->m_locksForWriteCount = 0;
  }
  
196
  virtual ~ScopedLock() {
197
    releaseIfNeeded();
198
  }
199
  
200
201
  CTA_GENERATE_EXCEPTION_CLASS(AlreadyLocked);
  CTA_GENERATE_EXCEPTION_CLASS(NotLocked);
202
  CTA_GENERATE_EXCEPTION_CLASS(MissingAddress);
203
  
204
protected:
205
  ScopedLock(): m_objectOps(NULL), m_locked(false) {}
206
  std::unique_ptr<Backend::ScopedLock> m_lock;
207
208
209
210
211
212
213
214
215
216
  ObjectOpsBase * m_objectOps;
  bool m_locked;
  void checkNotLocked() {
    if (m_locked)
      throw AlreadyLocked("In ScopedLock::checkNotLocked: trying to lock an already locked lock");
  }
  void checkLocked() {
    if (!m_locked)
      throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock");
  }
217
218
219
220
221
222
223
  void checkObjectAndAddressSet() {
    if (!m_objectOps) {
      throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock a NULL object");
    } else if (!m_objectOps->m_nameSet || m_objectOps->m_name.empty()) {
      throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock an object without address");
    }
  }
224
225
226
227
228
  virtual void releaseIfNeeded() {
    if(!m_locked) return;
    m_lock.reset(NULL);
    m_objectOps->m_locksCount--;
    m_locked = false;
229
230
    // Releasing a lock voids the object content in memory as stored object can now change. 
    m_objectOps->m_payloadInterpreted=false;
231
  }
232
};
233
  
234
235
class ScopedSharedLock: public ScopedLock {
public:
236
237
238
239
240
241
242
  ScopedSharedLock() {}
  ScopedSharedLock(ObjectOpsBase & oo) {
    lock(oo);
  }
  void lock(ObjectOpsBase & oo) {
    checkNotLocked();
    m_objectOps  = & oo;
243
    checkObjectAndAddressSet();
244
    m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet()));
245
246
    m_objectOps->m_locksCount++;
    m_locked = true;
247
248
249
250
251
  }
};

class ScopedExclusiveLock: public ScopedLock {
public:
252
253
254
255
256
257
258
  ScopedExclusiveLock() {}
  ScopedExclusiveLock(ObjectOpsBase & oo) {
    lock(oo);
  }
  void lock(ObjectOpsBase & oo) {
    checkNotLocked();
    m_objectOps = &oo;
259
    checkObjectAndAddressSet();
260
    m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet()));
261
262
263
264
265
266
267
268
269
    m_objectOps->m_locksCount++;
    m_objectOps->m_locksForWriteCount++;
    m_locked = true;
  }
protected:
  void releaseIfNeeded() {
    if (!m_locked) return;
    ScopedLock::releaseIfNeeded();
    m_objectOps->m_locksForWriteCount--;
270
271
272
  }
};

273
template <class PayloadType, serializers::ObjectType PayloadTypeId>
274
275
276
class ObjectOps: public ObjectOpsBase {
protected:
  ObjectOps(Backend & os, const std::string & name): ObjectOpsBase(os) {
277
    setAddress(name);
278
279
  }
  
280
281
  ObjectOps(Backend & os): ObjectOpsBase(os) {}
  
282
283
  virtual ~ObjectOps() {}
  
284
285
286
public:
  void fetch() {
    // Check that the object is locked, one way or another
287
    if(!m_locksCount)
288
      throw NotLocked("In ObjectOps::fetch(): object not locked");
289
    m_existingObject = true;
290
291
292
293
    // Get the header from the object store
    getHeaderFromObjectStore();
    // Interpret the data
    getPayloadFromHeader();
294
295
  }
  
296
  void commit() {
297
    checkPayloadWritable();
298
299
    if (!m_existingObject) 
      throw NewObject("In ObjectOps::commit: trying to update a new object");
300
    // Serialise the payload into the header
301
302
303
304
305
306
    try {
      m_header.set_payload(m_payload.SerializeAsString());
    } catch (std::exception & stdex) {
      cta::exception::Exception ex(std::string("In ObjectOps::commit(): failed to serialize: ")+stdex.what());
      throw ex;
    }
307
    // Write the object
308
    m_objectStore.atomicOverwrite(getAddressIfSet(), m_header.SerializeAsString());
309
310
  }
  
311
312
313
314
  CTA_GENERATE_EXCEPTION_CLASS(WrongTypeForGarbageCollection);
  /**
   * This function should be overloaded in the inheriting classes
   */
315
316
  virtual void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) = 0;
317
  
318
protected:
319
320
  
  void getPayloadFromHeader () {
321
322
323
324
325
326
    if (!m_payload.ParseFromString(m_header.payload())) {
      // Use a the tolerant parser to assess the situation.
      m_header.ParsePartialFromString(m_header.payload());
      throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() + 
              ">::getPayloadFromHeader(): could not parse payload: " + m_header.InitializationErrorString());
    }
327
328
329
    m_payloadInterpreted = true;
  }

330
  void getHeaderFromObjectStore () {
331
332
333
334
335
336
337
    auto objData=m_objectStore.read(getAddressIfSet());
    if (!m_header.ParseFromString(objData)) {
      // Use a the tolerant parser to assess the situation.
      m_header.ParsePartialFromString(objData);
      throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() + 
              ">::getHeaderFromObjectStore(): could not parse header: " + m_header.InitializationErrorString());
    }
338
    if (m_header.type() != payloadTypeId) {
339
340
      std::stringstream err;
      err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
341
          << "found=" << m_header.type() << " expected=" << payloadTypeId;
342
      throw ObjectOpsBase::WrongType(err.str());
343
344
345
346
347
348
349
350
351
    }
    m_headerInterpreted = true;
  }
  
public:
  /**
   * Fill up the header and object with its default contents
   */
  void initialize() {
352
353
    if (m_headerInterpreted || m_existingObject)
      throw NotNewObject("In ObjectOps::initialize: trying to initialize an exitsting object");
354
    m_header.set_type(payloadTypeId);
355
356
357
358
    m_header.set_version(0);
    m_header.set_owner("");
    m_header.set_backupowner("");
    m_headerInterpreted = true;
359
360
  }
  
361
  void insert() {
362
363
364
    // Check that we are not dealing with an existing object
    if (m_existingObject)
      throw NotNewObject("In ObjectOps::insert: trying to insert an already exitsting object");
365
366
367
    // Check that the object is ready in memory
    if (!m_headerInterpreted || !m_payloadInterpreted)
      throw NotInitialized("In ObjectOps::insert: trying to insert an uninitialized object");
368
369
370
371
    // Push the payload into the header and write the object
    // We don't require locking here, as the object does not exist
    // yet in the object store (and this is ensured by the )
    m_header.set_payload(m_payload.SerializeAsString());
372
    m_objectStore.create(getAddressIfSet(), m_header.SerializeAsString());
373
374
375
376
    m_existingObject = true;
  }
  
  bool exists() {
377
    return m_objectStore.exists(getAddressIfSet());
378
379
380
  }
  
private:
381
382
  template <class ChildType>
  void writeChild (const std::string & name, ChildType & val) {
383
    m_objectStore.create(name, val.SerializeAsString());
384
  }
385
386
387
388
389
390
  
  void removeOther(const std::string & name) {
    m_objectStore.remove(name);
  }
  
  std::string selfName() {
391
    if(!m_nameSet) throw AddressNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
392
393
394
    return m_name;
  }
  
395
  Backend & objectStore() {
396
397
398
    return m_objectStore;
  }
  
399
protected:
400
401
  static const serializers::ObjectType payloadTypeId = PayloadTypeId;
  PayloadType m_payload;
402
403
};

404
}}