Skip to content
Snippets Groups Projects
Commit 8f4be247 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #187 in ASAPO/asapo from feature_ASAPO-152-kafka-notifications to develop

* commit '7f88d281':
  some refactoring
  Send kafka notifications only for online files
  fix kafka unit test
  Remove windows test for kafka, update kafka message for full path
  final (hopefully) fix to kafka integration test
  another fix to kafka integration test
  correctly pass the nomad var to integration test
  fix warnings in kafka_mock
  kafka integration test
  docker image script update
  update hints and cmake message
  copy dll files to receiver output folder
  use FindRdKafka
  Better kafka cmake deps
  And another receiver konfig fix
  Add kafka config to deploy configuration
  Fix unit tests
  Add unittests for kafka notifications
  Add rdkafka client
parents 1e52da7d 7f88d281
No related branches found
No related tags found
No related merge requests found
Showing
with 444 additions and 1 deletion
......@@ -19,6 +19,12 @@ if(CURL_FOUND) #old FindCURL versions do not create CURL::libcurl target, so we
endif()
# RdKafka (librdkafka C/C++ client) is mandatory; located via the FindRdKafka module.
find_package(RdKafka REQUIRED)
message (STATUS "Found rdkafka++ libraries: ${RDKAFKA_LIBRARIES}")
message (STATUS "rdkafka++ include dir : ${RDKAFKA_INCLUDE_DIR}")
if (WIN32)
# On Windows the DLL directory is also needed so rdkafka++.dll can be copied next to the binaries.
message (STATUS "rdkafka++ binary dir (dll): ${RDKAFKA_BIN_DIR}")
endif()
# python is needed anyway, even if no Python packages are build (e.g. to parse test results)
if ("${Python_EXECUTABLE}" STREQUAL "")
......
# FindRdKafka
# -----------
#
# Tries to find RdKafka (the librdkafka C and C++ libraries) on the system.
#
# Hint variable:
#   RdKafka_DIR         - root of an RdKafka installation
#
# Result variables:
#   RDKAFKA_LIBRARIES   - path to the C++ library (rdkafka++)
#   RDKAFKA_C_LIBRARIES - path to the C library (rdkafka)
#   RDKAFKA_INCLUDE_DIR - path to the include dir
#   RDKAFKA_BIN_DIR     - (Windows only) directory containing rdkafka++.dll
cmake_minimum_required(VERSION 3.12)

# find_package_handle_standard_args() is not available implicitly inside a
# find module; it must be included explicitly or the calls below fail.
include(FindPackageHandleStandardArgs)

find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h HINTS ${RdKafka_DIR}/include)
find_library(RDKAFKA_LIBRARIES rdkafka++ HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64)
find_library(RDKAFKA_C_LIBRARIES rdkafka HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64)

if(WIN32)
    # The DLL must be located as well so it can be copied next to the
    # executables at build/install time.
    find_path(RDKAFKA_BIN_DIR rdkafka++.dll HINTS ${RdKafka_DIR}/bin ${RdKafka_DIR}/lib)
    mark_as_advanced(RDKAFKA_BIN_DIR)
    find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES RDKAFKA_BIN_DIR)
else()
    find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES)
endif()

