From 445394aeba855fe1fca37f07fb8ae5ee86ee081b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 25 Jan 2019 14:17:21 +0100
Subject: [PATCH] add listening multiple sockets to io

---
 common/cpp/include/io/io.h                    |   5 +
 common/cpp/include/unittests/MockIO.h         |  12 ++
 common/cpp/src/system_io/system_io.cpp        |  28 ++++
 common/cpp/src/system_io/system_io.h          |   4 +
 receiver/CMakeLists.txt                       |   4 +-
 receiver/src/main.cpp                         |   7 +-
 receiver/src/receiver_config.cpp              |   1 +
 receiver/src/receiver_config.h                |   1 +
 .../receiver_data_server.cpp                  |   8 +-
 .../receiver_data_server.h                    |   4 +-
 .../src/receiver_data_server/tcp_server.cpp   |  24 +++
 .../src/receiver_data_server/tcp_server.h     |  13 ++
 receiver/unittests/mock_receiver_config.cpp   |   2 +
 .../test_receiver_data_server.cpp             |  33 +++--
 .../receiver_data_server/test_tcp_server.cpp  |  84 +++++++++++
 receiver/unittests/test_config.cpp            |   2 +
 .../system_io/ip_tcp_network/CMakeLists.txt   |  18 +--
 .../ip_tcp_network/client_serv/CMakeLists.txt |  16 ++
 .../{ => client_serv}/ip_tcp_network.cpp      |   0
 .../client_serv_multicon/CMakeLists.txt       |  16 ++
 .../client_serv_multicon/multicon.cpp         | 138 ++++++++++++++++++
 21 files changed, 382 insertions(+), 38 deletions(-)
 create mode 100644 receiver/unittests/receiver_data_server/test_tcp_server.cpp
 create mode 100644 tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt
 rename tests/automatic/system_io/ip_tcp_network/{ => client_serv}/ip_tcp_network.cpp (100%)
 create mode 100644 tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt
 create mode 100644 tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp

diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h
index d40a2cb86..47b94e693 100644
--- a/common/cpp/include/io/io.h
+++ b/common/cpp/include/io/io.h
@@ -41,6 +41,7 @@ enum class SocketProtocols {
 
 using FileDescriptor = int;
 using SocketDescriptor = int;
+using ListSocketDescriptors =  std::vector<SocketDescriptor>;
 const SocketDescriptor kDisconnectedSocketDescriptor = -1;
 
 class IO {
@@ -57,6 +58,10 @@ class IO {
     virtual SocketDescriptor  CreateSocket(AddressFamilies address_family, SocketTypes socket_type,
                                            SocketProtocols socket_protocol, Error* err) const = 0;
     virtual void            Listen(SocketDescriptor socket_fd, int backlog, Error* err) const = 0;
+
+    virtual ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const = 0;
+
+
     virtual void            InetBind(SocketDescriptor socket_fd, const std::string& address, Error* err) const = 0;
     virtual SocketDescriptor  CreateAndBindIPTCPSocketListener(const std::string& address, int backlog,
             Error* err) const = 0;
diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h
index abbee5358..0b8e9ff18 100644
--- a/common/cpp/include/unittests/MockIO.h
+++ b/common/cpp/include/unittests/MockIO.h
@@ -13,6 +13,18 @@ class MockIO : public IO {
     }
     MOCK_CONST_METHOD1(NewThread_t, std::thread * (std::function<void()> function));
 
+
+    ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const override {
+        ErrorInterface* error = nullptr;
+        auto data = WaitSocketsActivity_t(sockets_to_listen, &error);
+        err->reset(error);
+        return data;
+    }
+
+    MOCK_CONST_METHOD2(WaitSocketsActivity_t, ListSocketDescriptors(ListSocketDescriptors sockets_to_listen,
+                       ErrorInterface** err));
+
+
     SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol,
                                   Error* err) const override {
         ErrorInterface* error = nullptr;
diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp
index e162e2317..adb1af92c 100644
--- a/common/cpp/src/system_io/system_io.cpp
+++ b/common/cpp/src/system_io/system_io.cpp
@@ -612,4 +612,32 @@ Error SystemIO::RemoveFile(const std::string& fname) const {
     }
 }
 
+ListSocketDescriptors SystemIO::WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const {
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    SocketDescriptor max_sd = kDisconnectedSocketDescriptor;
+    for (auto sd : sockets_to_listen) {
+        FD_SET(sd, &readfds);
+        if (sd > max_sd) {
+            max_sd = sd;
+        }
+    }
+
+    auto activity = select(max_sd + 1, &readfds , NULL , NULL , NULL);
+
+    if ((activity < 0) && (errno != EINTR)) {
+        *err = GetLastError();
+        return {};
+    }
+
+    ListSocketDescriptors active_sockets;
+    for (auto sd : sockets_to_listen) {
+        if (FD_ISSET(sd, &readfds)) {
+            active_sockets.push_back(sd);
+        }
+    }
+
+    return active_sockets;
+}
+
 }
diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h
index 48c614927..048b5d193 100644
--- a/common/cpp/src/system_io/system_io.h
+++ b/common/cpp/src/system_io/system_io.h
@@ -82,6 +82,10 @@ class SystemIO final : public IO {
     /*
      * Network
      */
+
+    ListSocketDescriptors WaitSocketsActivity(const ListSocketDescriptors& sockets_to_listen, Error* err) const override;
+
+
     SocketDescriptor  CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol,
                                    Error* err) const;
     void            Listen(SocketDescriptor socket_fd, int backlog, Error* err) const;
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index fcc419fd8..44821cb69 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -70,5 +70,7 @@ set(TEST_LIBRARIES "${TARGET_NAME};system_io")
 gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp)
 
 set(TEST_SOURCE_FILES_RDS
-        unittests/receiver_data_server/test_receiver_data_server.cpp)
+        unittests/receiver_data_server/test_receiver_data_server.cpp
+        unittests/receiver_data_server/test_tcp_server.cpp
+        )
 gtest(${TARGET_NAME}_RDS "${TEST_SOURCE_FILES_RDS}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/*.h)
diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp
index 72d45a960..6c80cec86 100644
--- a/receiver/src/main.cpp
+++ b/receiver/src/main.cpp
@@ -7,6 +7,8 @@
 #include "receiver_logger.h"
 #include "common/version.h"
 
+#include "receiver_data_server/receiver_data_server.h"
+
 asapo::Error ReadConfigFile(int argc, char* argv[]) {
     if (argc != 2) {
         std::cerr << "Usage: " << argv[0] << " <config file>" << std::endl;
@@ -21,7 +23,6 @@ int main (int argc, char* argv[]) {
 
     auto err = ReadConfigFile(argc, argv);
     const auto& logger = asapo::GetDefaultReceiverLogger();
-
     if (err) {
         logger->Error("cannot read config file: " + err->Explain());
         return 1;
@@ -31,6 +32,10 @@ int main (int argc, char* argv[]) {
 
     logger->SetLogLevel(config->log_level);
 
+    static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port);
+    asapo::ReceiverDataServer data_server{dataserver_address};
+    std::thread server_thread (&asapo::ReceiverDataServer::Run, &data_server);
+
     static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port);
 
     auto* receiver = new asapo::Receiver();
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index d2eb57eb7..24b815288 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -17,6 +17,7 @@ Error ReceiverConfigFactory::SetConfigFromFile(std::string file_name) {
     Error err;
     (err = parser.GetString("MonitorDbAddress", &config.monitor_db_uri)) ||
     (err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
+    (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver_listen_port)) ||
     (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) ||
     (err = parser.GetBool("WriteToDb", &config.write_to_db)) ||
     (err = parser.GetString("BrokerDbAddress", &config.broker_db_uri)) ||
diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h
index d99d07617..5b7f1a9e0 100644
--- a/receiver/src/receiver_config.h
+++ b/receiver/src/receiver_config.h
@@ -13,6 +13,7 @@ struct ReceiverConfig {
     std::string broker_db_uri;
     std::string root_folder;
     uint64_t listen_port = 0;
+    uint64_t dataserver_listen_port = 0;
     std::string authorization_server;
     uint64_t authorization_interval_ms = 0;
     bool write_to_disk = false;
diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp
index 674a3cdc3..09ea98065 100644
--- a/receiver/src/receiver_data_server/receiver_data_server.cpp
+++ b/receiver/src/receiver_data_server/receiver_data_server.cpp
@@ -4,7 +4,7 @@
 
 namespace asapo {
 
-ReceiverDataServer::ReceiverDataServer() : request_pool__{new RequestPool}, net__{new TcpServer()},
+ReceiverDataServer::ReceiverDataServer(std::string address) : request_pool__{new RequestPool}, net__{new TcpServer(address)},
 log__{GetDefaultReceiverDataServerLogger()} {
 }
 
@@ -12,11 +12,9 @@ void ReceiverDataServer::Run() {
     while (true) {
         Error err;
         auto requests = net__->GetNewRequests(&err);
-        if (err) {
-            log__->Error(std::string("receiver data server stopped: ") + err->Explain());
-            return;
+        if (!err) {
+            err = request_pool__->AddRequests(requests);
         }
-        err = request_pool__->AddRequests(requests);
         if (err) {
             log__->Error(std::string("receiver data server stopped: ") + err->Explain());
             return;
diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h
index d3a2972ef..74a3ce97b 100644
--- a/receiver/src/receiver_data_server/receiver_data_server.h
+++ b/receiver/src/receiver_data_server/receiver_data_server.h
@@ -11,12 +11,12 @@ namespace asapo {
 
 class ReceiverDataServer {
   public:
+    explicit ReceiverDataServer(std::string address);
     std::unique_ptr<RequestPool> request_pool__;
     std::unique_ptr<NetServer> net__;
     const AbstractLogger* log__;
-
-    ReceiverDataServer();
     void Run();
+  private:
 };
 
 }
diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp
index db717fea1..cb4529d5e 100644
--- a/receiver/src/receiver_data_server/tcp_server.cpp
+++ b/receiver/src/receiver_data_server/tcp_server.cpp
@@ -1,8 +1,32 @@
 #include "tcp_server.h"
+#include "receiver_data_server_logger.h"
+
+#include "io/io_factory.h"
 
 namespace asapo {
 
+TcpServer::TcpServer(std::string address) : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverDataServerLogger()},
+    address_{std::move(address)} {}
+
+Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept {
+    Error err;
+    if (master_socket_ == kDisconnectedSocketDescriptor) {
+        master_socket_ = io__->CreateAndBindIPTCPSocketListener(address_, kMaxPendingConnections, &err);
+        if (!err) {
+            log__->Info("data server listening on " + address_);
+        } else {
+            log__->Error("dataserver cannot listen on " + address_ + ": " + err->Explain());
+        }
+    }
+    return err;
+}
+
 Requests TcpServer::GetNewRequests(Error* err) const noexcept {
+    if (*err = InitializeMasterSocketIfNeeded()) {
+        return {};
+    }
+
     return {};
 }
+
 }
\ No newline at end of file
diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h
index 27d28f078..dd71c1e5f 100644
--- a/receiver/src/receiver_data_server/tcp_server.h
+++ b/receiver/src/receiver_data_server/tcp_server.h
@@ -2,12 +2,25 @@
 #define ASAPO_TCP_SERVER_H
 
 #include "net_server.h"
+#include "io/io.h"
+#include "logger/logger.h"
 
 namespace asapo {
 
+const int kMaxPendingConnections = 5;
+
 class TcpServer : public NetServer {
   public:
+    TcpServer(std::string address);
     virtual Requests GetNewRequests(Error* err) const noexcept override ;
+    std::unique_ptr<IO> io__;
+    const AbstractLogger* log__;
+  private:
+
+    Error InitializeMasterSocketIfNeeded() const noexcept;
+    mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor};
+    mutable ListSocketDescriptors client_sockets_;
+    std::string address_;
 };
 
 }
diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp
index ae51f8f86..d20c961a3 100644
--- a/receiver/unittests/mock_receiver_config.cpp
+++ b/receiver/unittests/mock_receiver_config.cpp
@@ -38,6 +38,8 @@ Error SetReceiverConfig (const ReceiverConfig& config) {
     config_string += "," + std::string("\"MonitorDbName\":") + "\"" + config.monitor_db_name + "\"";
     config_string += "," + std::string("\"BrokerDbAddress\":") + "\"" + config.broker_db_uri + "\"";
     config_string += "," + std::string("\"ListenPort\":") + std::to_string(config.listen_port);
+    config_string += "," + std::string("\"DataServer\":{\"ListenPort\":") + std::to_string(
+                         config.dataserver_listen_port) + "}";
     config_string += "," + std::string("\"AuthorizationInterval\":") + std::to_string(config.authorization_interval_ms);
     config_string += "," + std::string("\"AuthorizationServer\":") + "\"" + config.authorization_server + "\"";
     config_string += "," + std::string("\"WriteToDisk\":") + (config.write_to_disk ? "true" : "false");
diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp
index a7685fe9b..ef752a114 100644
--- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp
+++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp
@@ -35,7 +35,7 @@ using asapo::Error;
 namespace {
 
 TEST(ReceiverDataServer, Constructor) {
-    ReceiverDataServer data_server;
+    ReceiverDataServer data_server{""};
     ASSERT_THAT(dynamic_cast<const asapo::TcpServer*>(data_server.net__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(data_server.request_pool__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(data_server.log__), Ne(nullptr));
@@ -44,7 +44,8 @@ TEST(ReceiverDataServer, Constructor) {
 
 class ReceiverDataServerTests : public Test {
   public:
-    ReceiverDataServer data_server;
+    std::string expected_address = "somehost:123";
+    ReceiverDataServer data_server{expected_address};
     asapo::MockNetServer mock_net;
     asapo::MockPool mock_pool;
     NiceMock<asapo::MockLogger> mock_logger;
@@ -52,7 +53,6 @@ class ReceiverDataServerTests : public Test {
         data_server.net__ = std::unique_ptr<asapo::NetServer> {&mock_net};
         data_server.request_pool__ = std::unique_ptr<asapo::RequestPool> {&mock_pool};
         data_server.log__ = &mock_logger;
-
     }
     void TearDown() override {
         data_server.net__.release();
@@ -62,14 +62,6 @@ class ReceiverDataServerTests : public Test {
 
 
 TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) {
-
-    /*    EXPECT_CALL(mock_net, ProcessRequest_t(_)).WillOnce(
-            Return(nullptr)
-        ).WillOnce(
-            Return(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError))
-        );
-    */
-
     EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce(
         DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
               Return(asapo::Requests{})
@@ -80,13 +72,10 @@ TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) {
 
     EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("stopped"), HasSubstr(errtext))));
 
-
     data_server.Run();
-
 }
 
 TEST_F(ReceiverDataServerTests, ErrorAddingRequests) {
-
     EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce(
         DoAll(SetArgPointee<0>(nullptr),
               Return(asapo::Requests{})
@@ -104,6 +93,22 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) {
     data_server.Run();
 }
 
+TEST_F(ReceiverDataServerTests, Ok) {
+    EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce(
+        DoAll(SetArgPointee<0>(nullptr),
+              Return(asapo::Requests{})
+             )
+    ).WillOnce(
+        DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
+              Return(asapo::Requests{})
+             )
+    );
 
+    EXPECT_CALL(mock_pool, AddRequests_t(_)).WillOnce(
+        Return(nullptr)
+    );
+
+    data_server.Run();
+}
 
 }
diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp
new file mode 100644
index 000000000..3b74e0b03
--- /dev/null
+++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp
@@ -0,0 +1,84 @@
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+
+#include "unittests/MockLogger.h"
+#include "unittests/MockIO.h"
+
+#include "../../src/receiver_data_server/tcp_server.h"
+
+
+
+using ::testing::Test;
+using ::testing::Gt;
+using ::testing::Ge;
+using ::testing::Le;
+using ::testing::Eq;
+using ::testing::Ne;
+using ::testing::Ref;
+using ::testing::Return;
+using ::testing::_;
+using ::testing::SetArgPointee;
+using ::testing::NiceMock;
+using ::testing::HasSubstr;
+
+
+using asapo::TcpServer;
+using asapo::MockIO;
+using asapo::Error;
+
+
+namespace {
+
+TEST(TCPServer, Constructor) {
+    TcpServer tcp_server("");
+    ASSERT_THAT(dynamic_cast<asapo::IO*>(tcp_server.io__.get()), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(tcp_server.log__), Ne(nullptr));
+
+}
+
+class TCPServerTests : public Test {
+  public:
+    std::string expected_address = "somehost:123";
+    TcpServer tcp_server{expected_address};
+    NiceMock<MockIO> mock_io;
+    NiceMock<asapo::MockLogger> mock_logger;
+    asapo::SocketDescriptor expected_socket = 1;
+    void SetUp() override {
+        tcp_server.io__ = std::unique_ptr<asapo::IO> {&mock_io};
+        tcp_server.log__ = &mock_logger;
+    }
+    void TearDown() override {
+        tcp_server.io__.release();
+    }
+    void ExpectListenMaster(bool ok);
+
+};
+
+void TCPServerTests::ExpectListenMaster(bool ok) {
+    EXPECT_CALL(mock_io, CreateAndBindIPTCPSocketListener_t(expected_address, asapo::kMaxPendingConnections, _))
+    .WillOnce(DoAll(
+                  SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
+                  Return(expected_socket)
+              ));
+
+}
+
+TEST_F(TCPServerTests, GetNewRequestsInitializesSocket) {
+    Error err;
+    ExpectListenMaster(false);
+    tcp_server.GetNewRequests(&err);
+    ASSERT_THAT(err, Ne(nullptr));
+}
+
+TEST_F(TCPServerTests, GetNewRequestsInitializesSocketOnlyOnce) {
+    Error err;
+    ExpectListenMaster(false);
+    tcp_server.GetNewRequests(&err);
+    tcp_server.GetNewRequests(&err);
+//    ASSERT_THAT(err, Ne(nullptr));
+}
+
+
+
+}
diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp
index 45e2a4e1a..17c869fb5 100644
--- a/receiver/unittests/test_config.cpp
+++ b/receiver/unittests/test_config.cpp
@@ -50,6 +50,7 @@ TEST_F(ConfigTests, ReadSettings) {
 
     asapo::ReceiverConfig test_config;
     test_config.listen_port = 4200;
+    test_config.dataserver_listen_port = 4201;
     test_config.tag = "receiver1";
     test_config.monitor_db_name = "db_test";
     test_config.monitor_db_uri = "localhost:8086";
@@ -70,6 +71,7 @@ TEST_F(ConfigTests, ReadSettings) {
     ASSERT_THAT(config->monitor_db_name, Eq("db_test"));
     ASSERT_THAT(config->broker_db_uri, Eq("localhost:27017"));
     ASSERT_THAT(config->listen_port, Eq(4200));
+    ASSERT_THAT(config->dataserver_listen_port, Eq(4201));
     ASSERT_THAT(config->authorization_interval_ms, Eq(10000));
     ASSERT_THAT(config->authorization_server, Eq("AuthorizationServer"));
     ASSERT_THAT(config->write_to_disk, Eq(true));
diff --git a/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt
index 9222a3606..25009418a 100644
--- a/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt
+++ b/tests/automatic/system_io/ip_tcp_network/CMakeLists.txt
@@ -1,16 +1,4 @@
-set(TARGET_NAME ip_tcp_network)
-set(SOURCE_FILES ip_tcp_network.cpp)
+CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures
 
-################################
-# Executable and link
-################################
-add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> )
-target_link_libraries(${TARGET_NAME} test_common)
-target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
-set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
-
-################################
-# Testing
-################################
-# memory test too slow
-add_integration_test(${TARGET_NAME} ip_tcp_network " " nomem)
+add_subdirectory(client_serv)
+add_subdirectory(client_serv_multicon)
diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt
new file mode 100644
index 000000000..9222a3606
--- /dev/null
+++ b/tests/automatic/system_io/ip_tcp_network/client_serv/CMakeLists.txt
@@ -0,0 +1,16 @@
+set(TARGET_NAME ip_tcp_network)
+set(SOURCE_FILES ip_tcp_network.cpp)
+
+################################
+# Executable and link
+################################
+add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> )
+target_link_libraries(${TARGET_NAME} test_common)
+target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
+set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
+
+################################
+# Testing
+################################
+# memory test too slow
+add_integration_test(${TARGET_NAME} ip_tcp_network " " nomem)
diff --git a/tests/automatic/system_io/ip_tcp_network/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp
similarity index 100%
rename from tests/automatic/system_io/ip_tcp_network/ip_tcp_network.cpp
rename to tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp
diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt
new file mode 100644
index 000000000..9ec394d69
--- /dev/null
+++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/CMakeLists.txt
@@ -0,0 +1,16 @@
+set(TARGET_NAME ip_tcp_network_clientserv_multicon)
+set(SOURCE_FILES multicon.cpp)
+
+################################
+# Executable and link
+################################
+add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> )
+target_link_libraries(${TARGET_NAME} test_common)
+target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
+set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
+
+################################
+# Testing
+################################
+# memory test too slow
+add_integration_test(${TARGET_NAME} ip_tcp_network_multicon " " nomem)
diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp
new file mode 100644
index 000000000..ae05c6b35
--- /dev/null
+++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp
@@ -0,0 +1,138 @@
+#include <iostream>
+#include "io/io_factory.h"
+#include <chrono>
+#include <thread>
+#include <future>
+#include <atomic>
+#include <algorithm>
+
+#include "testing.h"
+
+using asapo::Error;
+using asapo::ErrorType;
+using asapo::AddressFamilies;
+using asapo::SocketTypes;
+using asapo::SocketProtocols;
+using asapo::SocketDescriptor;
+using asapo::M_AssertEq;
+
+using namespace std::chrono;
+
+static const std::unique_ptr<asapo::IO> io(asapo::GenerateDefaultIO());
+static const std::string kListenAddress = "127.0.0.1:60123";
+static std::promise<void> kThreadStarted;
+std::atomic<int> exit_thread{0};
+
+void Exit(int exit_number) {
+    std::cerr << "ERROR: Exit on " << exit_number << std::endl;
+    exit(exit_number);
+}
+
+void ExitIfErrIsNotOk(Error* err, int exit_number) {
+    if(*err != nullptr) {
+        std::cerr << "Explain(): " << (*err)->Explain() << std::endl;
+        Exit(exit_number);
+    }
+}
+
+std::unique_ptr<std::thread> CreateEchoServerThread() {
+    return io->NewThread([&] {
+        Error err;
+        SocketDescriptor master_socket = io->CreateAndBindIPTCPSocketListener(kListenAddress, 3, &err);
+        std::cout << "[SERVER] master socket " << master_socket << std::endl;
+        ExitIfErrIsNotOk(&err, 100);
+        kThreadStarted.set_value();
+        asapo::ListSocketDescriptors sockets_to_listen;
+        sockets_to_listen.push_back(master_socket);
+        while (!exit_thread) {
+            auto sockets = io->WaitSocketsActivity(sockets_to_listen, &err);
+            for(auto socket : sockets) {
+                std::cout << "[SERVER] processing socket " << socket << std::endl;
+                if (socket == master_socket) {
+                    auto client_info_tuple = io->InetAcceptConnection(socket, &err);
+                    ExitIfErrIsNotOk(&err, 103);
+                    std::string client_address;
+                    SocketDescriptor client_fd;
+                    std::tie(client_address, client_fd) = *client_info_tuple;
+                    std::cout << "[SERVER] accepted connection from " << client_address << std::endl;
+                    sockets_to_listen.push_back(client_fd);
+                } else {
+                    uint64_t message;
+                    io->Receive(socket, &message, sizeof(uint64_t), &err);
+                    if (err == asapo::ErrorTemplates::kEndOfFile) {
+                        std::cout << "[SERVER] end of file " << socket << std::endl;
+                        io->CloseSocket(socket, &err);
+                        sockets_to_listen.erase(std::remove(sockets_to_listen.begin(), sockets_to_listen.end(), socket),
+                                                sockets_to_listen.end());
+                        continue;
+                    }
+                    ExitIfErrIsNotOk(&err, 104);
+
+                    io->Send(socket, &message, sizeof(uint64_t), &err);
+                    ExitIfErrIsNotOk(&err, 105);
+                }
+            }
+        }
+        for(auto socket : sockets_to_listen) {
+            std::cout << "[SERVER] close socket " << socket << std::endl;
+            io->CloseSocket(socket, &err);
+            ExitIfErrIsNotOk(&err, 108);
+        }
+    });
+}
+
+void CheckNormal(int times) {
+    Error err;
+    std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl;
+    SocketDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err);
+    ExitIfErrIsNotOk(&err, 201);
+
+
+    for (int i = 0; i < times; i++) {
+        std::cout << "[CLIENT] send random number" << std::endl;
+
+        uint64_t message_send = rand();
+
+        std::cout << "[CLIENT] Send Size" << std::endl;
+        io->Send(socket, &message_send, sizeof(uint64_t), &err);
+        ExitIfErrIsNotOk(&err, 203);
+
+        uint64_t message_recv;
+        io->Receive(socket, &message_recv, sizeof(uint64_t), &err);
+        ExitIfErrIsNotOk(&err, 206);
+        if(message_recv != message_send) {
+            Exit(205);
+        }
+    }
+    std::cout << "[CLIENT] Close" << std::endl;
+    io->CloseSocket(socket, &err);
+    ExitIfErrIsNotOk(&err, 108);
+}
+
+int main(int argc, char* argv[]) {
+    Error err;
+    std::unique_ptr<std::thread> server_thread = CreateEchoServerThread();
+    kThreadStarted.get_future().get();//Make sure that the server is started
+
+    std::cout << "Check" << std::endl;
+    auto thread1 = io->NewThread([&] {
+      CheckNormal(30);
+    });
+    auto thread2 = io->NewThread([&] {
+      CheckNormal(30);
+    });
+    auto thread3 = io->NewThread([&] {
+      CheckNormal(30);
+    });
+
+    thread1->join();
+    thread2->join();
+    thread3->join();
+
+    exit_thread = 1;
+
+    std::cout << "server_thread->join()" << std::endl;
+    server_thread->join();
+
+    return 0;
+}
-- 
GitLab