DiskFile.cpp 29.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/******************************************************************************
 *
 * This file is part of the Castor project.
 * See http://castor.web.cern.ch/castor
 *
 * Copyright (C) 2003  CERN
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * 
 *
 * @author Castor Dev team, castor-dev@cern.ch
 *****************************************************************************/
23
#include <sys/types.h>
24
#include <sys/stat.h>
25

26
27
#include "disk/DiskFileImplementations.hpp"
#include "disk/RadosStriperPool.hpp"
28
#include "common/exception/Errnum.hpp"
29
#include "common/threading/MutexLocker.hpp"
30
#include "common/utils/utils.hpp"
31
#include <rados/buffer.h>
32
#include <xrootd/XrdCl/XrdClFile.hh>
33
#include <uuid/uuid.h>
34
35
36
#include <algorithm>
#include <cryptopp/base64.h>
#include <cryptopp/osrng.h>
37

38
39
namespace cta {
namespace disk {
40

41
DiskFileFactory::DiskFileFactory(const std::string & xrootPrivateKeyFile, uint16_t xrootTimeout,
42
  cta::disk::RadosStriperPool & striperPool):
43
  m_NoURLLocalFile("^(localhost:|)(/.*)$"),
44
  m_NoURLRemoteFile("^([^:]*:)(.*)$"),
45
  m_NoURLRadosStriperFile("^localhost:([^/]+)/(.*)$"),
46
47
  m_URLLocalFile("^file://(.*)$"),
  m_URLXrootFile("^(root://.*)$"),
48
  m_URLCephFile("^radosstriper:///([^:]+@[^:]+):(.*)$"),
49
  m_xrootPrivateKeyFile(xrootPrivateKeyFile),
50
  m_xrootPrivateKeyLoaded(false),
51
  m_xrootTimeout(xrootTimeout),
52
  m_striperPool(striperPool) {}
53

54
55
const CryptoPP::RSA::PrivateKey & DiskFileFactory::xrootPrivateKey() {
  if(!m_xrootPrivateKeyLoaded) {
56
57
58
59
60
61
    // There is one DiskFactory per disk thread, so this function is called
    // in a single thread. Nevertheless, we experience errors from double
    // deletes in the CryptoPP functions called from here.
    // As this function portion of the code is called once per disk thread,
    // serialising them will have little effect in performance.
    // This is an experimental workaround.
62
63
    static cta::threading::Mutex mutex;
    cta::threading::MutexLocker ml(mutex);
64
65
66
67
    // The loading of a PEM-style key is described in 
    // http://www.cryptopp.com/wiki/Keys_and_Formats#PEM_Encoded_Keys
    std::string key;
    std::ifstream keyFile(m_xrootPrivateKeyFile.c_str());
68
69
    if (!keyFile) {
      // We should get the detailed error from errno.
70
      throw cta::exception::Errnum(
71
72
        std::string("Failed to open xroot key file: ")+m_xrootPrivateKeyFile);
    }
73
74
75
76
77
78
79
80
81
82
83
    char buff[200];
    while(!keyFile.eof()) {
      keyFile.read(buff, sizeof(buff));
      key.append(buff,keyFile.gcount());
    }
    const std::string HEADER = "-----BEGIN RSA PRIVATE KEY-----";
    const std::string FOOTER = "-----END RSA PRIVATE KEY-----";
        
    size_t pos1, pos2;
    pos1 = key.find(HEADER);
    if(pos1 == std::string::npos)
84
        throw cta::exception::Exception(
85
86
87
88
          "In DiskFileFactory::xrootCryptoPPPrivateKey, PEM header not found");
        
    pos2 = key.find(FOOTER, pos1+1);
    if(pos2 == std::string::npos)
89
        throw cta::exception::Exception(
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
          "In DiskFileFactory::xrootCryptoPPPrivateKey, PEM footer not found");
        
    // Start position and length
    pos1 = pos1 + HEADER.length();
    pos2 = pos2 - pos1;
    std::string keystr = key.substr(pos1, pos2);
    
    // Base64 decode, place in a ByteQueue    
    CryptoPP::ByteQueue queue;
    CryptoPP::Base64Decoder decoder;
    
    decoder.Attach(new CryptoPP::Redirector(queue));
    decoder.Put((const byte*)keystr.data(), keystr.length());
    decoder.MessageEnd();

105
    m_xrootPrivateKey.BERDecodePrivateKey(queue, false /*paramsPresent*/, queue.MaxRetrievable());
106
107
108
109
    
    // BERDecodePrivateKey is a void function. Here's the only check
    // we have regarding the DER bytes consumed.
    if(!queue.IsEmpty())
110
      throw cta::exception::Exception(
111
112
113
        "In DiskFileFactory::xrootCryptoPPPrivateKey, garbage at end of key");
    
    CryptoPP::AutoSeededRandomPool prng;
114
    bool valid = m_xrootPrivateKey.Validate(prng, 3);
115
    if(!valid)
116
      throw cta::exception::Exception(
117
118
        "In DiskFileFactory::xrootCryptoPPPrivateKey, RSA private key is not valid");

119
    m_xrootPrivateKeyLoaded = true;
120
  }
121
  return m_xrootPrivateKey;
122
}
123

124
ReadFile * DiskFileFactory::createReadFile(const std::string& path) {
125
126
127
128
129
130
131
132
133
134
  std::vector<std::string> regexResult;
  // URL path parsing
  // local file URL?
  regexResult = m_URLLocalFile.exec(path);
  if (regexResult.size()) {
    return new LocalReadFile(regexResult[1]);
  }
  // Xroot URL?
  regexResult = m_URLXrootFile.exec(path);
  if (regexResult.size()) {
135
    return new XrootReadFile(regexResult[1], m_xrootTimeout);
136
137
138
139
  }
  // radosStriper URL?
  regexResult = m_URLCephFile.exec(path);
  if (regexResult.size()) {
140
141
142
    return new RadosStriperReadFile(regexResult[0],
      m_striperPool.throwingGetStriper(regexResult[1]),
      regexResult[2]);
143
144
145
146
147
148
149
150
151
152
  }
  // No URL path parsing
  // Do we have a local file?
  regexResult = m_NoURLLocalFile.exec(path);
  if (regexResult.size()) {
    return new LocalReadFile(regexResult[2]);
  }
  // Do we have a remote file?
  regexResult = m_NoURLRemoteFile.exec(path);
  if (regexResult.size()) {
153
154
155
    // In the current CASTOR implementation, the xrootd port is hard coded to 1095
    return new XrootC2FSReadFile(
      std::string("root://") + regexResult[1] + "1095/" + regexResult[2],
156
      xrootPrivateKey(), m_xrootTimeout);
157
158
159
160
  }
  // Do we have a radosStriper file?
  regexResult = m_NoURLRadosStriperFile.exec(path);
  if (regexResult.size()) {
161
    return new XrootC2FSReadFile(
162
      std::string("root://localhost:1095/")+regexResult[1]+"/"+regexResult[2],
163
      xrootPrivateKey(), m_xrootTimeout, regexResult[1]);
164
  }
165
  throw cta::exception::Exception(
166
      std::string("In DiskFileFactory::createReadFile failed to parse URL: ")+path);
167
168
}

169
WriteFile * DiskFileFactory::createWriteFile(const std::string& path) {
170
171
172
173
174
175
176
177
178
179
  std::vector<std::string> regexResult;
  // URL path parsing
  // local file URL?
  regexResult = m_URLLocalFile.exec(path);
  if (regexResult.size()) {
    return new LocalWriteFile(regexResult[1]);
  }
  // Xroot URL?
  regexResult = m_URLXrootFile.exec(path);
  if (regexResult.size()) {
180
    return new XrootWriteFile(regexResult[1], m_xrootTimeout);
181
182
183
184
  }
  // radosStriper URL?
  regexResult = m_URLCephFile.exec(path);
  if (regexResult.size()) {
185
186
187
    return new RadosStriperWriteFile(regexResult[0],
      m_striperPool.throwingGetStriper(regexResult[1]),
      regexResult[2]);
188
189
190
191
192
193
194
195
196
197
  }
  // No URL path parsing
  // Do we have a local file?
  regexResult = m_NoURLLocalFile.exec(path);
  if (regexResult.size()) {
    return new LocalWriteFile(regexResult[2]);
  }
  // Do we have a remote file?
  regexResult = m_NoURLRemoteFile.exec(path);
  if (regexResult.size()) {
198
199
200
    // In the current CASTOR implementation, the xrootd port is hard coded to 1095
    return new XrootC2FSWriteFile(
      std::string("root://") + regexResult[1] + "1095/" + regexResult[2],
201
      xrootPrivateKey(), m_xrootTimeout);
202
203
204
205
  }
  // Do we have a radosStriper file?
  regexResult = m_NoURLRadosStriperFile.exec(path);
  if (regexResult.size()) {
206
    return new XrootC2FSWriteFile(
207
      std::string("root://localhost:1095/")+regexResult[1]+"/"+regexResult[2],
208
      xrootPrivateKey(), m_xrootTimeout, regexResult[1]);
209
  }
210
  throw cta::exception::Exception(
211
      std::string("In DiskFileFactory::createWriteFile failed to parse URL: ")+path);
212
213
214
215
216
217
218
}

//==============================================================================
// LOCAL READ FILE
//==============================================================================  
LocalReadFile::LocalReadFile(const std::string &path)  {
  m_fd = ::open64((char *)path.c_str(), O_RDONLY);
219
220
  m_URL = "file://";
  m_URL += path;
221
  cta::exception::Errnum::throwOnMinusOne(m_fd,
222
223
    std::string("In diskFile::LocalReadFile::LocalReadFile failed open64() on ")+m_URL);

224
225
}

226
size_t LocalReadFile::read(void *data, const size_t size) const {
227
228
229
230
231
232
233
  return ::read(m_fd, data, size);
}

size_t LocalReadFile::size() const {
  //struct is mandatory here, because there is a function stat64 
  struct stat64 statbuf;        
  int ret = ::fstat64(m_fd,&statbuf);
234
  cta::exception::Errnum::throwOnMinusOne(ret,
235
    std::string("In diskFile::LocalReadFile::LocalReadFile failed stat64() on ")+m_URL);
236
237
238
239
240
241
242
243
244
245
246
247
248
249
  
  return statbuf.st_size;
}

LocalReadFile::~LocalReadFile() throw() {
  ::close(m_fd);
}

//==============================================================================
// LOCAL WRITE FILE
//============================================================================== 
LocalWriteFile::LocalWriteFile(const std::string &path): m_closeTried(false){
  // For local files, we truncate the file like for RFIO
  m_fd = ::open64((char *)path.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666);
250
251
  m_URL = "file://";
  m_URL += path;
252
  cta::exception::Errnum::throwOnMinusOne(m_fd,
253
254
255
      std::string("In LocalWriteFile::LocalWriteFile() failed to open64() on ")
      +m_URL);        

256
257
258
259
260
261
}

void LocalWriteFile::write(const void *data, const size_t size)  {
  ::write(m_fd, (void *)data, size);
}

262
263
264
265
void LocalWriteFile::setChecksum(uint32_t checksum) {
  // Noop: this is only implemented for rados striper
}

266
267
268
269
void LocalWriteFile::close()  {
  // Multiple close protection
  if (m_closeTried) return;
  m_closeTried=true;
270
  cta::exception::Errnum::throwOnMinusOne(::close(m_fd),
271
      std::string("In LocalWriteFile::close failed close() on ")+m_URL);        
272
273
274
275
276
277
278
279
}

LocalWriteFile::~LocalWriteFile() throw() {
  if(!m_closeTried){
    ::close(m_fd);
  }
}

280
281
282
//==============================================================================
// CRYPTOPP SIGNER
//==============================================================================
283
cta::threading::Mutex CryptoPPSigner::s_mutex;
284
285
286
287

std::string CryptoPPSigner::sign(const std::string msg, 
  const CryptoPP::RSA::PrivateKey& privateKey) {
  // Global lock as Crypto++ seems not to be thread safe (helgrind complains)
288
  cta::threading::MutexLocker ml(s_mutex);
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
  // Create a signer object
  CryptoPP::RSASSA_PKCS1v15_SHA_Signer signer(privateKey);
  // Return value
  std::string ret;
  // Random number generator (not sure it's really used)
  CryptoPP::AutoSeededRandomPool rng;
  // Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret.
  const bool noNewLineInBase64Output = false;
  CryptoPP::StringSource ss1(msg, true, 
      new CryptoPP::SignerFilter(rng, signer,
        new CryptoPP::Base64Encoder(
          new CryptoPP::StringSink(ret), noNewLineInBase64Output)));
  // That's all job's already done.
  return ret;
}

305
306
307
//==============================================================================
// XROOT READ FILE
//==============================================================================  
308
XrootC2FSReadFile::XrootC2FSReadFile(const std::string &url,
309
310
  const CryptoPP::RSA::PrivateKey & xrootPrivateKey, uint16_t timeout,
  const std::string & pool): XrootBaseReadFile(timeout) {
311
312
  // Setup parent's members
  m_readPosition = 0;
313
  m_URL = url;
314
315
  // And start opening
  using XrdCl::OpenFlags;
316
317
  m_signedURL = m_URL;
  // Turn the bare URL into a Castor URL, by adding opaque tags:
318
  // ?castor.pfn1=/srv/castor/...  (duplication of the path in practice)
319
  // ?castor.pfn2=0:moverHandlerPort:transferId
320
321
  // ?castor.pool=xxx optional ceph pool
  // ?castor.exptime=(unix time)
322
  // ?castor.txtype=tape
323
  // ?castor.signature=
324
325
326
327
328
  //Find the path part of the url. It is the first occurence of "//"
  // after the inital [x]root://
  const std::string scheme = "root://";
  size_t schemePos = url.find(scheme);
  if (std::string::npos == schemePos) 
329
    throw cta::exception::Exception(
330
      std::string("In XrootC2FSReadFile::XrootC2FSReadFile could not find the scheme[x]root:// in URL "+
331
        url));
332
  size_t pathPos = url.find("/", schemePos + scheme.size());
333
  if (std::string::npos == pathPos) 
334
    throw cta::exception::Exception(
335
      std::string("In XrootC2FSReadFile::XrootC2FSReadFile could not path in URL "+
336
        url));
337
  // Build signature block
338
339
  std::string path = url.substr(pathPos + 1);
  time_t expTime = time(NULL)+3600;
340
341
342
343
  uuid_t uuid;
  char suuid[100];
  uuid_generate(uuid);
  uuid_unparse(uuid, suuid);
344
  std::stringstream signatureBlock;
345
  signatureBlock << path << "0:" << suuid << "0" << expTime << "tape";
346
347
  // Sign the block
  std::string signature = CryptoPPSigner::sign(signatureBlock.str(), xrootPrivateKey);
348
  // Build URL parameters
349
  std::stringstream opaqueBloc;
350
  opaqueBloc << "?castor.pfn1=" << path;
351
  opaqueBloc << "&castor.pfn2=0:" << suuid;
352
  if (pool.size())
353
354
    opaqueBloc << "&castor.pool=" << pool;
  opaqueBloc << "&castor.exptime=" << expTime;
355
  opaqueBloc << "&castor.txtype=tape";
356
  opaqueBloc << "&castor.signature=" << signature;
357
358
359
  m_signedURL = m_URL + opaqueBloc.str();
  
  // ... and finally open the file
360
  XrootClEx::throwOnError(m_xrootFile.Open(m_signedURL, OpenFlags::Read, XrdCl::Access::None, m_timeout),
361
    std::string("In XrootC2FSReadFile::XrootC2FSReadFile failed XrdCl::File::Open() on ")
362
    +m_URL+" opaqueBlock="+opaqueBloc.str());
363
364
}

365
366
XrootReadFile::XrootReadFile(const std::string &xrootUrl, uint16_t timeout):
  XrootBaseReadFile(timeout) {
367
368
  // Setup parent's variables
  m_readPosition = 0;
369
370
  m_URL = xrootUrl;

371
372
  // and simply open
  using XrdCl::OpenFlags;
373
  XrootClEx::throwOnError(m_xrootFile.Open(m_URL, OpenFlags::Read, XrdCl::Access::None, m_timeout),
374
    std::string("In XrootReadFile::XrootReadFile failed XrdCl::File::Open() on ")+m_URL);
375
376
377
}

size_t XrootBaseReadFile::read(void *data, const size_t size) const {
378
  uint32_t ret;
379
  XrootClEx::throwOnError(m_xrootFile.Read(m_readPosition, size, data, ret, m_timeout),
380
    std::string("In XrootReadFile::read failed XrdCl::File::Read() on ")+m_URL);
381
382
383
384
  m_readPosition += ret;
  return ret;
}

385
size_t XrootBaseReadFile::size() const {
386
387
388
  const bool forceStat=true;
  XrdCl::StatInfo *statInfo(NULL);
  size_t ret;
389
  XrootClEx::throwOnError(m_xrootFile.Stat(forceStat, statInfo, m_timeout),
390
    std::string("In XrootReadFile::size failed XrdCl::File::Stat() on ")+m_URL);
391
392
393
394
395
  ret= statInfo->GetSize();
  delete statInfo;
  return ret;
}

396
XrootBaseReadFile::~XrootBaseReadFile() throw() {
397
  try{
398
399
400
401
402
    // Use the result of Close() to avoid gcc >= 7 generating an unused-result
    // warning (casting the result to void is not good enough for gcc >= 7)
    if(!m_xrootFile.Close(m_timeout).IsOK()) {
      // Ignore the error
    }
403
  } catch (...) {}
404
405
406
407
408
}

//==============================================================================
// XROOT WRITE FILE
//============================================================================== 
409
XrootC2FSWriteFile::XrootC2FSWriteFile(const std::string &url,
410
411
412
  const CryptoPP::RSA::PrivateKey & xrootPrivateKey, uint16_t timeout,
  const std::string & pool):
  XrootBaseWriteFile(timeout) {
413
  // Start opening
414
  using XrdCl::OpenFlags;
415
416
417
  m_URL=url;
  m_signedURL = m_URL;
  // Turn the bare URL into a Castor URL, by adding opaque tags:
418
  // ?castor.pfn1=/srv/castor/...  (duplication of the path in practice)
419
  // ?castor.pfn2=0:moverHandlerPort:transferId
420
421
  // ?castor.pool=xxx optional ceph pool
  // ?castor.exptime=(unix time)
422
  // ?castor.txtype=tape
423
  // ?castor.signature=
424
425
426
427
428
  //Find the path part of the url. It is the first occurence of "//"
  // after the inital [x]root://
  const std::string scheme = "root://";
  size_t schemePos = url.find(scheme);
  if (std::string::npos == schemePos) 
429
    throw cta::exception::Exception(
430
      std::string("In XrootC2FSWriteFile::XrootC2FSWriteFile could not find the scheme[x]root:// in URL "+
431
        url));
432
  size_t pathPos = url.find("/", schemePos + scheme.size());
433
  if (std::string::npos == pathPos) 
434
    throw cta::exception::Exception(
435
      std::string("In XrootC2FSWriteFile::XrootC2FSWriteFile could not path in URL "+
436
437
        url));
  // Build signature block
438
  std::string path = url.substr(pathPos + 1);
439
  time_t expTime = time(NULL)+3600;
440
441
442
443
  uuid_t uuid;
  char suuid[100];
  uuid_generate(uuid);
  uuid_unparse(uuid, suuid);
444
  std::stringstream signatureBlock;
445
  signatureBlock << path << "0:" << suuid << "0" << expTime << "tape";
446
447
  // Sign the block
  std::string signature = CryptoPPSigner::sign(signatureBlock.str(), xrootPrivateKey);
448
  // Build URL parameters
449
  std::stringstream opaqueBloc;
450
  opaqueBloc << "?castor.pfn1=" << path;
451
  opaqueBloc << "&castor.pfn2=0:" << suuid;
452
  if (pool.size())
453
454
    opaqueBloc << "&castor.pool=" << pool;
  opaqueBloc << "&castor.exptime=" << expTime;
455
  opaqueBloc << "&castor.txtype=tape";
456
  opaqueBloc << "&castor.signature=" << signature;
457
458
  m_signedURL = m_URL + opaqueBloc.str();
  
459
  // ... and finally open the file for write (deleting any existing one in case)
460
461
  XrootClEx::throwOnError(m_xrootFile.Open(m_signedURL, OpenFlags::Delete | OpenFlags::Write,
    XrdCl::Access::None, m_timeout),
462
    std::string("In XrootC2FSWriteFile::XrootC2FSWriteFile failed XrdCl::File::Open() on ")
463
    +m_URL);
464
465
}

466
467
XrootWriteFile::XrootWriteFile(const std::string& xrootUrl, uint16_t timeout):
  XrootBaseWriteFile(timeout) {
468
469
470
471
  // Setup parent's variables
  m_URL = xrootUrl;
  // and simply open
  using XrdCl::OpenFlags;
472
473
  XrootClEx::throwOnError(m_xrootFile.Open(m_URL, OpenFlags::Delete | OpenFlags::Write,
    XrdCl::Access::None, m_timeout),
474
475
476
477
478
    std::string("In XrootWriteFile::XrootWriteFile failed XrdCl::File::Open() on ")+m_URL);

}

void XrootBaseWriteFile::write(const void *data, const size_t size)  {
479
  XrootClEx::throwOnError(m_xrootFile.Write(m_writePosition, size, data, m_timeout),
480
481
    std::string("In XrootWriteFile::write failed XrdCl::File::Write() on ")
    +m_URL);
482
483
484
  m_writePosition += size;
}

485
486
487
488
void XrootBaseWriteFile::setChecksum(uint32_t checksum) {
  // Noop: this is only implemented for rados striper
}

489
void XrootBaseWriteFile::close()  {
490
491
492
  // Multiple close protection
  if (m_closeTried) return;
  m_closeTried=true;
493
  XrootClEx::throwOnError(m_xrootFile.Close(m_timeout),
Eric Cano's avatar
Eric Cano committed
494
    std::string("In XrootWriteFile::close failed XrdCl::File::Close() on ")+m_URL);
495
496
}

497
XrootBaseWriteFile::~XrootBaseWriteFile() throw() {
498
  if(!m_closeTried){
499
500
501
502
503
    // Use the result of Close() to avoid gcc >= 7 generating an unused-result
    // warning (casting the result to void is not good enough for gcc >= 7)
    if(!m_xrootFile.Close(m_timeout).IsOK()) {
      // Ignore the error
    }
504
505
506
507
508
509
  }
}

//==============================================================================
// RADOS STRIPER READ FILE
//==============================================================================
510
511
512
513
514
RadosStriperReadFile::RadosStriperReadFile(const std::string &fullURL,
  libradosstriper::RadosStriper * striper,
  const std::string &osd): m_striper(striper),
  m_osd(osd), m_readPosition(0) {
    m_URL=fullURL;
515
516
}

517
size_t RadosStriperReadFile::read(void *data, const size_t size) const {
518
519
520
521
522
523
524
525
526
  ::ceph::bufferlist bl;
  int rc = m_striper->read(m_osd, &bl, size, m_readPosition);
  if (rc < 0) {
    throw cta::exception::Errnum(-rc,
        "In RadosStriperReadFile::read(): failed to striper->read: ");
  }
  bl.copy(0, rc, (char *)data);
  m_readPosition += rc;
  return rc;
527
528
529
}

size_t RadosStriperReadFile::size() const {
530
531
532
533
534
535
  uint64_t size;
  time_t time;
  cta::exception::Errnum::throwOnReturnedErrno(
      -m_striper->stat(m_osd, &size, &time),
      "In RadosStriperReadFile::size(): failed to striper->stat(): ");
  return size;
536
537
}

538
RadosStriperReadFile::~RadosStriperReadFile() throw() {}
539
540

//==============================================================================
541
// RADOS STRIPER WRITE FILE
542
//============================================================================== 
543
544
545
546
547
548
549
550
551
552
553
RadosStriperWriteFile::RadosStriperWriteFile(const std::string &fullURL,
  libradosstriper::RadosStriper * striper,
  const std::string &osd): m_striper(striper),
  m_osd(osd), m_writePosition(0) {
  m_URL=fullURL;
  // Truncate the possibly existing file. If the file does not exist, it's fine.
  int rc=m_striper->trunc(m_osd, 0);
  if (rc < 0 && rc != -ENOENT) {
    throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::RadosStriperWriteFile(): "
        "failed to striper->trunc(): ");
  }
554
555
556
}

void RadosStriperWriteFile::write(const void *data, const size_t size)  {
557
558
559
560
561
562
563
564
  ::ceph::bufferlist bl;
  bl.append((char *)data, size);
  int rc = m_striper->write(m_osd, bl, size, m_writePosition);
  if (rc) {
    throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::write(): "
        "failed to striper->write(): ");
  }
  m_writePosition += size;
565
566
}