mark_as_advanced(RDKAFKA_LIBRARIES RDKAFKA_INCLUDE_DIR)
......@@ -32,12 +32,14 @@ function(prepare_asapo)
else()
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.lin.in receiver_tcp.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_fabric.json.tpl.lin.in receiver_fabric.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_kafka.json.tpl.lin.in receiver_kafka.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.lin authorizer.json.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/start_services.sh start_services.sh COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/stop_services.sh stop_services.sh COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_tcp.nmd.in receiver_tcp.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_fabric.nmd.in receiver_fabric.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_kafka.nmd.in receiver_kafka.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx_kill_lin.nmd nginx_kill.nmd @ONLY)
endif()
......
......@@ -8,6 +8,7 @@ add_subdirectory(src/data_structs)
add_subdirectory(src/version)
add_subdirectory(src/kafka_client)
add_subdirectory(src/http_client)
......
......@@ -4,6 +4,7 @@
#include <string>
#include <memory>
#include <vector>
#include <map>
#include <string>
#include "asapo/common/error.h"
......@@ -20,6 +21,8 @@ class JsonParser {
Error GetString(const std::string& name, std::string* val) const noexcept;
Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetRawString(std::string* val) const noexcept;
Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept;
......
#ifndef ASAPO_KAFKA_CLIENT_H
#define ASAPO_KAFKA_CLIENT_H
#include "asapo/common/error.h"
#include "asapo/common/data_structs.h"
#include "asapo/kafka_client/kafka_config.h"
#include "asapo/kafka_client/kafka_error.h"
#include <map>
namespace asapo {
// Abstract interface for publishing messages to Kafka topics.
class KafkaClient {
public:
// Publishes `data` to the given `topic`.
// Returns nullptr on success or one of the KafkaErrorTemplates errors on failure.
virtual Error Send(const std::string& data,
const std::string& topic) noexcept = 0;
virtual ~KafkaClient() = default;
};
// Factory for the concrete (librdkafka-based) client.
// On failure returns nullptr and stores the reason in *err.
KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err);
}
#endif //ASAPO_KAFKA_CLIENT_H
#ifndef ASAPO_KAFKA_CONFIG_H
#define ASAPO_KAFKA_CONFIG_H
#include <string>
#include <map>
namespace asapo {
// Configuration for the Kafka notification client.
struct KafkaClientConfig {
// Whether Kafka notifications are enabled at all.
bool enabled{false};
// librdkafka global settings, e.g. "metadata.broker.list" -> broker host list.
std::map<std::string, std::string> global_config;
// Per-topic librdkafka settings keyed by topic name; the special key
// "default" supplies the configuration for topics created on demand.
std::map<std::string, std::map<std::string, std::string>> topics_config;
};
}
#endif //ASAPO_KAFKA_CONFIG_H
#ifndef ASAPO_KAFKA_ERROR_H
#define ASAPO_KAFKA_ERROR_H

#include "asapo/common/error.h"

namespace asapo {

// Error categories the Kafka client can report; mapped from librdkafka
// error codes in RdKafkaClient::Send.
enum class KafkaErrorType {
    kQueueFullError,
    kMessageTooLargeError,
    kUnknownPartitionError,
    kUnknownTopicError,
    kGeneralError
};

using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>;

namespace KafkaErrorTemplates {

auto const kQueueFullError = KafkaErrorTemplate {
    "kafka queue is full", KafkaErrorType::kQueueFullError
};

auto const kMessageTooLargeError = KafkaErrorTemplate {
    "kafka message is too large", KafkaErrorType::kMessageTooLargeError
};

auto const kUnknownPartitionError = KafkaErrorTemplate {
    "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError
};

// Fixed copy-paste defect: the message previously duplicated the partition
// error text ("partition is unknown ...") although the type is a topic error.
auto const kUnknownTopicError = KafkaErrorTemplate {
    "topic is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError
};

auto const kGeneralError = KafkaErrorTemplate {
    "unexpected kafka error occurred", KafkaErrorType::kGeneralError
};

}
}

