// Forked from asapo / asapo (867 commits behind the upstream repository).
// Authored by George Sedov.
// File: receiver_config.cpp (3.03 KiB)
#include "receiver_config.h"
#include "asapo/io/io_factory.h"
#include "asapo/json_parser/json_parser.h"
#include <iostream>
namespace asapo {
// Single receiver configuration instance for this translation unit: it is
// populated by ReceiverConfigManager::ReadConfigFromFile() and handed out
// read-only through GetReceiverConfig().
ReceiverConfig config;
/// Creates a manager whose IO backend (`io__`) is obtained from GenerateDefaultIO().
ReceiverConfigManager::ReceiverConfigManager()
    : io__{GenerateDefaultIO()} {}
/**
 * Reads the receiver configuration from a JSON file into the global `config`.
 *
 * Top-level keys and the embedded "DataServer", "DataCache", "Metrics" and
 * "Kafka" sections are read one after another; the first read that fails stops
 * the sequence and its error is returned immediately. The "KafkaClient" and
 * "KafkaTopics" sections are only read when "Kafka"."Enabled" is true.
 * On success `config.dataserver.tag` is derived from `config.tag` and the
 * "LogLevel" string is converted with StringToLogLevel().
 *
 * @param file_name name of the JSON configuration file handed to the parser.
 * @return the first error encountered (while parsing or while converting the
 *         log level); an Error that evaluates to false when everything succeeded.
 */
Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
    JsonFileParser parser(file_name, &io__);
    std::string log_level;
    Error err;
    // Initialised explicitly: the chain below short-circuits on the first
    // failure, so the "Enabled" read may never run and must not leave this
    // flag indeterminate.
    bool kafkaEnabled = false;
    std::vector<std::string> kafkaTopics;

    (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) ||
    (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) ||
    (err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
    (err = parser.GetUInt64("ReceiveToDiskThresholdMB", &config.receive_to_disk_threshold_mb)) ||
    (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver.listen_port)) ||
    (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver.nthreads)) ||
    (err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) ||
    (err = parser.Embedded("DataCache").GetUInt64("SizeGB", &config.datacache_size_gb)) ||
    (err = parser.Embedded("DataCache").GetUInt64("ReservedShare", &config.datacache_reserved_share)) ||
    (err = parser.GetString("DatabaseServer", &config.database_uri)) ||
    (err = parser.GetString("DiscoveryServer", &config.discovery_server)) ||
    (err = parser.GetString("Tag", &config.tag)) ||
    (err = parser.GetString("AuthorizationServer", &config.authorization_server)) ||
    (err = parser.GetUInt64("AuthorizationInterval", &config.authorization_interval_ms)) ||
    (err = parser.GetString("PerformanceDbName", &config.performance_db_name)) ||
    (err = parser.Embedded("DataServer").GetString("AdvertiseURI", &config.dataserver.advertise_uri)) ||
    (err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) ||
    (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) ||
    (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) ||
    (err = parser.GetString("LogLevel", &log_level)) ||
    (err = parser.Embedded("Kafka").GetBool("Enabled", &kafkaEnabled));

    // Stop here on any failure; otherwise the error would be overwritten by
    // the Kafka reads or by the log-level conversion further down.
    if (err) {
        return err;
    }

    if (kafkaEnabled) {
        // read the configuration only if kafka is enabled. empty configuration means "disabled"
        (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
        (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
        if (err) {
            return err;
        }

        for (const auto& topic : kafkaTopics) {
            // Bind by reference so the parser fills the entry stored in the
            // config; a by-value copy would be discarded at the end of the
            // iteration and the topic settings would be lost.
            auto& topicConfig = config.kafka_config.topics_config[topic];
            err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
            if (err) {
                return err;
            }
        }
    }

    config.dataserver.tag = config.tag + "_ds";
    config.log_level = StringToLogLevel(log_level, &err);
    return err;
}
/// Gives read-only access to the global receiver configuration.
/// @return pointer to the global `config` object.
const ReceiverConfig* GetReceiverConfig() {
    const ReceiverConfig* current_config = &config;
    return current_config;
}
}