/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
#pragma once

21
#include "Backend.hpp"
#include "common/exception/Exception.hpp"
#include "objectstore/cta.pb.h"
#include <memory>
#include <sstream>
#include <stdint.h>
#include <string>
26

27
28
namespace cta { namespace objectstore {

29
30
31
32
33
34
class ObjectOpsBase {
  friend class ScopedLock;
  friend class ScopedSharedLock;
  friend class ScopedExclusiveLock;
protected:
  ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os), 
35
36
37
    m_headerInterpreted(false), m_payloadInterpreted(false),
    m_existingObject(false), m_locksCount(0),
    m_locksForWriteCount(0) {}
38
  
39
  class AddressNotSet: public cta::exception::Exception {
40
  public:
41
    AddressNotSet(const std::string & w): cta::exception::Exception(w) {}
42
43
  };
  
44
45
46
47
48
  class NotLocked: public cta::exception::Exception {
  public:
    NotLocked(const std::string & w): cta::exception::Exception(w) {}
  };
  
49
50
51
52
53
  class AlreadyLocked: public cta::exception::Exception {
  public:
    AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
  };
  
54
55
56
57
58
  class WrongType: public cta::exception::Exception {
  public:
    WrongType(const std::string & w): cta::exception::Exception(w) {}
  };
  
59
  class NotNewObject: public cta::exception::Exception {
60
  public:
61
62
63
64
65
66
    NotNewObject(const std::string & w): cta::exception::Exception(w) {}
  };
  
  class NewObject: public cta::exception::Exception {
  public:
    NewObject(const std::string & w): cta::exception::Exception(w) {}
67
68
69
70
71
72
73
  };
  
  class NotFetched: public cta::exception::Exception {
  public:
    NotFetched(const std::string & w): cta::exception::Exception(w) {}
  };
  
74
75
76
77
78
  class NotInitialized: public cta::exception::Exception {
  public:
    NotInitialized(const std::string & w): cta::exception::Exception(w) {}
  };
  
79
  class AddressAlreadySet: public cta::exception::Exception {
80
  public:
81
    AddressAlreadySet(const std::string & w): cta::exception::Exception(w) {}
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
  };
  
  void checkHeaderWritable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderWritable: header not yet fetched or initialized");
    checkWritable();
  }
  
  void checkHeaderReadable() {
    if (!m_headerInterpreted) 
      throw NotFetched("In ObjectOps::checkHeaderReadable: header not yet fetched or initialized");
    checkReadable();
  }
  
  void checkPayloadWritable() {
    if (!m_payloadInterpreted) 
      throw NotFetched("In ObjectOps::checkPayloadWritable: header not yet fetched or initialized");
    checkWritable();
  }
  
  void checkPayloadReadable() {
    if (!m_payloadInterpreted) 
      throw NotFetched("In ObjectOps::checkPayloadReadable: header not yet fetched or initialized");
    checkReadable();
  }
  
  void checkWritable() {
    if (m_existingObject && !m_locksForWriteCount)
      throw NotLocked("In ObjectOps::checkWritable: object not locked for write");
  }
  
  void checkReadable() {
    if (!m_locksCount)
      throw NotLocked("In ObjectOps::checkReadable: object not locked");
  }
  
public:
  
120
  void setAddress(const std::string & name) {
121
    if (m_nameSet)
122
      throw AddressAlreadySet("In ObjectOps::setName: trying to overwrite an already set name");
123
124
125
126
    m_name = name;
    m_nameSet = true;
  }
  
127
  std::string & getAddressIfSet() {
128
    if (!m_nameSet) {
129
      throw AddressNotSet("In ObjectOpsBase::getNameIfSet: name not set yet");
130
131
132
    }
    return m_name;
  }
133
134
135
  
  void remove () {
    checkWritable();
136
    m_objectStore.remove(getAddressIfSet());
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
    m_existingObject = false;
    m_headerInterpreted = false;
    m_payloadInterpreted = false;
  }
  
  void setOwner(const std::string & owner) {
    m_header.set_owner(owner);
  }
  
  std::string getOwner() {
    return m_header.owner();
  }
  
  void setBackupOwner(const std::string & owner) {
    m_header.set_backupowner(owner);
  }
  
  std::string getBackupOwner() {
    return m_header.backupowner();
  }

158
159
160
161
162
163
164
protected:
  bool m_nameSet;
  std::string m_name;
  Backend & m_objectStore;
  serializers::ObjectHeader m_header;
  bool m_headerInterpreted;
  bool m_payloadInterpreted;
165
166
167
  bool m_existingObject;
  int m_locksCount;
  int m_locksForWriteCount;
168
  std::unique_ptr<Backend::ScopedLock> m_writeLock;
169
170
171
172
173
};

class ScopedLock {
public:
  void release() {
174
175
    checkLocked();
    releaseIfNeeded();
176
  }
177
  
178
  virtual ~ScopedLock() {
179
    releaseIfNeeded();
180
  }
181
182
183
184
185
186
187
188
189
190
  class AlreadyLocked: public cta::exception::Exception {
  public:
    AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
  };
  