567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
void RadosStriperWriteFile::setChecksum(uint32_t checksum) {
  // Set the checksum type (hardcoded)
  int rc;
  std::string checksumType("ADLER32");
  ::ceph::bufferlist blType;
  blType.append(checksumType.c_str(), checksumType.size());
  rc = m_striper->setxattr(m_osd, "user.castor.checksum.type", blType);
  if (rc) {
    throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::setChecksum(): "
        "failed to striper->setxattr(user.castor.checksum.type): ");
  }
  // Turn the numeric checksum into a string and set it as checksum value
  std::stringstream checksumStr;
  checksumStr << std::hex << std::nouppercase << checksum;
  ::ceph::bufferlist blChecksum;
  blChecksum.append(checksumStr.str().c_str(), checksumStr.str().size());
  rc = m_striper->setxattr(m_osd, "user.castor.checksum.value", blChecksum);
  if (rc) {
    throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::setChecksum(): "
        "failed to striper->setxattr(user.castor.checksum.value): ");
  }
}

590
void RadosStriperWriteFile::close()  {
591
  // Nothing to do as writes are synchronous
592
593
}

594
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {}
595

596
//==============================================================================
597
// AsyncDiskFileRemover FACTORY
598
//==============================================================================
599
AsyncDiskFileRemoverFactory::AsyncDiskFileRemoverFactory():
600
601
602
    m_URLLocalFile("^file://(.*)$"),
    m_URLXrootdFile("^(root://.*)$"){}

