BackendRados.cpp 16.9 KB
Newer Older
1
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "BackendRados.hpp"
20
#include "common/exception/Errnum.hpp"
21
22
23
#include <rados/librados.hpp>
#include <sys/syscall.h>
#include <errno.h>
24
#include <unistd.h>
25
#include <valgrind/helgrind.h>
26
27
28

namespace cta { namespace objectstore {

29
BackendRados::BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace) :
  m_user(userId), m_pool(pool), m_namespace(radosNameSpace), m_cluster(), m_radosCtx() {
  // Each librados call below returns 0 on success and a negative errno on failure.
  // A failure becomes an Errnum exception. Its context is this common prefix
  // followed by the name of the failing step.
  const std::string failurePrefix("In ObjectStoreRados::ObjectStoreRados, failed to ");
  auto checkRados = [&failurePrefix](int radosRc, const char * step) {
    cta::exception::Errnum::throwOnReturnedErrno(-radosRc, failurePrefix + step);
  };
  checkRados(m_cluster.init(userId.c_str()), "m_cluster.init");
  // The cluster handle is initialised from here on, so shut it down if any later step fails.
  try {
    checkRados(m_cluster.conf_read_file(nullptr), "m_cluster.conf_read_file");
    checkRados(m_cluster.conf_parse_env(nullptr), "m_cluster.conf_parse_env");
    checkRados(m_cluster.connect(), "m_cluster.connect");
    checkRados(m_cluster.ioctx_create(pool.c_str(), m_radosCtx), "m_cluster.ioctx_create");
    // An empty string selects the default namespace, so no filtering is needed here.
    // set_namespace() does not fail.
    m_radosCtx.set_namespace(radosNameSpace);
  } catch (...) {
    m_cluster.shutdown();
    throw;
  }
}

/**
 * Destructor: closes the I/O context first, then shuts down the cluster handle
 * the context was created from (in the constructor, via ioctx_create()).
 */
BackendRados::~BackendRados() {
  m_radosCtx.close();
  m_cluster.shutdown();
}

void BackendRados::create(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  const bool createExclusive = true;
  wop.create(createExclusive);
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
62
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
63
64
65
66
67
68
69
70
71
72
      std::string("In ObjectStoreRados::create, failed to create exclusively or write ")
      + name);
}

void BackendRados::atomicOverwrite(std::string name, std::string content) {
  librados::ObjectWriteOperation wop;
  wop.assert_exists();
  ceph::bufferlist bl;
  bl.append(content.c_str(), content.size());
  wop.write_full(bl);
73
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.operate(name, &wop),
74
75
76
77
78
79
80
81
      std::string("In ObjectStoreRados::atomicOverwrite, failed to assert existence or write ")
      + name);
}

/**
 * Returns the content of object `name`.
 * The object is stat'ed to learn its size and then read up to that size.
 * Throws an Errnum-based exception if either the stat or the read fails.
 */
std::string BackendRados::read(std::string name) {
  uint64_t size;
  time_t time;
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.stat(name, &size, &time),
      std::string("In ObjectStoreRados::read,  failed to stat ")
      + name);
  librados::bufferlist bl;
  cta::exception::Errnum::throwOnNegative(m_radosCtx.read(name, bl, size, 0),
      std::string("In ObjectStoreRados::read,  failed to read ")
      + name);
  // This method takes no lock, so the object may have shrunk between stat() and read().
  // Copy only the bytes that were actually returned. Copying `size` bytes from a shorter
  // buffer would throw ceph::buffer::end_of_buffer.
  std::string ret;
  bl.copy(0, bl.length(), ret);
  return ret;
}

void BackendRados::remove(std::string name) {
94
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.remove(name));
95
96
97
98
99
100
101
102
103
104
105
106
}

/**
 * Tells whether object `name` exists, judged by whether stat() succeeds.
 * Any stat() failure, whatever its errno, is reported as "does not exist".
 */
bool BackendRados::exists(std::string name) {
  uint64_t ignoredSize;
  time_t ignoredMtime;
  const int statRc = m_radosCtx.stat(name, &ignoredSize, &ignoredMtime);
  return statRc == 0;
}

