Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • constellation/constellation
  • annika.vauth/constellation
  • finn.feindt/constellation
  • cbespin/constellation
  • stephan.lachnit/constellation
  • malinda.de.silva/constellation
6 results
Show changes
Commits on Source (36)
Showing
with 432 additions and 61 deletions
......@@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <stop_token>
#include <string>
#include <string_view>
......@@ -31,12 +32,14 @@
#include <zmq_addon.hpp>
#include "constellation/core/chirp/Manager.hpp"
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/log.hpp"
#include "constellation/core/log/Logger.hpp"
#include "constellation/core/log/SinkManager.hpp"
#include "constellation/core/message/CMDP1Message.hpp"
#include "constellation/core/metrics/Metric.hpp"
#include "constellation/core/metrics/MetricsManager.hpp"
#include "constellation/core/networking/exceptions.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/protocol/CHIRP_definitions.hpp"
......@@ -46,6 +49,7 @@
#include "constellation/core/utils/windows.hpp"
using namespace constellation;
using namespace constellation::config;
using namespace constellation::log;
using namespace constellation::message;
using namespace constellation::metrics;
......@@ -125,54 +129,95 @@ void CMDPSink::subscription_loop(const std::stop_token& stop_token) {
body.remove_prefix(1);
LOG(*logger_, TRACE) << "Received " << (subscribe ? "" : "un") << "subscribe message for " << body;
// TODO(simonspa) At some point we also have to treat STAT here
if(!body.starts_with("LOG/")) {
continue;
// Handle subscriptions as well as notification subscriptions:
if(body.starts_with("LOG/")) {
handle_log_subscriptions(subscribe, body);
} else if(body.starts_with("LOG?") && subscribe) {
SinkManager::getInstance().sendLogNotification();
} else if(body.starts_with("STAT/")) {
handle_stat_subscriptions(subscribe, body);
} else if(body.starts_with("STAT?") && subscribe) {
SinkManager::getInstance().sendMetricNotification();
} else {
LOG(*logger_, WARNING) << "Received " << (subscribe ? "" : "un") << "subscribe message with invalid topic "
<< body << ", ignoring";
}
}
}
const auto level_endpos = body.find_first_of('/', 4);
const auto level_str = body.substr(4, level_endpos - 4);
void CMDPSink::handle_log_subscriptions(bool subscribe, std::string_view body) {
// Empty level means subscription to everything
const auto level = (level_str.empty() ? std::optional<Level>(TRACE) : enum_cast<Level>(level_str));
const auto level_endpos = body.find_first_of('/', 4);
const auto level_str = body.substr(4, level_endpos - 4);
// Only accept valid levels
if(!level.has_value()) {
LOG(*logger_, TRACE) << "Invalid log level " << std::quoted(level_str) << ", ignoring";
continue;
}
// Empty level means subscription to everything
const auto level = (level_str.empty() ? std::optional<Level>(TRACE) : enum_cast<Level>(level_str));
const auto topic = (level_endpos != std::string_view::npos ? body.substr(level_endpos + 1) : std::string_view());
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
// Only accept valid levels
if(!level.has_value()) {
LOG(*logger_, TRACE) << "Invalid log level " << std::quoted(level_str) << ", ignoring";
return;
}
if(subscribe) {
log_subscriptions_[topic_uc][level.value()] += 1;
} else {
if(log_subscriptions_[topic_uc][level.value()] > 0) {
log_subscriptions_[topic_uc][level.value()] -= 1;
}
const auto topic = (level_endpos != std::string_view::npos ? body.substr(level_endpos + 1) : std::string_view());
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
if(subscribe) {
log_subscriptions_[topic_uc][level.value()] += 1;
} else {
if(log_subscriptions_[topic_uc][level.value()] > 0) {
log_subscriptions_[topic_uc][level.value()] -= 1;
}
}
// Figure out lowest level for each topic
auto cmdp_global_level = Level::OFF;
std::map<std::string_view, Level> cmdp_sub_topic_levels;
for(const auto& [logger, levels] : log_subscriptions_) {
auto it = std::ranges::find_if(levels, [](const auto& i) { return i.second > 0; });
if(it != levels.end()) {
if(!logger.empty()) {
cmdp_sub_topic_levels[logger] = it->first;
} else {
cmdp_global_level = it->first;
}
// Figure out lowest level for each topic
auto cmdp_global_level = Level::OFF;
std::map<std::string_view, Level> cmdp_sub_topic_levels;
for(const auto& [logger, levels] : log_subscriptions_) {
auto it = std::ranges::find_if(levels, [](const auto& i) { return i.second > 0; });
if(it != levels.end()) {
if(!logger.empty()) {
cmdp_sub_topic_levels[logger] = it->first;
} else {
cmdp_global_level = it->first;
}
}
}
LOG(*logger_, TRACE) << "Lowest global log level: " << std::quoted(enum_name(cmdp_global_level));
LOG(*logger_, TRACE) << "Lowest global log level: " << std::quoted(enum_name(cmdp_global_level));
// Update subscriptions
SinkManager::getInstance().updateCMDPLevels(cmdp_global_level, std::move(cmdp_sub_topic_levels));
}
// Update subscriptions
SinkManager::getInstance().updateCMDPLevels(cmdp_global_level, std::move(cmdp_sub_topic_levels));
void CMDPSink::handle_stat_subscriptions(bool subscribe, std::string_view body) {
const auto topic = body.substr(5);
const auto topic_uc = transform(topic, ::toupper);
LOG(*logger_, TRACE) << "In/decrementing subscription counters for topic " << std::quoted(topic_uc);
if(subscribe) {
stat_subscriptions_[topic_uc] += 1;
} else {
if(stat_subscriptions_[topic_uc] > 0) {
stat_subscriptions_[topic_uc] -= 1;
}
}
// Global subscription to all topics
const auto global_subscription = (stat_subscriptions_.contains("") && stat_subscriptions_.at("") > 0);
// List of subscribed topics:
std::set<std::string_view> subscription_topics;
for(const auto& [topic, counter] : stat_subscriptions_) {
if(counter > 0) {
subscription_topics.insert(topic);
}
}
// Update subscriptions
MetricsManager::getInstance().updateSubscriptions(global_subscription, std::move(subscription_topics));
}
void CMDPSink::enableSending(std::string sender_name) {
......@@ -236,3 +281,18 @@ void CMDPSink::sinkMetric(MetricValue metric_value) {
throw NetworkError(e.what());
}
}
void CMDPSink::sinkNotification(const std::string& id, Dictionary topics) {
// Create message header
auto msghead = CMDP1Message::Header(sender_name_, std::chrono::system_clock::now());
// Lock the mutex - automatically done for regular logging:
const std::lock_guard<std::mutex> lock {mutex_};
try {
// Create and send CMDP message
CMDP1Notification(std::move(msghead), id, std::move(topics)).assemble().send(pub_socket_);
} catch(const zmq::error_t& e) {
throw NetworkError(e.what());
}
}
......@@ -15,12 +15,14 @@
#include <mutex>
#include <stop_token>
#include <string>
#include <string_view>
#include <thread>
#include <spdlog/async_logger.h>
#include <spdlog/sinks/base_sink.h>
#include <zmq.hpp>
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/Logger.hpp"
#include "constellation/core/metrics/Metric.hpp"
......@@ -74,6 +76,14 @@ namespace constellation::log {
*/
void sinkMetric(metrics::MetricValue metric_value);
/**
* Sink notification
*
* @param id Notification type
* @param topics Topics for the given notification type
*/
void sinkNotification(const std::string& id, config::Dictionary topics);
protected:
void sink_it_(const spdlog::details::log_msg& msg) final;
void flush_() final {}
......@@ -81,6 +91,10 @@ namespace constellation::log {
private:
void subscription_loop(const std::stop_token& stop_token);
void handle_log_subscriptions(bool subscribe, std::string_view body);
void handle_stat_subscriptions(bool subscribe, std::string_view body);
private:
std::unique_ptr<Logger> logger_;
......@@ -93,6 +107,7 @@ namespace constellation::log {
std::jthread subscription_thread_;
std::map<std::string, std::map<Level, std::size_t>> log_subscriptions_;
std::map<std::string, std::size_t> stat_subscriptions_;
};
} // namespace constellation::log
......@@ -29,9 +29,11 @@
#include <wincon.h>
#endif
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/CMDPSink.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/log/ProxySink.hpp"
#include "constellation/core/metrics/MetricsManager.hpp"
#include "constellation/core/networking/zmq_helpers.hpp"
#include "constellation/core/utils/string.hpp"
......@@ -233,3 +235,25 @@ void SinkManager::updateCMDPLevels(Level cmdp_global_level, std::map<std::string
calculate_log_level(logger);
}
}
void SinkManager::sendMetricNotification() {
const auto descriptions = metrics::MetricsManager::getInstance().getMetricsDescriptions();
config::Dictionary payload;
for(const auto& [key, value] : descriptions) {
payload.emplace(key, value);
}
cmdp_sink_->sinkNotification("STAT?", std::move(payload));
}
void SinkManager::sendLogNotification() {
config::Dictionary payload;
std::unique_lock loggers_lock {loggers_mutex_};
for(const auto& logger : loggers_) {
// TODO(simonspa): Loggers don't have a description yet - leaving empty
payload.emplace(logger->name(), "");
}
loggers_lock.unlock();
cmdp_sink_->sinkNotification("LOG?", std::move(payload));
}
......@@ -91,6 +91,16 @@ namespace constellation::log {
*/
void sendCMDPMetric(metrics::MetricValue metric_value) { cmdp_sink_->sinkMetric(std::move(metric_value)); }
/**
* Send CMDP Metric topic notification message via the CMDP sink
*/
void sendMetricNotification();
/**
* Send CMDP Log topic notification message via the CMDP sink
*/
void sendLogNotification();
/**
* @brief Get an asynchronous spdlog logger with a given topic
*
......
......@@ -18,6 +18,7 @@
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/message/exceptions.hpp"
#include "constellation/core/message/PayloadBuffer.hpp"
......@@ -46,6 +47,10 @@ bool CMDP1Message::isStatMessage() const {
return topic_.starts_with("STAT/");
}
bool CMDP1Message::isNotification() const {
return topic_.starts_with("STAT?") || topic_.starts_with("LOG?");
}
zmq::multipart_t CMDP1Message::assemble() {
zmq::multipart_t frames {};
......@@ -70,8 +75,9 @@ CMDP1Message CMDP1Message::disassemble(zmq::multipart_t& frames) {
// Decode topic
const auto topic = frames.pop().to_string();
if(!(topic.starts_with("LOG/") || topic.starts_with("STAT/"))) {
throw MessageDecodingError("Invalid message topic, neither log nor telemetry message");
if(!(topic.starts_with("LOG/") || topic.starts_with("STAT/") || topic.starts_with("LOG?") ||
topic.starts_with("STAT?"))) {
throw MessageDecodingError("Invalid message topic \"" + topic + "\", neither log nor telemetry message");
}
// Check if valid log level by trying to decode it
......@@ -90,9 +96,21 @@ CMDP1Message CMDP1Message::disassemble(zmq::multipart_t& frames) {
return {topic, header, std::move(payload)};
}
std::string_view CMDP1Message::getTopic() const {
if(topic_.starts_with("LOG/")) {
// Search for second slash after "LOG/" to get substring with log topic
const auto level_endpos = topic_.find_first_of('/', 4);
return topic_.substr(level_endpos + 1);
} else if(topic_.starts_with("STAT/")) {
return topic_.substr(5);
}
throw IncorrectMessageType("Neither log nor stat message");
}
Level CMDP1Message::get_log_level_from_topic(std::string_view topic) {
if(!topic.starts_with("LOG/")) {
throw MessageDecodingError("Not a log message");
throw IncorrectMessageType("Not a log message");
}
// Search for second slash after "LOG/" to get substring with log level
......@@ -117,7 +135,7 @@ CMDP1LogMessage::CMDP1LogMessage(CMDP1Message&& message) : CMDP1Message(std::mov
throw IncorrectMessageType("Not a log message");
}
const auto topic = getTopic();
const auto topic = getMessageTopic();
level_ = get_log_level_from_topic(topic);
// Search for a '/' after "LOG/"
......@@ -144,11 +162,11 @@ CMDP1StatMessage::CMDP1StatMessage(Header header, metrics::MetricValue metric_va
CMDP1StatMessage::CMDP1StatMessage(CMDP1Message&& message) : CMDP1Message(std::move(message)) {
if(!isStatMessage()) {
throw MessageDecodingError("Not a telemetry message");
throw IncorrectMessageType("Not a telemetry message");
}
// Assign topic after prefix "STAT/"
const auto topic = std::string(getTopic().substr(5));
const auto topic = std::string(getMessageTopic().substr(5));
try {
metric_value_ = MetricValue::disassemble(topic, get_payload());
......@@ -161,3 +179,23 @@ CMDP1StatMessage CMDP1StatMessage::disassemble(zmq::multipart_t& frames) {
// Use disassemble from base class and cast via constructor
return {CMDP1Message::disassemble(frames)};
}
CMDP1Notification::CMDP1Notification(Header header, const std::string& id, Dictionary topics)
: CMDP1Message(id, std::move(header), topics.assemble()), topics_(std::move(topics)) {}
CMDP1Notification::CMDP1Notification(CMDP1Message&& message) : CMDP1Message(std::move(message)) {
if(!isNotification()) {
throw IncorrectMessageType("Not a CMDP notification");
}
try {
topics_ = Dictionary::disassemble(get_payload());
} catch(const std::invalid_argument& e) {
throw MessageDecodingError(e.what());
}
}
CMDP1Notification CMDP1Notification::disassemble(zmq::multipart_t& frames) {
// Use disassemble from base class and cast via constructor
return {CMDP1Message::disassemble(frames)};
}
......@@ -19,6 +19,7 @@
#include <zmq_addon.hpp>
#include "constellation/build.hpp"
#include "constellation/core/config/Dictionary.hpp"
#include "constellation/core/log/Level.hpp"
#include "constellation/core/message/BaseHeader.hpp"
#include "constellation/core/message/PayloadBuffer.hpp"
......@@ -58,7 +59,13 @@ namespace constellation::message {
/**
* @return CMDP message topic
*/
std::string_view getTopic() const { return topic_; };
std::string_view getMessageTopic() const { return topic_; };
/**
* @brief Get topic without CMDP identifier (LOG or STAT)
* @return topic
*/
CNSTLN_API std::string_view getTopic() const;
/**
* @return If the message is a log message
......@@ -70,6 +77,11 @@ namespace constellation::message {
*/
CNSTLN_API bool isStatMessage() const;
/**
* @return if the message is a notification
*/
CNSTLN_API bool isNotification() const;
/**
* Assemble full message to frames for ZeroMQ
*
......@@ -207,4 +219,43 @@ namespace constellation::message {
metrics::MetricValue metric_value_;
};
class CMDP1Notification : public CMDP1Message {
public:
/**
* Construct a new CMDP1 message for metrics
*
* @note The message topic will be taken as the metric name
*
* @param header CMDP1 header of the message
* @param id Identifier of notification
* @param topics Dictionary with available topics for the given identifier
*/
CNSTLN_API CMDP1Notification(Header header, const std::string& id, config::Dictionary topics);
/**
* Construct a CMDP1Notification from a decoded CMDP1Message
*
* @throw IncorrectMessageType If the message is not a (valid) notification
*/
CNSTLN_API CMDP1Notification(CMDP1Message&& message);
/**
* @return List of topics
*/
CNSTLN_API const config::Dictionary& getTopics() const { return topics_; }
/**
* Disassemble stats message from ZeroMQ frames
*
* This function moves the payload.
*
* @return New CMDP1Notification assembled from ZeroMQ frames
* @throw MessageDecodingError If the message is not a valid CMDP1 message
* @throw IncorrectMessageType If the message is a valid CMDP1 message but not a (valid) stat message
*/
CNSTLN_API static CMDP1Notification disassemble(zmq::multipart_t& frames);
private:
config::Dictionary topics_;
};
} // namespace constellation::message
......@@ -16,6 +16,7 @@
#include <iomanip>
#include <memory>
#include <mutex>
#include <set>
#include <stop_token>
#include <string>
#include <thread>
......@@ -48,11 +49,24 @@ MetricsManager::~MetricsManager() noexcept {
}
}
void MetricsManager::registerMetric(std::shared_ptr<Metric> metric) {
bool MetricsManager::shouldStat(std::string_view name) const {
return global_subscription_ || subscribed_topics_.contains(name);
}
void MetricsManager::updateSubscriptions(bool global, std::set<std::string_view> topic_subscriptions) {
// Acquire lock for metric variables and update them
const std::lock_guard levels_lock {metrics_mutex_};
global_subscription_ = global;
subscribed_topics_ = std::move(topic_subscriptions);
}
void MetricsManager::registerMetric(std::shared_ptr<Metric> metric, std::string description) {
const auto name = std::string(metric->name());
std::unique_lock metrics_lock {metrics_mutex_};
const auto [it, inserted] = metrics_.insert_or_assign(name, std::move(metric));
metrics_descriptions_.insert_or_assign(name, std::move(description));
SinkManager::getInstance().sendMetricNotification();
metrics_lock.unlock();
if(!inserted) {
......@@ -66,12 +80,14 @@ void MetricsManager::registerMetric(std::shared_ptr<Metric> metric) {
LOG(logger_, DEBUG) << "Successfully registered metric " << std::quoted(name);
}
void MetricsManager::registerTimedMetric(std::shared_ptr<TimedMetric> metric) {
void MetricsManager::registerTimedMetric(std::shared_ptr<TimedMetric> metric, std::string description) {
const auto name = std::string(metric->name());
// Add to metrics map
std::unique_lock metrics_lock {metrics_mutex_};
const auto [it, inserted] = metrics_.insert_or_assign(name, metric);
metrics_descriptions_.insert_or_assign(name, std::move(description));
SinkManager::getInstance().sendMetricNotification();
metrics_lock.unlock();
if(!inserted) {
......@@ -95,6 +111,11 @@ void MetricsManager::unregisterMetric(std::string_view name) {
if(it != metrics_.end()) {
metrics_.erase(it);
}
auto itd = metrics_descriptions_.find(name);
if(itd != metrics_descriptions_.end()) {
metrics_descriptions_.erase(itd);
}
SinkManager::getInstance().sendMetricNotification();
metrics_lock.unlock();
std::unique_lock timed_metrics_lock {timed_metrics_mutex_};
......@@ -108,6 +129,8 @@ void MetricsManager::unregisterMetric(std::string_view name) {
void MetricsManager::unregisterMetrics() {
std::unique_lock metrics_lock {metrics_mutex_};
metrics_.clear();
metrics_descriptions_.clear();
SinkManager::getInstance().sendMetricNotification();
metrics_lock.unlock();
std::unique_lock timed_metrics_lock {timed_metrics_mutex_};
......@@ -160,8 +183,8 @@ void MetricsManager::run(const std::stop_token& stop_token) {
// Check timed metrics
const std::lock_guard timed_metrics_lock {timed_metrics_mutex_};
for(auto& [name, timed_metric] : timed_metrics_) {
// If last time sent larger than interval and allowed -> send metric
if(timed_metric.timeoutReached()) {
// If last time sent larger than interval and allowed and there is a subscription -> send metric
if(timed_metric.timeoutReached() && shouldStat(name)) {
auto value = timed_metric->currentValue();
if(value.has_value()) {
LOG(logger_, TRACE) << "Sending metric " << std::quoted(timed_metric->name()) << ": "
......
......@@ -15,6 +15,7 @@
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <stop_token>
#include <string>
#include <string_view>
......@@ -52,8 +53,9 @@ namespace constellation::metrics {
* Register a (manually triggered) metric
*
* @param metric Shared pointer to the metric
* @param description Description of the metric
*/
CNSTLN_API void registerMetric(std::shared_ptr<Metric> metric);
CNSTLN_API void registerMetric(std::shared_ptr<Metric> metric, std::string description);
/**
* Register a (manually triggered) metric
......@@ -61,15 +63,17 @@ namespace constellation::metrics {
* @param name Unique topic of the metric
* @param unit Unit of the provided value
* @param type Type of the metric
* @param description Description of the metric
*/
void registerMetric(std::string name, std::string unit, metrics::MetricType type);
void registerMetric(std::string name, std::string unit, metrics::MetricType type, std::string description);
/**
* Register a timed metric
*
* @param metric Shared pointer to the timed metric
* @param description Description of the metric
*/
CNSTLN_API void registerTimedMetric(std::shared_ptr<TimedMetric> metric);
CNSTLN_API void registerTimedMetric(std::shared_ptr<TimedMetric> metric, std::string description);
/**
* Register a timed metric
......@@ -77,6 +81,7 @@ namespace constellation::metrics {
* @param name Name of the metric
* @param unit Unit of the metric as human readable string
* @param type Type of the metric
* @param description Description of the metric
* @param interval Interval in which to send the metric
* @param value_callback Callback to determine the current value of the metric
*/
......@@ -85,6 +90,7 @@ namespace constellation::metrics {
void registerTimedMetric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback);
......@@ -105,7 +111,7 @@ namespace constellation::metrics {
/**
* Check if a metric should be send given the subscription status
*/
static constexpr bool shouldStat(std::string_view /*name*/) { return true; }
CNSTLN_API bool shouldStat(std::string_view name) const;
/**
* Manually trigger a metric
......@@ -115,6 +121,21 @@ namespace constellation::metrics {
*/
CNSTLN_API void triggerMetric(std::string name, config::Value value);
/**
* @brief Update topic subscriptions
*
* @param global Global Flag for global subscription to all topics
* @param topic_subscriptions List of individual subscription topics
*/
void updateSubscriptions(bool global, std::set<std::string_view> topic_subscriptions = {});
/**
* @brief Obtain map of registered metrics along with their descriptions
*
* @return Map with metric descriptions
*/
utils::string_hash_map<std::string> getMetricsDescriptions() const { return metrics_descriptions_; }
private:
MetricsManager();
......@@ -148,8 +169,13 @@ namespace constellation::metrics {
private:
log::Logger logger_;
// List of topics with active subscribers:
std::set<std::string_view> subscribed_topics_;
bool global_subscription_;
// Contains all metrics, including timed ones
utils::string_hash_map<std::shared_ptr<Metric>> metrics_;
utils::string_hash_map<std::string> metrics_descriptions_;
std::mutex metrics_mutex_;
// Only timed metrics for background thread
......
......@@ -28,8 +28,9 @@
namespace constellation::metrics {
inline void MetricsManager::registerMetric(std::string name, std::string unit, metrics::MetricType type) {
registerMetric(std::make_shared<metrics::Metric>(std::move(name), std::move(unit), type));
inline void
MetricsManager::registerMetric(std::string name, std::string unit, metrics::MetricType type, std::string description) {
registerMetric(std::make_shared<metrics::Metric>(std::move(name), std::move(unit), type), std::move(description));
};
template <typename C>
......@@ -37,6 +38,7 @@ namespace constellation::metrics {
void MetricsManager::registerTimedMetric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback) {
std::function<std::optional<config::Value>()> value_callback_cast =
......@@ -59,7 +61,8 @@ namespace constellation::metrics {
}
};
registerTimedMetric(
std::make_shared<TimedMetric>(std::move(name), std::move(unit), type, interval, std::move(value_callback_cast)));
std::make_shared<TimedMetric>(std::move(name), std::move(unit), type, interval, std::move(value_callback_cast)),
std::move(description));
}
} // namespace constellation::metrics
......@@ -11,6 +11,7 @@
#include <algorithm>
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <string>
......@@ -27,7 +28,8 @@ using namespace constellation::listener;
using namespace constellation::message;
CMDPListener::CMDPListener(std::string_view log_topic, std::function<void(CMDP1Message&&)> callback)
: SubscriberPoolT(log_topic, std::move(callback)) {}
: SubscriberPoolT(log_topic, [this](auto&& arg) { handle_message(std::forward<decltype(arg)>(arg)); }),
callback_(std::move(callback)) {}
void CMDPListener::host_connected(const chirp::DiscoveredService& service) {
const std::lock_guard subscribed_topics_lock {subscribed_topics_mutex_};
......@@ -48,6 +50,45 @@ void CMDPListener::host_connected(const chirp::DiscoveredService& service) {
}
}
void CMDPListener::handle_message(message::CMDP1Message&& msg) {
if(msg.isNotification()) {
// Handle notification message:
const auto notification = CMDP1Notification(std::move(msg));
const auto& topics = notification.getTopics();
const auto sender = notification.getHeader().getSender();
const std::lock_guard available_topics_lock {available_topics_mutex_};
const auto [it, inserted] = available_topics_.emplace(sender, std::map<std::string, std::string>());
if(!inserted) {
it->second.clear();
}
for(const auto& t : topics) {
it->second.emplace(t.first, t.second.str());
}
// Call method for derived classes to propagate information
topics_available(sender, it->second);
} else {
// Pass regular messages on to registered callback
callback_(std::move(msg));
}
}
void CMDPListener::topics_available(std::string_view /* sender */, const std::map<std::string, std::string>& /* topics */) {}
std::map<std::string, std::string> CMDPListener::getAvailableTopics(std::string_view sender) {
const std::lock_guard topics_lock {available_topics_mutex_};
const auto sender_it = available_topics_.find(sender);
if(sender_it != available_topics_.end()) {
return sender_it->second;
}
return {};
}
void CMDPListener::subscribeTopic(std::string topic) {
multiscribeTopics({}, {std::move(topic)});
}
......
......@@ -10,6 +10,7 @@
#pragma once
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <string>
......@@ -130,6 +131,15 @@ namespace constellation::listener {
*/
CNSTLN_API void removeExtraTopicSubscriptions();
/**
* @brief Obtain available topics for given sender. Topics are parsed from CMDP notification messages and cached per
* sender. Returns an empty map if the sender is not known or has not sent a topic notification yet.
*
* @param sender Sending CMDP host to get available topics for
* @return Map with available topics as keys and their description as values
*/
CNSTLN_API std::map<std::string, std::string> getAvailableTopics(std::string_view sender);
protected:
/**
* @brief Method for derived classes to act on newly connected sockets
......@@ -139,15 +149,40 @@ namespace constellation::listener {
*/
CNSTLN_API void host_connected(const chirp::DiscoveredService& service) override;
/**
* @brief Method for derived classes to act on topic notifications
*
* @param sender CMDP sending host of the topic notification
* @param topics Map with notification topics and their description
*/
CNSTLN_API virtual void topics_available(std::string_view sender, const std::map<std::string, std::string>& topics);
private:
/**
* @brief Helper methods to separate notification messages from regular CMDP messages. Notifications are handled
* internally while regular messages are passed on to the provided callback of the implementing class.
*
* @param msg CMDP message to process
*/
void handle_message(message::CMDP1Message&& msg);
private:
// Hide subscribe/unsubscribe functions from SubscriberPool
using SubscriberPoolT::subscribe;
using SubscriberPoolT::unsubscribe;
private:
/* Callback */
std::function<void(message::CMDP1Message&&)> callback_;
/* Subscribed topics */
std::mutex subscribed_topics_mutex_;
std::set<std::string> subscribed_topics_;
utils::string_hash_map<std::set<std::string>> extra_subscribed_topics_;
/* Available topics from notification */
std::mutex available_topics_mutex_;
utils::string_hash_map<std::map<std::string, std::string>> available_topics_;
};
} // namespace constellation::listener
......@@ -36,7 +36,10 @@ using namespace constellation::utils;
LogListener::LogListener(std::string_view log_topic, std::function<void(CMDP1LogMessage&&)> callback)
: CMDPListener(log_topic,
[callback = std::move(callback)](CMDP1Message&& msg) { callback(CMDP1LogMessage(std::move(msg))); }),
global_log_level_(Level::OFF) {}
global_log_level_(Level::OFF) {
// Subscribe to notifications:
CMDPListener::subscribeTopic("LOG?");
}
std::vector<std::string> LogListener::generate_topics(const std::string& log_topic, Level level, bool subscribe) {
std::vector<std::string> topics {};
......
......@@ -58,6 +58,7 @@ ReceiverSatellite::ReceiverSatellite(std::string_view type, std::string_view nam
register_timed_metric("BYTES_RECEIVED",
"B",
MetricType::LAST_VALUE,
"Number of bytes received by this satellite in the current run",
10s,
{CSCP::State::starting, CSCP::State::RUN, CSCP::State::stopping},
[this]() { return bytes_received_.load(); });
......
......@@ -158,8 +158,9 @@ namespace constellation::satellite {
* @param name Unique topic of the metric
* @param unit Unit of the provided value
* @param type Type of the metric
* @param description Description of the metric
*/
static void register_metric(std::string name, std::string unit, metrics::MetricType type);
static void register_metric(std::string name, std::string unit, metrics::MetricType type, std::string description);
/**
* @brief Register a metric which will be emitted in regular intervals, evaluated from the provided function
......@@ -167,6 +168,7 @@ namespace constellation::satellite {
* @param name Name of the metric
* @param unit Unit of the metric as human readable string
* @param type Type of the metric
* @param description Description of the metric
* @param interval Interval in which to send the metric
* @param value_callback Callback to determine the current value of the metric
*/
......@@ -175,6 +177,7 @@ namespace constellation::satellite {
static void register_timed_metric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback);
......@@ -184,6 +187,7 @@ namespace constellation::satellite {
* @param name Name of the metric
* @param unit Unit of the metric as human readable string
* @param type Type of the metric
* @param description Description of the metric
* @param interval Interval in which to send the metric
* @param allowed_states Set of states in which the callback is allowed
* @param value_callback Callback to determine the current value of the metric
......@@ -193,6 +197,7 @@ namespace constellation::satellite {
void register_timed_metric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
std::set<protocol::CSCP::State> allowed_states,
C value_callback);
......
......@@ -25,8 +25,10 @@
namespace constellation::satellite {
inline void Satellite::register_metric(std::string name, std::string unit, metrics::MetricType type) {
metrics::MetricsManager::getInstance().registerMetric(std::move(name), std::move(unit), type);
inline void
Satellite::register_metric(std::string name, std::string unit, metrics::MetricType type, std::string description) {
metrics::MetricsManager::getInstance().registerMetric(
std::move(name), std::move(unit), type, std::move(description));
}
template <typename C>
......@@ -34,10 +36,11 @@ namespace constellation::satellite {
inline void Satellite::register_timed_metric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
C value_callback) {
metrics::MetricsManager::getInstance().registerTimedMetric(
std::move(name), std::move(unit), type, interval, std::move(value_callback));
std::move(name), std::move(unit), type, std::move(description), interval, std::move(value_callback));
}
template <typename C>
......@@ -45,6 +48,7 @@ namespace constellation::satellite {
inline void Satellite::register_timed_metric(std::string name,
std::string unit,
metrics::MetricType type,
std::string description,
std::chrono::steady_clock::duration interval,
std::set<protocol::CSCP::State> allowed_states,
C value_callback) {
......@@ -52,6 +56,7 @@ namespace constellation::satellite {
std::move(name),
std::move(unit),
type,
std::move(description),
interval,
[this, allowed_states = std::move(allowed_states), value_callback = std::move(value_callback)]() {
std::optional<std::invoke_result_t<C>> retval = std::nullopt;
......
......@@ -48,6 +48,7 @@ TransmitterSatellite::TransmitterSatellite(std::string_view type, std::string_vi
register_timed_metric("BYTES_TRANSMITTED",
"B",
MetricType::LAST_VALUE,
"Number of bytes transmitted by this satellite in the current run",
10s,
{CSCP::State::starting, CSCP::State::RUN, CSCP::State::stopping},
[this]() { return bytes_transmitted_.load(); });
......
......@@ -130,6 +130,11 @@ Observatory::Observatory(std::string_view group_name) : logger_("UI") {
// Connect signals:
connect(&log_listener_, &QLogListener::newSender, this, [&](const QString& sender) { filterSender->addItem(sender); });
connect(&log_listener_, &QLogListener::newTopic, this, [&](const QString& topic) { filterTopic->addItem(topic); });
connect(&log_listener_, &QLogListener::newTopics, this, [&](const QStringList& topics) {
filterTopic->clear();
filterTopic->addItem("- All -");
filterTopic->addItems(topics);
});
connect(&log_listener_, &QLogListener::connectionsChanged, this, [&](std::size_t num) {
labelNrSatellites->setText("<font color='gray'><b>" + QString::number(num) + "</b></font>");
});
......
......@@ -10,6 +10,7 @@
#include "QLogListener.hpp"
#include <cstddef>
#include <map>
#include <mutex>
#include <set>
#include <string>
......@@ -19,6 +20,7 @@
#include <QDateTime>
#include <QObject>
#include <QString>
#include <QStringList>
#include <QVariant>
#include "constellation/core/chirp/Manager.hpp"
......@@ -93,6 +95,22 @@ void QLogListener::host_disconnected(const DiscoveredService& service) {
emit connectionsChanged(countSockets());
}
void QLogListener::topics_available(std::string_view /*sender*/, const std::map<std::string, std::string>& topics) {
QStringList all_topics;
for(const auto& [topic, desc] : topics) {
all_topics.append(QString::fromStdString(topic));
}
all_topics.removeDuplicates();
all_topics.sort();
// Emit signal for global topic list change:
emit newTopics(all_topics);
// emit newSenderTopics(QString::fromStdString(std::string(sender)), {});
}
QVariant QLogListener::data(const QModelIndex& index, int role) const {
if(role != Qt::DisplayRole || !index.isValid()) {
return {};
......
......@@ -11,9 +11,11 @@
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <QAbstractListModel>
#include <QObject>
......@@ -109,6 +111,8 @@ signals:
*/
void newTopic(QString topic);
void newTopics(QStringList topics);
private:
/**
* @brief Callback registered for receiving log messages from the subscription pool
......@@ -122,6 +126,8 @@ private:
void host_connected(const constellation::chirp::DiscoveredService& service) override;
void host_disconnected(const constellation::chirp::DiscoveredService& service) override;
void topics_available(std::string_view sender, const std::map<std::string, std::string>& topics) override;
private:
/** Log messages & access mutex*/
std::deque<constellation::gui::QLogMessage> messages_;
......
......@@ -34,9 +34,10 @@ void SputnikSatellite::initializing(Configuration& config) {
// Obtain the beeping interval from the configuration:
auto interval = config.get<std::uint64_t>("interval", 3000U);
register_timed_metric("BEEP", "beeps", MetricType::LAST_VALUE, std::chrono::milliseconds(interval), []() { return 42; });
register_timed_metric(
"BEEP", "beeps", MetricType::LAST_VALUE, "Sputnik beeps", std::chrono::milliseconds(interval), []() { return 42; });
register_metric("TIME", "s", MetricType::LAST_VALUE);
register_metric("TIME", "s", MetricType::LAST_VALUE, "Sputnik total running time");
}
void SputnikSatellite::launching() {
......