603
AsyncDiskFileRemover * AsyncDiskFileRemoverFactory::createAsyncDiskFileRemover(const std::string &path){
604
605
606
607
608
  // URL path parsing
  std::vector<std::string> regexResult;
  //local file URL?
  regexResult = m_URLLocalFile.exec(path);
  if(regexResult.size()){
609
    return new AsyncLocalDiskFileRemover(regexResult[1]);
610
611
612
  }
  regexResult = m_URLXrootdFile.exec(path);
  if(regexResult.size()){
613
    return new AsyncXRootdDiskFileRemover(path);
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
  }
  throw cta::exception::Exception("In DiskFileRemoverFactory::createDiskFileRemover: unknown type of URL");
}


//==============================================================================
// LocalDiskFileRemover
//==============================================================================
LocalDiskFileRemover::LocalDiskFileRemover(const std::string &path){
  m_URL = path;
}

void LocalDiskFileRemover::remove(){
  cta::exception::Errnum::throwOnNonZero(::remove(m_URL.c_str()),"In LocalDiskFileRemover::remove(), failed to delete the file at "+m_URL);
}

//==============================================================================
// XRootdDiskFileRemover
//==============================================================================
XRootdDiskFileRemover::XRootdDiskFileRemover(const std::string& path):m_xrootFileSystem(path){
  m_URL = path;
635
  m_truncatedFileURL = cta::utils::extractPathFromXrootdPath(path);
636
637
638
639
}

