diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 29f13b508c5453bb976a294235edc6883ba6c9ab..610dcba3d1c773d214ced99477b0cc4ca9303d8d 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -24,7 +24,7 @@ target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} set(TEST_SOURCE_FILES unittests/test_producer_impl.cpp unittests/test_producer.cpp -# unittests/test_request_handler_tcp.cpp + unittests/test_request_handler_tcp.cpp unittests/test_request_pool.cpp unittests/test_receiver_discovery_service.cpp unittests/test_request_handler_factory.cpp diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index b2d7ac65f521318332a64b6fba8c36ad3c6cc3fb..f5c60c3fb22edfe55f4d3612e635e711999f69c8 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -20,7 +20,7 @@ class ProducerImpl : public Producer { public: static const size_t kMaxChunkSize; static const size_t kMaxPoolVolume; - static const uint64_t kDiscoveryServiceUpdateFrequencyMs; + static const size_t kDiscoveryServiceUpdateFrequencyMs; explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads); ProducerImpl(const ProducerImpl&) = delete; diff --git a/producer/api/src/producer_logger.cpp b/producer/api/src/producer_logger.cpp index addf222008df69f0fb14ca0270b1b58bb7d4d16e..35528043b319b1127fa513ac94530e18e3aa7028 100644 --- a/producer/api/src/producer_logger.cpp +++ b/producer/api/src/producer_logger.cpp @@ -5,7 +5,9 @@ namespace asapo { AbstractLogger* GetDefaultProducerLogger() { //todo get fluentd uri from service discovery - static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); + // static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); + static Logger logger = CreateDefaultLoggerBin("producer_api"); + return logger.get(); } diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 059905fba3cc5af6ce5b9d261ba454d67b7acea0..daf29846414443e9cca647ce7e2b1389a4eebaf8 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -142,7 +142,7 @@ bool RequestHandlerTcp::CanCreateNewConnections() { } bool RequestHandlerTcp::ReadyProcessRequest() { - IsConnected() || CanCreateNewConnections(); + return IsConnected() || CanCreateNewConnections(); } void RequestHandlerTcp::PrepareProcessingRequestLocked() { diff --git a/producer/api/unittests/mocking.h b/producer/api/unittests/mocking.h index 88804241679ffc5223cdbffa676a01411361d29a..6a6aaf46c7fcb998092078f60b4868f457b8172f 100644 --- a/producer/api/unittests/mocking.h +++ b/producer/api/unittests/mocking.h @@ -40,13 +40,20 @@ class MockRequestPull : public RequestPool { class MockRequestHandler : public RequestHandler { public: - Error ProcessRequestUnlocked(const Request* request) { + Error ProcessRequestUnlocked(const Request* request) override { return Error{ProcessRequestUnlocked_t(request)}; } - - MOCK_METHOD0(PrepareProcessingRequestLocked, void()); + void TearDownProcessingRequestLocked(const Error& error_from_process) override{ + if (error_from_process) { + TearDownProcessingRequestLocked_t(error_from_process.get()); + } + else { + TearDownProcessingRequestLocked_t(nullptr); + } + } + MOCK_METHOD0(PrepareProcessingRequestLocked, void()); MOCK_METHOD0(ReadyProcessRequest, bool()); - MOCK_METHOD1(TearDownProcessingRequestLocked, void(const Error& error_from_process)); + MOCK_METHOD1(TearDownProcessingRequestLocked_t, void(ErrorInterface* error_from_process)); MOCK_METHOD1(ProcessRequestUnlocked_t, ErrorInterface*(const Request*)); }; diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index b66bf4d04eb3deace7e44896943fddc94238487c..9202d0d5b466560ce7f74032d8ff2e41c1a08723 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -28,6 +28,8 @@ using testing::DoAll; using testing::NiceMock; using ::testing::InSequence; using ::testing::HasSubstr; +using testing::AtLeast; +using testing::Ref; using asapo::RequestHandler; using asapo::RequestPool; @@ -37,28 +39,32 @@ using asapo::Request; using asapo::GenericNetworkRequestHeader; -MockRequestHandler mock_request_handler; - class MockRequestHandlerFactory : public asapo::RequestHandlerFactory { public: - MockRequestHandlerFactory():RequestHandlerFactory(asapo::RequestHandlerType::kTcp, nullptr){}; + MockRequestHandlerFactory(RequestHandler* request_handler): + RequestHandlerFactory(asapo::RequestHandlerType::kTcp, nullptr) + { + request_handler_ = request_handler; + } std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id) override { - return std::unique_ptr<RequestHandler>{&mock_request_handler}; + return std::unique_ptr<RequestHandler>{request_handler_}; } + private: + RequestHandler* request_handler_; }; class RequestPoolTests : public testing::Test { public: + NiceMock<MockRequestHandler>* mock_request_handler = new testing::NiceMock<MockRequestHandler>; NiceMock<asapo::MockLogger> mock_logger; - MockRequestHandlerFactory request_handler_factory; - const uint8_t nthreads = 4; + MockRequestHandlerFactory request_handler_factory{mock_request_handler}; + const uint8_t nthreads = 1; const uint64_t max_size = 1024 * 1024 * 1024; asapo::RequestPool pool {nthreads, max_size, &request_handler_factory}; - std::unique_ptr<Request> request; - NiceMock<MockRequestHandler>* mock_request_handler = new testing::NiceMock<MockRequestHandler>; + std::unique_ptr<Request> request{new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}}; void SetUp() override { pool.log__ = &mock_logger; } @@ -76,12 +82,14 @@ TEST(RequestPool, Constructor) { ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); } -/* + TEST(RequestPool, AddRequestFailsDueToSize) { - asapo::ReceiverDiscoveryService discovery{expected_endpoint, 1000}; - RequestPool pool{4, 0, &discovery}; - std::unique_ptr<RequestHandlerTcp> request{new RequestHandlerTcp{GenericNetworkRequestHeader{}, nullptr, nullptr}}; + asapo::ReceiverDiscoveryService discovery{asapo::expected_endpoint, 1000}; + NiceMock<asapo::RequestHandlerFactory> request_handler_factory{asapo::RequestHandlerType::kTcp,&discovery}; + + RequestPool pool{4, 0, &request_handler_factory}; + std::unique_ptr<Request> request{new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}}; auto err = pool.AddRequest(std::move(request)); @@ -89,30 +97,10 @@ TEST(RequestPool, AddRequestFailsDueToSize) { } -TEST_F(RequestPoolTests, AddRequestCallsSend) { - EXPECT_CALL(mock_discovery, RotatedUriList(_)).WillOnce(Return(expected_receivers_list1)); - ExpectSend(mock_request, expected_receivers_list1); - - auto err = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); +TEST_F(RequestPoolTests, AddRequestDoesGoFurtherWhenNotReady) { - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { - asapo::SimpleError* send_error = new asapo::SimpleError(""); - - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), expected_receivers_list1, - false)) - .Times(2) - .WillOnce(DoAll( - testing::SetArgPointee<0>(asapo::kDisconnectedSocketDescriptor), - Return(send_error) - )) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - )); + EXPECT_CALL(*mock_request_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(false)); + EXPECT_CALL(*mock_request_handler, PrepareProcessingRequestLocked()).Times(0); auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -120,75 +108,53 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); - - MockRequestHandler* mock_request2 = new MockRequestHandler; - - ExpectSend(mock_request, expected_receivers_list1); - ExpectSend(mock_request2, expected_receivers_list1, true); - - - auto err1 = pool.AddRequest(std::move(request)); - request.reset(mock_request2); - auto err2 = pool.AddRequest(std::move(request)); +TEST_F(RequestPoolTests, NRequestsInQueue) { + auto nreq = pool.NRequestsInQueue(); + ASSERT_THAT(nreq, Eq(0)); +} - std::this_thread::sleep_for(std::chrono::milliseconds(30)); - ASSERT_THAT(err1, Eq(nullptr)); - ASSERT_THAT(err2, Eq(nullptr)); +void ExpectSend(MockRequestHandler* mock_handler,int ntimes = 1) { + EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); + EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); + EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_)).Times(ntimes).WillRepeatedly(Return(nullptr)); + EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked_t(nullptr)).Times(ntimes); } -TEST_F(RequestPoolTests, AddRequestDoesNotCallsSendWhenNoConnactionsAllowed) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(0)); - EXPECT_CALL(*mock_request, Send_t(_, _, _)).Times(0); +TEST_F(RequestPoolTests, AddRequestCallsSend) { + + ExpectSend(mock_request_handler); auto err = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestPoolTests, NRequestsInQueue) { - auto nreq = pool.NRequestsInQueue(); - ASSERT_THAT(nreq, Eq(0)); -} - - -TEST_F(RequestPoolTests, FinishProcessingThreads) { - EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); -} - - -TEST_F(RequestPoolTests, Rebalance) { - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(1)); - - MockRequestHandler* mock_request2 = new MockRequestHandler; +TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - EXPECT_CALL(mock_discovery, RotatedUriList(_)).Times(2). - WillOnce(Return(expected_receivers_list1)). - WillOnce(Return(expected_receivers_list2)); + Request* request2 = new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}; + ExpectSend(mock_request_handler,2); - ExpectSend(mock_request, expected_receivers_list1); - ExpectSend(mock_request2, expected_receivers_list2, true, true); auto err1 = pool.AddRequest(std::move(request)); - request.reset(mock_request2); - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - + request.reset(request2); auto err2 = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); ASSERT_THAT(err1, Eq(nullptr)); ASSERT_THAT(err2, Eq(nullptr)); +} + + +TEST_F(RequestPoolTests, FinishProcessingThreads) { + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); } -*/ }