/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "BackendRados.hpp"
#include "common/exception/Errnum.hpp"
#include "common/Timer.hpp"
#include "common/threading/MutexLocker.hpp"

#include <rados/librados.hpp>
#include <sys/syscall.h>
#include <errno.h>
#include <unistd.h>
#include <valgrind/helgrind.h>

#include <algorithm>
#include <chrono>
#include <future>
#include <limits>
#include <list>
#include <memory>
#include <random>
#include <sstream>
#include <string>

// This macro should be defined to get printouts to understand timings of locking.
// Usually used while running BackendTestRados/BackendAbstractTest.MultithreadLockingInterface.
// Also define TEST_RADOS in objectstore/BackendRadosTestSwitch.hpp.
// The number of threads/passes in the test should be reduced for the output to be useful.
#undef DEBUG_RADOS_LOCK_TIMINGS
#ifdef DEBUG_RADOS_LOCK_TIMINGS

// Headers needed only by the timing helpers below (std::atomic, ::gettimeofday, ::printf/::fflush),
// so that enabling the switch does not depend on transitive includes.
#include <atomic>
#include <stdio.h>
#include <sys/time.h>

namespace {
  // Timestamp (seconds modulo 1000) of the previous timestampedPrint(), used to print deltas.
  std::atomic<double> previousSec;
  // Becomes true once any lock was released, so notifyLocked() only reports after a release.
  std::atomic<bool> everReleased{false};
  // Wall-clock time (in seconds) of the most recent notifyReleased().
  std::atomic<double> lastReleased;

// Print "<time> <delta since previous print> <tid % 100> <function> <message>" and flush stdout.
void timestampedPrint (const char * f, const char *s) {
  struct ::timeval tv;
  ::gettimeofday(&tv, nullptr);
  double localPreviousSec=previousSec;
  double secs=previousSec=tv.tv_sec % 1000 + tv.tv_usec / 1000.0 / 1000;
  uint8_t tid = syscall(__NR_gettid) % 100;
  ::printf ("%03.06f %02.06f %02d %s %s\n", secs, secs - localPreviousSec, tid, f, s);
  ::fflush(::stdout);
}

// Record the time of a lock release.
void notifyReleased() {
  struct ::timeval tv;
  ::gettimeofday(&tv, nullptr);
  lastReleased=tv.tv_sec + tv.tv_usec / 1000.0 / 1000;
  everReleased=true;
}

// Print how long after the last recorded release a lock was obtained.
void notifyLocked() {
  if (everReleased) {
    struct ::timeval tv;
    ::gettimeofday(&tv, nullptr);
    ::printf ("Relocked after %02.06f\n", (tv.tv_sec + tv.tv_usec / 1000.0 / 1000) - lastReleased);
    ::fflush(::stdout);
  }
}

}

#define TIMESTAMPEDPRINT(A) timestampedPrint(__PRETTY_FUNCTION__, (A))
#define NOTIFYLOCKED() notifyLocked()
#define NOTIFYRELEASED() notifyReleased()
#else
// Timing instrumentation disabled: the macros expand to nothing.
#define TIMESTAMPEDPRINT(A)
#define NOTIFYLOCKED()
#define NOTIFYRELEASED()
#endif