void XRootdDiskFileRemover::remove(){
  XrdCl::XRootDStatus statusRm = m_xrootFileSystem.Rm(m_truncatedFileURL,c_xrootTimeout);
640
641
642
643
644
645
646
647
648
649
  cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file.");;
}

void XRootdDiskFileRemover::removeAsync(AsyncXRootdDiskFileRemover::XRootdFileRemoverResponseHandler &responseHandler){
  XrdCl::XRootDStatus statusRm = m_xrootFileSystem.Rm(m_truncatedFileURL,&responseHandler,c_xrootTimeout);
  try{
    cta::exception::XrootCl::throwOnError(statusRm,"In XRootdDiskFileRemover::remove(), fail to remove file.");
  } catch(const cta::exception::Exception &e){
    responseHandler.m_deletionPromise.set_exception(std::current_exception());
  }
650
651
652
}

//==============================================================================
653
// AsyncXrootdDiskFileRemover
654
//============================================================================== 
655
656
AsyncXRootdDiskFileRemover::AsyncXRootdDiskFileRemover(const std::string &path){
  m_diskFileRemover.reset(new XRootdDiskFileRemover(path));
657
658
}

659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
void AsyncXRootdDiskFileRemover::asyncDelete(){
  m_diskFileRemover->removeAsync(m_responseHandler);
}