#endif //ASAPO_KAFKA_ERROR_H
#ifndef ASAPO_MOCKKAFKACLIENT_H
#define ASAPO_MOCKKAFKACLIENT_H
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "asapo/kafka_client/kafka_client.h"
namespace asapo {
// GoogleMock stub of KafkaClient for unit tests.
class MockKafkaClient : public KafkaClient {
public:
// Adapter: the mock method returns a raw ErrorInterface* which is wrapped
// here into the owning Error type expected by the KafkaClient interface.
Error Send(const std::string& data, const std::string& topic) noexcept override {
return Error{Send_t(data, topic)};
}
MOCK_METHOD(ErrorInterface *, Send_t, (const std::string& data, const std::string& topic), ());
};
}
#endif //ASAPO_MOCKKAFKACLIENT_H
......@@ -24,6 +24,13 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin
return rapid_json_->GetArrayString(name, val);
}
// Delegates to RapidJson: collects the member (key) names of the JSON object `name`.
Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
return rapid_json_->GetArrayObjectMembers(name, val);
}
// Delegates to RapidJson: reads the JSON object `name` as a string-to-string map.
Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
return rapid_json_->GetDictionaryString(name, val);
}
Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept {
return rapid_json_->GetBool(name, val);
......
......@@ -168,6 +168,40 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string
}
// Collects the member (key) names of the JSON object stored under `name`
// into *val. Returns nullptr on success, an error if the value is not an
// object or a member name is not a string.
Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
    Value* object;
    if (Error err = GetValuePointer(name, ValueType::kObject, &object)) {
        return err;
    }
    val->clear();
    for (const auto& member : object->GetObject()) {
        if (!member.name.IsString()) {
            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name);
        }
        val->emplace_back(member.name.GetString());
    }
    return nullptr;
}
// Reads the JSON object stored under `name` as a string-to-string dictionary
// into *val. Returns nullptr on success, an error if the value is not an
// object or any key/value is not a string.
Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
    Value* object;
    if (Error err = GetValuePointer(name, ValueType::kObject, &object)) {
        return err;
    }
    val->clear();
    for (const auto& member : object->GetObject()) {
        const bool is_string_pair = member.name.IsString() && member.value.IsString();
        if (!is_string_pair) {
            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name);
        }
        (*val)[member.name.GetString()] = member.value.GetString();
    }
    return nullptr;
}
RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) {
auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_);
if (err) {
......
......@@ -4,6 +4,7 @@
#include "rapidjson/document.h"
#include "asapo/common/error.h"
#include "asapo/io/io.h"
#include <map>
namespace asapo {
......@@ -26,6 +27,8 @@ class RapidJson {
Error GetString(const std::string& name, std::string* val) const noexcept;
Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetRawString(std::string* val) const noexcept;
Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept;
......
# Object library with the librdkafka-based KafkaClient implementation;
# its objects are linked into consuming targets (e.g. the receiver).
set(TARGET_NAME rdkafka_client)
set(SOURCE_FILES
rdkafka_client.cpp
kafka_client_factory.cpp ../../include/asapo/preprocessor/definitions.h)
################################
# Library
################################
add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES})
# PUBLIC so consumers of the object library also see the asapo and rdkafka headers.
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIR})
#include "rdkafka_client.h"

namespace asapo {

// Creates the librdkafka-backed Kafka client.
// Returns the new client on success; on failure returns nullptr and stores a
// kGeneralError with the description in *err.
KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err) {
    try {
        return new RdKafkaClient(config);
    } catch (const std::string& errstr) {
        // Construction problems are reported by throwing std::string.
        // Catch by const reference instead of by value (avoids a copy).
        (*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr);
    } catch (const char* errstr) {
        // Defensive: a bare `throw "..."` throws const char*, which a
        // catch(std::string) clause would NOT catch, terminating the process.
        (*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr);
    }
    return nullptr;
}

}
#include "rdkafka_client.h"
#include <cstring>
#include "asapo/common/data_structs.h"
#include "asapo/io/io_factory.h"
namespace asapo {
// Builds the librdkafka producer from the global config and creates handles
// for all explicitly configured topics. The topic configuration under the
// key "default" is kept for creating topics lazily in Send().
// Throws std::string with a description on any failure; since the destructor
// does not run when a constructor throws, partially created resources are
// released before throwing.
RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : default_topic_conf_(nullptr) {
    std::string err;

    // Releases everything created so far; used on the error paths below.
    auto cleanup = [this]() {
        for (auto& topic : kafka_topics_) {
            delete topic.second;
        }
        delete default_topic_conf_;
        delete producer_;
    };

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    for (const auto& configItem : config.global_config) {
        if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
            delete conf;
            throw "cannot initialize kafka: " + err;
        }
    }

    producer_ = RdKafka::Producer::create(conf, err);
    // Producer::create copies the configuration, so the conf object must be
    // freed by the caller in both the success and the failure case
    // (previously leaked).
    delete conf;
    if (!producer_) {
        // Throw std::string (not const char* as before) so the factory's
        // catch clause handles it, and include the rdkafka error text.
        throw "cannot initialize kafka: " + err;
    }

