Commit cb8a7ca7 authored by Michael Davis's avatar Michael Davis
Browse files

Clean os references

parent a0bf18c9
Pipeline #4335 passed with stages
in 23 minutes and 15 seconds
/**
* @project The CERN Tape Archive (CTA)
* @copyright Copyright © 2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <common/make_unique.hpp>
#include <objectstore/BackendPopulator.hpp>
#include <objectstore/BackendFactory.hpp>
#include <objectstore/AgentHeartbeatThread.hpp>
#include <objectstore/BackendVFS.hpp>
#include <objectstore/GarbageCollector.hpp>
#include <scheduler/OStoreDB/OStoreDBWithAgent.hpp>
namespace cta {
class OStoreDBInit
{
public:
OStoreDBInit(const std::string& client_process, const std::string& db_conn_str, log::Logger& log,
bool leaveNonEmptyAgentsBehind = false)
{
// Initialise the ObjectStore Backend
m_backend = std::move(objectstore::BackendFactory::createBackend(db_conn_str, log));
m_backendPopulator = make_unique<objectstore::BackendPopulator>(*m_backend, client_process, log::LogContext(log));
if(leaveNonEmptyAgentsBehind) {
m_backendPopulator->leaveNonEmptyAgentsBehind();
}
try {
// If the backend is a VFS, don't delete it on exit
dynamic_cast<objectstore::BackendVFS &>(*m_backend).noDeleteOnExit();
} catch (std::bad_cast &) {
// If not, never mind
}
// Start the heartbeat thread for the agent object. The thread is guaranteed to have started before we call the unique_ptr deleter
auto aht = new objectstore::AgentHeartbeatThread(m_backendPopulator->getAgentReference(), *m_backend, log);
aht->startThread();
m_agentHeartbeat = std::move(UniquePtrAgentHeartbeatThread(aht));
}
std::unique_ptr<OStoreDBWithAgent> getSchedDB(catalogue::Catalogue& catalogue, log::Logger& log) {
return make_unique<OStoreDBWithAgent>(*m_backend, m_backendPopulator->getAgentReference(), catalogue, log);
}
objectstore::GarbageCollector getGarbageCollector(catalogue::Catalogue& catalogue) {
return objectstore::GarbageCollector(*m_backend, m_backendPopulator->getAgentReference(), catalogue);
}
private:
/*!
* Deleter for instances of the AgentHeartbeatThread class.
*
* Using a deleter (rather than calling from a destructor) guarantees that we can't call stopAndWaitThread()
* before the AgentHeartbeatThread has been started.
*/
struct AgentHeartbeatThreadDeleter {
void operator()(objectstore::AgentHeartbeatThread *aht) {
aht->stopAndWaitThread();
}
};
typedef std::unique_ptr<objectstore::AgentHeartbeatThread, AgentHeartbeatThreadDeleter> UniquePtrAgentHeartbeatThread;
// Member variables
std::unique_ptr<objectstore::Backend> m_backend; //!< VFS backend for the objectstore DB
std::unique_ptr<objectstore::BackendPopulator> m_backendPopulator; //!< Object used to populate the backend
UniquePtrAgentHeartbeatThread m_agentHeartbeat; //!< Agent heartbeat thread
};
typedef OStoreDBInit SchedulerDBInit_t;
typedef OStoreDBWithAgent SchedulerDB_t;
} // namespace cta
...@@ -22,13 +22,9 @@ ...@@ -22,13 +22,9 @@
#include "common/processCap/ProcessCap.hpp" #include "common/processCap/ProcessCap.hpp"
#include "DriveHandler.hpp" #include "DriveHandler.hpp"
#include "DriveHandlerProxy.hpp" #include "DriveHandlerProxy.hpp"
#include "objectstore/Backend.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendVFS.hpp"
#include "objectstore/BackendPopulator.hpp"
#include "objectstore/AgentHeartbeatThread.hpp"
#include "rdbms/Login.hpp" #include "rdbms/Login.hpp"
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp" #include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "scheduler/OStoreDB/OStoreDBInit.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/CleanerSession.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/CleanerSession.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/Session.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/Session.hpp"
...@@ -873,47 +869,29 @@ int DriveHandler::runChild() { ...@@ -873,47 +869,29 @@ int DriveHandler::runChild() {
params.add("backendPath", m_tapedConfig.backendPath.value()); params.add("backendPath", m_tapedConfig.backendPath.value());
lc.log(log::DEBUG, "In DriveHandler::runChild(): will connect to object store backend."); lc.log(log::DEBUG, "In DriveHandler::runChild(): will connect to object store backend.");
} }
// Before anything, we need to check we have access to the scheduler's central storages. // Before anything, we need to check we have access to the scheduler's central storage
std::unique_ptr<cta::objectstore::Backend> backend; std::unique_ptr<SchedulerDBInit_t> sched_db_init;
try {
backend.reset(cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release());
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param (lc);
param.add("errorMessage", ex.getMessageValue());
lc.log(log::CRIT, "In DriveHandler::runChild(): failed to connect to objectstore. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
// Create the agent entry in the object store. This could fail (even before ping, so
// handle failure like a ping failure).
std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
std::unique_ptr<cta::OStoreDBWithAgent> osdb;
try { try {
std::string processName="DriveProcess-"; std::string processName="DriveProcess-";
processName+=m_configLine.unitName; processName+=m_configLine.unitName;
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
params.add("processName", processName); params.add("processName", processName);
lc.log(log::DEBUG, "In DriveHandler::runChild(): will create agent entry. Enabling leaving non-empty agent behind."); lc.log(log::DEBUG, "In DriveHandler::runChild(): will create agent entry. Enabling leaving non-empty agent behind.");
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName, lc)); sched_db_init.reset(new SchedulerDBInit_t(processName, m_tapedConfig.backendPath.value(), m_processManager.logContext().logger(), true));
backendPopulator->leaveNonEmptyAgentsBehind(); } catch (cta::exception::Exception &ex) {
} catch(cta::exception::Exception &ex) { log::ScopedParamContainer param (lc);
log::ScopedParamContainer param(lc);
param.add("errorMessage", ex.getMessageValue()); param.add("errorMessage", ex.getMessageValue());
lc.log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate agent entry. Reporting fatal error."); lc.log(log::CRIT, "In DriveHandler::runChild(): failed to connect to objectstore or failed to instantiate agent entry. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, ""); driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1); sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
} }
std::unique_ptr<SchedulerDB_t> sched_db;
try { try {
if(!m_catalogue) if(!m_catalogue) {
m_catalogue = createCatalogue("DriveHandler::runChild()"); m_catalogue = createCatalogue("DriveHandler::runChild()");
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *m_catalogue, lc.logger())); }
sched_db = sched_db_init->getSchedDB(*m_catalogue, lc.logger());
} catch(cta::exception::Exception &ex) { } catch(cta::exception::Exception &ex) {
log::ScopedParamContainer param(lc); log::ScopedParamContainer param(lc);
param.add("errorMessage", ex.getMessageValue()); param.add("errorMessage", ex.getMessageValue());
...@@ -923,7 +901,7 @@ int DriveHandler::runChild() { ...@@ -923,7 +901,7 @@ int DriveHandler::runChild() {
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
} }
lc.log(log::DEBUG, "In DriveHandler::runChild(): will create scheduler."); lc.log(log::DEBUG, "In DriveHandler::runChild(): will create scheduler.");
cta::Scheduler scheduler(*m_catalogue, *osdb, m_tapedConfig.mountCriteria.value().maxFiles, cta::Scheduler scheduler(*m_catalogue, *sched_db, m_tapedConfig.mountCriteria.value().maxFiles,
m_tapedConfig.mountCriteria.value().maxBytes); m_tapedConfig.mountCriteria.value().maxBytes);
// Before launching the transfer session, we validate that the scheduler is reachable. // Before launching the transfer session, we validate that the scheduler is reachable.
lc.log(log::DEBUG, "In DriveHandler::runChild(): will ping scheduler."); lc.log(log::DEBUG, "In DriveHandler::runChild(): will ping scheduler.");
...@@ -943,11 +921,6 @@ int DriveHandler::runChild() { ...@@ -943,11 +921,6 @@ int DriveHandler::runChild() {
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN; return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
} }
lc.log(log::DEBUG, "In DriveHandler::runChild(): will start agent heartbeat.");
// The object store is accessible, let's turn the agent heartbeat on.
objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, lc.logger());
agentHeartbeat.startThread();
// 1) Special case first, if we crashed in a cleaner session, we put the drive down // 1) Special case first, if we crashed in a cleaner session, we put the drive down
if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) { if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) {
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
...@@ -1117,7 +1090,7 @@ int DriveHandler::runChild() { ...@@ -1117,7 +1090,7 @@ int DriveHandler::runChild() {
//Checking the drive does not already exist in the objectstore //Checking the drive does not already exist in the objectstore
try{ try{
osdb->checkDriveCanBeCreated(driveInfo); sched_db->checkDriveCanBeCreated(driveInfo);
} catch (SchedulerDatabase::DriveAlreadyExistsException &ex) { } catch (SchedulerDatabase::DriveAlreadyExistsException &ex) {
log::ScopedParamContainer param(lc); log::ScopedParamContainer param(lc);
param.add("tapeDrive",driveInfo.driveName) param.add("tapeDrive",driveInfo.driveName)
...@@ -1164,7 +1137,6 @@ int DriveHandler::runChild() { ...@@ -1164,7 +1137,6 @@ int DriveHandler::runChild() {
scheduler); scheduler);
auto ret = dataTransferSession.execute(); auto ret = dataTransferSession.execute();
agentHeartbeat.stopAndWaitThread();
return ret; return ret;
} }
} }
...@@ -1199,42 +1171,24 @@ SubprocessHandler::ProcessingStatus DriveHandler::shutdown() { ...@@ -1199,42 +1171,24 @@ SubprocessHandler::ProcessingStatus DriveHandler::shutdown() {
m_catalogue = createCatalogue("DriveHandler::shutdown()"); m_catalogue = createCatalogue("DriveHandler::shutdown()");
//Create the scheduler //Create the scheduler
std::unique_ptr<SchedulerDBInit_t> sched_db_init;
//Create the backend
std::unique_ptr<cta::objectstore::Backend> backend;
try {
backend.reset(cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), lc.logger()).release());
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param (lc);
param.add("errorMessage", ex.getMessageValue());
lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to connect to objectstore.");
goto exitShutdown;
}
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
// Create the agent entry in the object store. This could fail (even before ping, so
// handle failure like a ping failure).
std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
std::unique_ptr<cta::OStoreDBWithAgent> osdb;
try { try {
std::string processName="DriveHandlerShutdown-"; std::string processName="DriveHandlerShutdown-";
processName+=m_configLine.unitName; processName+=m_configLine.unitName;
log::ScopedParamContainer params(lc); log::ScopedParamContainer params(lc);
params.add("processName", processName); params.add("processName", processName);
lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create agent entry. Enabling leaving non-empty agent behind."); lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create agent entry. Enabling leaving non-empty agent behind.");
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName, lc)); sched_db_init.reset(new SchedulerDBInit_t(processName, m_tapedConfig.backendPath.value(), lc.logger(), true));
} catch(cta::exception::Exception &ex) { } catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param(lc); log::ScopedParamContainer param (lc);
param.add("errorMessage", ex.getMessageValue()); param.add("errorMessage", ex.getMessageValue());
lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to instantiate agent entry. Reporting fatal error."); lc.log(log::CRIT, "In DriveHandler::shutdown(): failed to connect to objectstore or failed to instantiate agent entry. Reporting fatal error.");
goto exitShutdown; goto exitShutdown;
} }
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *m_catalogue, lc.logger())); std::unique_ptr<SchedulerDB_t> sched_db = sched_db_init->getSchedDB(*m_catalogue, lc.logger());
lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create scheduler."); lc.log(log::DEBUG, "In DriveHandler::shutdown(): will create scheduler.");
std::unique_ptr<cta::Scheduler> scheduler(new Scheduler(*m_catalogue, *osdb, 0,0)); std::unique_ptr<cta::Scheduler> scheduler(new Scheduler(*m_catalogue, *sched_db, 0,0));
cta::mediachanger::MediaChangerFacade mediaChangerFacade(m_processManager.logContext().logger()); cta::mediachanger::MediaChangerFacade mediaChangerFacade(m_processManager.logContext().logger());
castor::tape::System::realWrapper sWrapper; castor::tape::System::realWrapper sWrapper;
......
...@@ -18,15 +18,10 @@ ...@@ -18,15 +18,10 @@
#include "MaintenanceHandler.hpp" #include "MaintenanceHandler.hpp"
#include "common/exception/Errnum.hpp" #include "common/exception/Errnum.hpp"
#include "objectstore/AgentHeartbeatThread.hpp"
#include "objectstore/BackendPopulator.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendVFS.hpp"
#include "objectstore/GarbageCollector.hpp"
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "catalogue/Catalogue.hpp" #include "catalogue/Catalogue.hpp"
#include "catalogue/CatalogueFactoryFactory.hpp" #include "catalogue/CatalogueFactoryFactory.hpp"
#include "scheduler/Scheduler.hpp" #include "scheduler/Scheduler.hpp"
#include "scheduler/OStoreDB/OStoreDBInit.hpp"
#include "rdbms/Login.hpp" #include "rdbms/Login.hpp"
#include "common/make_unique.hpp" #include "common/make_unique.hpp"
#include "scheduler/DiskReportRunner.hpp" #include "scheduler/DiskReportRunner.hpp"
...@@ -268,32 +263,23 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ ...@@ -268,32 +263,23 @@ void MaintenanceHandler::exceptionThrowingRunChild(){
// Before anything, we will check for access to the scheduler's central storage. // Before anything, we will check for access to the scheduler's central storage.
// If we fail to access it, we cannot work. We expect the drive processes to // If we fail to access it, we cannot work. We expect the drive processes to
// fail likewise, so we just wait for shutdown signal (no feedback to main // fail likewise, so we just wait for shutdown signal (no feedback to main process).
// process). SchedulerDBInit_t sched_db_init("Maintenance", m_tapedConfig.backendPath.value(), m_processManager.logContext().logger());
std::unique_ptr<cta::objectstore::Backend> backend(
cta::objectstore::BackendFactory::createBackend(m_tapedConfig.backendPath.value(), m_processManager.logContext().logger()).release()); std::unique_ptr<cta::SchedulerDB_t> sched_db;
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
// Create the agent entry in the object store. This could fail (even before ping, so
// handle failure like a ping failure).
std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
std::unique_ptr<cta::OStoreDBWithAgent> osdb;
std::unique_ptr<cta::catalogue::Catalogue> catalogue; std::unique_ptr<cta::catalogue::Catalogue> catalogue;
std::unique_ptr<cta::Scheduler> scheduler; std::unique_ptr<cta::Scheduler> scheduler;
try { try {
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "Maintenance", m_processManager.logContext()));
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value()); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value());
const uint64_t nbConns = 1; const uint64_t nbConns = 1;
const uint64_t nbArchiveFileListingConns = 1; const uint64_t nbArchiveFileListingConns = 1;
auto catalogueFactory = cta::catalogue::CatalogueFactoryFactory::create(m_processManager.logContext().logger(), auto catalogueFactory = cta::catalogue::CatalogueFactoryFactory::create(m_processManager.logContext().logger(),
catalogueLogin, nbConns, nbArchiveFileListingConns); catalogueLogin, nbConns, nbArchiveFileListingConns);
catalogue=catalogueFactory->create(); catalogue = catalogueFactory->create();
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger())); sched_db = sched_db_init.getSchedDB(*catalogue, m_processManager.logContext().logger());
scheduler=make_unique<cta::Scheduler>(*catalogue, *osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them // TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them
// Before launching the transfer session, we validate that the scheduler is reachable. scheduler = make_unique<cta::Scheduler>(*catalogue, *sched_db, 5, 2*1000*1000);
// Before launching the transfer session, we validate that the scheduler is reachable.
scheduler->ping(m_processManager.logContext()); scheduler->ping(m_processManager.logContext());
} catch(cta::exception::Exception &ex) { } catch(cta::exception::Exception &ex) {
{ {
...@@ -310,13 +296,9 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ ...@@ -310,13 +296,9 @@ void MaintenanceHandler::exceptionThrowingRunChild(){
"In MaintenanceHandler::exceptionThrowingRunChild(): Received shutdown message after failure to contact storage. Exiting."); "In MaintenanceHandler::exceptionThrowingRunChild(): Received shutdown message after failure to contact storage. Exiting.");
throw ex; throw ex;
} }
// The object store is accessible, let's turn the agent heartbeat on.
objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, m_processManager.logContext().logger());
agentHeartbeat.startThread();
// Create the garbage collector and the disk reporter // Create the garbage collector and the disk reporter
objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference(), *catalogue); auto gc = sched_db_init.getGarbageCollector(*catalogue);
DiskReportRunner diskReportRunner(*scheduler); DiskReportRunner diskReportRunner(*scheduler);
RepackRequestManager repackRequestManager(*scheduler); RepackRequestManager repackRequestManager(*scheduler);
...@@ -366,7 +348,6 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ ...@@ -366,7 +348,6 @@ void MaintenanceHandler::exceptionThrowingRunChild(){
"In MaintenanceHandler::exceptionThrowingRunChild(): received an unknown exception."); "In MaintenanceHandler::exceptionThrowingRunChild(): received an unknown exception.");
throw; throw;
} }
agentHeartbeat.stopAndWaitThread();
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
...@@ -1041,7 +1041,7 @@ void RequestMessage::processFailedRequest_Ls(cta::xrd::Response &response, XrdSs ...@@ -1041,7 +1041,7 @@ void RequestMessage::processFailedRequest_Ls(cta::xrd::Response &response, XrdSs
{ {
using namespace cta::admin; using namespace cta::admin;
stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_scheddb, m_lc); stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_service.getSchedDb(), m_lc);
// Display the correct column headers // Display the correct column headers
response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::FAILEDREQUEST_LS_SUMMARY response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::FAILEDREQUEST_LS_SUMMARY
......
...@@ -34,7 +34,7 @@ class RequestMessage ...@@ -34,7 +34,7 @@ class RequestMessage
{ {
public: public:
RequestMessage(const XrdSsiEntity &client, const XrdSsiCtaServiceProvider *service) : RequestMessage(const XrdSsiEntity &client, const XrdSsiCtaServiceProvider *service) :
m_scheddb(service->getSchedDb()), m_service(*service),
m_catalogue(service->getCatalogue()), m_catalogue(service->getCatalogue()),
m_scheduler(service->getScheduler()), m_scheduler(service->getScheduler()),
m_archiveFileMaxSize(service->getArchiveFileMaxSize()), m_archiveFileMaxSize(service->getArchiveFileMaxSize()),
...@@ -289,7 +289,7 @@ private: ...@@ -289,7 +289,7 @@ private:
Protocol m_protocol; //!< The protocol the client used to connect Protocol m_protocol; //!< The protocol the client used to connect
cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host cta::common::dataStructures::SecurityIdentity m_cliIdentity; //!< Client identity: username/host
cta::OStoreDBWithAgent &m_scheddb; //!< Reference to CTA ObjectStore const XrdSsiCtaServiceProvider &m_service; //!< Const reference to the XRootD SSI Service
cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue
cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler
uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "common/log/FileLogger.hpp" #include "common/log/FileLogger.hpp"
#include "common/log/LogLevel.hpp" #include "common/log/LogLevel.hpp"
#include "common/utils/utils.hpp" #include "common/utils/utils.hpp"
#include "objectstore/BackendVFS.hpp"
#include "rdbms/Login.hpp" #include "rdbms/Login.hpp"
#include "version.h" #include "version.h"
#include "XrdSsiCtaServiceProvider.hpp" #include "XrdSsiCtaServiceProvider.hpp"
...@@ -134,15 +133,16 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC ...@@ -134,15 +133,16 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC
} }
this->m_catalogue_conn_string = catalogueLogin.connectionString; this->m_catalogue_conn_string = catalogueLogin.connectionString;
// Initialise the Backend // Initialise the Scheduler DB
auto objectstore_backendpath = config.getOptionValueStr("cta.objectstore.backendpath"); const std::string DB_CONN_PARAM = "cta.objectstore.backendpath";
if(!objectstore_backendpath.first) { auto db_conn = config.getOptionValueStr(DB_CONN_PARAM);
throw exception::UserError("cta.objectstore.backendpath is not set in configuration file " + cfgFn); if(!db_conn.first) {
throw exception::UserError(DB_CONN_PARAM + " is not set in configuration file " + cfgFn);
} }
m_backend = std::move(cta::objectstore::BackendFactory::createBackend(objectstore_backendpath.second, *m_log)); m_scheddb_init = cta::make_unique<SchedulerDBInit_t>("Frontend", db_conn.second, *m_log);
m_backendPopulator = cta::make_unique<cta::objectstore::BackendPopulator>(*m_backend, "Frontend", cta::log::LogContext(*m_log)); m_scheddb = m_scheddb_init->getSchedDB(*m_catalogue, *m_log);
m_scheddb = cta::make_unique<cta::OStoreDBWithAgent>(*m_backend, m_backendPopulator->getAgentReference(), *m_catalogue, *m_log);
auto threadPoolSize = config.getOptionValueInt("cta.schedulerdb.numberofthreads"); auto threadPoolSize = config.getOptionValueInt("cta.schedulerdb.numberofthreads");
if (threadPoolSize.first) { if (threadPoolSize.first) {
m_scheddb->setThreadNumber(threadPoolSize.second); m_scheddb->setThreadNumber(threadPoolSize.second);
...@@ -151,12 +151,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC ...@@ -151,12 +151,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC
// Initialise the Scheduler // Initialise the Scheduler
m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000); m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000);
try {
// If the backend is a VFS, make sure we don't delete it on exit
dynamic_cast<objectstore::BackendVFS &>(*m_backend).noDeleteOnExit();
} catch (std::bad_cast &) {
// If not, never mind
}
// Initialise the Frontend // Initialise the Frontend
auto archiveFileMaxSize = config.getOptionValueInt("cta.archivefile.max_size_gb"); auto archiveFileMaxSize = config.getOptionValueInt("cta.archivefile.max_size_gb");
...@@ -177,11 +171,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC ...@@ -177,11 +171,6 @@ void XrdSsiCtaServiceProvider::ExceptionThrowingInit(XrdSsiLogger *logP, XrdSsiC
Log::Msg(XrdSsiPb::Log::WARNING, LOG_SUFFIX, "warning: 'cta.ns.config' not specified; namespace queries are disabled"); Log::Msg(XrdSsiPb::Log::WARNING, LOG_SUFFIX, "warning: 'cta.ns.config' not specified; namespace queries are disabled");
} }
// Start the heartbeat thread for the agent object. The thread is guaranteed to have started before we call the unique_ptr deleter
auto aht = new cta::objectstore::AgentHeartbeatThread(m_backendPopulator->getAgentReference(), *m_backend, *m_log);
aht->startThread();
m_agentHeartbeat = std::move(UniquePtrAgentHeartbeatThread(aht));
// All done // All done
log(log::INFO, std::string("cta-frontend started"), params); log(log::INFO, std::string("cta-frontend started"), params);
} }
......
/*! /*!
* @project The CERN Tape Archive (CTA) * @project The CERN Tape Archive (CTA)
* @brief XRootD Service Provider class * @brief XRootD Service Provider class
* @copyright Copyright 2017 CERN * @copyright Copyright © 2021 CERN
* @license This program is free software: you can redistribute it and/or modify * @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or * the Free Software Foundation, either version 3 of the License, or
...@@ -18,27 +18,20 @@ ...@@ -18,27 +18,20 @@
#pragma once #pragma once
#include <XrdSsi/XrdSsiProvider.hh>
#include <common/Configuration.hpp> #include <common/Configuration.hpp>
#include <common/utils/utils.hpp> #include <common/utils/utils.hpp>
#include <objectstore/BackendPopulator.hpp>
#include <objectstore/BackendFactory.hpp>
#include <scheduler/Scheduler.hpp>
#include <scheduler/OStoreDB/OStoreDBWithAgent.hpp>
#include <objectstore/AgentHeartbeatThread.hpp>
#include <xroot_plugins/Namespace.hpp> #include <xroot_plugins/Namespace.hpp>
#include <XrdSsiPbLog.hpp> #include <XrdSsiPbLog.hpp>
#include <scheduler/Scheduler.hpp>
#include <XrdSsi/XrdSsiProvider.hh> #include <scheduler/OStoreDB/OStoreDBInit.hpp>
/*! /*!
* Global pointer to the Service Provider object. * Global pointer to the Service Provider object.
*/ */
extern XrdSsiProvider *XrdSsiProviderServer; extern XrdSsiProvider *XrdSsiProviderServer;
/*! /*!
* Instantiates a Service to process client requests. * Instantiates a Service to process client requests.
*/ */
...@@ -86,9 +79,9 @@ public: ...@@ -86,9 +79,9 @@ public:
XrdSsiProvider::rStat QueryResource(const char *rName, const char *contact=0) override; XrdSsiProvider::rStat QueryResource(const</