Skip to content
Snippets Groups Projects
Commit e4f9d3e4 authored by George Sedov's avatar George Sedov
Browse files

Add rdkafka client

Adds rdkafka dependency, rdkafka client,
and notification for file close event
parent 5138d1c2
No related branches found
No related tags found
1 merge request: !185 Develop
Showing
with 340 additions and 3 deletions
......@@ -8,6 +8,7 @@ add_subdirectory(src/data_structs)
add_subdirectory(src/version)
add_subdirectory(src/kafka_client)
add_subdirectory(src/http_client)
......
......@@ -4,6 +4,7 @@
#include <string>
#include <memory>
#include <vector>
#include <map>
#include <string>
#include "asapo/common/error.h"
......@@ -20,6 +21,8 @@ class JsonParser {
Error GetString(const std::string& name, std::string* val) const noexcept;
Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetRawString(std::string* val) const noexcept;
Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept;
......
#ifndef ASAPO_KAFKA_CLIENT_H
#define ASAPO_KAFKA_CLIENT_H

#include "asapo/common/error.h"
#include "asapo/common/data_structs.h"
#include "asapo/kafka_client/kafka_config.h"
#include "asapo/kafka_client/kafka_error.h"

#include <map>
#include <memory>   // std::unique_ptr used below; do not rely on transitive includes
#include <string>

namespace asapo {

// Abstract interface for publishing notification messages to a Kafka cluster.
class KafkaClient {
  public:
    // Sends `data` as the message payload to `topic`.
    // Returns nullptr on success or a KafkaErrorTemplates-based error.
    virtual Error Send(const std::string& data,
                       const std::string& topic) noexcept = 0;
    virtual ~KafkaClient() = default;
};

// Creates the process-wide Kafka client from `config`. Must be called before
// GetKafkaClient(); returns an error if a client was already initialized.
Error InitializeKafkaClient(const KafkaClientConfig& config);

// Returns the client created by InitializeKafkaClient(), or nullptr if no
// client was (successfully) initialized.
std::unique_ptr<KafkaClient> GetKafkaClient();

}

#endif //ASAPO_KAFKA_CLIENT_H
#ifndef ASAPO_KAFKA_CONFIG_H
#define ASAPO_KAFKA_CONFIG_H
#include <string>
#include <map>
namespace asapo {
// Configuration for the Kafka client, mirroring librdkafka's
// string key/value configuration model.
struct KafkaClientConfig {
// librdkafka global producer properties (e.g. "metadata.broker.list").
std::map<std::string, std::string> global_config;
// Per-topic properties keyed by topic name; the special entry "default"
// supplies the configuration used for topics created on demand.
std::map<std::string, std::map<std::string, std::string>> topics_config;
};
}
#endif //ASAPO_KAFKA_CONFIG_H
#ifndef ASAPO_KAFKA_ERROR_H
#define ASAPO_KAFKA_ERROR_H

#include "asapo/common/error.h"

namespace asapo {

// Error categories mirroring the librdkafka produce() error codes that the
// client distinguishes; everything else maps to kGeneralError.
enum class KafkaErrorType {
    kQueueFullError,
    kMessageTooLargeError,
    kUnknownPartitionError,
    kUnknownTopicError,
    kGeneralError
};

using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>;

namespace KafkaErrorTemplates {

auto const kQueueFullError = KafkaErrorTemplate {
    "kafka queue is full", KafkaErrorType::kQueueFullError
};

auto const kMessageTooLargeError = KafkaErrorTemplate {
    "kafka message is too large", KafkaErrorType::kMessageTooLargeError
};

auto const kUnknownPartitionError = KafkaErrorTemplate {
    "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError
};

// Fixed copy-paste bug: message previously read "partition is unknown...",
// duplicating kUnknownPartitionError and misreporting the actual failure.
auto const kUnknownTopicError = KafkaErrorTemplate {
    "topic is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError
};

auto const kGeneralError = KafkaErrorTemplate {
    "unexpected kafka error occurred", KafkaErrorType::kGeneralError
};

}
}

#endif //ASAPO_KAFKA_ERROR_H
......@@ -24,6 +24,13 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin
return rapid_json_->GetArrayString(name, val);
}
// Fills *val with the member (key) names of the JSON object stored under
// `name`; delegates to the rapidjson backend.
Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
return rapid_json_->GetArrayObjectMembers(name, val);
}
// Fills *val with the string key/value pairs of the JSON object stored under
// `name`; delegates to the rapidjson backend.
Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
return rapid_json_->GetDictionaryString(name, val);
}
Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept {
return rapid_json_->GetBool(name, val);
......
......@@ -168,6 +168,40 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string
}
// Collects the member (key) names of the JSON object stored under `name`
// into *val. Fails if the value is missing, is not an object, or has a
// non-string key. *val is cleared before filling.
Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
    Value* object = nullptr;
    auto err = GetValuePointer(name, ValueType::kObject, &object);
    if (err) {
        return err;
    }
    val->clear();
    for (const auto& member : object->GetObject()) {
        if (!member.name.IsString()) {
            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name);
        }
        val->emplace_back(member.name.GetString());
    }
    return nullptr;
}
// Copies the JSON object stored under `name` into *val as a string-to-string
// map. Fails if the value is missing, is not an object, or any key/value is
// not a string. *val is cleared before filling.
Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
    Value* object = nullptr;
    auto err = GetValuePointer(name, ValueType::kObject, &object);
    if (err) {
        return err;
    }
    val->clear();
    for (const auto& member : object->GetObject()) {
        const bool entry_is_string = member.name.IsString() && member.value.IsString();
        if (!entry_is_string) {
            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name);
        }
        (*val)[member.name.GetString()] = member.value.GetString();
    }
    return nullptr;
}
RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) {
auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_);
if (err) {
......
......@@ -4,6 +4,7 @@
#include "rapidjson/document.h"
#include "asapo/common/error.h"
#include "asapo/io/io.h"
#include <map>
namespace asapo {
......@@ -26,6 +27,8 @@ class RapidJson {
Error GetString(const std::string& name, std::string* val) const noexcept;
Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
Error GetRawString(std::string* val) const noexcept;
Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept;
......
# Object library wrapping librdkafka's C++ producer API for the receiver.
set(TARGET_NAME rdkafka_client)
set(SOURCE_FILES
rdkafka_client.cpp
kafka_client_factory.cpp ../../include/asapo/preprocessor/definitions.h)
################################
# Library
################################
add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES})
IF(WIN32)
# Required when linking against a static librdkafka build on Windows.
target_compile_definitions(${TARGET_NAME} PUBLIC -DRDKAFKA_STATICLIB)
ENDIF()
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIRS})
#include "rdkafka_client.h"

