Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • constellation/constellation
  • annika.vauth/constellation
  • finn.feindt/constellation
  • cbespin/constellation
  • stephan.lachnit/constellation
  • malinda.de.silva/constellation
6 results
Show changes
Commits on Source (60)
Showing
with 544 additions and 144 deletions
......@@ -67,6 +67,7 @@ variables:
NINJA_OPTIONS: "-j8"
KUBERNETES_CPU_REQUEST: "8"
KUBERNETES_MEMORY_REQUEST: "16Gi"
MESON_NUM_PROCESSES: "8"
tags:
- hi-load
......@@ -450,7 +451,7 @@ lint:clang-tidy-diff:
artifacts: true
script:
- git fetch origin $CI_MERGE_REQUEST_TARGET_BRANCH_NAME
- git diff -U0 origin/$CI_MERGE_REQUEST_TARGET_BRANCH_NAME -- | clang-tidy-diff -p 1 -path build -j $(nproc)
- git diff -U0 origin/$CI_MERGE_REQUEST_TARGET_BRANCH_NAME -- | clang-tidy-diff -p 1 -path build -j $MESON_NUM_PROCESSES
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
<<: *cpp-rules-changes
......
---
# SPDX-FileCopyrightText: 2025 DESY and the Constellation authors
# SPDX-License-Identifier: CC-BY-4.0
title: "Authors and Acknowledgments"
---
Constellation is developed and maintained by:
* Stephan Lachnit, Deutsches Elektronen-Synchrotron DESY, [stephan.lachnit](https://gitlab.desy.de/stephan.lachnit)
* Simon Spannagel, Deutsches Elektronen-Synchrotron DESY, [simonspa](https://gitlab.desy.de/simonspa)
* Hanno Perrey, Prevas AB, [hanno.perrey](https://gitlab.desy.de/hanno.perrey)
The following authors, in alphabetical order, have developed or contributed to Constellation:
* Lene Kristian Bryngemark, Lunds universitet, [ext-lene_kristian.bryngemark](https://gitlab.desy.de/ext-lene_kristian.bryngemark)
* Joel Ekström, Prevas AB, [joel.ekstroem](https://gitlab.desy.de/joel.ekstroem)
* Lennart Huth, Deutsches Elektronen-Synchrotron DESY, [lennart.huth](https://gitlab.desy.de/lennart.huth)
* Linus Ros, Prevas AB, [linus.ros](https://gitlab.desy.de/linus.ros)
* Annika Vauth, Universität Hamburg, [annika.vauth](https://gitlab.desy.de/annika.vauth)
* Håkan Wennlöf, Deutsches Elektronen-Synchrotron DESY, [hwennlof](https://gitlab.desy.de/hwennlof)
......@@ -31,7 +31,6 @@
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/log.hpp"
#include "constellation/core/message/CHP1Message.hpp"
......@@ -43,6 +42,7 @@
#include "constellation/core/protocol/CHP_definitions.hpp"
#include "constellation/core/protocol/CSCP_definitions.hpp"
#include "constellation/core/utils/enum.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
#include "constellation/core/utils/msgpack.hpp"
#include "constellation/core/utils/string.hpp"
......@@ -61,7 +61,7 @@ Controller::Controller(std::string controller_name)
void Controller::start() {
LOG(logger_, DEBUG) << "Registering controller callback";
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->registerDiscoverCallback(&Controller::callback, CHIRP::CONTROL, this);
chirp_manager->sendRequest(CHIRP::CONTROL);
......@@ -75,7 +75,7 @@ void Controller::stop() {
heartbeat_receiver_.stopPool();
// Unregister callback
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->unregisterDiscoverCallback(&Controller::callback, CHIRP::CONTROL);
}
......@@ -503,7 +503,7 @@ void Controller::controller_loop(const std::stop_token& stop_token) {
// Discard all CHIRP services for this host - this will remove the connection through the callback:
lock.unlock();
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->forgetDiscoveredServices(conn->second.host_id);
}
......
......@@ -98,14 +98,6 @@ bool DiscoverCallbackEntry::operator<(const DiscoverCallbackEntry& other) const
return std::to_underlying(service_id) < std::to_underlying(other.service_id);
}
Manager* Manager::getDefaultInstance() {
return Manager::default_manager_instance_;
}
void Manager::setAsDefaultInstance() {
Manager::default_manager_instance_ = this;
}
Manager::Manager(const std::optional<asio::ip::address_v4>& brd_address,
const asio::ip::address_v4& any_address,
std::string_view group_name,
......
......@@ -107,19 +107,6 @@ namespace constellation::chirp {
/** Manager for CHIRP broadcasting and receiving */
class Manager {
public:
/**
* Return the default CHIRP Manager (requires to be set via `setAsDefaultInstance`)
*
* @return Pointer to default CHIRP Manager (might be a nullptr)
*/
CNSTLN_API static Manager* getDefaultInstance();
/**
* Set this CHIRP manager as the default instance
*/
CNSTLN_API void setAsDefaultInstance();
public:
/**
* @param brd_address Broadcast address for outgoing broadcast messages
......@@ -338,9 +325,6 @@ namespace constellation::chirp {
std::mutex discover_callbacks_mutex_;
std::jthread main_loop_thread_;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
inline static Manager* default_manager_instance_;
};
} // namespace constellation::chirp
......@@ -19,15 +19,14 @@
#include <zmq.hpp>
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/message/CHP1Message.hpp"
#include "constellation/core/networking/exceptions.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/protocol/CHIRP_definitions.hpp"
#include "constellation/core/protocol/CSCP_definitions.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
#include "constellation/core/utils/thread.hpp"
using namespace constellation;
using namespace constellation::heartbeat;
using namespace constellation::message;
using namespace constellation::networking;
......@@ -41,7 +40,7 @@ HeartbeatSend::HeartbeatSend(std::string sender,
sender_(std::move(sender)), state_callback_(std::move(state_callback)), interval_(interval) {
// Announce service via CHIRP
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->registerService(CHIRP::HEARTBEAT, port_);
}
......@@ -53,7 +52,7 @@ HeartbeatSend::HeartbeatSend(std::string sender,
HeartbeatSend::~HeartbeatSend() {
// Send CHIRP depart message
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->unregisterService(CHIRP::HEARTBEAT, port_);
}
......
......@@ -30,22 +30,24 @@
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/log.hpp"
#include "constellation/core/log/Logger.hpp"
#include "constellation/core/log/SinkManager.hpp"
#include "constellation/core/message/CMDP1Message.hpp"
#include "constellation/core/metrics/Metric.hpp"
#include "constellation/core/metrics/MetricsManager.hpp"
#include "constellation/core/networking/exceptions.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/protocol/CHIRP_definitions.hpp"
#include "constellation/core/utils/enum.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
#include "constellation/core/utils/string.hpp"
#include "constellation/core/utils/string_hash_map.hpp"
#include "constellation/core/utils/thread.hpp"
#include "constellation/core/utils/windows.hpp"
using namespace constellation;
using namespace constellation::config;
using namespace constellation::log;
using namespace constellation::message;
using namespace constellation::metrics;
......@@ -75,8 +77,9 @@ namespace {
}
} // namespace
CMDPSink::CMDPSink(std::shared_ptr<zmq::context_t> context)
: context_(std::move(context)), pub_socket_(*context_, zmq::socket_type::xpub), port_(bind_ephemeral_port(pub_socket_)) {
CMDPSink::CMDPSink()
: global_context_(global_zmq_context()), pub_socket_(*global_context_, zmq::socket_type::xpub),
port_(bind_ephemeral_port(pub_socket_)) {
// Set reception timeout for subscription messages on XPUB socket to zero because we need to mutex-lock the socket
// while reading and cannot log at the same time.
try {
......@@ -86,13 +89,6 @@ CMDPSink::CMDPSink(std::shared_ptr<zmq::context_t> context)
}
}
CMDPSink::~CMDPSink() {
subscription_thread_.request_stop();
if(subscription_thread_.joinable()) {
subscription_thread_.join();
}
}
void CMDPSink::subscription_loop(const std::stop_token& stop_token) {
while(!stop_token.stop_requested()) {
......@@ -125,54 +121,96 @@ void CMDPSink::subscription_loop(const std::stop_token& stop_token) {
body.remove_prefix(1);
LOG(*logger_, TRACE) << "Received " << (subscribe ? "" : "un") << "subscribe message for " << body;
// TODO(simonspa) At some point we also have to treat STAT here
if(!body.starts_with("LOG/")) {
continue;
// Handle subscriptions as well as notification subscriptions:
if(body.starts_with("LOG/")) {
handle_log_subscriptions(subscribe, body);
} else if(body.starts_with("LOG?") && subscribe) {
ManagerLocator::getSinkManager().sendLogNotification();
} else if(body.starts_with("STAT/")) {
handle_stat_subscriptions(subscribe, body);
} else if(body.starts_with("STAT?") && subscribe) {
ManagerLocator::getSinkManager().sendMetricNotification();
} else {
LOG(*logger_, WARNING) << "Received " << (subscribe ? "" : "un") << "subscribe message with invalid topic "
<< body << ", ignoring";
}
}
}
const auto level_endpos = body.find_first_of('/', 4);
const auto level_str = body.substr(4, level_endpos - 4);
void CMDPSink::handle_log_subscriptions(bool subscribe, std::string_view body) {
// Empty level means subscription to everything
const auto level = (level_str.empty() ? std::optional<Level>(TRACE) : enum_cast<Level>(level_str));
const auto level_endpos = body.find_first_of('/', 4);
const auto level_str = body.substr(4, level_endpos - 4);
// Only accept valid levels
if(!level.has_value()) {
LOG(*logger_, TRACE) << "Invalid log level " << std::quoted(level_str) << ", ignoring";
continue;
}
// Empty level means subscription to everything
const auto level = (level_str.empty() ? std::optional<Level>(TRACE) : enum_cast<Level>(level_str));
const auto topic = (level_endpos != std::string_view::npos ? body.substr(level_endpos + 1) : std::string_view());
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
// Only accept valid levels
if(!level.has_value()) {
LOG(*logger_, TRACE) << "Invalid log level " << std::quoted(level_str) << ", ignoring";
return;
}
if(subscribe) {
log_subscriptions_[topic_uc][level.value()] += 1;
} else {
if(log_subscriptions_[topic_uc][level.value()] > 0) {
log_subscriptions_[topic_uc][level.value()] -= 1;
}
const auto topic = (level_endpos != std::string_view::npos ? body.substr(level_endpos + 1) : std::string_view());
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
if(subscribe) {
log_subscriptions_[topic_uc][level.value()] += 1;
} else {
if(log_subscriptions_[topic_uc][level.value()] > 0) {
log_subscriptions_[topic_uc][level.value()] -= 1;
}
}
// Figure out lowest level for each topic
auto cmdp_global_level = Level::OFF;
std::map<std::string_view, Level> cmdp_sub_topic_levels;
for(const auto& [logger, levels] : log_subscriptions_) {
auto it = std::ranges::find_if(levels, [](const auto& i) { return i.second > 0; });
if(it != levels.end()) {
if(!logger.empty()) {
cmdp_sub_topic_levels[logger] = it->first;
} else {
cmdp_global_level = it->first;
}
// Figure out lowest level for each topic
auto cmdp_global_level = Level::OFF;
std::map<std::string_view, Level> cmdp_sub_topic_levels;
for(const auto& [logger, levels] : log_subscriptions_) {
auto it = std::ranges::find_if(levels, [](const auto& i) { return i.second > 0; });
if(it != levels.end()) {
if(!logger.empty()) {
cmdp_sub_topic_levels[logger] = it->first;
} else {
cmdp_global_level = it->first;
}
}
}
LOG(*logger_, TRACE) << "Lowest global log level: " << std::quoted(enum_name(cmdp_global_level));
// Update subscriptions
ManagerLocator::getSinkManager().updateCMDPLevels(cmdp_global_level, std::move(cmdp_sub_topic_levels));
}
void CMDPSink::handle_stat_subscriptions(bool subscribe, std::string_view body) {
const auto topic = body.substr(5);
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
if(subscribe) {
stat_subscriptions_[topic_uc] += 1;
} else {
if(stat_subscriptions_[topic_uc] > 0) {
stat_subscriptions_[topic_uc] -= 1;
}
}
LOG(*logger_, TRACE) << "Lowest global log level: " << std::quoted(enum_name(cmdp_global_level));
// Global subscription to all topics
const auto global_subscription = (stat_subscriptions_.contains("") && stat_subscriptions_.at("") > 0);
// Update subscriptions
SinkManager::getInstance().updateCMDPLevels(cmdp_global_level, std::move(cmdp_sub_topic_levels));
// List of subscribed topics:
string_hash_set subscription_topics;
subscription_topics.reserve(stat_subscriptions_.size());
for(const auto& [topic, counter] : stat_subscriptions_) {
if(counter > 0) {
subscription_topics.insert(topic);
}
}
// Update subscriptions
ManagerLocator::getMetricsManager().updateSubscriptions(global_subscription, std::move(subscription_topics));
}
void CMDPSink::enableSending(std::string sender_name) {
......@@ -186,7 +224,7 @@ void CMDPSink::enableSending(std::string sender_name) {
set_thread_name(subscription_thread_, "CMDPSink");
// Register service in CHIRP
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->registerService(CHIRP::MONITORING, port_);
} else {
......@@ -195,6 +233,32 @@ void CMDPSink::enableSending(std::string sender_name) {
LOG(*logger_, INFO) << "Starting to log on port " << port_;
}
void CMDPSink::disableSending() {
// Nothing to disable if sending was never enabled
if(logger_ == nullptr) {
return;
}
LOG(*logger_, DEBUG) << "Disabling logging via CMDP";
subscription_thread_.request_stop();
if(subscription_thread_.joinable()) {
subscription_thread_.join();
}
auto* chirp_manager = ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
chirp_manager->unregisterService(CHIRP::MONITORING, port_);
}
// Reset log levels
log_subscriptions_.clear();
ManagerLocator::getSinkManager().updateCMDPLevels(OFF);
// Delete CDMP logger to avoid circular dependency on destruction of CMDPSink
logger_.reset();
}
void CMDPSink::sink_it_(const spdlog::details::log_msg& msg) {
// Create message header
auto msghead = CMDP1Message::Header(sender_name_, msg.time);
......@@ -236,3 +300,18 @@ void CMDPSink::sinkMetric(MetricValue metric_value) {
throw NetworkError(e.what());
}
}
void CMDPSink::sinkNotification(const std::string& id, Dictionary topics) {
// Create message header
auto msghead = CMDP1Message::Header(sender_name_, std::chrono::system_clock::now());
// Lock the mutex - automatically done for regular logging:
const std::lock_guard<std::mutex> lock {mutex_};
try {
// Create and send CMDP message
CMDP1Notification(std::move(msghead), id, std::move(topics)).assemble().send(pub_socket_);
} catch(const zmq::error_t& e) {
throw NetworkError(e.what());
}
}
......@@ -15,16 +15,19 @@
#include <mutex>
#include <stop_token>
#include <string>
#include <string_view>
#include <thread>
#include <spdlog/async_logger.h>
#include <spdlog/sinks/base_sink.h>
#include <zmq.hpp>
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/Logger.hpp"
#include "constellation/core/metrics/Metric.hpp"
#include "constellation/core/networking/Port.hpp"
#include "constellation/core/utils/string_hash_map.hpp"
namespace constellation::log {
/**
......@@ -32,18 +35,17 @@ namespace constellation::log {
*
* Note that ZeroMQ sockets are not thread-safe, meaning that the sink requires a mutex.
*/
class CMDPSink : public spdlog::sinks::base_sink<std::mutex> {
class CMDPSink final : public spdlog::sinks::base_sink<std::mutex> {
public:
/**
* @brief Construct a new CMDPSink
* @param context ZMQ context to be used
*/
CMDPSink(std::shared_ptr<zmq::context_t> context);
CMDPSink();
/**
* @brief Deconstruct the CMDPSink
*/
~CMDPSink() override;
~CMDPSink() = default;
// No copy/move constructor/assignment
/// @cond doxygen_suppress
......@@ -67,6 +69,11 @@ namespace constellation::log {
*/
void enableSending(std::string sender_name);
/**
* @brief Disable sending by stopping the subscription thread
*/
void disableSending();
/**
* Sink metric
*
......@@ -74,6 +81,14 @@ namespace constellation::log {
*/
void sinkMetric(metrics::MetricValue metric_value);
/**
* Sink notification
*
* @param id Notification type
* @param topics Topics for the given notification type
*/
void sinkNotification(const std::string& id, config::Dictionary topics);
protected:
void sink_it_(const spdlog::details::log_msg& msg) final;
void flush_() final {}
......@@ -81,18 +96,24 @@ namespace constellation::log {
private:
void subscription_loop(const std::stop_token& stop_token);
void handle_log_subscriptions(bool subscribe, std::string_view body);
void handle_stat_subscriptions(bool subscribe, std::string_view body);
private:
std::unique_ptr<Logger> logger_;
// Needs to store shared pointer since CMDPSink is owned by static SinkManager
std::shared_ptr<zmq::context_t> context_;
// CMDPSink is a shared instance and is destroyed late -> requires shared ownership of the global context
// otherwise the context will be destroyed before the socket is closed and wait for all sockets to be closed
std::shared_ptr<zmq::context_t> global_context_;
zmq::socket_t pub_socket_;
networking::Port port_;
std::string sender_name_;
std::jthread subscription_thread_;
std::map<std::string, std::map<Level, std::size_t>> log_subscriptions_;
utils::string_hash_map<std::map<Level, std::size_t>> log_subscriptions_;
utils::string_hash_map<std::size_t> stat_subscriptions_;
};
} // namespace constellation::log
......@@ -13,15 +13,16 @@
#include <string_view>
#include <thread>
#include "constellation/core/log/SinkManager.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
using namespace constellation::log;
using namespace constellation::utils;
using namespace std::chrono_literals;
Logger::Logger(std::string_view topic) : spdlog_logger_(SinkManager::getInstance().getLogger(topic)) {}
Logger::Logger(std::string_view topic) : spdlog_logger_(ManagerLocator::getSinkManager().getLogger(topic)) {}
Logger& Logger::getDefault() {
static Logger instance {SinkManager::getInstance().getDefaultLogger()};
static Logger instance {ManagerLocator::getSinkManager().getDefaultLogger()};
return instance;
}
......
......@@ -29,14 +29,14 @@
#include <wincon.h>
#endif
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/CMDPSink.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/ProxySink.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
#include "constellation/core/utils/string.hpp"
using namespace constellation::log;
using namespace constellation::networking;
using namespace constellation::utils;
SinkManager::ConstellationLevelFormatter::ConstellationLevelFormatter(bool format_short) : format_short_(format_short) {}
......@@ -72,12 +72,10 @@ std::unique_ptr<spdlog::custom_flag_formatter> SinkManager::ConstellationTopicFo
return std::make_unique<ConstellationTopicFormatter>();
}
SinkManager& SinkManager::getInstance() {
static SinkManager instance {};
return instance;
}
SinkManager::SinkManager() : console_global_level_(TRACE), cmdp_global_level_(OFF) {
// Disable global spdlog registration of loggers
spdlog::set_automatic_registration(false);
SinkManager::SinkManager() : zmq_context_(global_zmq_context()), console_global_level_(TRACE), cmdp_global_level_(OFF) {
// Init thread pool with 1k queue size on 1 thread
spdlog::init_thread_pool(1000, 1);
......@@ -111,17 +109,33 @@ SinkManager::SinkManager() : zmq_context_(global_zmq_context()), console_global_
#endif
// CMDP sink, log level always TRACE since only accessed via ProxySink
cmdp_sink_ = std::make_shared<CMDPSink>(zmq_context_);
cmdp_sink_ = std::make_shared<CMDPSink>();
cmdp_sink_->set_level(to_spdlog_level(TRACE));
// Create default logger without topic
default_logger_ = create_logger("DEFAULT");
}
SinkManager::~SinkManager() {
// Remove all loggers
std::unique_lock loggers_lock {loggers_mutex_};
loggers_.clear();
loggers_lock.unlock();
// Reset all sinks
console_sink_.reset();
cmdp_sink_.reset();
// Run spdlog cleanup
spdlog::shutdown();
}
void SinkManager::enableCMDPSending(std::string sender_name) {
cmdp_sink_->enableSending(std::move(sender_name));
}
void SinkManager::disableCMDPSending() {
cmdp_sink_->disableSending();
}
std::shared_ptr<spdlog::async_logger> SinkManager::getLogger(std::string_view topic) {
// Acquire lock for loggers_
std::unique_lock loggers_lock {loggers_mutex_};
......@@ -233,3 +247,25 @@ void SinkManager::updateCMDPLevels(Level cmdp_global_level, std::map<std::string
calculate_log_level(logger);
}
}
void SinkManager::sendMetricNotification() {
const auto descriptions = ManagerLocator::getMetricsManager().getMetricsDescriptions();
config::Dictionary payload;
for(const auto& [key, value] : descriptions) {
payload.emplace(key, value);
}
cmdp_sink_->sinkNotification("STAT?", std::move(payload));
}
void SinkManager::sendLogNotification() {
config::Dictionary payload;
std::unique_lock loggers_lock {loggers_mutex_};
for(const auto& logger : loggers_) {
// TODO(simonspa): Loggers don't have a description yet - leaving empty
payload.emplace(logger->name(), "");
}
loggers_lock.unlock();
cmdp_sink_->sinkNotification("LOG?", std::move(payload));
}
......@@ -23,7 +23,6 @@
#include <spdlog/details/log_msg.h>
#include <spdlog/pattern_formatter.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <zmq.hpp>
#include "constellation/build.hpp"
#include "constellation/core/log/CMDPSink.hpp"
......@@ -31,6 +30,11 @@
#include "constellation/core/metrics/Metric.hpp"
#include "constellation/core/networking/Port.hpp"
// Forward declaration
namespace constellation::utils {
class ManagerLocator;
} // namespace constellation::utils
namespace constellation::log {
/**
* @brief Global sink manager
......@@ -58,10 +62,6 @@ namespace constellation::log {
};
public:
CNSTLN_API static SinkManager& getInstance();
~SinkManager() = default;
// No copy/move constructor/assignment
/// @cond doxygen_suppress
SinkManager(const SinkManager& other) = delete;
......@@ -70,6 +70,8 @@ namespace constellation::log {
SinkManager& operator=(SinkManager&& other) = delete;
/// @endcond
CNSTLN_API ~SinkManager();
/**
* @brief Get the ephemeral port to which the CMDP sink is bound to
*
......@@ -84,6 +86,11 @@ namespace constellation::log {
*/
CNSTLN_API void enableCMDPSending(std::string sender_name);
/**
* @brief Disable sending via CMDP
*/
CNSTLN_API void disableCMDPSending();
/**
* Send metric via the CMDP sink
*
......@@ -91,6 +98,16 @@ namespace constellation::log {
*/
void sendCMDPMetric(metrics::MetricValue metric_value) { cmdp_sink_->sinkMetric(std::move(metric_value)); }
/**
* Send CMDP Metric topic notification message via the CMDP sink
*/
void sendMetricNotification();
/**
* Send CMDP Log topic notification message via the CMDP sink
*/
void sendLogNotification();
/**
* @brief Get an asynchronous spdlog logger with a given topic
*
......@@ -124,7 +141,10 @@ namespace constellation::log {
std::map<std::string_view, Level> cmdp_sub_topic_levels = {});
private:
SinkManager();
/// @cond doxygen_suppress
friend utils::ManagerLocator;
CNSTLN_API SinkManager();
/// @endcond
/**
* @brief Create a new asynchronous spdlog logger
......@@ -142,8 +162,6 @@ namespace constellation::log {
void calculate_log_level(std::shared_ptr<spdlog::async_logger>& logger);
private:
std::shared_ptr<zmq::context_t> zmq_context_;
std::shared_ptr<spdlog::sinks::stdout_color_sink_mt> console_sink_;
std::shared_ptr<CMDPSink> cmdp_sink_;
......
......@@ -153,6 +153,7 @@ install_headers(
'utils/casts.hpp',
'utils/enum.hpp',
'utils/exceptions.hpp',
'utils/ManagerLocator.hpp',
'utils/std_future.hpp',
'utils/string.hpp',
'utils/string_hash_map.hpp',
......
......@@ -18,6 +18,7 @@
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/message/exceptions.hpp"
#include "constellation/core/message/PayloadBuffer.hpp"
......@@ -46,6 +47,10 @@ bool CMDP1Message::isStatMessage() const {
return topic_.starts_with("STAT/");
}
bool CMDP1Message::isNotification() const {
return topic_.starts_with("STAT?") || topic_.starts_with("LOG?");
}
zmq::multipart_t CMDP1Message::assemble() {
zmq::multipart_t frames {};
......@@ -70,8 +75,9 @@ CMDP1Message CMDP1Message::disassemble(zmq::multipart_t& frames) {
// Decode topic
const auto topic = frames.pop().to_string();
if(!(topic.starts_with("LOG/") || topic.starts_with("STAT/"))) {
throw MessageDecodingError("Invalid message topic, neither log nor telemetry message");
if(!(topic.starts_with("LOG/") || topic.starts_with("STAT/") || topic.starts_with("LOG?") ||
topic.starts_with("STAT?"))) {
throw MessageDecodingError("Invalid message topic \"" + topic + "\", neither log nor telemetry message");
}
// Check if valid log level by trying to decode it
......@@ -90,9 +96,21 @@ CMDP1Message CMDP1Message::disassemble(zmq::multipart_t& frames) {
return {topic, header, std::move(payload)};
}
std::string CMDP1Message::getTopic() const {
if(topic_.starts_with("LOG/")) {
// Search for second slash after "LOG/" to get substring with log topic
const auto level_endpos = topic_.find_first_of('/', 4);
return topic_.substr(level_endpos + 1);
} else if(topic_.starts_with("STAT/")) {
return topic_.substr(5);
}
throw IncorrectMessageType("Neither log nor stat message");
}
Level CMDP1Message::get_log_level_from_topic(std::string_view topic) {
if(!topic.starts_with("LOG/")) {
throw MessageDecodingError("Not a log message");
throw IncorrectMessageType("Not a log message");
}
// Search for second slash after "LOG/" to get substring with log level
......@@ -117,7 +135,7 @@ CMDP1LogMessage::CMDP1LogMessage(CMDP1Message&& message) : CMDP1Message(std::mov
throw IncorrectMessageType("Not a log message");
}
const auto topic = getTopic();
const auto topic = getMessageTopic();
level_ = get_log_level_from_topic(topic);
// Search for a '/' after "LOG/"
......@@ -144,11 +162,11 @@ CMDP1StatMessage::CMDP1StatMessage(Header header, metrics::MetricValue metric_va
CMDP1StatMessage::CMDP1StatMessage(CMDP1Message&& message) : CMDP1Message(std::move(message)) {
if(!isStatMessage()) {
throw MessageDecodingError("Not a telemetry message");
throw IncorrectMessageType("Not a telemetry message");
}
// Assign topic after prefix "STAT/"
const auto topic = std::string(getTopic().substr(5));
const auto topic = std::string(getMessageTopic().substr(5));
try {
metric_value_ = MetricValue::disassemble(topic, get_payload());
......@@ -161,3 +179,23 @@ CMDP1StatMessage CMDP1StatMessage::disassemble(zmq::multipart_t& frames) {
// Use disassemble from base class and cast via constructor
return {CMDP1Message::disassemble(frames)};
}
CMDP1Notification::CMDP1Notification(Header header, const std::string& id, Dictionary topics)
: CMDP1Message(id, std::move(header), topics.assemble()), topics_(std::move(topics)) {}
CMDP1Notification::CMDP1Notification(CMDP1Message&& message) : CMDP1Message(std::move(message)) {
if(!isNotification()) {
throw IncorrectMessageType("Not a CMDP notification");
}
try {
topics_ = Dictionary::disassemble(get_payload());
} catch(const std::invalid_argument& e) {
throw MessageDecodingError(e.what());
}
}
CMDP1Notification CMDP1Notification::disassemble(zmq::multipart_t& frames) {
// Use disassemble from base class and cast via constructor
return {CMDP1Message::disassemble(frames)};
}
......@@ -19,6 +19,7 @@
#include <zmq_addon.hpp>
#include "constellation/build.hpp"
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/message/BaseHeader.hpp"
#include "constellation/core/message/PayloadBuffer.hpp"
......@@ -58,7 +59,13 @@ namespace constellation::message {
/**
* @return CMDP message topic
*/
std::string_view getTopic() const { return topic_; };
std::string_view getMessageTopic() const { return topic_; };
/**
* @brief Get topic without CMDP identifier (LOG or STAT)
* @return topic
*/
CNSTLN_API std::string getTopic() const;
/**
* @return If the message is a log message
......@@ -70,6 +77,11 @@ namespace constellation::message {
*/
CNSTLN_API bool isStatMessage() const;
/**
* @return if the message is a notification
*/
CNSTLN_API bool isNotification() const;
/**
* Assemble full message to frames for ZeroMQ
*
......@@ -207,4 +219,43 @@ namespace constellation::message {
metrics::MetricValue metric_value_;
};
class CMDP1Notification : public CMDP1Message {
public:
/**
* Construct a new CMDP1 message for metrics
*
* @note The message topic will be taken as the metric name
*
* @param header CMDP1 header of the message
* @param id Identifier of notification
* @param topics Dictionary with available topics for the given identifier
*/
CNSTLN_API CMDP1Notification(Header header, const std::string& id, config::Dictionary topics);
/**
* Construct a CMDP1Notification from a decoded CMDP1Message
*
* @throw IncorrectMessageType If the message is not a (valid) notification
*/
CNSTLN_API CMDP1Notification(CMDP1Message&& message);
/**
* @return List of topics
*/
CNSTLN_API const config::Dictionary& getTopics() const { return topics_; }
/**
* Disassemble stats message from ZeroMQ frames
*
* This function moves the payload.
*
* @return New CMDP1Notification assembled from ZeroMQ frames
* @throw MessageDecodingError If the message is not a valid CMDP1 message
* @throw IncorrectMessageType If the message is a valid CMDP1 message but not a (valid) stat message
*/
CNSTLN_API static CMDP1Notification disassemble(zmq::multipart_t& frames);
private:
config::Dictionary topics_;
};
} // namespace constellation::message
......@@ -23,20 +23,16 @@
#include "constellation/core/config/Value.hpp"
#include "constellation/core/log/log.hpp"
#include "constellation/core/log/SinkManager.hpp"
#include "constellation/core/metrics/Metric.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
using namespace constellation::config;
using namespace constellation::log;
using namespace constellation::message;
using namespace constellation::metrics;
using namespace constellation::utils;
using namespace std::chrono_literals;
MetricsManager& MetricsManager::getInstance() {
static MetricsManager instance {};
return instance;
}
MetricsManager::MetricsManager() : logger_("STAT"), thread_(std::bind_front(&MetricsManager::run, this)) {};
MetricsManager::~MetricsManager() noexcept {
......@@ -48,11 +44,24 @@ MetricsManager::~MetricsManager() noexcept {
}
}
void MetricsManager::registerMetric(std::shared_ptr<Metric> metric) {
bool MetricsManager::shouldStat(std::string_view name) const {
return global_subscription_ || subscribed_topics_.contains(name);
}
void MetricsManager::updateSubscriptions(bool global, string_hash_set topic_subscriptions) {
// Acquire lock for metric variables and update them
const std::lock_guard levels_lock {metrics_mutex_};
global_subscription_ = global;
subscribed_topics_ = std::move(topic_subscriptions);
}
void MetricsManager::registerMetric(std::shared_ptr<Metric> metric, std::string description) {
const auto name = std::string(metric->name());
std::unique_lock metrics_lock {metrics_mutex_};
const auto [it, inserted] = metrics_.insert_or_assign(name, std::move(metric));
metrics_descriptions_.insert_or_assign(name, std::move(description));
ManagerLocator::getSinkManager().sendMetricNotification();
metrics_lock.unlock();
if(!inserted) {
......@@ -66,12 +75,14 @@ void MetricsManager::registerMetric(std::shared_ptr<Metric> metric) {
LOG(logger_, DEBUG) << "Successfully registered metric " << std::quoted(name);
}
void MetricsManager::registerTimedMetric(std::shared_ptr<TimedMetric> metric) {
void MetricsManager::registerTimedMetric(std::shared_ptr<TimedMetric> metric, std::string description) {
const auto name = std::string(metric->name());
// Add to metrics map
std::unique_lock metrics_lock {metrics_mutex_};
const auto [it, inserted] = metrics_.insert_or_assign(name, metric);
metrics_descriptions_.insert_or_assign(name, std::move(description));
ManagerLocator::getSinkManager().sendMetricNotification();
metrics_lock.unlock();
if(!inserted) {
......@@ -95,6 +106,11 @@ void MetricsManager::unregisterMetric(std::string_view name) {
if(it != metrics_.end()) {
metrics_.erase(it);
}
auto itd = metrics_descriptions_.find(name);
if(itd != metrics_descriptions_.end()) {
metrics_descriptions_.erase(itd);
}
ManagerLocator::getSinkManager().sendMetricNotification();
metrics_lock.unlock();
std::unique_lock timed_metrics_lock {timed_metrics_mutex_};
......@@ -108,6 +124,8 @@ void MetricsManager::unregisterMetric(std::string_view name) {
void MetricsManager::unregisterMetrics() {
std::unique_lock metrics_lock {metrics_mutex_};
metrics_.clear();
metrics_descriptions_.clear();
ManagerLocator::getSinkManager().sendMetricNotification();
metrics_lock.unlock();
std::unique_lock timed_metrics_lock {timed_metrics_mutex_};
......@@ -146,7 +164,7 @@ void MetricsManager::run(const std::stop_token& stop_token) {
if(metric_it != metrics_.end()) {
LOG(logger_, TRACE) << "Sending metric " << std::quoted(name) << ": " << value.str() << " ["
<< metric_it->second->unit() << "]";
SinkManager::getInstance().sendCMDPMetric({metric_it->second, std::move(value)});
ManagerLocator::getSinkManager().sendCMDPMetric({metric_it->second, std::move(value)});
} else {
LOG(logger_, WARNING) << "Metric " << std::quoted(name) << " is not registered";
}
......@@ -160,13 +178,13 @@ void MetricsManager::run(const std::stop_token& stop_token) {
// Check timed metrics
const std::lock_guard timed_metrics_lock {timed_metrics_mutex_};
for(auto& [name, timed_metric] : timed_metrics_) {
// If last time sent larger than interval and allowed -> send metric
if(timed_metric.timeoutReached()) {
// If last time sent larger than interval and allowed and there is a subscription -> send metric
if(timed_metric.timeoutReached() && shouldStat(name)) {
auto value = timed_metric->currentValue();
if(value.has_value()) {
LOG(logger_, TRACE) << "Sending metric " << std::quoted(timed_metric->name()) << ": "
<< value.value().str() << " [" << timed_metric->unit() << "]";
SinkManager::getInstance().sendCMDPMetric({timed_metric.getMetric(), std::move(value.value())});
ManagerLocator::getSinkManager().sendCMDPMetric({timed_metric.getMetric(), std::move(value.value())});
timed_metric.resetTimer();
} else {
LOG(logger_, TRACE) << "Not sending metric " << std::quoted(timed_metric->name()) << ": no value";
......
......@@ -28,16 +28,16 @@
#include "constellation/core/utils/string_hash_map.hpp"
#include "constellation/core/utils/timers.hpp"
// Forward declaration
namespace constellation::utils {
class ManagerLocator;
} // namespace constellation::utils
namespace constellation::metrics {
/** Manager for Metrics handling & transmission */
class MetricsManager {
public:
/**
* @brief Return instance of metrics manager
*/
CNSTLN_API static MetricsManager& getInstance();
// No copy/move constructor/assignment
/// @cond doxygen_suppress
MetricsManager(MetricsManager& other) = delete;
......@@ -52,8 +52,9 @@ namespace constellation::metrics {
* Register a (manually triggered) metric
*
* @param metric Shared pointer to the metric
* @param description Description of the metric
*/
CNSTLN_API void registerMetric(std::shared_ptr<Metric> metric);
CNSTLN_API void registerMetric(std::shared_ptr<Metric> metric, std::string description);
/**
* Register a (manually triggered) metric
......@@ -61,15 +62,17 @@ namespace constellation::metrics {
* @param name Unique topic of the metric
* @param unit Unit of the provided value
* @param type Type of the metric
* @param description Description of the metric
*/
void registerMetric(std::string name, std::string unit, metrics::MetricType type);
void registerMetric(std::string name, std::string unit, metrics::MetricType type, std::string description);
/**
* Register a timed metric
*
* @param metric Shared pointer to the timed metric
* @param description Description of the metric
*/
CNSTLN_API void registerTimedMetric(std::shared_ptr<TimedMetric> metric);
CNSTLN_API void registerTimedMetric(std::shared_ptr<TimedMetric> metric, std::string description);
/**
* Register a timed metric
......@@ -77,6 +80,7 @@ namespace constellation::metrics {
* @param name Name of the metric
* @param unit Unit of the metric as human readable string
* @param type Type of the metric
* @param description Description of the metric
* @param interval Interval in which to send the metric
* @param value_callback Callback to determine the current value of the metric
*/
......@@ -85,6 +89,7 @@ namespace constellation::metrics {
void registerTimedMetric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback);
......@@ -105,7 +110,7 @@ namespace constellation::metrics {
/**
* Check if a metric should be send given the subscription status
*/
static constexpr bool shouldStat(std::string_view /*name*/) { return true; }
CNSTLN_API bool shouldStat(std::string_view name) const;
/**
* Manually trigger a metric
......@@ -115,8 +120,26 @@ namespace constellation::metrics {
*/
CNSTLN_API void triggerMetric(std::string name, config::Value value);
/**
* @brief Update topic subscriptions
*
* @param global Global Flag for global subscription to all topics
* @param topic_subscriptions List of individual subscription topics
*/
void updateSubscriptions(bool global, utils::string_hash_set topic_subscriptions = {});
/**
* @brief Obtain map of registered metrics along with their descriptions
*
* @return Map with metric descriptions
*/
utils::string_hash_map<std::string> getMetricsDescriptions() const { return metrics_descriptions_; }
private:
MetricsManager();
/// @cond doxygen_suppress
friend utils::ManagerLocator;
CNSTLN_API MetricsManager();
/// @endcond
/**
* Main loop listening and responding to incoming CHIRP broadcasts
......@@ -148,8 +171,13 @@ namespace constellation::metrics {
private:
log::Logger logger_;
// List of topics with active subscribers:
utils::string_hash_set subscribed_topics_;
bool global_subscription_;
// Contains all metrics, including timed ones
utils::string_hash_map<std::shared_ptr<Metric>> metrics_;
utils::string_hash_map<std::string> metrics_descriptions_;
std::mutex metrics_mutex_;
// Only timed metrics for background thread
......
......@@ -28,8 +28,9 @@
namespace constellation::metrics {
inline void MetricsManager::registerMetric(std::string name, std::string unit, metrics::MetricType type) {
registerMetric(std::make_shared<metrics::Metric>(std::move(name), std::move(unit), type));
inline void
MetricsManager::registerMetric(std::string name, std::string unit, metrics::MetricType type, std::string description) {
registerMetric(std::make_shared<metrics::Metric>(std::move(name), std::move(unit), type), std::move(description));
};
template <typename C>
......@@ -37,6 +38,7 @@ namespace constellation::metrics {
void MetricsManager::registerTimedMetric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback) {
std::function<std::optional<config::Value>()> value_callback_cast =
......@@ -59,7 +61,8 @@ namespace constellation::metrics {
}
};
registerTimedMetric(
std::make_shared<TimedMetric>(std::move(name), std::move(unit), type, interval, std::move(value_callback_cast)));
std::make_shared<TimedMetric>(std::move(name), std::move(unit), type, interval, std::move(value_callback_cast)),
std::move(description));
}
} // namespace constellation::metrics
......@@ -12,7 +12,7 @@
#include <atomic> // IWYU pragma: keep
#include <chrono> // IWYU pragma: keep
#include "constellation/core/metrics/MetricsManager.hpp" // IWYU pragma: keep
#include "constellation/core/utils/ManagerLocator.hpp" // IWYU pragma: keep
// NOLINTBEGIN(cppcoreguidelines-macro-usage)
......@@ -24,7 +24,7 @@
#define STAT_VAR STAT_CONCAT_NESTED(STAT_VAR_L, __LINE__)
#define STAT_VAR2 STAT_CONCAT_NESTED(STAT_VAR2_L, __LINE__)
#define STAT_METRICS_MANAGER constellation::metrics::MetricsManager::getInstance()
#define STAT_METRICS_MANAGER constellation::utils::ManagerLocator::getMetricsManager()
/// @endcond
......
......@@ -24,13 +24,13 @@
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/log/log.hpp"
#include "constellation/core/message/exceptions.hpp"
#include "constellation/core/networking/exceptions.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/protocol/CHIRP_definitions.hpp"
#include "constellation/core/utils/enum.hpp"
#include "constellation/core/utils/ManagerLocator.hpp"
#include "constellation/core/utils/thread.hpp"
namespace constellation::pools {
......@@ -45,7 +45,7 @@ namespace constellation::pools {
pool_thread_ = std::jthread(std::bind_front(&BasePool::loop, this));
utils::set_thread_name(pool_thread_, pool_logger_.getLogTopic());
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = utils::ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
// Call callback for all already discovered services
const auto discovered_services = chirp_manager->getDiscoveredServices(SERVICE);
......@@ -62,7 +62,7 @@ namespace constellation::pools {
template <typename MESSAGE, protocol::CHIRP::ServiceIdentifier SERVICE, zmq::socket_type SOCKET_TYPE>
void BasePool<MESSAGE, SERVICE, SOCKET_TYPE>::stopPool() {
auto* chirp_manager = chirp::Manager::getDefaultInstance();
auto* chirp_manager = utils::ManagerLocator::getCHIRPManager();
if(chirp_manager != nullptr) {
// Unregister CHIRP discovery callback:
chirp_manager->unregisterDiscoverCallback(&BasePool::callback, SERVICE);
......
/**
* @file
* @brief Manager locator
*
* @copyright Copyright (c) 2025 DESY and the Constellation authors.
* This software is distributed under the terms of the EUPL-1.2 License, copied verbatim in the file "LICENSE.md".
* SPDX-License-Identifier: EUPL-1.2
*/
#pragma once
#include <memory>
#include <mutex>
#include <utility>
#include <zmq.hpp>
#include "constellation/build.hpp"
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/log/SinkManager.hpp"
#include "constellation/core/metrics/MetricsManager.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
namespace constellation::utils {
/**
* @brief Manager locator
*
* This class is a singleton that manages the access, creation and destruction of various managers.
* It acts as a single global instance to avoid issues with static order initialization
* when managers have dependencies on each other.
*/
class ManagerLocator {
public:
CNSTLN_API static ManagerLocator& getInstance() {
static ManagerLocator instance {};
return instance;
}
// No copy/move constructor/assignment
/// @cond doxygen_suppress
ManagerLocator(ManagerLocator& other) = delete;
ManagerLocator& operator=(ManagerLocator other) = delete;
ManagerLocator(ManagerLocator&& other) = delete;
ManagerLocator& operator=(ManagerLocator&& other) = delete;
/// @endcond
/**
* @brief Return the sink manager
*/
CNSTLN_API static log::SinkManager& getSinkManager() { return *getInstance().sink_manager_; }
/**
* @brief Return the metrics manager
*/
CNSTLN_API static metrics::MetricsManager& getMetricsManager() {
auto& instance = getInstance();
std::call_once(instance.creation_flag_, [&]() { instance.create_dependent_managers(); });
return *instance.metrics_manager_;
}
/**
* @brief Return the CHIRP manager
*/
CNSTLN_API static chirp::Manager* getCHIRPManager() {
auto& instance = getInstance();
// TODO(stephan.lachnit): use create_dependent_managers() once CHIRPManager has been reworked
return instance.chirp_manager_.get();
}
/**
* @brief Create the default CHIRP manager
*/
CNSTLN_API static void setDefaultCHIRPManager(std::unique_ptr<chirp::Manager> manager) {
getInstance().chirp_manager_ = std::move(manager);
}
~ManagerLocator() {
// Stop the subscription loop in the CMDP sink
sink_manager_->disableCMDPSending();
// Destruction order: CHIRPManager, MetricsManager, SinkManager, global ZeroMQ context
chirp_manager_.reset();
metrics_manager_.reset();
sink_manager_.reset();
zmq_context_.reset();
}
private:
ManagerLocator() {
// Creation order: global ZeroMQ context, SinkManager, MetricsManager, CHIRPManager
zmq_context_ = networking::global_zmq_context(); // NOLINT(cppcoreguidelines-prefer-member-initializer)
sink_manager_ = std::unique_ptr<log::SinkManager>(new log::SinkManager());
// Cannot create MetricsManager and CHIRP Manager during creation
// since they require a ManagerLocator instance to get SinkManager instance for logging
}
void create_dependent_managers() {
// Creation order: MetricsManager, CHIRPManager
metrics_manager_ = std::unique_ptr<metrics::MetricsManager>(new metrics::MetricsManager());
// TODO(stephan.lachnit): CHIRPManager requires rework to be able to be created here
}
private:
std::shared_ptr<zmq::context_t> zmq_context_;
std::unique_ptr<log::SinkManager> sink_manager_;
std::unique_ptr<metrics::MetricsManager> metrics_manager_;
std::unique_ptr<chirp::Manager> chirp_manager_;
std::once_flag creation_flag_;
};
} // namespace constellation::utils