  class NotLocked: public cta::exception::Exception {
  public:
    NotLocked(const std::string & w): cta::exception::Exception(w) {}
  };
  
191
protected:
192
  ScopedLock(): m_objectOps(NULL), m_locked(false) {}
193
  std::unique_ptr<Backend::ScopedLock> m_lock;
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
  ObjectOpsBase * m_objectOps;
  bool m_locked;
  void checkNotLocked() {
    if (m_locked)
      throw AlreadyLocked("In ScopedLock::checkNotLocked: trying to lock an already locked lock");
  }
  void checkLocked() {
    if (!m_locked)
      throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock");
  }
  virtual void releaseIfNeeded() {
    if(!m_locked) return;
    m_lock.reset(NULL);
    m_objectOps->m_locksCount--;
    m_locked = false;
  }
210
};
211
  
212
213
class ScopedSharedLock: public ScopedLock {
public:
214
215
216
217
218
219
220
  ScopedSharedLock() {}
  ScopedSharedLock(ObjectOpsBase & oo) {
    lock(oo);
  }
  void lock(ObjectOpsBase & oo) {
    checkNotLocked();
    m_objectOps  = & oo;
221
    m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet()));
222
223
    m_objectOps->m_locksCount++;
    m_locked = true;
224
225
226
227
228
  }
};

class ScopedExclusiveLock: public ScopedLock {
public:
229
230
231
232
233
234
235
  ScopedExclusiveLock() {}
  ScopedExclusiveLock(ObjectOpsBase & oo) {
    lock(oo);
  }
  void lock(ObjectOpsBase & oo) {
    checkNotLocked();
    m_objectOps = &oo;
236
    m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet()));
237
238
239
240
241
242
243
244
245
    m_objectOps->m_locksCount++;
    m_objectOps->m_locksForWriteCount++;
    m_locked = true;
  }
protected:
  void releaseIfNeeded() {
    if (!m_locked) return;
    ScopedLock::releaseIfNeeded();
    m_objectOps->m_locksForWriteCount--;
246
247
248
249
250
251
252
  }
};

template <class C>
class ObjectOps: public ObjectOpsBase {
protected:
  ObjectOps(Backend & os, const std::string & name): ObjectOpsBase(os) {
253
    setAddress(name);
254
255
  }
  
256
257
258
259
260
  ObjectOps(Backend & os): ObjectOpsBase(os) {}
  
public:
  void fetch() {
    // Check that the object is locked, one way or another
261
    if(!m_locksCount)
262
      throw NotLocked("In ObjectOps::fetch(): object not locked");
263
    m_existingObject = true;
264
265
266
267
    // Get the header from the object store
    getHeaderFromObjectStore();
    // Interpret the data
    getPayloadFromHeader();
268
269
  }
  
270
  void commit() {
271
    checkPayloadWritable();
272
273
    if (!m_existingObject) 
      throw NewObject("In ObjectOps::commit: trying to update a new object");
274
275
    // Serialise the payload into the header
    m_header.set_payload(m_payload.SerializeAsString());
276
    // Write the object
277
    m_objectStore.atomicOverwrite(getAddressIfSet(), m_header.SerializeAsString());
278
279
280
  }
  
protected:
281
282
283
284
285
286
  
  void getPayloadFromHeader () {
    m_payload.ParseFromString(m_header.payload());
    m_payloadInterpreted = true;
  }

287
  void getHeaderFromObjectStore () {
288
    m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
289
290
291
292
293
294
295
296
297
298
299
300
301
302
    if (m_header.type() != typeId) {
      std::stringstream err;
      err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
          << "found=" << m_header.type() << " expected=" << typeId;
      throw cta::exception::Exception(err.str());
    }
    m_headerInterpreted = true;
  }
  
public:
  /**
   * Fill up the header and object with its default contents
   */
  void initialize() {
303
304
    if (m_headerInterpreted || m_existingObject)
      throw NotNewObject("In ObjectOps::initialize: trying to initialize an exitsting object");
305
    m_header.set_type(typeId);
306
307
308
309
    m_header.set_version(0);
    m_header.set_owner("");
    m_header.set_backupowner("");
    m_headerInterpreted = true;
310
311
  }
  
312
  void insert() {
313
314
315
    // Check that we are not dealing with an existing object
    if (m_existingObject)
      throw NotNewObject("In ObjectOps::insert: trying to insert an already exitsting object");
316
317
318
    // Check that the object is ready in memory
    if (!m_headerInterpreted || !m_payloadInterpreted)
      throw NotInitialized("In ObjectOps::insert: trying to insert an uninitialized object");
319
320
321
322
    // Push the payload into the header and write the object
    // We don't require locking here, as the object does not exist
    // yet in the object store (and this is ensured by the )
    m_header.set_payload(m_payload.SerializeAsString());
323
    m_objectStore.create(getAddressIfSet(), m_header.SerializeAsString());
324
325
326
327
    m_existingObject = true;
  }
  
  bool exists() {
328
    return m_objectStore.exists(getAddressIfSet());
329
330
331
  }
  
private:
332
333
  template <class C2>
  void writeChild (const std::string & name, C2 & val) {
334
    m_objectStore.create(name, val.SerializeAsString());
335
  }
336
337
338
339
340
341
  
  void removeOther(const std::string & name) {
    m_objectStore.remove(name);
  }
  
  std::string selfName() {
342
    if(!m_nameSet) throw AddressNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
343
344
345
    return m_name;
  }
  
346
  Backend & objectStore() {
347
348
349
    return m_objectStore;
  }
  
350
351
352
protected:
  static const serializers::ObjectType typeId;
  C m_payload;
353
354
};

355
}}