BackendRados.cpp 21.1 KB
Newer Older
1
/*
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "BackendRados.hpp"
20
#include "common/exception/Errnum.hpp"
Eric Cano's avatar
Eric Cano committed
21
#include "common/Timer.hpp"
22
23
24
#include <rados/librados.hpp>
#include <sys/syscall.h>
#include <errno.h>
25
#include <unistd.h>
26
#include <valgrind/helgrind.h>
Eric Cano's avatar
Eric Cano committed
27
#include <random>
28
29
30

namespace cta { namespace objectstore {

31
BackendRados::BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace) :
32
m_user(userId), m_pool(pool), m_namespace(radosNameSpace), m_cluster(), m_radosCtx() {
33
  cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.init(userId.c_str()),
34
35
      "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init");
  try {
36
    cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.conf_read_file(NULL),
37
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
38
    cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.conf_parse_env(NULL),
39
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_parse_env");
40
    cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.connect(),
41
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect");
42
    cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.ioctx_create(pool.c_str(), m_radosCtx),
43
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create");
44
45
    // An empty string also sets the namespace to default so no need to filter. This function does not fail.
    m_radosCtx.set_namespace(radosNameSpace);
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  } catch (...) {
    m_cluster.shutdown();
    throw;
  }
}

BackendRados::~BackendRados() {
  m_radosCtx.close();
  m_cluster.shutdown();
}

void BackendRados::create(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  const bool createExclusive = true;
  wop.create(createExclusive);
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
64
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
65
      std::string("In ObjectStoreRados::create, failed to create exclusively or write: ")
66
67
68
69
70
71
72
73
74
      + name);
}

void BackendRados::atomicOverwrite(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  wop.assert_exists();
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
75
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
76
      std::string("In ObjectStoreRados::atomicOverwrite, failed to assert existence or write: ")
77
78
79
80
81
82
83
      + name);
}

std::string BackendRados::read(std::string name) {
  std::string ret;
  uint64_t size;
  time_t time;
84
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.stat(name, &size, &time),
85
      std::string("In ObjectStoreRados::read,  failed to stat: ")
86
87
88
      + name);
  librados::bufferlist bl;
  cta::exception::Errnum::throwOnNegative(m_radosCtx.read(name, bl, size, 0),
89
      std::string("In ObjectStoreRados::read,  failed to read: ")
90
91
92
93
94
95
      + name);
  bl.copy(0, size, ret);
  return ret;
}

void BackendRados::remove(std::string name) {
96
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.remove(name));
97
98
99
100
101
102
103
104
105
106
107
108
}

bool BackendRados::exists(std::string name) {
  uint64_t size;
  time_t date;
  if (m_radosCtx.stat(name, &size, &date)) {
    return false;
  } else {
    return true;
  }
}

109
110
std::list<std::string> BackendRados::list() {
  std::list<std::string> ret;
111
112
  for (auto o=m_radosCtx.objects_begin(); o!=m_radosCtx.objects_end(); o++) {
    ret.push_back(o->first);
113
114
115
116
117
  }
  return ret;
}


118
119
void BackendRados::ScopedLock::release() {
  if (!m_lockSet) return;
120
121
122
123
124
125
126
127
128
  // We should be tolerant with unlocking a deleted object, which is part
  // of the lock-remove-(implicit unlock) cycle when we delete an object
  // we hence overlook the ENOENT errors.
  int rc=m_context.unlock(m_oid, "lock", m_clientId);
  switch (-rc) {
    case ENOENT:
      break;
    default:
      cta::exception::Errnum::throwOnReturnedErrno(-rc,
129
        std::string("In cta::objectstore::ScopedLock::release, failed unlock: ") +
130
131
132
        m_oid);
      break;
  }
133
134
135
136
137
138
  m_lockSet = false;
}

void BackendRados::ScopedLock::set(const std::string& oid, const std::string clientId) {
  m_oid = oid;
  m_clientId = clientId;\
139
  m_lockSet = true;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
}

BackendRados::ScopedLock::~ScopedLock() {
  release();
}

std::string BackendRados::createUniqueClientId() {
  // Build a unique client name: host:thread
  char buff[200];
  cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof (buff)),
      "In ObjectStoreRados::lockExclusive:  failed to gethostname");
  pid_t tid = syscall(SYS_gettid);
  std::stringstream client;
  client << buff << ":" << tid;
  return client.str();
}

BackendRados::ScopedLock* BackendRados::lockExclusive(std::string name) {
158
  // In Rados, locking a non-existing object will create it. This is not our intended
159
160
  // behavior. We will lock anyway, test the object and re-delete it if it has a size of 0 
  // (while we own the lock).
161
162
163
  std::string client = createUniqueClientId();
  struct timeval tv;
  tv.tv_usec = 0;
164
  tv.tv_sec = 240;
165
  int rc;
166
  std::unique_ptr<ScopedLock> ret(new ScopedLock(m_radosCtx));
Eric Cano's avatar
Eric Cano committed
167
168
169
170
171
172
  // Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
  // by the number of tries (and capped by a maximum). Then the value will be randomized 
  // (betweend and 50-150%)
  size_t backoff=1;
  utils::Timer t;
  while (true) {
173
    rc = m_radosCtx.lock_exclusive(name, "lock", client, "", &tv, 0);
Eric Cano's avatar
Eric Cano committed
174
175
176
177
178
179
180
181
182
183
184
185
186
187
    if (-EBUSY != rc) break;
    timespec ts;
    auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
    wait = std::min(wait, c_maxWait);
    if (backoff>c_maxBackoff) backoff=1;
    // We need to get a random number [50, 150]
    std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
    std::uniform_int_distribution<size_t> distribution(50, 150);
    decltype(wait) randFactor=distribution(dre);
    wait=(wait * randFactor)/100;
    ts.tv_sec = wait/(1000*1000);
    ts.tv_nsec = (wait % (1000*1000)) * 1000;
    nanosleep(&ts, nullptr);
  }
188
189
190
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
      std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: ") +
      name + "/" + "lock" + "/" + client + "//");
191
192
193
194
195
196
197
198
199
200
201
202
203
  // We could have created an empty object by trying to lock it. We can find this out: if the object is
  // empty, we should delete it and throw an exception.
  // Get the size:
  uint64_t size;
  time_t date;
  cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.stat(name, &size, &date),
      std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::stat: ") +
      name + "/" + "lock" + "/" + client + "//");
  if (!size) {
    // The object has a zero size: we probably created it by attempting the locking.
    cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.remove(name),
        std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::remove: ") +
        name + "//");
204
    throw cta::exception::Errnum(ENOENT, std::string("In BackendRados::lockExclusive(): trying to lock a non-existing object: ") + name);
205
  }
206
207
208
209
210
  ret->set(name, client);
  return ret.release();
}

BackendRados::ScopedLock* BackendRados::lockShared(std::string name) {
211
  // In Rados, locking a non-existing object will create it. This is not our intended
212
213
  // behavior. We will lock anyway, test the object and re-delete it if it has a size of 0 
  // (while we own the lock).
214
215
216
  std::string client = createUniqueClientId();
  struct timeval tv;
  tv.tv_usec = 0;
217
  tv.tv_sec = 240;
218
  int rc;
219
  std::unique_ptr<ScopedLock> ret(new ScopedLock(m_radosCtx));
Eric Cano's avatar
Eric Cano committed
220
221
222
223
224
225
  // Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
  // by the number of tries (and capped by a maximum). Then the value will be randomized 
  // (betweend and 50-150%)
  size_t backoff=1;
  utils::Timer t;
  while (true) {
226
    rc = m_radosCtx.lock_shared(name, "lock", client, "", "", &tv, 0);
Eric Cano's avatar
Eric Cano committed
227
228
229
230
231
232
233
234
235
236
237
238
239
240
    if (-EBUSY != rc) break;
    timespec ts;
    auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
    wait = std::min(wait, c_maxWait);
    if (backoff>c_maxBackoff) backoff=1;
    // We need to get a random number [50, 150]
    std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
    std::uniform_int_distribution<size_t> distribution(50, 150);
    decltype(wait) randFactor=distribution(dre);
    wait=(wait * randFactor)/100;
    ts.tv_sec = wait/(1000*1000);
    ts.tv_nsec = (wait % (1000*1000)) * 1000;
    nanosleep(&ts, nullptr);
  }
241
242
243
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
      std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::lock_shared: ") +
      name + "/" + "lock" + "/" + client + "//");
244
245
246
247
248
249
250
251
252
253
254
255
256
  // We could have created an empty object by trying to lock it. We can find this out: if the object is
  // empty, we should delete it and throw an exception.
  // Get the size:
  uint64_t size;
  time_t date;
  cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.stat(name, &size, &date),
      std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::stat: ") +
      name + "/" + "lock" + "/" + client + "//");
  if (!size) {
    // The object has a zero size: we probably created it by attempting the locking.
    cta::exception::Errnum::throwOnReturnedErrno (-m_radosCtx.remove(name),
        std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::remove: ") +
        name + "//");
Eric Cano's avatar
Eric Cano committed
257
    throw cta::exception::Errnum(ENOENT, std::string ("In BackendRados::lockShared(): trying to lock a non-existing object: ") + name);
258
  }
259
260
261
262
  ret->set(name, client);
  return ret.release();
}

263
264
265
266
267
Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update)
{
  return new AsyncUpdater(*this, name, update);
}

268
269
BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update):
  m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) {
Eric Cano's avatar
Eric Cano committed
270
  // At construction time, we just fire a lock.
271
  try {
Eric Cano's avatar
Eric Cano committed
272
273
    // Rados does not have aio_lock, so we do it in an async.
    // Operation is lock (synchronous), and then launch an async stat, then read.
274
    // The async function never fails, exceptions go to the promise (as everywhere).
Eric Cano's avatar
Eric Cano committed
275
276
    m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
        [this](){
277
          try {
Eric Cano's avatar
Eric Cano committed
278
            m_lockClient = BackendRados::createUniqueClientId();
279
280
            struct timeval tv;
            tv.tv_usec = 0;
281
            tv.tv_sec = 240;
282
283
284
            int rc;
            // TODO: could be improved (but need aio_lock in rados, not available at the time
            // of writing).
Eric Cano's avatar
Eric Cano committed
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
            // Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
            // by the number of tries (and capped by a maximum). Then the value will be randomized 
            // (betweend and 50-150%)
            size_t backoff=1;
            utils::Timer t;
            while (true) {
              rc = m_backend.m_radosCtx.lock_exclusive(m_name, "lock", m_lockClient, "", &tv, 0);
              if (-EBUSY != rc) break;
              timespec ts;
              auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
              wait = std::min(wait, c_maxWait);
              if (backoff>c_maxBackoff) backoff=1;
              // We need to get a random number [50, 150]
              std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
              std::uniform_int_distribution<size_t> distribution(50, 150);
              decltype(wait) randFactor=distribution(dre);
              wait=(wait * randFactor)/100;
              ts.tv_sec = wait/(1000*1000);
              ts.tv_nsec = (wait % (1000*1000)) * 1000;
              nanosleep(&ts, nullptr);
            }
306
307
308
            if (rc) {
              cta::exception::Errnum errnum(-rc,
                std::string("In BackendRados::AsyncUpdater::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") +
Eric Cano's avatar
Eric Cano committed
309
                m_name + "/" + "lock" + "/" + m_lockClient + "//");
310
311
              throw CouldNotLock(errnum.getMessageValue());
            }
Eric Cano's avatar
Eric Cano committed
312
313
314
            // Locking is done, we can launch the stat operation (async).
            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr);
            rc=m_backend.m_radosCtx.aio_stat(m_name, aioc, &m_size, &date);
315
316
            aioc->release();
            if (rc) {
Eric Cano's avatar
Eric Cano committed
317
318
              cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::AsyncUpdater::lock_lambda(): failed to launch aio_stat(): ")+m_name);
              throw Backend::NoSuchObject(errnum.getMessageValue());
319
320
            }
          } catch (...) {
Eric Cano's avatar
Eric Cano committed
321
322
            ANNOTATE_HAPPENS_BEFORE(&m_job);
            m_job.set_exception(std::current_exception());
323
324
325
          }
        }
        )));
Eric Cano's avatar
Eric Cano committed
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  try {
    // Get the object size (it's already locked).
    if (rados_aio_get_return_value(completion)) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
          std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name);
      throw Backend::NoSuchObject(errnum.getMessageValue());
    }
    // Check the size. If zero, we locked an empty object: delete and throw an exception.
    if (!au.m_size) {
      // TODO. This is going to lock the callback thread of the rados context for a while.
      // As this is not supposde to happen often, this is acceptable.
      au.m_backend.remove(au.m_name);
      throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::statCallback(): no such object: ") + au.m_name);
    }
    // Stat is done, we can launch the read operation (async).
    librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, fetchCallback, nullptr);
    auto rc=au.m_backend.m_radosCtx.aio_read(au.m_name, aioc, &au.m_radosBufferList, au.m_size, 0);
    aioc->release();
    if (rc) {
      cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_read(): ")+au.m_name);
      throw Backend::NoSuchObject(errnum.getMessageValue());
    }
356
  } catch (...) {
357
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
358
359
360
361
362
363
364
365
366
367
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  try {
    // Check that the object could be read.
    if (rados_aio_get_return_value(completion)<0) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
Eric Cano's avatar
Eric Cano committed
368
          std::string("In BackendRados::AsyncUpdater::fetchCallback(): could not read object: ") + au.m_name);
369
370
371
372
373
374
375
376
377
378
379
380
381
      throw Backend::CouldNotFetch(errnum.getMessageValue());
    }
    // We can now launch the update operation
    au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async,
        [pThis](){
          AsyncUpdater & au = *((AsyncUpdater *) pThis);
          try {
            // The data is in the buffer list.
            std::string value;
            try {
              au.m_radosBufferList.copy(0, au.m_size, value);
            } catch (std::exception & ex) {
              throw CouldNotUpdateValue(
382
383
                  std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
                  au.m_name + ": "+ ex.what());
384
            }
385
386
387
388
389
390
391
392
393
394
            
            bool updateWithDelete = false;
            try {      
              // Execute the user's callback.
              value=au.m_update(value);
            } catch (AsyncUpdateWithDelete & ex) {
              updateWithDelete = true;               
            } catch (...) {
              // Let exceptions fly through. User knows his own exceptions.
              throw; 
395
            }
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
             
            if(updateWithDelete) {
              try {
                au.m_backend.remove(au.m_name);
              } catch (cta::exception::Exception &ex) {
                throw CouldNotUpdateValue(
                    std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
                    au.m_name + ex.what());
              }
              // Done!
              ANNOTATE_HAPPENS_BEFORE(&au.m_job);
              au.m_job.set_value();
            } else {
              try {
                // Prepare result in buffer list.
                au.m_radosBufferList.clear();
                au.m_radosBufferList.append(value);
              } catch (std::exception & ex) {
                throw CouldNotUpdateValue(
                    std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + 
                    au.m_name + ex.what());
              }
              // Launch the write
              librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
              auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
              aioc->release();
              if (rc) {
                cta::exception::Errnum errnum (-rc, 
                  "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name);
                throw Backend::CouldNotCommit(errnum.getMessageValue());
              }
427
428
            }
          } catch (...) {
429
            ANNOTATE_HAPPENS_BEFORE(&au.m_job);
430
431
432
433
434
            au.m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
435
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
436
437
438
439
440
441
442
443
444
445
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::commitCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  try {
    // Check that the object could be written.
    if (rados_aio_get_return_value(completion)) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
446
          std::string("In BackendRados::AsyncUpdater::commitCallback(): could not write object: ")+au.m_name);
447
448
449
450
451
452
453
      throw Backend::CouldNotCommit(errnum.getMessageValue());
    }
    // Launch the async unlock.
    librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, unlockCallback, nullptr);
    auto rc=au.m_backend.m_radosCtx.aio_unlock(au.m_name, "lock", au.m_lockClient, aioc);
    aioc->release();
    if (rc) {
454
      cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::commitCallback(): failed to launch aio_unlock()")+au.m_name);
455
456
457
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
  } catch (...) {
458
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
459
460
461
462
463
464
465
466
467
468
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::unlockCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *((AsyncUpdater *) pThis);
  try {
    // Check that the object could be unlocked.
    if (rados_aio_get_return_value(completion)) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
469
          std::string("In BackendRados::AsyncUpdater::unlockCallback(): could not unlock object: ")+au.m_name);
470
471
472
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
    // Done!
473
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
474
475
    au.m_job.set_value();
  } catch (...) {
476
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
477
478
479
480
481
    au.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::wait() {
482
483
484
  m_jobFuture.get();
  ANNOTATE_HAPPENS_AFTER(&m_job);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
485
486
}

487
488
489
490
491
492
std::string BackendRados::Parameters::toStr() {
  std::stringstream ret;
  ret << "userId=" << m_userId << " pool=" << m_pool;
  return ret.str();
}

493
494
495
std::string BackendRados::Parameters::toURL() {
  std::stringstream ret;
  ret << "rados://" << m_userId << "@" << m_pool;
496
497
  if (m_namespace.size())
    ret << ":" << m_namespace;
498
499
500
501
  return ret.str();
}


502
BackendRados::Parameters* BackendRados::getParams() {
503
  std::unique_ptr<Parameters> ret(new Parameters);
504
505
  ret->m_pool = m_pool;
  ret->m_userId = m_user;
506
  ret->m_namespace = m_namespace;
507
508
509
  return ret.release();
}

Eric Cano's avatar
Eric Cano committed
510
511
512
513
const size_t BackendRados::c_maxBackoff=32;
const size_t BackendRados::c_backoffFraction=4;
const uint64_t BackendRados::c_maxWait=1000000;

514
}}