namespace asapo {

// Process-wide singleton created by InitializeKafkaClient(); ownership is
// handed over to the first successful caller of GetKafkaClient().
KafkaClient* instance = nullptr;

// Creates the global Kafka client from `config`.
// Returns an error if a client already exists or construction fails.
Error InitializeKafkaClient(const KafkaClientConfig& config) {
    if (instance != nullptr) {
        return KafkaErrorTemplates::kGeneralError.Generate("Kafka client already initialized");
    }
    try {
        instance = new RdKafkaClient(config);
    }
    // Catch by const reference, not by value (avoids a needless copy).
    catch (const std::string& err) {
        return KafkaErrorTemplates::kGeneralError.Generate(err);
    }
    // Defensive: a string-literal throw is a const char*, which a
    // std::string handler would NOT catch and the process would terminate.
    catch (const char* err) {
        return KafkaErrorTemplates::kGeneralError.Generate(err);
    }
    return nullptr;
}

// Transfers ownership of the initialized client to the caller, or returns
// nullptr if no client is available.
std::unique_ptr<KafkaClient> GetKafkaClient() {
    if (instance != nullptr) {
        // Hand out ownership exactly once: previously a second call produced
        // another unique_ptr owning the same object, causing a double delete.
        std::unique_ptr<KafkaClient> client{instance};
        instance = nullptr;
        return client;
    } else {
        return nullptr;
    }
}

}
#include "rdkafka_client.h"

#include <cstring>
#include <memory>

