// BackendRados.hpp
#pragma once

#include "Backend.hpp"
#include "exception/Errnum.hpp"

#include <errno.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <memory>
#include <sstream>
#include <string>

#include <rados/librados.hpp>

namespace cta { namespace objectstore {
10
11
12
13
/**
 * An implementation of the object store primitives, using Rados.
 */
class BackendRados: public Backend {
14
public:
15
16
17
18
19
20
21
  /**
   * The constructor, connecting to the storage pool 'pool' using the user id
   * 'userId'
   * @param userId
   * @param pool
   */
  BackendRados(std::string userId, std::string pool): 
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    m_user(userId), m_pool(pool), m_cluster(), m_radosCtx() {
    cta::exception::Errnum::throwOnNonZero(m_cluster.init(userId.c_str()),
      "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init");
    try {
      cta::exception::Errnum::throwOnNonZero(m_cluster.conf_read_file(NULL),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
      cta::exception::Errnum::throwOnNonZero(m_cluster.conf_parse_env(NULL),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file");
      cta::exception::Errnum::throwOnNonZero(m_cluster.connect(),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect");
      cta::exception::Errnum::throwOnNonZero(m_cluster.ioctx_create(pool.c_str(), m_radosCtx),
        "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create");
    } catch (...) {
      m_cluster.shutdown();
      throw;
    }
  }
39
  virtual ~BackendRados() {
40
41
42
43
44
45
46
47
48
49
50
51
    m_radosCtx.close();
    m_cluster.shutdown();
  }
  virtual std::string user() {
    return m_user;
  }
  virtual std::string pool() {
    return m_pool;
  }
  

  virtual void create(std::string name, std::string content) {
52
53
54
55
56
57
58
59
60
    librados::ObjectWriteOperation wop;
    const bool createExclusive = true;
    wop.create(createExclusive);
    ceph::bufferlist bl;
    bl.append(content.c_str(), content.size());
    wop.write_full(bl);
    cta::exception::Errnum::throwOnNonZero(m_radosCtx.operate(name, &wop),
      std::string("In ObjectStoreRados::create, failed to create exclusively or write ")
      + name);
61
62
63
  }
  
  virtual void atomicOverwrite(std::string name, std::string content) {
64
65
    librados::ObjectWriteOperation wop;
    wop.assert_exists();
66
67
    ceph::bufferlist bl;
    bl.append(content.c_str(), content.size());
68
69
70
    wop.write_full(bl);
    cta::exception::Errnum::throwOnNonZero(m_radosCtx.operate(name, &wop),
      std::string("In ObjectStoreRados::atomicOverwrite, failed to assert existence or write ")
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
      + name);
  }

  virtual std::string read(std::string name) {
    std::string ret;
    uint64_t size;
    time_t time;
    cta::exception::Errnum::throwOnNonZero(m_radosCtx.stat(name, &size, &time),
      std::string("In ObjectStoreRados::read,  failed to stat ")
      + name);
    librados::bufferlist bl;
    cta::exception::Errnum::throwOnNegative(m_radosCtx.read(name, bl, size, 0),
      std::string("In ObjectStoreRados::read,  failed to read ")
      + name);
    bl.copy(0, size, ret);
    return ret;
  }
  
  virtual void remove(std::string name) {
    cta::exception::Errnum::throwOnNegative(m_radosCtx.remove(name));
  }
  
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
  virtual bool exists(std::string name) {
    uint64_t size;
    time_t date;
    if (m_radosCtx.stat(name, &size, &date)) {
      return false;
    } else {
      return true;
    }
  }
  
  class ScopedLock: public Backend::ScopedLock {
    friend class BackendRados;
  public:
    virtual void release() {
      if (!m_lockSet) return;
      cta::exception::Errnum::throwOnReturnedErrno(
        -m_context.unlock(m_oid, "lock", m_clientId),
        std::string("In cta::objectstore::ScopedLock::release, failed unlock ")+
      m_oid);
      m_lockSet = false;
    }
    virtual ~ScopedLock() { release(); }
  private:
    ScopedLock(librados::IoCtx & ioCtx): m_lockSet(false), m_context(ioCtx) {}
    void set(const std::string & oid, const std::string clientId) {
      m_oid = oid;
      m_clientId = clientId;\
      m_lockSet = true;
    }
    bool m_lockSet;
    librados::IoCtx & m_context;
    std::string m_clientId;
    std::string m_oid;
  };
  
private:
  std::string createUniqueClientId() {
130
131
132
133
134
135
136
    // Build a unique client name: host:thread
    char buff[200];
    cta::exception::Errnum::throwOnMinusOne(gethostname(buff, sizeof(buff)),
      "In ObjectStoreRados::lockExclusive:  failed to gethostname");
    pid_t tid = syscall(SYS_gettid);
    std::stringstream client;
    client << buff << ":" << tid;
137
138
139
140
141
142
    return client.str();
  }
  
public:  
  virtual ScopedLock * lockExclusive(std::string name) {
    std::string client = createUniqueClientId();
143
144
145
146
    struct timeval tv;
    tv.tv_usec = 0;
    tv.tv_sec = 10;
    int rc;
147
    std::auto_ptr<ScopedLock> ret(new ScopedLock(m_radosCtx));
148
    do {
149
      rc=m_radosCtx.lock_exclusive(name, "lock", client, "", &tv, 0);
150
151
152
    } while (-EBUSY==rc);
    cta::exception::Errnum::throwOnReturnedErrno(-rc,
      std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: ")+
153
154
155
      name + "/" + "lock" + "/" + client + "//");
    ret->set(name, client);
    return ret.release();
156
157
158
  }


159
160
  virtual ScopedLock * lockShared(std::string name) {
    std::string client = createUniqueClientId();
161
162
163
164
    struct timeval tv;
    tv.tv_usec = 0;
    tv.tv_sec = 10;
    int rc;
165
    std::auto_ptr<ScopedLock> ret(new ScopedLock(m_radosCtx));
166
    do {
167
      rc=m_radosCtx.lock_shared(name, "lock", client, "", "", &tv, 0);
168
169
170
    } while (-EBUSY==rc);
    cta::exception::Errnum::throwOnReturnedErrno(-rc,
      std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::lock_shared: ")+
171
172
173
      name + "/" + "lock" + "/" + client + "//");
    ret->set(name, client);
    return ret.release();
174
  }
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
  
  class Parameters: public Backend::Parameters {
    friend class BackendRados;
  public:
    /**
     * The standard-issue params to string for logging
     * @return a string representation of the parameters for logging
     */
    virtual std::string toStr() {
      std::stringstream ret;
      ret << "userId=" << m_userId << " pool=" << m_pool;
      return ret.str();
    }
  private:
    std::string m_userId;
    std::string m_pool;
  };
  
  virtual Parameters * getParams() {
    std::auto_ptr<Parameters> ret (new Parameters);
    ret->m_pool = m_pool;
    ret->m_userId = m_user;
    return ret.release();
198
199
  }

200
201
202
203
  virtual std::string typeName() {
    return "cta::objectstore::BackendRados";
  }
  
204
205
206
207
208
209
210
211
private:
  std::string m_user;
  std::string m_pool;
  librados::Rados m_cluster;
  librados::IoCtx m_radosCtx;
};

}} // end of cta::objectstore