namespace cta { namespace objectstore {

81
82
cta::threading::Mutex BackendRados::RadosTimeoutLogger::g_mutex;

83
BackendRados::BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace) :
  m_user(userId), m_pool(pool), m_namespace(radosNameSpace), m_cluster(), m_radosCtx() {
  using cta::exception::Errnum;
  // Create the cluster handle for this user; if this first step fails there is nothing to shut down.
  Errnum::throwOnReturnedErrno(-m_cluster.init(userId.c_str()),
      "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init");
  try {
    // Configuration from the default file search path, then from the environment.
    Errnum::throwOnReturnedErrno(-m_cluster.conf_read_file(nullptr),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
    Errnum::throwOnReturnedErrno(-m_cluster.conf_parse_env(nullptr),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_parse_env");
    Errnum::throwOnReturnedErrno(-m_cluster.connect(),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect");
    Errnum::throwOnReturnedErrno(-m_cluster.ioctx_create(pool.c_str(), m_radosCtx),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create");
    // An empty string also selects the default namespace, so no filtering is needed.
    // This call does not fail.
    m_radosCtx.set_namespace(radosNameSpace);
  } catch (...) {
    // The destructor does not run for a partially constructed object: clean up the handle here.
    m_cluster.shutdown();
    throw;
  }
}

BackendRados::~BackendRados() {
  // Tear down in reverse order of construction: I/O context first, then the cluster handle.
  // Each call is timed by its own RadosTimeoutLogger.
  RadosTimeoutLogger ctxCloseTiming;
  m_radosCtx.close();
  ctxCloseTiming.logIfNeeded("In BackendRados::~BackendRados(): m_radosCtx.close()", "no object");
  RadosTimeoutLogger shutdownTiming;
  m_cluster.shutdown();
  shutdownTiming.logIfNeeded("In BackendRados::~BackendRados(): m_cluster.shutdown()", "no object");
}

void BackendRados::create(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  const bool createExclusive = true;
  wop.create(createExclusive);
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
120
  RadosTimeoutLogger rtl;
121
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
122
      std::string("In ObjectStoreRados::create, failed to create exclusively or write: ")
123
      + name);
124
  rtl.logIfNeeded("In BackendRados::create(): m_radosCtx.operate(create+write_full)", name);
125
126
127
128
129
130
131
132
}

void BackendRados::atomicOverwrite(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  wop.assert_exists();
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
133
  RadosTimeoutLogger rtl;
134
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
135
      std::string("In ObjectStoreRados::atomicOverwrite, failed to assert existence or write: ")
136
      + name);
137
  rtl.logIfNeeded("In BackendRados::atomicOverwrite(): m_radosCtx.operate(assert_exists+write_full)", name);
138
139
140
141
142
}

std::string BackendRados::read(std::string name) {
  std::string ret;
  librados::bufferlist bl;
143
  RadosTimeoutLogger rtl;
Eric Cano's avatar
Eric Cano committed
144
  cta::exception::Errnum::throwOnNegativeErrnoIfNegative(m_radosCtx.read(name, bl, std::numeric_limits<int32_t>::max(), 0),
145
      std::string("In ObjectStoreRados::read,  failed to read: ")
146
      + name);
147
  rtl.logIfNeeded("In BackendRados::read(): m_radosCtx.read()", name);
Eric Cano's avatar
Eric Cano committed
148
  bl.copy(0, bl.length(), ret);
149
150
151
152
  return ret;
}

void BackendRados::remove(std::string name) {
153
  RadosTimeoutLogger rtl;
154
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.remove(name));
155
  rtl.logIfNeeded("In BackendRados::remove(): m_radosCtx.remove()", name);
156
157
158
159
160
}

bool BackendRados::exists(std::string name) {
  uint64_t size;
  time_t date;
161
162
163
164
  RadosTimeoutLogger rtl;
  auto statRet=m_radosCtx.stat(name, &size, &date);
  rtl.logIfNeeded("In BackendRados::exists(): m_radosCtx.stat()", name);
  if (statRet) {
165
166
167
168
169
170
    return false;
  } else {
    return true;
  }
}

171
172
std::list<std::string> BackendRados::list() {
  // Collect the id of every object reachable through the I/O context's object iterator.
  std::list<std::string> names;
  auto it = m_radosCtx.nobjects_begin();
  while (it != m_radosCtx.nobjects_end()) {
    names.push_back(it->get_oid());
    ++it;
  }
  return names;
}

void BackendRados::ScopedLock::release() {
180
181
182
183
184
185
186
187
188
189
#if RADOS_LOCKING_STRATEGY == NOTIFY
  releaseNotify();
#elif RADOS_LOCKING_STRATEGY == BACKOFF
  releaseBackoff();
#else
#error Wrong value for "RADOS_LOCKING_STRATEGY"
#endif
}

void BackendRados::ScopedLock::releaseNotify() {
190
  if (!m_lockSet) return;
191
192
193
  // We should be tolerant with unlocking a deleted object, which is part
  // of the lock-remove-(implicit unlock) cycle when we delete an object
  // we hence overlook the ENOENT errors.
194
  TIMESTAMPEDPRINT("Pre-release");
195
  RadosTimeoutLogger rtl1;
196
  int rc=m_context.unlock(m_oid, "lock", m_clientId);
197
  rtl1.logIfNeeded("In BackendRados::ScopedLock::releaseNotify(): m_context.unlock()", m_oid);
198
199
200
201
202
  switch (-rc) {
    case ENOENT:
      break;
    default:
      cta::exception::Errnum::throwOnReturnedErrno(-rc,
203
        std::string("In cta::objectstore::ScopedLock::release, failed unlock: ") +
204
205
206
        m_oid);
      break;
  }
207
208
209
210
  NOTIFYRELEASED();
  TIMESTAMPEDPRINT("Post-release/pre-notify");
  // Notify potential waiters to take their chances now on the lock.
  librados::bufferlist bl;
211
  // We use a fire and forget aio call.
212
  librados::AioCompletion * completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
213
  RadosTimeoutLogger rtl2;
214
  cta::exception::Errnum::throwOnReturnedErrno(-m_context.aio_notify(m_oid, completion, bl, 10000, nullptr), 
215
      "In BackendRados::ScopedLock::releaseNotify(): failed to aio_notify()");
216
  rtl2.logIfNeeded("In BackendRados::ScopedLock::releaseNotify(): m_context.aio_notify()", m_oid);
217
  completion->release();
218
  m_lockSet = false;
219
  TIMESTAMPEDPRINT("Post-notify");
220
221
}

222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
void BackendRados::ScopedLock::releaseBackoff() {
  if (!m_lockSet) return;
  // We should be tolerant with unlocking a deleted object, which is part
  // of the lock-remove-(implicit unlock) cycle when we delete an object
  // we hence overlook the ENOENT errors.
  TIMESTAMPEDPRINT("Pre-release");
  RadosTimeoutLogger rtl1;
  int rc=m_context.unlock(m_oid, "lock", m_clientId);
  rtl1.logIfNeeded("In BackendRados::ScopedLock::releaseBackoff(): m_context.unlock()", m_oid);
  switch (-rc) {
    case ENOENT:
      break;
    default:
      cta::exception::Errnum::throwOnReturnedErrno(-rc,
        std::string("In cta::objectstore::ScopedLock::releaseBackoff, failed unlock: ") +
        m_oid);
      break;
  }
  // The lock is gone: mark it so that a later release() (e.g. from the destructor) does not unlock
  // again. A repeated unlock could drop a lock re-taken meanwhile by the same thread, since client
  // ids are built from host name and thread id.
  m_lockSet = false;
  NOTIFYRELEASED();
  TIMESTAMPEDPRINT("Post-release");
}

void BackendRados::ScopedLock::set(const std::string& oid, const std::string clientId, 
    LockType lockType) {
247
  m_oid = oid;
248
249
  m_clientId = clientId;
  m_lockType = lockType;
250
  m_lockSet = true;
251
252
253
254
255
256
}

BackendRados::ScopedLock::~ScopedLock() {
  // release() returns immediately when m_lockSet is false.
  // NOTE(review): release() can throw (e.g. on an unlock failure); an exception escaping a
  // destructor terminates the program — confirm this is acceptable.
  this->release();
}

BackendRados::LockWatcher::LockWatcher(librados::IoCtx& context, const std::string& name):
  m_context(context) {
  // The Internal object receives the watch callbacks and fulfils the promise that wait() blocks on.
  m_internal.reset(new Internal);
  m_internal->m_name = name;
  m_internal->m_future = m_internal->m_promise.get_future();
  TIMESTAMPEDPRINT("Pre-watch2");
  RadosTimeoutLogger rtl;
  cta::exception::Errnum::throwOnReturnedErrno(-m_context.watch2(name, &m_watchHandle, m_internal.get()),
      std::string("In BackendRados::LockWatcher::LockWatcher(): failed to m_context.watch2(): ") + name);
  rtl.logIfNeeded("In BackendRados::LockWatcher::LockWatcher(): m_context.watch2()", name);
  TIMESTAMPEDPRINT("Post-watch2");
}

void BackendRados::LockWatcher::Internal::handle_error(uint64_t cookie, int err) {
270
  threading::MutexLocker ml(m_promiseMutex);
271
  if (m_promiseSet) return;
272
  m_promise.set_value();
273
274
  TIMESTAMPEDPRINT("Handled notify");
  m_promiseSet = true;
275
276
}

277
void BackendRados::LockWatcher::Internal::handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, librados::bufferlist& bl) {
278
  threading::MutexLocker ml(m_promiseMutex);
279
  if (m_promiseSet) return;
280
  m_promise.set_value();
281
282
  TIMESTAMPEDPRINT("Handled notify");
  m_promiseSet = true;
283
284
285
286
}

void BackendRados::LockWatcher::wait(const durationUs& timeout) {
  TIMESTAMPEDPRINT("Pre-wait"); 
287
  m_internal->m_future.wait_for(timeout);
288
289
290
291
  TIMESTAMPEDPRINT("Post-wait");
}

BackendRados::LockWatcher::~LockWatcher() {
292
293
294
295
296
297
298
299
300
  TIMESTAMPEDPRINT("Pre-aio_unwatch2");
  m_internal->m_radosTimeoutLogger.reset();
  auto name=m_internal->m_name;
  librados::AioCompletion *completion = librados::Rados::aio_create_completion(m_internal.release(), nullptr, Internal::deleter);
  RadosTimeoutLogger rtl;
  m_context.aio_unwatch(m_watchHandle, completion);
  completion->release();
  rtl.logIfNeeded("In BackendRados::LockWatcher::~LockWatcher(): m_context.aio_unwatch() call", name);
  TIMESTAMPEDPRINT("Post-aio_unwatch2");
301
302
}

303
void BackendRados::LockWatcher::Internal::deleter(librados::completion_t cb, void* i) {
  TIMESTAMPEDPRINT("Unwatch2 completion");
  // Take back ownership of the Internal handed over by ~LockWatcher(); it is destroyed on return.
  std::unique_ptr<Internal> owned(static_cast<Internal *>(i));
  owned->m_radosTimeoutLogger.logIfNeeded("BackendRados::LockWatcher::Internal::deleter(): aio_unwatch callback", owned->m_name);
}

std::string BackendRados::createUniqueClientId() {
  // Build a unique client name: host:thread
  char buff[200];
  cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof (buff)),
      "In BackendRados::createUniqueClientId(): failed to gethostname");
  // POSIX leaves it unspecified whether a truncated host name is null-terminated: force it.
  buff[sizeof(buff) - 1] = '\0';
  pid_t tid = syscall(SYS_gettid);
  std::stringstream client;
  client << buff << ":" << tid;
  return client.str();
}