void AsyncXRootdDiskFileRemover::wait(){
  m_responseHandler.m_deletionPromise.get_future().get();
}

void AsyncXRootdDiskFileRemover::XRootdFileRemoverResponseHandler::HandleResponse(XrdCl::XRootDStatus* status, XrdCl::AnyObject* response){
  try{
    cta::exception::XrootCl::throwOnError(*status,"In XRootdDiskFileRemover::remove(), fail to remove file.");
    m_deletionPromise.set_value();
  } catch(const cta::exception::Exception &e){
    m_deletionPromise.set_exception(std::current_exception());
  }
}

//==============================================================================
// AsyncLocalDiskFileRemover
//============================================================================== 
AsyncLocalDiskFileRemover::AsyncLocalDiskFileRemover(const std::string& path){
  m_diskFileRemover.reset(new LocalDiskFileRemover(path));  
}

void AsyncLocalDiskFileRemover::asyncDelete(){
684
685
686
  m_futureDeletion = std::async(std::launch::async,[this](){m_diskFileRemover->remove();});
}

687
void AsyncLocalDiskFileRemover::wait(){
688
689
  m_futureDeletion.get();
}
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726

//==============================================================================
// DIRECTORY FACTORY
//============================================================================== 
DirectoryFactory::DirectoryFactory():
    m_URLLocalDirectory("^file://(.*)$"),
    m_URLXrootDirectory("^(root://.*)$"){}


