diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 2a173b77a1a6da80184f8ff2e8e124673bc6493a..74f0e8e6964df85546d80ac972553cc47e60c349 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,8 +1,11 @@ -# v.NEXT +# v4.NEXT +## Summary ### Features - cta/CTA#211 - Add functionality for reading encrypted tapes with cta-readtp - cta/CTA#214 - Update manual page for cta-admin to include info about user defined config files. +### Bug fixes +- cta/CTA#93 - Refactor Frontend code to allow code sharing between SSI and gRPC implementations # v4.7.14-1 diff --git a/frontend/common/Config.cpp b/frontend/common/Config.cpp new file mode 100644 index 0000000000000000000000000000000000000000..49c589824be230b89491eb6f54c07f88ebb03511 --- /dev/null +++ b/frontend/common/Config.cpp @@ -0,0 +1,101 @@ +/** + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2019-2022 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#include <fstream> +#include <sstream> +#include <vector> +#include <map> + +#include "common/exception/UserError.hpp" +#include "Config.hpp" + +namespace cta { +namespace frontend { + +//! Configuration option list type +using optionlist_t = std::vector<std::string>; + +Config::Config(const std::string& filename) { + // Open the config file for reading + std::ifstream file(filename); + if(!file) { + throw exception::UserError("Failed to open " + filename); + } + // Parse the config file + try { + parse(file); + } catch(std::exception &ex) { + throw exception::UserError("Failed to parse configuration file " + filename + ": " + ex.what()); + } +} + +const optionlist_t& Config::getOptionList(const std::string& key) const { + auto it = m_configuration.find(key); + return it == m_configuration.end() ? m_nulloptionlist : it->second; +} + +std::optional<std::string> Config::getOptionValueStr(const std::string &key) const { + auto optionlist = getOptionList(key); + + return optionlist.empty() ? std::nullopt : std::optional<std::string>(optionlist.at(0)); +} + +std::optional<int> Config::getOptionValueInt(const std::string &key) const { + auto optionlist = getOptionList(key); + + return optionlist.empty() ? std::nullopt : std::optional<int>(std::stoi(optionlist.at(0))); +} + +void Config::parse(std::ifstream &file) { + std::string line; + + while(std::getline(file, line)) { + // Strip out comments + auto pos = line.find('#'); + if(pos != std::string::npos) { + line.resize(pos); + } + + // Extract the key + std::istringstream ss(line); + std::string key; + ss >> key; + + // Extract and store the config options + if(!key.empty()) { + optionlist_t values = tokenize(ss); + + if(!values.empty()) { + m_configuration[key] = values; + } + } + } +} + +optionlist_t Config::tokenize(std::istringstream &input) { + optionlist_t values; + + while(!input.eof()) { + std::string value; + input >> value; + if(!value.empty()) values.push_back(value); + } + + return values; +} + +}} // namespace cta::frontend diff --git a/frontend/common/Config.hpp b/frontend/common/Config.hpp new file mode 100644 index 0000000000000000000000000000000000000000..16886dc3a5c18093cd7bb4d9f44e43a2f6bac31a --- /dev/null +++ b/frontend/common/Config.hpp @@ -0,0 +1,74 @@ +/** + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2019-2022 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#pragma once + +#include <map> +#include <sstream> + +namespace cta { +namespace frontend { + +/*! + * Interface to the CTA Frontend configuration file + */ +class Config +{ +public: + /*! + * Construct from a configuration file + */ + explicit Config(const std::string& filename); + + /*! + * Get a single option string value from config + */ + std::optional<std::string> getOptionValueStr(const std::string &key) const; + + /*! + * Get a single option integer value from config + * + * Throws std::invalid_argument or std::out_of_range if the key exists but the value cannot be + * converted to an integer + */ + std::optional<int> getOptionValueInt(const std::string &key) const; + +private: + //! Configuration option list type + using optionlist_t = std::vector<std::string>; + + /*! + * Parse config file + */ + void parse(std::ifstream& file); + + /*! + * Get option list from config + */ + const optionlist_t& getOptionList(const std::string& key) const; + + /*! + * Tokenize a stringstream + */ + optionlist_t tokenize(std::istringstream& input); + + // Member variables + const optionlist_t m_nulloptionlist; //!< Empty option list returned when key not found + std::map<std::string, optionlist_t> m_configuration; //!< Parsed configuration options +}; + +}} // namespace cta::frontend diff --git a/frontend/common/FrontendService.cpp b/frontend/common/FrontendService.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0ad7331b7130c8aa0b4ad3591ae0a48f8017d73c --- /dev/null +++ b/frontend/common/FrontendService.cpp @@ -0,0 +1,269 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2022 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#include "version.h" +#include "catalogue/Catalogue.hpp" +#include "catalogue/CatalogueFactory.hpp" +#include "catalogue/CatalogueFactoryFactory.hpp" +#include "rdbms/Login.hpp" +#include "common/log/FileLogger.hpp" +#include "common/log/LogLevel.hpp" +#include "common/log/StdoutLogger.hpp" +#include "common/log/SyslogLogger.hpp" +#include "FrontendService.hpp" +#include "Config.hpp" + +namespace cta { +namespace frontend { + +FrontendService::FrontendService(const std::string& configFilename) : m_archiveFileMaxSize(0) { + int logToSyslog = 0; + int logToStdout = 0; + int logtoFile = 0; + std::string logFilePath = ""; + + // Read CTA namespaced configuration options from XRootD config file + Config config(configFilename); + + // Instantiate the CTA logging system + try { + // Set the logger URL + auto loggerURL = config.getOptionValueStr("cta.log.url"); + if(!loggerURL.has_value()) loggerURL = "syslog:"; + const auto shortHostname = utils::getShortHostname(); + + // Set the logger level + auto loggerLevelStr = config.getOptionValueStr("cta.log.level"); + auto loggerLevel = loggerLevelStr.has_value() ? log::toLogLevel(loggerLevelStr.value()) : log::INFO; + + // Set the log context + if(loggerURL.value() == "syslog:") { + m_log = std::make_unique<log::SyslogLogger>(shortHostname, "cta-frontend", loggerLevel); + logToSyslog = 1; + } else if(loggerURL.value() == "stdout:") { + m_log = std::make_unique<log::StdoutLogger>(shortHostname, "cta-frontend"); + logToStdout = 1; + } else if(loggerURL.value().substr(0, 5) == "file:") { + logtoFile = 1; + logFilePath = loggerURL.value().substr(5); + m_log = std::make_unique<log::FileLogger>(shortHostname, "cta-frontend", logFilePath, loggerLevel); + } else { + throw exception::UserError(std::string("Unknown log URL: ") + loggerURL.value()); + } + } catch(exception::Exception& ex) { + std::string ex_str("Failed to instantiate object representing CTA logging system: "); + throw exception::Exception(ex_str + ex.getMessage().str()); + } + log::Logger& log = *m_log; + const std::list<log::Param> params = {log::Param("version", CTA_VERSION)}; + { + // Log starting message + std::list<log::Param> params; + params.push_back(log::Param("version", CTA_VERSION)); + params.push_back(log::Param("configFilename", configFilename)); + params.push_back(log::Param("logToStdout", std::to_string(logToStdout))); + params.push_back(log::Param("logToSyslog", std::to_string(logToSyslog))); + params.push_back(log::Param("logtoFile", std::to_string(logtoFile))); + params.push_back(log::Param("logFilePath", logFilePath)); + log(log::INFO, std::string("Starting cta-frontend"), params); + } + + // Initialise the Catalogue + std::string catalogueConfigFile = "/etc/cta/cta-catalogue.conf"; + const rdbms::Login catalogueLogin = rdbms::Login::parseFile(catalogueConfigFile); + auto catalogue_numberofconnections = config.getOptionValueInt("cta.catalogue.numberofconnections"); + if(!catalogue_numberofconnections.has_value()) { + throw exception::UserError("cta.catalogue.numberofconnections is not set in configuration file " + configFilename); + } + const uint64_t nbArchiveFileListingConns = 2; + + { + // Log catalogue.numberofconnections + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.catalogue")); + params.push_back(log::Param("key", "numberofconnections")); + params.push_back(log::Param("value", std::to_string(catalogue_numberofconnections.value()))); + log(log::INFO, "Configuration entry", params); + } + { + // Log catalogue number of archive file listing connections + std::list<log::Param> params; + params.push_back(log::Param("source", "Compile time default")); + params.push_back(log::Param("category", "cta.catalogue")); + params.push_back(log::Param("key", "nbArchiveFileListingConns")); + params.push_back(log::Param("value", std::to_string(nbArchiveFileListingConns))); + log(log::INFO, "Configuration entry", params); + } + + { + auto catalogueFactory = catalogue::CatalogueFactoryFactory::create(*m_log, catalogueLogin, + catalogue_numberofconnections.value(), nbArchiveFileListingConns); + m_catalogue = catalogueFactory->create(); + try { + m_catalogue->ping(); + } catch(cta::exception::Exception& ex) { + auto lc = getLogContext(); + lc.log(cta::log::CRIT, ex.getMessageValue()); + throw ex; + } + } + + m_catalogue_conn_string = catalogueLogin.connectionString; + + // Initialise the Scheduler DB + const std::string DB_CONN_PARAM = "cta.objectstore.backendpath"; + auto db_conn = config.getOptionValueStr(DB_CONN_PARAM); + if(!db_conn.has_value()) { + throw exception::UserError(DB_CONN_PARAM + " is not set in configuration file " + configFilename); + } + + { + // Log cta.objectstore.backendpath + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.objectstore")); + params.push_back(log::Param("key", "backendpath")); + params.push_back(log::Param("value", db_conn.value())); + log(log::INFO, "Configuration entry", params); + } + + m_scheddbInit = std::make_unique<SchedulerDBInit_t>("Frontend", db_conn.value(), *m_log); + m_scheddb = m_scheddbInit->getSchedDB(*m_catalogue, *m_log); + + const auto schedulerThreadStackSize = config.getOptionValueInt("ca.schedulerdb.threadstacksize_mb"); + std::optional<size_t> schedulerThreadStackOpt = schedulerThreadStackSize.has_value() ? + std::optional<size_t>(schedulerThreadStackSize.value() * 1024 * 1024) : std::nullopt; + + auto threadPoolSize = config.getOptionValueInt("cta.schedulerdb.numberofthreads"); + if(threadPoolSize.has_value()) { + m_scheddb->setThreadNumber(threadPoolSize.value(), schedulerThreadStackOpt); + } + m_scheddb->setBottomHalfQueueSize(25000); + + // Log cta.schedulerdb.numberofthreads + if(threadPoolSize.has_value()) { + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.schedulerdb")); + params.push_back(log::Param("key", "numberofthreads")); + params.push_back(log::Param("value", std::to_string(threadPoolSize.value()))); + log(log::INFO, "Configuration entry", params); + } + + // Initialise the Scheduler + m_scheduler = std::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000); + + // Initialise the Frontend + auto archiveFileMaxSize = config.getOptionValueInt("cta.archivefile.max_size_gb"); + // Convert archiveFileMaxSize from GB to bytes + m_archiveFileMaxSize = archiveFileMaxSize.has_value() ? archiveFileMaxSize.value()*1024*1024*1024 : 0; + + { + // Log cta.archivefile.max_size_gb + std::list<log::Param> params; + params.push_back(log::Param("source", archiveFileMaxSize.has_value() ? configFilename : "Compile time default")); + params.push_back(log::Param("category", "cta.archivefile")); + params.push_back(log::Param("key", "max_size_gb")); + params.push_back(log::Param("value", std::to_string(archiveFileMaxSize.has_value() ? archiveFileMaxSize.value() : 0))); + log(log::INFO, "Configuration entry", params); + } + + // Get the repack buffer URL + auto repackBufferURLConf = config.getOptionValueStr("cta.repack.repack_buffer_url"); + if(repackBufferURLConf.has_value()) { + m_repackBufferURL = repackBufferURLConf.value(); + } + + // Get the verification mount policy + const auto verificationMountPolicy = config.getOptionValueStr("cta.verification.mount_policy"); + if(verificationMountPolicy.has_value()) { + m_verificationMountPolicy = verificationMountPolicy.value(); + } + + { + // Log cta.repack.repack_buffer_url + if(repackBufferURLConf.has_value()) { + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.repack")); + params.push_back(log::Param("key", "repack_buffer_url")); + params.push_back(log::Param("value", repackBufferURLConf.value())); + log(log::INFO, "Configuration entry", params); + } + } + + // Get the endpoint for namespace queries + auto nsConf = config.getOptionValueStr("cta.ns.config"); + if(nsConf.has_value()) { + setNamespaceMap(nsConf.value()); + } else { + log(log::WARNING, "'cta.ns.config' not specified; namespace queries are disabled"); + } + + { + // Log cta.ns.config + if(nsConf.has_value()) { + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.ns")); + params.push_back(log::Param("key", "config")); + params.push_back(log::Param("value", nsConf.value())); + log(log::INFO, "Configuration entry", params); + } + } + + // Get the mount policy name for verification requests + + // All done + log(log::INFO, std::string("cta-frontend started"), params); +} + +void FrontendService::setNamespaceMap(const std::string& keytab_file) { + // Open the keytab file for reading + std::ifstream file(keytab_file); + if(!file) { + throw cta::exception::UserError("Failed to open namespace keytab configuration file " + keytab_file); + } + + // Parse the keytab line by line + std::string line; + for(int lineno = 0; std::getline(file, line); ++lineno) { + // Strip out comments + auto pos = line.find('#'); + if(pos != std::string::npos) { + line.resize(pos); + } + + // Parse one line + std::istringstream ss(line); + std::string diskInstance; + std::string endpoint; + std::string token; + std::string eol; + ss >> diskInstance >> endpoint >> token >> eol; + + // Ignore blank lines, all other lines must have exactly 3 elements + if(token.empty() || !eol.empty()) { + if(diskInstance.empty() && endpoint.empty() && token.empty()) continue; + throw cta::exception::UserError("Could not parse namespace keytab configuration file line " + std::to_string(lineno) + ": " + line); + } + m_namespaceMap.insert(std::make_pair(diskInstance, cta::Namespace(endpoint, token))); + } +} + +}} // namespace cta::frontend diff --git a/frontend/common/FrontendService.hpp b/frontend/common/FrontendService.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ad5e38f2edbb78da51d04f912bb2c529f2b82a1f --- /dev/null +++ b/frontend/common/FrontendService.hpp @@ -0,0 +1,107 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2022 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#pragma once + +#include "scheduler/Scheduler.hpp" +#ifdef CTA_PGSCHED +#include "scheduler/PostgresSchedDB/PostgresSchedDBInit.hpp" +#else +#include "scheduler/OStoreDB/OStoreDBInit.hpp" +#endif +#include "Namespace.hpp" + +namespace cta { +namespace frontend { + +class FrontendService { +public: + explicit FrontendService(const std::string& configFilename); + + ~FrontendService() = default; + + /*! + * Get the log context + */ + cta::log::LogContext getLogContext() const { return cta::log::LogContext(*m_log); } + + /*! + * Get the Catalogue connection string + */ + const std::string getCatalogueConnectionString() const {return m_catalogue_conn_string; } + + /*! + * Get a reference to the Catalogue + */ + cta::catalogue::Catalogue& getCatalogue() const { return *m_catalogue; } + + /*! + * Get a reference to the Scheduler DB + */ + cta::SchedulerDB_t& getSchedDb() const { return *m_scheddb; } + + /*! + * Get a reference to the Scheduler + */ + cta::Scheduler& getScheduler() const { return *m_scheduler; } + + /*! + * Get the maximum file size for an archive request + */ + uint64_t getArchiveFileMaxSize() const { return m_archiveFileMaxSize; } + + /*! + * Get the repack buffer URL + */ + std::optional<std::string> getRepackBufferURL() const { return m_repackBufferURL; } + + /*! + * Get the verification mount policy + */ + std::optional<std::string> getVerificationMountPolicy() const { return m_verificationMountPolicy; } + + /*! + * Get the endpoints for namespace queries + */ + cta::NamespaceMap_t getNamespaceMap() const { return m_namespaceMap; } + +private: + /*! + * Set the verification mount policy + */ + void setVerificationMountPolicy(const std::string& verificationMountPolicy) { m_verificationMountPolicy = verificationMountPolicy; } + + /*! + * Populate the namespace endpoint configuration from a keytab file + */ + void setNamespaceMap(const std::string& keytab_file); + + // Member variables + std::unique_ptr<cta::log::Logger> m_log; //!< The logger + std::unique_ptr<cta::catalogue::Catalogue> m_catalogue; //!< Catalogue of tapes and tape files + std::unique_ptr<SchedulerDBInit_t> m_scheddbInit; //!< Persistent initialiser object for Scheduler DB + std::unique_ptr<cta::SchedulerDB_t> m_scheddb; //!< Scheduler DB for persistent objects (queues and requests) + std::unique_ptr<cta::Scheduler> m_scheduler; //!< The scheduler + + std::string m_catalogue_conn_string; //!< The catalogue connection string (without the password) + uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests + std::optional<std::string> m_repackBufferURL; //!< The repack buffer URL + std::optional<std::string> m_verificationMountPolicy; //!< The mount policy for verification requests + cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries +}; + +}} // namespace cta::frontend diff --git a/xroot_plugins/GrpcClient.cpp b/frontend/common/GrpcClient.cpp similarity index 100% rename from xroot_plugins/GrpcClient.cpp rename to frontend/common/GrpcClient.cpp diff --git a/xroot_plugins/GrpcClient.hpp b/frontend/common/GrpcClient.hpp similarity index 100% rename from xroot_plugins/GrpcClient.hpp rename to frontend/common/GrpcClient.hpp diff --git a/xroot_plugins/GrpcEndpoint.cpp b/frontend/common/GrpcEndpoint.cpp similarity index 98% rename from xroot_plugins/GrpcEndpoint.cpp rename to frontend/common/GrpcEndpoint.cpp index 25f3ea1fa9ec88dd2f0ee03ec1d306d04af5a340..b392357f0395b370a5467973d75d4139ab897953 100644 --- a/xroot_plugins/GrpcEndpoint.cpp +++ b/frontend/common/GrpcEndpoint.cpp @@ -15,11 +15,9 @@ * submit itself to any jurisdiction. */ -#include <xroot_plugins/GrpcEndpoint.hpp> - #include "common/exception/UserError.hpp" #include "common/exception/Exception.hpp" - +#include "frontend/common/GrpcEndpoint.hpp" std::string cta::grpc::Endpoint::getPath(const std::string &diskFileId) const { // diskFileId is sent to CTA as a uint64_t, but we store it as a decimal string, cf.: diff --git a/xroot_plugins/GrpcEndpoint.hpp b/frontend/common/GrpcEndpoint.hpp similarity index 96% rename from xroot_plugins/GrpcEndpoint.hpp rename to frontend/common/GrpcEndpoint.hpp index 75114a83ad0ca89a6e30c2de418876e11f6cab61..803c63a7a3129ec548debab65579d8e5ee97aa29 100644 --- a/xroot_plugins/GrpcEndpoint.hpp +++ b/frontend/common/GrpcEndpoint.hpp @@ -17,11 +17,9 @@ #pragma once -#include <xroot_plugins/Namespace.hpp> -#include <xroot_plugins/GrpcClient.hpp> - #include "common/exception/UserError.hpp" - +#include "frontend/common/GrpcClient.hpp" +#include "Namespace.hpp" namespace cta { namespace grpc { diff --git a/xroot_plugins/Namespace.hpp b/frontend/common/Namespace.hpp similarity index 84% rename from xroot_plugins/Namespace.hpp rename to frontend/common/Namespace.hpp index c08c9696106eeef1f538d9a60564a61087fe9ced..4ed98d0a6a207456bcd2bfdf41be97d608aed0f0 100644 --- a/xroot_plugins/Namespace.hpp +++ b/frontend/common/Namespace.hpp @@ -18,7 +18,6 @@ #pragma once #include <map> -#include <iostream> // for testing namespace cta { @@ -26,13 +25,12 @@ struct Namespace { Namespace(const std::string &ep, const std::string &tk) : endpoint(ep), token(tk) { -std::cerr << "Created namespace endpoint " << endpoint << " with token " << token << std::endl; } std::string endpoint; std::string token; }; -typedef std::map<std::string, Namespace> NamespaceMap_t; +using NamespaceMap_t = std::map<std::string, Namespace>; -} // namespace cta::grpc +} // namespace cta diff --git a/frontend/common/PbException.hpp b/frontend/common/PbException.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e5a81b083c9eec42cbda045a9d98228f62201812 --- /dev/null +++ b/frontend/common/PbException.hpp @@ -0,0 +1,25 @@ +/* + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2022 CERN + * @license This program is free software, distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can + * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * In applying this licence, CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization or + * submit itself to any jurisdiction. + */ + +#pragma once + +#include "common/exception/Exception.hpp" + +namespace cta { +namespace exception { +CTA_GENERATE_EXCEPTION_CLASS(PbException); +}} diff --git a/xroot_plugins/CMakeLists.txt b/xroot_plugins/CMakeLists.txt index 419df1c6cdc8dcae5f730bf78a020580608e7e6b..c8b7a40aaf2a37cac6a9d57ca1c9b7e6d63d2795 100644 --- a/xroot_plugins/CMakeLists.txt +++ b/xroot_plugins/CMakeLists.txt @@ -36,7 +36,9 @@ include_directories(${CMAKE_BINARY_DIR}/eos_cta ${PROTOBUF3_INCLUDE_DIRS}) # add_library(XrdSsiCta MODULE XrdSsiCtaServiceProvider.cpp XrdSsiCtaRequestProc.cpp XrdSsiCtaRequestMessage.cpp ../cmdline/CtaAdminCmdParse.cpp - GrpcClient.cpp GrpcEndpoint.cpp) + ../frontend/common/Config.cpp + ../frontend/common/FrontendService.cpp + ../frontend/common/GrpcClient.cpp ../frontend/common/GrpcEndpoint.cpp) if(CTA_USE_PGSCHED) target_link_libraries(XrdSsiCta ${XROOTD_XRDSSI_LIB} XrdSsiLib XrdSsiPbEosCta ctascheduler ctacommon ctacatalogue EosCtaGrpc ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY}) diff --git a/xroot_plugins/XrdCtaTapeFileLs.hpp b/xroot_plugins/XrdCtaTapeFileLs.hpp index 6c4b4ea52c26eee5281ccffeec5917a96240f7c9..43cbc9bf615b48693cbc04e63ff603b5becd4582 100644 --- a/xroot_plugins/XrdCtaTapeFileLs.hpp +++ b/xroot_plugins/XrdCtaTapeFileLs.hpp @@ -18,11 +18,10 @@ #pragma once #include "catalogue/CatalogueItor.hpp" -#include <common/checksum/ChecksumBlobSerDeser.hpp> -#include <xroot_plugins/GrpcEndpoint.hpp> -#include <xroot_plugins/XrdCtaStream.hpp> -#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> - +#include "common/checksum/ChecksumBlobSerDeser.hpp" +#include "frontend/common/GrpcEndpoint.hpp" +#include "XrdCtaStream.hpp" +#include "XrdSsiCtaRequestMessage.hpp" namespace cta { namespace xrd { diff --git a/xroot_plugins/XrdCtaTapeLs.hpp b/xroot_plugins/XrdCtaTapeLs.hpp index 3be29a62482e92789dfe2dcddbec0dc2cb47494b..e98f0c0f77fde0401384694a4dc209f1b5d28afd 100644 --- a/xroot_plugins/XrdCtaTapeLs.hpp +++ b/xroot_plugins/XrdCtaTapeLs.hpp @@ -17,9 +17,8 @@ #pragma once -#include <xroot_plugins/XrdCtaStream.hpp> -#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> - +#include "XrdCtaStream.hpp" +#include "XrdSsiCtaRequestMessage.hpp" namespace cta { namespace xrd { diff --git a/xroot_plugins/XrdCtaTapePoolLs.hpp b/xroot_plugins/XrdCtaTapePoolLs.hpp index 7478d081f394612baaee28da51fabd7a2c633ad3..6af30eec99397e23fc5293f99602bcea2d5afa8c 100644 --- a/xroot_plugins/XrdCtaTapePoolLs.hpp +++ b/xroot_plugins/XrdCtaTapePoolLs.hpp @@ -18,9 +18,9 @@ #pragma once #include "catalogue/TapePool.hpp" -#include <catalogue/TapePoolSearchCriteria.hpp> -#include <xroot_plugins/XrdCtaStream.hpp> -#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> +#include "catalogue/TapePoolSearchCriteria.hpp" +#include "XrdCtaStream.hpp" +#include "XrdSsiCtaRequestMessage.hpp" namespace cta { namespace xrd { diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 7409eae874a8d2f35108094b7c8233c2211f756f..93ab24173fd8e83f91964b365de37ce427dc5a6a 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -18,7 +18,6 @@ #include <limits> #include <sstream> #include <string> - #include <XrdSsiPbException.hpp> #include "catalogue/CreateMountPolicyAttributes.hpp" @@ -27,31 +26,30 @@ #include "common/dataStructures/LogicalLibrary.hpp" #include "common/dataStructures/RequesterActivityMountRule.hpp" #include "common/utils/Regex.hpp" - -#include "xroot_plugins/XrdCtaActivityMountRuleLs.hpp" -#include "xroot_plugins/XrdCtaAdminLs.hpp" -#include "xroot_plugins/XrdCtaArchiveRouteLs.hpp" -#include "xroot_plugins/XrdCtaChangeStorageClass.hpp" -#include "xroot_plugins/XrdCtaDiskInstanceLs.hpp" -#include "xroot_plugins/XrdCtaDiskInstanceSpaceLs.hpp" -#include "xroot_plugins/XrdCtaDiskSystemLs.hpp" -#include "xroot_plugins/XrdCtaDriveLs.hpp" -#include "xroot_plugins/XrdCtaFailedRequestLs.hpp" -#include "xroot_plugins/XrdCtaGroupMountRuleLs.hpp" -#include "xroot_plugins/XrdCtaLogicalLibraryLs.hpp" -#include "xroot_plugins/XrdCtaMediaTypeLs.hpp" -#include "xroot_plugins/XrdCtaMountPolicyLs.hpp" -#include "xroot_plugins/XrdCtaRecycleTapeFileLs.hpp" -#include "xroot_plugins/XrdCtaRepackLs.hpp" -#include "xroot_plugins/XrdCtaRequesterMountRuleLs.hpp" -#include "xroot_plugins/XrdCtaShowQueues.hpp" -#include "xroot_plugins/XrdCtaStorageClassLs.hpp" -#include "xroot_plugins/XrdCtaTapeFileLs.hpp" -#include "xroot_plugins/XrdCtaTapeLs.hpp" -#include "xroot_plugins/XrdCtaTapePoolLs.hpp" -#include "xroot_plugins/XrdCtaVersion.hpp" -#include "xroot_plugins/XrdCtaVirtualOrganizationLs.hpp" -#include "xroot_plugins/XrdSsiCtaRequestMessage.hpp" +#include "XrdCtaActivityMountRuleLs.hpp" +#include "XrdCtaAdminLs.hpp" +#include "XrdCtaArchiveRouteLs.hpp" +#include "XrdCtaChangeStorageClass.hpp" +#include "XrdCtaDiskInstanceLs.hpp" +#include "XrdCtaDiskInstanceSpaceLs.hpp" +#include "XrdCtaDiskSystemLs.hpp" +#include "XrdCtaDriveLs.hpp" +#include "XrdCtaFailedRequestLs.hpp" +#include "XrdCtaGroupMountRuleLs.hpp" +#include "XrdCtaLogicalLibraryLs.hpp" +#include "XrdCtaMediaTypeLs.hpp" +#include "XrdCtaMountPolicyLs.hpp" +#include "XrdCtaRecycleTapeFileLs.hpp" +#include "XrdCtaRepackLs.hpp" +#include "XrdCtaRequesterMountRuleLs.hpp" +#include "XrdCtaShowQueues.hpp" +#include "XrdCtaStorageClassLs.hpp" +#include "XrdCtaTapeFileLs.hpp" +#include "XrdCtaTapeLs.hpp" +#include "XrdCtaTapePoolLs.hpp" +#include "XrdCtaVersion.hpp" +#include "XrdCtaVirtualOrganizationLs.hpp" +#include "XrdSsiCtaRequestMessage.hpp" namespace cta { namespace xrd { @@ -1189,7 +1187,7 @@ void RequestMessage::processFailedRequest_Ls(cta::xrd::Response &response, XrdSs { using namespace cta::admin; - stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_service.getSchedDb(), m_lc); + stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_service.getFrontendService().getSchedDb(), m_lc); // Display the correct column headers response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::FAILEDREQUEST_LS_SUMMARY diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index 988b295943029057fc0a057863516c7c268088a7..e0cc8ec122ca003f56cbfbd55dbc4b7c8f4e5111 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -19,11 +19,10 @@ #include <XrdSsi/XrdSsiEntity.hh> -#include "XrdSsiCtaServiceProvider.hpp" -#include "cta_frontend.pb.h" #include "common/utils/utils.hpp" #include "Versions.hpp" - +#include "XrdSsiCtaServiceProvider.hpp" +#include "cta_frontend.pb.h" namespace cta { namespace xrd { @@ -35,14 +34,14 @@ class RequestMessage public: RequestMessage(const XrdSsiEntity &client, const XrdSsiCtaServiceProvider *service) : m_service(*service), - m_catalogue(service->getCatalogue()), - m_scheduler(service->getScheduler()), - m_archiveFileMaxSize(service->getArchiveFileMaxSize()), - m_repackBufferURL(service->getRepackBufferURL()), - m_verificationMountPolicy(service->getVerificationMountPolicy()), - m_namespaceMap(service->getNamespaceMap()), - m_lc(service->getLogContext()), - m_catalogue_conn_string(service->getCatalogueConnectionString()) + m_catalogue(service->getFrontendService().getCatalogue()), + m_scheduler(service->getFrontendService().getScheduler()), + m_archiveFileMaxSize(service->getFrontendService().getArchiveFileMaxSize()), + m_repackBufferURL(service->getFrontendService().getRepackBufferURL()), + m_verificationMountPolicy(service->getFrontendService().getVerificationMountPolicy()), + m_namespaceMap(service->getFrontendService().getNamespaceMap()), + m_lc(service->getFrontendService().getLogContext()), + m_catalogue_conn_string(service->getFrontendService().getCatalogueConnectionString()) { m_cliIdentity.username = client.name; m_cliIdentity.host = cta::utils::getShortHostname(); // Host should be of the machine that executes the command diff --git a/xroot_plugins/XrdSsiCtaRequestProc.cpp b/xroot_plugins/XrdSsiCtaRequestProc.cpp index cfbb8ceff7a4b8a52d72ef3c786551b9bfe0c2b5..7eff401642041aef049ee6427287d43f3ab07468 100644 --- a/xroot_plugins/XrdSsiCtaRequestProc.cpp +++ b/xroot_plugins/XrdSsiCtaRequestProc.cpp @@ -15,25 +15,21 @@ * submit itself to any jurisdiction. */ -#include "common/dataStructures/ArchiveRequest.hpp" -#include "common/exception/Exception.hpp" - -#include "XrdSsiPbLog.hpp" -#include "XrdSsiPbException.hpp" -#include "XrdSsiPbRequestProc.hpp" +#include <XrdSsiPbLog.hpp> +#include <XrdSsiPbRequestProc.hpp> +#include "common/dataStructures/ArchiveRequest.hpp" +#include "frontend/common/PbException.hpp" #include "XrdSsiCtaServiceProvider.hpp" #include "XrdSsiCtaRequestMessage.hpp" - - namespace XrdSsiPb { /*! - * Convert a framework exception into a Response + * Convert a XrdSsiPb framework exception into a Response */ template<> -void ExceptionHandler<cta::xrd::Response, PbException>::operator()(cta::xrd::Response &response, const PbException &ex) +void ExceptionHandler<cta::xrd::Response, XrdSsiPb::PbException>::operator()(cta::xrd::Response &response, const XrdSsiPb::PbException &ex) { response.set_type(cta::xrd::Response::RSP_ERR_PROTOBUF); response.set_message_txt(ex.what()); @@ -42,7 +38,18 @@ void ExceptionHandler<cta::xrd::Response, PbException>::operator()(cta::xrd::Res XrdSsiPb::Log::Msg(XrdSsiPb::Log::ERROR, "ExceptionHandler", ex.what()); } +/*! + * Convert a CTA Protobuf exception into a Response + */ +template<> +void ExceptionHandler<cta::xrd::Response, cta::exception::PbException>::operator()(cta::xrd::Response &response, const cta::exception::PbException &ex) +{ + response.set_type(cta::xrd::Response::RSP_ERR_PROTOBUF); + response.set_message_txt(ex.what()); + // Also log the error in the XRootD log + XrdSsiPb::Log::Msg(XrdSsiPb::Log::ERROR, "ExceptionHandler", ex.what()); +} /*! * Process the Notification Request @@ -60,27 +67,28 @@ void RequestProc<cta::xrd::Request, cta::xrd::Response, cta::xrd::Alert>::Execut // If not, this is fatal unrecoverable error throw std::logic_error("XRootD Service is not a CTA Service"); } + auto lc = cta_service_ptr->getFrontendService().getLogContext(); try { // Process the message cta::xrd::RequestMessage request_msg(*(m_resource.client), cta_service_ptr); request_msg.process(m_request, m_metadata, m_response_stream_ptr); - } catch(PbException &ex) { + } catch(cta::exception::PbException &ex) { m_metadata.set_type(cta::xrd::Response::RSP_ERR_PROTOBUF); m_metadata.set_message_txt(ex.what()); - cta_service_ptr->getLogContext().log(cta::log::ERR, ErrorFunction + "RSP_ERR_PROTOBUF: " + ex.what()); + lc.log(cta::log::ERR, ErrorFunction + "RSP_ERR_PROTOBUF: " + ex.what()); } catch(cta::exception::UserError &ex) { m_metadata.set_type(cta::xrd::Response::RSP_ERR_USER); m_metadata.set_message_txt(ex.getMessageValue()); - cta_service_ptr->getLogContext().log(cta::log::ERR, ErrorFunction + "RSP_ERR_USER: " + ex.getMessageValue()); + lc.log(cta::log::ERR, ErrorFunction + "RSP_ERR_USER: " + ex.getMessageValue()); } catch(cta::exception::Exception &ex) { m_metadata.set_type(cta::xrd::Response::RSP_ERR_CTA); m_metadata.set_message_txt(ex.getMessageValue()); - cta_service_ptr->getLogContext().log(cta::log::ERR, ErrorFunction + "RSP_ERR_CTA: " + ex.what()); + lc.log(cta::log::ERR, ErrorFunction + "RSP_ERR_CTA: " + ex.what()); } catch(std::runtime_error &ex) { m_metadata.set_type(cta::xrd::Response::RSP_ERR_CTA); m_metadata.set_message_txt(ex.what()); - cta_service_ptr->getLogContext().log(cta::log::ERR, ErrorFunction + "RSP_ERR_CTA: " + ex.what()); + lc.log(cta::log::ERR, ErrorFunction + "RSP_ERR_CTA: " + ex.what()); } } diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.cpp b/xroot_plugins/XrdSsiCtaServiceProvider.cpp index 77089bb68be157dfd22fc255ed309a73089bfd0c..b21a7a486d35c78802ed94a2244ff056acdb2ca1 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.cpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.cpp @@ -15,23 +15,12 @@ * submit itself to any jurisdiction. */ -#include "cta_frontend.pb.h" -#include "version.h" - #include <XrdSsiPbAlert.hpp> #include <XrdSsiPbConfig.hpp> #include <XrdSsiPbService.hpp> -#include "catalogue/Catalogue.hpp" -#include "catalogue/CatalogueFactory.hpp" -#include "catalogue/CatalogueFactoryFactory.hpp" -#include "common/log/FileLogger.hpp" -#include "common/log/LogLevel.hpp" -#include "common/log/StdoutLogger.hpp" -#include "common/log/SyslogLogger.hpp" -#include "common/utils/utils.hpp" -#include "rdbms/Login.hpp" -#include "xroot_plugins/XrdSsiCtaServiceProvider.hpp" +#include "XrdSsiCtaServiceProvider.hpp" +#include "cta_frontend.pb.h" /* * Global pointer to the Service Provider object. @@ -42,11 +31,23 @@ */ XrdSsiProvider *XrdSsiProviderServer = new XrdSsiCtaServiceProvider; - // This method inherits from an external class to this project, so we cannot modify the interface +// This method inherits from an external class to this project, so we cannot modify the interface bool XrdSsiCtaServiceProvider::Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, const std::string cfgFn, // cppcheck-suppress passedByValue const std::string parms, int argc, char **argv) { // cppcheck-suppress passedByValue try { - ExceptionThrowingInit(logP, clsP, cfgFn, parms, argc, argv); + XrdSsiPb::Log::Msg(XrdSsiPb::Log::INFO, LOG_SUFFIX, "Called Init(", cfgFn, ',', parms, ')'); + + // Set XRootD SSI Protobuf logging level from config file + XrdSsiPb::Config config(cfgFn); + auto loglevel = config.getOptionList("cta.log.ssi"); + if(loglevel.empty()) { + XrdSsiPb::Log::SetLogLevel("info"); + } else { + XrdSsiPb::Log::SetLogLevel(loglevel); + } + + // Initialise the Frontend Service object from the config file + m_frontendService = std::make_unique<cta::frontend::FrontendService>(cfgFn); return true; } catch(XrdSsiPb::XrdSsiException &ex) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::ERROR, LOG_SUFFIX, "XrdSsiCtaServiceProvider::Init(): XrdSsiPb::XrdSsiException ", ex.what()); @@ -64,264 +65,6 @@ bool XrdSsiCtaServiceProvider::Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, con return false; } -void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiCluster *clsP, const std::string &cfgFn, - const std::string &parms, int argc, char **argv) -{ - using namespace XrdSsiPb; - using namespace cta; - - Log::Msg(XrdSsiPb::Log::INFO, LOG_SUFFIX, "Called Init(", cfgFn, ',', parms, ')'); - - // Read CTA namespaced configuration options from XRootD config file - Config config(cfgFn); - - // Set XRootD SSI Protobuf logging level - auto loglevel = config.getOptionList("cta.log.ssi"); - if(!loglevel.empty()) { - Log::SetLogLevel(loglevel); - } else { - Log::SetLogLevel("info"); - } - - int logToSyslog = 0; - int logToStdout = 0; - int logtoFile = 0; - std::string logFilePath = ""; - - // Instantiate the CTA logging system - try { - // Set the logger URL - auto loggerURL = config.getOptionValueStr("cta.log.url"); - if(!loggerURL.first) loggerURL.second = "syslog:"; - const auto shortHostname = utils::getShortHostname(); - - // Set the logger level - int loggerLevel = log::INFO; - auto loggerLevelStr = config.getOptionValueStr("cta.log.level"); - if(loggerLevelStr.first) loggerLevel = log::toLogLevel(loggerLevelStr.second); - - if (loggerURL.second == "syslog:") { - m_log.reset(new log::SyslogLogger(shortHostname, "cta-frontend", loggerLevel)); - logToSyslog = 1; - } else if (loggerURL.second == "stdout:") { - m_log.reset(new log::StdoutLogger(shortHostname, "cta-frontend")); - logToStdout = 1; - } else if (loggerURL.second.substr(0, 5) == "file:") { - logtoFile = 1; - logFilePath = loggerURL.second.substr(5); - m_log.reset(new log::FileLogger(shortHostname, "cta-frontend", logFilePath, loggerLevel)); - } else { - throw exception::UserError(std::string("Unknown log URL: ") + loggerURL.second); - } - } catch(exception::Exception &ex) { - std::string ex_str("Failed to instantiate object representing CTA logging system: "); - throw exception::Exception(ex_str + ex.getMessage().str()); - } - - const std::list<log::Param> params = {log::Param("version", CTA_VERSION)}; - log::Logger &log = *m_log; - - { - // Log starting message - std::list<log::Param> params; - params.push_back(log::Param("version", CTA_VERSION)); - params.push_back(log::Param("configFileLocation", cfgFn)); - params.push_back(log::Param("logToStdout", std::to_string(logToStdout))); - params.push_back(log::Param("logToSyslog", std::to_string(logToSyslog))); - params.push_back(log::Param("logtoFile", std::to_string(logtoFile))); - params.push_back(log::Param("logFilePath", logFilePath)); - log(log::INFO, std::string("Starting cta-frontend"), params); - } - - // Initialise the Catalogue - std::string catalogueConfigFile = "/etc/cta/cta-catalogue.conf"; - const rdbms::Login catalogueLogin = rdbms::Login::parseFile(catalogueConfigFile); - auto catalogue_numberofconnections = config.getOptionValueInt("cta.catalogue.numberofconnections"); - if(!catalogue_numberofconnections.first) { - throw exception::UserError("cta.catalogue.numberofconnections is not set in configuration file " + cfgFn); - } - const uint64_t nbArchiveFileListingConns = 2; - - { - // Log catalogue.numberofconnections - std::list<log::Param> params; - params.push_back(log::Param("source", cfgFn)); - params.push_back(log::Param("category", "cta.catalogue")); - params.push_back(log::Param("key", "numberofconnections")); - params.push_back(log::Param("value", std::to_string(catalogue_numberofconnections.second))); - log(log::INFO, "Configuration entry", params); - } - { - // Log catalogue number of archive file listing connections - std::list<log::Param> params; - params.push_back(log::Param("source", "Compile time default")); - params.push_back(log::Param("category", "cta.catalogue")); - params.push_back(log::Param("key", "nbArchiveFileListingConns")); - params.push_back(log::Param("value", std::to_string(nbArchiveFileListingConns))); - log(log::INFO, "Configuration entry", params); - } - - { - auto catalogueFactory = catalogue::CatalogueFactoryFactory::create(*m_log, catalogueLogin, - catalogue_numberofconnections.second, nbArchiveFileListingConns); - m_catalogue = catalogueFactory->create(); - try{ - m_catalogue->ping(); - } catch(cta::exception::Exception& ex){ - log::LogContext lc(*m_log); - lc.log(cta::log::CRIT,ex.getMessageValue()); - throw ex; - } - } - - this->m_catalogue_conn_string = catalogueLogin.connectionString; - - // Initialise the Scheduler DB - const std::string DB_CONN_PARAM = "cta.objectstore.backendpath"; - auto db_conn = config.getOptionValueStr(DB_CONN_PARAM); - if(!db_conn.first) { - throw exception::UserError(DB_CONN_PARAM + " is not set in configuration file " + cfgFn); - } - - { - // Log cta.objectstore.backendpath - std::list<log::Param> params; - params.push_back(log::Param("source", cfgFn)); - params.push_back(log::Param("category", "cta.objectstore")); - params.push_back(log::Param("key", "backendpath")); - params.push_back(log::Param("value", db_conn.second)); - log(log::INFO, "Configuration entry", params); - } - - m_scheddb_init = std::make_unique<SchedulerDBInit_t>("Frontend", db_conn.second, *m_log); - m_scheddb = m_scheddb_init->getSchedDB(*m_catalogue, *m_log); - - std::optional<size_t> schedulerThreadStackOpt; - const auto schedulerThreadStackSize = config.getOptionValueInt("ca.schedulerdb.threadstacksize_mb"); - if (schedulerThreadStackSize.first) { - schedulerThreadStackOpt = schedulerThreadStackSize.second * 1024 * 1024; - } - - auto threadPoolSize = config.getOptionValueInt("cta.schedulerdb.numberofthreads"); - if (threadPoolSize.first) { - m_scheddb->setThreadNumber(threadPoolSize.second, schedulerThreadStackOpt); - } - m_scheddb->setBottomHalfQueueSize(25000); - - { - // Log cta.schedulerdb.numberofthreads - if (threadPoolSize.first) { - std::list<log::Param> params; - params.push_back(log::Param("source", cfgFn)); - params.push_back(log::Param("category", "cta.schedulerdb")); - params.push_back(log::Param("key", "numberofthreads")); - params.push_back(log::Param("value", std::to_string(threadPoolSize.second))); - log(log::INFO, "Configuration entry", params); - } - } - - // Initialise the Scheduler - m_scheduler = std::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000); - - // Initialise the Frontend - auto archiveFileMaxSize = config.getOptionValueInt("cta.archivefile.max_size_gb"); - m_archiveFileMaxSize = archiveFileMaxSize.first ? archiveFileMaxSize.second : 0; // GB - m_archiveFileMaxSize *= 1024*1024*1024; // bytes - - { - // Log cta.archivefile.max_size_gb - std::list<log::Param> params; - params.push_back(log::Param("source", archiveFileMaxSize.first ? cfgFn: "Compile time default")); - params.push_back(log::Param("category", "cta.archivefile")); - params.push_back(log::Param("key", "max_size_gb")); - params.push_back(log::Param("value", std::to_string(archiveFileMaxSize.first ? archiveFileMaxSize.second : 0))); - log(log::INFO, "Configuration entry", params); - } - - // Get the repack buffer URL - auto repackBufferURLConf = config.getOptionValueStr("cta.repack.repack_buffer_url"); - if(repackBufferURLConf.first){ - m_repackBufferURL = repackBufferURLConf.second; - } - - // Get the verification mount policy - const auto verificationMountPolicy = config.getOptionValueStr("cta.verification.mount_policy"); - if(verificationMountPolicy.first){ - m_verificationMountPolicy = verificationMountPolicy.second; - } - - { - // Log cta.repack.repack_buffer_url - if(repackBufferURLConf.first){ - std::list<log::Param> params; - params.push_back(log::Param("source", cfgFn)); - params.push_back(log::Param("category", "cta.repack")); - params.push_back(log::Param("key", "repack_buffer_url")); - params.push_back(log::Param("value", repackBufferURLConf.second)); - log(log::INFO, "Configuration entry", params); - } - } - - // Get the endpoint for namespace queries - auto nsConf = config.getOptionValueStr("cta.ns.config"); - if(nsConf.first) { - setNamespaceMap(nsConf.second); - } else { - Log::Msg(XrdSsiPb::Log::WARNING, LOG_SUFFIX, "warning: 'cta.ns.config' not specified; namespace queries are disabled"); - } - - { - // Log cta.ns.config - if(nsConf.first){ - std::list<log::Param> params; - params.push_back(log::Param("source", cfgFn)); - params.push_back(log::Param("category", "cta.ns")); - params.push_back(log::Param("key", "config")); - params.push_back(log::Param("value", nsConf.second)); - log(log::INFO, "Configuration entry", params); - } - } - - // Get the mount policy name for verification requests - - // All done - log(log::INFO, std::string("cta-frontend started"), params); -} - -void XrdSsiCtaServiceProvider::setNamespaceMap(const std::string &keytab_file) -{ - // Open the keytab file for reading - std::ifstream file(keytab_file); - if(!file) { - throw cta::exception::UserError("Failed to open namespace keytab configuration file " + keytab_file); - } - - // Parse the keytab line by line - std::string line; - for(int lineno = 0; std::getline(file, line); ++lineno) { - // Strip out comments - auto pos = line.find('#'); - if(pos != std::string::npos) { - line.resize(pos); - } - - // Parse one line - std::stringstream ss(line); - std::string diskInstance; - std::string endpoint; - std::string token; - std::string eol; - ss >> diskInstance >> endpoint >> token >> eol; - - // Ignore blank lines, all other lines must have exactly 3 elements - if(token.empty() || !eol.empty()) { - if(diskInstance.empty() && endpoint.empty() && token.empty()) continue; - throw cta::exception::UserError("Could not parse namespace keytab configuration file line " + std::to_string(lineno) + ": " + line); - } - m_namespaceMap.insert(std::make_pair(diskInstance, cta::Namespace(endpoint, token))); - } -} - XrdSsiService* XrdSsiCtaServiceProvider::GetService(XrdSsiErrInfo &eInfo, const std::string &contact, int oHold) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::INFO, LOG_SUFFIX, "Called GetService(", contact, ',', oHold, ')'); diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.hpp b/xroot_plugins/XrdSsiCtaServiceProvider.hpp index 93f3f134634dc938b6ceac54f0c87d155a3b0a7f..60e1991fc705e99cff00406b18fb5ed3bdee8677 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.hpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.hpp @@ -19,20 +19,20 @@ #include <memory> #include <string> - #include <XrdSsi/XrdSsiProvider.hh> +#include <XrdSsiPbLog.hpp> #include "catalogue/Catalogue.hpp" -#include <common/Configuration.hpp> -#include <common/utils/utils.hpp> -#include <xroot_plugins/Namespace.hpp> -#include <XrdSsiPbLog.hpp> -#include <scheduler/Scheduler.hpp> +#include "common/Configuration.hpp" +#include "common/utils/utils.hpp" +#include "scheduler/Scheduler.hpp" #ifdef CTA_PGSCHED -#include <scheduler/PostgresSchedDB/PostgresSchedDBInit.hpp> +#include "scheduler/PostgresSchedDB/PostgresSchedDBInit.hpp" #else -#include <scheduler/OStoreDB/OStoreDBInit.hpp> +#include "scheduler/OStoreDB/OStoreDBInit.hpp" #endif +#include "frontend/common/Namespace.hpp" +#include "frontend/common/FrontendService.hpp" /*! * Global pointer to the Service Provider object. @@ -44,7 +44,7 @@ extern XrdSsiProvider *XrdSsiProviderServer; */ class XrdSsiCtaServiceProvider : public XrdSsiProvider { public: - XrdSsiCtaServiceProvider() : m_archiveFileMaxSize(0) { + XrdSsiCtaServiceProvider() { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "XrdSsiCtaServiceProvider() constructor"); } @@ -86,73 +86,12 @@ public: XrdSsiProvider::rStat QueryResource(const char *rName, const char *contact = nullptr) override; /*! - * Get a reference to the Scheduler DB for this Service - */ - cta::SchedulerDB_t &getSchedDb() const { return *m_scheddb; } - - /*! - * Get a reference to the Catalogue for this Service - */ - cta::catalogue::Catalogue &getCatalogue() const { return *m_catalogue; } - - /*! - * Get a reference to the Scheduler for this Service - */ - cta::Scheduler &getScheduler() const { return *m_scheduler; } - - /*! - * Get the log context for this Service - */ - cta::log::LogContext getLogContext() const { return cta::log::LogContext(*m_log); } - - /*! - * Get the maximum file size for an archive request - */ - uint64_t getArchiveFileMaxSize() const { return m_archiveFileMaxSize; } - - /*! - * Get the repack buffer URL + * Get a reference to the FrontendService object */ - std::optional<std::string> getRepackBufferURL() const { return m_repackBufferURL; } - - /*! - * Get the verification mount policy - */ - std::optional<std::string> getVerificationMountPolicy() const { return m_verificationMountPolicy; } - - - /*! - * Populate the namespace endpoint configuration from a keytab file - */ - void setNamespaceMap(const std::string &keytab_file); - - /*! - * Get the endpoints for namespace queries - */ - cta::NamespaceMap_t getNamespaceMap() const { return m_namespaceMap; } - - const std::string getCatalogueConnectionString() const {return m_catalogue_conn_string; } + cta::frontend::FrontendService& getFrontendService() const { return *m_frontendService; } private: - /*! - * Version of Init() that throws exceptions in case of problems - */ - void ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiCluster *clsP, const std::string &cfgFn, - const std::string &parms, int argc, char **argv); - // Member variables - - std::unique_ptr<cta::catalogue::Catalogue> m_catalogue; //!< Catalogue of tapes and tape files - std::unique_ptr<cta::SchedulerDB_t> m_scheddb; //!< Scheduler DB for persistent objects (queues and requests) - std::unique_ptr<cta::SchedulerDBInit_t> m_scheddb_init; //!< Wrapper to manage Scheduler DB initialisation - std::unique_ptr<cta::Scheduler> m_scheduler; //!< The scheduler - std::unique_ptr<cta::log::Logger> m_log; //!< The logger - - uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests - std::optional<std::string> m_repackBufferURL; //!< The repack buffer URL - std::optional<std::string> m_verificationMountPolicy; //!< The mount policy for verification requests - cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries - std::string m_catalogue_conn_string; //!< The catalogue connection string (without the password) - + std::unique_ptr<cta::frontend::FrontendService> m_frontendService; //!< protocol-neutral CTA Frontend Service object static constexpr const char* const LOG_SUFFIX = "XrdSsiCtaServiceProvider"; //!< Identifier for log messages };