void BackendRados::lock(std::string name, uint64_t timeout_us, LockType lockType, const std::string& clientId) {
  // Forward to the locking flavour selected at compile time by RADOS_LOCKING_STRATEGY.
#if RADOS_LOCKING_STRATEGY == NOTIFY
  return lockNotify(name, timeout_us, lockType, clientId);
#elif RADOS_LOCKING_STRATEGY == BACKOFF
  return lockBackoff(name, timeout_us, lockType, clientId);
#else
#error Wrong value for "RADOS_LOCKING_STRATEGY"
#endif
}

void BackendRados::lockNotify(std::string name, uint64_t timeout_us, LockType lockType, 
331
  const std::string & clientId) {
332
  // In Rados, locking a non-existing object will create it. This is not our intended
333
334
  // behavior. We will lock anyway, test the object and re-delete it if it has a size of 0 
  // (while we own the lock).
335
336
  struct timeval tv;
  tv.tv_usec = 0;
337
  tv.tv_sec = 240;
338
  int rc;
339
  utils::Timer timeoutTimer;
Eric Cano's avatar
Eric Cano committed
340
  while (true) {
341
    TIMESTAMPEDPRINT(lockType==LockType::Shared?"Pre-lock (shared)":"Pre-lock (exclusive)");
342
343
    RadosTimeoutLogger rtl;
    if (lockType==LockType::Shared) {
344
      rc = m_radosCtx.lock_shared(name, "lock", clientId, "", "", &tv, 0);
345
346
      rtl.logIfNeeded("In BackendRados::lockNotify(): m_radosCtx.lock_shared()", name);
    } else {
347
      rc = m_radosCtx.lock_exclusive(name, "lock", clientId, "", &tv, 0);
348
349
      rtl.logIfNeeded("In BackendRados::lockNotify(): m_radosCtx.lock_exclusive()", name);
    }
350
    if (!rc) {
351
      TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-lock (shared) (got it)":"Post-lock (exclusive) (got it)");
352
353
354
355
356
357
358
      NOTIFYLOCKED();
    } else {
      TIMESTAMPEDPRINT("Post-lock");
    }
    if (-EBUSY != rc) break;
    // The lock is taken. Start a watch on it immediately. Inspired from the algorithm listed her:
    // https://zookeeper.apache.org/doc/r3.1.2/recipes.html#sc_recipes_Locks
359
    TIMESTAMPEDPRINT(lockType==LockType::Shared?"Pre-watch-setup (shared)":"Pre-watch-setup (exclusive)");
360
    LockWatcher watcher(m_radosCtx, name);
361
    TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-watch-setup/Pre-relock (shared)":"Post-watch-setup/Pre-relock (exclusive)");
362
    // We need to retry the lock after establishing the watch: it could have been released during that time.
363
364
    rtl.reset();
    if (lockType==LockType::Shared) {
365
      rc = m_radosCtx.lock_shared(name, "lock", clientId, "", "", &tv, 0);
366
367
      rtl.logIfNeeded("In BackendRados::lockNotify(): m_radosCtx.lock_shared()", name);
    } else {
368
      rc = m_radosCtx.lock_exclusive(name, "lock", clientId, "", &tv, 0);
369
370
      rtl.logIfNeeded("In BackendRados::lockNotify(): m_radosCtx.lock_exclusive()", name);
    }
371
    if (!rc) {
372
      TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-relock (shared) (got it)":"Post-relock (exclusive) (got it)");
373
374
      NOTIFYLOCKED();
    } else {
375
      TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-relock (shared)":"Post-relock (exclusive)");
376
    }
Eric Cano's avatar
Eric Cano committed
377
    if (-EBUSY != rc) break;
378
    LockWatcher::durationUs watchTimeout = LockWatcher::durationUs(15L * 1000 * 1000); // We will poll at least every 15 second.
379
380
381
382
383
384
385
386
    // If we are dealing with a user-defined timeout, take it into account.
    if (timeout_us) {
      watchTimeout = std::min(watchTimeout, 
          LockWatcher::durationUs(timeout_us) - LockWatcher::durationUs(timeoutTimer.usecs()));
      // Make sure the value makes sense if we just crossed the deadline.
      watchTimeout = std::max(watchTimeout, LockWatcher::durationUs(1));
    }
    watcher.wait(watchTimeout);
387
    TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-wait (shared)":"Post-wait (exclusive)");
388
    if (timeout_us && (timeoutTimer.usecs() > (int64_t)timeout_us)) {
389
      throw exception::Exception("In BackendRados::lockNotify(): timeout.");
390
    }
Eric Cano's avatar
Eric Cano committed
391
  }
392
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
393
394
      std::string("In ObjectStoreRados::lock(), failed to librados::IoCtx::") + 
      (lockType==LockType::Shared?"lock_shared: ":"lock_exclusive: ") +
395
      name + "/" + "lock" + "/" + clientId + "//");
