From b96cd9abc45251e17439619adae82d324fabb857 Mon Sep 17 00:00:00 2001
From: Carsten Patzke <carsten.patzke@desy.de>
Date: Wed, 31 Jan 2018 14:27:38 +0100
Subject: [PATCH] Improved transfer speed significantly :)

---
 common/cpp/include/system_wrappers/io.h       |   1 +
 .../cpp/include/system_wrappers/system_io.h   |   2 +
 common/cpp/src/system_io.cpp                  |  44 +++++--
 common/cpp/src/system_io_linux.cpp            |   6 +-
 common/cpp/src/system_io_windows.cpp          |   4 +-
 .../dummy_data_producer.cpp                   |   1 -
 receiver/src/network_producer_peer.cpp        |   4 +-
 tests/system_io/CMakeLists.txt                |   1 +
 .../ip_tcp_network_speedtest/CMakeLists.txt   |  22 ++++
 .../ip_tcp_network_speedtest.cpp              | 114 ++++++++++++++++++
 10 files changed, 183 insertions(+), 16 deletions(-)
 create mode 100644 tests/system_io/ip_tcp_network_speedtest/CMakeLists.txt
 create mode 100644 tests/system_io/ip_tcp_network_speedtest/ip_tcp_network_speedtest.cpp

diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h
index 5c9f73bd9..da8f97113 100644
--- a/common/cpp/include/system_wrappers/io.h
+++ b/common/cpp/include/system_wrappers/io.h
@@ -16,6 +16,7 @@ enum class IOErrors {
     kUnknownError,
     kNoError,
     kBadFileNumber,
+    kResourceTemporarilyUnavailable,
     kFileNotFound,
     kReadError,
     kPermissionDenied,
diff --git a/common/cpp/include/system_wrappers/system_io.h b/common/cpp/include/system_wrappers/system_io.h
index 3688ad720..78f742e01 100644
--- a/common/cpp/include/system_wrappers/system_io.h
+++ b/common/cpp/include/system_wrappers/system_io.h
@@ -12,6 +12,8 @@ namespace hidra2 {
 
 class SystemIO final : public IO {
   private:
+    static const int kNetBufferSize;//TODO: need to set by config
+
     //void CollectFileInformationRecursivly(const std::string& path, std::vector<FileInfo>* files, IOErrors* err) const;
     int FileOpenModeToPosixFileOpenMode(int open_flags) const;
     IOErrors GetLastError() const;
diff --git a/common/cpp/src/system_io.cpp b/common/cpp/src/system_io.cpp
index 89de7fb44..22dc6a907 100644
--- a/common/cpp/src/system_io.cpp
+++ b/common/cpp/src/system_io.cpp
@@ -12,6 +12,8 @@
 
 namespace hidra2 {
 
+const int SystemIO::kNetBufferSize = 1024 * 1024 * 50; //MiByte
+
 /*******************************************************************************
  *                              system_io.cpp                                  *
  * THIS FILE HOLDS GENERAL FUNCTIONS THAT CAN BE USED ON WINDOWS AND ON LINUX  *
@@ -68,6 +70,10 @@ hidra2::FileDescriptor hidra2::SystemIO::CreateAndConnectIPTCPSocket(const std::
     *err = hidra2::IOErrors::kNoError;
 
     FileDescriptor fd = CreateSocket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, err);
+
+    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+
     if(*err != IOErrors::kNoError) {
         return -1;
     }
@@ -112,6 +118,8 @@ hidra2::FileDescriptor hidra2::SystemIO::Open(const std::string& filename,
     FileDescriptor fd = _open(filename.c_str(), flags);
     if(fd == -1) {
         *err = GetLastError();
+    } else {
+        *err = IOErrors::kNoError;
     }
     return fd;
 }
@@ -209,8 +217,6 @@ SocketDescriptor SystemIO::CreateSocket(AddressFamilies address_family,
                                         SocketTypes socket_type,
                                         SocketProtocols socket_protocol,
                                         IOErrors* err) const {
-    *err = IOErrors::kNoError;
-
     int domain = AddressFamilyToPosixFamily(address_family);
     if(domain == -1) {
         *err = IOErrors::kUnsupportedAddressFamily;
@@ -229,9 +235,17 @@ SocketDescriptor SystemIO::CreateSocket(AddressFamilies address_family,
         return -1;
     }
 
-    int fd = _socket(domain, type, protocol);
-    *err = GetLastError();
-    return fd;
+    int socket_fd = _socket(domain, type, protocol);
+    if(socket_fd == -1) {
+        *err = GetLastError();
+        return socket_fd;
+    }
+
+    setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+    setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+
+    *err = IOErrors::kNoError;
+    return socket_fd;
 }
 
 void hidra2::SystemIO::InetBind(SocketDescriptor socket_fd, const std::string& address,
@@ -272,7 +286,6 @@ void hidra2::SystemIO::Listen(SocketDescriptor socket_fd, int backlog, hidra2::I
 }
 
 size_t hidra2::SystemIO::Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const {
-    *err = hidra2::IOErrors::kNoError;
 
     size_t already_received = 0;
 
@@ -284,12 +297,16 @@ size_t hidra2::SystemIO::Receive(SocketDescriptor socket_fd, void* buf, size_t l
         }
         if (received_amount == -1) {
             *err = GetLastError();
+            if(*err == IOErrors::kResourceTemporarilyUnavailable) {
+                continue;
+            }
             if (*err != IOErrors::kNoError) {
                 return already_received;
             }
         }
         already_received += received_amount;
     }
+    *err = hidra2::IOErrors::kNoError;
     return already_received;
 }
 
@@ -323,7 +340,6 @@ size_t hidra2::SystemIO::Send(SocketDescriptor socket_fd,
                               const void* buf,
                               size_t length,
                               IOErrors* err) const {
-    *err = hidra2::IOErrors::kNoError;
 
     size_t already_sent = 0;
 
@@ -335,6 +351,9 @@ size_t hidra2::SystemIO::Send(SocketDescriptor socket_fd,
         }
         if (send_amount == -1) {
             *err = GetLastError();
+            if(*err == IOErrors::kResourceTemporarilyUnavailable) {
+                continue;
+            }
             if (*err != IOErrors::kNoError) {
                 return already_sent;
             }
@@ -342,6 +361,7 @@ size_t hidra2::SystemIO::Send(SocketDescriptor socket_fd,
         already_sent += send_amount;
     }
 
+    *err = hidra2::IOErrors::kNoError;
     return already_sent;
 }
 
@@ -386,8 +406,11 @@ std::vector<hidra2::FileInfo> hidra2::SystemIO::FilesInFolder(const std::string&
 }
 
 void hidra2::SystemIO::CreateNewDirectory(const std::string& directory_name, hidra2::IOErrors* err) const {
-    _mkdir(directory_name.c_str());
-    *err = GetLastError();
+    if(_mkdir(directory_name.c_str()) == -1) {
+        *err = GetLastError();
+    } else {
+        *err = IOErrors::kNoError;
+    }
 }
 std::unique_ptr<std::tuple<std::string, uint16_t>> SystemIO::SplitAddressToHostAndPort(std::string address) const {
     try {
@@ -450,6 +473,9 @@ IOErrors* err) const {
         return nullptr;
     }
 
+    setsockopt(peer_fd, SOL_SOCKET, SO_RCVBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+    setsockopt(peer_fd, SOL_SOCKET, SO_SNDBUF, &kNetBufferSize, sizeof(kNetBufferSize));
+
     std::string
     address = std::string(inet_ntoa(client_address.sin_addr)) + ':' + std::to_string(client_address.sin_port);
     return std::unique_ptr<std::tuple<std::string, SocketDescriptor>>(new
diff --git a/common/cpp/src/system_io_linux.cpp b/common/cpp/src/system_io_linux.cpp
index 9ac0c1655..43478ca24 100644
--- a/common/cpp/src/system_io_linux.cpp
+++ b/common/cpp/src/system_io_linux.cpp
@@ -30,6 +30,8 @@ IOErrors GetLastErrorFromErrno() {
         return IOErrors::kNoError;
     case EBADF:
         return IOErrors::kBadFileNumber;
+    case EAGAIN:
+        return IOErrors::kResourceTemporarilyUnavailable;
     case ENOENT:
     case ENOTDIR:
         return IOErrors::kFileNotFound;
@@ -179,11 +181,11 @@ SocketDescriptor SystemIO::_socket(int address_family, int socket_type, int sock
 }
 
 ssize_t SystemIO::_send(SocketDescriptor socket_fd, const void* buffer, size_t length) const {
-    return ::send(socket_fd, buffer, length, 0);
+    return ::send(socket_fd, buffer, length, MSG_DONTWAIT);
 }
 
 ssize_t SystemIO::_recv(SocketDescriptor socket_fd, void* buffer, size_t length) const {
-    return ::recv(socket_fd, buffer, length, 0);
+    return ::recv(socket_fd, buffer, length, MSG_DONTWAIT);
 }
 
 int SystemIO::_mkdir(const char* dirname) const {
diff --git a/common/cpp/src/system_io_windows.cpp b/common/cpp/src/system_io_windows.cpp
index 53f29ab7a..1e84733c7 100644
--- a/common/cpp/src/system_io_windows.cpp
+++ b/common/cpp/src/system_io_windows.cpp
@@ -148,11 +148,11 @@ FileDescriptor SystemIO::_open(const char* filename, int posix_open_flags) const
 }
 
 bool SystemIO::_close(FileDescriptor fd) const {
-	return ::_close(fd) == 0;
+    return ::_close(fd) == 0;
 }
 
 bool SystemIO::_close_socket(SocketDescriptor fd) const {
-	return ::closesocket(fd) == 0;
+    return ::closesocket(fd) == 0;
 }
 
 ssize_t SystemIO::_read(FileDescriptor fd, void* buffer, size_t length) const {
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 5dbad173d..820df34ad 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -61,7 +61,6 @@ int main (int argc, char* argv[]) {
               << std::endl;
 
     SendDummyData(receiver_address, number_of_byte, iterations);
-    getchar();
 
 }
 
diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp
index 343af5018..00726c5dc 100644
--- a/receiver/src/network_producer_peer.cpp
+++ b/receiver/src/network_producer_peer.cpp
@@ -45,7 +45,7 @@ void NetworkProducerPeer::internal_receiver_thread_() {
     while(is_listening_) {
         err = IOErrors::kNoError;
 
-        size_t size = io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 50, &err);		
+        size_t size = io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 50, &err);
         if(err != IOErrors::kNoError) {
             if(err == IOErrors::kTimeout) {
                 std::this_thread::yield();
@@ -62,7 +62,7 @@ void NetworkProducerPeer::internal_receiver_thread_() {
             break;
         }
 
-		assert(size);
+        assert(size);
 
         std::cout << "[" << connection_id() << "] Got request: " << generic_request->op_code << std::endl;
         size_t bytes_to_send = handle_generic_request_(generic_request, generic_response);
diff --git a/tests/system_io/CMakeLists.txt b/tests/system_io/CMakeLists.txt
index c8887c717..de44b3dd3 100644
--- a/tests/system_io/CMakeLists.txt
+++ b/tests/system_io/CMakeLists.txt
@@ -3,4 +3,5 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures
 add_subdirectory(read_files_in_folder)
 add_subdirectory(read_file_content)
 add_subdirectory(ip_tcp_network)
+add_subdirectory(ip_tcp_network_speedtest)
 
diff --git a/tests/system_io/ip_tcp_network_speedtest/CMakeLists.txt b/tests/system_io/ip_tcp_network_speedtest/CMakeLists.txt
new file mode 100644
index 000000000..5a9c80198
--- /dev/null
+++ b/tests/system_io/ip_tcp_network_speedtest/CMakeLists.txt
@@ -0,0 +1,22 @@
+set(TARGET_NAME ip_tcp_network_speedtest)
+set(SOURCE_FILES ip_tcp_network_speedtest.cpp)
+
+
+################################
+# Executable and link
+################################
+add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:common>)
+
+#Add all necessary common libraries
+GET_PROPERTY(HIDRA2_COMMON_IO_LIBRARIES GLOBAL PROPERTY HIDRA2_COMMON_IO_LIBRARIES)
+target_link_libraries(${TARGET_NAME} ${HIDRA2_COMMON_IO_LIBRARIES})
+
+target_link_libraries(${TARGET_NAME} test_common)
+target_include_directories(${TARGET_NAME} PUBLIC ${CMAKE_SOURCE_DIR}/common/cpp/include)
+set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
+
+################################
+# Testing
+################################
+
+add_integration_test(${TARGET_NAME} ip_tcp_network_speedtest "")
diff --git a/tests/system_io/ip_tcp_network_speedtest/ip_tcp_network_speedtest.cpp b/tests/system_io/ip_tcp_network_speedtest/ip_tcp_network_speedtest.cpp
new file mode 100644
index 000000000..c58a5736f
--- /dev/null
+++ b/tests/system_io/ip_tcp_network_speedtest/ip_tcp_network_speedtest.cpp
@@ -0,0 +1,114 @@
+#include <iostream>
+#include <system_wrappers/system_io.h>
+#include <future>
+#include <iomanip>
+
+#include "testing.h"
+
+using hidra2::SystemIO;
+using hidra2::IOErrors;
+using hidra2::AddressFamilies;
+using hidra2::SocketTypes;
+using hidra2::SocketProtocols;
+using hidra2::FileDescriptor;
+using hidra2::M_AssertEq;
+
+using namespace std::chrono;
+using std::chrono::high_resolution_clock;
+
+static const std::unique_ptr<SystemIO> io(new SystemIO());
+static const std::string kListenAddress = "127.0.0.1:4206";
+static std::promise<void> kThreadStarted;
+
+static size_t kTestSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(1); //1GiByte
+static int kTestCount = 50;
+
+void Exit(int exit_number) {
+    std::cerr << "ERROR: Exit on " << exit_number << std::endl;
+    getchar();
+    exit(exit_number);
+}
+
+void ExitIfErrIsNotOk(IOErrors* err, int exit_number) {
+    if(*err != IOErrors::kNoError) {
+        Exit(exit_number);
+    }
+}
+
+std::thread* CreateEchoServerThread() {
+    return io->NewThread([] {
+        std::unique_ptr<uint8_t[]> kBufferServer(new uint8_t[kTestSize]);
+
+        IOErrors err;
+        FileDescriptor socket = io->CreateSocket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &err);
+        ExitIfErrIsNotOk(&err, 100);
+        io->InetBind(socket, kListenAddress, &err);
+        std::cout << "[SERVER] Bind on: " << kListenAddress << std::endl;
+        std::cout << "[SERVER] Listen" << std::endl;
+        ExitIfErrIsNotOk(&err, 101);
+        io->Listen(socket, 5, &err);
+        ExitIfErrIsNotOk(&err, 102);
+        kThreadStarted.set_value();
+
+        std::cout << "[SERVER] InetAccept" << std::endl;
+        auto client_info_tuple = io->InetAccept(socket, &err);
+        ExitIfErrIsNotOk(&err, 103);
+        std::string client_address;
+        FileDescriptor client_fd;
+        std::tie(client_address, client_fd) = *client_info_tuple;
+
+        for(int i = 0; i < kTestCount; i++) {
+            io->Receive(client_fd, kBufferServer.get(), kTestSize, &err);
+            ExitIfErrIsNotOk(&err, 105);
+        }
+
+        std::cout << "[SERVER] Close client_fd" << std::endl;
+        io->CloseSocket(client_fd, &err);
+        ExitIfErrIsNotOk(&err, 106);
+
+        std::cout << "[SERVER] Close server socket" << std::endl;
+        io->CloseSocket(socket, &err);
+        ExitIfErrIsNotOk(&err, 107);
+    });
+}
+
+void Speedtest() {
+    std::unique_ptr<uint8_t[]> kBufferClient(new uint8_t[kTestSize]);
+
+    IOErrors err;
+    std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl;
+    FileDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err);
+    ExitIfErrIsNotOk(&err, 201);
+
+    for(int i = 0; i < kTestCount; i++) {
+        high_resolution_clock::time_point t1 = high_resolution_clock::now();
+        io->Send(socket, kBufferClient.get(), kTestSize, &err);
+        ExitIfErrIsNotOk(&err, 203);
+        high_resolution_clock::time_point t2 = high_resolution_clock::now();
+
+        double tookMs = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count();
+        std::cout << std::setprecision(2) << std::fixed << (kTestSize / (tookMs / 1000)) / 1024 / 1024 / 1024 << " GiByte/s" <<
+                  std::endl;
+        /*
+        << "\t"<< std::fixed << kTestSize/(tookMs/1000) << " Byte/s" << std::endl
+        << "\tTime " << tookMs << "ms " << std::endl
+        << "\tfor  " << kTestSize << " Byte" << std::endl;
+        */
+    }
+
+    std::cout << "[CLIENT] Close" << std::endl;
+    io->CloseSocket(socket, &err);
+    ExitIfErrIsNotOk(&err, 204);
+}
+
+int main(int argc, char* argv[]) {
+    std::thread* server_thread = CreateEchoServerThread();
+    kThreadStarted.get_future().get();//Make sure that the server is started
+
+    Speedtest();
+
+    std::cout << "server_thread->join()" << std::endl;
+    server_thread->join();
+
+    return 0;
+}
-- 
GitLab