107
108
/**
 * Returns the names of all objects visible through this backend's I/O context
 * (i.e. in the configured pool and namespace).
 */
std::list<std::string> BackendRados::list() {
  std::list<std::string> names;
  auto it = m_radosCtx.objects_begin();
  while (it != m_radosCtx.objects_end()) {
    names.push_back(it->first);
    ++it;
  }
  return names;
}


116
117
void BackendRados::ScopedLock::release() {
  if (!m_lockSet) return;
118
119
120
121
122
123
124
125
126
127
128
129
130
  // We should be tolerant with unlocking a deleted object, which is part
  // of the lock-remove-(implicit unlock) cycle when we delete an object
  // we hence overlook the ENOENT errors.
  int rc=m_context.unlock(m_oid, "lock", m_clientId);
  switch (-rc) {
    case ENOENT:
      break;
    default:
      cta::exception::Errnum::throwOnReturnedErrno(-rc,
        std::string("In cta::objectstore::ScopedLock::release, failed unlock ") +
        m_oid);
      break;
  }
131
132
133
134
135
136
  m_lockSet = false;
}

void BackendRados::ScopedLock::set(const std::string& oid, const std::string clientId) {
  m_oid = oid;
  m_clientId = clientId;\
137
  m_lockSet = true;
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
}

/**
 * Releases the lock, if still held.
 * release() reports unlock failures by throwing. Unless the header declares this
 * destructor noexcept(false), it is implicitly noexcept, so an escaping exception
 * would call std::terminate(). During stack unwinding that would happen in any case.
 * Errors are therefore swallowed here. Callers that must know whether unlocking
 * succeeded should call release() explicitly before destruction.
 */
BackendRados::ScopedLock::~ScopedLock() {
  try {
    release();
  } catch (...) {
    // Deliberately ignored: a destructor must not throw (see above).
  }
}

/**
 * Builds a client identifier of the form "<hostname>:<kernel thread id>".
 * It is used as the lock cookie, so locks taken by different threads/hosts differ.
 * Throws an Errnum-based exception if gethostname() fails.
 */
std::string BackendRados::createUniqueClientId() {
  char buff[200];
  cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof (buff)),
      "In BackendRados::createUniqueClientId(): failed to gethostname");
  // POSIX leaves it unspecified whether a truncated host name is NUL-terminated.
  // Force termination so the stream insertion below cannot read past the buffer.
  buff[sizeof (buff) - 1] = '\0';
  const pid_t tid = static_cast<pid_t>(syscall(SYS_gettid));
  std::stringstream client;
  client << buff << ":" << tid;
  return client.str();
}

/**
 * Takes an exclusive lock (named "lock", 10 s duration) on object `name`.
 * Retries for as long as RADOS answers -EBUSY.
 * Returns a heap-allocated ScopedLock owning the lock.
 *
 * In RADOS, locking a non-existing object creates it (empty), which is not what we
 * want. So the lock is taken regardless, and then, while it is held, the object's
 * size is checked. A zero-size object is treated as one the lock attempt created:
 * it is removed again and an ENOENT Errnum is thrown.
 */
BackendRados::ScopedLock* BackendRados::lockExclusive(std::string name) {
  const std::string client = createUniqueClientId();
  std::unique_ptr<ScopedLock> lock(new ScopedLock(m_radosCtx));
  struct timeval duration;
  duration.tv_sec = 10;
  duration.tv_usec = 0;
  // "<object>/lock/<client>//", appended to the lock and stat error messages.
  const std::string lockDesc = name + "/lock/" + client + "//";
  int rc = -EBUSY;
  while (-EBUSY == rc) {
    rc = m_radosCtx.lock_exclusive(name, "lock", client, "", &duration, 0);
  }
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
      "In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: " + lockDesc);
  // NOTE(review): `lock` is only set() at the very end. If stat() or remove() below
  // throws, the ScopedLock destructor will not unlock, and the lock only lapses when
  // its 10 s duration expires.
  uint64_t objectSize;
  time_t objectMtime;
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.stat(name, &objectSize, &objectMtime),
      "In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::stat: " + lockDesc);
  if (0 == objectSize) {
    // Zero size: most likely created by the lock attempt above. Undo that.
    cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.remove(name),
        "In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::remove: " + name + "//");
    throw cta::exception::Errnum(ENOENT, "In BackendRados::lockExclusive(): trying to lock a non-existing object");
  }
  lock->set(name, client);
  return lock.release();
}

