From 7f917d62404a8c2fc80647b8df52e71301bba69b Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Thu, 25 Mar 2021 11:23:08 +0000 Subject: [PATCH] Clean os references --- scheduler/OStoreDB/OStoreDBInit.hpp | 88 ++++++++++++++++++++++ tapeserver/daemon/DriveHandler.cpp | 86 +++++---------------- tapeserver/daemon/MaintenanceHandler.cpp | 43 +++-------- xroot_plugins/XrdSsiCtaRequestMessage.cpp | 2 +- xroot_plugins/XrdSsiCtaRequestMessage.hpp | 4 +- xroot_plugins/XrdSsiCtaServiceProvider.cpp | 29 +++---- xroot_plugins/XrdSsiCtaServiceProvider.hpp | 61 +++++---------- 7 files changed, 149 insertions(+), 164 deletions(-) create mode 100644 scheduler/OStoreDB/OStoreDBInit.hpp diff --git a/scheduler/OStoreDB/OStoreDBInit.hpp b/scheduler/OStoreDB/OStoreDBInit.hpp new file mode 100644 index 0000000000..82349ca987 --- /dev/null +++ b/scheduler/OStoreDB/OStoreDBInit.hpp @@ -0,0 +1,88 @@ +/** + * @project The CERN Tape Archive (CTA) + * @copyright Copyright © 2021 CERN + * @license This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <common/make_unique.hpp> +#include <objectstore/BackendPopulator.hpp> +#include <objectstore/BackendFactory.hpp> +#include <objectstore/AgentHeartbeatThread.hpp> +#include <objectstore/BackendVFS.hpp> +#include <objectstore/GarbageCollector.hpp> +#include <scheduler/OStoreDB/OStoreDBWithAgent.hpp> + +namespace cta { + +class OStoreDBInit +{ +public: + OStoreDBInit(const std::string& client_process, const std::string& db_conn_str, log::Logger& log, + bool leaveNonEmptyAgentsBehind = false) + { + // Initialise the ObjectStore Backend + m_backend = std::move(objectstore::BackendFactory::createBackend(db_conn_str, log)); + m_backendPopulator = make_unique<objectstore::BackendPopulator>(*m_backend, client_process, log::LogContext(log)); + if(leaveNonEmptyAgentsBehind) { + m_backendPopulator->leaveNonEmptyAgentsBehind(); + } + + try { + // If the backend is a VFS, don't delete it on exit + dynamic_cast<objectstore::BackendVFS &>(*m_backend).noDeleteOnExit(); + } catch (std::bad_cast &) { + // If not, never mind + } + + // Start the heartbeat thread for the agent object. The thread is guaranteed to have started before we call the unique_ptr deleter + auto aht = new objectstore::AgentHeartbeatThread(m_backendPopulator->getAgentReference(), *m_backend, log); + aht->startThread(); + m_agentHeartbeat = std::move(UniquePtrAgentHeartbeatThread(aht)); + } + + std::unique_ptr<OStoreDBWithAgent> getSchedDB(catalogue::Catalogue& catalogue, log::Logger& log) { + return make_unique<OStoreDBWithAgent>(*m_backend, m_backendPopulator->getAgentReference(), catalogue, log); + } + + objectstore::GarbageCollector getGarbageCollector(catalogue::Catalogue& catalogue) { + return objectstore::GarbageCollector(*m_backend, m_backendPopulator->getAgentReference(), catalogue); + } + +private: + /*! + * Deleter for instances of the AgentHeartbeatThread class. + * + * Using a deleter (rather than calling from a destructor) guarantees that we can't call stopAndWaitThread() + * before the AgentHeartbeatThread has been started. + */ + struct AgentHeartbeatThreadDeleter { + void operator()(objectstore::AgentHeartbeatThread *aht) { + aht->stopAndWaitThread(); + } + }; + typedef std::unique_ptr<objectstore::AgentHeartbeatThread, AgentHeartbeatThreadDeleter> UniquePtrAgentHeartbeatThread; + + // Member variables + + std::unique_ptr<objectstore::Backend> m_backend; //!< VFS backend for the objectstore DB + std::unique_ptr<objectstore::BackendPopulator> m_backendPopulator; //!< Object used to populate the backend + UniquePtrAgentHeartbeatThread m_agentHeartbeat; //!< Agent heartbeat thread +}; + +typedef OStoreDBInit SchedulerDBInit_t; +typedef OStoreDBWithAgent SchedulerDB_t; + +} // namespace cta diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 5d5532e356..6f96341539 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -22,13 +22,9 @@ #include "common/processCap/ProcessCap.hpp" #include "DriveHandler.hpp" #include "DriveHandlerProxy.hpp" -#include "objectstore/Backend.hpp" -#include "objectstore/BackendFactory.hpp" -#include "objectstore/BackendVFS.hpp" -#include "objectstore/BackendPopulator.hpp" -#include "objectstore/AgentHeartbeatThread.hpp" #include "rdbms/Login.hpp" #include "scheduler/OStoreDB/OStoreDBWithAgent.hpp" +#include "scheduler/OStoreDB/OStoreDBInit.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/CleanerSession.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/Session.hpp" @@ -873,47 +869,29 @@ int DriveHandler::runChild() { params.add("backendPath", m_tapedConfig.backendPath.value()); lc.log(log::DEBUG, "In DriveHandler::runChild(): will connect to object store backend."); } - // Before anything, we need to check we have access to the scheduler's central storages. - std::unique_ptr<cta::objectstore::Backend> backend; - try { - backend.reset(cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release()); - } catch (cta::exception::Exception &ex) { - log::ScopedParamContainer param (lc); - param.add("errorMessage", ex.getMessageValue()); - lc.log(log::CRIT, "In DriveHandler::runChild(): failed to connect to objectstore. Reporting fatal error."); - driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, ""); - sleep(1); - return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; - } - // If the backend is a VFS, make sure we don't delete it on exit. - // If not, nevermind. - try { - dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit(); - } catch (std::bad_cast &){} - // Create the agent entry in the object store. This could fail (even before ping, so - // handle failure like a ping failure). - std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator; - std::unique_ptr<cta::OStoreDBWithAgent> osdb; + // Before anything, we need to check we have access to the scheduler's central storage + std::unique_ptr<SchedulerDBInit_t> sched_db_init; try { std::string processName="DriveProcess-"; processName+=m_configLine.unitName; log::ScopedParamContainer params(lc); params.add("processName", processName); lc.log(log::DEBUG, "In DriveHandler::runChild(): will create agent entry. Enabling leaving non-empty agent behind."); - backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName, lc)); - backendPopulator->leaveNonEmptyAgentsBehind(); - } catch(cta::exception::Exception &ex) { - log::ScopedParamContainer param(lc); + sched_db_init.reset(new SchedulerDBInit_t(processName, m_tapedConfig.backendPath.value(), m_processManager.logContext().logger(), true)); + } catch (cta::exception::Exception &ex) { + log::ScopedParamContainer param (lc); param.add("errorMessage", ex.getMessageValue()); - lc.log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate agent entry. Reporting fatal error."); + lc.log(log::CRIT, "In DriveHandler::runChild(): failed to connect to objectstore or failed to instantiate agent entry. Reporting fatal error."); driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, ""); sleep(1); return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; } + std::unique_ptr<SchedulerDB_t> sched_db; try { - if(!m_catalogue) + if(!m_catalogue) { m_catalogue = createCatalogue("DriveHandler::runChild()"); - osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *m_catalogue, lc.logger())); + } + sched_db = sched_db_init->getSchedDB(*m_catalogue, lc.logger()); } catch(cta::exception::Exception &ex) { log::ScopedParamContainer param(lc); param.add("errorMessage", ex.getMessageValue()); @@ -923,7 +901,7 @@ int DriveHandler::runChild() { return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; } lc.log(log::DEBUG, "In DriveHandler::runChild(): will create scheduler."); - cta::Scheduler scheduler(*m_catalogue, *osdb, m_tapedConfig.mountCriteria.value().maxFiles, + cta::Scheduler scheduler(*m_catalogue, *sched_db, m_tapedConfig.mountCriteria.value().maxFiles, m_tapedConfig.mountCriteria.value().maxBytes); // Before launching the transfer session, we validate that the scheduler is reachable. lc.log(log::DEBUG, "In DriveHandler::runChild(): will ping scheduler."); @@ -943,11 +921,6 @@ int DriveHandler::runChild() { return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; } - lc.log(log::DEBUG, "In DriveHandler::runChild(): will start agent heartbeat."); - // The object store is accessible, let's turn the agent heartbeat on. - objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, lc.logger()); - agentHeartbeat.startThread(); - // 1) Special case first, if we crashed in a cleaner session, we put the drive down if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) { log::ScopedParamContainer params(lc); @@ -1117,7 +1090,7 @@ int DriveHandler::runChild() { //Checking the drive does not already exist in the objectstore try{ - osdb->checkDriveCanBeCreated(driveInfo); + sched_db->checkDriveCanBeCreated(driveInfo); } catch (SchedulerDatabase::DriveAlreadyExistsException &ex) { log::ScopedParamContainer param(lc); param.add("tapeDrive",driveInfo.driveName) @@ -1164,7 +1137,6 @@ int DriveHandler::runChild() { scheduler); auto ret = dataTransferSession.execute(); - agentHeartbeat.stopAndWaitThread(); return ret; } } @@ -1199,42 +1171,24 @@ SubprocessHandler::ProcessingStatus DriveHandler::shutdown() { m_catalogue = createCatalogue("DriveHandler::shutdown()"); //Create the scheduler + std::unique_ptr<SchedulerDBInit_t> sched_db_init; - //Create the backend - std::unique_ptr<cta::objectstore::Backend> backend; - try { - backend.reset(cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release()); - } catch (cta::exception::Exception &ex) { - log::ScopedParamContainer param (lc); - param.add("errorMessage", ex.getMessageValue()); - lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to connect to objectstore."); - goto exitShutdown; - } - // If the backend is a VFS, make sure we don't delete it on exit. - // If not, nevermind. - try { - dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit(); - } catch (std::bad_cast &){} - // Create the agent entry in the object store. This could fail (even before ping, so - // handle failure like a ping failure). - std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator; - std::unique_ptr<cta::OStoreDBWithAgent> osdb; try { std::string processName="DriveHandlerShutdown-"; processName+=m_configLine.unitName; log::ScopedParamContainer params(lc); params.add("processName", processName); lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create agent entry. Enabling leaving non-empty agent behind."); - backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName, lc)); - } catch(cta::exception::Exception &ex) { - log::ScopedParamContainer param(lc); + sched_db_init.reset(new SchedulerDBInit_t(processName, m_tapedConfig.backendPath.value(), lc.logger(), true)); + } catch (cta::exception::Exception &ex) { + log::ScopedParamContainer param (lc); param.add("errorMessage", ex.getMessageValue()); - lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to instantiate agent entry. Reporting fatal error."); + lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to connect to objectstore or failed to instantiate agent entry. Reporting fatal error."); goto exitShutdown; } - osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *m_catalogue, lc.logger())); + std::unique_ptr<SchedulerDB_t> sched_db = sched_db_init->getSchedDB(*m_catalogue, lc.logger()); lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create scheduler."); - std::unique_ptr<cta::Scheduler> scheduler(new Scheduler(*m_catalogue, *osdb, 0,0)); + std::unique_ptr<cta::Scheduler> scheduler(new Scheduler(*m_catalogue, *sched_db, 0,0)); cta::mediachanger::MediaChangerFacade mediaChangerFacade(m_processManager.logContext().logger()); castor::tape::System::realWrapper sWrapper; diff --git a/tapeserver/daemon/MaintenanceHandler.cpp b/tapeserver/daemon/MaintenanceHandler.cpp index b05d139554..8e72c5d948 100644 --- a/tapeserver/daemon/MaintenanceHandler.cpp +++ b/tapeserver/daemon/MaintenanceHandler.cpp @@ -18,15 +18,10 @@ #include "MaintenanceHandler.hpp" #include "common/exception/Errnum.hpp" -#include "objectstore/AgentHeartbeatThread.hpp" -#include "objectstore/BackendPopulator.hpp" -#include "objectstore/BackendFactory.hpp" -#include "objectstore/BackendVFS.hpp" -#include "objectstore/GarbageCollector.hpp" -#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp" #include "catalogue/Catalogue.hpp" #include "catalogue/CatalogueFactoryFactory.hpp" #include "scheduler/Scheduler.hpp" +#include "scheduler/OStoreDB/OStoreDBInit.hpp" #include "rdbms/Login.hpp" #include "common/make_unique.hpp" #include "scheduler/DiskReportRunner.hpp" @@ -268,32 +263,23 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ // Before anything, we will check for access to the scheduler's central storage. // If we fail to access it, we cannot work. We expect the drive processes to - // fail likewise, so we just wait for shutdown signal (no feedback to main - // process). - std::unique_ptr<cta::objectstore::Backend> backend( - cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), m_processManager.logContext().logger()).release()); - // If the backend is a VFS, make sure we don't delete it on exit. - // If not, nevermind. - try { - dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit(); - } catch (std::bad_cast &){} - // Create the agent entry in the object store. This could fail (even before ping, so - // handle failure like a ping failure). - std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator; - std::unique_ptr<cta::OStoreDBWithAgent> osdb; + // fail likewise, so we just wait for shutdown signal (no feedback to main process). + SchedulerDBInit_t sched_db_init("Maintenance", m_tapedConfig.backendPath.value(), m_processManager.logContext().logger()); + + std::unique_ptr<cta::SchedulerDB_t> sched_db; std::unique_ptr<cta::catalogue::Catalogue> catalogue; std::unique_ptr<cta::Scheduler> scheduler; try { - backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "Maintenance", m_processManager.logContext())); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value()); const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 1; auto catalogueFactory = cta::catalogue::CatalogueFactoryFactory::create(m_processManager.logContext().logger(), catalogueLogin, nbConns, nbArchiveFileListingConns); - catalogue=catalogueFactory->create(); - osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger())); - scheduler=make_unique<cta::Scheduler>(*catalogue, *osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them - // Before launching the transfer session, we validate that the scheduler is reachable. + catalogue = catalogueFactory->create(); + sched_db = sched_db_init.getSchedDB(*catalogue, m_processManager.logContext().logger()); + // TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them + scheduler = make_unique<cta::Scheduler>(*catalogue, *sched_db, 5, 2*1000*1000); + // Before launching the transfer session, we validate that the scheduler is reachable. scheduler->ping(m_processManager.logContext()); } catch(cta::exception::Exception &ex) { { @@ -310,13 +296,9 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ "In MaintenanceHandler::exceptionThrowingRunChild(): Received shutdown message after failure to contact storage. Exiting."); throw ex; } - - // The object store is accessible, let's turn the agent heartbeat on. - objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, m_processManager.logContext().logger()); - agentHeartbeat.startThread(); - + // Create the garbage collector and the disk reporter - objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference(), *catalogue); + auto gc = sched_db_init.getGarbageCollector(*catalogue); DiskReportRunner diskReportRunner(*scheduler); RepackRequestManager repackRequestManager(*scheduler); @@ -366,7 +348,6 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ "In MaintenanceHandler::exceptionThrowingRunChild(): received an unknown exception."); throw; } - agentHeartbeat.stopAndWaitThread(); } //------------------------------------------------------------------------------ diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 0211ee12be..63fa578d57 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1041,7 +1041,7 @@ void RequestMessage::processFailedRequest_Ls(cta::xrd::Response &response, XrdSs { using namespace cta::admin; - stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_scheddb, m_lc); + stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_service.getSchedDb(), m_lc); // Display the correct column headers response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::FAILEDREQUEST_LS_SUMMARY diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index 606ceb2858..d2a698ae8b 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -34,7 +34,7 @@ class RequestMessage { public: RequestMessage(const XrdSsiEntity &client, const XrdSsiCtaServiceProvider *service) : - m_scheddb(service->getSchedDb()), + m_service(*service), m_catalogue(service->getCatalogue()), m_scheduler(service->getScheduler()), m_archiveFileMaxSize(service->getArchiveFileMaxSize()), @@ -289,7 +289,7 @@ private: Protocol m_protocol; //!< The protocol the client used to connect cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host - cta::OStoreDBWithAgent &m_scheddb; //!< Reference to CTA ObjectStore + const XrdSsiCtaServiceProvider &m_service; //!< Const reference to the XRootD SSI Service cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.cpp b/xroot_plugins/XrdSsiCtaServiceProvider.cpp index 77e3b490db..79376a010d 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.cpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.cpp @@ -24,7 +24,6 @@ #include "common/log/FileLogger.hpp" #include "common/log/LogLevel.hpp" #include "common/utils/utils.hpp" -#include "objectstore/BackendVFS.hpp" #include "rdbms/Login.hpp" #include "version.h" #include "XrdSsiCtaServiceProvider.hpp" @@ -134,15 +133,16 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC } this->m_catalogue_conn_string = catalogueLogin.connectionString; - - // Initialise the Backend - auto objectstore_backendpath = config.getOptionValueStr("cta.objectstore.backendpath"); - if(!objectstore_backendpath.first) { - throw exception::UserError("cta.objectstore.backendpath is not set in configuration file " + cfgFn); + + // Initialise the Scheduler DB + const std::string DB_CONN_PARAM = "cta.objectstore.backendpath"; + auto db_conn = config.getOptionValueStr(DB_CONN_PARAM); + if(!db_conn.first) { + throw exception::UserError(DB_CONN_PARAM + " is not set in configuration file " + cfgFn); } - m_backend = std::move(cta::objectstore::BackendFactory::createBackend(objectstore_backendpath.second, *m_log)); - m_backendPopulator = cta::make_unique<cta::objectstore::BackendPopulator>(*m_backend, "Frontend", cta::log::LogContext(*m_log)); - m_scheddb = cta::make_unique<cta::OStoreDBWithAgent>(*m_backend, m_backendPopulator->getAgentReference(), *m_catalogue, *m_log); + m_scheddb_init = cta::make_unique<SchedulerDBInit_t>("Frontend", db_conn.second, *m_log); + m_scheddb = m_scheddb_init->getSchedDB(*m_catalogue, *m_log); + auto threadPoolSize = config.getOptionValueInt("cta.schedulerdb.numberofthreads"); if (threadPoolSize.first) { m_scheddb->setThreadNumber(threadPoolSize.second); @@ -151,12 +151,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC // Initialise the Scheduler m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000); - try { - // If the backend is a VFS, make sure we don't delete it on exit - dynamic_cast<objectstore::BackendVFS &>(*m_backend).noDeleteOnExit(); - } catch (std::bad_cast &) { - // If not, never mind - } // Initialise the Frontend auto archiveFileMaxSize = config.getOptionValueInt("cta.archivefile.max_size_gb"); @@ -177,11 +171,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC Log::Msg(XrdSsiPb::Log::WARNING, LOG_SUFFIX, "warning: 'cta.ns.config' not specified; namespace queries are disabled"); } - // Start the heartbeat thread for the agent object. The thread is guaranteed to have started before we call the unique_ptr deleter - auto aht = new cta::objectstore::AgentHeartbeatThread(m_backendPopulator->getAgentReference(), *m_backend, *m_log); - aht->startThread(); - m_agentHeartbeat = std::move(UniquePtrAgentHeartbeatThread(aht)); - // All done log(log::INFO, std::string("cta-frontend started"), params); } diff --git a/xroot_plugins/XrdSsiCtaServiceProvider.hpp b/xroot_plugins/XrdSsiCtaServiceProvider.hpp index dffd76bcdb..a247466294 100644 --- a/xroot_plugins/XrdSsiCtaServiceProvider.hpp +++ b/xroot_plugins/XrdSsiCtaServiceProvider.hpp @@ -1,7 +1,7 @@ /*! * @project The CERN Tape Archive (CTA) * @brief XRootD Service Provider class - * @copyright Copyright 2017 CERN + * @copyright Copyright © 2021 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or @@ -18,27 +18,20 @@ #pragma once +#include <XrdSsi/XrdSsiProvider.hh> + #include <common/Configuration.hpp> #include <common/utils/utils.hpp> -#include <objectstore/BackendPopulator.hpp> -#include <objectstore/BackendFactory.hpp> -#include <scheduler/Scheduler.hpp> -#include <scheduler/OStoreDB/OStoreDBWithAgent.hpp> -#include <objectstore/AgentHeartbeatThread.hpp> #include <xroot_plugins/Namespace.hpp> #include <XrdSsiPbLog.hpp> - -#include <XrdSsi/XrdSsiProvider.hh> - +#include <scheduler/Scheduler.hpp> +#include <scheduler/OStoreDB/OStoreDBInit.hpp> /*! * Global pointer to the Service Provider object. */ - extern XrdSsiProvider *XrdSsiProviderServer; - - /*! * Instantiates a Service to process client requests. */ @@ -86,9 +79,9 @@ public: XrdSsiProvider::rStat QueryResource(const char *rName, const char *contact=0) override; /*! - * Get a reference to the ObjectStore for this Service + * Get a reference to the Scheduler DB for this Service */ - cta::OStoreDBWithAgent &getSchedDb() const { return *m_scheddb; } + cta::SchedulerDB_t &getSchedDb() const { return *m_scheddb; } /*! * Get a reference to the Catalogue for this Service @@ -134,38 +127,18 @@ private: void ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiCluster *clsP, const std::string &cfgFn, const std::string &parms, int argc, char **argv); - /*! - * Deleter for instances of the AgentHeartbeatThread class. - * - * As the aht object is initialised by Init() rather than in the constructor, we can't simply call - * stopAndWaitThread() in the destructor. Using a deleter guarantees that we call stopAndWaitThread() - * after the AgentHeartbeatThread has been started by Init(). - */ - struct AgentHeartbeatThreadDeleter { - void operator()(cta::objectstore::AgentHeartbeatThread *aht) { - aht->stopAndWaitThread(); - } - }; - - /*! - * Typedef for unique pointer to AgentHeartbeatThread - */ - typedef std::unique_ptr<cta::objectstore::AgentHeartbeatThread, AgentHeartbeatThreadDeleter> - UniquePtrAgentHeartbeatThread; - // Member variables - uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests - std::unique_ptr<cta::objectstore::Backend> m_backend; //!< VFS backend for the objectstore DB - std::unique_ptr<cta::objectstore::BackendPopulator> m_backendPopulator; //!< Object used to populate the backend - std::unique_ptr<cta::OStoreDBWithAgent> m_scheddb; //!< DB/Object Store of persistent objects - std::unique_ptr<cta::catalogue::Catalogue> m_catalogue; //!< CTA catalogue of tapes and tape files - std::unique_ptr<cta::Scheduler> m_scheduler; //!< The scheduler - std::unique_ptr<cta::log::Logger> m_log; //!< The logger - cta::optional<std::string> m_repackBufferURL; //!< The repack buffer URL - cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries - UniquePtrAgentHeartbeatThread m_agentHeartbeat; //!< Agent heartbeat thread - std::string m_catalogue_conn_string; //!< The catalogue connection string without the password + std::unique_ptr<cta::catalogue::Catalogue> m_catalogue; //!< Catalogue of tapes and tape files + std::unique_ptr<cta::SchedulerDB_t> m_scheddb; //!< Scheduler DB for persistent objects (queues and requests) + std::unique_ptr<cta::SchedulerDBInit_t> m_scheddb_init; //!< Wrapper to manage Scheduler DB initialisation + std::unique_ptr<cta::Scheduler> m_scheduler; //!< The scheduler + std::unique_ptr<cta::log::Logger> m_log; //!< The logger + + uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests + cta::optional<std::string> m_repackBufferURL; //!< The repack buffer URL + cta::NamespaceMap_t m_namespaceMap; //!< Endpoints for namespace queries + std::string m_catalogue_conn_string; //!< The catalogue connection string (without the password) static constexpr const char* const LOG_SUFFIX = "XrdSsiCtaServiceProvider"; //!< Identifier for log messages }; -- GitLab