From b83a71d4a48ab35f831df4e7448cf2ebce8fab3b Mon Sep 17 00:00:00 2001 From: George Sedov <george.sedov@desy.de> Date: Fri, 26 Nov 2021 16:22:00 +0100 Subject: [PATCH] Revert "Add rdkafka client" (wrong branch) This reverts commit e4f9d3e406be542294870e759832105aa44bf8d1. --- common/cpp/CMakeLists.txt | 1 - .../include/asapo/json_parser/json_parser.h | 3 - .../include/asapo/kafka_client/kafka_client.h | 24 ----- .../include/asapo/kafka_client/kafka_config.h | 16 ---- .../include/asapo/kafka_client/kafka_error.h | 43 --------- common/cpp/src/json_parser/json_parser.cpp | 7 -- common/cpp/src/json_parser/rapid_json.cpp | 34 -------- common/cpp/src/json_parser/rapid_json.h | 3 - common/cpp/src/kafka_client/CMakeLists.txt | 16 ---- .../src/kafka_client/kafka_client_factory.cpp | 29 ------- .../cpp/src/kafka_client/rdkafka_client.cpp | 87 ------------------- common/cpp/src/kafka_client/rdkafka_client.h | 25 ------ .../asapo/configs/asapo-receiver.json | 8 +- deploy/asapo_helm_chart/asapo/values.yaml | 1 - receiver/CMakeLists.txt | 5 +- receiver/src/main.cpp | 13 --- receiver/src/receiver_config.cpp | 22 ----- receiver/src/receiver_config.h | 2 - .../src/request_handler/request_factory.cpp | 2 - .../src/request_handler/request_factory.h | 2 - .../request_handler_kafka_notify.cpp | 26 ------ .../request_handler_kafka_notify.h | 20 ----- 22 files changed, 3 insertions(+), 386 deletions(-) delete mode 100644 common/cpp/include/asapo/kafka_client/kafka_client.h delete mode 100644 common/cpp/include/asapo/kafka_client/kafka_config.h delete mode 100644 common/cpp/include/asapo/kafka_client/kafka_error.h delete mode 100644 common/cpp/src/kafka_client/CMakeLists.txt delete mode 100644 common/cpp/src/kafka_client/kafka_client_factory.cpp delete mode 100644 common/cpp/src/kafka_client/rdkafka_client.cpp delete mode 100644 common/cpp/src/kafka_client/rdkafka_client.h delete mode 100644 receiver/src/request_handler/request_handler_kafka_notify.cpp delete mode 100644 receiver/src/request_handler/request_handler_kafka_notify.h diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index c55ea7c53..8f90ff257 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -8,7 +8,6 @@ add_subdirectory(src/data_structs) add_subdirectory(src/version) -add_subdirectory(src/kafka_client) add_subdirectory(src/http_client) diff --git a/common/cpp/include/asapo/json_parser/json_parser.h b/common/cpp/include/asapo/json_parser/json_parser.h index d62cc8c89..b8bd14f69 100644 --- a/common/cpp/include/asapo/json_parser/json_parser.h +++ b/common/cpp/include/asapo/json_parser/json_parser.h @@ -4,7 +4,6 @@ #include <string> #include <memory> #include <vector> -#include <map> #include <string> #include "asapo/common/error.h" @@ -21,8 +20,6 @@ class JsonParser { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; - Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; - Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept; diff --git a/common/cpp/include/asapo/kafka_client/kafka_client.h b/common/cpp/include/asapo/kafka_client/kafka_client.h deleted file mode 100644 index 64e033527..000000000 --- a/common/cpp/include/asapo/kafka_client/kafka_client.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef ASAPO_KAFKA_CLIENT_H -#define ASAPO_KAFKA_CLIENT_H - -#include "asapo/common/error.h" -#include "asapo/common/data_structs.h" -#include "asapo/kafka_client/kafka_config.h" -#include "asapo/kafka_client/kafka_error.h" -#include <map> - -namespace asapo { - -class KafkaClient { - public: - virtual Error Send(const std::string& data, - const std::string& topic) noexcept = 0; - virtual ~KafkaClient() = default; -}; - -Error InitializeKafkaClient(const KafkaClientConfig& config); -std::unique_ptr<KafkaClient> GetKafkaClient(); - -} - -#endif //ASAPO_KAFKA_CLIENT_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_config.h b/common/cpp/include/asapo/kafka_client/kafka_config.h deleted file mode 100644 index 81f707bab..000000000 --- a/common/cpp/include/asapo/kafka_client/kafka_config.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef ASAPO_KAFKA_CONFIG_H -#define ASAPO_KAFKA_CONFIG_H - -#include <string> -#include <map> - -namespace asapo { - - struct KafkaClientConfig { - std::map<std::string, std::string> global_config; - std::map<std::string, std::map<std::string, std::string>> topics_config; - }; - -} - -#endif //ASAPO_KAFKA_CONFIG_H diff --git a/common/cpp/include/asapo/kafka_client/kafka_error.h b/common/cpp/include/asapo/kafka_client/kafka_error.h deleted file mode 100644 index 3ef69183d..000000000 --- a/common/cpp/include/asapo/kafka_client/kafka_error.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef ASAPO_KAFKA_ERROR_H -#define ASAPO_KAFKA_ERROR_H - -#include "asapo/common/error.h" - -namespace asapo { - - enum class KafkaErrorType { - kQueueFullError, - kMessageTooLargeError, - kUnknownPartitionError, - kUnknownTopicError, - kGeneralError - }; - - using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>; - - namespace KafkaErrorTemplates { - - auto const kQueueFullError = KafkaErrorTemplate { - "kafka queue is full", KafkaErrorType::kQueueFullError - }; - - auto const kMessageTooLargeError = KafkaErrorTemplate { - "kafka message is too large", KafkaErrorType::kMessageTooLargeError - }; - - auto const kUnknownPartitionError = KafkaErrorTemplate { - "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError - }; - - auto const kUnknownTopicError = KafkaErrorTemplate { - "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError - }; - - auto const kGeneralError = KafkaErrorTemplate { - "unexpected kafka error occurred", KafkaErrorType::kGeneralError - }; - } - -} - -#endif //ASAPO_KAFKA_ERROR_H diff --git a/common/cpp/src/json_parser/json_parser.cpp b/common/cpp/src/json_parser/json_parser.cpp index 32457c07a..b25cb6b4a 100644 --- a/common/cpp/src/json_parser/json_parser.cpp +++ b/common/cpp/src/json_parser/json_parser.cpp @@ -24,13 +24,6 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin return rapid_json_->GetArrayString(name, val); } -Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { - return rapid_json_->GetArrayObjectMembers(name, val); -} - -Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { - return rapid_json_->GetDictionaryString(name, val); -} Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept { return rapid_json_->GetBool(name, val); diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp index 7dba5ea22..a9883d134 100644 --- a/common/cpp/src/json_parser/rapid_json.cpp +++ b/common/cpp/src/json_parser/rapid_json.cpp @@ -168,40 +168,6 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string } -Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept { - Value* json_val; - if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { - return err; - } - - val->clear(); - for (auto& m : json_val->GetObject()) { - if (!m.name.IsString()) { - return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name); - } - val->push_back(m.name.GetString()); - } - return nullptr; - -} - -Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept { - Value* json_val; - if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) { - return err; - } - - val->clear(); - for (auto& m : json_val->GetObject()) { - if (!m.value.IsString() || !m.name.IsString()) { - return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name); - } - (*val)[m.name.GetString()] = m.value.GetString(); - } - return nullptr; - -} - RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) { auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_); if (err) { diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h index 477d6e4ee..b4cbc8871 100644 --- a/common/cpp/src/json_parser/rapid_json.h +++ b/common/cpp/src/json_parser/rapid_json.h @@ -4,7 +4,6 @@ #include "rapidjson/document.h" #include "asapo/common/error.h" #include "asapo/io/io.h" -#include <map> namespace asapo { @@ -27,8 +26,6 @@ class RapidJson { Error GetString(const std::string& name, std::string* val) const noexcept; Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept; Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept; - Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept; - Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept; Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept; Error GetRawString(std::string* val) const noexcept; Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept; diff --git a/common/cpp/src/kafka_client/CMakeLists.txt b/common/cpp/src/kafka_client/CMakeLists.txt deleted file mode 100644 index 64b5d004f..000000000 --- a/common/cpp/src/kafka_client/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -set(TARGET_NAME rdkafka_client) -set(SOURCE_FILES - rdkafka_client.cpp - kafka_client_factory.cpp ../../include/asapo/preprocessor/definitions.h) - - -################################ -# Library -################################ -add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) -IF(WIN32) - target_compile_definitions(${TARGET_NAME} PUBLIC -DRDKAFKA_STATICLIB) -ENDIF() -target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIRS}) - - diff --git a/common/cpp/src/kafka_client/kafka_client_factory.cpp b/common/cpp/src/kafka_client/kafka_client_factory.cpp deleted file mode 100644 index 8f3526bfb..000000000 --- a/common/cpp/src/kafka_client/kafka_client_factory.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "rdkafka_client.h" - -namespace asapo { - -KafkaClient* instance = nullptr; - -Error InitializeKafkaClient(const KafkaClientConfig& config) { - if (instance != nullptr) { - return KafkaErrorTemplates::kGeneralError.Generate("Kafka client already initialized"); - } - - try { - instance = new RdKafkaClient(config); - } - catch (std::string err) { - return KafkaErrorTemplates::kGeneralError.Generate(err); - } - - return nullptr; -} - -std::unique_ptr<KafkaClient> GetKafkaClient() { - if (instance != nullptr) { - return std::unique_ptr<KafkaClient> {instance}; - } else { - return nullptr; - } -} -} diff --git a/common/cpp/src/kafka_client/rdkafka_client.cpp b/common/cpp/src/kafka_client/rdkafka_client.cpp deleted file mode 100644 index 0aabc0de0..000000000 --- a/common/cpp/src/kafka_client/rdkafka_client.cpp +++ /dev/null @@ -1,87 +0,0 @@ -#include "rdkafka_client.h" - -#include <cstring> -#include "asapo/common/data_structs.h" -#include "asapo/io/io_factory.h" - -namespace asapo { - -RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : defaultTopicConf(nullptr) { - std::string err; - RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - for (const auto& configItem : config.global_config) { - if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { - throw "cannot initialize kafka: " + err; - } - } - - producer = RdKafka::Producer::create(conf, err); - if (!producer) { - throw "cannot initialize kafka"; - } - - for (const auto& topic : config.topics_config) { - auto topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - for (const auto& configItem : topic.second) { - if (topicConfig->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) { - throw "cannot initialize kafka: " + err; - } - } - if (topic.first == "default") { - this->defaultTopicConf = topicConfig; - } else - { - auto topicObj = RdKafka::Topic::create(producer, topic.first, topicConfig, err); - if (!topicObj) { - throw "cannot initialize kafka topic [" + topic.first + "]: " + err; - - } - this->kafkaTopics[topic.first] = topicObj; - } - } -} - -RdKafkaClient::~RdKafkaClient() { - if (producer) { - producer->flush(1000); - } - delete producer; -} - -Error RdKafkaClient::Send(const std::string& data, const std::string& topicName) noexcept { - auto topicIt = this->kafkaTopics.find(topicName); - RdKafka::Topic* topic; - if (topicIt == this->kafkaTopics.end()) - { - if (!defaultTopicConf) { - return KafkaErrorTemplates::kUnknownTopicError.Generate(); - } - std::string err; - topic = RdKafka::Topic::create(producer, topicName, this->defaultTopicConf, err); - if (!topic) { - return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topicName + "]: " + err); - } - this->kafkaTopics[topicName] = topic; - } - else - { - topic = topicIt->second; - } - - auto err = producer->produce(topic, RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast<void*>(static_cast<const void *>(data.data())), data.size(), - nullptr, nullptr); - - switch (err) { - case RdKafka::ERR_NO_ERROR: return nullptr; - case RdKafka::ERR__QUEUE_FULL: return KafkaErrorTemplates::kQueueFullError.Generate(); - case RdKafka::ERR_MSG_SIZE_TOO_LARGE: return KafkaErrorTemplates::kMessageTooLargeError.Generate(); - case RdKafka::ERR__UNKNOWN_PARTITION: return KafkaErrorTemplates::kUnknownPartitionError.Generate(); - case RdKafka::ERR__UNKNOWN_TOPIC: return KafkaErrorTemplates::kUnknownTopicError.Generate(); - default: return KafkaErrorTemplates::kGeneralError.Generate(err2str(err)); - } - -} - -} diff --git a/common/cpp/src/kafka_client/rdkafka_client.h b/common/cpp/src/kafka_client/rdkafka_client.h deleted file mode 100644 index fcfa163c3..000000000 --- a/common/cpp/src/kafka_client/rdkafka_client.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef ASAPO_RDKAFKA_CLIENT_H -#define ASAPO_RDKAFKA_CLIENT_H - -#include <string> - -#include "asapo/kafka_client/kafka_client.h" -#include "librdkafka/rdkafkacpp.h" - -namespace asapo { - -class RdKafkaClient final : public KafkaClient { - public: - RdKafkaClient(const KafkaClientConfig& config); - Error Send(const std::string& data, const std::string& topic) noexcept override; - - virtual ~RdKafkaClient(); - private: - RdKafka::Producer* producer; - RdKafka::Conf* defaultTopicConf; - std::map<std::string, RdKafka::Topic *> kafkaTopics; -}; - -} - -#endif //ASAPO_RDKAFKA_CLIENT_H diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index 088e46b5d..c32c6703a 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -24,11 +24,5 @@ }, "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, - "LogLevel": "info", - "KafkaClient": { - "metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }} - }, - "KafkaTopics": { - "asapo": {} - } + "LogLevel": "info" } diff --git a/deploy/asapo_helm_chart/asapo/values.yaml b/deploy/asapo_helm_chart/asapo/values.yaml index 946bc9394..beccb7c21 100644 --- a/deploy/asapo_helm_chart/asapo/values.yaml +++ b/deploy/asapo_helm_chart/asapo/values.yaml @@ -33,7 +33,6 @@ ownServices: enable: true sizeGb: 1 receiveToDiskThresholdMB: 200 - kafkaBrokerList: asap3-events-01,asap3-events-02 fileTransfer: serviceName: asapo-file-transfer replicaCount: 1 diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 02c8dbd78..836a04f9b 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -26,7 +26,6 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_handler_db_delete_stream.cpp src/request_handler/request_handler_db_get_meta.cpp - src/request_handler/request_handler_kafka_notify.cpp src/request_handler/request_factory.cpp src/request_handler/request_handler_db.cpp src/request_handler/file_processors/write_file_processor.cpp @@ -60,11 +59,11 @@ set(SOURCE_FILES GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES) add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:rdkafka_client>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_include_directories(${TARGET_NAME} SYSTEM PUBLIC ${LIBFABRIC_INCLUDE_DIR}) -target_link_libraries(${TARGET_NAME} CURL::libcurl rdkafka++ ${CMAKE_THREAD_LIBS_INIT} database +target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT} database asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 292a34f26..9f818218d 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -6,7 +6,6 @@ #include "receiver_data_server/receiver_data_server_logger.h" #include "asapo/common/internal/version.h" -#include "asapo/kafka_client/kafka_client.h" #include "receiver_data_server/receiver_data_server.h" #include "receiver_data_server/net_server/rds_tcp_server.h" @@ -135,18 +134,6 @@ int main(int argc, char* argv[]) { } auto metrics_thread = StartMetricsServer(config->metrics, logger); - - if (!config->kafka_config.global_config.empty()) { - err = asapo::InitializeKafkaClient(config->kafka_config); - if (err) { - logger->Error("Error initializing kafka client: " + err->Explain()); - logger->Error("Kafka notification disabled"); - } - } - else { - logger->Info("No kafka config provided. Kafka notification disabled."); - } - auto exit_code = StartReceiver(config, cache, logger); // todo: implement graceful exit, currently it never reaches this point return exit_code; diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 80103255a..c6cc78460 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,8 +17,6 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { std::string log_level; Error err; - std::vector<std::string> kafkaTopics; - (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || @@ -44,26 +42,6 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { return err; } - (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || - (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); - - if (!err) { - for(const auto& topic : kafkaTopics) { - auto topicConfig = config.kafka_config.topics_config[topic]; - err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); - if (err) { - break; - } - } - } - - if (err) { - // ignore kafka config error. Just disable it. - config.kafka_config.global_config.clear(); - config.kafka_config.topics_config.clear(); - } - - config.dataserver.tag = config.tag + "_ds"; config.log_level = StringToLogLevel(log_level, &err); diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index d065ed295..0a054e2c1 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -7,7 +7,6 @@ #include "receiver_data_server/receiver_data_server_config.h" #include "metrics/receiver_metrics_config.h" -#include "asapo/kafka_client/kafka_config.h" namespace asapo { @@ -28,7 +27,6 @@ struct ReceiverConfig { ReceiverDataServerConfig dataserver; ReceiverMetricsConfig metrics; std::string discovery_server; - KafkaClientConfig kafka_config; }; class ReceiverConfigManager { diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 6405f82b3..e9c5fdd8d 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -32,7 +32,6 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); - request->AddHandler(&request_handler_kafka_notify_); } } @@ -43,7 +42,6 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re "ingest mode should include kStoreInFilesystem for large files "); } request->AddHandler(&request_handler_filereceive_); - request->AddHandler(&request_handler_kafka_notify_); return nullptr; } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index c298c3c2b..04f871cda 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -8,7 +8,6 @@ #include "request_handler_db_last_stream.h" #include "request_handler_db_delete_stream.h" #include "request_handler_db_get_meta.h" -#include "request_handler_kafka_notify.h" #include "request_handler_file_process.h" #include "request_handler_db_write.h" @@ -46,7 +45,6 @@ class RequestFactory { RequestHandlerInitialAuthorization request_handler_initial_authorize_{&shared_auth_cache_}; RequestHandlerSecondaryAuthorization request_handler_secondary_authorize_{&shared_auth_cache_}; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix}; - RequestHandlerKafkaNotify request_handler_kafka_notify_; SharedCache cache_; AuthorizationData shared_auth_cache_; bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const; diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp deleted file mode 100644 index 43aef92d1..000000000 --- a/receiver/src/request_handler/request_handler_kafka_notify.cpp +++ /dev/null @@ -1,26 +0,0 @@ -#include "request_handler_kafka_notify.h" -#include "../request.h" - -namespace asapo { - -Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const { - if (!kafka_client_) { - //client was not initialized, ignore - return nullptr; - } - - std::string message = "{" - "\"event\":\"IN_CLOSE_WRITE\"," - "\"path\":\"" + request->GetFileName() + "\"" - "}"; - - return kafka_client_->Send(message, "asapo"); -} - -StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const { - return StatisticEntity::kNetwork; -} - -RequestHandlerKafkaNotify::RequestHandlerKafkaNotify() : kafka_client_{GetKafkaClient()} { -} -} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_kafka_notify.h b/receiver/src/request_handler/request_handler_kafka_notify.h deleted file mode 100644 index 1b165cd75..000000000 --- a/receiver/src/request_handler/request_handler_kafka_notify.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H -#define ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H - -#include "request_handler.h" -#include "asapo/kafka_client/kafka_client.h" - -namespace asapo { - -class RequestHandlerKafkaNotify final : public ReceiverRequestHandler { - public: - RequestHandlerKafkaNotify(); - StatisticEntity GetStatisticEntity() const override; - Error ProcessRequest(Request* request) const override; - private: - std::unique_ptr<KafkaClient> kafka_client_; -}; - -} - -#endif //ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H -- GitLab