diff --git a/CMakeModules/FindRdKafka.cmake b/CMakeModules/FindRdKafka.cmake index 3ef14418cc1ea132789442d434299fe329f3b921..1006e3c59127596c237c83123e2875c41c5a733a 100644 --- a/CMakeModules/FindRdKafka.cmake +++ b/CMakeModules/FindRdKafka.cmake @@ -11,12 +11,14 @@ cmake_minimum_required(VERSION 3.12) find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h HINTS ${RdKafka_DIR}/include) find_library(RDKAFKA_LIBRARIES rdkafka++ HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64) +find_library(RDKAFKA_C_LIBRARIES rdkafka HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64) + IF(WIN32) find_path(RDKAFKA_BIN_DIR rdkafka++.dll HINTS ${RdKafka_DIR}/bin ${RdKafka_DIR}/lib) mark_as_advanced(RDKAFKA_BIN_DIR) - find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_LIBRARIES RDKAFKA_BIN_DIR) + find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES RDKAFKA_BIN_DIR) ELSE() - find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_LIBRARIES) + find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES) ENDIF() mark_as_advanced(RDKAFKA_LIBRARIES RDKAFKA_INCLUDE_DIR) diff --git a/common/cpp/include/asapo/kafka_client/kafka_config.h b/common/cpp/include/asapo/kafka_client/kafka_config.h index 81f707bab9973b5445ce5318c728ff3905c4f517..51bdad9d8272d8e1c56bb158a79c180261ea7a35 100644 --- a/common/cpp/include/asapo/kafka_client/kafka_config.h +++ b/common/cpp/include/asapo/kafka_client/kafka_config.h @@ -7,6 +7,7 @@ namespace asapo { struct KafkaClientConfig { + bool enabled{false}; std::map<std::string, std::string> global_config; std::map<std::string, std::map<std::string, std::string>> topics_config; }; diff --git a/common/cpp/src/kafka_client/rdkafka_client.cpp b/common/cpp/src/kafka_client/rdkafka_client.cpp index 0aabc0de01a1d6d592a4257ce8796d004d2ed3a1..55a24cfc9cc65542ec90aa96766c4c0fe497ee01 100644 --- a/common/cpp/src/kafka_client/rdkafka_client.cpp +++ b/common/cpp/src/kafka_client/rdkafka_client.cpp @@ -6,7 +6,7 @@ namespace asapo { -RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : defaultTopicConf(nullptr) { +RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : default_topic_conf_(nullptr) { std::string err; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); for (const auto& configItem : config.global_config) { @@ -15,60 +15,60 @@ RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : defaultTopicConf } } - producer = RdKafka::Producer::create(conf, err); - if (!producer) { + producer_ = RdKafka::Producer::create(conf, err); + if (!producer_) { throw "cannot initialize kafka"; } for (const auto& topic : config.topics_config) { - auto topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + auto topic_config = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); for (const auto& configItem : topic.second) { - if (topicConfig->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { + if (topic_config->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { throw "cannot initialize kafka: " + err; } } if (topic.first == "default") { - this->defaultTopicConf = topicConfig; + default_topic_conf_ = topic_config; } else { - auto topicObj = RdKafka::Topic::create(producer, topic.first, topicConfig, err); - if (!topicObj) { + auto topic_obj = RdKafka::Topic::create(producer_, topic.first, topic_config, err); + if (!topic_obj) { throw "cannot initialize kafka topic [" + topic.first + "]: " + err; } - this->kafkaTopics[topic.first] = topicObj; + kafka_topics_[topic.first] = topic_obj; } } } RdKafkaClient::~RdKafkaClient() { - if (producer) { - producer->flush(1000); + if (producer_) { + producer_->flush(1000); } - delete producer; + delete producer_; } -Error RdKafkaClient::Send(const std::string& data, const std::string& topicName) noexcept { - auto topicIt = this->kafkaTopics.find(topicName); +Error RdKafkaClient::Send(const std::string& data, const std::string& topic_name) noexcept { + auto topicIt = kafka_topics_.find(topic_name); RdKafka::Topic* topic; - if (topicIt == this->kafkaTopics.end()) + if (topicIt == kafka_topics_.end()) { - if (!defaultTopicConf) { + if (!default_topic_conf_) { return KafkaErrorTemplates::kUnknownTopicError.Generate(); } std::string err; - topic = RdKafka::Topic::create(producer, topicName, this->defaultTopicConf, err); + topic = RdKafka::Topic::create(producer_, topic_name, default_topic_conf_, err); if (!topic) { - return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topicName + "]: " + err); + return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topic_name + "]: " + err); } - this->kafkaTopics[topicName] = topic; + kafka_topics_[topic_name] = topic; } else { topic = topicIt->second; } - auto err = producer->produce(topic, RdKafka::Topic::PARTITION_UA, + auto err = producer_->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<void*>(static_cast<const void *>(data.data())), data.size(), nullptr, nullptr); diff --git a/common/cpp/src/kafka_client/rdkafka_client.h b/common/cpp/src/kafka_client/rdkafka_client.h index fcfa163c3356972ff573c8deab3363f1d6020351..3997ff3585b71134ce2f34fdfd3bc6683873b587 100644 --- a/common/cpp/src/kafka_client/rdkafka_client.h +++ b/common/cpp/src/kafka_client/rdkafka_client.h @@ -15,9 +15,9 @@ class RdKafkaClient final : public KafkaClient { virtual ~RdKafkaClient(); private: - RdKafka::Producer* producer; - RdKafka::Conf* defaultTopicConf; - std::map<std::string, RdKafka::Topic *> kafkaTopics; + RdKafka::Producer* producer_; + RdKafka::Conf* default_topic_conf_; + std::map<std::string, RdKafka::Topic *> kafka_topics_; }; } diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index aedfddc166457000d107d8169e1b1e7dddc3e9cb..77410e80e81e7007826b44871f722578d2bb22ee 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -82,7 +82,7 @@ int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache, static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port); logger->Info(std::string("starting receiver, version ") + asapo::kVersion); - auto* receiver = new asapo::Receiver(cache, kafkaClient); + auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache, kafkaClient)}; logger->Info("listening on " + address); asapo::Error err; @@ -122,7 +122,6 @@ int main(int argc, char* argv[]) { logger->SetLogLevel(config->log_level); asapo::SharedCache cache = nullptr; - asapo::KafkaClient* kafkaClient = nullptr; if (config->use_datacache) { cache.reset(new asapo::DataCache{config->datacache_size_gb * 1024 * 1024 * 1024, (float) config->datacache_reserved_share / 100}); @@ -137,18 +136,19 @@ int main(int argc, char* argv[]) { auto metrics_thread = StartMetricsServer(config->metrics, logger); - if (!config->kafka_config.global_config.empty()) { - kafkaClient = asapo::CreateKafkaClient(config->kafka_config, &err); + std::unique_ptr<asapo::KafkaClient> kafkaClient; + if (config->kafka_config.enabled) { + kafkaClient.reset(asapo::CreateKafkaClient(config->kafka_config, &err)); if (kafkaClient == nullptr) { - logger->Error("Error initializing kafka client: " + err->Explain()); + logger->Error("error initializing kafka client: " + err->Explain()); return EXIT_FAILURE; } } else { - logger->Info("Kafka notification disabled."); + logger->Info("kafka notifications disabled."); } - auto exit_code = StartReceiver(config, cache, kafkaClient, logger); + auto exit_code = StartReceiver(config, cache, kafkaClient.get(), logger); // todo: implement graceful exit, currently it never reaches this point return exit_code; } diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 3c162e6ced92e7d4272c781817ae40f2f6c535be..d6f7ca0ac6fad3809fc392754c49e2bf2fa63d54 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,7 +17,6 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { std::string log_level; Error err; - bool kafkaEnabled; std::vector<std::string> kafkaTopics; (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || @@ -40,17 +39,15 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) || (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) || (err = parser.GetString("LogLevel", &log_level)) || - (err = parser.Embedded("Kafka").GetBool("Enabled", &kafkaEnabled)); + (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled)); if (err) { return err; } - if (kafkaEnabled) { - // read the configuration only if kafka is enabled. empty configuration means "disabled" + if (config.kafka_config.enabled) { (err = parser.Embedded("Kafka").GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || (err = parser.Embedded("Kafka").GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); - if (err) { return err; } diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index df5757019dd87b3abe80c81a1a8e5083faae9663..c281e71f1dfa5774c90ea36bb2435beaece35aca 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -32,7 +32,9 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); - request->AddHandler(&request_handler_kafka_notify_); + if (GetReceiverConfig()->kafka_config.enabled) { + request->AddHandler(&request_handler_kafka_notify_); + } } } @@ -43,7 +45,9 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re "ingest mode should include kStoreInFilesystem for large files "); } request->AddHandler(&request_handler_filereceive_); - request->AddHandler(&request_handler_kafka_notify_); + if (GetReceiverConfig()->kafka_config.enabled) { + request->AddHandler(&request_handler_kafka_notify_); + } return nullptr; } diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp index 75302e38dd839e0c6b1b195d41343f760aacaa01..7ab5373b9961bfe2b23523c27a68403d24c05b21 100644 --- a/receiver/src/request_handler/request_handler_kafka_notify.cpp +++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp @@ -4,11 +4,10 @@ namespace asapo { Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const { - bool online = request->GetSourceType() != SourceType::kProcessed && - !static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline); + bool write_to_offline = request->GetSourceType() == SourceType::kProcessed || + static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline); - if (!kafka_client_ || !online) { - //client was not initialized or file written to offline + if (!kafka_client_ || write_to_offline) { return nullptr; } diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index bec7f9caebd58a335bae5674539ad2bbc4c23dc8..ba0bbfb05ea9cebc2e9acef646b0026d7b86bb58 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -81,7 +81,7 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; config_string += "," + Key("Kafka", error_field) + "{"; - config_string += Key("Enabled", error_field) + "false"; + config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false") ; config_string += "}"; config_string += "}"; diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index 7eeff9af6a6effb8572058aa990848126a92299d..ed58fe68f7736d45af13259595510141bb24c559 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -49,10 +49,12 @@ TEST_F(FactoryTests, ErrorOnWrongCode) { } TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) { + config.kafka_config.enabled = true; + SetReceiverConfig(config, "none"); + for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) { generic_request_header.op_code = code; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); ASSERT_THAT(request->GetListHandlers().size(), Eq(6)); @@ -70,6 +72,7 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) { for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) { generic_request_header.op_code = code; config.receive_to_disk_threshold_mb = 0; + config.kafka_config.enabled = true; SetReceiverConfig(config, "none"); generic_request_header.data_size = 1; @@ -111,6 +114,7 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInFilesystem; + config.kafka_config.enabled = true; SetReceiverConfig(config, "none"); auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); @@ -124,6 +128,23 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr)); } +TEST_F(FactoryTests, DoNotAddKafkaIfNotWanted) { + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | + asapo::IngestModeFlags::kStoreInFilesystem; + + config.kafka_config.enabled = false; + SetReceiverConfig(config, "none"); + + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), + Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr)); +} + TEST_F(FactoryTests, CachePassedToRequest) { RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}, nullptr}; diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 8ab74566ad81284e5bc37fd1f653784f9705cc5a..289113eaffcbf02e2d1bbd8b9ebdce0ba69919c9 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -81,6 +81,8 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->receive_to_disk_threshold_mb, Eq(50)); ASSERT_THAT(config->metrics.expose, Eq(true)); ASSERT_THAT(config->metrics.listen_port, Eq(123)); + ASSERT_THAT(config->kafka_config.enabled, Eq(false)); + } @@ -92,7 +94,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag", "AuthorizationServer", "AuthorizationInterval", "PerformanceDbName", "LogLevel", "NThreads", "DiscoveryServer", "AdvertiseURI", "NetworkMode", "MonitorPerformance", - "ReceiveToDiskThresholdMB", "Metrics", "Expose"}; + "ReceiveToDiskThresholdMB", "Metrics", "Expose","Enabled"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt index 3f97480dd81c113781e3c63ebe61ef071d030737..daf1cbec1ffc1308ca52bfbce65ef2bc19a0e037 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt @@ -3,7 +3,8 @@ if (NOT WIN32) set(SOURCE_FILES kafka_mock.cpp) add_executable(${TARGET_NAME} ${SOURCE_FILES}) - target_link_libraries(${TARGET_NAME} rdkafka) + target_link_libraries(${TARGET_NAME} ${RDKAFKA_C_LIBRARIES}) + target_include_directories(${TARGET_NAME} PUBLIC ${RDKAFKA_INCLUDE_DIR}) #use expression generator to get rid of VS adding Debug/Release folders set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY