Commit 1797bcd6 authored by Victor Kotlyar's avatar Victor Kotlyar
Browse files

Ported commits from castor/master:

c5c8509eaf971b2c37a1b2c333693474e14da31b
  ported only the radosstriper part of tapeserverd

  CASTOR-4739 tapeserverd should support localfile, rfio, xroot and
  rados striper access for disk files

  Implemented direct rados striper support in tapeserverd.
  The expected URL is currently: radosstriper:///user@pool:filePath
  (with no ':' in user and pool).

078bd4b88b19a5d879c5e92d5a6469e645a2f7a2
  Added missing test for the radosstriper URL parsing regex and fixed
  it.

b797c72352d03db79cf62471b8eeec65c4fa1270
  Fixed the call to striperPool: we now call the exception-throwing version.
parent df9be572
......@@ -25,6 +25,7 @@
#include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/log/LogContext.hpp"
#include "common/log/StringLogger.hpp"
#include "castor/messages/TapeserverProxyDummy.hpp"
......@@ -132,7 +133,9 @@ namespace unitTests{
FakeTapeWriteTask ftwt;
ftwt.pushDataBlock(new MemBlock(1,blockSize));
castor::tape::tapeserver::daemon::DiskReadTask drt(ftwt,&file,blockNeeded,flag);
DiskFileFactory fileFactory("RFIO","", 0);
castor::tape::file::RadosStriperPool striperPool;
DiskFileFactory fileFactory("RFIO", "", 0, striperPool);
castor::messages::TapeserverProxyDummy tspd;
MockMigrationWatchDog mmwd(1.0, 1.0, tspd, "", lc);
drt.execute(lc,fileFactory,mmwd);
......
......@@ -26,6 +26,7 @@
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/AtomicCounter.hpp"
......@@ -147,7 +148,7 @@ private:
DiskReadWorkerThread(DiskReadThreadPool & parent):
m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc),
m_diskFileFactory(parent.m_remoteFileProtocol, parent.m_xrootPrivateKeyPath,
parent.m_xrootTimeout){
parent.m_xrootTimeout, parent.m_striperPool){
cta::log::LogContext::ScopedParam param(m_lc, cta::log::Param("threadID", m_threadID));
m_lc.log(cta::log::INFO,"DisReadThread created");
}
......@@ -204,7 +205,12 @@ private:
* Parameter: xroot timeout
*/
uint16_t m_xrootTimeout;
/**
* A pool of rados striper connections, to be shared by all threads
*/
castor::tape::file::RadosStriperPool m_striperPool;
/**
* Reference to the watchdog, for error reporting.
*/
......
......@@ -101,7 +101,8 @@ namespace unitTests{
TestingRetrieveMount trm(std::move(dbrm));
MockRecallReportPacker report(&trm,lc);
RecallMemoryManager mm(10,100,lc);
DiskFileFactory fileFactory("RFIO","", 0);
castor::tape::file::RadosStriperPool striperPool;
DiskFileFactory fileFactory("RFIO","", 0, striperPool);
cta::MockRetrieveMount mrm;
std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob(mrm));
......
......@@ -26,13 +26,13 @@
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/AtomicCounter.hpp"
#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/Timer.hpp"
#include <vector>
#define __STDC_FORMAT_MACROS
......@@ -114,7 +114,7 @@ private:
m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),
m_lc(m_parentThreadPool.m_lc),
m_diskFileFactory(manager.m_remoteFileProtocol, manager.m_xrootPrivateKeyPath,
manager.m_xrootTimeout)
manager.m_xrootTimeout, manager.m_striperPool)
{
// This thread Id will remain for the rest of the thread's lifetime (and
// also context's lifetime) so no need for a scope.
......@@ -199,6 +199,11 @@ protected:
*/
uint16_t m_xrootTimeout;
/**
* A pool of rados striper connections, to be shared by all threads
*/
castor::tape::file::RadosStriperPool m_striperPool;
private:
/**
* Aggregate all threads' stats
......
......@@ -28,6 +28,7 @@ include_directories (${XROOTD_INCLUDE_DIR})
set(TAPESERVER_FILE_LIBRARY_SRCS
File.cpp
DiskFile.cpp
RadosStriperPool.cpp
Structures.cpp
../exception/XrootCl.cpp)
......@@ -42,7 +43,7 @@ endif(CMAKE_COMPILER_IS_GNUCC)
add_library(File
${TAPESERVER_FILE_LIBRARY_SRCS})
target_link_libraries (File XrdCl cryptopp)
target_link_libraries (File XrdCl cryptopp radosstriper)
if(CMAKE_COMPILER_IS_GNUCC)
if(GCC_VERSION_GE_4_8_0)
......
......@@ -25,6 +25,7 @@
#include "common/threading/Threading.hpp"
#include "castor/tape/tapeserver/file/DiskFileImplementations.hpp"
#include "castor/tape/tapeserver/file/DiskFile.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include <cryptopp/base64.h>
#include <cryptopp/osrng.h>
......@@ -163,7 +164,9 @@ namespace unitTests {
}
private:
virtual void run() {
castor::tape::diskFile::DiskFileFactory dff("xroot", m_keyPath, 0);
castor::tape::file::RadosStriperPool striperPool;
castor::tape::diskFile::DiskFileFactory dff("xroot", m_keyPath, 0,
striperPool);
for (int i=0; i<5; i++) {
// Read keys in parallel and in a loop to test MT protection of the
// key reading, not protected here.
......
......@@ -25,8 +25,10 @@
#include "castor/common/CastorConfiguration.hpp"
#include "castor/tape/tapeserver/file/DiskFile.hpp"
#include "castor/tape/tapeserver/file/DiskFileImplementations.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/exception/Errnum.hpp"
#include "common/threading/MutexLocker.hpp"
#include <rados/buffer.h>
#include <xrootd/XrdCl/XrdClFile.hh>
#include <uuid/uuid.h>
#include <algorithm>
......@@ -38,18 +40,20 @@ namespace tape {
namespace diskFile {
DiskFileFactory::DiskFileFactory(const std::string & remoteFileProtocol,
const std::string & xrootPrivateKeyFile, uint16_t xrootTimeout):
const std::string & xrootPrivateKeyFile, uint16_t xrootTimeout,
castor::tape::file::RadosStriperPool & striperPool):
m_NoURLLocalFile("^(localhost:|)(/.*)$"),
m_NoURLRemoteFile("^([^:]*:)(.*)$"),
m_NoURLRadosStriperFile("^localhost:([^/]+)/(.*)$"),
m_URLLocalFile("^file://(.*)$"),
m_URLEosFile("^eos://(.*)$"),
m_URLXrootFile("^(root://.*)$"),
m_URLCephFile("^radosStriper://(.*)$"),
m_URLCephFile("^radosstriper:///([^:]+@[^:]+):(.*)$"),
m_remoteFileProtocol(remoteFileProtocol),
m_xrootPrivateKeyFile(xrootPrivateKeyFile),
m_xrootPrivateKeyLoaded(false),
m_xrootTimeout(xrootTimeout)
m_xrootTimeout(xrootTimeout),
m_striperPool(striperPool)
{
// Lowercase the protocol string
std::transform(m_remoteFileProtocol.begin(), m_remoteFileProtocol.end(),
......@@ -146,7 +150,9 @@ ReadFile * DiskFileFactory::createReadFile(const std::string& path) {
// radosStriper URL?
regexResult = m_URLCephFile.exec(path);
if (regexResult.size()) {
return new RadosStriperReadFile(regexResult[1]);
return new RadosStriperReadFile(regexResult[0],
m_striperPool.throwingGetStriper(regexResult[1]),
regexResult[2]);
}
// No URL path parsing
// Do we have a local file?
......@@ -193,7 +199,9 @@ WriteFile * DiskFileFactory::createWriteFile(const std::string& path) {
// radosStriper URL?
regexResult = m_URLCephFile.exec(path);
if (regexResult.size()) {
return new RadosStriperWriteFile(regexResult[1]);
return new RadosStriperWriteFile(regexResult[0],
m_striperPool.throwingGetStriper(regexResult[1]),
regexResult[2]);
}
// No URL path parsing
// Do we have a local file?
......@@ -589,43 +597,67 @@ EosWriteFile::~EosWriteFile() throw() {
//==============================================================================
// RADOS STRIPER READ FILE
//==============================================================================
RadosStriperReadFile::RadosStriperReadFile(const std::string &url){
throw cta::exception::Exception(
"RadosStriperReadFile::RadosStriperReadFile is not implemented");
RadosStriperReadFile::RadosStriperReadFile(const std::string &fullURL,
libradosstriper::RadosStriper * striper,
const std::string &osd): m_striper(striper),
m_osd(osd), m_readPosition(0) {
m_URL=fullURL;
}
size_t RadosStriperReadFile::read(void *data, const size_t size) const {
throw cta::exception::Exception(
"RadosStriperReadFile::read is not implemented");
::ceph::bufferlist bl;
int rc = m_striper->read(m_osd, &bl, size, m_readPosition);
if (rc < 0) {
throw cta::exception::Errnum(-rc,
"In RadosStriperReadFile::read(): failed to striper->read: ");
}
bl.copy(0, rc, (char *)data);
m_readPosition += rc;
return rc;
}
size_t RadosStriperReadFile::size() const {
throw cta::exception::Exception(
"RadosStriperReadFile::size is not implemented");
uint64_t size;
time_t time;
cta::exception::Errnum::throwOnReturnedErrno(
-m_striper->stat(m_osd, &size, &time),
"In RadosStriperReadFile::size(): failed to striper->stat(): ");
return size;
}
RadosStriperReadFile::~RadosStriperReadFile() throw() {
}
RadosStriperReadFile::~RadosStriperReadFile() throw() {}
//==============================================================================
// RADOS STRIPER WRITE FILE
//==============================================================================
RadosStriperWriteFile::RadosStriperWriteFile(const std::string &url){
throw cta::exception::Exception(
"RadosStriperWriteFile::RadosStriperWriteFile is not implemented");
RadosStriperWriteFile::RadosStriperWriteFile(const std::string &fullURL,
libradosstriper::RadosStriper * striper,
const std::string &osd): m_striper(striper),
m_osd(osd), m_writePosition(0) {
m_URL=fullURL;
// Truncate the possibly existing file. If the file does not exist, it's fine.
int rc=m_striper->trunc(m_osd, 0);
if (rc < 0 && rc != -ENOENT) {
throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::RadosStriperWriteFile(): "
"failed to striper->trunc(): ");
}
}
void RadosStriperWriteFile::write(const void *data, const size_t size) {
throw cta::exception::Exception(
"RadosStriperWriteFile::write is not implemented");
::ceph::bufferlist bl;
bl.append((char *)data, size);
int rc = m_striper->write(m_osd, bl, size, m_writePosition);
if (rc) {
throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::write(): "
"failed to striper->write(): ");
}
m_writePosition += size;
}
void RadosStriperWriteFile::close() {
throw cta::exception::Exception(
"RadosStriperWriteFile::close is not implemented");
// Nothing to do as writes are synchronous
}
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {
}
RadosStriperWriteFile::~RadosStriperWriteFile() throw() {}
}}} //end of namespace diskFile
......@@ -37,6 +37,9 @@
namespace castor {
namespace tape {
// Forward declaration of RadosStriperPool
namespace file { class RadosStriperPool; }
/**
* Namespace managing the reading and writing of files to and from disk.
*/
......@@ -54,7 +57,8 @@ namespace castor {
typedef castor::tape::utils::Regex Regex;
public:
DiskFileFactory(const std::string & remoteFileProtocol,
const std::string & xrootPrivateKey, uint16_t xrootTimeout);
const std::string & xrootPrivateKey, uint16_t xrootTimeout,
castor::tape::file::RadosStriperPool & striperPool);
ReadFile * createReadFile(const std::string & path);
WriteFile * createWriteFile(const std::string & path);
private:
......@@ -71,6 +75,7 @@ namespace castor {
bool m_xrootPrivateKeyLoaded;
const uint16_t m_xrootTimeout;
static cta::threading::Mutex g_rfioOptionsLock;
castor::tape::file::RadosStriperPool & m_striperPool;
public:
/** Return the private key. Read it from the file if necessary. */
......
......@@ -30,7 +30,7 @@
#include "common/exception/Exception.hpp"
#include <xrootd/XrdCl/XrdClFile.hh>
#include <cryptopp/rsa.h>
#include <radosstriper/libradosstriper.hpp>
namespace castor {
namespace tape {
......@@ -176,20 +176,35 @@ namespace castor {
//==============================================================================
// RADOS STRIPER FILES
//==============================================================================
// The Rados striper URLs in CASTOR are in the form:
// radosstriper:///user@pool:filePath
// We will not expect the
class RadosStriperReadFile: public ReadFile {
public:
RadosStriperReadFile(const std::string &xrootUrl);
RadosStriperReadFile(const std::string &fullURL,
libradosstriper::RadosStriper * striper,
const std::string &osd);
virtual size_t size() const;
virtual size_t read(void *data, const size_t size) const;
virtual ~RadosStriperReadFile() throw();
private:
libradosstriper::RadosStriper * m_striper;
std::string m_osd;
mutable size_t m_readPosition;
};
class RadosStriperWriteFile: public WriteFile {
public:
RadosStriperWriteFile(const std::string &xrootUrl);
RadosStriperWriteFile(const std::string &fullURL,
libradosstriper::RadosStriper * striper,
const std::string &osd);
virtual void write(const void *data, const size_t size);
virtual void close();
virtual ~RadosStriperWriteFile() throw();
private:
libradosstriper::RadosStriper * m_striper;
std::string m_osd;
size_t m_writePosition;
};
} //end of namespace diskFile
......
......@@ -27,6 +27,7 @@
#include "castor/tape/tapeserver/drive/FakeDrive.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapeserver/file/DiskFile.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/exception/Errnum.hpp"
#include "common/exception/Exception.hpp"
#include "scheduler/ArchiveJob.hpp"
......@@ -267,7 +268,8 @@ namespace unitTests {
const uint32_t block_size = 1024;
char data1[block_size];
char data2[block_size];
castor::tape::diskFile::DiskFileFactory fileFactory("RFIO","", 0);
castor::tape::file::RadosStriperPool striperPool;
castor::tape::diskFile::DiskFileFactory fileFactory("RFIO","", 0, striperPool);
TempFile sourceFile;
sourceFile.randomFill(1000);
TempFile destinationFile(sourceFile.path()+"_dst");
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RadosStriperPool.hpp"
#include <stdexcept>
#include "common/exception/Errnum.hpp"
namespace {
//------------------------------------------------------------------------------
// Local wrapper around librados::Rados.
// Its destructor calls librados::Rados::shutdown() unless release() has been
// called beforehand, so an early exit from the setup code does not need an
// explicit shutdown call.
//------------------------------------------------------------------------------
class ReleasingRados: public librados::Rados {
public:
  /// Starts in the "not released" state: the destructor will call shutdown().
  ReleasingRados(): m_released(false) {}

  /// Calls shutdown() on the underlying Rados object, except after release().
  ~ReleasingRados() {
    if (m_released) {
      return;
    }
    librados::Rados::shutdown();
  }

  /// Disarms the shutdown() call otherwise made by the destructor.
  void release() { m_released = true; }

private:
  /// True once release() has been called.
  bool m_released;
};
}
namespace castor {
namespace tape {
namespace file {
//------------------------------------------------------------------------------
// Accessor to next striper pool index
// Returns the slot to use for the current request and advances the round-robin
// cursor (m_striperIdx), wrapping to 0 once m_maxStriperIdx is reached.
// On first use (m_maxStriperIdx == 0) the number of slots is taken from the
// CEPH_NBCONNECTIONS environment variable, defaulting to 3, and m_stripers is
// filled with that many empty dictionaries.
// This function does no locking of its own; throwingGetStriper() calls it
// while holding m_mutex.
//------------------------------------------------------------------------------
unsigned int RadosStriperPool::getStriperIdxAndIncrease() {
  if (m_maxStriperIdx == 0) {
    // Initialization phase:
    //  - find out how many connection slots are wanted
    //  - allocate the corresponding places in the vector
    m_maxStriperIdx = 3;
    // TODO: the lookup getconfent("CEPH", "NbConnections", 1) is disabled
    // (presumably commented out for CTA); only the environment is consulted.
    const char *value = getenv("CEPH_NBCONNECTIONS");
    if (value) {
      // atoi() yields 0 for non numeric text and may yield a negative number.
      // A zero slot count would leave m_stripers empty (and the caller would
      // index out of bounds), a negative one would wrap to a huge unsigned
      // count. Such values are ignored and the default is kept.
      const int requested = atoi(value);
      if (requested > 0) {
        m_maxStriperIdx = requested;
      }
    }
    for (unsigned int i = 0; i < m_maxStriperIdx; i++) {
      m_stripers.push_back(StriperDict());
    }
  }
  // Hand out the current slot, then move the cursor to the next one.
  const unsigned int res = m_striperIdx;
  unsigned int nextValue = m_striperIdx + 1;
  if (nextValue >= m_maxStriperIdx) {
    nextValue = 0;
  }
  m_striperIdx = nextValue;
  return res;
}
//------------------------------------------------------------------------------
// RadosStriperPool::throwingGetStriper
// Returns the striper cached for userAtPool in the slot selected by
// getStriperIdxAndIncrease(), creating and caching it on first request.
// userAtPool has the form [<userid>@]<poolname>.
// The returned pointer stays owned by the pool (it is deleted in
// disconnectAll()). Failures of the rados calls are reported through
// cta::exception::Errnum::throwOnReturnedErrno.
// The whole function runs with m_mutex held.
//------------------------------------------------------------------------------
libradosstriper::RadosStriper* RadosStriperPool::throwingGetStriper(const std::string& userAtPool) {
  std::lock_guard<std::mutex> lock{m_mutex};
  const unsigned int striperIdx = getStriperIdxAndIncrease();
  auto & striperDict = m_stripers[striperIdx];
  // Fast path: the striper is already in the cache of this slot.
  auto cached = striperDict.find(userAtPool);
  if (cached != striperDict.end()) {
    return cached->second;
  }
  // We need to create a new radosStriper, as the requested one is not there yet.
  // First find the user id (if any given) in the pool string
  // format is [<userid>@]<poolname>
  const char* userId = NULL;
  std::string user;
  std::string pool;
  const size_t pos = userAtPool.find('@');
  if (pos != std::string::npos) {
    user = userAtPool.substr(0, pos);
    userId = user.c_str();
    pool = userAtPool.substr(pos + 1);
  } else {
    pool = userAtPool;
  }
  // Create the Rados object. It will shutdown automatically when being destructed.
  ReleasingRados cluster;
  // The rados calls report failures as negative errno values: negate them so
  // that the exception carries the real errno, as done for the other rados
  // striper calls of the tape server.
  cta::exception::Errnum::throwOnReturnedErrno(-cluster.init(userId),
    "In RadosStriperPool::throwingGetStriper(): failed to cluster.init(userId): ");
  cta::exception::Errnum::throwOnReturnedErrno(-cluster.conf_read_file(NULL),
    "In RadosStriperPool::throwingGetStriper(): failed to cluster.conf_read_file(NULL): ");
  // The result of the environment parsing is deliberately not checked.
  cluster.conf_parse_env(NULL);
  cta::exception::Errnum::throwOnReturnedErrno(-cluster.connect(),
    "In RadosStriperPool::throwingGetStriper(): failed to cluster.connect(): ");
  librados::IoCtx ioctx;
  cta::exception::Errnum::throwOnReturnedErrno(
    -cluster.ioctx_create(pool.c_str(), ioctx),
    "In RadosStriperPool::throwingGetStriper(): failed to "
    "cluster.ioctx_create(pool.c_str(), ioctx): ");
  std::unique_ptr<libradosstriper::RadosStriper> newStriper(
    new libradosstriper::RadosStriper);
  cta::exception::Errnum::throwOnReturnedErrno(
    -libradosstriper::RadosStriper::striper_create(ioctx, newStriper.get()),
    "In RadosStriperPool::throwingGetStriper(): failed to "
    "libradosstriper::RadosStriper::striper_create(ioctx, newStriper.get()): ");
  // Past that point we should not automatically release the cluster anymore.
  cluster.release();
  // Setup file layout (the return values of the setters are not checked).
  newStriper->set_object_layout_stripe_count(4);
  newStriper->set_object_layout_stripe_unit(32 * 1024 * 1024); // 32 MB
  newStriper->set_object_layout_object_size(32 * 1024 * 1024); // 32 MB
  // Insert into the cache first and only then give up ownership, so that the
  // striper is not leaked should the insertion throw.
  striperDict[userAtPool] = newStriper.get();
  return newStriper.release();
}
//------------------------------------------------------------------------------
// RadosStriperPool::getStriper
// Non-throwing flavour of throwingGetStriper(): any exception raised while
// looking up or creating the striper is turned into a NULL return value.
//------------------------------------------------------------------------------
libradosstriper::RadosStriper* RadosStriperPool::getStriper(const std::string& userAtPool) {
  libradosstriper::RadosStriper* striper = NULL;
  try {
    striper = throwingGetStriper(userAtPool);
  } catch (...) {
    // Whatever went wrong, the caller only gets to see a NULL pointer.
    striper = NULL;
  }
  return striper;
}
//------------------------------------------------------------------------------
// RadosStriperPool::~RadosStriperPool
// Tears the pool down by delegating to disconnectAll(), which deletes every
// cached striper.
//------------------------------------------------------------------------------
RadosStriperPool::~RadosStriperPool() {
  this->disconnectAll();
}
//------------------------------------------------------------------------------
// RadosStriperPool::disconnectAll
//------------------------------------------------------------------------------
void RadosStriperPool::disconnectAll() {
std::lock_guard<std::mutex> lock{m_mutex};
for (auto v = m_stripers.begin(); v != m_stripers.end(); v++) {
for (auto i = v->begin(); i != v->end(); i++) {
delete i->second;
}
v->clear();
}
m_stripers.clear();
}
} // namespace file
} // namespace tape
} // namespace castor
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <radosstriper/libradosstriper.hpp>
#include <map>
#include <memory>
#include <mutex>
namespace castor {
namespace tape {
namespace file {
/**
* Utility singleton managing the rados stripers connections by name.
* The destructor will implicitly release the pool connections.
*/
class RadosStriperPool{
public:
/** constructor */
RadosStriperPool() : m_maxStriperIdx(0), m_striperIdx(0) {};
/**
* Get pointer to a connection to the rados user (or one from the cache).
* This function throws exceptions in case of problem.
*/
libradosstriper::RadosStriper * throwingGetStriper(const std::string & userAtPool);
/**
* Get pointer to a connection to the rados user (or one from the cache).
* This function returns NULL in case of problem.
*/
libradosstriper::RadosStriper * getStriper(const std::string & userAtPool);
/**
* Clear the map of all connections
*/
void disconnectAll();
/** Destructor that will delete the held objects (needed in SLC6, see
* m_stripers declaration. */
virtual ~RadosStriperPool();
private:
/// Accessor to next striper pool index