#include "asapo/common/data_structs.h"
#include "asapo/io/io_factory.h"
namespace asapo {
// Builds the librdkafka producer and per-topic configuration from `config`.
// Throws std::string on any configuration or creation failure (translated
// into an asapo Error by the factory).
// NOTE(review): if a throw happens after the producer was created, the
// producer object leaks (the destructor does not run for a half-constructed
// object); acceptable here since the process disables kafka on failure.
RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : producer(nullptr), defaultTopicConf(nullptr) {
    std::string err;
    // The caller keeps ownership of Conf objects: librdkafka's C++ create()
    // calls copy the settings, so `conf` was previously leaked on every path.
    std::unique_ptr<RdKafka::Conf> conf{RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)};
    for (const auto& configItem : config.global_config) {
        if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
            throw "cannot initialize kafka: " + err;
        }
    }
    producer = RdKafka::Producer::create(conf.get(), err);
    if (!producer) {
        // Throw std::string (not a const char* literal) so the factory's
        // string handler catches it; include librdkafka's error text.
        throw "cannot initialize kafka: " + err;
    }
    for (const auto& topic : config.topics_config) {
        std::unique_ptr<RdKafka::Conf> topicConfig{RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)};
        for (const auto& configItem : topic.second) {
            if (topicConfig->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
                throw "cannot initialize kafka: " + err;
            }
        }
        if (topic.first == "default") {
            // Kept alive for creating topics on demand in Send(); ownership
            // moves to the defaultTopicConf member (freed in the destructor).
            this->defaultTopicConf = topicConfig.release();
        } else {
            auto topicObj = RdKafka::Topic::create(producer, topic.first, topicConfig.get(), err);
            if (!topicObj) {
                throw "cannot initialize kafka topic [" + topic.first + "]: " + err;
            }
            this->kafkaTopics[topic.first] = topicObj;
            // topicConfig is freed here; Topic::create copied the settings.
        }
    }
}
// Flushes outstanding messages (bounded wait) and releases all owned
// librdkafka objects. Previously the Topic handles and defaultTopicConf
// were never deleted.
RdKafkaClient::~RdKafkaClient() {
    if (producer) {
        // Give in-flight messages up to 1s to be delivered before shutdown.
        producer->flush(1000);
    }
    // Delete topic handles before the producer that created them
    // (librdkafka convention — verify against rdkafkacpp docs).
    for (auto& topic : kafkaTopics) {
        delete topic.second;
    }
    delete defaultTopicConf;
    delete producer;
}
// Publishes `data` to `topicName`. Topics that were not configured
// explicitly are created on demand from the "default" topic configuration
// (if present) and cached for subsequent sends.
Error RdKafkaClient::Send(const std::string& data, const std::string& topicName) noexcept {
    RdKafka::Topic* topic = nullptr;
    auto found = this->kafkaTopics.find(topicName);
    if (found != this->kafkaTopics.end()) {
        topic = found->second;
    } else {
        if (!defaultTopicConf) {
            // Unknown topic and no template to create it from.
            return KafkaErrorTemplates::kUnknownTopicError.Generate();
        }
        std::string errstr;
        topic = RdKafka::Topic::create(producer, topicName, this->defaultTopicConf, errstr);
        if (!topic) {
            return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topicName + "]: " + errstr);
        }
        // Cache the handle so the topic is created only once.
        this->kafkaTopics[topicName] = topic;
    }

    // RK_MSG_COPY: librdkafka copies the payload, so `data` may be freed
    // immediately after this call returns.
    auto code = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                  RdKafka::Producer::RK_MSG_COPY,
                                  const_cast<void*>(static_cast<const void*>(data.data())), data.size(),
                                  nullptr, nullptr);
    switch (code) {
    case RdKafka::ERR_NO_ERROR:
        return nullptr;
    case RdKafka::ERR__QUEUE_FULL:
        return KafkaErrorTemplates::kQueueFullError.Generate();
    case RdKafka::ERR_MSG_SIZE_TOO_LARGE:
        return KafkaErrorTemplates::kMessageTooLargeError.Generate();
    case RdKafka::ERR__UNKNOWN_PARTITION:
        return KafkaErrorTemplates::kUnknownPartitionError.Generate();
    case RdKafka::ERR__UNKNOWN_TOPIC:
        return KafkaErrorTemplates::kUnknownTopicError.Generate();
    default:
        return KafkaErrorTemplates::kGeneralError.Generate(err2str(code));
    }
}
}
#ifndef ASAPO_RDKAFKA_CLIENT_H
#define ASAPO_RDKAFKA_CLIENT_H

#include <map>      // std::map member below; do not rely on transitive includes
#include <string>

#include "asapo/kafka_client/kafka_client.h"
#include "librdkafka/rdkafkacpp.h"

namespace asapo {

// KafkaClient implementation backed by librdkafka's C++ producer API.
class RdKafkaClient final : public KafkaClient {
  public:
    // Throws std::string if the producer or a configured topic cannot be
    // created from `config`.
    explicit RdKafkaClient(const KafkaClientConfig& config);

    // The class owns raw librdkafka handles — copying would double-free them.
    RdKafkaClient(const RdKafkaClient&) = delete;
    RdKafkaClient& operator=(const RdKafkaClient&) = delete;

    Error Send(const std::string& data, const std::string& topic) noexcept override;
    virtual ~RdKafkaClient();

  private:
    RdKafka::Producer* producer{nullptr};
    // Topic configuration template (the "default" config entry) used to
    // create topics on demand in Send(); may stay null.
    RdKafka::Conf* defaultTopicConf{nullptr};
    // Topic handles, pre-created from config or created lazily by Send().
    std::map<std::string, RdKafka::Topic*> kafkaTopics;
};

}

