diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index 1337b7e311c0394ce1416440e26cbc6b59c1218c..fb6b0038b0b966260351eeedc14d65ec8d9e9cb9 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -19,7 +19,7 @@ class FileInfo { uint64_t buf_id{0}; std::string Json() const; bool SetFromJson(const std::string& json_string); - std::string FullName(const std::string& base_path); + std::string FullName(const std::string& base_path) const; }; inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) { diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index 323d1d6afc71adb320b7fef19a62cbd219be1980..9c5c434d99189e78511759c0c862506bd3ee44d2 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -54,7 +54,7 @@ bool FileInfo::SetFromJson(const std::string& json_string) { return true; } -std::string FileInfo::FullName(const std::string& base_path) { +std::string FileInfo::FullName(const std::string& base_path) const { std::string full_name; full_name = base_path.empty() ? "" : base_path + "/"; return full_name + name; diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index dbd401a692bef49f15a82d5870449351b88589b5..e4cde4e89d752be098f4ee1688fa9bc660757cf6 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -21,7 +21,6 @@ using ::testing::SetArgPointee; namespace { uint64_t big_uint = 18446744073709551615ull; -int64_t big_int = -9223372036854775807ll - 1; FileInfo PrepareFileInfo() { FileInfo finfo; diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index 557137def06ebe0c2dc2040800f023d49d3a1f38..0292885066fd00791df930caf95091f274c4fc21 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -99,6 +99,7 @@ Error TcpServer::SendData(uint64_t source_id, void* buf, uint64_t size) const no } return err; } + void TcpServer::HandleAfterError(uint64_t source_id) const noexcept { CloseSocket(source_id); } diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index 00f932b8270983e447636d5a6071168ea6b96726..b7ff5d211e6dacff19b38423bf4b5bb29bd86ffe 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -73,8 +73,8 @@ Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const { return ReceiverErrorTemplates::kAuthorizationFailure.Generate(); } - auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds> - (high_resolution_clock::now() - last_updated_).count(); + uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> + (high_resolution_clock::now() - last_updated_).count(); if (elapsed_ms >= GetReceiverConfig()->authorization_interval_ms) { auto err = Authorize(request, beamtime_id_.c_str()); if (err) { diff --git a/worker/api/cpp/CMakeLists.txt b/worker/api/cpp/CMakeLists.txt index 5cb0ffd2c46f60faeeb34e6bb26beb853071e91a..c671bb9d6bf7d9a89bfb3943a306e5045430f3d0 100644 --- a/worker/api/cpp/CMakeLists.txt +++ b/worker/api/cpp/CMakeLists.txt @@ -4,7 +4,8 @@ set(SOURCE_FILES src/data_broker.cpp src/folder_data_broker.cpp src/server_data_broker.cpp - src/tcp_client.cpp src/tcp_client.h) + src/tcp_client.cpp + src/tcp_connection_pool.cpp) ################################ @@ -27,7 +28,10 @@ target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ################################ set(TEST_SOURCE_FILES unittests/test_worker_api.cpp unittests/test_folder_broker.cpp - unittests/test_server_broker.cpp) + unittests/test_server_broker.cpp + unittests/test_tcp_client.cpp + unittests/test_tcp_connection_pool.cpp + ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 3a9f340341b3498875f0469fa0075645539a56e0..048178b3dd11f62dab1dfc5655ad852dec0fa9c3 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -170,16 +170,19 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, FileInfo* return err; } + return GetDataIfNeeded(info, data); +} + +Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { if (data == nullptr) { return nullptr; } - Error error; - - error = net_client__->GetData(info, data); + auto error = net_client__->GetData(info, data); if (error) { *data = io__->GetDataFromFile(info->FullName(""), &info->size, &error); } + return error; } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 643fd560fb0471cf54d7525f19dd1ae6a976dac0..5398167586c6c4e2fa697d3db3d66b6f13bcdc0d 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -29,6 +29,7 @@ class ServerDataBroker final : public asapo::DataBroker { private: std::string RequestWithToken(std::string uri); Error GetFileInfoFromServer(FileInfo* info, GetImageServerOperation op); + Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); Error ProcessRequest(std::string* response, std::string request_uri); diff --git a/worker/api/cpp/src/tcp_client.cpp b/worker/api/cpp/src/tcp_client.cpp index 87e43d7f3882ab1c8bfcd31ce69724780710fac2..8addce07be1e0442c9d6223b92dff11428024d06 100644 --- a/worker/api/cpp/src/tcp_client.cpp +++ b/worker/api/cpp/src/tcp_client.cpp @@ -1,9 +1,81 @@ #include "tcp_client.h" - +#include "io/io_factory.h" +#include "common/networking.h" namespace asapo { +TcpClient::TcpClient() : io__{GenerateDefaultIO()}, connection_pool__{new TcpConnectionPool()} { + +} + + +Error TcpClient::SendGetDataRequest(SocketDescriptor sd, const FileInfo* info) const noexcept { + Error err; + GenericRequestHeader request_header{kOpcodeGetBufferData, info->buf_id, info->size}; + io__->Send(sd, &request_header, sizeof(request_header), &err); + if (err) { + io__->CloseSocket(sd, nullptr); + } + return err; +} + +Error TcpClient::ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const FileInfo* info) const noexcept { + Error err; + *sd = connection_pool__->Reconnect(*sd, &err); + if (err) { + return err; + } else { + return SendGetDataRequest(*sd, info); + } +} + +Error TcpClient::ReceiveResponce(SocketDescriptor sd) const noexcept { + Error err; + + GenericNetworkResponse Response; + io__->Receive(sd, &Response, sizeof(Response), &err); + if(err != nullptr) { + io__->CloseSocket(sd, nullptr); + return err; + } + switch (Response.error_code) { + case kNetErrorWrongRequest : + io__->CloseSocket(sd, nullptr); + return Error{new SimpleError("internal server error: wrong request")}; + case kNetErrorNoData : + return Error{new SimpleError("no data")}; + default: + return nullptr; + } +} + +Error TcpClient::QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, bool try_reconnect) const noexcept { + Error err; + err = SendGetDataRequest(*sd, info); + if (err && try_reconnect) { + err = ReconnectAndResendGetDataRequest(sd, info); + } + if (err) { + return err; + } + + return ReceiveResponce(*sd); +} + Error TcpClient::GetData(const FileInfo* info, FileData* data) const noexcept { - return ErrorTemplates::kMemoryAllocationError.Generate(); + Error err; + bool reused; + auto sd = connection_pool__->GetFreeConnection(info->source, &reused, &err); + if (err != nullptr) { + return err; + } + + err = QueryCacheHasData(&sd, info, reused); + if (err) { + return err; + } + + io__->Receive(sd, data->get(), info->size, &err); + return err; } } \ No newline at end of file diff --git a/worker/api/cpp/src/tcp_client.h b/worker/api/cpp/src/tcp_client.h index 6188b499ad4337ffc2a004c99074f55b1e546bda..fb9d2fa3309fa9311d84543c42f6c94158aecacf 100644 --- a/worker/api/cpp/src/tcp_client.h +++ b/worker/api/cpp/src/tcp_client.h @@ -2,12 +2,22 @@ #define ASAPO_TCP_CLIENT_H #include "net_client.h" +#include "io/io.h" +#include "tcp_connection_pool.h" namespace asapo { class TcpClient : public NetClient { public: + explicit TcpClient(); Error GetData(const FileInfo* info, FileData* data) const noexcept override; + std::unique_ptr<IO> io__; + std::unique_ptr<TcpConnectionPool> connection_pool__; + private: + Error SendGetDataRequest(SocketDescriptor sd, const FileInfo* info) const noexcept; + Error ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const FileInfo* info) const noexcept; + Error ReceiveResponce(SocketDescriptor sd) const noexcept; + Error QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, bool try_reconnect) const noexcept; }; diff --git a/worker/api/cpp/src/tcp_connection_pool.cpp b/worker/api/cpp/src/tcp_connection_pool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3144e04fe00765e5b3838fb1ade9894eecb8e292 --- /dev/null +++ b/worker/api/cpp/src/tcp_connection_pool.cpp @@ -0,0 +1,72 @@ +#include "tcp_connection_pool.h" + +#include <algorithm> + +#include "io/io_factory.h" + +namespace asapo { + +TcpConnectionPool::TcpConnectionPool() : io__{GenerateDefaultIO()} { + +} + +SocketDescriptor TcpConnectionPool::Connect(const std::string& source, Error* err) { + auto sd = io__->CreateAndConnectIPTCPSocket(source, err); + if (*err != nullptr) { + return kDisconnectedSocketDescriptor; + } + return sd; +} + +SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, bool* reused, Error* err) { + std::lock_guard<std::mutex> lock{mutex_}; + + for (auto& connection : connections_) { + if (source == connection.uri && !connection.in_use) { + connection.in_use = true; + *err = nullptr; + *reused = true; + return connection.sd; + } + } + + auto sd = Connect(source, err); + if (*err == nullptr) { + *reused = false; + TcpConnectionInfo connection{source, sd, true}; + connections_.emplace_back(std::move(connection)); + } + return sd; +} + +SocketDescriptor TcpConnectionPool::Reconnect(SocketDescriptor sd, Error* err) { + std::lock_guard<std::mutex> lock{mutex_}; + + for (size_t i = 0; i < connections_.size(); i++) { + if (connections_[i].sd == sd) { + auto new_sd = Connect(connections_[i].uri, err); + if (err == nullptr) { + connections_[i].sd = new_sd; + connections_[i].in_use = true; + } else { + connections_.erase(connections_.begin() + i); + } + return new_sd; + } + } + + *err = Error{new SimpleError("cannot find connection in pool")}; + return kDisconnectedSocketDescriptor; +} + +void TcpConnectionPool::ReleaseConnection(SocketDescriptor sd) { + std::lock_guard<std::mutex> lock{mutex_}; + + for (auto& connection : connections_) { + if (sd == connection.sd) { + connection.in_use = false; + } + } +} + +} \ No newline at end of file diff --git a/worker/api/cpp/src/tcp_connection_pool.h b/worker/api/cpp/src/tcp_connection_pool.h new file mode 100644 index 0000000000000000000000000000000000000000..b3e8195e3958d7262e75698af02b349499b2cf88 --- /dev/null +++ b/worker/api/cpp/src/tcp_connection_pool.h @@ -0,0 +1,32 @@ +#ifndef ASAPO_TCP_CONNECTION_POOL_H +#define ASAPO_TCP_CONNECTION_POOL_H + +#include <mutex> + +#include "io/io.h" +#include "preprocessor/definitions.h" + +namespace asapo { + +struct TcpConnectionInfo { + std::string uri; + SocketDescriptor sd; + bool in_use; +}; + +class TcpConnectionPool { + public: + VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err); + VIRTUAL SocketDescriptor Reconnect(SocketDescriptor sd, Error* err); + VIRTUAL void ReleaseConnection(SocketDescriptor sd); + TcpConnectionPool(); + std::unique_ptr<IO> io__; + private: + SocketDescriptor Connect(const std::string& source, Error* err); + std::vector<TcpConnectionInfo> connections_; + std::mutex mutex_; +}; + +} + +#endif //ASAPO_TCP_CONNECTION_POOL_H diff --git a/worker/api/cpp/unittests/mocking.h b/worker/api/cpp/unittests/mocking.h index 32be31e26682a57145248aa09b73c51669e9023e..83ba19658be91427342a66bb7eb53760e105cdc2 100644 --- a/worker/api/cpp/unittests/mocking.h +++ b/worker/api/cpp/unittests/mocking.h @@ -5,6 +5,7 @@ #include <gmock/gmock.h> #include "../src/net_client.h" +#include "../src/tcp_connection_pool.h" namespace asapo { @@ -19,6 +20,29 @@ class MockNetClient : public asapo::NetClient { }; + +class MockTCPConnectionPool : public asapo::TcpConnectionPool { + public: + + SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err) override { + ErrorInterface* error = nullptr; + auto data = GetFreeConnection_t(source, reused, &error); + err->reset(error); + return data; + } + MOCK_METHOD3(GetFreeConnection_t, SocketDescriptor (const std::string&, bool* reused, ErrorInterface**)); + + SocketDescriptor Reconnect(SocketDescriptor sd, Error* err) override { + ErrorInterface* error = nullptr; + auto data = Reconnect_t(sd, &error); + err->reset(error); + return data; + } + MOCK_METHOD2(Reconnect_t, SocketDescriptor (SocketDescriptor, ErrorInterface**)); + + MOCK_METHOD1(ReleaseConnection, void (SocketDescriptor)); +}; + } diff --git a/worker/api/cpp/unittests/test_tcp_client.cpp b/worker/api/cpp/unittests/test_tcp_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7f71a81c1990b7db3d41f24d279491e25be73ed9 --- /dev/null +++ b/worker/api/cpp/unittests/test_tcp_client.cpp @@ -0,0 +1,240 @@ +#include <gmock/gmock.h> +#include "gtest/gtest.h" + +#include "io/io.h" +#include "unittests/MockIO.h" +#include "mocking.h" +#include "../src/tcp_client.h" +#include "../../../../common/cpp/src/system_io/system_io.h" +#include "common/networking.h" + +using asapo::IO; +using asapo::FileInfo; +using asapo::FileData; +using asapo::MockIO; +using asapo::SimpleError; +using asapo::TcpClient; +using asapo::MockTCPConnectionPool; + + +using ::testing::AtLeast; +using ::testing::Eq; +using ::testing::HasSubstr; +using ::testing::Ne; +using ::testing::Test; +using ::testing::_; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::Return; +using ::testing::SetArgPointee; +using ::testing::SetArgReferee; +using testing::AllOf; + +namespace { + +TEST(TcpClient, Constructor) { + auto client = std::unique_ptr<TcpClient> {new TcpClient()}; + ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(client->io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::TcpConnectionPool*>(client->connection_pool__.get()), Ne(nullptr)); +} + +MATCHER_P4(M_CheckSendDataRequest, op_code, buf_id, data_size, message, + "Checks if a valid GenericRequestHeader was Send") { + return ((asapo::GenericRequestHeader*) arg)->op_code == op_code + && ((asapo::GenericRequestHeader*) arg)->data_id == uint64_t(buf_id) + && ((asapo::GenericRequestHeader*) arg)->data_size == uint64_t(data_size) + && strcmp(((asapo::GenericRequestHeader*) arg)->message, message) == 0; +} + +ACTION_P(A_WriteSendDataResponse, error_code) { + ((asapo::SendDataResponse*)arg1)->op_code = asapo::kOpcodeGetBufferData; + ((asapo::SendDataResponse*)arg1)->error_code = error_code; +} + + +class TcpClientTests : public Test { + public: + std::unique_ptr<TcpClient> client = std::unique_ptr<TcpClient> {new TcpClient()}; + NiceMock<MockIO> mock_io; + NiceMock<MockTCPConnectionPool> mock_connection_pool; + FileInfo info; + std::string expected_uri = "test:8400"; + uint64_t expected_buf_id = 123; + uint64_t expected_size = 1233; + FileData data; + asapo::SocketDescriptor expected_sd = 1; + void SetUp() override { + info.source = expected_uri; + info.buf_id = expected_buf_id; + info.size = expected_size; + client->io__ = std::unique_ptr<IO> {&mock_io}; + client->connection_pool__ = std::unique_ptr<asapo::TcpConnectionPool> {&mock_connection_pool}; + } + void TearDown() override { + client->io__.release(); + client->connection_pool__.release(); + } + + void ExpectNewConnection(bool reused, bool ok) { + EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, _, _)).WillOnce( + DoAll( + SetArgPointee<1>(reused), + SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(ok ? expected_sd : asapo::kDisconnectedSocketDescriptor)) + ); + } + + void ExpectReconnect(bool ok) { + EXPECT_CALL(mock_connection_pool, Reconnect_t(expected_sd, _)).WillOnce( + DoAll( + SetArgPointee<1>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(ok ? expected_sd + 1 : asapo::kDisconnectedSocketDescriptor)) + ); + } + + void ExpectSendDataRequest(asapo::SocketDescriptor sd, bool ok = true) { + EXPECT_CALL(mock_io, Send_t(sd, M_CheckSendDataRequest(asapo::kOpcodeGetBufferData, expected_buf_id, + expected_size, ""), + sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(ok ? nullptr + : asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(ok ? 1 : -1) + )); + if (!ok) { + EXPECT_CALL(mock_io, CloseSocket_t(sd, _)); + } + } + + + + void ExpectGetResponce(asapo::SocketDescriptor sd, bool ok, asapo::NetworkErrorCode responce_code) { + + EXPECT_CALL(mock_io, Receive_t(sd, _, sizeof(asapo::SendDataResponse), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kConnectionRefused.Generate().release()), + A_WriteSendDataResponse(responce_code), + testing::ReturnArg<2>() + )); + } + + void ExpectGetData(asapo::SocketDescriptor sd, bool ok) { + + EXPECT_CALL(mock_io, Receive_t(sd, _, expected_size, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kTimeout.Generate().release()), + testing::Return(ok ? expected_size : -1) + )); + } +}; + +TEST_F(TcpClientTests, ErrorGetNewConnection) { + ExpectNewConnection(false, false); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + +TEST_F(TcpClientTests, SendHeaderForNewConnectionReturnsError) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, false); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kBadFileNumber)); +} + +TEST_F(TcpClientTests, OnErrorSendHeaderTriesToReconnectAndFails) { + ExpectNewConnection(true, true); + ExpectSendDataRequest(expected_sd, false); + ExpectReconnect(false); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + +TEST_F(TcpClientTests, OnErrorSendHeaderTriesToReconnectAndSendsAnotherRequest) { + ExpectNewConnection(true, true); + ExpectSendDataRequest(expected_sd, false); + ExpectReconnect(true); + ExpectSendDataRequest(expected_sd + 1, false); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kBadFileNumber)); +} + +TEST_F(TcpClientTests, GetResponceReturnsError) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, true); + ExpectGetResponce(expected_sd, false, asapo::kNetErrorNoError); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kConnectionRefused)); +} + +TEST_F(TcpClientTests, GetResponceReturnsNoData) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, true); + ExpectGetResponce(expected_sd, false, asapo::kNetErrorNoData); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Ne(nullptr)); +} + +TEST_F(TcpClientTests, GetResponceReturnsWrongRequest) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, true); + ExpectGetResponce(expected_sd, true, asapo::kNetErrorWrongRequest); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Ne(nullptr)); +} + +TEST_F(TcpClientTests, ErrorGettingData) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, true); + ExpectGetResponce(expected_sd, true, asapo::kNetErrorNoError); + ExpectGetData(expected_sd, false); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout)); +} + +TEST_F(TcpClientTests, OkGettingData) { + ExpectNewConnection(false, true); + ExpectSendDataRequest(expected_sd, true); + ExpectGetResponce(expected_sd, true, asapo::kNetErrorNoError); + ExpectGetData(expected_sd, true); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(TcpClientTests, OkGettingDataWithReconnect) { + ExpectNewConnection(true, true); + ExpectSendDataRequest(expected_sd, false); + ExpectReconnect(true); + ExpectSendDataRequest(expected_sd + 1, true); + ExpectGetResponce(expected_sd + 1, true, asapo::kNetErrorNoError); + ExpectGetData(expected_sd + 1, true); + + auto err = client->GetData(&info, &data); + + ASSERT_THAT(err, Eq(nullptr)); +} + +} diff --git a/worker/api/cpp/unittests/test_tcp_connection_pool.cpp b/worker/api/cpp/unittests/test_tcp_connection_pool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8d0e139b1bf0b3d74ddc4908286446f43e67388e --- /dev/null +++ b/worker/api/cpp/unittests/test_tcp_connection_pool.cpp @@ -0,0 +1,176 @@ +#include <gmock/gmock.h> +#include "gtest/gtest.h" + +#include "io/io.h" +#include "unittests/MockIO.h" +#include "mocking.h" +#include "../src/tcp_connection_pool.h" +#include "../../../../common/cpp/src/system_io/system_io.h" + + +using asapo::IO; +using asapo::FileInfo; +using asapo::FileData; +using asapo::MockIO; +using asapo::SimpleError; +using asapo::TcpConnectionPool; +using asapo::SocketDescriptor; +using asapo::Error; + +using ::testing::AtLeast; +using ::testing::Eq; +using ::testing::HasSubstr; +using ::testing::Ne; +using ::testing::Test; +using ::testing::_; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::Return; +using ::testing::SetArgPointee; +using ::testing::SetArgReferee; +using testing::AllOf; + +namespace { + +TEST(TcpConnectioPool, Constructor) { + auto client = std::unique_ptr<TcpConnectionPool> {new TcpConnectionPool()}; + ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(client->io__.get()), Ne(nullptr)); +} + + +class TcpConnectioPoolTests : public Test { + public: + NiceMock<MockIO> mock_io; + FileInfo info; + std::string expected_source = "test:8400"; + TcpConnectionPool pool; + SocketDescriptor expected_sd = 123; + bool reused; + void SetUp() override { + pool.io__ = std::unique_ptr<IO> {&mock_io}; + } + void TearDown() override { + pool.io__.release(); + } + + void ExpectSingleConnect() { + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_source, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(nullptr), + Return(expected_sd) + )); + } + void ExpectTwoConnects(bool second_fails = false) { + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_source, _)).Times(2) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(nullptr), + Return(expected_sd) + )) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(second_fails ? asapo::IOErrorTemplates::kUnknownIOError.Generate().release() : nullptr), + Return(second_fails ? asapo::kDisconnectedSocketDescriptor : expected_sd + 1) + )); + } + +}; + +TEST_F(TcpConnectioPoolTests, GetConnectionCreatesNewOne) { + ExpectSingleConnect(); + + Error err; + auto sd = pool.GetFreeConnection(expected_source, &reused, &err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(reused, Eq(false)); + ASSERT_THAT(sd, Eq(expected_sd)); +} + + +TEST_F(TcpConnectioPoolTests, GetTwoConnection) { + ExpectTwoConnects(); + + Error err; + auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sd1, Eq(expected_sd)); + ASSERT_THAT(sd2, Eq(expected_sd + 1)); +} + + +TEST_F(TcpConnectioPoolTests, GetConnectionUsesConnectionPool) { + ExpectSingleConnect(); + + Error err; + auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); + pool.ReleaseConnection(sd1); + auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err); + + ASSERT_THAT(reused, Eq(true)); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sd1, Eq(expected_sd)); + ASSERT_THAT(sd2, Eq(expected_sd)); +} + +TEST_F(TcpConnectioPoolTests, CannotConnect) { + + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_source, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), + Return(asapo::kDisconnectedSocketDescriptor) + )); + + Error err, err1; + auto sd = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd1 = pool.Reconnect(sd, &err1); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kInvalidAddressFormat)); + ASSERT_THAT(err1, Ne(nullptr)); + ASSERT_THAT(sd, Eq(asapo::kDisconnectedSocketDescriptor)); + ASSERT_THAT(sd1, Eq(asapo::kDisconnectedSocketDescriptor)); +} + +TEST_F(TcpConnectioPoolTests, CanReconnect) { + ExpectTwoConnects(); + + Error err; + auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); + pool.ReleaseConnection(sd1); + auto sd2 = pool.Reconnect(sd1, &err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sd1, Eq(expected_sd)); + ASSERT_THAT(sd2, Eq(expected_sd + 1)); +} + +TEST_F(TcpConnectioPoolTests, ReconnectionFails) { + ExpectTwoConnects(true); + + Error err1, err2, err3; + auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err1); + pool.ReleaseConnection(sd1); + auto sd2 = pool.Reconnect(sd1, &err2); + auto sd3 = pool.Reconnect(sd2, &err3); // this reconnect should not work as the record has been removed + + ASSERT_THAT(err1, Eq(nullptr)); + ASSERT_THAT(sd1, Eq(expected_sd)); + ASSERT_THAT(err2, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + ASSERT_THAT(sd2, Eq(asapo::kDisconnectedSocketDescriptor)); + ASSERT_THAT(sd3, Eq(asapo::kDisconnectedSocketDescriptor)); + ASSERT_THAT(err3, Ne(nullptr)); +} + + +TEST_F(TcpConnectioPoolTests, ReconnectWrongSD) { + Error err; + pool.Reconnect(expected_sd, &err); + + ASSERT_THAT(err, Ne(nullptr)); +} + + +}