Directory * DirectoryFactory::createDirectory(const std::string& path){
  // URL path parsing
  std::vector<std::string> regexResult;
  // local file URL?
  regexResult = m_URLLocalDirectory.exec(path);
  if (regexResult.size()) {
    return new LocalDirectory(regexResult[1]);
  }
  // Xroot URL?
  regexResult = m_URLXrootDirectory.exec(path);
  if (regexResult.size()) {
    return new XRootdDirectory(path);
  }
  throw cta::exception::Exception("In DirectoryFactory::createDirectory: unknown type of URL");
}

//==============================================================================
// LOCAL DIRECTORY
//============================================================================== 
LocalDirectory::LocalDirectory(const std::string& path){
  m_URL = path;
}

void LocalDirectory::mkdir(){
  const int retCode = ::mkdir(m_URL.c_str(),S_IRWXU);
  cta::exception::Errnum::throwOnMinusOne(retCode,"In LocalDirectory::mkdir(): failed to create directory at "+m_URL);
}

727
728
729
730
731
void LocalDirectory::rmdir(){
  const int retcode = ::rmdir(m_URL.c_str());
  cta::exception::Errnum::throwOnMinusOne(retcode,"In LocalDirectory::rmdir(): failed to remove the directory at "+m_URL);
}

732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
bool LocalDirectory::exist(){
  struct stat buffer;
  return (stat(m_URL.c_str(), &buffer) == 0);
}

