/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "Backend.hpp"
#include "common/exception/Exception.hpp"
#include "objectstore/cta.pb.h"
#include "common/log/LogContext.hpp"
#include "catalogue/Catalogue.hpp"
#include <memory>
#include <stdint.h>
#include <cryptopp/base64.h>
#include <list>
#include <sstream>
#include <string>
#include <typeinfo>

namespace cta { namespace objectstore {

class AgentReference;
class ScopedLock;
class ScopedExclusiveLock;

class ObjectOpsBase {
  friend class ScopedLock;
  friend class ScopedSharedLock;
  friend class ScopedExclusiveLock;
  friend class GenericObject;
  friend class Helpers;
protected:
  ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os),
    m_headerInterpreted(false), m_payloadInterpreted(false),
    m_existingObject(false), m_locksCount(0),
    m_locksForWriteCount(0) {}

  virtual ~ObjectOpsBase();
public:
  CTA_GENERATE_EXCEPTION_CLASS(AddressNotSet);
  CTA_GENERATE_EXCEPTION_CLASS(NotLocked);
  CTA_GENERATE_EXCEPTION_CLASS(WrongType);
  CTA_GENERATE_EXCEPTION_CLASS(NotNewObject);
  CTA_GENERATE_EXCEPTION_CLASS(NewObject);
  CTA_GENERATE_EXCEPTION_CLASS(NotFetched);
  CTA_GENERATE_EXCEPTION_CLASS(NotInitialized);
  CTA_GENERATE_EXCEPTION_CLASS(AddressAlreadySet);
  CTA_GENERATE_EXCEPTION_CLASS(InvalidAddress);
  CTA_GENERATE_EXCEPTION_CLASS(FailedToSerialize);
  CTA_GENERATE_EXCEPTION_CLASS(StillLocked);
protected:
  void checkHeaderWritable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderWritable: header not yet fetched or initialized");
    checkWritable();
  }
  
  void checkHeaderReadable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderReadable: header not yet fetched or initialized");
    checkReadable();
  }
  
  void checkPayloadWritable() {
    if (!m_payloadInterpreted)
      throw NotFetched("In ObjectOps::checkPayloadWritable: payload not yet fetched or initialized");
    checkWritable();
  }

  void checkPayloadReadable() {
    if (!m_payloadInterpreted)
      throw NotFetched("In ObjectOps::checkPayloadReadable: payload not yet fetched or initialized");
    checkReadable();
  }
  
  void checkWritable() {
    if (m_existingObject && !m_locksForWriteCount)
      throw NotLocked("In ObjectOps::checkWritable: object not locked for write");
    if (m_existingObject && !m_exclusiveLock && !m_lockForSubObject)
      throw exception::Exception("In ObjectOps::checkWritable: missing reference to exclusive lock");
  }

  void checkReadable() {
    if (!m_locksCount && !m_noLock)
      throw NotLocked("In ObjectOps::checkReadable: object not locked");
  }
  
public:
  
  void setAddress(const std::string & name) {
    if (m_nameSet)
      throw AddressAlreadySet("In ObjectOps::setAddress(): trying to overwrite an already set name");
    if (name.empty())
      throw InvalidAddress("In ObjectOps::setAddress(): empty name");
    m_name = name;
    m_nameSet = true;
  }

  void resetAddress() {
    if (m_locksCount || m_locksForWriteCount) {
      throw StillLocked("In ObjectOps::resetAddress: cannot reset the address of a locked object");
    }
    m_nameSet = false;
    m_name = "";
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
    m_existingObject = false;
  }

  std::string & getAddressIfSet() {
    if (!m_nameSet) {
      throw AddressNotSet("In ObjectOpsBase::getAddressIfSet: name not set yet");
    }
    return m_name;
  }

  void remove() {
    checkWritable();
    m_objectStore.remove(getAddressIfSet());
    m_existingObject = false;
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
  }

  void resetValues() {
    m_existingObject = false;
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
  }

  void setOwner(const std::string & owner) {
    checkHeaderWritable();
    m_header.set_owner(owner);
  }

  std::string getOwner() {
    checkHeaderReadable();
    return m_header.owner();
  }

  void setBackupOwner(const std::string & owner) {
    checkHeaderWritable();
    m_header.set_backupowner(owner);
  }

  std::string getBackupOwner() {
    checkHeaderReadable();
    return m_header.backupowner();
  }

protected:
  bool m_nameSet;
  std::string m_name;
  Backend & m_objectStore;
  serializers::ObjectHeader m_header;
  bool m_headerInterpreted;
  bool m_payloadInterpreted;
  bool m_existingObject;
  int m_locksCount;
  int m_locksForWriteCount;
  bool m_noLock = false;
  // When locked exclusively, we will keep a reference to the lock,
  // so we can propagate it to sub objects with minimal passing through.
  ScopedExclusiveLock * m_exclusiveLock = nullptr;
  // When being locked as a sub object, we will keep a reference to the lock
  // we are provided with. Likewise, the lock will update us when released.
  ScopedLock * m_lockForSubObject = nullptr;
};

class ScopedLock {
public:
  void release() {
    checkLocked();
    releaseIfNeeded();
  }

  bool isLocked() {
    return m_locked;
  }
  
  /**
   * Virtual function (implemented differently in shared and exclusive locks),
   * marking the object as locked.
   * @param objectOps pointer to the ObjectOpsBase.
   */
  virtual void setObjectLocked(ObjectOpsBase * objectOps) = 0;
  
  /**
   * Virtual function (implemented differently in shared and exclusive locks),
   * marking the object as unlocked.
   * @param objectOps pointer to the ObjectOpsBase.
   */
  virtual void setObjectUnlocked(ObjectOpsBase * objectOps) = 0;
  
  /**
   * Expand the scope of the current lock to a sub object, which will also be covered
   * by this lock. This allows the sub object to benefit from the same protection
   * against missing or improper locking. This feature is intended for sharded objects.
   */
  void includeSubObject(ObjectOpsBase & subObject) {
    // To propagate a lock, we should have one to begin with.
    checkLocked();
    ObjectOpsBase * oob = & subObject;
    // Validate the sub object is defined.
    checkObjectAndAddressSet(oob);
    // Propagate the lock to the sub object (this is lock type dependent).
    setObjectLocked(oob);
    // Reference ourselves to the sub object so it can declare its destruction to us.
    oob->m_lockForSubObject = this;
    // Add a reference to the object.
    m_subObjectsOps.push_back(oob);
  }
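
  /**
   * Illustrative sketch (not part of the original interface): propagating an
   * exclusive lock to a sharded sub object. "QueueShard", "queue", "shardAddress"
   * and "backend" are hypothetical placeholders for ObjectOpsBase-derived types
   * and variables; only the locking pattern comes from this class.
   * @code
   *   ScopedExclusiveLock queueLock(queue);  // lock the main object
   *   queue.fetch();
   *   QueueShard shard(shardAddress, backend);
   *   queueLock.includeSubObject(shard);     // shard is now covered by the same lock
   *   shard.fetch();                         // passes the locking checks
   * @endcode
   */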
  
  /** Move the locked object reference to a new one. This is done when the locked
   * object is a GenericObject and the caller instantiated a derived object from
   * it. The lock follows the move.
   * We check we move the lock from a Generic object (this is the only allowed
   * use case).
   * New object's locks are moved from the old one (referenced in the lock)
   */
  static void transfer(ObjectOpsBase & oldObject, ObjectOpsBase & newObject) {
    // Transfer the locks from the old to the new object.
    newObject.m_locksCount = oldObject.m_locksCount;
    newObject.m_locksForWriteCount = oldObject.m_locksForWriteCount;
    newObject.m_exclusiveLock = oldObject.m_exclusiveLock;
    newObject.m_lockForSubObject = oldObject.m_lockForSubObject;
    newObject.m_noLock = oldObject.m_noLock;
    // The old object is not considered locked anymore and should be
    // discarded. A previous call to the new object's constructor should
    // have left the new object's lock counters at zero.
    oldObject.m_locksCount = 0;
    oldObject.m_locksForWriteCount = 0;
    oldObject.m_exclusiveLock = nullptr;
    oldObject.m_lockForSubObject = nullptr;
    oldObject.m_noLock = false;
  }
  
  /**
   * Dereference a sub object at destruction time
   * @param subObject
   */
  void dereferenceSubObject(ObjectOpsBase & subObject) {
    m_subObjectsOps.remove(&subObject);
  }

  virtual ~ScopedLock() {
    // Each child class will have to call releaseIfNeeded() in its own destructor
    // as it relies on pure virtual members of this base class.
  }

  CTA_GENERATE_EXCEPTION_CLASS(AlreadyLocked);
  CTA_GENERATE_EXCEPTION_CLASS(NotLocked);
  CTA_GENERATE_EXCEPTION_CLASS(MissingAddress);

protected:
  ScopedLock(): m_objectOps(NULL), m_locked(false) {}
  std::unique_ptr<Backend::ScopedLock> m_lock;
  ObjectOpsBase * m_objectOps;
  std::list<ObjectOpsBase *> m_subObjectsOps;
  bool m_locked;
  void checkNotLocked() {
    if (m_locked)
      throw AlreadyLocked("In ScopedLock::checkNotLocked: trying to lock an already locked lock");
  }
  void checkLocked() {
    if (!m_locked)
      throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock");
  }

  void checkObjectAndAddressSet(ObjectOpsBase * oob = nullptr) {
    // By default we deal with the main object.
    if (!oob) oob = m_objectOps;
    if (!oob) {
      throw MissingAddress("In ScopedLock::checkObjectAndAddressSet: trying to lock a NULL object");
    } else if (!oob->m_nameSet || oob->m_name.empty()) {
      throw MissingAddress("In ScopedLock::checkObjectAndAddressSet: trying to lock an object without address");
    }
  }

  virtual void releaseIfNeeded() {
    if(!m_locked) return;
    m_lock.reset(NULL);
    m_locked = false;
    setObjectUnlocked(m_objectOps);
    // Releasing a lock voids the object content in memory, as the stored object can now change.
    m_objectOps->m_payloadInterpreted = false;
    // Apply the same to sub objects.
    for (auto & oob: m_subObjectsOps) {
      setObjectUnlocked(oob);
      oob->m_payloadInterpreted = false;
    }
  }
};

class ScopedSharedLock: public ScopedLock {
public:
  ScopedSharedLock() {}
  ScopedSharedLock(ObjectOpsBase & oo) {
    lock(oo);
  }
  
  void setObjectLocked(ObjectOpsBase* objectOps) override {
    objectOps->m_locksCount++;
  }

  void setObjectUnlocked(ObjectOpsBase* objectOps) override {
    objectOps->m_locksCount--;
  }
  
  void lock(ObjectOpsBase & oo) {
    checkNotLocked();
    m_objectOps = & oo;
    checkObjectAndAddressSet();
    m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet()));
    setObjectLocked(m_objectOps);
    m_locked = true;
  }

  virtual ~ScopedSharedLock() {
    releaseIfNeeded();
  }

};
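
// Typical read-only access pattern (illustrative sketch, assuming "obj" is an
// instance of a concrete ObjectOps<>-derived class; this example is not part of
// the original file):
//
//   ScopedSharedLock lock(obj);           // takes the backend shared lock
//   obj.fetch();                          // reads the object, parses header and payload
//   std::string owner = obj.getOwner();   // read accessors require a lock (or fetchNoLock())
//   lock.release();                       // optional: the destructor also releases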

class ScopedExclusiveLock: public ScopedLock {
public:
  ScopedExclusiveLock() {}

  ScopedExclusiveLock(ObjectOpsBase & oo, uint64_t timeout_us = 0) {
    lock(oo, timeout_us);
  }
  
  // Act on the object passed as argument: it may be a sub object included in
  // this lock, not only the main m_objectOps.
  void setObjectLocked(ObjectOpsBase* objectOps) override {
    objectOps->m_locksCount++;
    objectOps->m_locksForWriteCount++;
  }

  void setObjectUnlocked(ObjectOpsBase* objectOps) override {
    objectOps->m_locksCount--;
    objectOps->m_locksForWriteCount--;
  }

  void lock(ObjectOpsBase & oo, uint64_t timeout_us = 0) {
    checkNotLocked();
    m_objectOps = &oo;
    checkObjectAndAddressSet();
    m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet(), timeout_us));
    setObjectLocked(m_objectOps);
    m_objectOps->m_exclusiveLock = this;
    m_locked = true;
  }
  
  /** Move the locked object reference to a new one. This is done when the locked
   * object is a GenericObject and the caller instantiated a derived object from
   * it. The lock follows the move.
   * We check we move the lock from a Generic object (this is the only allowed
   * use case).
   * New object's locks are moved from the old one (referenced in the lock)
   */
  void transfer(ObjectOpsBase & newObject) {
    // Sanity checks: we should be the lock for this object.
    if ((m_objectOps->m_exclusiveLock && m_objectOps->m_exclusiveLock != this) ||
            (m_objectOps->m_lockForSubObject && m_objectOps->m_lockForSubObject != this)) {
      std::stringstream err;
      err << "In ScopedExclusiveLock::transfer(): we should be this object's lock (and are not): "
          << std::hex << std::showbase << " exclusiveLock=" << m_objectOps->m_exclusiveLock
          << " lockForSubObject=" << m_objectOps->m_lockForSubObject
          << " this=" << this;
      throw exception::Exception (err.str());
    }
    ScopedLock::transfer(*m_objectOps, newObject);
  }
  
  virtual ~ScopedExclusiveLock() {
    releaseIfNeeded();
  }

};
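
// Typical read-modify-write pattern (illustrative sketch, same assumptions as above):
//
//   ScopedExclusiveLock lock(obj);        // takes the backend exclusive lock
//   obj.fetch();                          // fetch() requires the object to be locked
//   obj.setOwner("someAgentAddress");
//   obj.commit();                         // re-serialises the payload and atomically overwrites
//   // the lock is released when it goes out of scope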

template <class PayloadType, serializers::ObjectType PayloadTypeId>
class ObjectOps: public ObjectOpsBase {
protected:
  ObjectOps(Backend & os, const std::string & name): ObjectOpsBase(os) {
    setAddress(name);
  }

  ObjectOps(Backend & os): ObjectOpsBase(os) {}

  virtual ~ObjectOps() {}

public:
  void fetch() {
    // Check that the object is locked, one way or another
    if (!m_locksCount)
      throw NotLocked("In ObjectOps::fetch(): object not locked");
    fetchBottomHalf();
  }
  
  void fetchNoLock() {
    m_noLock = true;
    fetchBottomHalf();
  }
  
  void fetchBottomHalf() {
    m_existingObject = true;
    // Get the header from the object store
    getHeaderFromObjectStore();
    // Interpret the data
    getPayloadFromHeader();
  }
  
  class AsyncLockfreeFetcher {
    friend class ObjectOps;
    AsyncLockfreeFetcher(ObjectOps & obj): m_obj(obj) {}
  public:
    void wait() {
      // Current simplification: the parsing of the header/payload is synchronous.
      // This could be delegated to the backend.
      auto objData = m_asyncLockfreeFetcher->wait();
      m_obj.m_noLock = true;
      m_obj.m_existingObject = true;
      m_obj.getHeaderFromObjectData(objData);
      m_obj.getPayloadFromHeader();
    }
  private:
    ObjectOps & m_obj;
    std::unique_ptr<Backend::AsyncLockfreeFetcher> m_asyncLockfreeFetcher;
    
  };
  friend AsyncLockfreeFetcher;
  
  AsyncLockfreeFetcher * asyncLockfreeFetch () {
    std::unique_ptr<AsyncLockfreeFetcher> ret;
    ret.reset(new AsyncLockfreeFetcher(*this));
    ret->m_asyncLockfreeFetcher.reset(m_objectStore.asyncLockfreeFetch(getAddressIfSet()));
    return ret.release();
  }
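
  /**
   * Illustrative sketch (not part of the original interface): issuing a
   * lock-free fetch and waiting on it. The returned fetcher is owned by the
   * caller; "obj" is a hypothetical ObjectOps<>-derived instance.
   * @code
   *   std::unique_ptr<AsyncLockfreeFetcher> fetcher(obj.asyncLockfreeFetch());
   *   // ... other work while the backend reads the object ...
   *   fetcher->wait();              // parses header and payload, sets m_noLock
   *   auto owner = obj.getOwner();  // reads are then allowed without a lock
   * @endcode
   */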
  
  void commit() {
    checkPayloadWritable();
    if (!m_existingObject)
      throw NewObject("In ObjectOps::commit: trying to update a new object");
    // Serialise the payload into the header
    try {
      m_header.set_payload(m_payload.SerializeAsString());
    } catch (std::exception & stdex) {
      cta::exception::Exception ex(std::string("In ObjectOps::commit(): failed to serialize: ") + stdex.what());
      throw ex;
    }
    // Write the object
    m_objectStore.atomicOverwrite(getAddressIfSet(), m_header.SerializeAsString());
  }
  
  CTA_GENERATE_EXCEPTION_CLASS(WrongTypeForGarbageCollection);
  /**
   * This function should be overridden in the inheriting classes.
   */
  virtual void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) = 0;

protected:

  virtual void getPayloadFromHeader() {
    if (!m_payload.ParseFromString(m_header.payload())) {
      // Use the tolerant parser to assess the situation.
      m_payload.ParsePartialFromString(m_header.payload());
      // Base64 encode the payload for diagnostics.
      const bool noNewLineInBase64Output = false;
      std::string payloadBase64;
      CryptoPP::StringSource ss1(m_header.payload(), true,
        new CryptoPP::Base64Encoder(
           new CryptoPP::StringSink(payloadBase64), noNewLineInBase64Output));
      throw cta::exception::Exception(std::string("In ObjectOps<") + typeid(PayloadType).name() +
              ">::getPayloadFromHeader(): could not parse payload: " + m_payload.InitializationErrorString() +
              " size=" + std::to_string(m_header.payload().size()) + " data(b64)=\"" +
              payloadBase64 + "\"");
    }
    m_payloadInterpreted = true;
  }
  
  virtual void getHeaderFromObjectData(const std::string & objData) {
    if (!m_header.ParseFromString(objData)) {
      // Use the tolerant parser to assess the situation.
      m_header.ParsePartialFromString(objData);
      // Base64 encode the header for diagnostics.
      const bool noNewLineInBase64Output = false;
      std::string objDataBase64;
      CryptoPP::StringSource ss1(objData, true,
        new CryptoPP::Base64Encoder(
           new CryptoPP::StringSink(objDataBase64), noNewLineInBase64Output));
      throw cta::exception::Exception(std::string("In ObjectOps<") + typeid(PayloadType).name() +
              ">::getHeaderFromObjectData(): could not parse header: " + m_header.InitializationErrorString() +
              " size=" + std::to_string(objData.size()) + " data(b64)=\"" +
              objDataBase64 + "\"");
    }
    if (m_header.type() != payloadTypeId) {
      std::stringstream err;
      err << "In ObjectOps::getHeaderFromObjectData: wrong object type: "
          << "found=" << m_header.type() << " expected=" << payloadTypeId;
      throw ObjectOpsBase::WrongType(err.str());
    }
    m_headerInterpreted = true;
  }

  void getHeaderFromObjectStore () {
    auto objData=m_objectStore.read(getAddressIfSet());
    getHeaderFromObjectData(objData);
  }
  
public:
  /**
   * Fill up the header and object with its default contents
   */
  void initialize() {
    if (m_headerInterpreted || m_existingObject)
      throw NotNewObject("In ObjectOps::initialize: trying to initialize an existing object");
    m_header.set_type(payloadTypeId);
    m_header.set_version(0);
    m_header.set_owner("");
    m_header.set_backupowner("");
    m_headerInterpreted = true;
  }
  
  void insert() {
    // Check that we are not dealing with an existing object
    if (m_existingObject)
      throw NotNewObject("In ObjectOps::insert: trying to insert an already existing object");
    // Check that the object is ready in memory
    if (!m_headerInterpreted || !m_payloadInterpreted)
      throw NotInitialized("In ObjectOps::insert: trying to insert an uninitialized object");
    // Push the payload into the header and write the object.
    // We don't require locking here, as the object does not exist
    // yet in the object store (and this is ensured by the create() call below).
    m_header.set_payload(m_payload.SerializeAsString());
    m_objectStore.create(getAddressIfSet(), m_header.SerializeAsString());
    m_existingObject = true;
  }
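
  /**
   * Illustrative sketch (not part of the original interface): creating a brand
   * new object. "MyObject" is a hypothetical subclass; concrete subclasses
   * typically extend initialize() to also fill the payload and mark it
   * interpreted before insert() is called.
   * @code
   *   MyObject obj(backend);
   *   obj.setAddress("myObject-0001");
   *   obj.initialize();   // sets the header type, version and owner fields
   *   obj.insert();       // creates the object in the object store
   * @endcode
   */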
  
  bool exists() {
    return m_objectStore.exists(getAddressIfSet());
  }

private:
  template <class ChildType>
  void writeChild (const std::string & name, ChildType & val) {
    m_objectStore.create(name, val.SerializeAsString());
  }
  
  void removeOther(const std::string & name) {
    m_objectStore.remove(name);
  }
  
  std::string selfName() {
    if (!m_nameSet) throw AddressNotSet("In ObjectOps<>::selfName: name not set");
    return m_name;
  }

  Backend & objectStore() {
    return m_objectStore;
  }
  
protected:
  static const serializers::ObjectType payloadTypeId = PayloadTypeId;
  PayloadType m_payload;
};
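
// Minimal sketch of how a concrete object type binds its protobuf payload to the
// ObjectOps template (illustrative only; "Thing" and "serializers::Thing_t" are
// hypothetical names standing in for the real derived classes):
//
//   class Thing: public ObjectOps<serializers::Thing, serializers::Thing_t> {
//   public:
//     Thing(const std::string & address, Backend & os):
//       ObjectOps<serializers::Thing, serializers::Thing_t>(os, address) {}
//     void garbageCollect(const std::string & presumedOwner, AgentReference & agentReference,
//       log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override { /* ... */ }
//   };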

}}