396
397
398
399
400
401
  // We could have created an empty object by trying to lock it. We can find this out: if the object is
  // empty, we should delete it and throw an exception.
  // Get the size:
  uint64_t size;
  time_t date;
  cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.stat(name, &size, &date),
402
      std::string("In ObjectStoreRados::lock, failed to librados::IoCtx::stat: ") +
403
      name + "/" + "lock" + "/" + clientId + "//");
404
405
406
  if (!size) {
    // The object has a zero size: we probably created it by attempting the locking.
    cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.remove(name),
407
        std::string("In ObjectStoreRados::lock, failed to librados::IoCtx::remove: ") +
408
        name + "//");
409
    throw Backend::NoSuchObject(std::string("In BackendRados::lockWatch(): "
410
        "trying to lock a non-existing object: ") + name);
411
  }
412
413
}

414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
void BackendRados::lockBackoff(std::string name, uint64_t timeout_us, LockType lockType, const std::string& clientId) {
  // In Rados, locking a non-existing object will create it. This is not our intended
  // behavior. We will lock anyway, test the object and re-delete it if it has a size of 0 
  // (while we own the lock).
  struct timeval tv;
  tv.tv_usec = 0;
  tv.tv_sec = 240;
  int rc;
  // Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
  // by the number of tries (and capped by a maximum). Then the value will be randomized 
  // (betweend and 50-150%)
  size_t backoff=1;
  utils::Timer t, timeoutTimer;
  while (true) {
    TIMESTAMPEDPRINT(lockType==LockType::Shared?"Pre-lock (shared)":"Pre-lock (exclusive)");
    RadosTimeoutLogger rtl;
    if (lockType==LockType::Shared) {
      rc = m_radosCtx.lock_shared(name, "lock", clientId, "", "", &tv, 0);
      rtl.logIfNeeded("In BackendRados::lockBackoff(): m_radosCtx.lock_shared()", name);
    } else {
      rc = m_radosCtx.lock_exclusive(name, "lock", clientId, "", &tv, 0);
      rtl.logIfNeeded("In BackendRados::lockBackoff(): m_radosCtx.lock_exclusive()", name);
    }
    if (!rc) {
      TIMESTAMPEDPRINT(lockType==LockType::Shared?"Post-lock (shared) (got it)":"Post-lock (exclusive) (got it)");
      NOTIFYLOCKED();
    } else {
      TIMESTAMPEDPRINT("Post-lock");
    }
    if (-EBUSY != rc) break;
    if (timeout_us && (timeoutTimer.usecs() > (int64_t)timeout_us)) {
      throw exception::Exception("In BackendRados::lockBackoff(): timeout.");
    }
    timespec ts;
    auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
    wait = std::min(wait, c_maxWait);
    if (backoff>c_maxBackoff) backoff=1;
    // We need to get a random number [50, 150]
    std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
    std::uniform_int_distribution<size_t> distribution(50, 150);
    decltype(wait) randFactor=distribution(dre);
    wait=(wait * randFactor)/100;
    ts.tv_sec = wait/(1000*1000);
    ts.tv_nsec = (wait % (1000*1000)) * 1000;
    nanosleep(&ts, nullptr);
  }
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
      std::string("In ObjectStoreRados::lock(), failed to librados::IoCtx::") + 
      (lockType==LockType::Shared?"lock_shared: ":"lock_exclusive: ") +
      name + "/" + "lock" + "/" + clientId + "//");
  // We could have created an empty object by trying to lock it. We can find this out: if the object is
  // empty, we should delete it and throw an exception.
  // Get the size:
  uint64_t size;
  time_t date;
  cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.stat(name, &size, &date),
      std::string("In ObjectStoreRados::lockBackoff, failed to librados::IoCtx::stat: ") +
      name + "/" + "lock" + "/" + clientId + "//");
  if (!size) {
    // The object has a zero size: we probably created it by attempting the locking.
    cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.remove(name),
        std::string("In ObjectStoreRados::lockBackoff, failed to librados::IoCtx::remove: ") +
        name + "//");
