diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp index 215e5612400c1dbca1dcd0375ede7bf0dda06e6a..21aac5467654a03e779b036767ba8e2572939717 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp @@ -25,6 +25,7 @@ #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp" #include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp" #include "castor/tape/tapeserver/daemon/MemBlock.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include "common/log/LogContext.hpp" #include "common/log/StringLogger.hpp" #include "castor/messages/TapeserverProxyDummy.hpp" @@ -132,7 +133,9 @@ namespace unitTests{ FakeTapeWriteTask ftwt; ftwt.pushDataBlock(new MemBlock(1,blockSize)); castor::tape::tapeserver::daemon::DiskReadTask drt(ftwt,&file,blockNeeded,flag); - DiskFileFactory fileFactory("RFIO","", 0); + castor::tape::file::RadosStriperPool striperPool; + DiskFileFactory fileFactory("RFIO", "", 0, striperPool); + castor::messages::TapeserverProxyDummy tspd; MockMigrationWatchDog mmwd(1.0, 1.0, tspd, "", lc); drt.execute(lc,fileFactory,mmwd); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp index 819d0cdae371032950c6dbafb82b8f646e7d14df..123e6e782b90f12844744fb6c49a6b956b0c3f38 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp @@ -26,6 +26,7 @@ #include "castor/tape/tapeserver/daemon/DiskReadTask.hpp" #include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include "common/threading/BlockingQueue.hpp" #include "common/threading/Threading.hpp" #include "common/threading/AtomicCounter.hpp" @@ -147,7 +148,7 @@ private: DiskReadWorkerThread(DiskReadThreadPool & parent): m_parent(parent),m_threadID(parent.m_nbActiveThread++),m_lc(parent.m_lc), m_diskFileFactory(parent.m_remoteFileProtocol, parent.m_xrootPrivateKeyPath, - parent.m_xrootTimeout){ + parent.m_xrootTimeout, parent.m_striperPool){ cta::log::LogContext::ScopedParam param(m_lc, cta::log::Param("threadID", m_threadID)); m_lc.log(cta::log::INFO,"DisReadThread created"); } @@ -204,7 +205,12 @@ private: * Parameter: xroot timeout */ uint16_t m_xrootTimeout; - + + /** + * A pool of rados striper connections, to be shared by all threads + */ + castor::tape::file::RadosStriperPool m_striperPool; + /** * Reference to the watchdog, for error reporting. */ diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index 5e2d874122c3db8e785f9517bcf6d8998ded2820..21a06dde46e666eaa873c91111c0e5d144146e04 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -101,7 +101,8 @@ namespace unitTests{ TestingRetrieveMount trm(std::move(dbrm)); MockRecallReportPacker report(&trm,lc); RecallMemoryManager mm(10,100,lc); - DiskFileFactory fileFactory("RFIO","", 0); + castor::tape::file::RadosStriperPool striperPool; + DiskFileFactory fileFactory("RFIO","", 0, striperPool); cta::MockRetrieveMount mrm; std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob(mrm)); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp index cbc42aba562a51b2b12f862249ebfe9554d489e9..94193dda84f02da84a50c660b031ca2d6e904b84 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp @@ -26,13 +26,13 @@ #include "common/threading/BlockingQueue.hpp" #include "common/threading/Threading.hpp" #include "common/threading/AtomicCounter.hpp" - #include "common/log/LogContext.hpp" #include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp" #include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp" #include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp" #include "castor/tape/tapeserver/daemon/DiskStats.hpp" #include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include "common/Timer.hpp" #include <vector> #define __STDC_FORMAT_MACROS @@ -114,7 +114,7 @@ private: m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager), m_lc(m_parentThreadPool.m_lc), m_diskFileFactory(manager.m_remoteFileProtocol, manager.m_xrootPrivateKeyPath, - manager.m_xrootTimeout) + manager.m_xrootTimeout, manager.m_striperPool) { // This thread Id will remain for the rest of the thread's lifetime (and // also context's lifetime) so ne need for a scope. @@ -199,6 +199,11 @@ protected: */ uint16_t m_xrootTimeout; + /** + * A pool of rados striper connections, to be shared by all threads + */ + castor::tape::file::RadosStriperPool m_striperPool; + private: /** * Aggregate all threads' stats diff --git a/tapeserver/castor/tape/tapeserver/file/CMakeLists.txt b/tapeserver/castor/tape/tapeserver/file/CMakeLists.txt index ae2af7686de289a206f1ae1ef9742163f800027a..04b89a60fbeafab885b9ed4e71e415ebd1cb3efc 100644 --- a/tapeserver/castor/tape/tapeserver/file/CMakeLists.txt +++ b/tapeserver/castor/tape/tapeserver/file/CMakeLists.txt @@ -28,6 +28,7 @@ include_directories (${XROOTD_INCLUDE_DIR}) set(TAPESERVER_FILE_LIBRARY_SRCS File.cpp DiskFile.cpp + RadosStriperPool.cpp Structures.cpp ../exception/XrootCl.cpp) @@ -42,7 +43,7 @@ endif(CMAKE_COMPILER_IS_GNUCC) add_library(File ${TAPESERVER_FILE_LIBRARY_SRCS}) -target_link_libraries (File XrdCl cryptopp) +target_link_libraries (File XrdCl cryptopp radosstriper) if(CMAKE_COMPILER_IS_GNUCC) if(GCC_VERSION_GE_4_8_0) diff --git a/tapeserver/castor/tape/tapeserver/file/CryptoPPTest.cpp b/tapeserver/castor/tape/tapeserver/file/CryptoPPTest.cpp index 22a3599badbe70f94279d859b0559bb8f05f71f3..90dc556131b5ad040431dfa3c1021b632e1bf1ba 100644 --- a/tapeserver/castor/tape/tapeserver/file/CryptoPPTest.cpp +++ b/tapeserver/castor/tape/tapeserver/file/CryptoPPTest.cpp @@ -25,6 +25,7 @@ #include "common/threading/Threading.hpp" #include "castor/tape/tapeserver/file/DiskFileImplementations.hpp" #include "castor/tape/tapeserver/file/DiskFile.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include <cryptopp/base64.h> #include <cryptopp/osrng.h> @@ -163,7 +164,9 @@ namespace unitTests { } private: virtual void run() { - castor::tape::diskFile::DiskFileFactory dff("xroot", m_keyPath, 0); + castor::tape::file::RadosStriperPool striperPool; + castor::tape::diskFile::DiskFileFactory dff("xroot", m_keyPath, 0, + striperPool); for (int i=0; i<5; i++) { // Read keys in parallel and in a loop to test MT protection of the // key reading, not protected here. diff --git a/tapeserver/castor/tape/tapeserver/file/DiskFile.cpp b/tapeserver/castor/tape/tapeserver/file/DiskFile.cpp index 2925c4a4ce685d2202c215bd242eadd61973e781..52da3a73c165babc44334be2320c3623b4653733 100644 --- a/tapeserver/castor/tape/tapeserver/file/DiskFile.cpp +++ b/tapeserver/castor/tape/tapeserver/file/DiskFile.cpp @@ -25,8 +25,10 @@ #include "castor/common/CastorConfiguration.hpp" #include "castor/tape/tapeserver/file/DiskFile.hpp" #include "castor/tape/tapeserver/file/DiskFileImplementations.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include "common/exception/Errnum.hpp" #include "common/threading/MutexLocker.hpp" +#include <rados/buffer.h> #include <xrootd/XrdCl/XrdClFile.hh> #include <uuid/uuid.h> #include <algorithm> @@ -38,18 +40,20 @@ namespace tape { namespace diskFile { DiskFileFactory::DiskFileFactory(const std::string & remoteFileProtocol, - const std::string & xrootPrivateKeyFile, uint16_t xrootTimeout): + const std::string & xrootPrivateKeyFile, uint16_t xrootTimeout, + castor::tape::file::RadosStriperPool & striperPool): m_NoURLLocalFile("^(localhost:|)(/.*)$"), m_NoURLRemoteFile("^([^:]*:)(.*)$"), m_NoURLRadosStriperFile("^localhost:([^/]+)/(.*)$"), m_URLLocalFile("^file://(.*)$"), m_URLEosFile("^eos://(.*)$"), m_URLXrootFile("^(root://.*)$"), - m_URLCephFile("^radosStriper://(.*)$"), + m_URLCephFile("^radosstriper:///([^:]+@[^:]+):(.*)$"), m_remoteFileProtocol(remoteFileProtocol), m_xrootPrivateKeyFile(xrootPrivateKeyFile), m_xrootPrivateKeyLoaded(false), - m_xrootTimeout(xrootTimeout) + m_xrootTimeout(xrootTimeout), + m_striperPool(striperPool) { // Lowercase the protocol string std::transform(m_remoteFileProtocol.begin(), m_remoteFileProtocol.end(), @@ -146,7 +150,9 @@ ReadFile * DiskFileFactory::createReadFile(const std::string& path) { // radosStriper URL? regexResult = m_URLCephFile.exec(path); if (regexResult.size()) { - return new RadosStriperReadFile(regexResult[1]); + return new RadosStriperReadFile(regexResult[0], + m_striperPool.throwingGetStriper(regexResult[1]), + regexResult[2]); } // No URL path parsing // Do we have a local file? @@ -193,7 +199,9 @@ WriteFile * DiskFileFactory::createWriteFile(const std::string& path) { // radosStriper URL? regexResult = m_URLCephFile.exec(path); if (regexResult.size()) { - return new RadosStriperWriteFile(regexResult[1]); + return new RadosStriperWriteFile(regexResult[0], + m_striperPool.throwingGetStriper(regexResult[1]), + regexResult[2]); } // No URL path parsing // Do we have a local file? @@ -589,43 +597,67 @@ EosWriteFile::~EosWriteFile() throw() { //============================================================================== // RADOS STRIPER READ FILE //============================================================================== -RadosStriperReadFile::RadosStriperReadFile(const std::string &url){ - throw cta::exception::Exception( - "RadosStriperReadFile::RadosStriperReadFile is not implemented"); +RadosStriperReadFile::RadosStriperReadFile(const std::string &fullURL, + libradosstriper::RadosStriper * striper, + const std::string &osd): m_striper(striper), + m_osd(osd), m_readPosition(0) { + m_URL=fullURL; } size_t RadosStriperReadFile::read(void *data, const size_t size) const { - throw cta::exception::Exception( - "RadosStriperReadFile::read is not implemented"); + ::ceph::bufferlist bl; + int rc = m_striper->read(m_osd, &bl, size, m_readPosition); + if (rc < 0) { + throw cta::exception::Errnum(-rc, + "In RadosStriperReadFile::read(): failed to striper->read: "); + } + bl.copy(0, rc, (char *)data); + m_readPosition += rc; + return rc; } size_t RadosStriperReadFile::size() const { - throw cta::exception::Exception( - "RadosStriperReadFile::size is not implemented"); + uint64_t size; + time_t time; + cta::exception::Errnum::throwOnReturnedErrno( + -m_striper->stat(m_osd, &size, &time), + "In RadosStriperReadFile::size(): failed to striper->stat(): "); + return size; } -RadosStriperReadFile::~RadosStriperReadFile() throw() { -} +RadosStriperReadFile::~RadosStriperReadFile() throw() {} //============================================================================== // RADOS STRIPER WRITE FILE //============================================================================== -RadosStriperWriteFile::RadosStriperWriteFile(const std::string &url){ - throw cta::exception::Exception( - "RadosStriperWriteFile::RadosStriperWriteFile is not implemented"); +RadosStriperWriteFile::RadosStriperWriteFile(const std::string &fullURL, + libradosstriper::RadosStriper * striper, + const std::string &osd): m_striper(striper), + m_osd(osd), m_writePosition(0) { + m_URL=fullURL; + // Truncate the possibly existing file. If the file does not exist, it's fine. + int rc=m_striper->trunc(m_osd, 0); + if (rc < 0 && rc != -ENOENT) { + throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::RadosStriperWriteFile(): " + "failed to striper->trunc(): "); + } } void RadosStriperWriteFile::write(const void *data, const size_t size) { - throw cta::exception::Exception( - "RadosStriperWriteFile::write is not implemented"); + ::ceph::bufferlist bl; + bl.append((char *)data, size); + int rc = m_striper->write(m_osd, bl, size, m_writePosition); + if (rc) { + throw cta::exception::Errnum(-rc, "In RadosStriperWriteFile::write(): " + "failed to striper->write(): "); + } + m_writePosition += size; } void RadosStriperWriteFile::close() { - throw cta::exception::Exception( - "RadosStriperWriteFile::close is not implemented"); + // Nothing to do as writes are synchronous } -RadosStriperWriteFile::~RadosStriperWriteFile() throw() { -} +RadosStriperWriteFile::~RadosStriperWriteFile() throw() {} }}} //end of namespace diskFile diff --git a/tapeserver/castor/tape/tapeserver/file/DiskFile.hpp b/tapeserver/castor/tape/tapeserver/file/DiskFile.hpp index 117eeb88f7f25a6c9abec0c82fdf5133ada4d471..f73477e5eb530bc8181bdacf0f45874fc03bb514 100644 --- a/tapeserver/castor/tape/tapeserver/file/DiskFile.hpp +++ b/tapeserver/castor/tape/tapeserver/file/DiskFile.hpp @@ -37,6 +37,9 @@ namespace castor { namespace tape { + // Forward declaration of RadosStriperPool + namespace file { class RadosStriperPool; } + /** * Namespace managing the reading and writing of files to and from disk. */ @@ -54,7 +57,8 @@ namespace castor { typedef castor::tape::utils::Regex Regex; public: DiskFileFactory(const std::string & remoteFileProtocol, - const std::string & xrootPrivateKey, uint16_t xrootTimeout); + const std::string & xrootPrivateKey, uint16_t xrootTimeout, + castor::tape::file::RadosStriperPool & striperPool); ReadFile * createReadFile(const std::string & path); WriteFile * createWriteFile(const std::string & path); private: @@ -71,6 +75,7 @@ namespace castor { bool m_xrootPrivateKeyLoaded; const uint16_t m_xrootTimeout; static cta::threading::Mutex g_rfioOptionsLock; + castor::tape::file::RadosStriperPool & m_striperPool; public: /** Return the private key. Read it from the file if necessary. */ diff --git a/tapeserver/castor/tape/tapeserver/file/DiskFileImplementations.hpp b/tapeserver/castor/tape/tapeserver/file/DiskFileImplementations.hpp index ee9ae30013d90e549ab140c065e57e32f358c9bd..f2433e2394aa573a459d3c0bdd61db0c54ca6b6b 100644 --- a/tapeserver/castor/tape/tapeserver/file/DiskFileImplementations.hpp +++ b/tapeserver/castor/tape/tapeserver/file/DiskFileImplementations.hpp @@ -30,7 +30,7 @@ #include "common/exception/Exception.hpp" #include <xrootd/XrdCl/XrdClFile.hh> #include <cryptopp/rsa.h> - +#include <radosstriper/libradosstriper.hpp> namespace castor { namespace tape { @@ -176,20 +176,35 @@ namespace castor { //============================================================================== // RADOS STRIPER FILES //============================================================================== + // The Rados striper URLs in CASTOR are in the form: + // radosstriper:///user@pool:filePath + // We will not expect the class RadosStriperReadFile: public ReadFile { public: - RadosStriperReadFile(const std::string &xrootUrl); + RadosStriperReadFile(const std::string &fullURL, + libradosstriper::RadosStriper * striper, + const std::string &osd); virtual size_t size() const; virtual size_t read(void *data, const size_t size) const; virtual ~RadosStriperReadFile() throw(); + private: + libradosstriper::RadosStriper * m_striper; + std::string m_osd; + mutable size_t m_readPosition; }; class RadosStriperWriteFile: public WriteFile { public: - RadosStriperWriteFile(const std::string &xrootUrl); + RadosStriperWriteFile(const std::string &fullURL, + libradosstriper::RadosStriper * striper, + const std::string &osd); virtual void write(const void *data, const size_t size); virtual void close(); virtual ~RadosStriperWriteFile() throw(); + private: + libradosstriper::RadosStriper * m_striper; + std::string m_osd; + size_t m_writePosition; }; } //end of namespace diskFile diff --git a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp index 499951ff06d8d6a77fbc2878d2eedb1b3c9dbfe6..a4d323d2e4186e247afccc0e836c7511df084990 100644 --- a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp +++ b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp @@ -27,6 +27,7 @@ #include "castor/tape/tapeserver/drive/FakeDrive.hpp" #include "castor/tape/tapeserver/file/File.hpp" #include "castor/tape/tapeserver/file/DiskFile.hpp" +#include "castor/tape/tapeserver/file/RadosStriperPool.hpp" #include "common/exception/Errnum.hpp" #include "common/exception/Exception.hpp" #include "scheduler/ArchiveJob.hpp" @@ -267,7 +268,8 @@ namespace unitTests { const uint32_t block_size = 1024; char data1[block_size]; char data2[block_size]; - castor::tape::diskFile::DiskFileFactory fileFactory("RFIO","", 0); + castor::tape::file::RadosStriperPool striperPool; + castor::tape::diskFile::DiskFileFactory fileFactory("RFIO","", 0, striperPool); TempFile sourceFile; sourceFile.randomFill(1000); TempFile destinationFile(sourceFile.path()+"_dst"); diff --git a/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.cpp b/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d585d29a35e19504cd13e86b469eff5969d47792 --- /dev/null +++ b/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.cpp @@ -0,0 +1,160 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RadosStriperPool.hpp" +#include <stdexcept> +#include "common/exception/Errnum.hpp" + +namespace { +//------------------------------------------------------------------------------ +// RAII decorator for librados::Rados for local usage +//------------------------------------------------------------------------------ +class ReleasingRados: public librados::Rados { +public: + ReleasingRados(): m_released(false) {}; + void release() { m_released = true; } + ~ReleasingRados() { if(!m_released) librados::Rados::shutdown(); } +private: + bool m_released; +}; +} + +namespace castor { +namespace tape { +namespace file { + +//------------------------------------------------------------------------------ +// Accessor to next striper pool index +// Note that this is not thread safe, but we do not care +// as we only want a rough load balancing +//------------------------------------------------------------------------------ +unsigned int RadosStriperPool::getStriperIdxAndIncrease() { + if (m_maxStriperIdx == 0) { + // initialization phase : + // - find out the number of objects in the ceph pool + // - allocate corresponding places in the vectors + char *value = 0; + m_maxStriperIdx = 3; + if ((value = getenv("CEPH_NBCONNECTIONS"))) { + // TODO: commited for CTA (value = getconfent("CEPH", "NbConnections", 1))) { + m_maxStriperIdx = atoi(value); + } + for (unsigned int i = 0; i < m_maxStriperIdx; i++) { + m_stripers.push_back(StriperDict()); + } + } + unsigned int res = m_striperIdx; + unsigned nextValue = m_striperIdx+1; + if (nextValue >= m_maxStriperIdx) { + nextValue = 0; + } + m_striperIdx = nextValue; + return res; +} + +//------------------------------------------------------------------------------ +// RadosStriperPool::throwingGetStriper +//------------------------------------------------------------------------------ +libradosstriper::RadosStriper* RadosStriperPool::throwingGetStriper(const std::string& userAtPool) { + std::lock_guard<std::mutex> lock{m_mutex}; + unsigned int striperIdx = getStriperIdxAndIncrease(); + try { + return m_stripers[striperIdx].at(userAtPool); + } catch (std::out_of_range &) { + // we need to create a new radosStriper, as the requested one is not there yet. + // First find the user id (if any given) in the pool string + // format is [<userid>@]<poolname> + const char* userId = NULL; + size_t pos = userAtPool.find('@'); + std::string user; + std::string pool; + if (pos != std::string::npos) { + user = userAtPool.substr(0, pos); + userId = user.c_str(); + pool = userAtPool.substr(pos + 1); + } else { + pool = userAtPool; + } + // Create the Rados object. It will shutdown automatically when being destructed. + ReleasingRados cluster; + cta::exception::Errnum::throwOnReturnedErrno(cluster.init(userId), + "In RadosStriperPool::throwingGetStriper(): failed to cluster.init(userId): "); + cta::exception::Errnum::throwOnReturnedErrno(cluster.conf_read_file(NULL), + "In RadosStriperPool::throwingGetStriper(): failed to cluster.conf_read_file(NULL): "); + cluster.conf_parse_env(NULL); + cta::exception::Errnum::throwOnReturnedErrno(cluster.connect(), + "In RadosStriperPool::throwingGetStriper(): failed to cluster.connect(): "); + librados::IoCtx ioctx; + cta::exception::Errnum::throwOnReturnedErrno( + cluster.ioctx_create(pool.c_str(), ioctx), + "In RadosStriperPool::throwingGetStriper(): failed to " + "cluster.ioctx_create(pool.c_str(), ioctx): "); + std::unique_ptr<libradosstriper::RadosStriper> newStriper( + new libradosstriper::RadosStriper); + cta::exception::Errnum::throwOnReturnedErrno( + libradosstriper::RadosStriper::striper_create(ioctx, newStriper.get()), + "In RadosStriperPool::throwingGetStriper(): failed to " + "libradosstriper::RadosStriper::striper_create(ioctx, newStriper.get()): "); + // Past that point we should not automatically release the cluster anymore. + cluster.release(); + // setup file layout + newStriper->set_object_layout_stripe_count(4); + newStriper->set_object_layout_stripe_unit(32 * 1024 * 1024); // 32 MB + newStriper->set_object_layout_object_size(32 * 1024 * 1024); // 32 MB + // insert into cache and return value + libradosstriper::RadosStriper * ret = newStriper.get(); + m_stripers[striperIdx][userAtPool] = newStriper.release(); + return ret; + } +} + +//------------------------------------------------------------------------------ +// RadosStriperPool::getStriper +//------------------------------------------------------------------------------ +libradosstriper::RadosStriper* RadosStriperPool::getStriper(const std::string& userAtPool) { + try { + return throwingGetStriper(userAtPool); + } catch (...) { + return NULL; + } +} + +//------------------------------------------------------------------------------ +// RadosStriperPool::~RadosStriperPool +//------------------------------------------------------------------------------ +RadosStriperPool::~RadosStriperPool() { + disconnectAll(); +} + +//------------------------------------------------------------------------------ +// RadosStriperPool::disconnectAll +//------------------------------------------------------------------------------ +void RadosStriperPool::disconnectAll() { + std::lock_guard<std::mutex> lock{m_mutex}; + for (auto v = m_stripers.begin(); v != m_stripers.end(); v++) { + for (auto i = v->begin(); i != v->end(); i++) { + delete i->second; + } + v->clear(); + } + m_stripers.clear(); +} + +} // namespace file +} // namespace tape +} // namespace castor diff --git a/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.hpp b/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.hpp new file mode 100644 index 0000000000000000000000000000000000000000..63fdd3aaca8e08b052b41e67df88af2492cb791f --- /dev/null +++ b/tapeserver/castor/tape/tapeserver/file/RadosStriperPool.hpp @@ -0,0 +1,83 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <radosstriper/libradosstriper.hpp> +#include <map> +#include <memory> +#include <mutex> + + +namespace castor { +namespace tape { +namespace file { +/** + * Utility singleton managing the rados stripers connections by name. + * The destructor will implicitly release the pool connections. + */ +class RadosStriperPool{ +public: + + /** constructor */ + RadosStriperPool() : m_maxStriperIdx(0), m_striperIdx(0) {}; + + /** + * Get pointer to a connection to the rados user (or one from the cache). + * This function throws exceptions in case of problem. + */ + libradosstriper::RadosStriper * throwingGetStriper(const std::string & userAtPool); + + /** + * Get pointer to a connection to the rados user (or one from the cache). + * This function returns NULL in case of problem. + */ + libradosstriper::RadosStriper * getStriper(const std::string & userAtPool); + + /** + * Clear the map of all connections + */ + void disconnectAll(); + + /** Destructor that will delete the held objects (needed in SLC6, see + * m_stripers declaration. */ + virtual ~RadosStriperPool(); + +private: + + /// Accessor to next striper pool index + unsigned int getStriperIdxAndIncrease(); + +private: + + // We use a map of pointers instead of maps of unique_ptr who do not work in + // gcc 4.4 (in SLC 6) + typedef std::map<std::string, libradosstriper::RadosStriper *> StriperDict; + /// striper pool + std::vector<StriperDict> m_stripers; + /// mutex protecting the striper pool + std::mutex m_mutex; + /// size of the Striper pool + unsigned int m_maxStriperIdx; + /// index of current striper pool to be used + unsigned int m_striperIdx; +}; + +} // namespace file +} // namespace tape +} // namespace castor diff --git a/tapeserver/castor/tape/tapeserver/utils/RegexTest.cpp b/tapeserver/castor/tape/tapeserver/utils/RegexTest.cpp index 3086c9cbca158cb43f97961edb815acf2ba51c89..616561024c245650953ebfe8bc0b00e83f2fae18 100644 --- a/tapeserver/castor/tape/tapeserver/utils/RegexTest.cpp +++ b/tapeserver/castor/tape/tapeserver/utils/RegexTest.cpp @@ -60,5 +60,15 @@ TEST(castor_tape_utils_Regex, OperationalTest) { ASSERT_EQ(ret3.size(), 0U); } + +TEST(castor_tape_utils_Regex, SubstringMatch) { + castor::tape::utils::Regex re("^radosstriper:///([^:]+@[^:]+):(.*)$"); + std::vector<std::string> ret1; + ret1 = re.exec("radosstriper:///user@pool:12345@castorns.7890"); + + ASSERT_EQ(ret1.size(), 3U); + ASSERT_EQ(ret1[1], "user@pool"); + ASSERT_EQ(ret1[2], "12345@castorns.7890"); +} }