std::set<std::string> LocalDirectory::getFilesName(){
  std::set<std::string> names;
  DIR *dir;
  struct dirent *file;
  dir = opendir(m_URL.c_str());
  cta::exception::Errnum::throwOnNull(dir,"In LocalDirectory::getFilesName, failed to open directory at "+m_URL);
  while((file = readdir(dir)) != NULL){
    char *fileName = file->d_name;
    if(strcmp(fileName,".")  && strcmp(fileName,"..")){
      names.insert(std::string(file->d_name));
    }
  }
  cta::exception::Errnum::throwOnMinusOne(::closedir(dir),"In LocalDirectory::getFilesName(), fail to close directory at "+m_URL);
  return names;
}

//==============================================================================
// XROOT DIRECTORY
//============================================================================== 
XRootdDirectory::XRootdDirectory(const std::string& path):m_xrootFileSystem(path){
  m_URL = path;
758
  m_truncatedDirectoryURL = cta::utils::extractPathFromXrootdPath(path);
759
760
761
762
763
764
765
}

void XRootdDirectory::mkdir() {
  XrdCl::XRootDStatus mkdirStatus = m_xrootFileSystem.MkDir(m_truncatedDirectoryURL,XrdCl::MkDirFlags::None,XrdCl::Access::Mode::UR | XrdCl::Access::Mode::UW | XrdCl::Access::Mode::UX,c_xrootTimeout);
  cta::exception::XrootCl::throwOnError(mkdirStatus,"In XRootdDirectory::mkdir() : failed to create directory at "+m_URL);
}

766
767
768
769
770
void XRootdDirectory::rmdir() {
  XrdCl::XRootDStatus rmdirStatus = m_xrootFileSystem.RmDir(m_truncatedDirectoryURL, c_xrootTimeout);
  cta::exception::XrootCl::throwOnError(rmdirStatus,"In XRootdDirectory::rmdir() : failed to remove directory at "+m_URL);
}

771
bool XRootdDirectory::exist() {
772
773
  XrdCl::StatInfo *statInfo;
  XrdCl::XRootDStatus statStatus = m_xrootFileSystem.Stat(m_truncatedDirectoryURL,statInfo,c_xrootTimeout);
774
  cta::exception::XrootCl::throwOnError(statStatus,"In XRootdDirectory::exist() : failed to stat the directory at "+m_URL);
775
776
777
  if(statStatus.errNo == XErrorCode::kXR_NotFound){
    return false;
  }
778
  return true;
779
780
781
782
}

std::set<std::string> XRootdDirectory::getFilesName(){
  std::set<std::string> ret;
783
784
785
786
787
788
789
790
  XrdCl::DirectoryList *directoryContent;
  XrdCl::XRootDStatus dirListStatus = m_xrootFileSystem.DirList(m_truncatedDirectoryURL,XrdCl::DirListFlags::Flags::Stat,directoryContent,c_xrootTimeout);
  cta::exception::XrootCl::throwOnError(dirListStatus,"In XrootdDirectory::getFilesName(): unable to list the files contained in the directory.");
  XrdCl::DirectoryList::ConstIterator iter = directoryContent->Begin();
  while(iter != directoryContent->End()){
    ret.insert((*iter)->GetName());
    iter++;
  }
791
792
793
  return ret;
}

794
}} //end of namespace cta::disk