diff --git a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list index 9848b693b7669f82245b5b40ed9e8af388cd1243..7e45d5f801c76205d49193baf1b3fbceed82b3f1 100644 --- a/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list +++ b/continuousintegration/docker/ctafrontend/cc7/etc/yum/pluginconf.d/versionlock.list @@ -1,14 +1,17 @@ -0:eos-archive-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-cleanup-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-client-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-debuginfo-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-fuse-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-fuse-core-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-fuse-sysv-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-server-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-srm-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-test-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 -0:eos-testkeytab-4.1.26-20170824172038gitc7cf4c6.el7.cern.x86_64 +0:eos-archive-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-cleanup-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-client-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-debuginfo-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fuse-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fuse-core-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fuse-sysv-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fusex-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fusex-core-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-fusex-selinux-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-server-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-srm-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-test-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 +0:eos-testkeytab-4.2.4-20171206151625gitda6f67d.el7.cern.x86_64 1:python2-xrootd-4.8.0-0.rc1.el7.x86_64 1:python3-xrootd-4.8.0-0.rc1.el7.x86_64 1:xrootd-4.8.0-0.rc1.el7.x86_64 @@ -26,43 +29,43 @@ 1:xrootd-server-devel-4.8.0-0.rc1.el7.x86_64 1:xrootd-server-libs-4.8.0-0.rc1.el7.x86_64 1:xrootd-tests-4.8.0-0.rc1.el7.x86_64 -2:ceph-12.2.0-0.el7.x86_64 -2:ceph-base-12.2.0-0.el7.x86_64 -2:ceph-common-12.2.0-0.el7.x86_64 -2:ceph-debuginfo-12.2.0-0.el7.x86_64 -2:cephfs-java-12.2.0-0.el7.x86_64 -2:ceph-fuse-12.2.0-0.el7.x86_64 -2:ceph-mds-12.2.0-0.el7.x86_64 -2:ceph-mgr-12.2.0-0.el7.x86_64 -2:ceph-mon-12.2.0-0.el7.x86_64 -2:ceph-osd-12.2.0-0.el7.x86_64 -2:ceph-radosgw-12.2.0-0.el7.x86_64 -2:ceph-resource-agents-12.2.0-0.el7.x86_64 -2:ceph-selinux-12.2.0-0.el7.x86_64 -2:ceph-test-12.2.0-0.el7.x86_64 -2:libcephfs2-12.2.0-0.el7.x86_64 -2:libcephfs-devel-12.2.0-0.el7.x86_64 -2:libcephfs_jni1-12.2.0-0.el7.x86_64 -2:libcephfs_jni-devel-12.2.0-0.el7.x86_64 -2:librados2-12.2.0-0.el7.x86_64 -2:librados-devel-12.2.0-0.el7.x86_64 -2:libradosstriper1-12.2.0-0.el7.x86_64 -2:libradosstriper-devel-12.2.0-0.el7.x86_64 -2:librbd1-12.2.0-0.el7.x86_64 -2:librbd-devel-12.2.0-0.el7.x86_64 -2:librgw2-12.2.0-0.el7.x86_64 -2:librgw-devel-12.2.0-0.el7.x86_64 -2:python34-ceph-argparse-12.2.0-0.el7.x86_64 -2:python34-cephfs-12.2.0-0.el7.x86_64 -2:python34-rados-12.2.0-0.el7.x86_64 -2:python34-rbd-12.2.0-0.el7.x86_64 -2:python34-rgw-12.2.0-0.el7.x86_64 -2:python-ceph-compat-12.2.0-0.el7.x86_64 -2:python-cephfs-12.2.0-0.el7.x86_64 -2:python-rados-12.2.0-0.el7.x86_64 -2:python-rbd-12.2.0-0.el7.x86_64 -2:python-rgw-12.2.0-0.el7.x86_64 -2:rados-objclass-devel-12.2.0-0.el7.x86_64 -2:rbd-fuse-12.2.0-0.el7.x86_64 -2:rbd-mirror-12.2.0-0.el7.x86_64 -2:rbd-nbd-12.2.0-0.el7.x86_64 +2:ceph-12.2.2-0.el7.x86_64 +2:ceph-base-12.2.2-0.el7.x86_64 +2:ceph-common-12.2.2-0.el7.x86_64 +2:ceph-debuginfo-12.2.2-0.el7.x86_64 +2:cephfs-java-12.2.2-0.el7.x86_64 +2:ceph-fuse-12.2.2-0.el7.x86_64 +2:ceph-mds-12.2.2-0.el7.x86_64 +2:ceph-mgr-12.2.2-0.el7.x86_64 +2:ceph-mon-12.2.2-0.el7.x86_64 +2:ceph-osd-12.2.2-0.el7.x86_64 +2:ceph-radosgw-12.2.2-0.el7.x86_64 +2:ceph-resource-agents-12.2.2-0.el7.x86_64 +2:ceph-selinux-12.2.2-0.el7.x86_64 +2:ceph-test-12.2.2-0.el7.x86_64 +2:libcephfs2-12.2.2-0.el7.x86_64 +2:libcephfs-devel-12.2.2-0.el7.x86_64 +2:libcephfs_jni1-12.2.2-0.el7.x86_64 +2:libcephfs_jni-devel-12.2.2-0.el7.x86_64 +2:librados2-12.2.2-0.el7.x86_64 +2:librados-devel-12.2.2-0.el7.x86_64 +2:libradosstriper1-12.2.2-0.el7.x86_64 +2:libradosstriper-devel-12.2.2-0.el7.x86_64 +2:librbd1-12.2.2-0.el7.x86_64 +2:librbd-devel-12.2.2-0.el7.x86_64 +2:librgw2-12.2.2-0.el7.x86_64 +2:librgw-devel-12.2.2-0.el7.x86_64 +2:python34-ceph-argparse-12.2.2-0.el7.x86_64 +2:python34-cephfs-12.2.2-0.el7.x86_64 +2:python34-rados-12.2.2-0.el7.x86_64 +2:python34-rbd-12.2.2-0.el7.x86_64 +2:python34-rgw-12.2.2-0.el7.x86_64 +2:python-ceph-compat-12.2.2-0.el7.x86_64 +2:python-cephfs-12.2.2-0.el7.x86_64 +2:python-rados-12.2.2-0.el7.x86_64 +2:python-rbd-12.2.2-0.el7.x86_64 +2:python-rgw-12.2.2-0.el7.x86_64 +2:rados-objclass-devel-12.2.2-0.el7.x86_64 +2:rbd-fuse-12.2.2-0.el7.x86_64 +2:rbd-mirror-12.2.2-0.el7.x86_64 +2:rbd-nbd-12.2.2-0.el7.x86_64 diff --git a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh index 16e05947a02bdad1d7ac12226d0b5b4b6d891d9a..2aac6178715c93b19d21a0ec8b5043c2906c3d72 100755 --- a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh +++ b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/ctaeos-mgm.sh @@ -158,7 +158,7 @@ test -e /usr/lib64/libjemalloc.so.1 && export LD_PRELOAD=/usr/lib64/libjemalloc. # ACLs in EOS are evaluated when unix permissions are failing, hence the 555 unix permission. eos mkdir ${CTA_TEST_DIR} eos chmod 555 ${CTA_TEST_DIR} - eos attr set sys.acl=g:eosusers:rwx!d,g:powerusers:rwx+d /eos/ctaeos/cta + eos attr set sys.acl=g:eosusers:rwx!d,u:poweruser1:rwx+dp,u:poweruser2:rwx+dp /eos/ctaeos/cta eos attr set CTA_StorageClass=ctaStorageClass ${CTA_TEST_DIR} diff --git a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/eos_configure_preprod.sh b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/eos_configure_preprod.sh index a0831211ad70f95d43b0c6831483670390ff3972..91c09c6417854dde53ffbf682dc76f3f932ea758 100755 --- a/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/eos_configure_preprod.sh +++ b/continuousintegration/docker/ctafrontend/cc7/opt/run/bin/eos_configure_preprod.sh @@ -8,7 +8,7 @@ chmod 755 /var/eos/wfe/bash/* eos mkdir ${PREPROD_DIR} eos chmod 555 ${PREPROD_DIR} -eos attr set sys.acl=g:eosusers:rwx!d,g:powerusers:rwx+d ${PREPROD_DIR} +eos attr set sys.acl=g:eosusers:rwx!d,u:poweruser1:rwx+dp,u:poweruser2:rwx+dp ${PREPROD_DIR} eos attr set CTA_StorageClass=ctaStorageClass ${PREPROD_DIR} diff --git a/continuousintegration/orchestration/tests/archive_retrieve.sh b/continuousintegration/orchestration/tests/archive_retrieve.sh index f77afaec70bc10f8631bd29ae13a06d6b02042c2..d0a78db2402ee22a6a6e6691184a99ca90dac891 100755 --- a/continuousintegration/orchestration/tests/archive_retrieve.sh +++ b/continuousintegration/orchestration/tests/archive_retrieve.sh @@ -35,6 +35,7 @@ echo "Launching simple_client_ar.sh on client pod" echo " Archiving file: xrdcp as user1" echo " Retrieving it as poweruser1" kubectl -n ${NAMESPACE} cp simple_client_ar.sh client:/root/client_ar.sh +kubectl -n ${NAMESPACE} cp client_helper.sh client:/root/client_helper.sh kubectl -n ${NAMESPACE} exec client -- bash /root/simple_client_ar.sh || exit 1 NB_FILES=1000 @@ -46,7 +47,6 @@ echo " Archiving ${NB_FILES} files of ${FILE_SIZE_KB}kB each" echo " Archiving files: xrdcp as user1" echo " Retrieving them as poweruser1" kubectl -n ${NAMESPACE} cp client_ar.sh client:/root/client_ar.sh -kubectl -n ${NAMESPACE} cp client_helper.sh client:/root/client_helper.sh kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 10 -d /eos/ctaeos/preprod -v -r || exit 1 exit 0 diff --git a/continuousintegration/orchestration/tests/client_ar.sh b/continuousintegration/orchestration/tests/client_ar.sh index 2617e2f22e11081bde7187c5d5e47033dd4a1b98..87ec5166592d72680c59b0958e8e1be87025a954 100644 --- a/continuousintegration/orchestration/tests/client_ar.sh +++ b/continuousintegration/orchestration/tests/client_ar.sh @@ -76,6 +76,14 @@ if [[ $VERBOSE == 1 ]]; then TAILPID=$! fi +# get some common useful helpers for krb5 +. /root/client_helper.sh + +# Get kerberos credentials for poweruser1 +eospower_kdestroy +eospower_kinit + + EOS_DIR="${EOS_BASEDIR}/$(uuidgen)" echo "Creating test dir in eos: ${EOS_DIR}" # uuid should be unique no need to remove dir before... @@ -141,7 +149,8 @@ echo "Trigerring EOS retrieve workflow as poweruser1:powerusers (12001:1200)" # XrdSecPROTOCOL=sss xrdfs ${EOSINSTANCE} prepare -s "${EOS_DIR}/${TEST_FILE_NAME}?eos.ruid=12001&eos.rgid=1200" || echo "Could not trigger retrieve for ${EOS_DIR}/${TEST_FILE_NAME}" #done -grep tapeonly$ ${STATUS_FILE} | sed -e 's/ .*$//' | XrdSecPROTOCOL=sss xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME xrdfs ${EOSINSTANCE} prepare -s "${EOS_DIR}/TEST_FILE_NAME?eos.ruid=12001&eos.rgid=1200" +# We need the -s as we are staging the files from tape (see xrootd prepare definition) +grep tapeonly$ ${STATUS_FILE} | sed -e 's/ .*$//' | KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME xrdfs ${EOSINSTANCE} prepare -s ${EOS_DIR}/TEST_FILE_NAME # Wait for the copy to appear on disk diff --git a/continuousintegration/orchestration/tests/simple_client_ar.sh b/continuousintegration/orchestration/tests/simple_client_ar.sh index 89c92d1b8e9721b833d210025803b7ad45ca4012..8e0517462b2a91186069627e0d9489d1679d4b07 100644 --- a/continuousintegration/orchestration/tests/simple_client_ar.sh +++ b/continuousintegration/orchestration/tests/simple_client_ar.sh @@ -4,6 +4,11 @@ EOSINSTANCE=ctaeos TEST_FILE_NAME=`uuidgen` +# get some common useful helpers for krb5 +. /root/client_helper.sh + +eospower_kdestroy +eospower_kinit echo "xrdcp /etc/group root://${EOSINSTANCE}//eos/ctaeos/cta/${TEST_FILE_NAME}" xrdcp /etc/group root://${EOSINSTANCE}//eos/ctaeos/cta/${TEST_FILE_NAME} @@ -41,9 +46,11 @@ echo "Information about the testing file without disk replica" eos root://${EOSINSTANCE} info /eos/ctaeos/cta/${TEST_FILE_NAME} echo echo "Trigerring EOS retrieve workflow as poweruser1:powerusers (12001:1200)" -echo "XrdSecPROTOCOL=sss xrdfs ${EOSINSTANCE} prepare -s \"/eos/ctaeos/cta/${TEST_FILE_NAME}?eos.ruid=12001&eos.rgid=1200\"" - XrdSecPROTOCOL=sss xrdfs ${EOSINSTANCE} prepare -s "/eos/ctaeos/cta/${TEST_FILE_NAME}?eos.ruid=12001&eos.rgid=1200" +#echo "XrdSecPROTOCOL=sss xrdfs ${EOSINSTANCE} prepare -s \"/eos/ctaeos/cta/${TEST_FILE_NAME}?eos.ruid=12001&eos.rgid=1200\"" +# XrdSecPROTOCOL=sss xrdfs ${EOSINSTANCE} prepare -s "/eos/ctaeos/cta/${TEST_FILE_NAME}?eos.ruid=12001&eos.rgid=1200" +# We need the -s as we are staging the files from tape (see xrootd prepare definition) +KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} prepare -s /eos/ctaeos/cta/${TEST_FILE_NAME} # Wait for the copy to appear on disk SECONDS_PASSED=0 diff --git a/objectstore/BackendFactory.cpp b/objectstore/BackendFactory.cpp index 7e98d21893d24785e5332bd1c63e66c4878a8de7..f96dae143b2b45cae567141755e5d8b4b1c71fb6 100644 --- a/objectstore/BackendFactory.cpp +++ b/objectstore/BackendFactory.cpp @@ -23,7 +23,7 @@ #include "common/utils/utils.hpp" #include "common/utils/Regex.hpp" -auto cta::objectstore::BackendFactory::createBackend(const std::string& URL) +auto cta::objectstore::BackendFactory::createBackend(const std::string& URL, log::Logger & logger) -> std::unique_ptr<Backend> { utils::Regex fileRe("^file://(.*)$"), radosRe("^rados://([^@]+)@([^:]+)(|:(.+))$"); @@ -39,9 +39,9 @@ auto cta::objectstore::BackendFactory::createBackend(const std::string& URL) if (regexResult.size() != 5 && regexResult.size() != 4) throw cta::exception::Exception("In BackendFactory::createBackend(): unexpected number of matches in regex"); if (regexResult.size() == 5) - return std::unique_ptr<Backend>(new BackendRados(regexResult[1], regexResult[2], regexResult[4])); + return std::unique_ptr<Backend>(new BackendRados(logger, regexResult[1], regexResult[2], regexResult[4])); else - return std::unique_ptr<Backend>(new BackendRados(regexResult[1], regexResult[2])); + return std::unique_ptr<Backend>(new BackendRados(logger, regexResult[1], regexResult[2])); } // Fall back to a file URL if all failed return std::unique_ptr<Backend>(new BackendVFS(URL)); diff --git a/objectstore/BackendFactory.hpp b/objectstore/BackendFactory.hpp index ecb7ebcf152bad1d70b109033367f18235d9c2ca..d4ac004ded9524bc6da44d740a711fa7498a22bd 100644 --- a/objectstore/BackendFactory.hpp +++ b/objectstore/BackendFactory.hpp @@ -19,11 +19,12 @@ #pragma once #include "Backend.hpp" +#include "common/log/Logger.hpp" #include <memory> namespace cta { namespace objectstore { class BackendFactory { public: - static std::unique_ptr<Backend> createBackend(const std::string & URL); + static std::unique_ptr<Backend> createBackend(const std::string & URL, log::Logger & logger); }; }} \ No newline at end of file diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 3a50e9ec2988bd5c9a0c9bb672239926240a5f1d..a9316c6f45572202a338905a22dff9d910cedbb2 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -80,23 +80,53 @@ namespace cta { namespace objectstore { cta::threading::Mutex BackendRados::RadosTimeoutLogger::g_mutex; -BackendRados::BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace) : +BackendRados::BackendRados(log::Logger & logger, const std::string & userId, const std::string & pool, + const std::string &radosNameSpace) : m_user(userId), m_pool(pool), m_namespace(radosNameSpace), m_cluster(), m_radosCtx() { cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.init(userId.c_str()), "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.init"); + bool contextSet=false; try { + RadosTimeoutLogger rtl; cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.conf_read_file(NULL), "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_read_file"); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_cluster.conf_read_file()", "no object"); + rtl.reset(); cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.conf_parse_env(NULL), "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.conf_parse_env"); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_cluster.conf_parse_env()", "no object"); + rtl.reset(); cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.connect(), "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.connect"); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_cluster.connect()", "no object"); + rtl.reset(); cta::exception::Errnum::throwOnReturnedErrno(-m_cluster.ioctx_create(pool.c_str(), m_radosCtx), "In ObjectStoreRados::ObjectStoreRados, failed to m_cluster.ioctx_create"); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_cluster.ioctx_create()", "no object"); + contextSet=true; // An empty string also sets the namespace to default so no need to filter. This function does not fail. m_radosCtx.set_namespace(radosNameSpace); + // Create the thread pool. One thread per CPU hardware thread. + for (size_t i=0; i<std::thread::hardware_concurrency(); i++) { + RadosWorkerThreadAndContext * rwtac = new RadosWorkerThreadAndContext(m_cluster, pool, radosNameSpace, i, logger); + m_threads.push_back(rwtac); + m_threads.back()->start(); + } } catch (...) { + for (size_t i=0; i<m_threads.size(); i++) m_JobQueue.push(nullptr); + for (auto &t: m_threads) { + if (t) t->wait(); + delete t; + } + if (contextSet) + { + RadosTimeoutLogger rtl; + m_radosCtx.close(); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_radosCtx.close()", "no object"); + } + RadosTimeoutLogger rtl; m_cluster.shutdown(); + rtl.logIfNeeded("In BackendRados::BackendRados(): m_cluster.shutdown()", "no object"); throw; } } @@ -504,6 +534,28 @@ BackendRados::ScopedLock* BackendRados::lockShared(std::string name, uint64_t ti return ret.release(); } +BackendRados::RadosWorkerThreadAndContext::RadosWorkerThreadAndContext(librados::Rados& cluster, + const std::string & pool, const std::string& radosNameSpace, int threadID, log::Logger & logger): + m_threadID(threadID), m_lc(logger) { + RadosTimeoutLogger rtl; + cta::exception::Errnum::throwOnReturnedErrno(-cluster.ioctx_create(pool.c_str(), m_radosCtx), + "In RadosWorkerThreadAndContext::RadosWorkerThreadAndContext, failed to cluster.ioctx_create"); + rtl.logIfNeeded("In RadosWorkerThreadAndContext::RadosWorkerThreadAndContext(): ", "no object"); + // An empty string also sets the namespace to default so no need to filter. This function does not fail. + m_radosCtx.set_namespace(radosNameSpace); +} + +BackendRados::RadosWorkerThreadAndContext::~RadosWorkerThreadAndContext() { + RadosTimeoutLogger rtl; + m_radosCtx.close(); + rtl.logIfNeeded("In RadosWorkerThreadAndContext::~RadosWorkerThreadAndContext(): m_radosCtx.close()", "no object"); +} + +void BackendRados::RadosWorkerThreadAndContext::run() { + +} + + Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) { return new AsyncUpdater(*this, name, update); diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp index e52ee4cd59518be858bb830eb34c517eecee5366..e0fb508c6146ab4b3e1e44268f7c8f722fd0c77f 100644 --- a/objectstore/BackendRados.hpp +++ b/objectstore/BackendRados.hpp @@ -21,6 +21,9 @@ #include "Backend.hpp" #include "rados/librados.hpp" #include "common/threading/Mutex.hpp" +#include "common/threading/BlockingQueue.hpp" +#include "common/log/Logger.hpp" +#include "common/log/LogContext.hpp" #include <future> // RADOS_LOCKING can be NOTIFY or BACKOFF @@ -57,7 +60,7 @@ public: * @param userId * @param pool */ - BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace = ""); + BackendRados(log::Logger & logger, const std::string & userId, const std::string & pool, const std::string &radosNameSpace = ""); ~BackendRados() override; std::string user() { return m_user; @@ -183,6 +186,42 @@ private: uint64_t m_watchHandle; }; +private: + /** + * Base class for jobs handled by the thread-and-context pool. + */ + class AsyncJob { + virtual void execute(librados::IoCtx & context)=0; + virtual ~AsyncJob() {} + }; + + /** + * The queue for the thread-and-context pool. + */ + cta::threading::BlockingQueue<AsyncJob *> m_JobQueue; + + /** + * The class for the worker threads + */ + class RadosWorkerThreadAndContext: private cta::threading::Thread { + public: + RadosWorkerThreadAndContext(librados::Rados & cluster, const std::string & pool, const std::string & radosNameSpace, + int threadID, log::Logger & logger); + virtual ~RadosWorkerThreadAndContext(); + void start() { cta::threading::Thread::start(); } + void wait() { cta::threading::Thread::wait(); } + private: + librados::IoCtx m_radosCtx; + const int m_threadID; + log::LogContext m_lc; + void run() override; + }; + + /** + * The container for the threads + */ + std::vector<RadosWorkerThreadAndContext *> m_threads; + public: /** * A class following up the check existence-lock-fetch-update-write-unlock. Constructor implicitly diff --git a/objectstore/cta-objectstore-collect-orphaned-object.cpp b/objectstore/cta-objectstore-collect-orphaned-object.cpp index e3e2c85bd0203f568ce62e7d34d082d66469965e..a6188019a39543caa211c9ed14fdeaaf93500242 100644 --- a/objectstore/cta-objectstore-collect-orphaned-object.cpp +++ b/objectstore/cta-objectstore-collect-orphaned-object.cpp @@ -44,7 +44,7 @@ int main(int argc, char ** argv) { cta::log::LogContext lc(sl); std::string objectName; if (4 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], sl).release()); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(argv[2]); const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 0; @@ -52,7 +52,7 @@ int main(int argc, char ** argv) { objectName = argv[3]; } else if (2 == argc) { cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); - be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), sl)); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/etc/cta/cta-catalogue.conf"); const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 0; diff --git a/objectstore/cta-objectstore-dereference-removed-queues.cpp b/objectstore/cta-objectstore-dereference-removed-queues.cpp index 560522493c2431fa27e253d4be4e10ada94b1748..5aaeb492869d00d490682468f0dd9bcdaca72de9 100644 --- a/objectstore/cta-objectstore-dereference-removed-queues.cpp +++ b/objectstore/cta-objectstore-dereference-removed-queues.cpp @@ -23,6 +23,8 @@ */ #include "common/Configuration.hpp" +#include "common/log/StdoutLogger.hpp" +#include "common/log/LogContext.hpp" #include "BackendFactory.hpp" #include "BackendVFS.hpp" #include "Agent.hpp" @@ -33,12 +35,13 @@ int main(int argc, char ** argv) { try { + cta::log::StdoutLogger logger("cta-objectstore-dereference-removed-queues"); std::unique_ptr<cta::objectstore::Backend> be; if (2 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], logger).release()); } else if (1 == argc) { cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); - be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), logger)); } else { throw std::runtime_error("Wrong number of arguments: expected 0 or 1: [objectstoreURL]"); } diff --git a/objectstore/cta-objectstore-dump-object.cpp b/objectstore/cta-objectstore-dump-object.cpp index 7e3899ef311a39ad22517fa9852359052a48d813..9cce8d42b295c6493f802213904cfd3282ec1e05 100644 --- a/objectstore/cta-objectstore-dump-object.cpp +++ b/objectstore/cta-objectstore-dump-object.cpp @@ -24,6 +24,8 @@ #include "common/Configuration.hpp" #include "BackendFactory.hpp" +#include "common/log/DummyLogger.hpp" +#include "common/log/LogContext.hpp" #include "BackendVFS.hpp" #include "GenericObject.hpp" #include <iostream> @@ -31,14 +33,15 @@ int main(int argc, char ** argv) { try { + cta::log::DummyLogger dl(""); std::unique_ptr<cta::objectstore::Backend> be; std::string objectName; if (3 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], dl).release()); objectName = argv[2]; } else if (2 == argc ){ cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); - be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), dl)); objectName = argv[1]; } else { throw std::runtime_error("Wrong number of arguments: expected 1 or 2: [objectstoreURL] objectname"); diff --git a/objectstore/cta-objectstore-initialize.cpp b/objectstore/cta-objectstore-initialize.cpp index 9d0d2a206198a91ad4bdb329d8d8f4e7dd52c695..93f2faefa3680e282e92af093b100faa58a15c9c 100644 --- a/objectstore/cta-objectstore-initialize.cpp +++ b/objectstore/cta-objectstore-initialize.cpp @@ -33,12 +33,13 @@ int main(int argc, char ** argv) { try { + cta::log::StdoutLogger logger("cta-objectstore-initialize"); + cta::log::LogContext lc(logger); std::unique_ptr<cta::objectstore::Backend> be; if (1 == argc) { be.reset(new cta::objectstore::BackendVFS); - } else if (2 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], logger).release()); } else { throw std::runtime_error("Wrong number of arguments: expected 0 or 1"); } @@ -52,12 +53,10 @@ int main(int argc, char ** argv) { re.insert(); cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); - cta::log::StdoutLogger logger("cta-objectstore-initialize"); cta::objectstore::AgentReference agr("cta-objectstore-initialize", logger); cta::objectstore::Agent ag(agr.getAgentAddress(), *be); ag.initialize(); cta::objectstore::EntryLogSerDeser el("user0", "systemhost", time(NULL)); - cta::log::LogContext lc(logger); re.addOrGetAgentRegisterPointerAndCommit(agr,el, lc); rel.release(); ag.insertAndRegisterSelf(lc); diff --git a/objectstore/cta-objectstore-list.cpp b/objectstore/cta-objectstore-list.cpp index e0ddc4e82bdc0615cb2edd2083723fc7ffa51839..f0cb5ac9161017cba3cb68e8f664a077313345a0 100644 --- a/objectstore/cta-objectstore-list.cpp +++ b/objectstore/cta-objectstore-list.cpp @@ -25,6 +25,8 @@ #include "common/Configuration.hpp" #include "BackendVFS.hpp" #include "BackendFactory.hpp" +#include "common/log/DummyLogger.hpp" +#include "common/log/LogContext.hpp" #include "RootEntry.hpp" #include "Agent.hpp" #include <iostream> @@ -32,12 +34,13 @@ int main(int argc, char ** argv) { try { + cta::log::DummyLogger dl(""); std::unique_ptr<cta::objectstore::Backend> be; if (1 == argc) { cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); - be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + be = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), dl)); } else if (2 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], dl).release()); } else { throw std::runtime_error("Wrong number of arguments: expected 0 or 1: [objectstoreURL]"); } diff --git a/objectstore/cta-objectstore-unfollow-agent.cpp b/objectstore/cta-objectstore-unfollow-agent.cpp index 1a61b3404e6bc88d6a0363d446fda6d7dbef3f95..02482e5f6522156948b5ce683c23f50c22b0fca4 100644 --- a/objectstore/cta-objectstore-unfollow-agent.cpp +++ b/objectstore/cta-objectstore-unfollow-agent.cpp @@ -24,6 +24,8 @@ #include "common/Configuration.hpp" #include "BackendFactory.hpp" +#include "common/log/StdoutLogger.hpp" +#include "common/log/LogContext.hpp" #include "BackendVFS.hpp" #include "Agent.hpp" #include "RootEntry.hpp" @@ -34,14 +36,15 @@ int main(int argc, char ** argv) { try { + cta::log::StdoutLogger logger("cta-objectstore-unfollow-agent"); std::string agentName; std::unique_ptr<cta::objectstore::Backend> be; if (3 == argc) { - be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + be.reset(cta::objectstore::BackendFactory::createBackend(argv[1], logger).release()); agentName = argv[2]; } else if (2 == argc) { cta::common::Configuration m_ctaConf("/etc/cta/cta-frontend.conf"); - be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + be=std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), logger)); agentName = argv[1]; } else { throw std::runtime_error("Wrong number of arguments: expected 1 or 2: [objectstoreURL] agentName"); diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 34b385bb298def8e9a370dbaad3f0021410f1990..870227844f4c113b9919d520fffc421e8bfaaec8 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1728,7 +1728,7 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou //------------------------------------------------------------------------------ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) { - utils::Timer t; + utils::Timer t, totalTime; double driveRegisterCheckTime = 0; double findQueueTime = 0; double lockFetchQueueTime = 0; @@ -2074,7 +2074,8 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("jobsUpdateTime", jobsUpdateTime) .add("queueProcessTime", queueProcessTime) .add("ownershipRemovalTime", ownershipRemovalTime) - .add("queueCommitTime", queueCommitTime); + .add("queueCommitTime", queueCommitTime) + .add("schedulerDbTime", totalTime.secs()); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete."); } // We can construct the return value. @@ -2283,7 +2284,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo } catch (cta::exception::Exception & e) { std::string debugType=typeid(e).name(); if (typeid(e) == typeid(Backend::NoSuchObject) || - typeid(e) == typeid(objectstore::ArchiveRequest::WrongPreviousOwner)) { + typeid(e) == typeid(objectstore::RetrieveRequest::WrongPreviousOwner)) { // The object was not present or not owned, so we skip it. It should be removed from // the queue. jobsToDequeue.emplace_back((*j)->m_retrieveRequest.getAddressIfSet()); diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index d7fd49501e324eee53c5fbaf02ab505ad7f90497..5579662c32d7fb2f2ded1d9969ec66c8e8cb07fa 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -165,8 +165,8 @@ public: } private: - std::unique_ptr <cta::objectstore::Backend> m_backend; std::unique_ptr <cta::log::Logger> m_logger; + std::unique_ptr <cta::objectstore::Backend> m_backend; std::unique_ptr <cta::catalogue::Catalogue> m_catalogue; cta::OStoreDB m_OStoreDB; objectstore::AgentReference m_agentReference; @@ -175,7 +175,7 @@ private: template <> OStoreDBWrapper<cta::objectstore::BackendVFS>::OStoreDBWrapper( const std::string &context, const std::string &URL) : -m_backend(new cta::objectstore::BackendVFS()), m_logger(new cta::log::DummyLogger("")), +m_logger(new cta::log::DummyLogger("")), m_backend(new cta::objectstore::BackendVFS()), m_catalogue(new cta::catalogue::DummyCatalogue(*m_logger)), m_OStoreDB(*m_backend, *m_catalogue, *m_logger), m_agentReference("OStoreDBFactory", *m_logger) { // We need to populate the root entry before using. @@ -202,7 +202,7 @@ m_OStoreDB(*m_backend, *m_catalogue, *m_logger), m_agentReference("OStoreDBFacto template <> OStoreDBWrapper<cta::objectstore::BackendRados>::OStoreDBWrapper( const std::string &context, const std::string &URL) : -m_backend(cta::objectstore::BackendFactory::createBackend(URL).release()), m_logger(new cta::log::DummyLogger("")), +m_logger(new cta::log::DummyLogger("")), m_backend(cta::objectstore::BackendFactory::createBackend(URL, *m_logger).release()), m_catalogue(new cta::catalogue::DummyCatalogue(*m_logger)), m_OStoreDB(*m_backend, *m_catalogue, *m_logger), m_agentReference("OStoreDBFactory", *m_logger) { // We need to first clean up possible left overs in the pool diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index a5d52cdbfec840966d55558d7eb24df6be0d00dc..e954a66d82bf74c0e90af15de5286271341f85cf 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -464,40 +464,12 @@ std::list<common::dataStructures::DriveState> Scheduler::getDriveStates(const co } //------------------------------------------------------------------------------ -// getNextMount +// sortAndGetTapesForMountInfo //------------------------------------------------------------------------------ -std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) { - // In order to decide the next mount to do, we have to take a global lock on - // the scheduling, retrieve a list of all running mounts, queues sizes for - // tapes and tape pools, order the candidates by priority - // below threshold, and pick one at a time, we then attempt to get a tape - // from the catalogue (for the archive mounts), and walk the list until we - // mount or find nothing to do. - // We then skip to the next candidate, until we find a suitable one and - // return the mount, or exhaust all of them an - // Many steps for this logic are not specific for the database and are hence - // implemented in the scheduler itself. - // First, get the mount-related info from the DB - utils::Timer timer; - double getMountInfoTime = 0; - double queueTrimingTime = 0; - double getTapeInfoTime = 0; - double candidateSortingTime = 0; - double getTapeForWriteTime = 0; - double decisionTime = 0; - double mountCreationTime = 0; - double driveStatusSetTime = 0; - double schedulerDbTime = 0; - double catalogueTime = 0; - std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo; - mountInfo = m_db.getMountInfo(lc); - getMountInfoTime = timer.secs(utils::Timer::resetCounter); - if (mountInfo->queueTrimRequired) { - m_db.trimEmptyQueues(lc); - queueTrimingTime = timer.secs(utils::Timer::resetCounter); - } - __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo; - +void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>& mountInfo, + const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer, + std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, + double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc) { // The library information is not know for the tapes involved in retrieves. We // need to query the catalogue now about all those tapes. // Build the list of tapes. @@ -530,9 +502,6 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib // With the existing mount list, we can now populate the potential mount list // with the per tape pool existing mount statistics. - typedef std::pair<std::string, common::dataStructures::MountType> tpType; - std::map<tpType, uint32_t> existingMountsSummary; - std::set<std::string> tapesInUse; for (auto & em: mountInfo->existingOrNextMounts) { // If a mount is still listed for our own drive, it is a leftover that we disregard. if (em.driveName!=driveName) { @@ -547,7 +516,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib params.add("vid", em.vid) .add("mountType", common::dataStructures::toString(em.type)) .add("drive", em.driveName); - lc.log(log::DEBUG,"In Scheduler::getNextMount(): tapeAlreadyInUse found."); + lc.log(log::DEBUG,"In Scheduler::sortAndGetTapesForMountInfo(): tapeAlreadyInUse found."); } } } @@ -590,7 +559,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("minArchiveRequestAge", m->minArchiveRequestAge) .add("existingMounts", existingMounts) .add("maxDrivesAllowed", m->maxDrivesAllowed); - lc.log(log::DEBUG, "Removing potential mount not passing criteria"); + lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Removing potential mount not passing criteria"); m = mountInfo->potentialMounts.erase(m); } else { // populate the mount with a weight @@ -611,7 +580,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("existingMounts", existingMounts) .add("maxDrivesAllowed", m->maxDrivesAllowed) .add("ratioOfMountQuotaUsed", m->ratioOfMountQuotaUsed); - lc.log(log::DEBUG, "Will consider potential mount"); + lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Will consider potential mount"); m++; } } @@ -626,7 +595,6 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib // Find out if we have any potential archive mount in the list. If so, get the // list of tapes from the catalogue. - std::list<catalogue::TapeForWriting> tapeList; if (std::count_if( mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == common::dataStructures::MountType::Archive; } )) { @@ -643,6 +611,165 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib t++; } } +} + +//------------------------------------------------------------------------------ +// getNextMountDryRun +//------------------------------------------------------------------------------ +bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const std::string& driveName, log::LogContext& lc) { + // We run the same algorithm as the actual getNextMount without the global lock + // For this reason, we just return true as soon as valid mount has been found. + utils::Timer timer; + double getMountInfoTime = 0; + double getTapeInfoTime = 0; + double candidateSortingTime = 0; + double getTapeForWriteTime = 0; + double decisionTime = 0; + double schedulerDbTime = 0; + double catalogueTime = 0; + std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo; + mountInfo = m_db.getMountInfoNoLock(lc); + getMountInfoTime = timer.secs(utils::Timer::resetCounter); + std::map<tpType, uint32_t> existingMountsSummary; + std::set<std::string> tapesInUse; + std::list<catalogue::TapeForWriting> tapeList; + + sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer, + existingMountsSummary, tapesInUse, tapeList, + getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc); + + // We can now simply iterate on the candidates until we manage to find a valid mount + for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) { + // If the mount is an archive, we still have to find a tape. + if (m->type==common::dataStructures::MountType::Archive) { + // We need to find a tape for archiving. It should be both in the right + // tape pool and in the drive's logical library + // The first tape matching will go for a prototype. + // TODO: improve to reuse already partially written tapes and randomization + for (auto & t: tapeList) { + if (t.tapePool == m->tapePool) { + // We have our tape. That's enough. + decisionTime += timer.secs(utils::Timer::resetCounter); + schedulerDbTime = getMountInfoTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; + uint32_t existingMounts = 0; + try { + existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); + } catch (...) {} + log::ScopedParamContainer params(lc); + params.add("tapepool", m->tapePool) + .add("vid", t.vid) + .add("mountType", common::dataStructures::toString(m->type)) + .add("existingMounts", existingMounts) + .add("bytesQueued", m->bytesQueued) + .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) + .add("filesQueued", m->filesQueued) + .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) + .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) + .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("getMountInfoTime", getMountInfoTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); + lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (archive)"); + return true; + } + } + } else if (m->type==common::dataStructures::MountType::Retrieve) { + // We know the tape we intend to mount. We have to validate the tape is + // actually available to read (not mounted or about to be mounted, and pass + // on it if so). + if (tapesInUse.count(m->vid)) continue; + decisionTime += timer.secs(utils::Timer::resetCounter); + log::ScopedParamContainer params(lc); + uint32_t existingMounts = 0; + try { + existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); + } catch (...) {} + schedulerDbTime = getMountInfoTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; + params.add("tapepool", m->tapePool) + .add("vid", m->vid) + .add("mountType", common::dataStructures::toString(m->type)) + .add("existingMounts", existingMounts) + .add("bytesQueued", m->bytesQueued) + .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) + .add("filesQueued", m->filesQueued) + .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) + .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) + .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("getMountInfoTime", getMountInfoTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); + lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): Found a potential mount (retrieve)"); + return true; + } + } + schedulerDbTime = getMountInfoTime; + catalogueTime = getTapeInfoTime + getTapeForWriteTime; + decisionTime += timer.secs(utils::Timer::resetCounter); + log::ScopedParamContainer params(lc); + params.add("getMountInfoTime", getMountInfoTime) + .add("getTapeInfoTime", getTapeInfoTime) + .add("candidateSortingTime", candidateSortingTime) + .add("getTapeForWriteTime", getTapeForWriteTime) + .add("decisionTime", decisionTime) + .add("schedulerDbTime", schedulerDbTime) + .add("catalogueTime", catalogueTime); + lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): No valid mount found."); + return false; +} + + +//------------------------------------------------------------------------------ +// getNextMount +//------------------------------------------------------------------------------ +std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) { + // In order to decide the next mount to do, we have to take a global lock on + // the scheduling, retrieve a list of all running mounts, queues sizes for + // tapes and tape pools, order the candidates by priority + // below threshold, and pick one at a time, we then attempt to get a tape + // from the catalogue (for the archive mounts), and walk the list until we + // mount or find nothing to do. + // We then skip to the next candidate, until we find a suitable one and + // return the mount, or exhaust all of them an + // Many steps for this logic are not specific for the database and are hence + // implemented in the scheduler itself. + // First, get the mount-related info from the DB + utils::Timer timer; + double getMountInfoTime = 0; + double queueTrimingTime = 0; + double getTapeInfoTime = 0; + double candidateSortingTime = 0; + double getTapeForWriteTime = 0; + double decisionTime = 0; + double mountCreationTime = 0; + double driveStatusSetTime = 0; + double schedulerDbTime = 0; + double catalogueTime = 0; + std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo; + mountInfo = m_db.getMountInfo(lc); + getMountInfoTime = timer.secs(utils::Timer::resetCounter); + if (mountInfo->queueTrimRequired) { + m_db.trimEmptyQueues(lc); + queueTrimingTime = timer.secs(utils::Timer::resetCounter); + } + __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo; + + std::map<tpType, uint32_t> existingMountsSummary; + std::set<std::string> tapesInUse; + std::list<catalogue::TapeForWriting> tapeList; + + sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer, + existingMountsSummary, tapesInUse, tapeList, + getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc); // We can now simply iterate on the candidates until we manage to create a // mount for one of them @@ -769,6 +896,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib } schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; catalogueTime = getTapeInfoTime + getTapeForWriteTime; + decisionTime += timer.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); params.add("getMountInfoTime", getMountInfoTime) .add("queueTrimingTime", queueTrimingTime) diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index db4fa6f16dddd8c08dfbc0dcacd1b5301b4a05e9..f882fd3a66d005fbcddf51b49ccd838f40dcfafc 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -41,6 +41,7 @@ #include "common/dataStructures/VerifyInfo.hpp" #include "common/dataStructures/WriteTestResult.hpp" #include "common/dataStructures/QueueAndMountSummary.hpp" +#include "common/Timer.hpp" #include "common/exception/Exception.hpp" #include "common/log/LogContext.hpp" @@ -247,6 +248,27 @@ public: const cta::common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc) const; /*============== Actual mount scheduling and queue status reporting ========*/ +private: + typedef std::pair<std::string, common::dataStructures::MountType> tpType; + /** + * Common part to getNextMountDryRun() and getNextMount() to populate mount decision info. + * The structure should be pre-loaded by the calling function. + */ + void sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> &mountInfo, + const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer, + std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, + double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc); + +public: + /** + * Run the mount decision logic lock free, so we have no contention in the + * most usual case where there is no mount to create. + * @param logicalLibraryName library for the drive we are scheduling + * @param driveName name of the drive we are scheduling + * @param lc log context + * @return true if a valid mount would have been found. + */ + bool getNextMountDryRun(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc); /** * Actually decide which mount to do next for a given drive. * @param logicalLibraryName library for the drive we are scheduling diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp index a912740c7a971b8347722bdeead988766c2b03ec..e258791363f60c37a8edae3a6db896c36a546908 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp @@ -156,7 +156,8 @@ schedule: // As getting next mount could be long, we report the drive as up immediately. m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); try { - tapeMount.reset(m_scheduler.getNextMount(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc).release()); + if (m_scheduler.getNextMountDryRun(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc)) + tapeMount.reset(m_scheduler.getNextMount(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc).release()); } catch (cta::exception::Exception & e) { cta::log::ScopedParamContainer localParams(lc); localParams.add("errorMessage", e.getMessageValue()); diff --git a/tapeserver/castor/tape/tapeserver/drive/DriveGeneric.cpp b/tapeserver/castor/tape/tapeserver/drive/DriveGeneric.cpp index 3115a6635bcae9a6e5c760c6450263c8a7115cbd..a08abcd1e101865983046cf666277797fff4bd7e 100644 --- a/tapeserver/castor/tape/tapeserver/drive/DriveGeneric.cpp +++ b/tapeserver/castor/tape/tapeserver/drive/DriveGeneric.cpp @@ -2560,7 +2560,7 @@ void drive::DriveGeneric::waitTestUnitReady(const uint32_t timeoutSecond) { } while(t.secs() < timeoutSecond); // Throw an exception for the last tolerated exception that exceeded the timer - cta::exception::Exception ex; + cta::exception::TimeOut ex; ex.getMessage() << "Failed to test unit ready after waiting " << timeoutSecond << " seconds: " << lastTestUnitReadyExceptionMsg; throw ex; diff --git a/tapeserver/castor/tape/tapeserver/file/File.cpp b/tapeserver/castor/tape/tapeserver/file/File.cpp index 16d21ee9dafb397dd65f10569f2bae9b1c1ec969..6d817ab5acf017101d0d5accebfe4dc81d392c63 100644 --- a/tapeserver/castor/tape/tapeserver/file/File.cpp +++ b/tapeserver/castor/tape/tapeserver/file/File.cpp @@ -70,14 +70,12 @@ namespace castor { throw ex; } - // this readBlock only for mhvtl workaround m_drive.rewind(); m_drive.disableLogicalBlockProtection(); - VOL1withCrc vol1WithCrc; - const ssize_t res = m_drive.readBlock((void * )&vol1WithCrc, - sizeof(vol1WithCrc)); - if (res >= (ssize_t)(sizeof(VOL1withCrc) - sizeof(VOL1))) { - switch(vol1WithCrc.getLBPMethod()) { + { + VOL1 vol1; + m_drive.readExactBlock((void * )&vol1, sizeof(vol1), "[ReadSession::ReadSession()] - Reading VOL1"); + switch(vol1.getLBPMethod()) { case SCSI::logicBlockProtectionMethod::CRC32C: m_detectedLbp = true; if (m_useLbp) { @@ -94,21 +92,22 @@ namespace castor { m_detectedLbp = false; break; default: - throw cta::exception::Exception("In ReadSession::ReadSession(): " - "unknown LBP method"); + throw cta::exception::Exception("In ReadSession::ReadSession(): unknown LBP method"); } } // from this point the right LBP mode should be set or not set m_drive.rewind(); - VOL1 vol1; - m_drive.readExactBlock((void * )&vol1, sizeof(vol1), "[ReadSession::ReadSession()] - Reading VOL1"); - try { - vol1.verify(); - } catch (std::exception & e) { - throw TapeFormatError(e.what()); + { + VOL1 vol1; + m_drive.readExactBlock((void *) &vol1, sizeof(vol1), "[ReadSession::ReadSession()] - Reading VOL1"); + try { + vol1.verify(); + } catch (std::exception &e) { + throw TapeFormatError(e.what()); + } + HeaderChecker::checkVOL1(vol1, volInfo.vid); //after which we are at the end of VOL1 header (i.e. beginning of HDR1 of the first file) on success, or at BOT in case of exception } - HeaderChecker::checkVOL1(vol1, volInfo.vid); //after which we are at the end of VOL1 header (i.e. beginning of HDR1 of the first file) on success, or at BOT in case of exception } void HeaderChecker::checkVOL1(const VOL1 &vol1, const std::string &volId) { @@ -403,14 +402,12 @@ namespace castor { throw ex; } - // this readBlock only for mhvtl workaround m_drive.rewind(); m_drive.disableLogicalBlockProtection(); - VOL1withCrc vol1WithCrc; - const ssize_t res = m_drive.readBlock((void * )&vol1WithCrc, - sizeof(vol1WithCrc)); - if (res >= (ssize_t)(sizeof(VOL1withCrc) - sizeof(VOL1))) { - switch(vol1WithCrc.getLBPMethod()) { + { + VOL1 vol1; + m_drive.readExactBlock((void * )&vol1, sizeof(vol1), "[WriteSession::WriteSession()] - Reading VOL1"); + switch(vol1.getLBPMethod()) { case SCSI::logicBlockProtectionMethod::CRC32C: m_detectedLbp = true; if (m_useLbp) { @@ -431,21 +428,22 @@ namespace castor { m_detectedLbp = false; break; default: - throw cta::exception::Exception("In WriteSession::WriteSession(): " - "unknown LBP method"); + throw cta::exception::Exception("In WriteSession::WriteSession(): unknown LBP method"); } } // from this point the right LBP mode should be set or not set m_drive.rewind(); - VOL1 vol1; - m_drive.readExactBlock((void * )&vol1, sizeof(vol1), "[WriteSession::WriteSession()] - Reading VOL1"); - try { - vol1.verify(); - } catch (std::exception & e) { - throw TapeFormatError(e.what()); - } - HeaderChecker::checkVOL1(vol1, m_vid); // now we know that we are going to write on the correct tape + { + VOL1 vol1; + m_drive.readExactBlock((void *) &vol1, sizeof(vol1), "[WriteSession::WriteSession()] - Reading VOL1"); + try { + vol1.verify(); + } catch (std::exception &e) { + throw TapeFormatError(e.what()); + } + HeaderChecker::checkVOL1(vol1, m_vid); // now we know that we are going to write on the correct tape + } //if the tape is not empty let's move to the last trailer if(last_fSeq>0) { uint32_t dst_filemark = last_fSeq*3-1; // 3 file marks per file but we want to read the last trailer (hence the -1) diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 3267f516bce511947e7e81913b3a5249396214c0..0ad9ea65257f1f0fc69d03734bd478cf62ab438c 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -873,7 +873,7 @@ int DriveHandler::runChild() { } // Before anything, we need to check we have access to the scheduler's central storages. std::unique_ptr<cta::objectstore::Backend> backend( - cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value()).release()); + cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release()); // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. try { diff --git a/tapeserver/daemon/GarbageCollectorHandler.cpp b/tapeserver/daemon/GarbageCollectorHandler.cpp index 024387697c9f74513c280d2aa3c62763c9959606..4859d9f2da85ee7a349bb5f6e5e2593da7e1e804 100644 --- a/tapeserver/daemon/GarbageCollectorHandler.cpp +++ b/tapeserver/daemon/GarbageCollectorHandler.cpp @@ -254,7 +254,7 @@ int GarbageCollectorHandler::runChild() { // fail likewise, so we just wait for shutdown signal (no feedback to main // process). std::unique_ptr<cta::objectstore::Backend> backend( - cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value()).release()); + cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), m_processManager.logContext().logger()).release()); // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. try { diff --git a/xroot_plugins/XrdCtaFilesystem.cpp b/xroot_plugins/XrdCtaFilesystem.cpp index db1f36e3354de24bd9a412561e2e5dd2388c5b3d..121f48417bc2fac6bf48fdae8331076ec2dbaf0b 100644 --- a/xroot_plugins/XrdCtaFilesystem.cpp +++ b/xroot_plugins/XrdCtaFilesystem.cpp @@ -278,7 +278,7 @@ XrdCtaFilesystem::XrdCtaFilesystem(): throw cta::exception::Exception(std::string("Failed to instantiate object representing CTA logging system: ")+ex.getMessage().str()); } - m_backend = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + m_backend = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), *m_log)); m_backendPopulator = cta::make_unique<cta::objectstore::BackendPopulator>(*m_backend, "Frontend", cta::log::LogContext(*m_log)); const rdbms::Login catalogueLogin = rdbms::Login::parseFile("/etc/cta/cta-catalogue.conf"); diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.cpp b/xroot_plugins/XrdSsiCtaServiceProvider.cpp index e1318de19707440a3e0124d9d954f4e90dd0095b..6baeb3d1d2edc47b4848d9f13bb465b35afe4352 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.cpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.cpp @@ -93,7 +93,7 @@ bool XrdSsiCtaServiceProvider::Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, con // Initialise the Backend - m_backend = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr))); + m_backend = std::move(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr), *m_log)); m_backendPopulator = cta::make_unique<cta::objectstore::BackendPopulator>(*m_backend, "Frontend", cta::log::LogContext(*m_log)); m_scheddb = cta::make_unique<cta::OStoreDBWithAgent>(*m_backend, m_backendPopulator->getAgentReference(), *m_catalogue, *m_log);