diff --git a/common/cpp/include/asapo/kafka_client/kafka_client.h b/common/cpp/include/asapo/kafka_client/kafka_client.h index 64e0335278625709342ca5ffd50b05f3a08dcfc3..a920d36e1e1e0bc3fa53256dfc8d41052965acb6 100644 --- a/common/cpp/include/asapo/kafka_client/kafka_client.h +++ b/common/cpp/include/asapo/kafka_client/kafka_client.h @@ -16,8 +16,7 @@ class KafkaClient { virtual ~KafkaClient() = default; }; -Error InitializeKafkaClient(const KafkaClientConfig& config); -std::unique_ptr<KafkaClient> GetKafkaClient(); +KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err); } diff --git a/common/cpp/include/asapo/unittests/MockKafkaClient.h b/common/cpp/include/asapo/unittests/MockKafkaClient.h new file mode 100644 index 0000000000000000000000000000000000000000..f51ea3a6e553fdda1aaefd85b32decb2d3851d91 --- /dev/null +++ b/common/cpp/include/asapo/unittests/MockKafkaClient.h @@ -0,0 +1,22 @@ +#ifndef ASAPO_MOCKKAFKACLIENT_H +#define ASAPO_MOCKKAFKACLIENT_H + +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "asapo/kafka_client/kafka_client.h" + +namespace asapo { + +class MockKafkaClient : public KafkaClient { + public: + Error Send(const std::string& data, const std::string& topic) noexcept override { + return Error{Send_t(data, topic)}; + } + + MOCK_METHOD(ErrorInterface *, Send_t, (const std::string& data, const std::string& topic), ()); +}; + +} + +#endif //ASAPO_MOCKKAFKACLIENT_H diff --git a/common/cpp/src/kafka_client/kafka_client_factory.cpp b/common/cpp/src/kafka_client/kafka_client_factory.cpp index 8f3526bfbc15b88b79caaac1c27e7a2a25bc85a5..eabae085efb94ec96bfa9a8ff056256119ce8227 100644 --- a/common/cpp/src/kafka_client/kafka_client_factory.cpp +++ b/common/cpp/src/kafka_client/kafka_client_factory.cpp @@ -2,28 +2,13 @@ namespace asapo { -KafkaClient* instance = nullptr; - -Error InitializeKafkaClient(const KafkaClientConfig& config) { - if (instance != nullptr) { - return KafkaErrorTemplates::kGeneralError.Generate("Kafka client already initialized"); - } - +KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err) { try { - instance = new RdKafkaClient(config); + return new RdKafkaClient(config); } - catch (std::string err) { - return KafkaErrorTemplates::kGeneralError.Generate(err); + catch (std::string errstr) { + (*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr); } - return nullptr; } - -std::unique_ptr<KafkaClient> GetKafkaClient() { - if (instance != nullptr) { - return std::unique_ptr<KafkaClient> {instance}; - } else { - return nullptr; - } -} } diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index 088e46b5dc343c8abc8b1d43e0287c76c8cce991..7ae6db9d0e5f29539e8f706f8a5c7e248e9772ed 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -25,10 +25,13 @@ "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, "LogLevel": "info", - "KafkaClient": { - "metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }} - }, - "KafkaTopics": { - "asapo": {} + "Kafka": { + "Enabled": true, + "KafkaClient": { + "metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }} + }, + "KafkaTopics": { + "asapo": {} + } } } diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 02c8dbd789890c8c4576a3ac97410a8386389b79..866cdcb752e61abd30ee0f9576e1d3e0589ab65c 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -108,6 +108,7 @@ set(TEST_SOURCE_FILES unittests/request_handler/test_request_handler_receive_metadata.cpp unittests/request_handler/test_request_handler_delete_stream.cpp unittests/request_handler/test_request_handler_db_get_meta.cpp + unittests/request_handler/test_request_handler_kafka_notify.cpp unittests/statistics/test_statistics_sender_influx_db.cpp unittests/statistics/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index b0fcdca7010056553a11f89041c459c94ab71f49..aa0a1e028271d9ec590058857333317e46280818 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -9,11 +9,11 @@ namespace asapo { Connection::Connection(SocketDescriptor socket_fd, const std::string& address, - SharedCache cache, std::string receiver_tag) : + SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag) : io__{GenerateDefaultIO()}, statistics__{new ReceiverStatistics}, log__{GetDefaultReceiverLogger()}, -requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache}} { +requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache, kafkaClient}} { socket_fd_ = socket_fd; address_ = address; statistics__->AddTag("connection_from", address); diff --git a/receiver/src/connection.h b/receiver/src/connection.h index 192b8e66af2626525c7f15a3e0e6146182da06ef..88af2a4ec6001cbf58e32c88e940bfbf24f476a1 100644 --- a/receiver/src/connection.h +++ b/receiver/src/connection.h @@ -28,7 +28,7 @@ class Connection { int socket_fd_; public: - Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, std::string receiver_tag); + Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag); ~Connection() = default; void Listen() const noexcept; diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 292a34f261b7b1c52599162fa16d85ccf41f6d49..aedfddc166457000d107d8169e1b1e7dddc3e9cb 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -78,11 +78,11 @@ std::vector<std::thread> StartDataServers(const asapo::ReceiverConfig* config, a } int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache, - asapo::AbstractLogger* logger) { + asapo::KafkaClient* kafkaClient, asapo::AbstractLogger* logger) { static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port); logger->Info(std::string("starting receiver, version ") + asapo::kVersion); - auto* receiver = new asapo::Receiver(cache); + auto* receiver = new asapo::Receiver(cache, kafkaClient); logger->Info("listening on " + address); asapo::Error err; @@ -122,6 +122,7 @@ int main(int argc, char* argv[]) { logger->SetLogLevel(config->log_level); asapo::SharedCache cache = nullptr; + asapo::KafkaClient* kafkaClient = nullptr; if (config->use_datacache) { cache.reset(new asapo::DataCache{config->datacache_size_gb * 1024 * 1024 * 1024, (float) config->datacache_reserved_share / 100}); @@ -137,17 +138,17 @@ int main(int argc, char* argv[]) { auto metrics_thread = StartMetricsServer(config->metrics, logger); if (!config->kafka_config.global_config.empty()) { - err = asapo::InitializeKafkaClient(config->kafka_config); - if (err) { + kafkaClient = asapo::CreateKafkaClient(config->kafka_config, &err); + if (kafkaClient == nullptr) { logger->Error("Error initializing kafka client: " + err->Explain()); - logger->Error("Kafka notification disabled"); + return EXIT_FAILURE; } } else { - logger->Info("No kafka config provided. Kafka notification disabled."); + logger->Info("Kafka notification disabled."); } - auto exit_code = StartReceiver(config, cache, logger); + auto exit_code = StartReceiver(config, cache, kafkaClient, logger); // todo: implement graceful exit, currently it never reaches this point return exit_code; } diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index e3ef529d402f141d51004104e3ab01bbbda5bfcd..8dc7f7733f7d811870f687acf61b0c75e3e0cead 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -11,7 +11,7 @@ namespace asapo { const int Receiver::kMaxUnacceptedConnectionsBacklog = 5; -Receiver::Receiver(SharedCache cache): cache_{cache}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { +Receiver::Receiver(SharedCache cache, KafkaClient* kafkaClient): cache_{cache}, kafka_client_{kafkaClient}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } @@ -55,7 +55,7 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address))); auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd), [connection_socket_fd, address, this] { - auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag)); + auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, kafka_client_.get(), GetReceiverConfig()->tag)); connection->Listen(); }); diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h index e7d9bf1dd3c064cb033f551d4affd5f426854da1..a0ff2bb13703ee3c46f3d1f08f113cc1140de7bd 100644 --- a/receiver/src/receiver.h +++ b/receiver/src/receiver.h @@ -20,11 +20,12 @@ class Receiver { void ProcessConnections(Error* err); std::vector<std::unique_ptr<std::thread>> threads_; SharedCache cache_; + std::unique_ptr<KafkaClient> kafka_client_; public: static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config Receiver(const Receiver&) = delete; Receiver& operator=(const Receiver&) = delete; - Receiver(SharedCache cache); + Receiver(SharedCache cache, KafkaClient* kafkaClient); void Listen(std::string listener_address, Error* err, bool exit_after_first_connection = false); std::unique_ptr<IO> io__; diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 80103255a1ad5d5d2ec728ce97541b2cdfec3a25..c32526d3f85f733357d2fd44d4186bc2c74c541c 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,6 +17,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { std::string log_level; Error err; + bool kafkaEnabled; std::vector<std::string> kafkaTopics; (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || @@ -38,32 +39,29 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { (err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) || (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) || (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) || - (err = parser.GetString("LogLevel", &log_level)); + (err = parser.GetString("LogLevel", &log_level)) || + (err = parser.Embedded("Kafka").GetBool("Enabled", &kafkaEnabled)); + - if (err) { - return err; - } - (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || - (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); + if (kafkaEnabled) { + // read the configuration only if kafka is enabled. empty configuration means "disabled" + (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || + (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); + + if (err) { + return err; + } - if (!err) { for(const auto& topic : kafkaTopics) { auto topicConfig = config.kafka_config.topics_config[topic]; err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); if (err) { - break; + return err; } } } - if (err) { - // ignore kafka config error. Just disable it. - config.kafka_config.global_config.clear(); - config.kafka_config.topics_config.clear(); - } - - config.dataserver.tag = config.tag + "_ds"; config.log_level = StringToLogLevel(log_level, &err); diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 6405f82b30b4a7970f9a892406f486da96806006..df5757019dd87b3abe80c81a1a8e5083faae9663 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -118,8 +118,7 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea return request; } -RequestFactory::RequestFactory(SharedCache cache) : cache_{cache} { - +RequestFactory::RequestFactory(SharedCache cache, KafkaClient* kafka_client) : request_handler_kafka_notify_{kafka_client}, cache_{cache} { } } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index c298c3c2bbc704f09224b31e371ab637655aeaac..578e2b342c89ba65754424e46dfab61ddd93a7cf 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -24,7 +24,7 @@ namespace asapo { class RequestFactory { public: - explicit RequestFactory (SharedCache cache); + explicit RequestFactory (SharedCache cache, KafkaClient* kafka_client); virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept; virtual ~RequestFactory() = default; diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp index 43aef92d111cb58b3ad1e79de1f6b0bc3efd9ff8..558f74d3fe135756084c235efc0423805d7ed584 100644 --- a/receiver/src/request_handler/request_handler_kafka_notify.cpp +++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp @@ -21,6 +21,6 @@ StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const { return StatisticEntity::kNetwork; } -RequestHandlerKafkaNotify::RequestHandlerKafkaNotify() : kafka_client_{GetKafkaClient()} { +RequestHandlerKafkaNotify::RequestHandlerKafkaNotify(KafkaClient* kafka_client) : kafka_client_{kafka_client} { } } \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_kafka_notify.h b/receiver/src/request_handler/request_handler_kafka_notify.h index 1b165cd75b792f8dcbd8d9a3cd28ad684aa06463..399b01126dce0865f273b8446746c5acde53692d 100644 --- a/receiver/src/request_handler/request_handler_kafka_notify.h +++ b/receiver/src/request_handler/request_handler_kafka_notify.h @@ -8,11 +8,11 @@ namespace asapo { class RequestHandlerKafkaNotify final : public ReceiverRequestHandler { public: - RequestHandlerKafkaNotify(); + RequestHandlerKafkaNotify(KafkaClient* kafka_client); StatisticEntity GetStatisticEntity() const override; Error ProcessRequest(Request* request) const override; private: - std::unique_ptr<KafkaClient> kafka_client_; + KafkaClient* kafka_client_; }; } diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index e222465641aaaeb919976ed713d9ef316214cb40..e416ede4b3c670a41674f27e0eaa137a0e93395c 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -5,13 +5,13 @@ namespace asapo { RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address, - ReceiverStatistics* statistics, SharedCache cache) : statistics__{statistics}, + ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client) : statistics__{statistics}, io__{GenerateDefaultIO()}, log__{ GetDefaultReceiverLogger()}, request_factory__{ new RequestFactory{ - cache}}, + cache, kafka_client}}, socket_fd_{socket_fd}, producer_uri_{ std::move(address)} { diff --git a/receiver/src/request_handler/requests_dispatcher.h b/receiver/src/request_handler/requests_dispatcher.h index b18da4725c1fbbaa7ce309f118a31c71769adefb..5f4bcc4687703da64f7e12511f40ec8d24d836ae 100644 --- a/receiver/src/request_handler/requests_dispatcher.h +++ b/receiver/src/request_handler/requests_dispatcher.h @@ -14,7 +14,7 @@ namespace asapo { class RequestsDispatcher { public: - RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache); + RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client); VIRTUAL Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept; VIRTUAL std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept; VIRTUAL ~RequestsDispatcher() = default; diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 2e02670b4a23553ecfa320112b924f05c20a4545..bec7f9caebd58a335bae5674539ad2bbc4c23dc8 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -80,6 +80,9 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; + config_string += "," + Key("Kafka", error_field) + "{"; + config_string += Key("Enabled", error_field) + "false"; + config_string += "}"; config_string += "}"; diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index f2c6179a22a17ad97f823dc5fa11747ea9c5cf06..7eeff9af6a6effb8572058aa990848126a92299d 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -5,6 +5,7 @@ #include "asapo/unittests/MockIO.h" #include "asapo/unittests/MockDatabase.h" +#include "asapo/unittests/MockKafkaClient.h" #include "../../src/connection.h" #include "../../src/receiver_error.h" #include "../../src/request.h" @@ -25,7 +26,8 @@ namespace { class FactoryTests : public Test { public: - RequestFactory factory{nullptr}; + asapo::MockKafkaClient kafkaClient; + RequestFactory factory{nullptr, &kafkaClient}; Error err{nullptr}; GenericRequestHeader generic_request_header; ReceiverConfig config; @@ -53,12 +55,13 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(6)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } } @@ -74,11 +77,12 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[3]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } } @@ -111,16 +115,17 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr)); } TEST_F(FactoryTests, CachePassedToRequest) { - RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}}; + RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}, nullptr}; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); diff --git a/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4b9c68d89bdfd9da165ea9e4288a0a6210ed8c15 --- /dev/null +++ b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp @@ -0,0 +1,38 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "asapo/unittests/MockKafkaClient.h" +#include "../../src/request_handler/request_factory.h" +#include "../receiver_mocking.h" + +using namespace testing; +using namespace asapo; + +namespace { + +class KafkaNotifyHandlerTests : public Test { + public: + NiceMock<MockKafkaClient> kafka_client; + RequestHandlerKafkaNotify handler{&kafka_client}; + std::unique_ptr<NiceMock<MockRequest>> mock_request; + const std::string expected_filename = "filename"; + const std::string expected_topic = "asapo"; + + void SetUp() override { + GenericRequestHeader request_header; + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); + EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename)); + EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr)); + } + + void TearDown() override { + } +}; + +TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) { + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); + Mock::VerifyAndClearExpectations(mock_request.get()); + Mock::VerifyAndClearExpectations(mock_request.get()); +} +} \ No newline at end of file diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index c7831accb35c3ca48c118e3825fded1a086bdce1..d5c7ead18205916bc0d1019225d22385f9ed739d 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -20,7 +20,7 @@ namespace { TEST(RequestDispatcher, Constructor) { auto stat = std::unique_ptr<ReceiverStatistics> {new ReceiverStatistics}; - RequestsDispatcher dispatcher{0, "some_address", stat.get(), nullptr}; + RequestsDispatcher dispatcher{0, "some_address", stat.get(), nullptr, nullptr}; ASSERT_THAT(dynamic_cast<const asapo::ReceiverStatistics*>(dispatcher.statistics__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(dispatcher.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(dispatcher.request_factory__.get()), Ne(nullptr)); @@ -40,7 +40,7 @@ class MockRequest: public Request { class MockRequestFactory: public asapo::RequestFactory { public: - MockRequestFactory(): RequestFactory(nullptr) {}; + MockRequestFactory(): RequestFactory(nullptr, nullptr) {}; std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept override { @@ -80,7 +80,7 @@ class RequestsDispatcherTests : public Test { void SetUp() override { test_config.authorization_interval_ms = 0; SetReceiverConfig(test_config, "none"); - dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statictics, nullptr}}; + dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statictics, nullptr, nullptr}}; dispatcher->io__ = std::unique_ptr<asapo::IO> {&mock_io}; dispatcher->statistics__ = &mock_statictics; dispatcher->request_factory__ = std::unique_ptr<asapo::RequestFactory> {&mock_factory}; diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index d8d24445a227ebd121a83cadd3f3b547a3307ac1..4f73aaa17bc3e2ba1faa7a6a72b036a0ba1b265e 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -14,7 +14,7 @@ using namespace asapo; namespace { TEST(Connection, Constructor) { - Connection connection{0, "some_address", nullptr, "some_tag"}; + Connection connection{0, "some_address", nullptr, nullptr, "some_tag"}; ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(connection.log__), Ne(nullptr)); @@ -24,7 +24,7 @@ TEST(Connection, Constructor) { class MockDispatcher: public asapo::RequestsDispatcher { public: - MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr) {}; + MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr, nullptr) {}; Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept override { return Error{ProcessRequest_t(request.get())}; } @@ -52,7 +52,7 @@ class ConnectionTests : public Test { std::unique_ptr<Connection> connection; void SetUp() override { - connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, nullptr, "some_tag"}}; + connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, nullptr, nullptr, "some_tag"}}; connection->io__ = std::unique_ptr<asapo::IO> {&mock_io}; connection->statistics__ = std::unique_ptr<asapo::ReceiverStatistics> {&mock_statictics}; connection->log__ = &mock_logger; diff --git a/receiver/unittests/test_receiver.cpp b/receiver/unittests/test_receiver.cpp index 49e7859cd08227ff27e9baa7166d26fcab0e19c3..1d032201124db3a5a4644a9485d3bd57c7280fa0 100644 --- a/receiver/unittests/test_receiver.cpp +++ b/receiver/unittests/test_receiver.cpp @@ -12,7 +12,7 @@ using namespace asapo; namespace { TEST(Receiver, Constructor) { - asapo::Receiver receiver(nullptr); + asapo::Receiver receiver(nullptr, nullptr); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(receiver.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(receiver.io__.get()), Ne(nullptr)); } @@ -31,7 +31,7 @@ class StartListenerFixture : public testing::Test { Error err; ::testing::NiceMock<asapo::MockLogger> mock_logger; ::testing::NiceMock<asapo::MockIO> mock_io; - asapo::Receiver receiver{nullptr}; + asapo::Receiver receiver{nullptr, nullptr}; void SetUp() override { err = nullptr;