diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index 8f90ff257a0db963ffc133518e2768c020975fc7..c55ea7c5316671bfda98130a3fa8383517d5a579 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(src/data_structs) add_subdirectory(src/version) +add_subdirectory(src/kafka_client) add_subdirectory(src/http_client) diff --git a/common/cpp/include/asapo/json_parser/json_parser.h b/common/cpp/include/asapo/json_parser/json_parser.h index b8bd14f69bca16c77058354b965c6a5b4b4c84cc..d62cc8c89d8259d85cb78ecfc7ba2adda58a0fd6 100644 --- a/common/cpp/include/asapo/json_parser/json_parser.h +++ b/common/cpp/include/asapo/json_parser/json_parser.h @@ -4,6 +4,7 @@ #include <string> #include <memory> #include <vector> +#include <map> #include <string> #include "asapo/common/error.h" @@ -20,6 +21,8 @@ class JsonParser { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept; diff --git a/common/cpp/include/asapo/kafka_client/kafka_client.h b/common/cpp/include/asapo/kafka_client/kafka_client.h new file mode 100644 index 0000000000000000000000000000000000000000..64e0335278625709342ca5ffd50b05f3a08dcfc3 --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_client.h @@ -0,0 +1,24 @@ +#ifndef ASAPO_KAFKA_CLIENT_H +#define ASAPO_KAFKA_CLIENT_H + 
+#include "asapo/common/error.h" +#include "asapo/common/data_structs.h" +#include "asapo/kafka_client/kafka_config.h" +#include "asapo/kafka_client/kafka_error.h" +#include <map> + +namespace asapo { + +class KafkaClient { + public: + virtual Error Send(const std::string& data, + const std::string& topic) noexcept = 0; + virtual ~KafkaClient() = default; +}; + +Error InitializeKafkaClient(const KafkaClientConfig& config); +std::unique_ptr<KafkaClient> GetKafkaClient(); + +} + +#endif //ASAPO_KAFKA_CLIENT_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_config.h b/common/cpp/include/asapo/kafka_client/kafka_config.h new file mode 100644 index 0000000000000000000000000000000000000000..81f707bab9973b5445ce5318c728ff3905c4f517 --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_config.h @@ -0,0 +1,16 @@ +#ifndef ASAPO_KAFKA_CONFIG_H +#define ASAPO_KAFKA_CONFIG_H + +#include <string> +#include <map> + +namespace asapo { + + struct KafkaClientConfig { + std::map<std::string, std::string> global_config; + std::map<std::string, std::map<std::string, std::string>> topics_config; + }; + +} + +#endif //ASAPO_KAFKA_CONFIG_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_error.h b/common/cpp/include/asapo/kafka_client/kafka_error.h new file mode 100644 index 0000000000000000000000000000000000000000..3ef69183d3b25e77ba6f0a4a0cee64f9721f60ae --- /dev/null +++ b/common/cpp/include/asapo/kafka_client/kafka_error.h @@ -0,0 +1,43 @@ +#ifndef ASAPO_KAFKA_ERROR_H +#define ASAPO_KAFKA_ERROR_H + +#include "asapo/common/error.h" + +namespace asapo { + + enum class KafkaErrorType { + kQueueFullError, + kMessageTooLargeError, + kUnknownPartitionError, + kUnknownTopicError, + kGeneralError + }; + + using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>; + + namespace KafkaErrorTemplates { + + auto const kQueueFullError = KafkaErrorTemplate { + "kafka queue is full", KafkaErrorType::kQueueFullError + }; + + auto const kMessageTooLargeError = 
KafkaErrorTemplate { + "kafka message is too large", KafkaErrorType::kMessageTooLargeError + }; + + auto const kUnknownPartitionError = KafkaErrorTemplate { + "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError + }; + + auto const kUnknownTopicError = KafkaErrorTemplate { + "topic is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError + }; + + auto const kGeneralError = KafkaErrorTemplate { + "unexpected kafka error occurred", KafkaErrorType::kGeneralError + }; + } + +} + +#endif //ASAPO_KAFKA_ERROR_H diff --git a/common/cpp/src/json_parser/json_parser.cpp b/common/cpp/src/json_parser/json_parser.cpp index b25cb6b4a9d1f96226e60b4f5ccfe5fdfaf1afe3..32457c07ab127c4606e8be1ae7f54cb57a9f1083 100644 --- a/common/cpp/src/json_parser/json_parser.cpp +++ b/common/cpp/src/json_parser/json_parser.cpp @@ -24,6 +24,13 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin return rapid_json_->GetArrayString(name, val); } +Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { + return rapid_json_->GetArrayObjectMembers(name, val); +} + +Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { + return rapid_json_->GetDictionaryString(name, val); +} Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept { return rapid_json_->GetBool(name, val); diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp index a9883d134374785d5c801c5a33356cd7951c32fd..7dba5ea22cc247c0b07bff9a2d2757324757be97 100644 --- a/common/cpp/src/json_parser/rapid_json.cpp +++ b/common/cpp/src/json_parser/rapid_json.cpp @@ -168,6 +168,40 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string } +Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { + Value* 
json_val; + if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { + return err; + } + + val->clear(); + for (auto& m : json_val->GetObject()) { + if (!m.name.IsString()) { + return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name); + } + val->push_back(m.name.GetString()); + } + return nullptr; + +} + +Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { + Value* json_val; + if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { + return err; + } + + val->clear(); + for (auto& m : json_val->GetObject()) { + if (!m.value.IsString() || !m.name.IsString()) { + return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name); + } + (*val)[m.name.GetString()] = m.value.GetString(); + } + return nullptr; + +} + RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) { auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_); if (err) { diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h index b4cbc88711dfcc0f8e7ae6f21257277b9c10469e..477d6e4ee24ef6c7186ef7b0496c8edc786e10c7 100644 --- a/common/cpp/src/json_parser/rapid_json.h +++ b/common/cpp/src/json_parser/rapid_json.h @@ -4,6 +4,7 @@ #include "rapidjson/document.h" #include "asapo/common/error.h" #include "asapo/io/io.h" +#include <map> namespace asapo { @@ -26,6 +27,8 @@ class RapidJson { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; + Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error 
GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept; diff --git a/common/cpp/src/kafka_client/CMakeLists.txt b/common/cpp/src/kafka_client/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..64b5d004fe3f052b9764752240d05e46c9a7b844 --- /dev/null +++ b/common/cpp/src/kafka_client/CMakeLists.txt @@ -0,0 +1,16 @@ +set(TARGET_NAME rdkafka_client) +set(SOURCE_FILES + rdkafka_client.cpp + kafka_client_factory.cpp ../../include/asapo/preprocessor/definitions.h) + + +################################ +# Library +################################ +add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) +IF(WIN32) + target_compile_definitions(${TARGET_NAME} PUBLIC -DRDKAFKA_STATICLIB) +ENDIF() +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIRS}) + + diff --git a/common/cpp/src/kafka_client/kafka_client_factory.cpp b/common/cpp/src/kafka_client/kafka_client_factory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8f3526bfbc15b88b79caaac1c27e7a2a25bc85a5 --- /dev/null +++ b/common/cpp/src/kafka_client/kafka_client_factory.cpp @@ -0,0 +1,29 @@ +#include "rdkafka_client.h" + +namespace asapo { + +KafkaClient* instance = nullptr; + +Error InitializeKafkaClient(const KafkaClientConfig& config) { + if (instance != nullptr) { + return KafkaErrorTemplates::kGeneralError.Generate("Kafka client already initialized"); + } + + try { + instance = new RdKafkaClient(config); + } + catch (std::string err) { + return KafkaErrorTemplates::kGeneralError.Generate(err); + } + + return nullptr; +} + +std::unique_ptr<KafkaClient> GetKafkaClient() { + if (instance != nullptr) { + return std::unique_ptr<KafkaClient> {instance}; + } else { + return nullptr; + } +} +} diff --git 
a/common/cpp/src/kafka_client/rdkafka_client.cpp b/common/cpp/src/kafka_client/rdkafka_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0aabc0de01a1d6d592a4257ce8796d004d2ed3a1 --- /dev/null +++ b/common/cpp/src/kafka_client/rdkafka_client.cpp @@ -0,0 +1,87 @@ +#include "rdkafka_client.h" + +#include <cstring> +#include "asapo/common/data_structs.h" +#include "asapo/io/io_factory.h" + +namespace asapo { + +RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : defaultTopicConf(nullptr) { + std::string err; + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + for (const auto& configItem : config.global_config) { + if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { + throw "cannot initialize kafka: " + err; + } + } + + producer = RdKafka::Producer::create(conf, err); + if (!producer) { + throw "cannot initialize kafka: " + err; + } + + for (const auto& topic : config.topics_config) { + auto topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + for (const auto& configItem : topic.second) { + if (topicConfig->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { + throw "cannot initialize kafka: " + err; + } + } + if (topic.first == "default") { + this->defaultTopicConf = topicConfig; + } else + { + auto topicObj = RdKafka::Topic::create(producer, topic.first, topicConfig, err); + if (!topicObj) { + throw "cannot initialize kafka topic [" + topic.first + "]: " + err; + + } + this->kafkaTopics[topic.first] = topicObj; + } + } +} + +RdKafkaClient::~RdKafkaClient() { + if (producer) { + producer->flush(1000); + } + delete producer; +} + +Error RdKafkaClient::Send(const std::string& data, const std::string& topicName) noexcept { + auto topicIt = this->kafkaTopics.find(topicName); + RdKafka::Topic* topic; + if (topicIt == this->kafkaTopics.end()) + { + if (!defaultTopicConf) { + return KafkaErrorTemplates::kUnknownTopicError.Generate(); + } + 
std::string err; + topic = RdKafka::Topic::create(producer, topicName, this->defaultTopicConf, err); + if (!topic) { + return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topicName + "]: " + err); + } + this->kafkaTopics[topicName] = topic; + } + else + { + topic = topicIt->second; + } + + auto err = producer->produce(topic, RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast<void*>(static_cast<const void *>(data.data())), data.size(), + nullptr, nullptr); + + switch (err) { + case RdKafka::ERR_NO_ERROR: return nullptr; + case RdKafka::ERR__QUEUE_FULL: return KafkaErrorTemplates::kQueueFullError.Generate(); + case RdKafka::ERR_MSG_SIZE_TOO_LARGE: return KafkaErrorTemplates::kMessageTooLargeError.Generate(); + case RdKafka::ERR__UNKNOWN_PARTITION: return KafkaErrorTemplates::kUnknownPartitionError.Generate(); + case RdKafka::ERR__UNKNOWN_TOPIC: return KafkaErrorTemplates::kUnknownTopicError.Generate(); + default: return KafkaErrorTemplates::kGeneralError.Generate(err2str(err)); + } + +} + +} diff --git a/common/cpp/src/kafka_client/rdkafka_client.h b/common/cpp/src/kafka_client/rdkafka_client.h new file mode 100644 index 0000000000000000000000000000000000000000..fcfa163c3356972ff573c8deab3363f1d6020351 --- /dev/null +++ b/common/cpp/src/kafka_client/rdkafka_client.h @@ -0,0 +1,25 @@ +#ifndef ASAPO_RDKAFKA_CLIENT_H +#define ASAPO_RDKAFKA_CLIENT_H + +#include <string> + +#include "asapo/kafka_client/kafka_client.h" +#include "librdkafka/rdkafkacpp.h" + +namespace asapo { + +class RdKafkaClient final : public KafkaClient { + public: + RdKafkaClient(const KafkaClientConfig& config); + Error Send(const std::string& data, const std::string& topic) noexcept override; + + virtual ~RdKafkaClient(); + private: + RdKafka::Producer* producer; + RdKafka::Conf* defaultTopicConf; + std::map<std::string, RdKafka::Topic *> kafkaTopics; +}; + +} + +#endif //ASAPO_RDKAFKA_CLIENT_H diff --git 
a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index c32c6703ad90abc586f4c47c4e3a758a56417dd8..088e46b5dc343c8abc8b1d43e0287c76c8cce991 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -24,5 +24,11 @@ }, "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, - "LogLevel": "info" + "LogLevel": "info", + "KafkaClient": { + "metadata.broker.list": "{{ .Values.ownServices.receiver.kafkaBrokerList }}" + }, + "KafkaTopics": { + "asapo": {} + } } diff --git a/deploy/asapo_helm_chart/asapo/values.yaml b/deploy/asapo_helm_chart/asapo/values.yaml index beccb7c2193ae10139e8325d3aa9ac175901bea3..946bc9394d2c4f9b089aa4a6124e15ec25fc00e8 100644 --- a/deploy/asapo_helm_chart/asapo/values.yaml +++ b/deploy/asapo_helm_chart/asapo/values.yaml @@ -33,6 +33,7 @@ ownServices: enable: true sizeGb: 1 receiveToDiskThresholdMB: 200 + kafkaBrokerList: asap3-events-01,asap3-events-02 fileTransfer: serviceName: asapo-file-transfer replicaCount: 1 diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 836a04f9b650155c7c41b7620838fda23f783ea1..02c8dbd789890c8c4576a3ac97410a8386389b79 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -26,6 +26,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_handler_db_delete_stream.cpp src/request_handler/request_handler_db_get_meta.cpp + src/request_handler/request_handler_kafka_notify.cpp src/request_handler/request_factory.cpp src/request_handler/request_handler_db.cpp src/request_handler/file_processors/write_file_processor.cpp @@ -59,11 +60,11 @@ set(SOURCE_FILES GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES) add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> 
$<TARGET_OBJECTS:curl_http_client> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:rdkafka_client>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_include_directories(${TARGET_NAME} SYSTEM PUBLIC ${LIBFABRIC_INCLUDE_DIR}) -target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT} database +target_link_libraries(${TARGET_NAME} CURL::libcurl rdkafka++ ${CMAKE_THREAD_LIBS_INIT} database asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 9f818218dcf240c04648775c1e83e2c5764645d8..292a34f261b7b1c52599162fa16d85ccf41f6d49 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -6,6 +6,7 @@ #include "receiver_data_server/receiver_data_server_logger.h" #include "asapo/common/internal/version.h" +#include "asapo/kafka_client/kafka_client.h" #include "receiver_data_server/receiver_data_server.h" #include "receiver_data_server/net_server/rds_tcp_server.h" @@ -134,6 +135,18 @@ int main(int argc, char* argv[]) { } auto metrics_thread = StartMetricsServer(config->metrics, logger); + + if (!config->kafka_config.global_config.empty()) { + err = asapo::InitializeKafkaClient(config->kafka_config); + if (err) { + logger->Error("Error initializing kafka client: " + err->Explain()); + logger->Error("Kafka notification disabled"); + } + } + else { + logger->Info("No kafka config provided. 
Kafka notification disabled."); + } + auto exit_code = StartReceiver(config, cache, logger); // todo: implement graceful exit, currently it never reaches this point return exit_code; diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index c6cc7846083c669c072bbb9d6d9eacdf0f4d0b54..80103255a1ad5d5d2ec728ce97541b2cdfec3a25 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,6 +17,8 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { std::string log_level; Error err; + std::vector<std::string> kafkaTopics; + (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || @@ -42,6 +44,26 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { return err; } + (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || + (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); + + if (!err) { + for(const auto& topic : kafkaTopics) { + auto& topicConfig = config.kafka_config.topics_config[topic]; + err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); + if (err) { + break; + } + } + } + + if (err) { + // ignore kafka config error. Just disable it. 
+ config.kafka_config.global_config.clear(); + config.kafka_config.topics_config.clear(); + } + + config.dataserver.tag = config.tag + "_ds"; config.log_level = StringToLogLevel(log_level, &err); diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 0a054e2c1625cd7d41259b2e8d0f11007bd6f19a..d065ed2958921e21441a4bc28ef5c5217c7e3af5 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -7,6 +7,7 @@ #include "receiver_data_server/receiver_data_server_config.h" #include "metrics/receiver_metrics_config.h" +#include "asapo/kafka_client/kafka_config.h" namespace asapo { @@ -27,6 +28,7 @@ struct ReceiverConfig { ReceiverDataServerConfig dataserver; ReceiverMetricsConfig metrics; std::string discovery_server; + KafkaClientConfig kafka_config; }; class ReceiverConfigManager { diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index e9c5fdd8d5db8d4d35e041fab1f013bae5a9c799..6405f82b30b4a7970f9a892406f486da96806006 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -32,6 +32,7 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); + request->AddHandler(&request_handler_kafka_notify_); } } @@ -42,6 +43,7 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re "ingest mode should include kStoreInFilesystem for large files "); } request->AddHandler(&request_handler_filereceive_); + request->AddHandler(&request_handler_kafka_notify_); return nullptr; } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 04f871cdaa311cbf024e02122e27d3fab044c997..c298c3c2bbc704f09224b31e371ab637655aeaac 100644 --- 
a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -8,6 +8,7 @@ #include "request_handler_db_last_stream.h" #include "request_handler_db_delete_stream.h" #include "request_handler_db_get_meta.h" +#include "request_handler_kafka_notify.h" #include "request_handler_file_process.h" #include "request_handler_db_write.h" @@ -45,6 +46,7 @@ class RequestFactory { RequestHandlerInitialAuthorization request_handler_initial_authorize_{&shared_auth_cache_}; RequestHandlerSecondaryAuthorization request_handler_secondary_authorize_{&shared_auth_cache_}; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix}; + RequestHandlerKafkaNotify request_handler_kafka_notify_; SharedCache cache_; AuthorizationData shared_auth_cache_; bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const; diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp new file mode 100644 index 0000000000000000000000000000000000000000..43aef92d111cb58b3ad1e79de1f6b0bc3efd9ff8 --- /dev/null +++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp @@ -0,0 +1,26 @@ +#include "request_handler_kafka_notify.h" +#include "../request.h" + +namespace asapo { + +Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const { + if (!kafka_client_) { + //client was not initialized, ignore + return nullptr; + } + + std::string message = "{" + "\"event\":\"IN_CLOSE_WRITE\"," + "\"path\":\"" + request->GetFileName() + "\"" + "}"; + + return kafka_client_->Send(message, "asapo"); +} + +StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const { + return StatisticEntity::kNetwork; +} + +RequestHandlerKafkaNotify::RequestHandlerKafkaNotify() : kafka_client_{GetKafkaClient()} { +} +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_kafka_notify.h 
b/receiver/src/request_handler/request_handler_kafka_notify.h new file mode 100644 index 0000000000000000000000000000000000000000..1b165cd75b792f8dcbd8d9a3cd28ad684aa06463 --- /dev/null +++ b/receiver/src/request_handler/request_handler_kafka_notify.h @@ -0,0 +1,20 @@ +#ifndef ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H +#define ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H + +#include "request_handler.h" +#include "asapo/kafka_client/kafka_client.h" + +namespace asapo { + +class RequestHandlerKafkaNotify final : public ReceiverRequestHandler { + public: + RequestHandlerKafkaNotify(); + StatisticEntity GetStatisticEntity() const override; + Error ProcessRequest(Request* request) const override; + private: + std::unique_ptr<KafkaClient> kafka_client_; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H