/**
 * Takes a shared lock (named "lock", 10 s duration) on object `name`.
 * Retries for as long as RADOS answers -EBUSY.
 * Returns a heap-allocated ScopedLock owning the lock.
 *
 * In RADOS, locking a non-existing object creates it (empty), which is not what we
 * want. So the lock is taken regardless, and then, while it is held, the object's
 * size is checked. A zero-size object is treated as one the lock attempt created:
 * it is removed again and an ENOENT Errnum is thrown.
 */
BackendRados::ScopedLock* BackendRados::lockShared(std::string name) {
  const std::string client = createUniqueClientId();
  std::unique_ptr<ScopedLock> lock(new ScopedLock(m_radosCtx));
  struct timeval duration;
  duration.tv_sec = 10;
  duration.tv_usec = 0;
  // "<object>/lock/<client>//", appended to the lock and stat error messages.
  const std::string lockDesc = name + "/lock/" + client + "//";
  int rc = -EBUSY;
  while (-EBUSY == rc) {
    rc = m_radosCtx.lock_shared(name, "lock", client, "", "", &duration, 0);
  }
  cta::exception::Errnum::throwOnReturnedErrno(-rc,
      "In ObjectStoreRados::lockShared, failed to librados::IoCtx::lock_shared: " + lockDesc);
  // NOTE(review): `lock` is only set() at the very end. If stat() or remove() below
  // throws, the ScopedLock destructor will not unlock, and the lock only lapses when
  // its 10 s duration expires.
  uint64_t objectSize;
  time_t objectMtime;
  cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.stat(name, &objectSize, &objectMtime),
      "In ObjectStoreRados::lockShared, failed to librados::IoCtx::stat: " + lockDesc);
  if (0 == objectSize) {
    // Zero size: most likely created by the lock attempt above. Undo that.
    cta::exception::Errnum::throwOnReturnedErrno(-m_radosCtx.remove(name),
        "In ObjectStoreRados::lockShared, failed to librados::IoCtx::remove: " + name + "//");
    throw cta::exception::Errnum(ENOENT, "In BackendRados::lockShared(): trying to lock a non-existing object");
  }
  lock->set(name, client);
  return lock.release();
}

225
226
227
228
229
/**
 * Starts an asynchronous read-modify-write of object `name`, applying `update`
 * to its content. Returns a newly allocated updater. Nothing here keeps a
 * reference to it, so the caller is responsible for deleting it.
 */
Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update)
{
  AsyncUpdater * updater = new AsyncUpdater(*this, name, update);
  return updater;
}

230
231
/**
 * Starts an asynchronous update, implemented as a chain of steps:
 * stat -> lock -> read -> update + write -> unlock (statCallback, fetchCallback,
 * commitCallback, unlockCallback).
 * The constructor only launches the first step, an asynchronous stat into m_size.
 * Any failure is stored in m_job and surfaces to the caller through wait().
 */
BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update):
  m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) {
  try {
    librados::AioCompletion * statCompletion =
        librados::Rados::aio_create_completion(this, statCallback, nullptr);
    const auto launchRc = m_backend.m_radosCtx.aio_stat(name, statCompletion, &m_size, &date);
    // Drop our reference to the completion. librados frees it once the operation is done.
    statCompletion->release();
    if (0 != launchRc) {
      cta::exception::Errnum errnum (-launchRc, "In BackendRados::AsyncUpdater::AsyncUpdater(): failed to launch aio_stat()");
      throw Backend::NoSuchObject(errnum.getMessageValue());
    }
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback of the initial aio_stat (step 1 of the update chain).
 * If the object exists, takes its exclusive lock in a std::async task (RADOS has no
 * aio_lock at the time of writing) and then launches the asynchronous read, whose
 * completion is handled by fetchCallback(). Any failure is stored in m_job.
 * @param completion the finished aio_stat completion.
 * @param pThis      the AsyncUpdater this chain belongs to.
 */
void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *static_cast<AsyncUpdater *>(pThis);
  try {
    // Check that the object exists.
    if (rados_aio_get_return_value(completion)) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
          "In BackendRados::AsyncUpdater::statCallback(): could not stat object: ");
      throw Backend::NoSuchObject(errnum.getMessageValue());
    }
    // It does. Lock it: there is no aio_lock, so the synchronous lock runs in an async
    // task, which then launches an async read.
    // The task never throws; exceptions go to the promise (as everywhere).
    au.m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
        [pThis](){
          AsyncUpdater & au = *static_cast<AsyncUpdater *>(pThis);
          try {
            au.m_lockClient = BackendRados::createUniqueClientId();
            struct timeval tv;
            tv.tv_usec = 0;
            tv.tv_sec = 10;
            int rc;
            // These loops run in a limited number of threads, which limits the
            // parallelism of locking.
            // TODO: could be improved, but needs aio_lock in rados (not available at
            // the time of writing).
            do {
              rc = au.m_backend.m_radosCtx.lock_exclusive(au.m_name, "lock", au.m_lockClient, "", &tv, 0);
            } while (-EBUSY == rc);
            if (rc) {
              cta::exception::Errnum errnum(-rc,
                std::string("In BackendRados::AsyncUpdater::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") +
                au.m_name + "/" + "lock" + "/" + au.m_lockClient + "//");
              throw CouldNotLock(errnum.getMessageValue());
            }
            // Locking is done, we can launch the read operation (async).
            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, fetchCallback, nullptr);
            rc=au.m_backend.m_radosCtx.aio_read(au.m_name, aioc, &au.m_radosBufferList, au.m_size, 0);
            aioc->release();
            if (rc) {
              // NOTE(review): on this path the lock taken above is not released. It only
              // lapses when its 10 s duration expires.
              cta::exception::Errnum errnum (-rc, "In BackendRados::AsyncUpdater::statCallback::lock_lambda(): failed to launch aio_read()");
              throw Backend::CouldNotFetch(errnum.getMessageValue());
            }
          } catch (...) {
            ANNOTATE_HAPPENS_BEFORE(&au.m_job);
            au.m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
    au.m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback of the aio_read (step 3 of the update chain).
 * In a std::async task: copies the read data, applies the user's update function,
 * and launches an aio_write_full of the result, whose completion is handled by
 * commitCallback(). Any failure, including exceptions thrown by the user's update
 * function, is stored in m_job.
 * @param completion the finished aio_read completion (negative value = error).
 * @param pThis      the AsyncUpdater this chain belongs to.
 */
void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & au = *static_cast<AsyncUpdater *>(pThis);
  try {
    // Check that the object could be read (only negative values are errors here).
    if (rados_aio_get_return_value(completion)<0) {
      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
          "In BackendRados::AsyncUpdater::fetchCallback(): could not read object: ");
      throw Backend::CouldNotFetch(errnum.getMessageValue());
    }
    // We can now launch the update operation.
    au.m_updateAsync.reset(new std::future<void>(std::async(std::launch::async,
        [pThis](){
          AsyncUpdater & au = *static_cast<AsyncUpdater *>(pThis);
          try {
            // The data is in the buffer list.
            // NOTE(review): m_size comes from the stat done before the object was locked.
            // If the object changed size in between, this copy either throws (object
            // shrank) or truncates (object grew).
            std::string value;
            try {
              au.m_radosBufferList.copy(0, au.m_size, value);
            } catch (std::exception & ex) {
              throw CouldNotUpdateValue(
                  std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ")
                  + ex.what());
            }
            // Execute the user's callback. Let exceptions fly through. User knows his own exceptions.
            value=au.m_update(value);
            try {
              // Prepare result in buffer list.
              au.m_radosBufferList.clear();
              au.m_radosBufferList.append(value);
            } catch (std::exception & ex) {
              throw CouldNotUpdateValue(
                  std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ")
                  + ex.what());
            }
            // Launch the write
            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
            auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
            aioc->release();
            if (rc) {
              cta::exception::Errnum errnum (-rc,
                "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()");
              throw Backend::CouldNotCommit(errnum.getMessageValue());
            }
          } catch (...) {
            ANNOTATE_HAPPENS_BEFORE(&au.m_job);
            au.m_job.set_exception(std::current_exception());
          }
        }
        )));
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
    au.m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback of the aio_write_full (step 4 of the update chain).
 * On success, launches an asynchronous unlock whose completion is handled by
 * unlockCallback(). Any failure is stored in m_job.
 * @param completion the finished aio_write_full completion (non-zero = error).
 * @param pThis      the AsyncUpdater this chain belongs to.
 */
void BackendRados::AsyncUpdater::commitCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & updater = *static_cast<AsyncUpdater *>(pThis);
  try {
    const int writeRc = rados_aio_get_return_value(completion);
    if (writeRc) {
      cta::exception::Errnum errnum(-writeRc,
          "In BackendRados::AsyncUpdater::commitCallback(): could not write object: ");
      throw Backend::CouldNotCommit(errnum.getMessageValue());
    }
    librados::AioCompletion * unlockCompletion =
        librados::Rados::aio_create_completion(pThis, unlockCallback, nullptr);
    const auto launchRc = updater.m_backend.m_radosCtx.aio_unlock(updater.m_name, "lock", updater.m_lockClient, unlockCompletion);
    unlockCompletion->release();
    if (launchRc) {
      cta::exception::Errnum errnum (-launchRc, "In BackendRados::AsyncUpdater::commitCallback(): failed to launch aio_unlock()");
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&updater.m_job);
    updater.m_job.set_exception(std::current_exception());
  }
}

/**
 * Completion callback of the aio_unlock (last step of the update chain).
 * Fulfils m_job: with a value on success, with a CouldNotUnlock exception otherwise.
 * @param completion the finished aio_unlock completion (non-zero = error).
 * @param pThis      the AsyncUpdater this chain belongs to.
 */
void BackendRados::AsyncUpdater::unlockCallback(librados::completion_t completion, void* pThis) {
  AsyncUpdater & updater = *static_cast<AsyncUpdater *>(pThis);
  try {
    const int unlockRc = rados_aio_get_return_value(completion);
    if (unlockRc) {
      cta::exception::Errnum errnum(-unlockRc,
          "In BackendRados::AsyncUpdater::unlockCallback(): could not unlock object: ");
      throw Backend::CouldNotUnlock(errnum.getMessageValue());
    }
    // Whole chain succeeded: report it to whoever waits on m_jobFuture.
    ANNOTATE_HAPPENS_BEFORE(&updater.m_job);
    updater.m_job.set_value();
  } catch (...) {
    ANNOTATE_HAPPENS_BEFORE(&updater.m_job);
    updater.m_job.set_exception(std::current_exception());
  }
}

void BackendRados::AsyncUpdater::wait() {
398
399
400
  m_jobFuture.get();
  ANNOTATE_HAPPENS_AFTER(&m_job);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
401
402
}

403
404
405
406
407
408
std::string BackendRados::Parameters::toStr() {
  std::stringstream ret;
  ret << "userId=" << m_userId << " pool=" << m_pool;
  return ret.str();
}

409
410
411
std::string BackendRados::Parameters::toURL() {
  std::stringstream ret;
  ret << "rados://" << m_userId << "@" << m_pool;
412
413
  if (m_namespace.size())
    ret << ":" << m_namespace;
414
415
416
417
  return ret.str();
}


418
/**
 * Returns a newly allocated copy of this backend's connection parameters
 * (user, pool, namespace). The caller takes ownership of the returned object.
 */
BackendRados::Parameters* BackendRados::getParams() {
  std::unique_ptr<Parameters> params(new Parameters);
  params->m_userId = m_user;
  params->m_pool = m_pool;
  params->m_namespace = m_namespace;
  return params.release();
}

426
}}