diff --git a/CMakeIncludes/dependencies.cmake b/CMakeIncludes/dependencies.cmake index a9b19a74c0e7f8fbe0f353946fc082349ad4fe7f..2e12703ccfe6ac0dc9f407f0ad191887f3590ec8 100644 --- a/CMakeIncludes/dependencies.cmake +++ b/CMakeIncludes/dependencies.cmake @@ -19,6 +19,12 @@ if(CURL_FOUND) #old FindCURL versions do not create CURL::libcurl target, so we endif() +find_package(RdKafka REQUIRED) +message (STATUS "Found rdkafka++ libraries: ${RDKAFKA_LIBRARIES}") +message (STATUS "rdkafka++ include dir : ${RDKAFKA_INCLUDE_DIR}") +if (WIN32) + message (STATUS "rdkafka++ binary dir (dll): ${RDKAFKA_BIN_DIR}") +endif() # python is needed anyway, even if no Python packages are build (e.g. to parse test results) if ("${Python_EXECUTABLE}" STREQUAL "") diff --git a/CMakeModules/FindRdKafka.cmake b/CMakeModules/FindRdKafka.cmake new file mode 100644 index 0000000000000000000000000000000000000000..1006e3c59127596c237c83123e2875c41c5a733a --- /dev/null +++ b/CMakeModules/FindRdKafka.cmake @@ -0,0 +1,24 @@ +# FindRdKafka +# ------------- +# +# Tries to find RdKafka on the system +# +# Available variables +# RDKAFKA_LIBRARIES - Path to the library +# RDKAFKA_INCLUDE_DIR - Path to the include dir + +cmake_minimum_required(VERSION 3.12) + +find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h HINTS ${RdKafka_DIR}/include) +find_library(RDKAFKA_LIBRARIES rdkafka++ HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64) +find_library(RDKAFKA_C_LIBRARIES rdkafka HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64) + +IF(WIN32) + find_path(RDKAFKA_BIN_DIR rdkafka++.dll HINTS ${RdKafka_DIR}/bin ${RdKafka_DIR}/lib) + mark_as_advanced(RDKAFKA_BIN_DIR) + find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES RDKAFKA_BIN_DIR) +ELSE() + find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES) +ENDIF() + +mark_as_advanced(RDKAFKA_LIBRARIES RDKAFKA_INCLUDE_DIR) diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index bcf2db0fcd819039856d419e85e7f3d00cc8be3b..e2badd2b9ca9e49bfd2414f7da3e431a3a12cf21 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -32,12 +32,14 @@ function(prepare_asapo) else() configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.lin.in receiver_tcp.json.tpl @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_fabric.json.tpl.lin.in receiver_fabric.json.tpl @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_kafka.json.tpl.lin.in receiver_kafka.json.tpl @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.lin authorizer.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/start_services.sh start_services.sh COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/stop_services.sh stop_services.sh COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_tcp.nmd.in receiver_tcp.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_fabric.nmd.in receiver_fabric.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_kafka.nmd.in receiver_kafka.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx_kill_lin.nmd nginx_kill.nmd @ONLY) endif() diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index 8f90ff257a0db963ffc133518e2768c020975fc7..c55ea7c5316671bfda98130a3fa8383517d5a579 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(src/data_structs) add_subdirectory(src/version) +add_subdirectory(src/kafka_client) add_subdirectory(src/http_client) diff --git a/common/cpp/include/asapo/json_parser/json_parser.h b/common/cpp/include/asapo/json_parser/json_parser.h index a2baa383dc52f4b65afadae2029b5a70cae9f255..cf123e712547fb42ef0c693c6f60404e13656405 100644 --- a/common/cpp/include/asapo/json_parser/json_parser.h +++ b/common/cpp/include/asapo/json_parser/json_parser.h @@ -4,6 +4,7 @@ #include <string> #include <memory> #include <vector> +#include <map> #include <string> #include "asapo/common/error.h" @@ -20,6 +21,8 @@ class JsonParser { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept; diff --git a/common/cpp/include/asapo/kafka_client/kafka_client.h b/common/cpp/include/asapo/kafka_client/kafka_client.h new file mode 100644 index 0000000000000000000000000000000000000000..a920d36e1e1e0bc3fa53256dfc8d41052965acb6 --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_client.h @@ -0,0 +1,23 @@ +#ifndef ASAPO_KAFKA_CLIENT_H +#define ASAPO_KAFKA_CLIENT_H + +#include "asapo/common/error.h" +#include "asapo/common/data_structs.h" +#include "asapo/kafka_client/kafka_config.h" +#include "asapo/kafka_client/kafka_error.h" +#include <map> + +namespace asapo { + +class KafkaClient { + public: + virtual Error Send(const std::string& data, + const std::string& topic) noexcept = 0; + virtual ~KafkaClient() = default; +}; + +KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err); + +} + +#endif //ASAPO_KAFKA_CLIENT_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_config.h b/common/cpp/include/asapo/kafka_client/kafka_config.h new file mode 100644 index 0000000000000000000000000000000000000000..51bdad9d8272d8e1c56bb158a79c180261ea7a35 --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_config.h @@ -0,0 +1,17 @@ +#ifndef ASAPO_KAFKA_CONFIG_H +#define ASAPO_KAFKA_CONFIG_H + +#include <string> +#include <map> + +namespace asapo { + + struct KafkaClientConfig { + bool enabled{false}; + std::map<std::string, std::string> global_config; + std::map<std::string, std::map<std::string, std::string>> topics_config; + }; + +} + +#endif //ASAPO_KAFKA_CONFIG_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_error.h b/common/cpp/include/asapo/kafka_client/kafka_error.h new file mode 100644 index 0000000000000000000000000000000000000000..3ef69183d3b25e77ba6f0a4a0cee64f9721f60ae --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_error.h @@ -0,0 +1,43 @@ +#ifndef ASAPO_KAFKA_ERROR_H +#define ASAPO_KAFKA_ERROR_H + +#include "asapo/common/error.h" + +namespace asapo { + + enum class KafkaErrorType { + kQueueFullError, + kMessageTooLargeError, + kUnknownPartitionError, + kUnknownTopicError, + kGeneralError + }; + + using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>; + + namespace KafkaErrorTemplates { + + auto const kQueueFullError = KafkaErrorTemplate { + "kafka queue is full", KafkaErrorType::kQueueFullError + }; + + auto const kMessageTooLargeError = KafkaErrorTemplate { + "kafka message is too large", KafkaErrorType::kMessageTooLargeError + }; + + auto const kUnknownPartitionError = KafkaErrorTemplate { + "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError + }; + + auto const kUnknownTopicError = KafkaErrorTemplate { + "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError + }; + + auto const kGeneralError = KafkaErrorTemplate { + "unexpected kafka error occurred", KafkaErrorType::kGeneralError + }; + } + +} + +#endif //ASAPO_KAFKA_ERROR_H diff --git a/common/cpp/include/asapo/unittests/MockKafkaClient.h b/common/cpp/include/asapo/unittests/MockKafkaClient.h new file mode 100644 index 0000000000000000000000000000000000000000..f51ea3a6e553fdda1aaefd85b32decb2d3851d91 --- /dev/null +++ b/common/cpp/include/asapo/unittests/MockKafkaClient.h @@ -0,0 +1,22 @@ +#ifndef ASAPO_MOCKKAFKACLIENT_H +#define ASAPO_MOCKKAFKACLIENT_H + +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "asapo/kafka_client/kafka_client.h" + +namespace asapo { + +class MockKafkaClient : public KafkaClient { + public: + Error Send(const std::string& data, const std::string& topic) noexcept override { + return Error{Send_t(data, topic)}; + } + + MOCK_METHOD(ErrorInterface *, Send_t, (const std::string& data, const std::string& topic), ()); +}; + +} + +#endif //ASAPO_MOCKKAFKACLIENT_H diff --git a/common/cpp/src/json_parser/json_parser.cpp b/common/cpp/src/json_parser/json_parser.cpp index b25cb6b4a9d1f96226e60b4f5ccfe5fdfaf1afe3..32457c07ab127c4606e8be1ae7f54cb57a9f1083 100644 --- a/common/cpp/src/json_parser/json_parser.cpp +++ b/common/cpp/src/json_parser/json_parser.cpp @@ -24,6 +24,13 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin return rapid_json_->GetArrayString(name, val); } +Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { + return rapid_json_->GetArrayObjectMembers(name, val); +} + +Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { + return rapid_json_->GetDictionaryString(name, val); +} Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept { return rapid_json_->GetBool(name, val); diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp index a9883d134374785d5c801c5a33356cd7951c32fd..7dba5ea22cc247c0b07bff9a2d2757324757be97 100644 --- a/common/cpp/src/json_parser/rapid_json.cpp +++ b/common/cpp/src/json_parser/rapid_json.cpp @@ -168,6 +168,40 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string } +Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { + Value* json_val; + if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { + return err; + } + + val->clear(); + for (auto& m : json_val->GetObject()) { + if (!m.name.IsString()) { + return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name); + } + val->push_back(m.name.GetString()); + } + return nullptr; + +} + +Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { + Value* json_val; + if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { + return err; + } + + val->clear(); + for (auto& m : json_val->GetObject()) { + if (!m.value.IsString() || !m.name.IsString()) { + return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name); + } + (*val)[m.name.GetString()] = m.value.GetString(); + } + return nullptr; + +} + RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) { auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_); if (err) { diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h index b4cbc88711dfcc0f8e7ae6f21257277b9c10469e..477d6e4ee24ef6c7186ef7b0496c8edc786e10c7 100644 --- a/common/cpp/src/json_parser/rapid_json.h +++ b/common/cpp/src/json_parser/rapid_json.h @@ -4,6 +4,7 @@ #include "rapidjson/document.h" #include "asapo/common/error.h" #include "asapo/io/io.h" +#include <map> namespace asapo { @@ -26,6 +27,8 @@ class RapidJson { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept; diff --git a/common/cpp/src/kafka_client/CMakeLists.txt b/common/cpp/src/kafka_client/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..53d4ff666f3cb4e491d6417f12ed78be87b69aba --- /dev/null +++ b/common/cpp/src/kafka_client/CMakeLists.txt @@ -0,0 +1,13 @@ +set(TARGET_NAME rdkafka_client) +set(SOURCE_FILES + rdkafka_client.cpp + kafka_client_factory.cpp ../../include/asapo/preprocessor/definitions.h) + + +################################ +# Library +################################ +add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIR}) + + diff --git a/common/cpp/src/kafka_client/kafka_client_factory.cpp b/common/cpp/src/kafka_client/kafka_client_factory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..eabae085efb94ec96bfa9a8ff056256119ce8227 --- /dev/null +++ b/common/cpp/src/kafka_client/kafka_client_factory.cpp @@ -0,0 +1,14 @@ +#include "rdkafka_client.h" + +namespace asapo { + +KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err) { + try { + return new RdKafkaClient(config); + } + catch (std::string errstr) { + (*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr); + } + return nullptr; +} +} diff --git a/common/cpp/src/kafka_client/rdkafka_client.cpp b/common/cpp/src/kafka_client/rdkafka_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..55a24cfc9cc65542ec90aa96766c4c0fe497ee01 --- /dev/null +++ b/common/cpp/src/kafka_client/rdkafka_client.cpp @@ -0,0 +1,87 @@ +#include "rdkafka_client.h" + +#include <cstring> +#include "asapo/common/data_structs.h" +#include "asapo/io/io_factory.h" + +namespace asapo { + +RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : default_topic_conf_(nullptr) { + std::string err; + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + for (const auto& configItem : config.global_config) { + if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { + throw "cannot initialize kafka: " + err; + } + } + + producer_ = RdKafka::Producer::create(conf, err); + if (!producer_) { + throw "cannot initialize kafka"; + } + + for (const auto& topic : config.topics_config) { + auto topic_config = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + for (const auto& configItem : topic.second) { + if (topic_config->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { + throw "cannot initialize kafka: " + err; + } + } + if (topic.first == "default") { + default_topic_conf_ = topic_config; + } else + { + auto topic_obj = RdKafka::Topic::create(producer_, topic.first, topic_config, err); + if (!topic_obj) { + throw "cannot initialize kafka topic [" + topic.first + "]: " + err; + + } + kafka_topics_[topic.first] = topic_obj; + } + } +} + +RdKafkaClient::~RdKafkaClient() { + if (producer_) { + producer_->flush(1000); + } + delete producer_; +} + +Error RdKafkaClient::Send(const std::string& data, const std::string& topic_name) noexcept { + auto topicIt = kafka_topics_.find(topic_name); + RdKafka::Topic* topic; + if (topicIt == kafka_topics_.end()) + { + if (!default_topic_conf_) { + return KafkaErrorTemplates::kUnknownTopicError.Generate(); + } + std::string err; + topic = RdKafka::Topic::create(producer_, topic_name, default_topic_conf_, err); + if (!topic) { + return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topic_name + "]: " + err); + } + kafka_topics_[topic_name] = topic; + } + else + { + topic = topicIt->second; + } + + auto err = producer_->produce(topic, RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast<void*>(static_cast<const void *>(data.data())), data.size(), + nullptr, nullptr); + + switch (err) { + case RdKafka::ERR_NO_ERROR: return nullptr; + case RdKafka::ERR__QUEUE_FULL: return KafkaErrorTemplates::kQueueFullError.Generate(); + case RdKafka::ERR_MSG_SIZE_TOO_LARGE: return KafkaErrorTemplates::kMessageTooLargeError.Generate(); + case RdKafka::ERR__UNKNOWN_PARTITION: return KafkaErrorTemplates::kUnknownPartitionError.Generate(); + case RdKafka::ERR__UNKNOWN_TOPIC: return KafkaErrorTemplates::kUnknownTopicError.Generate(); + default: return KafkaErrorTemplates::kGeneralError.Generate(err2str(err)); + } + +} + +} diff --git a/common/cpp/src/kafka_client/rdkafka_client.h b/common/cpp/src/kafka_client/rdkafka_client.h new file mode 100644 index 0000000000000000000000000000000000000000..3997ff3585b71134ce2f34fdfd3bc6683873b587 --- /dev/null +++ b/common/cpp/src/kafka_client/rdkafka_client.h @@ -0,0 +1,25 @@ +#ifndef ASAPO_RDKAFKA_CLIENT_H +#define ASAPO_RDKAFKA_CLIENT_H + +#include <string> + +#include "asapo/kafka_client/kafka_client.h" +#include "librdkafka/rdkafkacpp.h" + +namespace asapo { + +class RdKafkaClient final : public KafkaClient { + public: + RdKafkaClient(const KafkaClientConfig& config); + Error Send(const std::string& data, const std::string& topic) noexcept override; + + virtual ~RdKafkaClient(); + private: + RdKafka::Producer* producer_; + RdKafka::Conf* default_topic_conf_; + std::map<std::string, RdKafka::Topic *> kafka_topics_; +}; + +} + +#endif //ASAPO_RDKAFKA_CLIENT_H diff --git a/common/cpp/unittests/json_parser/test_json_parser.cpp b/common/cpp/unittests/json_parser/test_json_parser.cpp index d3017cac350dbbdd9ad9182472df99423baad8c0..ceed36b1b353aa3597e61217a30a115905de39cd 100644 --- a/common/cpp/unittests/json_parser/test_json_parser.cpp +++ b/common/cpp/unittests/json_parser/test_json_parser.cpp @@ -19,6 +19,7 @@ using ::testing::Return; using ::testing::SetArgPointee; using ::testing::HasSubstr; using ::testing::ElementsAre; +using ::testing::Pair; using ::testing::DoAll; using asapo::JsonFileParser; @@ -209,6 +210,54 @@ TEST(ParseString, StringArrayConvertToJson) { ASSERT_THAT(vec, ElementsAre("s1", "s2", "s3")); } +TEST(ParseString, ObjectMemberArrayConvertToJson) { + std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})"; + + JsonStringParser parser{json}; + + std::vector<std::string> vec; + auto err = parser.GetArrayObjectMembers("object", &vec); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(vec, ElementsAre("k1", "k2", "k3")); +} + +TEST(ParseString, DictionaryStringConvertToJson) { + std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})"; + + JsonStringParser parser{json}; + + std::map<std::string, std::string> map; + auto err = parser.GetDictionaryString("object", &map); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(map, ElementsAre(Pair("k1", "v1"), Pair("k2", "v2"), Pair("k3", "v3"))); +} + +TEST(ParseString, RawStringConvertToJson) { + std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})"; + + JsonStringParser parser{json}; + + std::string value; + auto err = parser.GetRawString(&value); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(json, Eq(value)); +} + +TEST(ParseString, ArrayRawStringConvertToJson) { + std::string json = R"({"array":[{"k1":"v1"},{"k2":"v2"},{"k3":"v3"}]})"; + + JsonStringParser parser{json}; + + std::vector<std::string> vec; + auto err = parser.GetArrayRawStrings("array", &vec); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(vec, ElementsAre(R"({"k1":"v1"})", R"({"k2":"v2"})", R"({"k3":"v3"})")); +} + class ParseFileTests : public Test { public: NiceMock<MockIO> mock_io; diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..71bfe537dda1dc88d59ce61817618102e75291fc --- /dev/null +++ b/config/nomad/receiver_kafka.nmd.in @@ -0,0 +1,60 @@ +variable "receiver_kafka_metadata_broker_list" { + type = string +} + +job "receiver" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "receiver" { + driver = "raw_exec" + + config { + command = "@RECEIVER_DIR@/@RECEIVER_NAME@" + args = ["${NOMAD_TASK_DIR}/receiver.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "recv" {} + port "recv_ds" {} + port "recv_metrics" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "metrics" + type = "http" + port = "recv_metrics" + path = "/metrics" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + meta { + metrics-port = "${NOMAD_PORT_recv_metrics}" + } + } + + meta { + receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}" + } + + template { + source = "@WORK_DIR@/receiver_kafka.json.tpl" + destination = "local/receiver.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index c32c6703ad90abc586f4c47c4e3a758a56417dd8..7ae6db9d0e5f29539e8f706f8a5c7e248e9772ed 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -24,5 +24,14 @@ }, "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, - "LogLevel": "info" + "LogLevel": "info", + "Kafka": { + "Enabled": true, + "KafkaClient": { + "metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }} + }, + "KafkaTopics": { + "asapo": {} + } + } } diff --git a/deploy/asapo_helm_chart/asapo/values.yaml b/deploy/asapo_helm_chart/asapo/values.yaml index beccb7c2193ae10139e8325d3aa9ac175901bea3..946bc9394d2c4f9b089aa4a6124e15ec25fc00e8 100644 --- a/deploy/asapo_helm_chart/asapo/values.yaml +++ b/deploy/asapo_helm_chart/asapo/values.yaml @@ -33,6 +33,7 @@ ownServices: enable: true sizeGb: 1 receiveToDiskThresholdMB: 200 + kafkaBrokerList: asap3-events-01,asap3-events-02 fileTransfer: serviceName: asapo-file-transfer replicaCount: 1 diff --git a/deploy/asapo_services/asap3.tfvars b/deploy/asapo_services/asap3.tfvars index 1070bcc4026d94dd2223a0bbaae0951dacc22fa1..991b42e2f3eedffdf04d0221183ea1512c79880d 100644 --- a/deploy/asapo_services/asap3.tfvars +++ b/deploy/asapo_services/asap3.tfvars @@ -21,6 +21,7 @@ receiver_dataserver_cache_size = 30 #gb receiver_receive_to_disk_threshold = 50 # mb receiver_dataserver_nthreads = 8 receiver_network_modes = "tcp" +receiver_kafka_enabled = true grafana_total_memory_size = 2000 influxdb_total_memory_size = 2000 diff --git a/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl b/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl index fee0524f40eb4267b631c6bc1a6961dc4fc81c4e..5a48f8a93b24ba5f884fe670d284fc98fc538113 100644 --- a/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl +++ b/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl @@ -100,6 +100,8 @@ job "asapo-receivers" { receiver_network_modes = "${receiver_network_modes}" perf_monitor = "${perf_monitor}" receiver_expose_metrics = "${receiver_expose_metrics}" + receiver_kafka_enabled = "${receiver_kafka_enabled}" + receiver_kafka_metadata_broker_list = "${receiver_kafka_metadata_broker_list}" } template { diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl index da0b30cd3d668b0a1eed5b0fed60033237c1961e..3183e72ab7ef371dc799f48ceab967af43531f6a 100644 --- a/deploy/asapo_services/scripts/receiver.json.tpl +++ b/deploy/asapo_services/scripts/receiver.json.tpl @@ -24,5 +24,14 @@ }, "Tag": "{{ env "attr.unique.hostname" }}", "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }}, - "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}" + "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", + "Kafka": { + "Enabled": {{ env "NOMAD_META_receiver_kafka_enabled" }}, + "KafkaClient": { + "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}" + }, + "KafkaTopics": { + "asapo": {} + } + } } diff --git a/deploy/asapo_services/scripts/templates.tf b/deploy/asapo_services/scripts/templates.tf index 5cc67dc9e132bd0f6149d4abb34fdca2440362df..664505890ed9e00007731886657d2dc5532932dc 100644 --- a/deploy/asapo_services/scripts/templates.tf +++ b/deploy/asapo_services/scripts/templates.tf @@ -56,6 +56,8 @@ data "template_file" "asapo_receivers" { force_pull_images = "${var.force_pull_images}" perf_monitor = "${var.perf_monitor}" receiver_expose_metrics = "${var.receiver_expose_metrics}" + receiver_kafka_enabled = "${var.receiver_kafka_enabled}" + receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}" } } diff --git a/deploy/asapo_services/scripts/vars.tf b/deploy/asapo_services/scripts/vars.tf index c7d16e5f1c2200764cf6f3b0ea26d2a35fd96423..13068f32a322ec22e78a8342b98719fdd2071fac 100644 --- a/deploy/asapo_services/scripts/vars.tf +++ b/deploy/asapo_services/scripts/vars.tf @@ -58,6 +58,10 @@ variable "receiver_network_modes" {} variable "receiver_expose_metrics" {} +variable "receiver_kafka_enabled" {} + +variable "receiver_kafka_metadata_broker_list" {} + variable "grafana_total_memory_size" {} variable "influxdb_total_memory_size" {} @@ -111,4 +115,4 @@ variable "n_brokers" {} variable "n_fts" {} -variable "ldap_uri" {} \ No newline at end of file +variable "ldap_uri" {} diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 836a04f9b650155c7c41b7620838fda23f783ea1..92e0b87ca8180b53ac70f1c44e8b5caa0646803f 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -26,6 +26,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_handler_db_delete_stream.cpp src/request_handler/request_handler_db_get_meta.cpp + src/request_handler/request_handler_kafka_notify.cpp src/request_handler/request_factory.cpp src/request_handler/request_handler_db.cpp src/request_handler/file_processors/write_file_processor.cpp @@ -59,11 +60,11 @@ set(SOURCE_FILES GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES) add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:rdkafka_client>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) -target_include_directories(${TARGET_NAME} SYSTEM PUBLIC ${LIBFABRIC_INCLUDE_DIR}) -target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT} database +target_include_directories(${TARGET_NAME} SYSTEM PUBLIC ${LIBFABRIC_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIR}) +target_link_libraries(${TARGET_NAME} CURL::libcurl ${RDKAFKA_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) @@ -75,8 +76,14 @@ set_target_properties(${TARGET_NAME}-bin PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> ) +if (WIN32) + file(GLOB KAFKA_DLLS "${RDKAFKA_BIN_DIR}/rdkafka*.dll") + file(COPY ${KAFKA_DLLS} DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) +endif() + configure_file(docker/Dockerfile . COPYONLY) configure_file(docker/install_libfabric.sh . COPYONLY) +configure_file(docker/install_rdkafka.sh . COPYONLY) ################################ @@ -107,6 +114,7 @@ set(TEST_SOURCE_FILES unittests/request_handler/test_request_handler_receive_metadata.cpp unittests/request_handler/test_request_handler_delete_stream.cpp unittests/request_handler/test_request_handler_db_get_meta.cpp + unittests/request_handler/test_request_handler_kafka_notify.cpp unittests/statistics/test_statistics_sender_influx_db.cpp unittests/statistics/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp diff --git a/receiver/docker/Dockerfile b/receiver/docker/Dockerfile index e03a3cf5c8fee45ded2c608ddc5d4ab711bd5a36..00df9b405fa013a8d48d81a3b180b23f0f76019e 100644 --- a/receiver/docker/Dockerfile +++ b/receiver/docker/Dockerfile @@ -1,6 +1,7 @@ FROM ubuntu:18.04 ADD receiver / ADD install_libfabric.sh install_libfabric.sh -RUN apt update && ./install_libfabric.sh +ADD install_rdkafka.sh install_rdkakfa.sh +RUN apt update && ./install_libfabric.sh && ./install_rdkakfa.sh CMD ["/receiver","/var/lib/receiver/config.json"] diff --git a/receiver/docker/install_rdkafka.sh b/receiver/docker/install_rdkafka.sh new file mode 100755 index 0000000000000000000000000000000000000000..e1bb3c207aa627ca17a481d56a472dc4a94eb69a --- /dev/null +++ b/receiver/docker/install_rdkafka.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +apt install -y wget build-essential autoconf libtool make zlib1g-dev libzstd-dev + +wget https://github.com/edenhill/librdkafka/archive/refs/tags/v1.8.2.tar.gz +tar xzf v1.8.2.tar.gz +cd librdkafka-1.8.2 + +./configure --enable-zlib --enable-zstd --disable-lz4 --disable-lz4-ext --disable-ssl --disable-gssapi --disable-sasl +make -j 4 +make install +ldconfig +cd - +rm -rf librdkafka-1.8.2 +rm v1.8.2.tar.gz \ No newline at end of file diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index b0fcdca7010056553a11f89041c459c94ab71f49..aa0a1e028271d9ec590058857333317e46280818 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -9,11 +9,11 @@ namespace asapo { Connection::Connection(SocketDescriptor socket_fd, const std::string& address, - SharedCache cache, std::string receiver_tag) : + SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag) : io__{GenerateDefaultIO()}, statistics__{new ReceiverStatistics}, log__{GetDefaultReceiverLogger()}, -requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache}} { +requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache, kafkaClient}} { socket_fd_ = socket_fd; address_ = address; statistics__->AddTag("connection_from", address); diff --git a/receiver/src/connection.h b/receiver/src/connection.h index 192b8e66af2626525c7f15a3e0e6146182da06ef..88af2a4ec6001cbf58e32c88e940bfbf24f476a1 100644 --- a/receiver/src/connection.h +++ b/receiver/src/connection.h @@ -28,7 +28,7 @@ class Connection { int socket_fd_; public: - Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, std::string receiver_tag); + Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag); ~Connection() = default; void Listen() const noexcept; diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 9f818218dcf240c04648775c1e83e2c5764645d8..77410e80e81e7007826b44871f722578d2bb22ee 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -6,6 +6,7 @@ #include "receiver_data_server/receiver_data_server_logger.h" #include "asapo/common/internal/version.h" +#include "asapo/kafka_client/kafka_client.h" #include "receiver_data_server/receiver_data_server.h" #include "receiver_data_server/net_server/rds_tcp_server.h" @@ -77,11 +78,11 @@ std::vector<std::thread> StartDataServers(const asapo::ReceiverConfig* config, a } int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache, - asapo::AbstractLogger* logger) { + asapo::KafkaClient* kafkaClient, asapo::AbstractLogger* logger) { static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port); logger->Info(std::string("starting receiver, version ") + asapo::kVersion); - auto* receiver = new asapo::Receiver(cache); + auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache, kafkaClient)}; logger->Info("listening on " + address); asapo::Error err; @@ -134,7 +135,20 @@ int main(int argc, char* argv[]) { } auto metrics_thread = StartMetricsServer(config->metrics, logger); - auto exit_code = StartReceiver(config, cache, logger); + + std::unique_ptr<asapo::KafkaClient> kafkaClient; + if (config->kafka_config.enabled) { + kafkaClient.reset(asapo::CreateKafkaClient(config->kafka_config, &err)); + if (kafkaClient == nullptr) { + logger->Error("error initializing kafka client: " + err->Explain()); + return EXIT_FAILURE; + } + } + else { + logger->Info("kafka notifications disabled."); + } + + auto exit_code = StartReceiver(config, cache, kafkaClient.get(), logger); // todo: implement graceful exit, currently it never reaches this point return exit_code; } diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index e3ef529d402f141d51004104e3ab01bbbda5bfcd..8dc7f7733f7d811870f687acf61b0c75e3e0cead 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -11,7 +11,7 @@ namespace asapo { const int Receiver::kMaxUnacceptedConnectionsBacklog = 5; -Receiver::Receiver(SharedCache cache): cache_{cache}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { +Receiver::Receiver(SharedCache cache, KafkaClient* kafkaClient): cache_{cache}, kafka_client_{kafkaClient}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } @@ -55,7 +55,7 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address))); auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd), [connection_socket_fd, address, this] { - auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag)); + auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, kafka_client_.get(), GetReceiverConfig()->tag)); connection->Listen(); }); diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h index e7d9bf1dd3c064cb033f551d4affd5f426854da1..a0ff2bb13703ee3c46f3d1f08f113cc1140de7bd 100644 --- a/receiver/src/receiver.h +++ b/receiver/src/receiver.h @@ -20,11 +20,12 @@ class Receiver { void ProcessConnections(Error* err); std::vector<std::unique_ptr<std::thread>> threads_; SharedCache cache_; + std::unique_ptr<KafkaClient> kafka_client_; public: static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config Receiver(const Receiver&) = delete; Receiver& operator=(const Receiver&) = delete; - Receiver(SharedCache cache); + Receiver(SharedCache cache, KafkaClient* kafkaClient); void Listen(std::string listener_address, Error* err, bool exit_after_first_connection = false); std::unique_ptr<IO> io__; diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index c6cc7846083c669c072bbb9d6d9eacdf0f4d0b54..d6f7ca0ac6fad3809fc392754c49e2bf2fa63d54 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,6 +17,8 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { std::string log_level; Error err; + std::vector<std::string> kafkaTopics; + (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || @@ -36,12 +38,29 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { (err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) || (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) || (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) || - (err = parser.GetString("LogLevel", &log_level)); + (err = parser.GetString("LogLevel", &log_level)) || + (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled)); if (err) { return err; } + if (config.kafka_config.enabled) { + (err = parser.Embedded("Kafka").GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || + (err = parser.Embedded("Kafka").GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); + if (err) { + return err; + } + + for(const auto& topic : kafkaTopics) { + auto topicConfig = config.kafka_config.topics_config[topic]; + err = parser.Embedded("Kafka").Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); + if (err) { + return err; + } + } + } + config.dataserver.tag = config.tag + "_ds"; config.log_level = StringToLogLevel(log_level, &err); diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 0a054e2c1625cd7d41259b2e8d0f11007bd6f19a..d065ed2958921e21441a4bc28ef5c5217c7e3af5 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -7,6 +7,7 @@ #include "receiver_data_server/receiver_data_server_config.h" #include "metrics/receiver_metrics_config.h" +#include "asapo/kafka_client/kafka_config.h" namespace asapo { @@ -27,6 +28,7 @@ struct ReceiverConfig { ReceiverDataServerConfig dataserver; ReceiverMetricsConfig metrics; std::string discovery_server; + KafkaClientConfig kafka_config; }; class ReceiverConfigManager { diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index e9c5fdd8d5db8d4d35e041fab1f013bae5a9c799..c281e71f1dfa5774c90ea36bb2435beaece35aca 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -32,6 +32,9 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); + if (GetReceiverConfig()->kafka_config.enabled) { + request->AddHandler(&request_handler_kafka_notify_); + } } } @@ -42,6 +45,9 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re "ingest mode should include kStoreInFilesystem for large files "); } request->AddHandler(&request_handler_filereceive_); + if (GetReceiverConfig()->kafka_config.enabled) { + request->AddHandler(&request_handler_kafka_notify_); + } return nullptr; } @@ -116,8 +122,7 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea return request; } -RequestFactory::RequestFactory(SharedCache cache) : cache_{cache} { - +RequestFactory::RequestFactory(SharedCache cache, KafkaClient* kafka_client) : request_handler_kafka_notify_{kafka_client}, cache_{cache} { } } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 04f871cdaa311cbf024e02122e27d3fab044c997..578e2b342c89ba65754424e46dfab61ddd93a7cf 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -8,6 +8,7 @@ #include "request_handler_db_last_stream.h" #include "request_handler_db_delete_stream.h" #include "request_handler_db_get_meta.h" +#include "request_handler_kafka_notify.h" #include "request_handler_file_process.h" #include "request_handler_db_write.h" @@ -23,7 +24,7 @@ namespace asapo { class RequestFactory { public: - explicit RequestFactory (SharedCache cache); + explicit RequestFactory (SharedCache cache, KafkaClient* kafka_client); virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept; virtual ~RequestFactory() = default; @@ -45,6 +46,7 @@ class RequestFactory { RequestHandlerInitialAuthorization request_handler_initial_authorize_{&shared_auth_cache_}; RequestHandlerSecondaryAuthorization request_handler_secondary_authorize_{&shared_auth_cache_}; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix}; + RequestHandlerKafkaNotify request_handler_kafka_notify_; SharedCache cache_; AuthorizationData shared_auth_cache_; bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const; diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7ab5373b9961bfe2b23523c27a68403d24c05b21 --- /dev/null +++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp @@ -0,0 +1,33 @@ +#include "request_handler_kafka_notify.h" +#include "../request.h" + +namespace asapo { + +Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const { + bool write_to_offline = request->GetSourceType() == SourceType::kProcessed || + static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline); + + if (!kafka_client_ || write_to_offline) { + return nullptr; + } + + auto root_folder = request->GetOnlinePath(); + if (root_folder.empty()) { + return ReceiverErrorTemplates::kBadRequest.Generate("online path not available"); + } + + std::string message = "{" + "\"event\":\"IN_CLOSE_WRITE\"," + "\"path\":\"" + root_folder + kPathSeparator + request->GetFileName() + "\"" + "}"; + + return kafka_client_->Send(message, "asapo"); +} + +StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const { + return StatisticEntity::kNetwork; +} + +RequestHandlerKafkaNotify::RequestHandlerKafkaNotify(KafkaClient* kafka_client) : kafka_client_{kafka_client} { +} +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_kafka_notify.h b/receiver/src/request_handler/request_handler_kafka_notify.h new file mode 100644 index 0000000000000000000000000000000000000000..399b01126dce0865f273b8446746c5acde53692d --- /dev/null +++ b/receiver/src/request_handler/request_handler_kafka_notify.h @@ -0,0 +1,20 @@ +#ifndef ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H +#define ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H + +#include "request_handler.h" +#include "asapo/kafka_client/kafka_client.h" + +namespace asapo { + +class RequestHandlerKafkaNotify final : public ReceiverRequestHandler { + public: + RequestHandlerKafkaNotify(KafkaClient* kafka_client); + StatisticEntity GetStatisticEntity() const override; + Error ProcessRequest(Request* request) const override; + private: + KafkaClient* kafka_client_; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index e222465641aaaeb919976ed713d9ef316214cb40..e416ede4b3c670a41674f27e0eaa137a0e93395c 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -5,13 +5,13 @@ namespace asapo { RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address, - ReceiverStatistics* statistics, SharedCache cache) : statistics__{statistics}, + ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client) : statistics__{statistics}, io__{GenerateDefaultIO()}, log__{ GetDefaultReceiverLogger()}, request_factory__{ new RequestFactory{ - cache}}, + cache, kafka_client}}, socket_fd_{socket_fd}, producer_uri_{ std::move(address)} { diff --git a/receiver/src/request_handler/requests_dispatcher.h b/receiver/src/request_handler/requests_dispatcher.h index 051f8efb0993de1ff96d321c0c90c2346bbac5bf..67d9169df8007d87700480acd4d1783ddcad9f2a 100644 --- a/receiver/src/request_handler/requests_dispatcher.h +++ b/receiver/src/request_handler/requests_dispatcher.h @@ -14,7 +14,7 @@ namespace asapo { class RequestsDispatcher { public: - RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache); + RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client); ASAPO_VIRTUAL Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept; ASAPO_VIRTUAL std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept; ASAPO_VIRTUAL ~RequestsDispatcher() = default; diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 2e02670b4a23553ecfa320112b924f05c20a4545..ba0bbfb05ea9cebc2e9acef646b0026d7b86bb58 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -80,6 +80,9 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; + config_string += "," + Key("Kafka", error_field) + "{"; + config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false") ; + config_string += "}"; config_string += "}"; diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index f2c6179a22a17ad97f823dc5fa11747ea9c5cf06..ed58fe68f7736d45af13259595510141bb24c559 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -5,6 +5,7 @@ #include "asapo/unittests/MockIO.h" #include "asapo/unittests/MockDatabase.h" +#include "asapo/unittests/MockKafkaClient.h" #include "../../src/connection.h" #include "../../src/receiver_error.h" #include "../../src/request.h" @@ -25,7 +26,8 @@ namespace { class FactoryTests : public Test { public: - RequestFactory factory{nullptr}; + asapo::MockKafkaClient kafkaClient; + RequestFactory factory{nullptr, &kafkaClient}; Error err{nullptr}; GenericRequestHeader generic_request_header; ReceiverConfig config; @@ -47,18 +49,21 @@ TEST_F(FactoryTests, ErrorOnWrongCode) { } TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) { + config.kafka_config.enabled = true; + SetReceiverConfig(config, "none"); + for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) { generic_request_header.op_code = code; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(6)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } } @@ -67,6 +72,7 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) { for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) { generic_request_header.op_code = code; config.receive_to_disk_threshold_mb = 0; + config.kafka_config.enabled = true; SetReceiverConfig(config, "none"); generic_request_header.data_size = 1; @@ -74,11 +80,12 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[3]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } } @@ -107,6 +114,25 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInFilesystem; + config.kafka_config.enabled = true; + SetReceiverConfig(config, "none"); + + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), + Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr)); +} + +TEST_F(FactoryTests, DoNotAddKafkaIfNotWanted) { + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | + asapo::IngestModeFlags::kStoreInFilesystem; + + config.kafka_config.enabled = false; SetReceiverConfig(config, "none"); auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); @@ -120,7 +146,7 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { } TEST_F(FactoryTests, CachePassedToRequest) { - RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}}; + RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}, nullptr}; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); diff --git a/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fdd4c299b704c2edb2d8cc4c434b391adf30d851 --- /dev/null +++ b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp @@ -0,0 +1,66 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "asapo/unittests/MockKafkaClient.h" +#include "../../src/request_handler/request_factory.h" +#include "../receiver_mocking.h" + +using namespace testing; +using namespace asapo; + +namespace { + +class KafkaNotifyHandlerTests : public Test { + public: + NiceMock<MockKafkaClient> kafka_client; + RequestHandlerKafkaNotify handler{&kafka_client}; + std::unique_ptr<NiceMock<MockRequest>> mock_request; + std::string expected_filename = std::string("raw") + asapo::kPathSeparator + "filename"; + std::string expected_online_path = std::string("online") + asapo::kPathSeparator + "path"; + CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0}; + const std::string expected_topic = "asapo"; + + void SetUp() override { + GenericRequestHeader request_header; + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); + } + + void TearDown() override { + } +}; + +TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) { + EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename)); + EXPECT_CALL(*mock_request, GetOnlinePath()).WillOnce(ReturnRef(expected_online_path)); + EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr)); + EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data)); + EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw)); + + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); + Mock::VerifyAndClearExpectations(mock_request.get()); + Mock::VerifyAndClearExpectations(&kafka_client); +} + +TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForProcessed) { + EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kProcessed)); + + + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); + Mock::VerifyAndClearExpectations(mock_request.get()); + Mock::VerifyAndClearExpectations(&kafka_client); +} + +TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForOfflineRaw) { + EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw)); + EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data)); + expected_custom_data[kPosIngestMode] |= IngestModeFlags::kWriteRawDataToOffline; + + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); + Mock::VerifyAndClearExpectations(mock_request.get()); + Mock::VerifyAndClearExpectations(&kafka_client); +} + +} \ No newline at end of file diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index c7831accb35c3ca48c118e3825fded1a086bdce1..d5c7ead18205916bc0d1019225d22385f9ed739d 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -20,7 +20,7 @@ namespace { TEST(RequestDispatcher, Constructor) { auto stat = std::unique_ptr<ReceiverStatistics> {new ReceiverStatistics}; - RequestsDispatcher dispatcher{0, "some_address", stat.get(), nullptr}; + RequestsDispatcher dispatcher{0, "some_address", stat.get(), nullptr, nullptr}; ASSERT_THAT(dynamic_cast<const asapo::ReceiverStatistics*>(dispatcher.statistics__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(dispatcher.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(dispatcher.request_factory__.get()), Ne(nullptr)); @@ -40,7 +40,7 @@ class MockRequest: public Request { class MockRequestFactory: public asapo::RequestFactory { public: - MockRequestFactory(): RequestFactory(nullptr) {}; + MockRequestFactory(): RequestFactory(nullptr, nullptr) {}; std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept override { @@ -80,7 +80,7 @@ class RequestsDispatcherTests : public Test { void SetUp() override { test_config.authorization_interval_ms = 0; SetReceiverConfig(test_config, "none"); - dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statictics, nullptr}}; + dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statictics, nullptr, nullptr}}; dispatcher->io__ = std::unique_ptr<asapo::IO> {&mock_io}; dispatcher->statistics__ = &mock_statictics; dispatcher->request_factory__ = std::unique_ptr<asapo::RequestFactory> {&mock_factory}; diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 8ab74566ad81284e5bc37fd1f653784f9705cc5a..289113eaffcbf02e2d1bbd8b9ebdce0ba69919c9 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -81,6 +81,8 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->receive_to_disk_threshold_mb, Eq(50)); ASSERT_THAT(config->metrics.expose, Eq(true)); ASSERT_THAT(config->metrics.listen_port, Eq(123)); + ASSERT_THAT(config->kafka_config.enabled, Eq(false)); + } @@ -92,7 +94,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag", "AuthorizationServer", "AuthorizationInterval", "PerformanceDbName", "LogLevel", "NThreads", "DiscoveryServer", "AdvertiseURI", "NetworkMode", "MonitorPerformance", - "ReceiveToDiskThresholdMB", "Metrics", "Expose"}; + "ReceiveToDiskThresholdMB", "Metrics", "Expose","Enabled"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index d8d24445a227ebd121a83cadd3f3b547a3307ac1..4f73aaa17bc3e2ba1faa7a6a72b036a0ba1b265e 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -14,7 +14,7 @@ using namespace asapo; namespace { TEST(Connection, Constructor) { - Connection connection{0, "some_address", nullptr, "some_tag"}; + Connection connection{0, "some_address", nullptr, nullptr, "some_tag"}; ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(connection.log__), Ne(nullptr)); @@ -24,7 +24,7 @@ TEST(Connection, Constructor) { class MockDispatcher: public asapo::RequestsDispatcher { public: - MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr) {}; + MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr, nullptr) {}; Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept override { return Error{ProcessRequest_t(request.get())}; } @@ -52,7 +52,7 @@ class ConnectionTests : public Test { std::unique_ptr<Connection> connection; void SetUp() override { - connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, nullptr, "some_tag"}}; + connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, nullptr, nullptr, "some_tag"}}; connection->io__ = std::unique_ptr<asapo::IO> {&mock_io}; connection->statistics__ = std::unique_ptr<asapo::ReceiverStatistics> {&mock_statictics}; connection->log__ = &mock_logger; diff --git a/receiver/unittests/test_receiver.cpp b/receiver/unittests/test_receiver.cpp index 49e7859cd08227ff27e9baa7166d26fcab0e19c3..1d032201124db3a5a4644a9485d3bd57c7280fa0 100644 --- a/receiver/unittests/test_receiver.cpp +++ b/receiver/unittests/test_receiver.cpp @@ -12,7 +12,7 @@ using namespace asapo; namespace { TEST(Receiver, Constructor) { - asapo::Receiver receiver(nullptr); + asapo::Receiver receiver(nullptr, nullptr); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(receiver.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(receiver.io__.get()), Ne(nullptr)); } @@ -31,7 +31,7 @@ class StartListenerFixture : public testing::Test { Error err; ::testing::NiceMock<asapo::MockLogger> mock_logger; ::testing::NiceMock<asapo::MockIO> mock_io; - asapo::Receiver receiver{nullptr}; + asapo::Receiver receiver{nullptr, nullptr}; void SetUp() override { err = nullptr; diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index aae8d4bf65ba9db9753972c71493c900cb5a8310..c3689a65a9c31061619848dbf12a02b393981075 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(transfer_single_file) add_subdirectory(transfer_single_file_bypass_buffer) +add_subdirectory(transfer_single_file_with_kafka) if (BUILD_PYTHON) add_subdirectory(transfer_single_file_write_to_raw) diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..daf1cbec1ffc1308ca52bfbce65ef2bc19a0e037 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt @@ -0,0 +1,20 @@ +set(TARGET_NAME transfer-single-file_kafka) +if (NOT WIN32) + set(SOURCE_FILES kafka_mock.cpp) + + add_executable(${TARGET_NAME} ${SOURCE_FILES}) + target_link_libraries(${TARGET_NAME} ${RDKAFKA_C_LIBRARIES}) + target_include_directories(${TARGET_NAME} PUBLIC ${RDKAFKA_INCLUDE_DIR}) + + #use expression generator to get rid of VS adding Debug/Release folders + set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY + ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> + ) + + ################################ + # Testing + ################################ + prepare_asapo() + + add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) +endif() \ No newline at end of file diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..6af384ac67285ab62fff4094197c7bea4bcdf572 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -0,0 +1,71 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +database_name=db_test +beamtime_id=asapo_test +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +facility=test_facility +year=2019 +receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id} +receiver_folder_online=${receiver_root_folder}/beamline/${beamline}/current + +Cleanup() { + echo cleanup + set +e + nomad stop receiver + nomad run receiver_tcp.nmd + while true + do + sleep 1 + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + echo recevier started + break + done + rm -rf ${receiver_root_folder} + echo "db.dropDatabase()" | mongo ${beamtime_id}_detector + influx -database ${database_name} -execute "drop series from statistics, RequestsRate" +} + +rm -f bootstrap + +./transfer-single-file_kafka ${receiver_folder_online}/raw/1 & KAFKA_PID=$! + +echo "Started the kafka listener" + +while [ ! -f bootstrap ]; do + if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then + echo Kafka listener exited unexpectedly + exit 1 + fi + sleep 1 +done + +BOOTSTRAP=$(cat bootstrap) + +echo "Read kafka bootstrap: ${BOOTSTRAP}" + +nomad stop receiver +nomad run -var receiver_kafka_metadata_broker_list="${BOOTSTRAP}" receiver_kafka.nmd +while true +do + sleep 1 + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + echo recevier started + break +done + +mkdir -p ${receiver_folder} +mkdir -p ${receiver_folder_online} + +$1 localhost:8400 ${beamtime_id} 100 1 1 100 30 + +ls -ln ${receiver_folder_online}/raw/1 | awk '{ print $5 }'| grep 100000 + +wait $KAFKA_PID +RESULT=$? + +echo "Mock kafka returned $RESULT" diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8da0c966103be4de971edc3451273a7559374536 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp @@ -0,0 +1,92 @@ +#include <stdlib.h> +#include <string.h> + +#include "librdkafka/rdkafka.h" +#include "librdkafka/rdkafka_mock.h" + +int main(int argc, char *argv[]) { + + char *expectedmsg; + asprintf(&expectedmsg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", argc > 1 ? argv[argc - 1] : "processed/1"); + + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + char errstr[256]; + + if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + /*if(rd_kafka_conf_set(conf, "bootstrap.servers", "127.0.0.1:23456", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + }*/ + + rd_kafka_t *rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if(!rkproducer) { + fprintf(stderr, "Failed to create kafka producer: %s\n", errstr); + return EXIT_FAILURE; + } + + rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); + if(!mcluster) { + fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr); + return EXIT_FAILURE; + } + + const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster); + + FILE *fp = fopen("bootstrap", "w"); + fprintf(fp, "%s", bootstrap); + fclose(fp); + + conf = rd_kafka_conf_new(); + if(rd_kafka_conf_set(conf, "client.id", "MOCK_CONSUMER", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + if(rd_kafka_conf_set(conf, "group.id", "asapo", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + if(rd_kafka_conf_set(conf, "bootstrap.servers", bootstrap, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + + rd_kafka_t *rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + + if(!rkconsumer) { + fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr); + return EXIT_FAILURE; + } + + rd_kafka_poll_set_consumer(rkconsumer); + rd_kafka_topic_partition_list_t* subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, "asapo", RD_KAFKA_PARTITION_UA); + rd_kafka_resp_err_t err = rd_kafka_subscribe(rkconsumer, subscription); + if (err) { + fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err)); + return EXIT_FAILURE; + } + + + rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000); + + if(!rkmessage) { + fprintf(stderr, "No kafka message received\n"); + return EXIT_FAILURE; + } else { + if (rkmessage->err) { + fprintf(stderr, "Got error: %s\n", rd_kafka_message_errstr(rkmessage)); + return EXIT_FAILURE; + } else { + if (!strncmp((const char *)rkmessage->payload, expectedmsg, rkmessage->len)) { + fprintf(stdout, "Kafka message is correct: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload); + return EXIT_SUCCESS; + } else { + fprintf(stderr, "Kafka message is incorrect: %.*s (expected %s)\n", (int)rkmessage->len, (const char *)rkmessage->payload, expectedmsg); + return EXIT_FAILURE; + } + } + } +} diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in index f7524f9e9a6e47b08d3bd8881a5657af2f7fd518..c3eb3f433ad53e400df06d1803f4eecd67c087d5 100644 --- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in @@ -24,5 +24,8 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, - "LogLevel" : "debug" + "LogLevel" : "debug", + "Kafka" : { + "Enabled" : false + } } diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in new file mode 100644 index 0000000000000000000000000000000000000000..f9d0fae47e197697f9174e37b2dfb87ce634e7ba --- /dev/null +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -0,0 +1,37 @@ +{ + "PerformanceDbServer":"localhost:8086", + "MonitorPerformance": true, + "PerformanceDbName": "db_test", + "DatabaseServer":"auto", + "DiscoveryServer": "localhost:8400/asapo-discovery", + "DataServer": { + "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}", + "NThreads": 2, + "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}, + "NetworkMode": ["tcp"] + }, + "Metrics": { + "Expose": true, + "ListenPort": {{ env "NOMAD_PORT_recv_metrics" }} + }, + "DataCache": { + "Use": @RECEIVER_USE_CACHE@, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/asapo-authorizer", + "AuthorizationInterval": 1000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "ReceiveToDiskThresholdMB":50, + "LogLevel" : "debug", + "Kafka" : { + "Enabled" : true, + "KafkaClient": { + "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}" + }, + "KafkaTopics": { + "asapo": {} + } + } + } diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in index 498d9fdcc33c7344ca12d7d13fb36a90b26bd4ee..ebaa3aea27f523d7c06d0d61d9c72825e054da98 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in @@ -24,5 +24,8 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, - "LogLevel" : "debug" + "LogLevel" : "debug", + "Kafka" : { + "Enabled" : false + } } diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in index 446ebcbf270af898204cbf7e2d93e2e45d864ecb..1c08e1b8448bf8fa79092889266ca74c69d925c6 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in @@ -24,5 +24,8 @@ }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, - "LogLevel" : "debug" + "LogLevel" : "debug", + "Kafka" : { + "Enabled" : false + } }