diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index e462956c2fe985975672bfc202210eb79a5e86b8..7ecf68637d3ca863578a917d5b626620e82ee0f7 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -4,13 +4,13 @@ set(SOURCE_FILES src/producer_impl.cpp src/producer_logger.cpp src/request.cpp - src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h) + src/request_pool.cpp src/receiver_discovery_service.cpp src/receiver_discovery_service.h) ################################ # Library ################################ -add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> +add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:curl_http_client> ) target_include_directories(${TARGET_NAME} PUBLIC include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) @@ -23,6 +23,7 @@ set(TEST_SOURCE_FILES unittests/test_producer.cpp unittests/test_request.cpp unittests/test_request_pool.cpp + unittests/test_receivers_status.cpp ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp new file mode 100644 index 0000000000000000000000000000000000000000..55d2b4715233856fe221ee3c63b8be4e9609e1a1 --- /dev/null +++ b/producer/api/src/receiver_discovery_service.cpp @@ -0,0 +1,102 @@ +#include "receiver_discovery_service.h" + +#include "producer_logger.h" +#include "json_parser/json_parser.h" + +#include <iostream> +#include <algorithm> + +namespace asapo { + +ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()}, + log__{GetDefaultProducerLogger()}, + endpoint_{std::move(endpoint)}, update_frequency_ms_{update_frequency_ms} { + +} + +void ReceiverDiscoveryService::StartCollectingData() { + if (thread_ .joinable()) return; + log__->Debug("starting receiver discovery service"); + thread_ = std::thread( + std::bind(&ReceiverDiscoveryService::ThreadHandler, this)); +} + + +Error ReceiverDiscoveryService::ParseResponse(const std::string& responce, ReceiversList* list, + uint64_t* max_connections) { + auto parser = JsonStringParser(responce); + Error err; + (err = parser.GetArrayString("uri_list", list)) || + (err = parser.GetUInt64("max_connections", max_connections)); + return err; +} + +Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections) { + Error err; + HttpCode code; + auto responce = httpclient__->Get(endpoint_, &code, &err); + if (err != nullptr) { + return err; + } + + if (code != HttpCode::OK) { + return TextError(responce); + } + + return ParseResponse(responce, list, max_connections); + +} + +void ReceiverDiscoveryService::ThreadHandler() { + std::unique_lock<std::mutex> lock(mutex_); + do { + lock.unlock(); + ReceiversList uris; + uint64_t max_connections; + auto err = UpdateFromEndpoint(&uris, &max_connections); + if (err != nullptr) { + log__->Error("getting receivers from " + endpoint_ + " - " + err->Explain()); + continue; + } + lock.lock(); + max_connections_ = max_connections; + uri_list_ = uris; + } while (!condition_.wait_for(lock, std::chrono::milliseconds(update_frequency_ms_), [this] {return (quit_);})) ; + +} + +ReceiverDiscoveryService::~ReceiverDiscoveryService() { + mutex_.lock(); + quit_ = true; + mutex_.unlock(); + condition_.notify_one(); + + if(thread_.joinable()) { + log__->Debug("finishing discovery service"); + thread_.join(); + } +} + +uint64_t ReceiverDiscoveryService::MaxConnections() { + std::lock_guard<std::mutex> lock{mutex_}; + return max_connections_; +} + +ReceiversList ReceiverDiscoveryService::RotatedUriList(uint64_t nthread) { + std::unique_lock<std::mutex> lock(mutex_); + auto size = uri_list_.size(); + if (size == 0) { + return {}; + } + ReceiversList list{uri_list_}; + lock.unlock(); + auto shift = nthread % size; + std::rotate(list.begin(), list.begin() + shift, list.end()); + return list; +} + +uint64_t ReceiverDiscoveryService::UpdateFrequency() { + return update_frequency_ms_; +} + +} \ No newline at end of file diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h new file mode 100644 index 0000000000000000000000000000000000000000..c4a5dba934f82dc5d655c7d91ba47ff10b72b2f9 --- /dev/null +++ b/producer/api/src/receiver_discovery_service.h @@ -0,0 +1,49 @@ +#ifndef ASAPO_RECEIVERS_STATUS_H +#define ASAPO_RECEIVERS_STATUS_H + +#include <string> +#include <vector> +#include <mutex> +#include <thread> +#include <condition_variable> + + +#include "http_client/http_client.h" +#include "logger/logger.h" + +#ifdef UNIT_TESTS +#define VIRTUAL virtual +#endif + +namespace asapo { + +using ReceiversList = std::vector<std::string>; + +class ReceiverDiscoveryService { + public: + explicit ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms); + VIRTUAL void StartCollectingData(); + ~ReceiverDiscoveryService(); + VIRTUAL uint64_t MaxConnections(); + VIRTUAL ReceiversList RotatedUriList(uint64_t nthread); + uint64_t UpdateFrequency(); + public: + std::unique_ptr<HttpClient> httpclient__; + AbstractLogger* log__; + private: + void ThreadHandler(); + Error UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections); + Error ParseResponse(const std::string& responce, ReceiversList* list, uint64_t* max_connections); + std::string endpoint_; + std::thread thread_; + std::condition_variable condition_; + std::mutex mutex_; + uint64_t max_connections_{0}; + ReceiversList uri_list_; + bool quit_{false}; + uint64_t update_frequency_ms_; +}; + +} + +#endif //ASAPO_RECEIVERS_STATUS_H diff --git a/producer/api/src/receivers_status.cpp b/producer/api/src/receivers_status.cpp deleted file mode 100644 index 981a33b723a1e831e3c289f1c48a547e0549d111..0000000000000000000000000000000000000000 --- a/producer/api/src/receivers_status.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "receivers_status.h" - -namespace asapo { - -} \ No newline at end of file diff --git a/producer/api/src/receivers_status.h b/producer/api/src/receivers_status.h deleted file mode 100644 index 9dfaba163f4303604eb9bc504ad64def3cbf9866..0000000000000000000000000000000000000000 --- a/producer/api/src/receivers_status.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef ASAPO_RECEIVERS_STATUS_H -#define ASAPO_RECEIVERS_STATUS_H - -#include <string> -#include <vector> - -namespace asapo { - -using ReceiversList = std::vector<std::string>; - -class ReceiversStatus { - -}; - -} - -#endif //ASAPO_RECEIVERS_STATUS_H diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index c4a4035a7c498608f46a8e869689cfb4a99d1d53..1907fe80de86aca5bc7b1757703323668398fb9b 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -75,7 +75,12 @@ Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receive -Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { +Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list,bool rebalance) { + if (rebalance && *sd != kDisconnectedSocketDescriptor) { + io__->CloseSocket(*sd, nullptr); + log__->Info("rebalancing"); + *sd = kDisconnectedSocketDescriptor; + } for (auto receiver_uri : receivers_list) { if (*sd == kDisconnectedSocketDescriptor) { auto err = ConnectToReceiver(sd, receiver_uri); diff --git a/producer/api/src/request.h b/producer/api/src/request.h index b245e7134b9d4f147424b83bed7954e7ea522ea6..d3c8c5d7962a5e090623f39e8e99379b0342d76a 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -3,7 +3,7 @@ #include "io/io.h" #include "common/error.h" -#include "receivers_status.h" +#include "receiver_discovery_service.h" #include "common/networking.h" #include "producer/producer.h" @@ -19,8 +19,8 @@ class Request { public: explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, RequestCallback callback); - VIRTUAL Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list); - VIRTUAL ~Request()=default; + VIRTUAL Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list, bool rebalance); + VIRTUAL ~Request() = default; const IO* io__; const AbstractLogger* log__; uint64_t GetMemoryRequitements(); diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp index ac919a860ad02804c8402911604190949845d8b9..70d51c757d091ba814b1bb82e1db0b876020f15d 100644 --- a/producer/api/src/request_pool.cpp +++ b/producer/api/src/request_pool.cpp @@ -1,14 +1,25 @@ +#include <chrono> + #include "request_pool.h" #include "producer_logger.h" + + +using std::chrono::high_resolution_clock; + + namespace asapo { -RequestPool:: RequestPool(uint8_t n_threads, uint64_t max_pool_volume): log__{GetDefaultProducerLogger()}, threads_{n_threads}, - max_pool_volume_{max_pool_volume} { +RequestPool:: RequestPool(uint8_t n_threads, uint64_t max_pool_volume, + ReceiverDiscoveryService* discovery_service): log__{GetDefaultProducerLogger()}, + discovery_service__{discovery_service}, + threads_{n_threads}, max_pool_volume_{max_pool_volume} { + + discovery_service->StartCollectingData(); for(size_t i = 0; i < threads_.size(); i++) { log__->Debug("starting thread " + std::to_string(i)); threads_[i] = std::thread( - std::bind(&RequestPool::ThreadHandler, this)); + [this, i] {ThreadHandler(i);}); } } @@ -33,20 +44,22 @@ Error RequestPool::AddRequest(std::unique_ptr<Request> request) { } -void RequestPool::ThreadHandler(void) { +void RequestPool::ThreadHandler(uint64_t id) { std::unique_lock<std::mutex> lock(mutex_); SocketDescriptor thread_sd = kDisconnectedSocketDescriptor; ReceiversList thread_receivers; + high_resolution_clock::time_point last_rebalance; do { - condition_.wait(lock, [this] { - return (request_queue_.size() || quit_); + condition_.wait(lock, [this, thread_sd] { + return ((request_queue_.size() && (thread_sd != kDisconnectedSocketDescriptor || + ncurrent_connections_ < discovery_service__->MaxConnections()) ) || quit_); }); //after wait, we own the lock - - if (request_queue_.size() && !quit_) { - + if (!quit_) { if (thread_sd == kDisconnectedSocketDescriptor) { - thread_receivers = ReceiversList{"test"}; + thread_receivers = discovery_service__->RotatedUriList(id); + last_rebalance = high_resolution_clock::now(); + ncurrent_connections_++; } auto request = std::move(request_queue_.front()); @@ -55,12 +68,24 @@ void RequestPool::ThreadHandler(void) { //unlock now that we're done messing with the queue lock.unlock(); - auto err = request->Send(&thread_sd,thread_receivers); + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - last_rebalance).count(); + bool rebalance = false; + if (elapsed_ms > discovery_service__->UpdateFrequency()) { + auto thread_receivers_new = discovery_service__->RotatedUriList(id); + last_rebalance = high_resolution_clock::now(); + if (thread_receivers_new != thread_receivers) { + thread_receivers = thread_receivers_new; + rebalance = true; + } + } + + auto err = request->Send(&thread_sd, thread_receivers,rebalance); // we should lock again for the next wait since we did unlock lock.lock(); if (err) { // could not send from this thread - place request back in the queue request_queue_.emplace_front(std::move(request)); + ncurrent_connections_--; } }; } while (!quit_); diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h index e898be05adc3a2d1c900494bc75ff4d3bb340e77..8a955969d390a4e3532ca906bf1cfd9c29a603b3 100644 --- a/producer/api/src/request_pool.h +++ b/producer/api/src/request_pool.h @@ -5,31 +5,32 @@ #include <vector> #include <mutex> #include <thread> -#include <atomic> #include <condition_variable> #include <queue> #include "logger/logger.h" #include "request.h" - +#include "receiver_discovery_service.h" namespace asapo { class RequestPool { public: - explicit RequestPool(uint8_t n_threads, uint64_t max_pool_volume); + explicit RequestPool(uint8_t n_threads, uint64_t max_pool_volume, ReceiverDiscoveryService* discovery_service); Error AddRequest(std::unique_ptr<Request> request); ~RequestPool(); AbstractLogger* log__; + ReceiverDiscoveryService* discovery_service__; private: std::vector<std::thread> threads_; - void ThreadHandler(); + void ThreadHandler(uint64_t id); bool quit_{false}; std::condition_variable condition_; std::mutex mutex_; std::deque<std::unique_ptr<Request>> request_queue_; uint64_t max_pool_volume_; uint64_t current_pool_volume_{0}; + uint64_t ncurrent_connections_{0}; bool RequestWouldFit(const std::unique_ptr<Request>& request); }; diff --git a/producer/api/unittests/test_receivers_status.cpp b/producer/api/unittests/test_receivers_status.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9c90c70403ad3f733cbb420a437ec41650b8a43a --- /dev/null +++ b/producer/api/unittests/test_receivers_status.cpp @@ -0,0 +1,148 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <chrono> + +#include "unittests/MockLogger.h" +#include "common/error.h" +#include "common/io_error.h" + +#include "../src/receiver_discovery_service.h" +#include "unittests/MockHttpClient.h" + +namespace { + +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::AllOf; + +using ::testing::Test; + +using ::testing::InSequence; +using ::testing::HasSubstr; +using testing::SetArgPointee; +using testing::ElementsAre; + +using asapo::Error; +using asapo::MockHttpClient; +using asapo::ReceiverDiscoveryService; + +std::mutex mutex; + +TEST(ReceiversStatus, Constructor) { + ReceiverDiscoveryService status{"endpoint/receivers", 1000}; + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(status.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::HttpClient*>(status.httpclient__.get()), Ne(nullptr)); +} + + +class ReceiversStatusTests : public Test { + public: + // important to create logger before status, otherwise checks in destructor won't work + NiceMock<asapo::MockLogger> mock_logger; + NiceMock<MockHttpClient>* mock_http_client; + + std::string expected_endpoint{"endpoint/receivers"}; + ReceiverDiscoveryService status{expected_endpoint, 1000}; + + void SetUp() override { + mock_http_client = new NiceMock<MockHttpClient>; + status.httpclient__.reset(mock_http_client); + status.log__ = &mock_logger; + } + void TearDown() override { + } +}; + +TEST_F(ReceiversStatusTests, LogWhenHttpError) { + EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), + Return("") + )); + + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting receivers"), HasSubstr(expected_endpoint)))); + status.StartCollectingData(); +} + +TEST_F(ReceiversStatusTests, LogWhenWhenWrongHttpCode) { + EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(nullptr), + SetArgPointee<1>(asapo::HttpCode::BadRequest), + Return("bad request") + )); + + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting receivers"), HasSubstr(expected_endpoint), + HasSubstr("bad request")))); + status.StartCollectingData(); +} + +TEST_F(ReceiversStatusTests, LogWhenWhenCannotReadResponce) { + EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(nullptr), + SetArgPointee<1>(asapo::HttpCode::OK), + Return("wrong response") + )); + + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting receivers"), HasSubstr(expected_endpoint), + HasSubstr("parse")))); + status.StartCollectingData(); +} + + +TEST_F(ReceiversStatusTests, GetsReqestedInformation) { + std::string json = R"({"uri_list":["s1","s2","s3"], "max_connections":8})"; + + EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .WillRepeatedly( + DoAll(SetArgPointee<2>(nullptr), + SetArgPointee<1>(asapo::HttpCode::OK), + Return(json) + )); + + status.StartCollectingData(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + auto nc = status.MaxConnections(); + ASSERT_THAT(nc, Eq(8)); + auto list = status.RotatedUriList(0); + ASSERT_THAT(list, ElementsAre("s1", "s2", "s3")); + + list = status.RotatedUriList(1); + ASSERT_THAT(list, ElementsAre("s2", "s3", "s1")); + + list = status.RotatedUriList(2); + ASSERT_THAT(list, ElementsAre("s3", "s1", "s2")); + + list = status.RotatedUriList(3); + ASSERT_THAT(list, ElementsAre("s1", "s2", "s3")); + +} + +TEST_F(ReceiversStatusTests, JoinThreadAtTheEnd) { + EXPECT_CALL(mock_logger, Debug(HasSubstr("starting receiver discovery"))); + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing"))); + status.StartCollectingData(); +} + +TEST_F(ReceiversStatusTests, InitialMaxConnection) { + auto nc = status.MaxConnections(); + ASSERT_THAT(nc, Eq(0)); +} + +TEST_F(ReceiversStatusTests, InitialUriList) { + auto list = status.RotatedUriList(0); + ASSERT_THAT(list.size(), Eq(0)); +} + + + +} diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp index 183b5de4b69a1b2d94ca7fd99eb7041f23ec61de..0b3df412f41a9c36dab3fd3d891437da2194a378 100644 --- a/producer/api/unittests/test_request.cpp +++ b/producer/api/unittests/test_request.cpp @@ -263,7 +263,7 @@ TEST_F(RequestTests, MemoryRequirements) { TEST_F(RequestTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -275,28 +275,65 @@ TEST_F(RequestTests, DoesNotTryConnectWhenConnected) { ExpectFailSendHeader(true); - auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}); + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } +TEST_F(RequestTests, DoNotCloseWhenRebalanceAndNotConnected) { + EXPECT_CALL(mock_io, CloseSocket_t(sd, _)).Times(0); + ExpectOKConnect(); + ExpectFailSendHeader(); + + auto err = request.Send(&sd, receivers_list,true); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + +TEST_F(RequestTests, DoNotCloseWhenRebalanceIfNotConnected) { + EXPECT_CALL(mock_io, CloseSocket_t(sd, _)).Times(0); + ExpectOKConnect(true); + ExpectFailSendHeader(true); + + + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},true); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + + + +TEST_F(RequestTests, ReconnectWhenRebalance) { + sd = 1000; + EXPECT_CALL(mock_io, CloseSocket_t(1000, _)); + EXPECT_CALL(mock_logger, Info(HasSubstr("rebalancing"))); + + ExpectOKConnect(true); + ExpectFailSendHeader(true); + + + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},true); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} TEST_F(RequestTests, ErrorWhenCannotSendHeader) { ExpectOKConnect(); ExpectFailSendHeader(); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } + TEST_F(RequestTests, ErrorWhenCannotSendData) { ExpectOKConnect(); ExpectOKSendHeader(); ExpectFailSendData(); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -308,7 +345,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) { ExpectFailReceive(); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -316,7 +353,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) { -TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) { +TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { ExpectOKConnect(true); ExpectOKSendHeader(true); ExpectOKSendData(true); @@ -330,7 +367,7 @@ TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) { )); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); ASSERT_THAT(called, Eq(true)); @@ -344,7 +381,7 @@ TEST_F(RequestTests, SendOK) { ExpectOKSendData(true); ExpectOKReceive(); - auto err = request.Send(&sd, receivers_list); + auto err = request.Send(&sd, receivers_list,false); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(sd, Eq(expected_sds[0])); @@ -357,5 +394,4 @@ TEST_F(RequestTests, SendOK) { } - } diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index ae2bd666c2a91ccc728e26b1cfb9b94eb5ba31dd..efefc974dae68150ed04616d550195a06395475e 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -7,6 +7,8 @@ #include "../src/request.h" #include "../src/request_pool.h" +#include "../src/receiver_discovery_service.h" + #include "io/io_factory.h" namespace { @@ -20,7 +22,7 @@ using ::testing::Eq; using ::testing::Ne; using ::testing::Mock; using ::testing::AllOf; - +using testing::DoAll; using ::testing::InSequence; using ::testing::HasSubstr; @@ -29,21 +31,26 @@ using asapo::Request; using asapo::RequestPool; using asapo::Error; +const std::string expected_endpoint{"endpoint"}; -TEST(RequestPool, Constructor) { - asapo::RequestPool pool{4, 4}; - ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); -} +class MockDiscoveryService : public asapo::ReceiverDiscoveryService { + public: + MockDiscoveryService() : ReceiverDiscoveryService{expected_endpoint, 1} {}; + MOCK_METHOD0(StartCollectingData, void()); + MOCK_METHOD0(MaxConnections, uint64_t()); + MOCK_METHOD1(RotatedUriList, asapo::ReceiversList(uint64_t)); +}; class MockRequest : public Request { public: - MockRequest() : Request(asapo::GenerateDefaultIO(), asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; - Error Send(asapo::SocketDescriptor* sd, const asapo::ReceiversList& receivers_list) override { - return Error {Send_t(sd, receivers_list)}; + std::unique_ptr<asapo::IO> io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; + MockRequest() : Request(io.get(), asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; + Error Send(asapo::SocketDescriptor* sd, const asapo::ReceiversList& receivers_list, bool rebalance) override { + return Error {Send_t(sd, receivers_list,rebalance)}; } - MOCK_METHOD2(Send_t, asapo::SimpleError*(asapo::SocketDescriptor*, const asapo::ReceiversList&)); + MOCK_METHOD3(Send_t, asapo::SimpleError * (asapo::SocketDescriptor*, const asapo::ReceiversList&,bool)); }; class RequestPoolTests : public testing::Test { @@ -51,24 +58,39 @@ class RequestPoolTests : public testing::Test { testing::NiceMock<asapo::MockLogger> mock_logger; const uint8_t nthreads = 4; const uint64_t max_size = 1024 * 1024 * 1024; - asapo::RequestPool pool {nthreads, max_size}; + testing::NiceMock<MockDiscoveryService> mock_discovery; + asapo::RequestPool pool {nthreads, max_size, &mock_discovery}; std::unique_ptr<Request> request; MockRequest* mock_request = new MockRequest; + asapo::ReceiversList expected_receivers_list{"ip1", "ip2", "ip3"}; void SetUp() override { pool.log__ = &mock_logger; request.reset(mock_request); + ON_CALL(mock_discovery, MaxConnections()).WillByDefault(Return(100)); + ON_CALL(mock_discovery, RotatedUriList(_)).WillByDefault(Return(expected_receivers_list)); + } void TearDown() override { } }; +TEST(RequestPool, Constructor) { + auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; + testing::NiceMock<MockDiscoveryService> mock_discovery; + + EXPECT_CALL(mock_discovery, StartCollectingData()); + asapo::RequestPool pool{4, 4, &mock_discovery}; -TEST(RequestPool, AddRequestFailsDueToSize) { - RequestPool pool{4, 0}; + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::ReceiverDiscoveryService*>(pool.discovery_service__), Ne(nullptr)); +} +TEST(RequestPool, AddRequestFailsDueToSize) { auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - asapo::GenericNetworkRequestHeader header; + asapo::ReceiverDiscoveryService discovery{expected_endpoint, 1000}; + RequestPool pool{4, 0, &discovery}; + asapo::GenericNetworkRequestHeader header{asapo::Opcode::kNetOpcodeCount, 1, 1}; std::unique_ptr<Request> request{new Request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}}; auto err = pool.AddRequest(std::move(request)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); @@ -76,11 +98,14 @@ TEST(RequestPool, AddRequestFailsDueToSize) { } TEST_F(RequestPoolTests, AddRequestCallsSend) { - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("test"))). - WillOnce( - Return(nullptr) - ); + EXPECT_CALL(mock_discovery, RotatedUriList(_)).WillOnce(Return(asapo::ReceiversList{"ip3", "ip2", "ip1"})); + + EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), + testing::ElementsAre("ip3", "ip2", "ip1"),false)). + WillOnce( + Return(nullptr) + ); auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -90,265 +115,111 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { asapo::SimpleError* send_error = new asapo::SimpleError("www"); - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("test"))) - .Times(2) - .WillOnce(Return(send_error)) - .WillOnce(Return(nullptr)); + + EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip1", + "ip2", "ip3"),false)) + .Times(2) + .WillOnce(DoAll( + testing::SetArgPointee<0>(asapo::kDisconnectedSocketDescriptor), + Return(send_error) + )) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + )); auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ASSERT_THAT(err, Eq(nullptr)); - } +TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { + EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); -TEST_F(RequestPoolTests, FinishProcessingThreads) { - EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); -} - - -/* - -void RequestTests::ExpectFailSendHeader(bool only_once) { - int i = 0; - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, - expected_file_size), - sizeof(asapo::GenericNetworkRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); - EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("cannot send header"), - HasSubstr(receivers_list[i]) - ) - )); - EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); - if (only_once) break; - i++; - } + MockRequest* mock_request2 = new MockRequest; -} - -void RequestTests::ExpectFailSendData(bool only_once) { - int i = 0; - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); - EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("cannot send data"), - HasSubstr(receivers_list[i]) - ) - )); - EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); - if (only_once) break; - i++; - } - -} + EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip1", + "ip2", "ip3"),false)) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + ) + ); + EXPECT_CALL(*mock_request2, Send_t(testing::Pointee(1), testing::ElementsAre("ip1", "ip2", "ip3"),false)) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + ) + ); + auto err1 = pool.AddRequest(std::move(request)); + request.reset(mock_request2); + auto err2 = pool.AddRequest(std::move(request)); - -void RequestTests::ExpectFailReceive(bool only_once) { - int i = 0; - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - testing::Return(-1) - )); - EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("cannot receive"), - HasSubstr(receivers_list[i]) - ) - )); - EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); - if (only_once) break; - i++; - } - -} - - -void RequestTests::ExpectOKSendData(bool only_once) { - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(expected_file_size) - )); - if (only_once) break; - } - -} - - - -void RequestTests::ExpectOKSendHeader(bool only_once) { - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, - expected_file_size), - sizeof(asapo::GenericNetworkRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericNetworkRequestHeader)) - )); - if (only_once) break; - } - -} - - -void RequestTests::ExpectOKConnect(bool only_once) { - int i = 0; - for (auto expected_address : receivers_list) { - EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(nullptr), - Return(expected_sds[i]) - )); - EXPECT_CALL(mock_logger, Info(AllOf( - HasSubstr("connected"), - HasSubstr(expected_address) - ) - )); - if (only_once) break; - i++; - } -} - - -void RequestTests::ExpectOKReceive(bool only_once) { - int i = 0; - for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorNoError), - testing::ReturnArg<2>() - )); - EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("sent data"), - HasSubstr(receivers_list[i]) - ) - )); - if (only_once) break; - i++; - } -} - - -TEST_F(RequestTests, TriesConnectWhenNotConnected) { - ExpectFailConnect(); - - auto err = request.Send(&sd, receivers_list); - - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ASSERT_THAT(err1, Eq(nullptr)); + ASSERT_THAT(err2, Eq(nullptr)); } -TEST_F(RequestTests, DoesNotTryConnectWhenConnected) { - sd = expected_sds[0]; - EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) - .Times(0); - ExpectFailSendHeader(true); - auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}); - - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); -} - +TEST_F(RequestPoolTests, AddRequestDoesNotCallsSendWhenNoConnactionsAllowed) { + EXPECT_CALL(*mock_request, Send_t(_, _,_)).Times(0); + EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(0)); -TEST_F(RequestTests, ErrorWhenCannotSendHeader) { - ExpectOKConnect(); - ExpectFailSendHeader(); - - auto err = request.Send(&sd, receivers_list); + pool.discovery_service__ = &mock_discovery; + auto err = pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestTests, ErrorWhenCannotSendData) { - ExpectOKConnect(); - ExpectOKSendHeader(); - ExpectFailSendData(); - auto err = request.Send(&sd, receivers_list); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +TEST_F(RequestPoolTests, FinishProcessingThreads) { + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); } -TEST_F(RequestTests, ErrorWhenCannotReceiveData) { - ExpectOKConnect(); - ExpectOKSendHeader(); - ExpectOKSendData(); - ExpectFailReceive(); +TEST_F(RequestPoolTests, Rebalance) { + EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); - auto err = request.Send(&sd, receivers_list); + MockRequest* mock_request2 = new MockRequest; - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); -} + EXPECT_CALL(mock_discovery, RotatedUriList(_)).Times(2). + WillOnce(Return(asapo::ReceiversList{"ip3", "ip2", "ip1"})). + WillOnce(Return(asapo::ReceiversList{"ip4", "ip5", "ip6"})); + EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip3", + "ip2", "ip1"),false)) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + ) + ); + EXPECT_CALL(*mock_request2, Send_t(testing::Pointee(1), testing::ElementsAre("ip4", "ip5", "ip6"),true)) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + ) + ); -TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) { - ExpectOKConnect(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + auto err1 = pool.AddRequest(std::move(request)); + request.reset(mock_request2); - EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorFileIdAlreadyInUse), - testing::ReturnArg<2>() - )); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + auto err2 = pool.AddRequest(std::move(request)); - auto err = request.Send(&sd, receivers_list); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ASSERT_THAT(err1, Eq(nullptr)); + ASSERT_THAT(err2, Eq(nullptr)); - ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); - ASSERT_THAT(called, Eq(true)); - ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestTests, SendOK) { - ExpectOKConnect(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); - ExpectOKReceive(); - - auto err = request.Send(&sd, receivers_list); - - ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(sd, Eq(expected_sds[0])); - ASSERT_THAT(callback_err, Eq(nullptr)); - ASSERT_THAT(called, Eq(true)); - ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); - ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); - ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); - -} - -*/ }