Skip to content
Snippets Groups Projects
Forked from asapo / asapo
867 commits behind the upstream repository.
receiver_config.cpp 3.03 KiB
#include "receiver_config.h"
#include "asapo/io/io_factory.h"
#include "asapo/json_parser/json_parser.h"

#include <iostream>

namespace asapo {

ReceiverConfig config;

ReceiverConfigManager::ReceiverConfigManager() : io__{GenerateDefaultIO()} {

}

Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
    JsonFileParser parser(file_name, &io__);
    std::string log_level;
    Error err;

    bool kafkaEnabled;
    std::vector<std::string> kafkaTopics;

    (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) ||
    (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) ||
    (err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
    (err = parser.GetUInt64("ReceiveToDiskThresholdMB", &config.receive_to_disk_threshold_mb)) ||
    (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver.listen_port)) ||
    (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver.nthreads)) ||
    (err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) ||
    (err = parser.Embedded("DataCache").GetUInt64("SizeGB", &config.datacache_size_gb)) ||
    (err = parser.Embedded("DataCache").GetUInt64("ReservedShare", &config.datacache_reserved_share)) ||
    (err = parser.GetString("DatabaseServer", &config.database_uri)) ||
    (err = parser.GetString("DiscoveryServer", &config.discovery_server)) ||
    (err = parser.GetString("Tag", &config.tag)) ||
    (err = parser.GetString("AuthorizationServer", &config.authorization_server)) ||
    (err = parser.GetUInt64("AuthorizationInterval", &config.authorization_interval_ms)) ||
    (err = parser.GetString("PerformanceDbName", &config.performance_db_name)) ||
    (err = parser.Embedded("DataServer").GetString("AdvertiseURI", &config.dataserver.advertise_uri)) ||
    (err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) ||
    (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) ||
    (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) ||
    (err = parser.GetString("LogLevel", &log_level)) ||
    (err = parser.Embedded("Kafka").GetBool("Enabled", &kafkaEnabled));



    if (kafkaEnabled) {
        // read the configuration only if kafka is enabled. empty configuration means "disabled"
        (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
        (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));

        if (err) {
            return err;
        }

        for(const auto& topic : kafkaTopics) {
            auto topicConfig = config.kafka_config.topics_config[topic];
            err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
            if (err) {
                return err;
            }
        }
    }

    config.dataserver.tag = config.tag + "_ds";

    config.log_level = StringToLogLevel(log_level, &err);
    return err;

}

const ReceiverConfig* GetReceiverConfig() {
    return &config;
}


}