#endif //ASAPO_RDKAFKA_CLIENT_H
......@@ -24,5 +24,11 @@
},
"Tag": "receiver",
"ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }},
"LogLevel": "info"
"LogLevel": "info",
"KafkaClient": {
"metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }}
},
"KafkaTopics": {
"asapo": {}
}
}
......@@ -33,6 +33,7 @@ ownServices:
enable: true
sizeGb: 1
receiveToDiskThresholdMB: 200
kafkaBrokerList: asap3-events-01,asap3-events-02
fileTransfer:
serviceName: asapo-file-transfer
replicaCount: 1
......
......@@ -26,6 +26,7 @@ set(RECEIVER_CORE_FILES
src/request_handler/request_handler_db_check_request.cpp
src/request_handler/request_handler_db_delete_stream.cpp
src/request_handler/request_handler_db_get_meta.cpp
src/request_handler/request_handler_kafka_notify.cpp
src/request_handler/request_factory.cpp
src/request_handler/request_handler_db.cpp
src/request_handler/file_processors/write_file_processor.cpp
......@@ -59,11 +60,11 @@ set(SOURCE_FILES
GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES)
add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client>
$<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>)
$<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:rdkafka_client>)
set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
target_include_directories(${TARGET_NAME} SYSTEM PUBLIC ${LIBFABRIC_INCLUDE_DIR})
target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT} database
target_link_libraries(${TARGET_NAME} CURL::libcurl rdkafka++ ${CMAKE_THREAD_LIBS_INIT} database
asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES})
......
......@@ -6,6 +6,7 @@
#include "receiver_data_server/receiver_data_server_logger.h"
#include "asapo/common/internal/version.h"
#include "asapo/kafka_client/kafka_client.h"
#include "receiver_data_server/receiver_data_server.h"
#include "receiver_data_server/net_server/rds_tcp_server.h"
......@@ -134,6 +135,18 @@ int main(int argc, char* argv[]) {
}
auto metrics_thread = StartMetricsServer(config->metrics, logger);
if (!config->kafka_config.global_config.empty()) {
err = asapo::InitializeKafkaClient(config->kafka_config);
if (err) {
logger->Error("Error initializing kafka client: " + err->Explain());
logger->Error("Kafka notification disabled");
}
}
else {
logger->Info("No kafka config provided. Kafka notification disabled.");
}
auto exit_code = StartReceiver(config, cache, logger);
// todo: implement graceful exit, currently it never reaches this point
return exit_code;
......
......@@ -17,6 +17,8 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
std::string log_level;
Error err;
std::vector<std::string> kafkaTopics;
(err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) ||
(err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) ||
(err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
......@@ -42,6 +44,26 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
return err;
}
(err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
(err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
if (!err) {
for(const auto& topic : kafkaTopics) {
auto topicConfig = config.kafka_config.topics_config[topic];
err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
if (err) {
break;
}
}
}
if (err) {
// ignore kafka config error. Just disable it.
config.kafka_config.global_config.clear();
config.kafka_config.topics_config.clear();
}
config.dataserver.tag = config.tag + "_ds";
config.log_level = StringToLogLevel(log_level, &err);
......
......@@ -7,6 +7,7 @@
#include "receiver_data_server/receiver_data_server_config.h"
#include "metrics/receiver_metrics_config.h"
#include "asapo/kafka_client/kafka_config.h"
namespace asapo {
......@@ -27,6 +28,7 @@ struct ReceiverConfig {
ReceiverDataServerConfig dataserver;
ReceiverMetricsConfig metrics;
std::string discovery_server;
KafkaClientConfig kafka_config;
};
class ReceiverConfigManager {
......
......@@ -32,6 +32,7 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque
request->AddHandler(&request_handler_receivedata_);
if (NeedFileWriteHandler(request_header)) {
request->AddHandler(&request_handler_filewrite_);
request->AddHandler(&request_handler_kafka_notify_);
}
}
......@@ -42,6 +43,7 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re
"ingest mode should include kStoreInFilesystem for large files ");
}
request->AddHandler(&request_handler_filereceive_);
request->AddHandler(&request_handler_kafka_notify_);
return nullptr;
}
......
......@@ -8,6 +8,7 @@
#include "request_handler_db_last_stream.h"
#include "request_handler_db_delete_stream.h"
#include "request_handler_db_get_meta.h"
#include "request_handler_kafka_notify.h"
#include "request_handler_file_process.h"
#include "request_handler_db_write.h"
......@@ -45,6 +46,7 @@ class RequestFactory {
RequestHandlerInitialAuthorization request_handler_initial_authorize_{&shared_auth_cache_};
RequestHandlerSecondaryAuthorization request_handler_secondary_authorize_{&shared_auth_cache_};
RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};
RequestHandlerKafkaNotify request_handler_kafka_notify_;
SharedCache cache_;
AuthorizationData shared_auth_cache_;
bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment