diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index d40a2cb8625a0cbe4504ac1fbb508bf961334ecd..47b94e69312269d335753599929d2690cb558474 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -41,6 +41,7 @@ enum class SocketProtocols { using FileDescriptor = int; using SocketDescriptor = int; +using ListSocketDescriptors = std::vector<SocketDescriptor>; const SocketDescriptor kDisconnectedSocketDescriptor = -1; class IO { @@ -57,6 +58,10 @@ class IO { virtual SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, Error* err) const = 0; virtual void Listen(SocketDescriptor socket_fd, int backlog, Error* err) const = 0; + + virtual ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const = 0; + + virtual void InetBind(SocketDescriptor socket_fd, const std::string& address, Error* err) const = 0; virtual SocketDescriptor CreateAndBindIPTCPSocketListener(const std::string& address, int backlog, Error* err) const = 0; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index abbee5358101c904a2eae8333688750e5121f953..0b8e9ff18e83d7b8862acee5053dc70426bafe75 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -13,6 +13,18 @@ class MockIO : public IO { } MOCK_CONST_METHOD1(NewThread_t, std::thread * (std::function<void()> function)); + + ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const override { + ErrorInterface* error = nullptr; + auto data = WaitSocketsActivity_t(sockets_to_listen, &error); + err->reset(error); + return data; + } + + MOCK_CONST_METHOD2(WaitSocketsActivity_t, ListSocketDescriptors(ListSocketDescriptors sockets_to_listen, + ErrorInterface** err)); + + SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, Error* err) const override { ErrorInterface* error = nullptr; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index e162e2317190e8a5f5a3dd9d631e55a6513257ee..adb1af92ca48c764cb1c056649ec2b946d8d0e5c 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -612,4 +612,32 @@ Error SystemIO::RemoveFile(const std::string& fname) const { } } +ListSocketDescriptors SystemIO::WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const { + fd_set readfds; + FD_ZERO(&readfds); + SocketDescriptor max_sd = kDisconnectedSocketDescriptor; + for (auto sd : sockets_to_listen) { + FD_SET(sd, &readfds); + if (sd > max_sd) { + max_sd = sd; + } + } + + auto activity = select(max_sd + 1, &readfds , NULL , NULL , NULL); + + if ((activity < 0) && (errno != EINTR)) { + *err = GetLastError(); + return {}; + } + + ListSocketDescriptors active_sockets; + for (auto sd : sockets_to_listen) { + if (FD_ISSET(sd, &readfds)) { + active_sockets.push_back(sd); + } + } + + return active_sockets; +} + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 48c614927db890d3ece91b0b2ad97b7e4a7b9468..048b5d1935ebbde6dc3d66d4ef210fe292634379 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -82,6 +82,10 @@ class SystemIO final : public IO { /* * Network */ + + ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const override; + + SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, Error* err) const; void Listen(SocketDescriptor socket_fd, int backlog, Error* err) const; diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index fcc419fd88bfda278722f05e88f94c8963c8995b..44821cb69690de93bb1d12399f08ee625de8fee1 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -70,5 +70,7 @@ set(TEST_LIBRARIES "${TARGET_NAME};system_io") gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp) set(TEST_SOURCE_FILES_RDS - unittests/receiver_data_server/test_receiver_data_server.cpp) + unittests/receiver_data_server/test_receiver_data_server.cpp + unittests/receiver_data_server/test_tcp_server.cpp + ) gtest(${TARGET_NAME}_RDS "${TEST_SOURCE_FILES_RDS}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/*.h) diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 72d45a960aeb3ff907b929fcf52c4f581e18d910..6c80cec86293f06f900d22aa778f771ce6abf555 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -7,6 +7,8 @@ #include "receiver_logger.h" #include "common/version.h" +#include "receiver_data_server/receiver_data_server.h" + asapo::Error ReadConfigFile(int argc, char* argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " <config file>" << std::endl; @@ -21,7 +23,6 @@ int main (int argc, char* argv[]) { auto err = ReadConfigFile(argc, argv); const auto& logger = asapo::GetDefaultReceiverLogger(); - if (err) { logger->Error("cannot read config file: " + err->Explain()); return 1; @@ -31,6 +32,10 @@ int main (int argc, char* argv[]) { logger->SetLogLevel(config->log_level); + static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port); + asapo::ReceiverDataServer data_server{dataserver_address}; + std::thread server_thread (&asapo::ReceiverDataServer::Run, &data_server); + static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port); auto* receiver = new asapo::Receiver(); diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index d2eb57eb7e716b0305ec61762b1453998fac35fd..24b8152888ec8ac549c348d7c48c66670e0cd6a6 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -17,6 +17,7 @@ Error ReceiverConfigFactory::SetConfigFromFile(std::string file_name) { Error err; (err = parser.GetString("MonitorDbAddress", &config.monitor_db_uri)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || + (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver_listen_port)) || (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) || (err = parser.GetBool("WriteToDb", &config.write_to_db)) || (err = parser.GetString("BrokerDbAddress", &config.broker_db_uri)) || diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index d99d07617b84de3a53d5e0edbff06dec75a1e5cc..5b7f1a9e01f713dbf78dc5d8ee5ce41ca60f144e 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -13,6 +13,7 @@ struct ReceiverConfig { std::string broker_db_uri; std::string root_folder; uint64_t listen_port = 0; + uint64_t dataserver_listen_port = 0; std::string authorization_server; uint64_t authorization_interval_ms = 0; bool write_to_disk = false; diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index 674a3cdc34313195828108c430f34e07741bb037..09ea980658e3a5668ec0ab79e0b27fbdb88c08db 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -4,7 +4,7 @@ namespace asapo { -ReceiverDataServer::ReceiverDataServer() : request_pool__{new RequestPool}, net__{new TcpServer()}, +ReceiverDataServer::ReceiverDataServer(std::string address) : request_pool__{new RequestPool}, net__{new TcpServer(address)}, log__{GetDefaultReceiverDataServerLogger()} { } @@ -12,11 +12,9 @@ void ReceiverDataServer::Run() { while (true) { Error err; auto requests = net__->GetNewRequests(&err); - if (err) { - log__->Error(std::string("receiver data server stopped: ") + err->Explain()); - return; + if (!err) { + err = request_pool__->AddRequests(requests); } - err = request_pool__->AddRequests(requests); if (err) { log__->Error(std::string("receiver data server stopped: ") + err->Explain()); return; diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h index d3a2972efde3b0fa53dbba6cf56e100bdedb52bd..74a3ce97b8d217ae925fc972fd18deef80d658b4 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.h +++ b/receiver/src/receiver_data_server/receiver_data_server.h @@ -11,12 +11,12 @@ namespace asapo { class ReceiverDataServer { public: + explicit ReceiverDataServer(std::string address); std::unique_ptr<RequestPool> request_pool__; std::unique_ptr<NetServer> net__; const AbstractLogger* log__; - - ReceiverDataServer(); void Run(); + private: }; } diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index db717fea1c103d0507153cd3b5f2310de031bce6..cb4529d5e93f45a8b32f935387f130f4c2f55f55 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -1,8 +1,32 @@ #include "tcp_server.h" +#include "receiver_data_server_logger.h" + +#include "io/io_factory.h" namespace asapo { +TcpServer::TcpServer(std::string address) : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverDataServerLogger()}, + address_{std::move(address)} {} + +Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept { + Error err; + if (master_socket_ == kDisconnectedSocketDescriptor) { + master_socket_ = io__->CreateAndBindIPTCPSocketListener(address_, kMaxPendingConnections, &err); + if (!err) { + log__->Info("data server listening on " + address_); + } else { + log__->Error("dataserver cannot listen on " + address_ + ": " + err->Explain()); + } + } + return err; +} + Requests TcpServer::GetNewRequests(Error* err) const noexcept { + if (*err = InitializeMasterSocketIfNeeded()) { + return {}; + } + return {}; } + } \ No newline at end of file diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index 27d28f078bf65b2f7c055ea974ee3f6800eaa644..dd71c1e5f95ad109cf288884c7f1b2d0988d69fd 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -2,12 +2,25 @@ #define ASAPO_TCP_SERVER_H #include "net_server.h" +#include "io/io.h" +#include "logger/logger.h" namespace asapo { +const int kMaxPendingConnections = 5; + class TcpServer : public NetServer { public: + TcpServer(std::string address); virtual Requests GetNewRequests(Error* err) const noexcept override ; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; + private: + + Error InitializeMasterSocketIfNeeded() const noexcept; + mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; + mutable ListSocketDescriptors client_sockets_; + std::string address_; }; } diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index ae51f8f86bcaa0e294882f2f3f53ae454aaad5b6..d20c961a3840dd1cc1bcd07efeca4956d8520374 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -38,6 +38,8 @@ Error SetReceiverConfig (const ReceiverConfig& config) { config_string += "," + std::string("\"MonitorDbName\":") + "\"" + config.monitor_db_name + "\""; config_string += "," + std::string("\"BrokerDbAddress\":") + "\"" + config.broker_db_uri + "\""; config_string += "," + std::string("\"ListenPort\":") + std::to_string(config.listen_port); + config_string += "," + std::string("\"DataServer\":{\"ListenPort\":") + std::to_string( + config.dataserver_listen_port) + "}"; config_string += "," + std::string("\"AuthorizationInterval\":") + std::to_string(config.authorization_interval_ms); config_string += "," + std::string("\"AuthorizationServer\":") + "\"" + config.authorization_server + "\""; config_string += "," + std::string("\"WriteToDisk\":") + (config.write_to_disk ? "true" : "false"); diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index a7685fe9b0309c13ec47fae9142a5c39baa1c855..ef752a114eb6f110c43f286e15ce5653188057b5 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -35,7 +35,7 @@ using asapo::Error; namespace { TEST(ReceiverDataServer, Constructor) { - ReceiverDataServer data_server; + ReceiverDataServer data_server{""}; ASSERT_THAT(dynamic_cast<const asapo::TcpServer*>(data_server.net__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(data_server.request_pool__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(data_server.log__), Ne(nullptr)); @@ -44,7 +44,8 @@ TEST(ReceiverDataServer, Constructor) { class ReceiverDataServerTests : public Test { public: - ReceiverDataServer data_server; + std::string expected_address = "somehost:123"; + ReceiverDataServer data_server{expected_address}; asapo::MockNetServer mock_net; asapo::MockPool mock_pool; NiceMock<asapo::MockLogger> mock_logger; @@ -52,7 +53,6 @@ class ReceiverDataServerTests : public Test { data_server.net__ = std::unique_ptr<asapo::NetServer> {&mock_net}; data_server.request_pool__ = std::unique_ptr<asapo::RequestPool> {&mock_pool}; data_server.log__ = &mock_logger; - } void TearDown() override { data_server.net__.release(); @@ -62,14 +62,6 @@ class ReceiverDataServerTests : public Test { TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { - - /* EXPECT_CALL(mock_net, ProcessRequest_t(_)).WillOnce( - Return(nullptr) - ).WillOnce( - Return(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)) - ); - */ - EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), Return(asapo::Requests{}) @@ -80,13 +72,10 @@ TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("stopped"), HasSubstr(errtext)))); - data_server.Run(); - } TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { - EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), Return(asapo::Requests{}) @@ -104,6 +93,22 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { data_server.Run(); } +TEST_F(ReceiverDataServerTests, Ok) { + EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( + DoAll(SetArgPointee<0>(nullptr), + Return(asapo::Requests{}) + ) + ).WillOnce( + DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(asapo::Requests{}) + ) + ); + EXPECT_CALL(mock_pool, AddRequests_t(_)).WillOnce( + Return(nullptr) + ); + + data_server.Run(); +} } diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3b74e0b03938353fa74b9d26f1b6897019d6403c --- /dev/null +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -0,0 +1,84 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + + +#include "unittests/MockLogger.h" +#include "unittests/MockIO.h" + +#include "../../src/receiver_data_server/tcp_server.h" + + + +using ::testing::Test; +using ::testing::Gt; +using ::testing::Ge; +using ::testing::Le; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Ref; +using ::testing::Return; +using ::testing::_; +using ::testing::SetArgPointee; +using ::testing::NiceMock; +using ::testing::HasSubstr; + + +using asapo::TcpServer; +using asapo::MockIO; +using asapo::Error; + + +namespace { + +TEST(TCPServer, Constructor) { + TcpServer tcp_server(""); + ASSERT_THAT(dynamic_cast<asapo::IO*>(tcp_server.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(tcp_server.log__), Ne(nullptr)); + +} + +class TCPServerTests : public Test { + public: + std::string expected_address = "somehost:123"; + TcpServer tcp_server{expected_address}; + NiceMock<MockIO> mock_io; + NiceMock<asapo::MockLogger> mock_logger; + asapo::SocketDescriptor expected_socket = 1; + void SetUp() override { + tcp_server.io__ = std::unique_ptr<asapo::IO> {&mock_io}; + tcp_server.log__ = &mock_logger; + } + void TearDown() override { + tcp_server.io__.release(); + } + void ExpectListenMaster(bool ok); + +}; + +void TCPServerTests::ExpectListenMaster(bool ok) { + EXPECT_CALL(mock_io, CreateAndBindIPTCPSocketListener_t(expected_address, asapo::kMaxPendingConnections, _)) + .WillOnce(DoAll( + SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(expected_socket) + )); + +} + +TEST_F(TCPServerTests, GetNewRequestsInitializesSocket) { + Error err; + ExpectListenMaster(false); + tcp_server.GetNewRequests(&err); + ASSERT_THAT(err, Ne(nullptr)); +} + +TEST_F(TCPServerTests, GetNewRequestsInitializesSocketOnlyOnce) { + Error err; + ExpectListenMaster(false); + tcp_server.GetNewRequests(&err); + tcp_server.GetNewRequests(&err); +// ASSERT_THAT(err, Ne(nullptr)); +} + + + +} diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 45e2a4e1a628ab745523edde35919ccd9f23ec2f..17c869fb5f1673105de8145801f7d38f2aec0f9f 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -50,6 +50,7 @@ TEST_F(ConfigTests, ReadSettings) { asapo::ReceiverConfig test_config; test_config.listen_port = 4200; + test_config.dataserver_listen_port = 4201; test_config.tag = "receiver1"; test_config.monitor_db_name = "db_test"; test_config.monitor_db_uri = "localhost:8086"; @@ -70,6 +71,7 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->monitor_db_name, Eq("db_test")); ASSERT_THAT(config->broker_db_uri, Eq("localhost:27017")); ASSERT_THAT(config->listen_port, Eq(4200)); + ASSERT_THAT(config->dataserver_listen_port, Eq(4201)); ASSERT_THAT(config->authorization_interval_ms, Eq(10000)); ASSERT_THAT(config->authorization_server, Eq("AuthorizationServer")); ASSERT_THAT(config->write_to_disk, Eq(true)); diff --git a/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt index 9222a3606bba3f4b4f28d8e9df608efcfaabd0de..25009418aa20a00c4efbf8207702179fa3bf00fa 100644 --- a/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt +++ b/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt @@ -1,16 +1,4 @@ -set(TARGET_NAME ip_tcp_network) -set(SOURCE_FILES ip_tcp_network.cpp) +CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures -################################ -# Executable and link -################################ -add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> ) -target_link_libraries(${TARGET_NAME} test_common) -target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) -set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) - -################################ -# Testing -################################ -# memory test too slow -add_integration_test(${TARGET_NAME} ip_tcp_network " " nomem) +add_subdirectory(client_serv) +add_subdirectory(client_serv_multicon) diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9222a3606bba3f4b4f28d8e9df608efcfaabd0de --- /dev/null +++ b/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt @@ -0,0 +1,16 @@ +set(TARGET_NAME ip_tcp_network) +set(SOURCE_FILES ip_tcp_network.cpp) + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> ) +target_link_libraries(${TARGET_NAME} test_common) +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) +set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) + +################################ +# Testing +################################ +# memory test too slow +add_integration_test(${TARGET_NAME} ip_tcp_network " " nomem) diff --git a/tests/automatic/system_io/ip_tcp_network/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp similarity index 100% rename from tests/automatic/system_io/ip_tcp_network/ip_tcp_network.cpp rename to tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9ec394d69dda463112f97886ad7114701b278560 --- /dev/null +++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt @@ -0,0 +1,16 @@ +set(TARGET_NAME ip_tcp_network_clientserv_multicon) +set(SOURCE_FILES multicon.cpp) + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> ) +target_link_libraries(${TARGET_NAME} test_common) +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) +set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) + +################################ +# Testing +################################ +# memory test too slow +add_integration_test(${TARGET_NAME} ip_tcp_network_multicon " " nomem) diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ae05c6b35a3b747bf55cbc86ce5d89fd0b67ef17 --- /dev/null +++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp @@ -0,0 +1,138 @@ +#include <iostream> +#include "io/io_factory.h" +#include <chrono> +#include <thread> +#include <future> +#include <atomic> +#include <algorithm> + +#include "testing.h" + +using asapo::Error; +using asapo::ErrorType; +using asapo::AddressFamilies; +using asapo::SocketTypes; +using asapo::SocketProtocols; +using asapo::SocketDescriptor; +using asapo::M_AssertEq; + +using namespace std::chrono; + +static const std::unique_ptr<asapo::IO> io(asapo::GenerateDefaultIO()); +static const std::string kListenAddress = "127.0.0.1:60123"; +static std::promise<void> kThreadStarted; +std::atomic<int> exit_thread{0}; + +void Exit(int exit_number) { + std::cerr << "ERROR: Exit on " << exit_number << std::endl; + exit(exit_number); +} + +void ExitIfErrIsNotOk(Error* err, int exit_number) { + if(*err != nullptr) { + std::cerr << "Explain(): " << (*err)->Explain() << std::endl; + Exit(exit_number); + } +} + +std::unique_ptr<std::thread> CreateEchoServerThread() { + return io->NewThread([&] { + Error err; + SocketDescriptor master_socket = io->CreateAndBindIPTCPSocketListener(kListenAddress, 3, &err); + std::cout << "[SERVER] master socket " << master_socket << std::endl; + ExitIfErrIsNotOk(&err, 100); + kThreadStarted.set_value(); + asapo::ListSocketDescriptors sockets_to_listen; + sockets_to_listen.push_back(master_socket); + while (!exit_thread) { + auto sockets = io->WaitSocketsActivity(sockets_to_listen, &err); + for(auto socket : sockets) { + std::cout << "[SERVER] processing socket " << socket << std::endl; + if (socket == master_socket) { + auto client_info_tuple = io->InetAcceptConnection(socket, &err); + ExitIfErrIsNotOk(&err, 103); + std::string client_address; + SocketDescriptor client_fd; + std::tie(client_address, client_fd) = *client_info_tuple; + std::cout << "[SERVER] accepted connection from " << client_address << std::endl; + sockets_to_listen.push_back(client_fd); + } else { + uint64_t message; + io->Receive(socket, &message, sizeof(uint64_t), &err); + if (err == asapo::ErrorTemplates::kEndOfFile) { + std::cout << "[SERVER] end of file " << socket << std::endl; + io->CloseSocket(socket, &err); + sockets_to_listen.erase(std::remove(sockets_to_listen.begin(), sockets_to_listen.end(), socket), + sockets_to_listen.end()); + continue; + } + ExitIfErrIsNotOk(&err, 104); + + io->Send(socket, &message, sizeof(uint64_t), &err); + ExitIfErrIsNotOk(&err, 105); + } + } + } + for(auto socket : sockets_to_listen) { + std::cout << "[SERVER] close socket " << socket << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 108); + } + }); +} + +void CheckNormal(int times) { + Error err; + std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl; + SocketDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err); + ExitIfErrIsNotOk(&err, 201); + + + for (int i = 0; i < times; i++) { + std::cout << "[CLIENT] send random number" << std::endl; + + uint64_t message_send = rand(); + + std::cout << "[CLIENT] Send Size" << std::endl; + io->Send(socket, &message_send, sizeof(uint64_t), &err); + ExitIfErrIsNotOk(&err, 203); + + uint64_t message_recv; + io->Receive(socket, &message_recv, sizeof(uint64_t), &err); + ExitIfErrIsNotOk(&err, 206); + if(message_recv != message_send) { + Exit(205); + } + } + std::cout << "[CLIENT] Close" << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 108); +} + +int main(int argc, char* argv[]) { + Error err; + std::unique_ptr<std::thread> server_thread = CreateEchoServerThread(); + kThreadStarted.get_future().get();//Make sure that the server is started + + std::cout << "Check" << std::endl; + auto thread1 = io->NewThread([&] { + CheckNormal(30); + }); + auto thread2 = io->NewThread([&] { + CheckNormal(30); + }); + auto thread3 = io->NewThread([&] { + CheckNormal(30); + }); + + thread1->join(); + thread2->join(); + thread3->join(); + + exit_thread = 1; + + std::cout << "server_thread->join()" << std::endl; + server_thread->join(); + + return 0; +}