477
478
    throw Backend::NoSuchObject(std::string("In BackendRados::lockBackoff(): "
        "trying to lock a non-existing object: ") + name);
479
480
481
482
  }
}


483
484
485
486
487
BackendRados::ScopedLock* BackendRados::lockExclusive(std::string name, uint64_t timeout_us) {
  // Take the lock first, then hand its release over to a ScopedLock owned by the caller.
  const std::string clientId = createUniqueClientId();
  lock(name, timeout_us, LockType::Exclusive, clientId);
  std::unique_ptr<ScopedLock> scoped(new ScopedLock(m_radosCtx));
  scoped->set(name, clientId, LockType::Exclusive);
  return scoped.release();
}

BackendRados::ScopedLock* BackendRados::lockShared(std::string name, uint64_t timeout_us) {
  // Take the shared lock first, then hand its release over to a ScopedLock owned by the caller.
  const std::string clientId = createUniqueClientId();
  lock(name, timeout_us, LockType::Shared, clientId);
  std::unique_ptr<ScopedLock> scoped(new ScopedLock(m_radosCtx));
  scoped->set(name, clientId, LockType::Shared);
  return scoped.release();
}

Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) {
  // The updater fires its lock from its constructor; the caller receives the new object.
  AsyncUpdater * updater = new AsyncUpdater(*this, name, update);
  return updater;
}

BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update):
  m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) {
  // At construction time, we just fire a lock.
  try {
    // Rados does not have aio_lock, so we do it in an async.
    // Operation is lock (synchronous), and then launch an async read; fetchCallback() continues the chain.
    // The async function never fails, exceptions go to the promise (as everywhere).
    m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
        [this](){
          try {
            m_lockClient = BackendRados::createUniqueClientId();
            // NOTE(review): lock() takes a timeout in microseconds, so this is 60 ms — confirm intended.
            m_backend.lock(m_name, 60*1000, BackendRados::LockType::Exclusive, m_lockClient);
            // Locking is done, we can launch the read operation (async).
            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, fetchCallback, nullptr);
            RadosTimeoutLogger rtl;
            m_radosTimeoutLogger.reset();
            auto rc=m_backend.m_radosCtx.aio_read(m_name, aioc, &m_radosBufferList, std::numeric_limits<int32_t>::max(), 0);
            rtl.logIfNeeded("BackendRados::AsyncUpdater::AsyncUpdater::lock_lambda(): m_radosCtx.aio_read()", m_name);
            aioc->release();
            if (rc) {
              // The read was never launched, so fetchCallback() will not run and nothing else would
              // release the lock we hold: release it now (best effort, result ignored) rather than
              // leaving it in place until its lease expires.
              m_backend.m_radosCtx.unlock(m_name, "lock", m_lockClient);
              cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::AsyncUpdater::lock_lambda(): failed to launch aio_read(): ")+m_name);
              throw Backend::NoSuchObject(errnum.getMessageValue());
            }
          } catch (...) {
            ANNOTATE_HAPPENS_BEFORE(&m_job);
            m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *static_cast<AsyncUpdater *>(pThis);
  try {
    // Either the deletion failed, or it succeeded; in both cases the update ends with an exception.
    const int rc = rados_aio_get_return_value(completion);
    if (rc) {
      cta::exception::Errnum errnum(-rc,
          std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): could not delete object: ") + au.m_name);
      throw Backend::CouldNotDelete(errnum.getMessageValue());
    }
    throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): no such object: ") + au.m_name);
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  au.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback(): aio_read callback", au.m_name);
  // Every failure handled below happens while the exclusive lock taken by the constructor is still
  // held and before commitCallback() could launch the unlock. Release it with a fire-and-forget
  // aio_unlock so that it does not linger until its lease expires. This is best effort: the outcome
  // is ignored and the original error is what gets reported through the promise.
  auto releaseLockBestEffort = [](AsyncUpdater & updater) {
    librados::AioCompletion * unlockCompletion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
    updater.m_backend.m_radosCtx.aio_unlock(updater.m_name, "lock", updater.m_lockClient, unlockCompletion);
    unlockCompletion->release();
  };
  try {
    // Check that the object could be read.
    if (rados_aio_get_return_value(completion)<0) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
          std::string("In BackendRados::AsyncUpdater::fetchCallback(): could not read object: ") + au.m_name);
      throw Backend::CouldNotFetch(errnum.getMessageValue());
    }
    // We can now launch the update operation (in a separate thread, not in this librados callback).
    au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async,
        [pThis, releaseLockBestEffort](){
          AsyncUpdater & au = *((AsyncUpdater *) pThis);
          try {
            // The data is in the buffer list.
            std::string value;
            try {
              au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value);
            } catch (std::exception & ex) {
              throw CouldNotUpdateValue(
                  std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
                  au.m_name + ": "+ ex.what());
            }
            
            bool updateWithDelete = false;
            try {
              // Execute the user's callback. Exceptions other than AsyncUpdateWithDelete fly
              // through: the user knows his own exceptions.
              value=au.m_update(value);
            } catch (AsyncUpdateWithDelete &) {
              updateWithDelete = true;
            }
            
            if(updateWithDelete) {
              try {
                au.m_backend.remove(au.m_name);
                if (au.m_backend.exists(au.m_name)) {
                  throw exception::Exception("Object exists after remove");
                }
              } catch (cta::exception::Exception &ex) {
                throw CouldNotUpdateValue(
                    std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
                    au.m_name + ": " + ex.what());
              }
              // Done! The lock went away with the removed object.
              ANNOTATE_HAPPENS_BEFORE(&au.m_job);
              au.m_job.set_value();
            } else {
              try {
                // Prepare result in buffer list.
                au.m_radosBufferList.clear();
                au.m_radosBufferList.append(value);
              } catch (std::exception & ex) {
                throw CouldNotUpdateValue(
                    std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + 
                    au.m_name + ": " + ex.what());
              }
              // Launch the write; commitCallback() continues the chain.
              librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
              RadosTimeoutLogger rtl;
              au.m_radosTimeoutLogger.reset();
              auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
              rtl.logIfNeeded("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): m_radosCtx.aio_write_full() call", au.m_name);
              aioc->release();
              if (rc) {
                cta::exception::Errnum errnum (-rc, 
                  "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full(): " + au.m_name);
                throw Backend::CouldNotCommit(errnum.getMessageValue());
              }
            }
          } catch (...) {
            // The write was not launched, so the lock is still ours (or already gone with a
            // removed object, in which case the unlock is harmless).
            releaseLockBestEffort(au);
            ANNOTATE_HAPPENS_BEFORE(&au.m_job);
            au.m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
    // The read failed or the update thread could not be started: the lock is still ours.
    releaseLockBestEffort(au);
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::commitCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  au.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncUpdater::commitCallback(): aio_write_full() callback", au.m_name);
  try {
    // Check that the object could be written.
    if (rados_aio_get_return_value(completion)) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
          std::string("In BackendRados::AsyncUpdater::commitCallback(): could not write object: ")+au.m_name);
      // The write failed but we still hold the lock: release it with a fire-and-forget aio_unlock
      // (best effort, outcome ignored) so that it does not linger until its lease expires.
      librados::AioCompletion * unlockCompletion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
      au.m_backend.m_radosCtx.aio_unlock(au.m_name, "lock", au.m_lockClient, unlockCompletion);
      unlockCompletion->release();
      throw Backend::CouldNotCommit(errnum.getMessageValue());
    }
    // Launch the async unlock; unlockCallback() ends the chain.
    librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, unlockCallback, nullptr);
    au.m_radosTimeoutLogger.reset();
    RadosTimeoutLogger rtl;
    auto rc=au.m_backend.m_radosCtx.aio_unlock(au.m_name, "lock", au.m_lockClient, aioc);
    rtl.logIfNeeded("In BackendRados::AsyncUpdater::commitCallback(): m_radosCtx.aio_unlock", au.m_name);
    aioc->release();
    if (rc) {
      cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::commitCallback(): failed to launch aio_unlock()")+au.m_name);
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
    au.m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback for the aio_unlock() launched by commitCallback().
 * This is the last step of the asynchronous update: the job promise is
 * fulfilled on success, or receives the failure otherwise.
 * @param completion librados completion of the unlock operation.
 * @param pThis      the AsyncUpdater that launched the unlock.
 */
void BackendRados::AsyncUpdater::unlockCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & updater = *static_cast<AsyncUpdater *>(pThis);
  updater.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncUpdater::unlockCallback(): aio_unlock() callback", updater.m_name);
  try {
    const int unlockRc = rados_aio_get_return_value(completion);
    if (unlockRc != 0) {
      // The unlock itself failed: report it through the promise.
      cta::exception::Errnum errnum(-unlockRc,
          std::string("In BackendRados::AsyncUpdater::unlockCallback(): could not unlock object: ")+updater.m_name);
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
    // Write and unlock both succeeded: the update sequence is complete.
    ANNOTATE_HAPPENS_BEFORE(&updater.m_job);
    updater.m_job.set_value();
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&updater.m_job);
    updater.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::wait() {
688
689
690
  m_jobFuture.get();
  ANNOTATE_HAPPENS_AFTER(&m_job);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
691
692
}

693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709

/**
 * Start an asynchronous deletion of object `name`.
 * @return a newly allocated deleter; the caller owns it and calls wait() on it.
 */
Backend::AsyncDeleter* BackendRados::asyncDelete(const std::string & name) {
  AsyncDeleter * deleter = new AsyncDeleter(*this, name);
  return deleter;
}

/**
 * Start an asynchronous, lock-protected deletion of object `name`.
 * librados has no aio_lock, so the exclusive lock is taken synchronously
 * inside a std::async task. Once the lock is held, aio_remove() is launched
 * and deleteCallback() completes the job. Every failure is stored in m_job
 * and rethrown by wait().
 * @param be   backend providing the rados context and the lock primitive.
 * @param name name of the object to delete.
 */
BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& name):
  m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
  try {
    // Operation is lock (synchronous, in the async task), then launch an async remove.
    // The async function never fails: exceptions go to the promise (as everywhere).
    m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
        [this](){
          try {
            m_lockClient = BackendRados::createUniqueClientId();
            // NOTE(review): 60*1000 is the lock timeout; confirm its unit against BackendRados::lock().
            m_backend.lock(m_name, 60*1000, BackendRados::LockType::Exclusive, m_lockClient);
            // Locking is done, we can launch the remove operation (async).
            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, deleteCallback, nullptr);
            m_radosTimeoutLogger.reset();
            RadosTimeoutLogger rtl;
            auto rc=m_backend.m_radosCtx.aio_remove(m_name, aioc);
            rtl.logIfNeeded("In BackendRados::AsyncDeleter::AsyncDeleter(): m_radosCtx.aio_remove() call", m_name);
            aioc->release();
            if (rc) {
              // NOTE(review): no unlock is attempted here, so the object stays locked if the launch fails.
              cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter(): failed to launch aio_remove(): ")+m_name);
              throw Backend::CouldNotDelete(errnum.getMessageValue());
            }
          } catch (...) {
            ANNOTATE_HAPPENS_BEFORE(&m_job);
            m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
    // Failure to even start the async task (e.g. std::async throwing).
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback for the aio_remove() launched by the AsyncDeleter
 * constructor. It fulfils the job promise when the removal succeeded and
 * stores the failure otherwise; wait() surfaces the outcome.
 * @param completion librados completion of the remove operation.
 * @param pThis      the AsyncDeleter that launched the removal.
 */
void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completion, void* pThis) {
  AsyncDeleter & deleter = *static_cast<AsyncDeleter *>(pThis);
  deleter.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncDeleter::deleteCallback(): aio_remove() callback", deleter.m_name);
  try {
    const int removeRc = rados_aio_get_return_value(completion);
    if (removeRc != 0) {
      cta::exception::Errnum errnum(-removeRc,
          std::string("In BackendRados::AsyncDeleter::deleteCallback(): could not delete object: ") + deleter.m_name);
      throw Backend::CouldNotDelete(errnum.getMessageValue());
    }
    // Removal succeeded: the deletion job is complete.
    ANNOTATE_HAPPENS_BEFORE(&deleter.m_job);
    deleter.m_job.set_value();
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&deleter.m_job);
    deleter.m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback for a remove issued against an object that is treated
 * as not existing. The job never succeeds:
 * - if the remove failed, CouldNotDelete is stored;
 * - otherwise NoSuchObject is stored.
 * @param completion librados completion of the remove operation.
 * @param pThis      the AsyncDeleter that launched the removal.
 */
void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
  AsyncDeleter & deleter = *static_cast<AsyncDeleter *>(pThis);
  try {
    const int removeRc = rados_aio_get_return_value(completion);
    if (removeRc != 0) {
      cta::exception::Errnum errnum(-removeRc,
          std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + deleter.m_name);
      throw Backend::CouldNotDelete(errnum.getMessageValue());
    }
    // The (empty) object was removed: report it to the caller as non-existent.
    throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + deleter.m_name);
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&deleter.m_job);
    deleter.m_job.set_exception(std::current_exception());
  }
}

/**
 * Block until the asynchronous deletion finishes, rethrowing any exception the
 * lock task or the remove callbacks stored in m_job.
 * The helgrind ANNOTATE_HAPPENS_AFTER / FORGET_ALL pair is issued on both the
 * success and the failure path. Every producer emits ANNOTATE_HAPPENS_BEFORE
 * before set_exception() as well as before set_value(), so skipping the
 * annotation when get() throws would leave an unmatched happens-before.
 */
void BackendRados::AsyncDeleter::wait() {
  try {
    m_jobFuture.get();
  } catch (...) {
    ANNOTATE_HAPPENS_AFTER(&m_job);
    ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
    throw;
  }
  ANNOTATE_HAPPENS_AFTER(&m_job);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}

776
777
778
779
780
781
/**
 * Human-readable summary of the connection parameters.
 * @return a string of the form "userId=<user> pool=<pool>".
 */
std::string BackendRados::Parameters::toStr() {
  std::ostringstream summary;
  summary << "userId=" << m_userId
          << " pool=" << m_pool;
  return summary.str();
}

782
783
784
std::string BackendRados::Parameters::toURL() {
  std::stringstream ret;
  ret << "rados://" << m_userId << "@" << m_pool;
785
786
  if (m_namespace.size())
    ret << ":" << m_namespace;
787
788
789
790
  return ret.str();
}


791
/**
 * Snapshot this backend's connection parameters (user, pool, namespace).
 * @return a newly allocated Parameters object owned by the caller.
 */
BackendRados::Parameters* BackendRados::getParams() {
  // Hold the object in a unique_ptr while filling it, so it is freed if an assignment throws.
  std::unique_ptr<Parameters> params(new Parameters);
  params->m_userId = m_user;
  params->m_pool = m_pool;
  params->m_namespace = m_namespace;
  return params.release();
}

Eric Cano's avatar
Eric Cano committed
799
800
801
802
// Out-of-class definitions of BackendRados' static tuning constants.
// NOTE(review): by their names these drive a retry/backoff loop; confirm the
// unit of c_maxWait (e.g. microseconds) against its uses.
const size_t BackendRados::c_maxBackoff{32};
const size_t BackendRados::c_backoffFraction{4};
const uint64_t BackendRados::c_maxWait{1000000};

803
}}