    for (const auto& topic : config.topics_config) {
        auto topic_config = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        for (const auto& configItem : topic.second) {
            if (topic_config->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
                delete topic_config;
                cleanup();
                throw "cannot initialize kafka: " + err;
            }
        }
        if (topic.first == "default") {
            // Kept for on-demand topic creation in Send(); released in the destructor.
            default_topic_conf_ = topic_config;
        } else {
            auto topic_obj = RdKafka::Topic::create(producer_, topic.first, topic_config, err);
            // Topic::create copies the relevant settings; the conf object is
            // owned by the caller and can be freed here (previously leaked).
            delete topic_config;
            if (!topic_obj) {
                cleanup();
                throw "cannot initialize kafka topic [" + topic.first + "]: " + err;
            }
            kafka_topics_[topic.first] = topic_obj;
        }
    }
}
// Flushes pending messages and releases all owned librdkafka objects.
RdKafkaClient::~RdKafkaClient() {
    if (producer_) {
        // Give queued messages up to 1s to be delivered before shutdown.
        producer_->flush(1000);
    }
    // Topic handles reference the producer, so delete them first
    // (previously leaked, together with the default topic configuration).
    for (auto& topic : kafka_topics_) {
        delete topic.second;
    }
    delete default_topic_conf_;
    delete producer_;
}
// Publishes `data` to `topic_name` with unassigned partition.
// A topic not seen before is created from the "default" topic configuration
// and cached; without a default configuration an unknown topic is an error.
// Returns nullptr on success or the matching KafkaErrorTemplates error.
Error RdKafkaClient::Send(const std::string& data, const std::string& topic_name) noexcept {
    RdKafka::Topic* topic = nullptr;
    auto found = kafka_topics_.find(topic_name);
    if (found != kafka_topics_.end()) {
        topic = found->second;
    } else {
        if (!default_topic_conf_) {
            return KafkaErrorTemplates::kUnknownTopicError.Generate();
        }
        std::string errstr;
        topic = RdKafka::Topic::create(producer_, topic_name, default_topic_conf_, errstr);
        if (!topic) {
            return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topic_name + "]: " + errstr);
        }
        kafka_topics_[topic_name] = topic;  // cache for subsequent sends
    }

    // RK_MSG_COPY: rdkafka copies the payload, so `data` may go out of scope.
    const auto result = producer_->produce(topic, RdKafka::Topic::PARTITION_UA,
                                           RdKafka::Producer::RK_MSG_COPY,
                                           const_cast<void*>(static_cast<const void*>(data.data())), data.size(),
                                           nullptr, nullptr);
    switch (result) {
    case RdKafka::ERR_NO_ERROR:
        return nullptr;
    case RdKafka::ERR__QUEUE_FULL:
        return KafkaErrorTemplates::kQueueFullError.Generate();
    case RdKafka::ERR_MSG_SIZE_TOO_LARGE:
        return KafkaErrorTemplates::kMessageTooLargeError.Generate();
    case RdKafka::ERR__UNKNOWN_PARTITION:
        return KafkaErrorTemplates::kUnknownPartitionError.Generate();
    case RdKafka::ERR__UNKNOWN_TOPIC:
        return KafkaErrorTemplates::kUnknownTopicError.Generate();
    default:
        return KafkaErrorTemplates::kGeneralError.Generate(err2str(result));
    }
}
#ifndef ASAPO_RDKAFKA_CLIENT_H
#define ASAPO_RDKAFKA_CLIENT_H

#include <string>

#include "asapo/kafka_client/kafka_client.h"

#include "librdkafka/rdkafkacpp.h"

namespace asapo {

// KafkaClient implementation on top of librdkafka's C++ producer API.
class RdKafkaClient final : public KafkaClient {
  public:
    // Builds the producer and any explicitly configured topics.
    // Throws std::string with a description on failure (handled in
    // CreateKafkaClient). Marked explicit to prevent accidental implicit
    // conversion from KafkaClientConfig.
    explicit RdKafkaClient(const KafkaClientConfig& config);
    // Publishes `data` to `topic`; unknown topics are created on demand from
    // the "default" topic configuration.
    Error Send(const std::string& data, const std::string& topic) noexcept override;
    virtual ~RdKafkaClient();
  private:
    RdKafka::Producer* producer_;
    // Configuration used for creating topics on demand in Send(); nullptr
    // when no "default" topic configuration was provided.
    RdKafka::Conf* default_topic_conf_;
    // Topic handles keyed by topic name.
    std::map<std::string, RdKafka::Topic *> kafka_topics_;
};

}

