diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index 1907fe80de86aca5bc7b1757703323668398fb9b..ae64a462f0f08795c6ab8e6d0273844e9e273103 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -75,7 +75,7 @@ Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receive -Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list,bool rebalance) { +Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list, bool rebalance) { if (rebalance && *sd != kDisconnectedSocketDescriptor) { io__->CloseSocket(*sd, nullptr); log__->Info("rebalancing"); diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp index 70d51c757d091ba814b1bb82e1db0b876020f15d..92e1dfae6a90863117848ce900c77cece6fc5cbe 100644 --- a/producer/api/src/request_pool.cpp +++ b/producer/api/src/request_pool.cpp @@ -1,13 +1,7 @@ -#include <chrono> - #include "request_pool.h" #include "producer_logger.h" - -using std::chrono::high_resolution_clock; - - namespace asapo { RequestPool:: RequestPool(uint8_t n_threads, uint64_t max_pool_volume, @@ -39,54 +33,94 @@ Error RequestPool::AddRequest(std::unique_ptr<Request> request) { //todo: maybe notify_one is better here condition_.notify_all(); - return nullptr; } +bool RequestPool::IsConnected(SocketDescriptor thread_sd) { + return thread_sd != kDisconnectedSocketDescriptor; +} + +bool RequestPool::CanCreateNewConnections() { + return ncurrent_connections_ < discovery_service__->MaxConnections(); +} + +bool RequestPool::CanProcessRequest(SocketDescriptor thread_sd) { + return (request_queue_.size() && (IsConnected(thread_sd) || CanCreateNewConnections())); +} + + +void RequestPool::UpdateIfNewConnection(ThreadInformation* thread_info) { + if (thread_info->thread_sd != kDisconnectedSocketDescriptor) + return; + + thread_info->thread_receivers = discovery_service__->RotatedUriList(thread_info->id); + thread_info->last_rebalance = high_resolution_clock::now(); + ncurrent_connections_++; +} + +bool RequestPool::CheckForRebalance(ThreadInformation* thread_info) { + auto now = high_resolution_clock::now(); + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( now - + thread_info->last_rebalance).count(); + bool rebalance = false; + if (elapsed_ms > discovery_service__->UpdateFrequency()) { + auto thread_receivers_new = discovery_service__->RotatedUriList(thread_info->id); + thread_info->last_rebalance = now; + if (thread_receivers_new != thread_info->thread_receivers) { + thread_info->thread_receivers = thread_receivers_new; + rebalance = true; + } + } + return rebalance; + +} + +std::unique_ptr<Request> RequestPool::GetRequestFromQueue() { + auto request = std::move(request_queue_.front()); + request_queue_.pop_front(); + return request; +} + +Error RequestPool::SendDataViaRequest(const std::unique_ptr<Request>& request, ThreadInformation* thread_info) { + //unlock now that we're ready to do async operations + thread_info->lock.unlock(); + + bool rebalance = CheckForRebalance(thread_info); + auto err = request->Send(&thread_info->thread_sd, thread_info->thread_receivers, rebalance); + + // we should lock again for the next wait since we did unlock + thread_info->lock.lock(); + return err; +} + +void RequestPool::PutRequestBackToQueue(std::unique_ptr<Request> request) { + request_queue_.emplace_front(std::move(request)); + ncurrent_connections_--; +} + +void RequestPool::ProcessRequest(ThreadInformation* thread_info) { + UpdateIfNewConnection((thread_info)); + + auto request = GetRequestFromQueue(); + auto err = SendDataViaRequest(request, thread_info); + if (err) { + PutRequestBackToQueue(std::move(request)); + } +} void RequestPool::ThreadHandler(uint64_t id) { - std::unique_lock<std::mutex> lock(mutex_); - SocketDescriptor thread_sd = kDisconnectedSocketDescriptor; - ReceiversList thread_receivers; + ThreadInformation thread_info; + thread_info.id = id; + thread_info.lock = std::unique_lock<std::mutex>(mutex_); + thread_info.thread_sd = kDisconnectedSocketDescriptor; high_resolution_clock::time_point last_rebalance; do { - condition_.wait(lock, [this, thread_sd] { - return ((request_queue_.size() && (thread_sd != kDisconnectedSocketDescriptor || - ncurrent_connections_ < discovery_service__->MaxConnections()) ) || quit_); + condition_.wait(thread_info.lock, [this, &thread_info] { + return (CanProcessRequest(thread_info.thread_sd) || quit_); }); //after wait, we own the lock if (!quit_) { - if (thread_sd == kDisconnectedSocketDescriptor) { - thread_receivers = discovery_service__->RotatedUriList(id); - last_rebalance = high_resolution_clock::now(); - ncurrent_connections_++; - } - - auto request = std::move(request_queue_.front()); - request_queue_.pop_front(); - - //unlock now that we're done messing with the queue - lock.unlock(); - - uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - last_rebalance).count(); - bool rebalance = false; - if (elapsed_ms > discovery_service__->UpdateFrequency()) { - auto thread_receivers_new = discovery_service__->RotatedUriList(id); - last_rebalance = high_resolution_clock::now(); - if (thread_receivers_new != thread_receivers) { - thread_receivers = thread_receivers_new; - rebalance = true; - } - } - - auto err = request->Send(&thread_sd, thread_receivers,rebalance); - // we should lock again for the next wait since we did unlock - lock.lock(); - if (err) { - // could not send from this thread - place request back in the queue - request_queue_.emplace_front(std::move(request)); - ncurrent_connections_--; - } + ProcessRequest(&thread_info); }; } while (!quit_); } @@ -104,7 +138,9 @@ RequestPool::~RequestPool() { } } } - - +uint64_t RequestPool::NRequestsInQueue() { + std::lock_guard<std::mutex> lock{mutex_}; + return request_queue_.size(); +} } diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h index 8a955969d390a4e3532ca906bf1cfd9c29a603b3..69c27492ad0ced68f7b43a0561b4c36e73ce9971 100644 --- a/producer/api/src/request_pool.h +++ b/producer/api/src/request_pool.h @@ -7,21 +7,33 @@ #include <thread> #include <condition_variable> #include <queue> +#include <chrono> + #include "logger/logger.h" #include "request.h" #include "receiver_discovery_service.h" +using std::chrono::high_resolution_clock; + namespace asapo { class RequestPool { + struct ThreadInformation { + uint64_t id; + std::unique_lock<std::mutex> lock; + SocketDescriptor thread_sd; + ReceiversList thread_receivers; + high_resolution_clock::time_point last_rebalance; + }; public: explicit RequestPool(uint8_t n_threads, uint64_t max_pool_volume, ReceiverDiscoveryService* discovery_service); Error AddRequest(std::unique_ptr<Request> request); ~RequestPool(); AbstractLogger* log__; - ReceiverDiscoveryService* discovery_service__; + uint64_t NRequestsInQueue(); private: + ReceiverDiscoveryService* discovery_service__; std::vector<std::thread> threads_; void ThreadHandler(uint64_t id); bool quit_{false}; @@ -32,6 +44,16 @@ class RequestPool { uint64_t current_pool_volume_{0}; uint64_t ncurrent_connections_{0}; bool RequestWouldFit(const std::unique_ptr<Request>& request); + bool IsConnected(SocketDescriptor thread_sd); + bool CanCreateNewConnections(); + bool CanProcessRequest(SocketDescriptor thread_sd); + void ProcessRequest(ThreadInformation* thread_info); + void UpdateIfNewConnection(ThreadInformation* thread_info); + bool CheckForRebalance(ThreadInformation* thread_info); + std::unique_ptr<Request> GetRequestFromQueue(); + Error SendDataViaRequest(const std::unique_ptr<Request>& request, ThreadInformation* thread_info); + void PutRequestBackToQueue(std::unique_ptr<Request>request); + }; } diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp index 0b3df412f41a9c36dab3fd3d891437da2194a378..c1e85d854c245cdcf71f661a50cd24f86673b464 100644 --- a/producer/api/unittests/test_request.cpp +++ b/producer/api/unittests/test_request.cpp @@ -263,7 +263,7 @@ TEST_F(RequestTests, MemoryRequirements) { TEST_F(RequestTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -275,7 +275,7 @@ TEST_F(RequestTests, DoesNotTryConnectWhenConnected) { ExpectFailSendHeader(true); - auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},false); + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}, false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -285,7 +285,7 @@ TEST_F(RequestTests, DoNotCloseWhenRebalanceAndNotConnected) { ExpectOKConnect(); ExpectFailSendHeader(); - auto err = request.Send(&sd, receivers_list,true); + auto err = request.Send(&sd, receivers_list, true); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -296,7 +296,7 @@ TEST_F(RequestTests, DoNotCloseWhenRebalanceIfNotConnected) { ExpectFailSendHeader(true); - auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},true); + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}, true); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -312,7 +312,7 @@ TEST_F(RequestTests, ReconnectWhenRebalance) { ExpectFailSendHeader(true); - auto err = request.Send(&sd, asapo::ReceiversList{expected_address1},true); + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}, true); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -322,7 +322,7 @@ TEST_F(RequestTests, ErrorWhenCannotSendHeader) { ExpectOKConnect(); ExpectFailSendHeader(); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -333,7 +333,7 @@ TEST_F(RequestTests, ErrorWhenCannotSendData) { ExpectOKSendHeader(); ExpectFailSendData(); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -345,7 +345,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) { ExpectFailReceive(); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } @@ -367,7 +367,7 @@ TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { )); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); ASSERT_THAT(called, Eq(true)); @@ -381,7 +381,7 @@ TEST_F(RequestTests, SendOK) { ExpectOKSendData(true); ExpectOKReceive(); - auto err = request.Send(&sd, receivers_list,false); + auto err = request.Send(&sd, receivers_list, false); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(sd, Eq(expected_sds[0])); diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index efefc974dae68150ed04616d550195a06395475e..e3687522f8b2e0bd9370b03573f78d18703963b1 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -23,13 +23,15 @@ using ::testing::Ne; using ::testing::Mock; using ::testing::AllOf; using testing::DoAll; - +using testing::NiceMock; using ::testing::InSequence; using ::testing::HasSubstr; +using asapo::ReceiversList; using asapo::Request; using asapo::RequestPool; using asapo::Error; +using asapo::GenericNetworkRequestHeader; const std::string expected_endpoint{"endpoint"}; @@ -38,74 +40,76 @@ class MockDiscoveryService : public asapo::ReceiverDiscoveryService { MockDiscoveryService() : ReceiverDiscoveryService{expected_endpoint, 1} {}; MOCK_METHOD0(StartCollectingData, void()); MOCK_METHOD0(MaxConnections, uint64_t()); - MOCK_METHOD1(RotatedUriList, asapo::ReceiversList(uint64_t)); + MOCK_METHOD1(RotatedUriList, ReceiversList(uint64_t)); }; +std::unique_ptr<asapo::IO> io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; class MockRequest : public Request { public: - std::unique_ptr<asapo::IO> io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; MockRequest() : Request(io.get(), asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; - Error Send(asapo::SocketDescriptor* sd, const asapo::ReceiversList& receivers_list, bool rebalance) override { - return Error {Send_t(sd, receivers_list,rebalance)}; + Error Send(asapo::SocketDescriptor* sd, const ReceiversList& receivers_list, bool rebalance) override { + return Error {Send_t(sd, receivers_list, rebalance)}; } - MOCK_METHOD3(Send_t, asapo::SimpleError * (asapo::SocketDescriptor*, const asapo::ReceiversList&,bool)); + MOCK_METHOD3(Send_t, asapo::SimpleError * (asapo::SocketDescriptor*, const ReceiversList&, bool)); }; class RequestPoolTests : public testing::Test { public: - testing::NiceMock<asapo::MockLogger> mock_logger; + NiceMock<asapo::MockLogger> mock_logger; const uint8_t nthreads = 4; const uint64_t max_size = 1024 * 1024 * 1024; - testing::NiceMock<MockDiscoveryService> mock_discovery; + NiceMock<MockDiscoveryService> mock_discovery; asapo::RequestPool pool {nthreads, max_size, &mock_discovery}; std::unique_ptr<Request> request; - MockRequest* mock_request = new MockRequest; - asapo::ReceiversList expected_receivers_list{"ip1", "ip2", "ip3"}; + NiceMock<MockRequest>* mock_request = new testing::NiceMock<MockRequest>; + ReceiversList expected_receivers_list1{"ip1", "ip2", "ip3"}; + ReceiversList expected_receivers_list2{"ip4", "ip5", "ip6"}; void SetUp() override { pool.log__ = &mock_logger; request.reset(mock_request); ON_CALL(mock_discovery, MaxConnections()).WillByDefault(Return(100)); - ON_CALL(mock_discovery, RotatedUriList(_)).WillByDefault(Return(expected_receivers_list)); + ON_CALL(mock_discovery, RotatedUriList(_)).WillByDefault(Return(expected_receivers_list1)); } void TearDown() override { } }; -TEST(RequestPool, Constructor) { - auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - testing::NiceMock<MockDiscoveryService> mock_discovery; +void ExpectSend(MockRequest* request, const ReceiversList& list, bool connected = false, bool rebalance = false) { + auto descriptor = connected ? 1 : asapo::kDisconnectedSocketDescriptor; + EXPECT_CALL(*request, Send_t(testing::Pointee(descriptor), list, rebalance)) + .WillOnce(DoAll( + testing::SetArgPointee<0>(1), + Return(nullptr) + ) + ); +} +TEST(RequestPool, Constructor) { + NiceMock<MockDiscoveryService> mock_discovery; EXPECT_CALL(mock_discovery, StartCollectingData()); asapo::RequestPool pool{4, 4, &mock_discovery}; ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::ReceiverDiscoveryService*>(pool.discovery_service__), Ne(nullptr)); } TEST(RequestPool, AddRequestFailsDueToSize) { - auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; asapo::ReceiverDiscoveryService discovery{expected_endpoint, 1000}; RequestPool pool{4, 0, &discovery}; - asapo::GenericNetworkRequestHeader header{asapo::Opcode::kNetOpcodeCount, 1, 1}; - std::unique_ptr<Request> request{new Request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}}; + std::unique_ptr<Request> request{new Request{io.get(), GenericNetworkRequestHeader{}, nullptr, nullptr}}; + auto err = pool.AddRequest(std::move(request)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(RequestPoolTests, AddRequestCallsSend) { - - EXPECT_CALL(mock_discovery, RotatedUriList(_)).WillOnce(Return(asapo::ReceiversList{"ip3", "ip2", "ip1"})); - - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), - testing::ElementsAre("ip3", "ip2", "ip1"),false)). - WillOnce( - Return(nullptr) - ); + EXPECT_CALL(mock_discovery, RotatedUriList(_)).WillOnce(Return(expected_receivers_list1)); + ExpectSend(mock_request, expected_receivers_list1); auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -114,10 +118,10 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { } TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { - asapo::SimpleError* send_error = new asapo::SimpleError("www"); + asapo::SimpleError* send_error = new asapo::SimpleError(""); - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip1", - "ip2", "ip3"),false)) + EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), expected_receivers_list1, + false)) .Times(2) .WillOnce(DoAll( testing::SetArgPointee<0>(asapo::kDisconnectedSocketDescriptor), @@ -130,8 +134,8 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwice) { auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(err, Eq(nullptr)); } TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { @@ -139,19 +143,10 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { MockRequest* mock_request2 = new MockRequest; - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip1", - "ip2", "ip3"),false)) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - ) - ); - EXPECT_CALL(*mock_request2, Send_t(testing::Pointee(1), testing::ElementsAre("ip1", "ip2", "ip3"),false)) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - ) - ); + ExpectSend(mock_request, expected_receivers_list1); + ExpectSend(mock_request2, expected_receivers_list1, true); + + auto err1 = pool.AddRequest(std::move(request)); request.reset(mock_request2); auto err2 = pool.AddRequest(std::move(request)); @@ -164,17 +159,19 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { TEST_F(RequestPoolTests, AddRequestDoesNotCallsSendWhenNoConnactionsAllowed) { - EXPECT_CALL(*mock_request, Send_t(_, _,_)).Times(0); - EXPECT_CALL(mock_discovery, MaxConnections()).WillRepeatedly(Return(0)); + EXPECT_CALL(*mock_request, Send_t(_, _, _)).Times(0); - pool.discovery_service__ = &mock_discovery; auto err = pool.AddRequest(std::move(request)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(RequestPoolTests, NRequestsInQueue) { + auto nreq = pool.NRequestsInQueue(); + ASSERT_THAT(nreq, Eq(0)); +} TEST_F(RequestPoolTests, FinishProcessingThreads) { @@ -187,25 +184,15 @@ TEST_F(RequestPoolTests, Rebalance) { MockRequest* mock_request2 = new MockRequest; + EXPECT_CALL(mock_discovery, RotatedUriList(_)).Times(2). - WillOnce(Return(asapo::ReceiversList{"ip3", "ip2", "ip1"})). - WillOnce(Return(asapo::ReceiversList{"ip4", "ip5", "ip6"})); + WillOnce(Return(expected_receivers_list1)). + WillOnce(Return(expected_receivers_list2)); - EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("ip3", - "ip2", "ip1"),false)) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - ) - ); + ExpectSend(mock_request, expected_receivers_list1); + ExpectSend(mock_request2, expected_receivers_list2, true, true); - EXPECT_CALL(*mock_request2, Send_t(testing::Pointee(1), testing::ElementsAre("ip4", "ip5", "ip6"),true)) - .WillOnce(DoAll( - testing::SetArgPointee<0>(1), - Return(nullptr) - ) - ); auto err1 = pool.AddRequest(std::move(request)); request.reset(mock_request2);