From 6738a408dc8ff7548e5370f7f42d0f28474b0794 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Sat, 19 May 2018 08:44:22 +0200 Subject: [PATCH] all tests pass --- producer/api/src/request_handler_tcp.cpp | 3 - producer/api/src/request_pool.cpp | 2 - .../unittests/test_request_handler_tcp.cpp | 107 ++++----- .../api/unittests/test_request_pool_old.cpp | 204 ------------------ 4 files changed, 46 insertions(+), 270 deletions(-) delete mode 100644 producer/api/unittests/test_request_pool_old.cpp diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index f5732764a..fbdd30225 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -80,7 +80,6 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st void RequestHandlerTcp::UpdateReceiversUriIfNewConnection() { if (sd_ != kDisconnectedSocketDescriptor) return; - receivers_list_ = discovery_service__->RotatedUriList(thread_id_); last_receivers_uri_update_ = high_resolution_clock::now(); (*ncurrent_connections_)++; @@ -103,7 +102,6 @@ bool RequestHandlerTcp::CheckForRebalance() { } } return rebalance; - } Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { @@ -130,7 +128,6 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { request->callback(request->header, std::move(err)); } return nullptr; - } return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate(); } diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp index 8eb18321f..1825a0a1f 100644 --- a/producer/api/src/request_pool.cpp +++ b/producer/api/src/request_pool.cpp @@ -42,9 +42,7 @@ void RequestPool::PutRequestBackToQueue(std::unique_ptr<Request> request) { void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_handler, ThreadInformation* thread_info) { - request_handler->PrepareProcessingRequestLocked(); - auto request = GetRequestFromQueue(); thread_info->lock.unlock(); auto err = request_handler->ProcessRequestUnlocked(request.get()); diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 538d8cc73..009480a6e 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -68,11 +68,12 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; - asapo::SocketDescriptor sd = asapo::kDisconnectedSocketDescriptor; std::string expected_address1 = {"127.0.0.1:9090"}; std::string expected_address2 = {"127.0.0.1:9091"}; asapo::ReceiversList receivers_list{expected_address1, expected_address2}; - asapo::ReceiversList receivers_list_single{expected_address1}; + asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; + + asapo::ReceiversList receivers_list_single{expected_address1}; std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; @@ -89,6 +90,9 @@ class RequestHandlerTcpTests : public testing::Test { void SetUp() override { request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); + ON_CALL(mock_discovery_service, RotatedUriList(_)). + WillByDefault(Return(receivers_list)); + } void TearDown() override { request_handler.io__.release(); @@ -330,113 +334,92 @@ TEST_F(RequestHandlerTcpTests, DoNotReduceConnectionNumberAtTearDownIfNoError) { } -/* -//Error ProcessRequestUnlocked(const Request* request) override; - -//void TearDownProcessingRequestLocked(const Error &error_from_process) override; - - - -TEST_F(RequestTests, MemoryRequirements) { - - auto size = request.GetMemoryRequitements(); - - ASSERT_THAT(size, Eq(sizeof(asapo::RequestHandlerTcp) + expected_file_size)); -} - - -TEST_F(RequestTests, TriesConnectWhenNotConnected) { +TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, DoesNotTryConnectWhenConnected) { - sd = expected_sds[0]; +TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { + DoSingleSend(); + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) .Times(0); ExpectFailSendHeader(true); - - auto err = request.ProcessRequestUnlocked(&sd, asapo::ReceiversList{expected_address1}, false); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, DoNotCloseWhenRebalanceAndNotConnected) { - EXPECT_CALL(mock_io, CloseSocket_t(sd, _)).Times(0); + + +TEST_F(RequestHandlerTcpTests, DoNotCloseWhenRebalanceAndNotConnected) { + EXPECT_CALL(mock_io, CloseSocket_t(_, _)).Times(0); ExpectOKConnect(); ExpectFailSendHeader(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, true); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, DoNotCloseWhenRebalanceIfNotConnected) { - EXPECT_CALL(mock_io, CloseSocket_t(sd, _)).Times(0); - ExpectOKConnect(true); - ExpectFailSendHeader(true); - - - auto err = request.ProcessRequestUnlocked(&sd, asapo::ReceiversList{expected_address1}, true); - - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); -} -TEST_F(RequestTests, ReconnectWhenRebalance) { - sd = 1000; - EXPECT_CALL(mock_io, CloseSocket_t(1000, _)); - EXPECT_CALL(mock_logger, Info(HasSubstr("rebalancing"))); +TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { + DoSingleSend(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ExpectOKConnect(true); - ExpectFailSendHeader(true); + EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). + WillOnce(Return(asapo::ReceiversList{})); + EXPECT_CALL(mock_io, CloseSocket_t(_, _)); - auto err = request.ProcessRequestUnlocked(&sd, asapo::ReceiversList{expected_address1}, true); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, ErrorWhenCannotSendHeader) { + +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { ExpectOKConnect(); ExpectFailSendHeader(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, ErrorWhenCannotSendData) { +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ExpectOKConnect(); ExpectOKSendHeader(); ExpectFailSendData(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } -TEST_F(RequestTests, ErrorWhenCannotReceiveData) { +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { ExpectOKConnect(); ExpectOKSendHeader(); ExpectOKSendData(); - ExpectFailReceive(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } - - - -TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { +TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { ExpectOKConnect(true); ExpectOKSendHeader(true); ExpectOKSendData(true); @@ -450,7 +433,8 @@ TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { )); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); ASSERT_THAT(called, Eq(true)); @@ -458,28 +442,29 @@ TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { } -TEST_F(RequestTests, SendEmptyCallBack) { +TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKConnect(true); ExpectOKSendHeader(true); ExpectOKSendData(true); ExpectOKReceive(); - auto err = request_nocallback.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(called, Eq(false)); } -TEST_F(RequestTests, SendOK) { +TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKSendHeader(true); ExpectOKSendData(true); ExpectOKReceive(); - auto err = request.ProcessRequestUnlocked(&sd, receivers_list, false); + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(sd, Eq(expected_sds[0])); ASSERT_THAT(callback_err, Eq(nullptr)); ASSERT_THAT(called, Eq(true)); ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); @@ -488,5 +473,5 @@ TEST_F(RequestTests, SendOK) { } -*/ + } diff --git a/producer/api/unittests/test_request_pool_old.cpp b/producer/api/unittests/test_request_pool_old.cpp deleted file mode 100644 index ace7f3c0c..000000000 --- a/producer/api/unittests/test_request_pool_old.cpp +++ /dev/null @@ -1,204 +0,0 @@ -#include <gtest/gtest.h> -#include <gmock/gmock.h> -#include <chrono> - -#include "unittests/MockLogger.h" -#include "common/error.h" - -#include "../src/request_handler_tcp.h" -#include "../src/request_pool.h" -#include "../src/receiver_discovery_service.h" - -#include "io/io_factory.h" - -namespace { - -using ::testing::Return; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Eq; -using ::testing::Ne; -using ::testing::Mock; -using ::testing::AllOf; -using testing::DoAll; -using testing::NiceMock; -using ::testing::InSequence; -using ::testing::HasSubstr; - -using asapo::ReceiversList; -using asapo::RequestHandlerTcp; -using asapo::RequestPool; -using asapo::Error; -using asapo::GenericNetworkRequestHeader; - -const std::string expected_endpoint{"endpoint"}; - - - -class MockRequest : public RequestHandlerTcp { - public: - MockRequest() : RequestHandlerTcp(asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; - Error Handle(asapo::SocketDescriptor* sd, const ReceiversList &receivers_list, bool rebalance) override { - return Error {Send_t(sd, receivers_list, rebalance)}; - } - - MOCK_METHOD3(Send_t, asapo::SimpleError * (asapo::SocketDescriptor*, const ReceiversList&, bool)); -}; - -class RequestPoolTests : public testing::Test { - public: - NiceMock<asapo::MockLogger> mock_logger; - const uint8_t nthreads = 4; - const uint64_t max_size = 1024 * 1024 * 1024; - NiceMock<MockDiscoveryService> mock_discovery; - asapo::RequestPool pool {nthreads, max_size, &mock_discovery}; - std::unique_ptr<RequestHandlerTcp> request; - NiceMock<MockRequest>* mock_request = new testing::NiceMock<MockRequest>; - ReceiversList expected_receivers_list1{"ip1", "ip2", "ip3"}; - ReceiversList expected_receivers_list2{"ip4", "ip5", "ip6"}; - void SetUp() override { - pool.log__ = &mock_logger; - request.reset(mock_request); - ON_CALL(mock_discovery, MaxConnections()).WillByDefault(Return(100)); - ON_CALL(mock_discovery, RotatedUriList(_)).WillByDefault(Return(expected_receivers_list1)); - - } - void TearDown() override { - } -}; - -void ExpectSend(MockRequest* request, const ReceiversList& list, bool connected = false, bool rebalance = false) { - auto descriptor = connected ? 1 : asapo::kDisconnectedSocketDescriptor; - EXPECT_CALL(*request, Send_t(testing::Pointee(descriptor), list, rebalance)) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - ) - ); -} - -TEST(RequestPool, Constructor) { - NiceMock<MockDiscoveryService> mock_discovery; - EXPECT_CALL(mock_discovery, StartCollectingData()); - - asapo::RequestPool pool{4, 4, &mock_discovery}; - - ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); -} - -TEST(RequestPool, AddRequestFailsDueToSize) { - asapo::ReceiverDiscoveryService discovery{expected_endpoint, 1000}; - RequestPool pool{4, 0, &discovery}; - std::unique_ptr<RequestHandlerTcp> request{new RequestHandlerTcp{GenericNetworkRequestHeader{}, nullptr, nullptr}}; - - auto err = pool.AddRequest(std::move(request)); - - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); - -} - -TEST_F(RequestPoolTests, AddRequestCallsSend) { - EXPECT_CALL(mock_discovery, RotatedUriList(_)).WillOnce(Return(expected_receivers_list1)); - ExpectSend(mock_request, expected_receivers_list1); - - auto err = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { - asapo::SimpleError* send_error = new asapo::SimpleError(""); - - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), expected_receivers_list1, - false)) - .Times(2) - .WillOnce(DoAll( - testing::SetArgPointee<0>(asapo::kDisconnectedSocketDescriptor), - Return(send_error) - )) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - )); - - auto err = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); - - MockRequest* mock_request2 = new MockRequest; - - ExpectSend(mock_request, expected_receivers_list1); - ExpectSend(mock_request2, expected_receivers_list1, true); - - - auto err1 = pool.AddRequest(std::move(request)); - request.reset(mock_request2); - auto err2 = pool.AddRequest(std::move(request)); - - std::this_thread::sleep_for(std::chrono::milliseconds(30)); - ASSERT_THAT(err1, Eq(nullptr)); - ASSERT_THAT(err2, Eq(nullptr)); -} - - - -TEST_F(RequestPoolTests, AddRequestDoesNotCallsSendWhenNoConnactionsAllowed) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(0)); - EXPECT_CALL(*mock_request, Send_t(_, _, _)).Times(0); - - auto err = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(RequestPoolTests, NRequestsInQueue) { - auto nreq = pool.NRequestsInQueue(); - ASSERT_THAT(nreq, Eq(0)); -} - - -TEST_F(RequestPoolTests, FinishProcessingThreads) { - EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); -} - - -TEST_F(RequestPoolTests, Rebalance) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); - - MockRequest* mock_request2 = new MockRequest; - - - EXPECT_CALL(mock_discovery, RotatedUriList(_)).Times(2). - WillOnce(Return(expected_receivers_list1)). - WillOnce(Return(expected_receivers_list2)); - - - ExpectSend(mock_request, expected_receivers_list1); - ExpectSend(mock_request2, expected_receivers_list2, true, true); - - - auto err1 = pool.AddRequest(std::move(request)); - request.reset(mock_request2); - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - auto err2 = pool.AddRequest(std::move(request)); - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ASSERT_THAT(err1, Eq(nullptr)); - ASSERT_THAT(err2, Eq(nullptr)); - -} - - - -} -- GitLab