#endif //ASAPO_RDKAFKA_CLIENT_H
......@@ -19,6 +19,7 @@ using ::testing::Return;
using ::testing::SetArgPointee;
using ::testing::HasSubstr;
using ::testing::ElementsAre;
using ::testing::Pair;
using ::testing::DoAll;
using asapo::JsonFileParser;
......@@ -209,6 +210,54 @@ TEST(ParseString, StringArrayConvertToJson) {
ASSERT_THAT(vec, ElementsAre("s1", "s2", "s3"));
}
// The object's member (key) names are returned in document order.
TEST(ParseString, ObjectMemberArrayConvertToJson) {
std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
JsonStringParser parser{json};
std::vector<std::string> vec;
auto err = parser.GetArrayObjectMembers("object", &vec);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(vec, ElementsAre("k1", "k2", "k3"));
}
// A string-valued object is read as a string-to-string map.
TEST(ParseString, DictionaryStringConvertToJson) {
std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
JsonStringParser parser{json};
std::map<std::string, std::string> map;
auto err = parser.GetDictionaryString("object", &map);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(map, ElementsAre(Pair("k1", "v1"), Pair("k2", "v2"), Pair("k3", "v3")));
}
// GetRawString returns the whole document unchanged.
TEST(ParseString, RawStringConvertToJson) {
std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
JsonStringParser parser{json};
std::string value;
auto err = parser.GetRawString(&value);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(json, Eq(value));
}
// Each array element is returned as its raw JSON text.
TEST(ParseString, ArrayRawStringConvertToJson) {
std::string json = R"({"array":[{"k1":"v1"},{"k2":"v2"},{"k3":"v3"}]})";
JsonStringParser parser{json};
std::vector<std::string> vec;
auto err = parser.GetArrayRawStrings("array", &vec);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(vec, ElementsAre(R"({"k1":"v1"})", R"({"k2":"v2"})", R"({"k3":"v3"})"));
}
class ParseFileTests : public Test {
public:
NiceMock<MockIO> mock_io;
......
# Nomad job template for the receiver with Kafka notifications enabled
# (@...@ placeholders are substituted by CMake configure_file).
# Kafka broker list is injected by the test/deploy scripts as an HCL2 variable.
variable "receiver_kafka_metadata_broker_list" {
type = string
}
job "receiver" {
datacenters = ["dc1"]
type = "service"
group "group" {
count = 1
task "receiver" {
driver = "raw_exec"
config {
command = "@RECEIVER_DIR@/@RECEIVER_NAME@"
args = ["${NOMAD_TASK_DIR}/receiver.json"]
}
resources {
cpu = 500 # 500 MHz
memory = 256 # 256MB
network {
port "recv" {}
port "recv_ds" {}
port "recv_metrics" {}
}
}
service {
name = "asapo-receiver"
port = "recv"
check {
name = "metrics"
type = "http"
port = "recv_metrics"
path = "/metrics"
interval = "10s"
timeout = "2s"
initial_status = "passing"
}
meta {
metrics-port = "${NOMAD_PORT_recv_metrics}"
}
}
# Exposed via task meta so the receiver.json template can read the broker list.
meta {
receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}"
}
# Renders the receiver configuration; SIGHUP is sent on template changes.
template {
source = "@WORK_DIR@/receiver_kafka.json.tpl"
destination = "local/receiver.json"
change_mode = "signal"
change_signal = "SIGHUP"
}
}
}
}
......@@ -24,5 +24,14 @@
},
"Tag": "receiver",
"ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }},
"LogLevel": "info"
"LogLevel": "info",
"Kafka": {
"Enabled": true,
"KafkaClient": {
"metadata.broker.list": "{{ .Values.ownServices.receiver.kafkaBrokerList }}"
},
"KafkaTopics": {
"asapo": {}
}
}
}
......@@ -33,6 +33,7 @@ ownServices:
enable: true
sizeGb: 1
receiveToDiskThresholdMB: 200
kafkaBrokerList: asap3-events-01,asap3-events-02
fileTransfer:
serviceName: asapo-file-transfer
replicaCount: 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment