Skip to content
Snippets Groups Projects
Commit 84c3ce5b authored by George Sedov's avatar George Sedov
Browse files

Add unittests for kafka notifications

parent 6968ea2e
No related branches found
No related tags found
No related merge requests found
Showing
with 123 additions and 68 deletions
......@@ -16,8 +16,7 @@ class KafkaClient {
virtual ~KafkaClient() = default;
};
Error InitializeKafkaClient(const KafkaClientConfig& config);
std::unique_ptr<KafkaClient> GetKafkaClient();
KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err);
}
......
#ifndef ASAPO_MOCKKAFKACLIENT_H
#define ASAPO_MOCKKAFKACLIENT_H
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "asapo/kafka_client/kafka_client.h"
namespace asapo {
// Google Mock stand-in for the KafkaClient interface, used by receiver unit tests
// (e.g. the kafka-notify request handler tests) to assert on Send() calls.
class MockKafkaClient : public KafkaClient {
public:
// Adapter: gmock cannot mock the noexcept interface method directly, so the
// real override delegates to the mockable Send_t and wraps the raw
// ErrorInterface* it returns into an owning Error before returning it.
Error Send(const std::string& data, const std::string& topic) noexcept override {
return Error{Send_t(data, topic)};
}
// Mockable hook; returns a raw ErrorInterface* whose ownership is taken by Send().
MOCK_METHOD(ErrorInterface *, Send_t, (const std::string& data, const std::string& topic), ());
};
}
#endif //ASAPO_MOCKKAFKACLIENT_H
......@@ -2,28 +2,13 @@
namespace asapo {
KafkaClient* instance = nullptr;
Error InitializeKafkaClient(const KafkaClientConfig& config) {
if (instance != nullptr) {
return KafkaErrorTemplates::kGeneralError.Generate("Kafka client already initialized");
}
KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err) {
try {
instance = new RdKafkaClient(config);
return new RdKafkaClient(config);
}
catch (std::string err) {
return KafkaErrorTemplates::kGeneralError.Generate(err);
catch (std::string errstr) {
(*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr);
}
return nullptr;
}
std::unique_ptr<KafkaClient> GetKafkaClient() {
if (instance != nullptr) {
return std::unique_ptr<KafkaClient> {instance};
} else {
return nullptr;
}
}
}
......@@ -25,10 +25,13 @@
"Tag": "receiver",
"ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }},
"LogLevel": "info",
"KafkaClient": {
"metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }}
},
"KafkaTopics": {
"asapo": {}
"Kafka": {
"Enabled": true,
"KafkaClient": {
"metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }}
},
"KafkaTopics": {
"asapo": {}
}
}
}
......@@ -108,6 +108,7 @@ set(TEST_SOURCE_FILES
unittests/request_handler/test_request_handler_receive_metadata.cpp
unittests/request_handler/test_request_handler_delete_stream.cpp
unittests/request_handler/test_request_handler_db_get_meta.cpp
unittests/request_handler/test_request_handler_kafka_notify.cpp
unittests/statistics/test_statistics_sender_influx_db.cpp
unittests/statistics/test_statistics_sender_fluentd.cpp
unittests/mock_receiver_config.cpp
......
......@@ -9,11 +9,11 @@
namespace asapo {
Connection::Connection(SocketDescriptor socket_fd, const std::string& address,
SharedCache cache, std::string receiver_tag) :
SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag) :
io__{GenerateDefaultIO()},
statistics__{new ReceiverStatistics},
log__{GetDefaultReceiverLogger()},
requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache}} {
requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), cache, kafkaClient}} {
socket_fd_ = socket_fd;
address_ = address;
statistics__->AddTag("connection_from", address);
......
......@@ -28,7 +28,7 @@ class Connection {
int socket_fd_;
public:
Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, std::string receiver_tag);
Connection(SocketDescriptor socket_fd, const std::string& address, SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag);
~Connection() = default;
void Listen() const noexcept;
......
......@@ -78,11 +78,11 @@ std::vector<std::thread> StartDataServers(const asapo::ReceiverConfig* config, a
}
int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache,
asapo::AbstractLogger* logger) {
asapo::KafkaClient* kafkaClient, asapo::AbstractLogger* logger) {
static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port);
logger->Info(std::string("starting receiver, version ") + asapo::kVersion);
auto* receiver = new asapo::Receiver(cache);
auto* receiver = new asapo::Receiver(cache, kafkaClient);
logger->Info("listening on " + address);
asapo::Error err;
......@@ -122,6 +122,7 @@ int main(int argc, char* argv[]) {
logger->SetLogLevel(config->log_level);
asapo::SharedCache cache = nullptr;
asapo::KafkaClient* kafkaClient = nullptr;
if (config->use_datacache) {
cache.reset(new asapo::DataCache{config->datacache_size_gb * 1024 * 1024 * 1024,
(float) config->datacache_reserved_share / 100});
......@@ -137,17 +138,17 @@ int main(int argc, char* argv[]) {
auto metrics_thread = StartMetricsServer(config->metrics, logger);
if (!config->kafka_config.global_config.empty()) {
err = asapo::InitializeKafkaClient(config->kafka_config);
if (err) {
kafkaClient = asapo::CreateKafkaClient(config->kafka_config, &err);
if (kafkaClient == nullptr) {
logger->Error("Error initializing kafka client: " + err->Explain());
logger->Error("Kafka notification disabled");
return EXIT_FAILURE;
}
}
else {
logger->Info("No kafka config provided. Kafka notification disabled.");
logger->Info("Kafka notification disabled.");
}
auto exit_code = StartReceiver(config, cache, logger);
auto exit_code = StartReceiver(config, cache, kafkaClient, logger);
// todo: implement graceful exit, currently it never reaches this point
return exit_code;
}
......@@ -11,7 +11,7 @@ namespace asapo {
const int Receiver::kMaxUnacceptedConnectionsBacklog = 5;
Receiver::Receiver(SharedCache cache): cache_{cache}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} {
Receiver::Receiver(SharedCache cache, KafkaClient* kafkaClient): cache_{cache}, kafka_client_{kafkaClient}, io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} {
}
......@@ -55,7 +55,7 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons
log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address)));
auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd),
[connection_socket_fd, address, this] {
auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag));
auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, kafka_client_.get(), GetReceiverConfig()->tag));
connection->Listen();
});
......
......@@ -20,11 +20,12 @@ class Receiver {
void ProcessConnections(Error* err);
std::vector<std::unique_ptr<std::thread>> threads_;
SharedCache cache_;
std::unique_ptr<KafkaClient> kafka_client_;
public:
static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config
Receiver(const Receiver&) = delete;
Receiver& operator=(const Receiver&) = delete;
Receiver(SharedCache cache);
Receiver(SharedCache cache, KafkaClient* kafkaClient);
void Listen(std::string listener_address, Error* err, bool exit_after_first_connection = false);
std::unique_ptr<IO> io__;
......
......@@ -17,6 +17,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
std::string log_level;
Error err;
bool kafkaEnabled;
std::vector<std::string> kafkaTopics;
(err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) ||
......@@ -38,32 +39,29 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
(err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) ||
(err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) ||
(err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) ||
(err = parser.GetString("LogLevel", &log_level));
(err = parser.GetString("LogLevel", &log_level)) ||
(err = parser.Embedded("Kafka").GetBool("Enabled", &kafkaEnabled));
if (err) {
return err;
}
(err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
(err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
if (kafkaEnabled) {
// read the configuration only if kafka is enabled. empty configuration means "disabled"
(err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
(err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
if (err) {
return err;
}
if (!err) {
for(const auto& topic : kafkaTopics) {
auto topicConfig = config.kafka_config.topics_config[topic];
err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
if (err) {
break;
return err;
}
}
}
if (err) {
// ignore kafka config error. Just disable it.
config.kafka_config.global_config.clear();
config.kafka_config.topics_config.clear();
}
config.dataserver.tag = config.tag + "_ds";
config.log_level = StringToLogLevel(log_level, &err);
......
......@@ -118,8 +118,7 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea
return request;
}
RequestFactory::RequestFactory(SharedCache cache) : cache_{cache} {
RequestFactory::RequestFactory(SharedCache cache, KafkaClient* kafka_client) : request_handler_kafka_notify_{kafka_client}, cache_{cache} {
}
}
......@@ -24,7 +24,7 @@ namespace asapo {
class RequestFactory {
public:
explicit RequestFactory (SharedCache cache);
explicit RequestFactory (SharedCache cache, KafkaClient* kafka_client);
virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header,
SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept;
virtual ~RequestFactory() = default;
......
......@@ -21,6 +21,6 @@ StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const {
return StatisticEntity::kNetwork;
}
RequestHandlerKafkaNotify::RequestHandlerKafkaNotify() : kafka_client_{GetKafkaClient()} {
RequestHandlerKafkaNotify::RequestHandlerKafkaNotify(KafkaClient* kafka_client) : kafka_client_{kafka_client} {
}
}
\ No newline at end of file
......@@ -8,11 +8,11 @@ namespace asapo {
class RequestHandlerKafkaNotify final : public ReceiverRequestHandler {
public:
RequestHandlerKafkaNotify();
RequestHandlerKafkaNotify(KafkaClient* kafka_client);
StatisticEntity GetStatisticEntity() const override;
Error ProcessRequest(Request* request) const override;
private:
std::unique_ptr<KafkaClient> kafka_client_;
KafkaClient* kafka_client_;
};
}
......
......@@ -5,13 +5,13 @@
namespace asapo {
RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address,
ReceiverStatistics* statistics, SharedCache cache) : statistics__{statistics},
ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client) : statistics__{statistics},
io__{GenerateDefaultIO()},
log__{
GetDefaultReceiverLogger()},
request_factory__{
new RequestFactory{
cache}},
cache, kafka_client}},
socket_fd_{socket_fd},
producer_uri_{
std::move(address)} {
......
......@@ -14,7 +14,7 @@ namespace asapo {
class RequestsDispatcher {
public:
RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache);
RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache, KafkaClient* kafka_client);
VIRTUAL Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept;
VIRTUAL std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept;
VIRTUAL ~RequestsDispatcher() = default;
......
......@@ -80,6 +80,9 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field)
config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\"";
config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\"";
config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\"";
config_string += "," + Key("Kafka", error_field) + "{";
config_string += Key("Enabled", error_field) + "false";
config_string += "}";
config_string += "}";
......
......@@ -5,6 +5,7 @@
#include "asapo/unittests/MockIO.h"
#include "asapo/unittests/MockDatabase.h"
#include "asapo/unittests/MockKafkaClient.h"
#include "../../src/connection.h"
#include "../../src/receiver_error.h"
#include "../../src/request.h"
......@@ -25,7 +26,8 @@ namespace {
class FactoryTests : public Test {
public:
RequestFactory factory{nullptr};
asapo::MockKafkaClient kafkaClient;
RequestFactory factory{nullptr, &kafkaClient};
Error err{nullptr};
GenericRequestHeader generic_request_header;
ReceiverConfig config;
......@@ -53,12 +55,13 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) {
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr));
ASSERT_THAT(request->GetListHandlers().size(), Eq(5));
ASSERT_THAT(request->GetListHandlers().size(), Eq(6));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr));
}
}
......@@ -74,11 +77,12 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) {
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr));
ASSERT_THAT(request->GetListHandlers().size(), Eq(4));
ASSERT_THAT(request->GetListHandlers().size(), Eq(5));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[2]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[3]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr));
}
}
......@@ -111,16 +115,17 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) {
auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(request->GetListHandlers().size(), Eq(4));
ASSERT_THAT(request->GetListHandlers().size(), Eq(5));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr));
}
TEST_F(FactoryTests, CachePassedToRequest) {
RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}};
RequestFactory factory{std::shared_ptr<asapo::DataCache>{new asapo::DataCache{0, 0}}, nullptr};
auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err);
ASSERT_THAT(err, Eq(nullptr));
......
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "asapo/unittests/MockKafkaClient.h"
#include "../../src/request_handler/request_factory.h"
#include "../receiver_mocking.h"
using namespace testing;
using namespace asapo;
namespace {
// Fixture for RequestHandlerKafkaNotify: wires the handler to a mocked Kafka
// client and a mocked request, and sets the expectations every test relies on.
class KafkaNotifyHandlerTests : public Test {
public:
// NiceMock silences "uninteresting call" warnings for methods not under test.
NiceMock<MockKafkaClient> kafka_client;
// Handler under test; it borrows (does not own) the mock kafka client.
RequestHandlerKafkaNotify handler{&kafka_client};
std::unique_ptr<NiceMock<MockRequest>> mock_request;
const std::string expected_filename = "filename";
// Topic the handler is expected to publish to.
const std::string expected_topic = "asapo";
void SetUp() override {
GenericRequestHeader request_header;
mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr});
// The handler should query the request's file name exactly once...
EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename));
// ...and send one message containing that file name to the expected topic;
// returning nullptr from Send_t signals success (no error).
EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
}
void TearDown() override {
}
};
// Happy path: ProcessRequest reads the file name from the request, publishes a
// notification containing it to the "asapo" topic, and reports no error.
TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) {
auto err = handler.ProcessRequest(mock_request.get());
ASSERT_THAT(err, Eq(nullptr));
// Verify and clear BOTH collaborators. The original code verified
// mock_request twice (copy-paste slip), leaving the kafka client's
// expectations without an explicit verification point.
Mock::VerifyAndClearExpectations(mock_request.get());
Mock::VerifyAndClearExpectations(&kafka_client);
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment