From 77c3908b4d7ef30d833bcb24fcdffc2a6a3d3785 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Fri, 26 Jan 2018 17:38:40 +0100 Subject: [PATCH] Fixed build on linux --- common/cpp/CMakeLists.txt | 4 +- common/cpp/include/common/networking.h | 2 +- common/cpp/include/system_wrappers/io.h | 31 ++-- .../cpp/include/system_wrappers/system_io.h | 32 ++-- common/cpp/src/system_io.cpp | 149 +++++++-------- common/cpp/src/system_io_linux.cpp | 100 +--------- common/cpp/src/system_io_windows.cpp | 168 ++++++++--------- common/cpp/unittests/MockIO.h | 67 ++++--- .../dummy_data_producer.cpp | 2 +- receiver/src/network_producer_peer.cpp | 10 +- .../src/network_producer_peer_handlers.cpp | 10 +- .../ip_tcp_network/ip_tcp_network.cpp | 173 +++++++++--------- 12 files changed, 350 insertions(+), 398 deletions(-) diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index db7e0b405..96a10b110 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -1,4 +1,3 @@ -find_package(Threads REQUIRED) set(TARGET_NAME common) set(SOURCE_FILES @@ -18,9 +17,10 @@ ENDIF(WIN32) # Library ################################ IF(WIN32) + find_package(Threads REQUIRED) SET_PROPERTY(GLOBAL PROPERTY HIDRA2_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} wsock32 ws2_32) ELSEIF(UNIX) - SET_PROPERTY(GLOBAL PROPERTY HIDRA2_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT}) + SET_PROPERTY(GLOBAL PROPERTY HIDRA2_COMMON_IO_LIBRARIES Threads::Threads) ENDIF(WIN32) add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) #add_library(${TARGET_NAME} SHARED ${SOURCE_FILES}) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index b1108e686..f477ac9d3 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -8,7 +8,7 @@ namespace hidra2 { typedef uint64_t NetworkRequestId; enum OpCode : uint8_t { - OP_CODE_A, + OP_CODE_A, OP_CODE__SEND_DATA, OP_CODE_COUNT, diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h index 44ff508ec..978ba9539 100644 --- a/common/cpp/include/system_wrappers/io.h +++ b/common/cpp/include/system_wrappers/io.h @@ -73,20 +73,23 @@ class IO { /* * Network */ - virtual SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, IOErrors* err) const = 0; - virtual void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const = 0; - virtual void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; - virtual std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const = 0; - virtual void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; - virtual SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const = 0; - virtual size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const = 0; - virtual size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const = 0; - virtual size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const = 0; - virtual void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const = 0; - /** - * @param err Since CloseSocket if often used in an error case, it's able to accept nullptr. - */ - virtual void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const = 0; + virtual SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, + SocketProtocols socket_protocol, IOErrors* err) const = 0; + virtual void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const = 0; + virtual void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; + virtual std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, + IOErrors* err) const = 0; + virtual void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; + virtual SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const = 0; + virtual size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const = 0; + virtual size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, + IOErrors* err) const = 0; + virtual size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const = 0; + virtual void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const = 0; + /** + * @param err Since CloseSocket if often used in an error case, it's able to accept nullptr. + */ + virtual void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const = 0; /* * Filesystem diff --git a/common/cpp/include/system_wrappers/system_io.h b/common/cpp/include/system_wrappers/system_io.h index 1e8e71287..460a6b40f 100644 --- a/common/cpp/include/system_wrappers/system_io.h +++ b/common/cpp/include/system_wrappers/system_io.h @@ -22,16 +22,16 @@ class SystemIO final : public IO { // Function maps. Should never be called apart from in wrapper function FileDescriptor _open(const char* filename, int posix_open_flags) const; - void _close(FileDescriptor fd) const; + void _close(FileDescriptor fd) const; ssize_t _read(FileDescriptor fd, void* buffer, size_t length) const; ssize_t _write(FileDescriptor fd, const void* buffer, size_t count) const; - int _mkdir(const char* dirname) const; + int _mkdir(const char* dirname) const; - SocketDescriptor _socket(int address_family, int socket_type, int socket_protocol) const; + SocketDescriptor _socket(int address_family, int socket_type, int socket_protocol) const; int _listen(SocketDescriptor fd, int backlog) const; ssize_t _send(SocketDescriptor socket_fd, const void* buffer, size_t length) const; ssize_t _recv(SocketDescriptor socket_fd, void* buffer, size_t length) const; - void _close_socket(SocketDescriptor fd) const; + void _close_socket(SocketDescriptor fd) const; std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostAndPort(std::string address) const; @@ -48,17 +48,19 @@ class SystemIO final : public IO { /* * Network */ - SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, IOErrors* err) const; - void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const; - void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; - std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const; - void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; - SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const; - size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const; - size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const; - size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const; - void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const; - void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const; + SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, + IOErrors* err) const; + void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const; + void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; + std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const; + void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; + SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const; + size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const; + size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, + IOErrors* err) const; + size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const; + void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const; + void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const; /* * Filesystem diff --git a/common/cpp/src/system_io.cpp b/common/cpp/src/system_io.cpp index fbfad4099..8b8c42dc9 100644 --- a/common/cpp/src/system_io.cpp +++ b/common/cpp/src/system_io.cpp @@ -7,7 +7,11 @@ #include <windows.h> #undef CreateDirectory #endif - +#ifdef __linux__ +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#endif namespace hidra2 { @@ -116,17 +120,17 @@ hidra2::FileDescriptor hidra2::SystemIO::Open(const std::string& filename, } void hidra2::SystemIO::CloseSocket(SocketDescriptor fd, hidra2::IOErrors* err) const { - _close_socket(fd); - if (err) { - *err = GetLastError(); - } + _close_socket(fd); + if (err) { + *err = GetLastError(); + } } void hidra2::SystemIO::Close(FileDescriptor fd, hidra2::IOErrors* err) const { - _close(fd); - if (err) { - *err = GetLastError(); - } + _close(fd); + if (err) { + *err = GetLastError(); + } } size_t hidra2::SystemIO::Read(FileDescriptor fd, void* buf, size_t length, IOErrors* err) const { @@ -173,33 +177,33 @@ size_t hidra2::SystemIO::Write(FileDescriptor fd, const void* buf, size_t length short hidra2::SystemIO::AddressFamilyToPosixFamily(AddressFamilies address_family) const { - switch (address_family) { - case AddressFamilies::INET: - return AF_INET; - } - return -1; + switch (address_family) { + case AddressFamilies::INET: + return AF_INET; + } + return -1; } int hidra2::SystemIO::SocketTypeToPosixType(SocketTypes socket_type) const { - switch (socket_type) { - case SocketTypes::STREAM: - return SOCK_STREAM; - } - return -1; + switch (socket_type) { + case SocketTypes::STREAM: + return SOCK_STREAM; + } + return -1; } int hidra2::SystemIO::SocketProtocolToPosixProtocol(SocketProtocols socket_protocol) const { - switch (socket_protocol) { - case SocketProtocols::IP: - return IPPROTO_IP; - } - return -1; + switch (socket_protocol) { + case SocketProtocols::IP: + return IPPROTO_IP; + } + return -1; } SocketDescriptor SystemIO::CreateSocket(AddressFamilies address_family, - SocketTypes socket_type, - SocketProtocols socket_protocol, - IOErrors* err) const { + SocketTypes socket_type, + SocketProtocols socket_protocol, + IOErrors* err) const { *err = IOErrors::kNoError; int domain = AddressFamilyToPosixFamily(address_family); @@ -226,32 +230,32 @@ SocketDescriptor SystemIO::CreateSocket(AddressFamilies address_family, } void hidra2::SystemIO::InetBind(SocketDescriptor socket_fd, const std::string& address, - IOErrors* err) const { - *err = IOErrors::kNoError; - - int family = AddressFamilyToPosixFamily(AddressFamilies::INET); - if (family == -1) { - *err = IOErrors::kUnsupportedAddressFamily; - return; - } - - auto host_port_tuple = SplitAddressToHostAndPort(address); - if (!host_port_tuple) { - *err = IOErrors::kInvalidAddressFormat; - return; - } - std::string host; - uint16_t port = 0; - std::tie(host, port) = *host_port_tuple; - - sockaddr_in socket_address{}; - socket_address.sin_addr.s_addr = inet_addr(host.c_str()); - socket_address.sin_port = htons(port); - socket_address.sin_family = family; - - if (::bind(socket_fd, reinterpret_cast<const sockaddr*>(&socket_address), sizeof(socket_address)) == -1) { - *err = GetLastError(); - } + IOErrors* err) const { + *err = IOErrors::kNoError; + + int family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return; + } + + auto host_port_tuple = SplitAddressToHostAndPort(address); + if (!host_port_tuple) { + *err = IOErrors::kInvalidAddressFormat; + return; + } + std::string host; + uint16_t port = 0; + std::tie(host, port) = *host_port_tuple; + + sockaddr_in socket_address{}; + socket_address.sin_addr.s_addr = inet_addr(host.c_str()); + socket_address.sin_port = htons(port); + socket_address.sin_family = family; + + if (::bind(socket_fd, reinterpret_cast<const sockaddr*>(&socket_address), sizeof(socket_address)) == -1) { + *err = GetLastError(); + } } void hidra2::SystemIO::Listen(SocketDescriptor socket_fd, int backlog, hidra2::IOErrors* err) const { @@ -283,27 +287,28 @@ size_t hidra2::SystemIO::Receive(SocketDescriptor socket_fd, void* buf, size_t l } -size_t hidra2::SystemIO::ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const { - *err = hidra2::IOErrors::kNoError; - - fd_set read_fds; - FD_ZERO(&read_fds); - FD_SET(socket_fd, &read_fds); - timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = timeout_in_usec; +size_t hidra2::SystemIO::ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, + IOErrors* err) const { + *err = hidra2::IOErrors::kNoError; - int res = ::select(socket_fd + 1, &read_fds, nullptr, nullptr, &timeout); - if (res == 0) { - *err = IOErrors::kTimeout; - return 0; - } - if (res == -1) { - *err = GetLastError(); - return 0; - } + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(socket_fd, &read_fds); + timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = timeout_in_usec; + + int res = ::select(socket_fd + 1, &read_fds, nullptr, nullptr, &timeout); + if (res == 0) { + *err = IOErrors::kTimeout; + return 0; + } + if (res == -1) { + *err = GetLastError(); + return 0; + } - return Receive(socket_fd, buf, length, err); + return Receive(socket_fd, buf, length, err); } diff --git a/common/cpp/src/system_io_linux.cpp b/common/cpp/src/system_io_linux.cpp index 21e1add15..8c5bcd746 100644 --- a/common/cpp/src/system_io_linux.cpp +++ b/common/cpp/src/system_io_linux.cpp @@ -2,17 +2,15 @@ #include <cstring> + #include <dirent.h> #include <sys/stat.h> #include <algorithm> - -#include <errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <iostream> #include <zconf.h> -#include <assert.h> using std::string; using std::vector; @@ -160,31 +158,6 @@ void SystemIO::CollectFileInformationRecursivly(const std::string& path, closedir(dir); } - -int SystemIO::AddressFamilyToPosixFamily(AddressFamilies address_family) const { - switch (address_family) { - case AddressFamilies::INET: - return AF_INET; - } - return -1; -}; - -int SystemIO::SocketTypeToPosixType(SocketTypes socket_type) const { - switch (socket_type) { - case SocketTypes::STREAM: - return SOCK_STREAM; - } - return -1; -} - -int SystemIO::SocketProtocolToPosixProtocol(SocketProtocols socket_protocol) const { - switch (socket_protocol) { - case SocketProtocols::IP: - return IPPROTO_IP; - } - return -1; -} - hidra2::FileDescriptor hidra2::SystemIO::_open(const char* filename, int posix_open_flags) const { return ::open(filename, posix_open_flags, S_IWUSR | S_IRWXU); } @@ -201,15 +174,15 @@ ssize_t SystemIO::_write(hidra2::FileDescriptor fd, const void* buffer, size_t l return ::write(fd, buffer, length); } -FileDescriptor SystemIO::_socket(int address_family, int socket_type, int socket_protocol) const { +SocketDescriptor SystemIO::_socket(int address_family, int socket_type, int socket_protocol) const { return ::socket(address_family, socket_type, socket_protocol); } -ssize_t SystemIO::_send(FileDescriptor socket_fd, const void* buffer, size_t length) const { +ssize_t SystemIO::_send(SocketDescriptor socket_fd, const void* buffer, size_t length) const { return ::send(socket_fd, buffer, length, 0); } -ssize_t SystemIO::_recv(FileDescriptor socket_fd, void* buffer, size_t length) const { +ssize_t SystemIO::_recv(SocketDescriptor socket_fd, void* buffer, size_t length) const { return ::recv(socket_fd, buffer, length, 0); } @@ -217,40 +190,13 @@ int SystemIO::_mkdir(const char* dirname) const { return ::mkdir(dirname, S_IRWXU); } -int SystemIO::_listen(FileDescriptor fd, int backlog) const { - return ::listen(fd, backlog); +int SystemIO::_listen(SocketDescriptor socket_fd, int backlog) const { + return ::listen(socket_fd, backlog); } -void hidra2::SystemIO::InetBind(hidra2::FileDescriptor socket_fd, - const std::string& address, - hidra2::IOErrors* err) const { - *err = IOErrors::kNoError; - - int family = AddressFamilyToPosixFamily(AddressFamilies::INET); - if (family == -1) { - *err = IOErrors::kUnsupportedAddressFamily; - return; - } - - auto host_port_tuple = SplitAddressToHostAndPort(address); - if(!host_port_tuple) { - *err = IOErrors::kInvalidAddressFormat; - return; - } - std::string host; - uint16_t port = 0; - std::tie(host, port) = *host_port_tuple; - - sockaddr_in socket_address{}; - socket_address.sin_addr.s_addr = inet_addr(host.c_str()); - socket_address.sin_port = htons(port); - socket_address.sin_family = static_cast<sa_family_t>(family); - - if (::bind(socket_fd, reinterpret_cast<const sockaddr*>(&socket_address), sizeof(socket_address)) == -1) { - *err = GetLastError(); - } - +void SystemIO::_close_socket(SocketDescriptor socket_fd) const { + ::close(socket_fd); } void hidra2::SystemIO::InetConnect(FileDescriptor socket_fd, const std::string& address, hidra2::IOErrors* err) const { @@ -283,8 +229,8 @@ void hidra2::SystemIO::InetConnect(FileDescriptor socket_fd, const std::string& } -std::unique_ptr<std::tuple<std::string, hidra2::FileDescriptor>> hidra2::SystemIO::InetAccept( -hidra2::FileDescriptor socket_fd, IOErrors* err) const { +std::unique_ptr<std::tuple<std::string, hidra2::SocketDescriptor>> hidra2::SystemIO::InetAccept( +hidra2::SocketDescriptor socket_fd, IOErrors* err) const { *err = IOErrors::kNoError; sa_family_t family = AddressFamilyToPosixFamily(AddressFamilies::INET); if (family == -1) { @@ -311,30 +257,4 @@ hidra2::FileDescriptor socket_fd, IOErrors* err) const { peer_fd)); } -size_t hidra2::SystemIO::ReceiveTimeout(hidra2::FileDescriptor socket_fd, - void* buf, - size_t length, - uint16_t timeout_in_sec, - hidra2::IOErrors* err) const { - *err = hidra2::IOErrors::kNoError; - - fd_set read_fds; - FD_SET(socket_fd, &read_fds); - timeval timeout; - timeout.tv_sec = timeout_in_sec; - timeout.tv_usec = 0; - - int res = ::select(socket_fd + 1, &read_fds, nullptr, nullptr, &timeout); - if (res == 0) { - *err = IOErrors::kTimeout; - return 0; - } - if (res == -1) { - *err = GetLastError(); - return 0; - } - - return Receive(socket_fd, buf, length, err); -} - } diff --git a/common/cpp/src/system_io_windows.cpp b/common/cpp/src/system_io_windows.cpp index 70cf4bc88..391934940 100644 --- a/common/cpp/src/system_io_windows.cpp +++ b/common/cpp/src/system_io_windows.cpp @@ -17,26 +17,26 @@ using std::chrono::system_clock; namespace hidra2 { IOErrors IOErrorFromGetLastError() { - DWORD last_error = GetLastError(); + DWORD last_error = GetLastError(); switch (last_error) { case ERROR_SUCCESS : - return IOErrors::kNoError; + return IOErrors::kNoError; case ERROR_PATH_NOT_FOUND: case ERROR_FILE_NOT_FOUND: - return IOErrors::kFileNotFound; + return IOErrors::kFileNotFound; case ERROR_ACCESS_DENIED: - return IOErrors::kPermissionDenied; - case ERROR_CONNECTION_REFUSED: - return IOErrors::kConnectionRefused; - case WSAEFAULT: - return IOErrors::kInvalidMemoryAddress; - case WSAECONNRESET: - return IOErrors::kConnectionResetByPeer; - case WSAENOTSOCK: - return IOErrors::kSocketOperationOnNonSocket; + return IOErrors::kPermissionDenied; + case ERROR_CONNECTION_REFUSED: + return IOErrors::kConnectionRefused; + case WSAEFAULT: + return IOErrors::kInvalidMemoryAddress; + case WSAECONNRESET: + return IOErrors::kConnectionResetByPeer; + case WSAENOTSOCK: + return IOErrors::kSocketOperationOnNonSocket; default: - std::cout << "[IOErrorFromGetLastError] Unknown error code: " << last_error << std::endl; - return IOErrors::kUnknownError; + std::cout << "[IOErrorFromGetLastError] Unknown error code: " << last_error << std::endl; + return IOErrors::kUnknownError; } } @@ -144,60 +144,61 @@ void SystemIO::CollectFileInformationRecursivly(const std::string& path, } } -std::unique_ptr<std::tuple<std::string, SocketDescriptor>> SystemIO::InetAccept(SocketDescriptor socket_fd, IOErrors* err) const { - *err = IOErrors::kNoError; - static short family = AddressFamilyToPosixFamily(AddressFamilies::INET); - if (family == -1) { - *err = IOErrors::kUnsupportedAddressFamily; - return nullptr; - } - - sockaddr_in client_address{}; - static int client_address_size = sizeof(sockaddr_in); - - int peer_fd = ::accept(socket_fd, reinterpret_cast<sockaddr*>(&client_address), &client_address_size); - - if (peer_fd == -1) { - *err = GetLastError(); - return nullptr; - } - - std::string - address = std::string(inet_ntoa(client_address.sin_addr)) + ':' + std::to_string(client_address.sin_port); - return std::unique_ptr<std::tuple<std::string, SocketDescriptor>>(new - std::tuple<std::string, - SocketDescriptor>( - address, - peer_fd)); +std::unique_ptr<std::tuple<std::string, SocketDescriptor>> SystemIO::InetAccept(SocketDescriptor socket_fd, +IOErrors* err) const { + *err = IOErrors::kNoError; + static short family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return nullptr; + } + + sockaddr_in client_address{}; + static int client_address_size = sizeof(sockaddr_in); + + int peer_fd = ::accept(socket_fd, reinterpret_cast<sockaddr*>(&client_address), &client_address_size); + + if (peer_fd == -1) { + *err = GetLastError(); + return nullptr; + } + + std::string + address = std::string(inet_ntoa(client_address.sin_addr)) + ':' + std::to_string(client_address.sin_port); + return std::unique_ptr<std::tuple<std::string, SocketDescriptor>>(new + std::tuple<std::string, + SocketDescriptor>( + address, + peer_fd)); } void hidra2::SystemIO::InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const { - *err = IOErrors::kNoError; - - auto host_port_tuple = SplitAddressToHostAndPort(address); - if (!host_port_tuple) { - *err = IOErrors::kInvalidAddressFormat; - return; - } - std::string host; - uint16_t port = 0; - std::tie(host, port) = *host_port_tuple; - - short family = AddressFamilyToPosixFamily(AddressFamilies::INET); - if (family == -1) { - *err = IOErrors::kUnsupportedAddressFamily; - return; - } - - sockaddr_in socket_address{}; - socket_address.sin_addr.s_addr = inet_addr(host.c_str()); - socket_address.sin_port = htons(port); - socket_address.sin_family = family; - - if (::connect(socket_fd, (struct sockaddr*) &socket_address, sizeof(socket_address)) == -1) { - *err = GetLastError(); - return; - } + *err = IOErrors::kNoError; + + auto host_port_tuple = SplitAddressToHostAndPort(address); + if (!host_port_tuple) { + *err = IOErrors::kInvalidAddressFormat; + return; + } + std::string host; + uint16_t port = 0; + std::tie(host, port) = *host_port_tuple; + + short family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return; + } + + sockaddr_in socket_address{}; + socket_address.sin_addr.s_addr = inet_addr(host.c_str()); + socket_address.sin_port = htons(port); + socket_address.sin_family = family; + + if (::connect(socket_fd, (struct sockaddr*) &socket_address, sizeof(socket_address)) == -1) { + *err = GetLastError(); + return; + } } FileDescriptor SystemIO::_open(const char* filename, int posix_open_flags) const { @@ -207,11 +208,11 @@ FileDescriptor SystemIO::_open(const char* filename, int posix_open_flags) const } void SystemIO::_close(FileDescriptor fd) const { - ::_close(fd); + ::_close(fd); } void SystemIO::_close_socket(SocketDescriptor fd) const { - ::closesocket(fd); + ::closesocket(fd); } ssize_t SystemIO::_read(FileDescriptor fd, void* buffer, size_t length) const { @@ -223,23 +224,22 @@ ssize_t SystemIO::_write(FileDescriptor fd, const void* buffer, size_t length) c } SocketDescriptor SystemIO::_socket(int address_family, int socket_type, int socket_protocol) const { - static bool WSAStartupDone = false; - if (!WSAStartupDone) { - WSAStartupDone = true; - WORD wVersionRequested = MAKEWORD(2, 2); - WSADATA wsaData; - int err = WSAStartup(wVersionRequested, &wsaData); - if (err != 0) { - std::cout << "[_socket/WSAStartup] Faild to WSAStartup with version 2.2" << std::endl; - WSACleanup(); - // Do not return, since ::socket has to set an errno - } - else { - std::atexit([] { - WSACleanup(); - }); - } - } + static bool WSAStartupDone = false; + if (!WSAStartupDone) { + WSAStartupDone = true; + WORD wVersionRequested = MAKEWORD(2, 2); + WSADATA wsaData; + int err = WSAStartup(wVersionRequested, &wsaData); + if (err != 0) { + std::cout << "[_socket/WSAStartup] Faild to WSAStartup with version 2.2" << std::endl; + WSACleanup(); + // Do not return, since ::socket has to set an errno + } else { + std::atexit([] { + WSACleanup(); + }); + } + } return ::socket(address_family, socket_type, socket_protocol); } diff --git a/common/cpp/unittests/MockIO.h b/common/cpp/unittests/MockIO.h index f58566a2b..60d1bf7c9 100644 --- a/common/cpp/unittests/MockIO.h +++ b/common/cpp/unittests/MockIO.h @@ -5,36 +5,53 @@ #define HIDRA2_COMMON__MOCKIO_H namespace hidra2 { - class MockIO : public IO { public: - MOCK_CONST_METHOD1(NewThread, std::thread * (std::function<void()> function)); - MOCK_CONST_METHOD4(CreateSocket, FileDescriptor(AddressFamilies address_family, SocketTypes socket_type, - SocketProtocols socket_protocol, IOErrors* err)); - MOCK_CONST_METHOD3(Listen, void(FileDescriptor socket_fd, int backlog, IOErrors* err)); - MOCK_CONST_METHOD3(InetBind, void(FileDescriptor socket_fd, const std::string& address, IOErrors* err)); - virtual std::unique_ptr<std::tuple<std::string, FileDescriptor>> InetAccept(FileDescriptor socket_fd, + MOCK_CONST_METHOD1(NewThread, + std::thread * (std::function<void()> function)); + MOCK_CONST_METHOD4(CreateSocket, + SocketDescriptor(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, + IOErrors* err)); + MOCK_CONST_METHOD3(Listen, + void(SocketDescriptor socket_fd, int backlog, IOErrors* err)); + MOCK_CONST_METHOD3(InetBind, + void(SocketDescriptor socket_fd, const std::string& address, IOErrors* err)); + virtual std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const { - return std::unique_ptr<std::tuple<std::string, FileDescriptor>>(InetAccept_proxy(socket_fd, err)); + return std::unique_ptr<std::tuple<std::string, SocketDescriptor>>(InetAccept_proxy(socket_fd, err)); } - MOCK_CONST_METHOD2(InetAccept_proxy, std::tuple<std::string, FileDescriptor>* (FileDescriptor socket_fd, - IOErrors* err)); - MOCK_CONST_METHOD3(InetConnect, void(FileDescriptor socket_fd, const std::string& address, IOErrors* err)); - MOCK_CONST_METHOD2(CreateAndConnectIPTCPSocket, FileDescriptor(const std::string& address, IOErrors* err)); - MOCK_CONST_METHOD4(Receive, size_t(FileDescriptor socket_fd, void* buf, size_t length, IOErrors* err)); - MOCK_CONST_METHOD5(ReceiveTimeout, size_t(FileDescriptor socket_fd, void* buf, size_t length, uint16_t timeout_in_sec, - IOErrors* err)); - MOCK_CONST_METHOD4(Send, size_t(FileDescriptor socket_fd, const void* buf, size_t length, IOErrors* err)); - MOCK_CONST_METHOD3(Skip, void(FileDescriptor socket_fd, size_t length, IOErrors* err)); - MOCK_CONST_METHOD3(Open, FileDescriptor(const std::string& filename, int open_flags, IOErrors* err)); - MOCK_CONST_METHOD2(Close, void(FileDescriptor, IOErrors*)); - MOCK_CONST_METHOD4(Read, size_t(FileDescriptor fd, void* buf, size_t length, IOErrors* err)); - MOCK_CONST_METHOD4(Write, size_t(FileDescriptor fd, const void* buf, size_t length, IOErrors* err)); - MOCK_CONST_METHOD2(CreateDirectory, void(const std::string& directory_name, hidra2::IOErrors* err)); - MOCK_CONST_METHOD3(GetDataFromFile, FileData(const std::string& fname, uint64_t fsize, IOErrors* err)); - MOCK_CONST_METHOD2(FilesInFolder, std::vector<FileInfo>(const std::string& folder, IOErrors* err)); - MOCK_CONST_METHOD3(CollectFileInformationRecursivly, void(const std::string& path, std::vector<FileInfo>* files, + MOCK_CONST_METHOD2(InetAccept_proxy, std::tuple<std::string, SocketDescriptor>* (SocketDescriptor socket_fd, IOErrors* err)); + MOCK_CONST_METHOD3(InetConnect, + void(SocketDescriptor socket_fd, const std::string& address, IOErrors* err)); + MOCK_CONST_METHOD2(CreateAndConnectIPTCPSocket, + SocketDescriptor(const std::string& address, IOErrors* err)); + MOCK_CONST_METHOD4(Receive, + size_t(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err)); + MOCK_CONST_METHOD5(ReceiveTimeout, + size_t(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err)); + MOCK_CONST_METHOD4(Send, + size_t(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err)); + MOCK_CONST_METHOD3(Skip, + void(SocketDescriptor socket_fd, size_t length, IOErrors* err)); + MOCK_CONST_METHOD2(CloseSocket, + void(SocketDescriptor socket_fd, IOErrors* err)); + MOCK_CONST_METHOD3(Open, + FileDescriptor(const std::string& filename, int open_flags, IOErrors* err)); + MOCK_CONST_METHOD2(Close, + void(FileDescriptor fd, IOErrors* err)); + MOCK_CONST_METHOD4(Read, + size_t(FileDescriptor fd, void* buf, size_t length, IOErrors* err)); + MOCK_CONST_METHOD4(Write, + size_t(FileDescriptor fd, const void* buf, size_t length, IOErrors* err)); + MOCK_CONST_METHOD2(CreateDirectory, + void(const std::string& directory_name, hidra2::IOErrors* err)); + MOCK_CONST_METHOD3(GetDataFromFile, + FileData(const std::string& fname, uint64_t fsize, IOErrors* err)); + MOCK_CONST_METHOD3(CollectFileInformationRecursivly, + void(const std::string& path, std::vector<FileInfo>* files, IOErrors* err)); + MOCK_CONST_METHOD2(FilesInFolder, + std::vector<FileInfo>(const std::string& folder, IOErrors* err)); }; } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 0c506ba7c..f740beada 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -61,7 +61,7 @@ int main (int argc, char* argv[]) { << std::endl; SendDummyData(receiver_address, number_of_byte, iterations); - getchar(); + getchar(); } diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp index 397264620..18159a317 100644 --- a/receiver/src/network_producer_peer.cpp +++ b/receiver/src/network_producer_peer.cpp @@ -45,12 +45,12 @@ void NetworkProducerPeer::internal_receiver_thread_() { while(is_listening_) { err = IOErrors::kNoError; - size_t size = io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 5, &err); - if (size == 0) { - std::cout << "size: " << size << std::endl; - } + size_t size = io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 5, &err); + if (size == 0) { + std::cout << "size: " << size << std::endl; + } + - if(err != IOErrors::kNoError) { if(err == IOErrors::kTimeout) { diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp index 8568de11c..2ad818293 100644 --- a/receiver/src/network_producer_peer_handlers.cpp +++ b/receiver/src/network_producer_peer_handlers.cpp @@ -22,11 +22,11 @@ void NetworkProducerPeer::handle_send_data_request_(NetworkProducerPeer* self, c SendDataResponse* response) { IOErrors ioErr; - if (request->file_size == 0) { - std::cerr << "[" << self->connection_id() << "] file_id: " << request->file_id << " has size of 0!" << std::endl; - response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; - return; - } + if (request->file_size == 0) { + std::cerr << "[" << self->connection_id() << "] file_id: " << request->file_id << " has size of 0!" << std::endl; + response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; + return; + } if(request->file_size > size_t(2)*size_t(1024)*size_t(1024)*size_t(1024)/*2GiByte*/) { response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; diff --git a/tests/system_io/ip_tcp_network/ip_tcp_network.cpp b/tests/system_io/ip_tcp_network/ip_tcp_network.cpp index b49fb2b6a..a5eb607ee 100644 --- a/tests/system_io/ip_tcp_network/ip_tcp_network.cpp +++ b/tests/system_io/ip_tcp_network/ip_tcp_network.cpp @@ -14,13 +14,17 @@ using hidra2::SocketProtocols; using hidra2::FileDescriptor; using hidra2::M_AssertEq; +using namespace std::chrono; + static const std::unique_ptr<SystemIO> io(new SystemIO()); static const std::string kListenAddress = "127.0.0.1:60123"; static std::promise<void> thread_started; void ExitIfErrIsNotOk(IOErrors* err, int exit_number) { - if(*err != IOErrors::kNoError) + if(*err != IOErrors::kNoError) { + std::cerr << "ERROR: Exit on " << exit_number << std::endl; exit(exit_number); + } } std::thread* CreateEchoServerThread() { @@ -33,94 +37,95 @@ std::thread* CreateEchoServerThread() { ExitIfErrIsNotOk(&err, 101); io->Listen(socket, 5, &err); ExitIfErrIsNotOk(&err, 102); - thread_started.set_value(); - - int i = 0; - while (true) { - std::cout << "[SERVER][" << i << "] InetAccept" << std::endl; - auto client_info_tuple = io->InetAccept(socket, &err); - ExitIfErrIsNotOk(&err, 103); - std::string client_address; - FileDescriptor client_fd; - std::tie(client_address, client_fd) = *client_info_tuple; - - size_t max_buffer_size = 1024 * 1024;//1MiByte - std::unique_ptr<uint8_t[]> buffer(new uint8_t[max_buffer_size]); - while (true) { - size_t received = io->ReceiveTimeout(client_fd, buffer.get(), max_buffer_size, 100, &err); - if (err == IOErrors::kTimeout) { - continue; - } - if (err == IOErrors::kEndOfFile) { - io->Send(client_fd, buffer.get(), received, &err); - ExitIfErrIsNotOk(&err, 104); - break; - } - ExitIfErrIsNotOk(&err, 104); - io->Send(client_fd, buffer.get(), received, &err); - ExitIfErrIsNotOk(&err, 105); - } - - std::cout << "[SERVER][" << i << "] Close client_fd" << std::endl; - io->CloseSocket(client_fd, &err); - ExitIfErrIsNotOk(&err, 106); - } - std::cout << "[SERVER][" << i << "] Close socket" << std::endl; - io->CloseSocket(socket, &err); - ExitIfErrIsNotOk(&err, 107); + thread_started.set_value(); + + int i = 0; + while (true) { + std::cout << "[SERVER][" << i << "] InetAccept" << std::endl; + auto client_info_tuple = io->InetAccept(socket, &err); + ExitIfErrIsNotOk(&err, 103); + std::string client_address; + FileDescriptor client_fd; + std::tie(client_address, client_fd) = *client_info_tuple; + + size_t max_buffer_size = 1024 * 1024;//1MiByte + std::unique_ptr<uint8_t[]> buffer(new uint8_t[max_buffer_size]); + while (true) { + size_t received = io->ReceiveTimeout(client_fd, buffer.get(), max_buffer_size, 100, &err); + if (err == IOErrors::kTimeout) { + continue; + } + if (err == IOErrors::kEndOfFile) { + io->Send(client_fd, buffer.get(), received, &err); + ExitIfErrIsNotOk(&err, 104); + break; + } + ExitIfErrIsNotOk(&err, 104); + io->Send(client_fd, buffer.get(), received, &err); + ExitIfErrIsNotOk(&err, 105); + } + + std::cout << "[SERVER][" << i << "] Close client_fd" << std::endl; + io->CloseSocket(client_fd, &err); + ExitIfErrIsNotOk(&err, 106); + break; + } + std::cout << "[SERVER][" << i << "] Close socket" << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 107); }); } void CheckNormal(int times, size_t size) { - IOErrors err; - std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl; - FileDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err); - ExitIfErrIsNotOk(&err, 201); - - io->ReceiveTimeout(socket, nullptr, 1, 1000*100/*100ms*/, &err); - if (err != IOErrors::kTimeout) { - ExitIfErrIsNotOk(&err, 202); - } - - for (int i = 0; i < times; i++) { - std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]); - for (size_t i = 0; i < size; i++) { - buffer[i] = rand(); - } - - std::cout << "[CLIENT] Send" << std::endl; - io->Send(socket, buffer.get(), size, &err); - ExitIfErrIsNotOk(&err, 203); - - std::unique_ptr<uint8_t[]> buffer2(new uint8_t[size]); - std::cout << "[CLIENT] Receive" << std::endl; - io->Receive(socket, buffer2.get(), size, &err); - ExitIfErrIsNotOk(&err, 204); - - std::cout << "[CLIENT] buffer check" << std::endl; - for (size_t i = 0; i < size; i++) { - if (buffer[i] != buffer2[i]) { - exit(205); - } - } - } - - std::cout << "[CLIENT] Close" << std::endl; - io->CloseSocket(socket, &err); - ExitIfErrIsNotOk(&err, 106); + IOErrors err; + std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl; + FileDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err); + ExitIfErrIsNotOk(&err, 201); + + io->ReceiveTimeout(socket, nullptr, 1, 1000 * 100/*100ms*/, &err); + if (err != IOErrors::kTimeout) { + ExitIfErrIsNotOk(&err, 202); + } + + for (int i = 0; i < times; i++) { + std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]); + for (size_t i = 0; i < size; i++) { + buffer[i] = rand(); + } + + std::cout << "[CLIENT] Send" << std::endl; + io->Send(socket, buffer.get(), size, &err); + ExitIfErrIsNotOk(&err, 203); + + std::unique_ptr<uint8_t[]> buffer2(new uint8_t[size]); + std::cout << "[CLIENT] Receive" << std::endl; + io->Receive(socket, buffer2.get(), size, &err); + ExitIfErrIsNotOk(&err, 204); + + std::cout << "[CLIENT] buffer check" << std::endl; + for (size_t i = 0; i < size; i++) { + if (buffer[i] != buffer2[i]) { + exit(205); + } + } + } + + std::cout << "[CLIENT] Close" << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 106); } int main(int argc, char* argv[]) { - std::thread* server_thread = CreateEchoServerThread(); - server_thread->detach(); - thread_started.get_future().get();//Make sure that the server is started - - std::cout << "Check 1" << std::endl; - CheckNormal(10, 1024 * 1024 * 3); - std::cout << "Check 2" << std::endl; - CheckNormal(30, 1024 * 1024 * 3); - std::cout << "Check 3" << std::endl; - CheckNormal(2, 1024 * 1024 * 100/*100 MiByte */); - - return 0; + std::thread* server_thread = CreateEchoServerThread(); + server_thread->detach(); + thread_started.get_future().get();//Make sure that the server is started + + std::cout << "Check 1" << std::endl; + CheckNormal(10, 1024 * 1024 * 3); + std::cout << "Check 2" << std::endl; + CheckNormal(30, 1024 * 1024 * 30); + std::cout << "Check 3" << std::endl; + CheckNormal(2, 1024 * 1024 * 1/*100 MiByte */); + + return 0; } -- GitLab