From 37edd71435ecf05abb9ee78fc5a7472a35d5fc79 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Fri, 26 Jan 2018 16:59:08 +0100 Subject: [PATCH] Implemented more windows features --- common/cpp/include/common/networking.h | 1 + common/cpp/include/system_wrappers/io.h | 35 ++-- .../cpp/include/system_wrappers/system_io.h | 43 +++-- common/cpp/src/system_io.cpp | 119 ++++++++++++-- common/cpp/src/system_io_windows.cpp | 146 +++++++++++------ .../dummy_data_producer.cpp | 4 +- receiver/src/main.cpp | 2 +- receiver/src/network_producer_peer.cpp | 15 +- .../src/network_producer_peer_handlers.cpp | 8 +- receiver/src/receiver.cpp | 6 +- .../ip_tcp_network/ip_tcp_network.cpp | 151 ++++++++++-------- 11 files changed, 352 insertions(+), 178 deletions(-) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 6ea824a5f..b1108e686 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -8,6 +8,7 @@ namespace hidra2 { typedef uint64_t NetworkRequestId; enum OpCode : uint8_t { + OP_CODE_A, OP_CODE__SEND_DATA, OP_CODE_COUNT, diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h index 4f41298dc..44ff508ec 100644 --- a/common/cpp/include/system_wrappers/io.h +++ b/common/cpp/include/system_wrappers/io.h @@ -60,6 +60,7 @@ enum class SocketProtocols { }; typedef int FileDescriptor; +typedef int SocketDescriptor; class IO { public: @@ -72,31 +73,27 @@ class IO { /* * Network */ - virtual FileDescriptor CreateSocket (AddressFamilies address_family, SocketTypes socket_type, - SocketProtocols socket_protocol, IOErrors* err) const = 0; - virtual void Listen (FileDescriptor socket_fd, int backlog, IOErrors* err) const = 0; - virtual void InetBind (FileDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; - virtual std::unique_ptr<std::tuple<std::string, FileDescriptor>> InetAccept(FileDescriptor socket_fd, - IOErrors* err) const = 0; - virtual void InetConnect (FileDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; - virtual FileDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const = 0; - - virtual size_t Receive (FileDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const = 0; - virtual size_t ReceiveTimeout (FileDescriptor socket_fd, - void* buf, - size_t length, - uint16_t timeout_in_sec, - IOErrors* err) const = 0; - virtual size_t Send (FileDescriptor socket_fd, const void* buf, size_t length, - IOErrors* err) const = 0; - virtual void Skip (FileDescriptor socket_fd, size_t length, IOErrors* err) const = 0; + virtual SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, IOErrors* err) const = 0; + virtual void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const = 0; + virtual void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; + virtual std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const = 0; + virtual void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const = 0; + virtual SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const = 0; + virtual size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const = 0; + virtual size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const = 0; + virtual size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const = 0; + virtual void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const = 0; + /** + * @param err Since CloseSocket if often used in an error case, it's able to accept nullptr. + */ + virtual void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const = 0; /* * Filesystem */ virtual FileDescriptor Open (const std::string& filename, int open_flags, IOErrors* err) const = 0; /** - * @param err Since close if often used in an error case, it's able to accept nullptr. + * @param err Since Close if often used in an error case, it's able to accept nullptr. */ virtual void Close (FileDescriptor fd, IOErrors* err) const = 0; diff --git a/common/cpp/include/system_wrappers/system_io.h b/common/cpp/include/system_wrappers/system_io.h index f62159eb0..1e8e71287 100644 --- a/common/cpp/include/system_wrappers/system_io.h +++ b/common/cpp/include/system_wrappers/system_io.h @@ -16,20 +16,22 @@ class SystemIO final : public IO { int FileOpenModeToPosixFileOpenMode(int open_flags) const; IOErrors GetLastError() const; - int AddressFamilyToPosixFamily (AddressFamilies address_family) const; + short AddressFamilyToPosixFamily (AddressFamilies address_family) const; int SocketTypeToPosixType (SocketTypes socket_type) const; int SocketProtocolToPosixProtocol (SocketProtocols socket_protocol) const; // Function maps. Should never be called apart from in wrapper function FileDescriptor _open(const char* filename, int posix_open_flags) const; - void _close(FileDescriptor fd) const; + void _close(FileDescriptor fd) const; ssize_t _read(FileDescriptor fd, void* buffer, size_t length) const; ssize_t _write(FileDescriptor fd, const void* buffer, size_t count) const; - FileDescriptor _socket(int address_family, int socket_type, int socket_protocol) const; - int _listen(FileDescriptor fd, int backlog) const; - ssize_t _send(FileDescriptor socket_fd, const void* buffer, size_t length) const; - ssize_t _recv(FileDescriptor socket_fd, void* buffer, size_t length) const; - int _mkdir(const char* dirname) const; + int _mkdir(const char* dirname) const; + + SocketDescriptor _socket(int address_family, int socket_type, int socket_protocol) const; + int _listen(SocketDescriptor fd, int backlog) const; + ssize_t _send(SocketDescriptor socket_fd, const void* buffer, size_t length) const; + ssize_t _recv(SocketDescriptor socket_fd, void* buffer, size_t length) const; + void _close_socket(SocketDescriptor fd) const; std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostAndPort(std::string address) const; @@ -46,22 +48,17 @@ class SystemIO final : public IO { /* * Network */ - FileDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, - SocketProtocols socket_protocol, IOErrors* err) const; - void Listen(FileDescriptor socket_fd, int backlog, IOErrors* err) const; - void InetBind(FileDescriptor socket_fd, const std::string& address, IOErrors* err) const; - std::unique_ptr<std::tuple<std::string, FileDescriptor>> InetAccept(FileDescriptor socket_fd, IOErrors* err) const; - void InetConnect(FileDescriptor socket_fd, const std::string& address, IOErrors* err) const; - FileDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const; - - size_t Receive(FileDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const; - size_t ReceiveTimeout(FileDescriptor socket_fd, - void* buf, - size_t length, - uint16_t timeout_in_sec, - IOErrors* err) const; - size_t Send(FileDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const; - void Skip(FileDescriptor socket_fd, size_t length, IOErrors* err) const; + SocketDescriptor CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, IOErrors* err) const; + void Listen(SocketDescriptor socket_fd, int backlog, IOErrors* err) const; + void InetBind(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; + std::unique_ptr<std::tuple<std::string, SocketDescriptor>> InetAccept(SocketDescriptor socket_fd, IOErrors* err) const; + void InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const; + SocketDescriptor CreateAndConnectIPTCPSocket(const std::string& address, IOErrors* err) const; + size_t Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const; + size_t ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const; + size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, IOErrors* err) const; + void Skip(SocketDescriptor socket_fd, size_t length, IOErrors* err) const; + void CloseSocket(SocketDescriptor socket_fd, IOErrors* err) const; /* * Filesystem diff --git a/common/cpp/src/system_io.cpp b/common/cpp/src/system_io.cpp index 828a1c3be..fbfad4099 100644 --- a/common/cpp/src/system_io.cpp +++ b/common/cpp/src/system_io.cpp @@ -3,6 +3,13 @@ #include <cassert> #include <algorithm> +#ifdef _WIN32 +#include <windows.h> +#undef CreateDirectory +#endif + + + namespace hidra2 { /******************************************************************************* @@ -27,11 +34,11 @@ void StripBasePath(const std::string& folder, std::vector<FileInfo>* file_list) // PRIVATE FUNCTIONS - END -std::thread* hidra2::SystemIO::NewThread(std::function<void()> function) const { +std::thread* SystemIO::NewThread(std::function<void()> function) const { return new std::thread(function); } -void hidra2::SystemIO::Skip(hidra2::FileDescriptor socket_fd, size_t length, hidra2::IOErrors* err) const { +void SystemIO::Skip(SocketDescriptor socket_fd, size_t length, hidra2::IOErrors* err) const { static const size_t kSkipBufferSize = 1024; //TODO need to find a better way to skip bytes @@ -108,11 +115,18 @@ hidra2::FileDescriptor hidra2::SystemIO::Open(const std::string& filename, return fd; } -void hidra2::SystemIO::Close(hidra2::FileDescriptor fd, hidra2::IOErrors* err) const { - _close(fd); - if(err) { - *err = GetLastError(); - } +void hidra2::SystemIO::CloseSocket(SocketDescriptor fd, hidra2::IOErrors* err) const { + _close_socket(fd); + if (err) { + *err = GetLastError(); + } +} + +void hidra2::SystemIO::Close(FileDescriptor fd, hidra2::IOErrors* err) const { + _close(fd); + if (err) { + *err = GetLastError(); + } } size_t hidra2::SystemIO::Read(FileDescriptor fd, void* buf, size_t length, IOErrors* err) const { @@ -157,7 +171,32 @@ size_t hidra2::SystemIO::Write(FileDescriptor fd, const void* buf, size_t length return already_wrote; } -FileDescriptor SystemIO::CreateSocket(AddressFamilies address_family, + +short hidra2::SystemIO::AddressFamilyToPosixFamily(AddressFamilies address_family) const { + switch (address_family) { + case AddressFamilies::INET: + return AF_INET; + } + return -1; +} + +int hidra2::SystemIO::SocketTypeToPosixType(SocketTypes socket_type) const { + switch (socket_type) { + case SocketTypes::STREAM: + return SOCK_STREAM; + } + return -1; +} + +int hidra2::SystemIO::SocketProtocolToPosixProtocol(SocketProtocols socket_protocol) const { + switch (socket_protocol) { + case SocketProtocols::IP: + return IPPROTO_IP; + } + return -1; +} + +SocketDescriptor SystemIO::CreateSocket(AddressFamilies address_family, SocketTypes socket_type, SocketProtocols socket_protocol, IOErrors* err) const { @@ -186,7 +225,36 @@ FileDescriptor SystemIO::CreateSocket(AddressFamilies address_family, return fd; } -void hidra2::SystemIO::Listen(hidra2::FileDescriptor socket_fd, int backlog, hidra2::IOErrors* err) const { +void hidra2::SystemIO::InetBind(SocketDescriptor socket_fd, const std::string& address, + IOErrors* err) const { + *err = IOErrors::kNoError; + + int family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return; + } + + auto host_port_tuple = SplitAddressToHostAndPort(address); + if (!host_port_tuple) { + *err = IOErrors::kInvalidAddressFormat; + return; + } + std::string host; + uint16_t port = 0; + std::tie(host, port) = *host_port_tuple; + + sockaddr_in socket_address{}; + socket_address.sin_addr.s_addr = inet_addr(host.c_str()); + socket_address.sin_port = htons(port); + socket_address.sin_family = family; + + if (::bind(socket_fd, reinterpret_cast<const sockaddr*>(&socket_address), sizeof(socket_address)) == -1) { + *err = GetLastError(); + } +} + +void hidra2::SystemIO::Listen(SocketDescriptor socket_fd, int backlog, hidra2::IOErrors* err) const { *err = IOErrors::kNoError; if (_listen(socket_fd, backlog) == -1) { @@ -194,8 +262,7 @@ void hidra2::SystemIO::Listen(hidra2::FileDescriptor socket_fd, int backlog, hid } } -size_t hidra2::SystemIO::Receive(hidra2::FileDescriptor socket_fd, void* buf, size_t length, - hidra2::IOErrors* err) const { +size_t hidra2::SystemIO::Receive(SocketDescriptor socket_fd, void* buf, size_t length, IOErrors* err) const { *err = hidra2::IOErrors::kNoError; size_t already_received = 0; @@ -212,15 +279,38 @@ size_t hidra2::SystemIO::Receive(hidra2::FileDescriptor socket_fd, void* buf, si } already_received += received_amount; } - return already_received; } -size_t hidra2::SystemIO::Send(hidra2::FileDescriptor socket_fd, +size_t hidra2::SystemIO::ReceiveTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, IOErrors* err) const { + *err = hidra2::IOErrors::kNoError; + + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(socket_fd, &read_fds); + timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = timeout_in_usec; + + int res = ::select(socket_fd + 1, &read_fds, nullptr, nullptr, &timeout); + if (res == 0) { + *err = IOErrors::kTimeout; + return 0; + } + if (res == -1) { + *err = GetLastError(); + return 0; + } + + return Receive(socket_fd, buf, length, err); +} + + +size_t hidra2::SystemIO::Send(SocketDescriptor socket_fd, const void* buf, size_t length, - hidra2::IOErrors* err) const { + IOErrors* err) const { *err = hidra2::IOErrors::kNoError; size_t already_sent = 0; @@ -241,7 +331,6 @@ size_t hidra2::SystemIO::Send(hidra2::FileDescriptor socket_fd, return already_sent; } - hidra2::FileData hidra2::SystemIO::GetDataFromFile(const std::string& fname, uint64_t fsize, IOErrors* err) const { *err = IOErrors::kNoError; FileDescriptor fd = Open(fname, IO_OPEN_MODE_READ, err); diff --git a/common/cpp/src/system_io_windows.cpp b/common/cpp/src/system_io_windows.cpp index 514f2064f..70cf4bc88 100644 --- a/common/cpp/src/system_io_windows.cpp +++ b/common/cpp/src/system_io_windows.cpp @@ -6,6 +6,7 @@ #include <io.h> #include <windows.h> #include <direct.h> +#include <iostream> #pragma comment(lib, "Ws2_32.lib") @@ -16,23 +17,27 @@ using std::chrono::system_clock; namespace hidra2 { IOErrors IOErrorFromGetLastError() { - IOErrors err; - switch (GetLastError()) { + DWORD last_error = GetLastError(); + switch (last_error) { case ERROR_SUCCESS : - err = IOErrors::kNoError; - break; + return IOErrors::kNoError; case ERROR_PATH_NOT_FOUND: case ERROR_FILE_NOT_FOUND: - err = IOErrors::kFileNotFound; - break; + return IOErrors::kFileNotFound; case ERROR_ACCESS_DENIED: - err = IOErrors::kPermissionDenied; - break; + return IOErrors::kPermissionDenied; + case ERROR_CONNECTION_REFUSED: + return IOErrors::kConnectionRefused; + case WSAEFAULT: + return IOErrors::kInvalidMemoryAddress; + case WSAECONNRESET: + return IOErrors::kConnectionResetByPeer; + case WSAENOTSOCK: + return IOErrors::kSocketOperationOnNonSocket; default: - err = IOErrors::kUnknownError; - break; + std::cout << "[IOErrorFromGetLastError] Unknown error code: " << last_error << std::endl; + return IOErrors::kUnknownError; } - return err; } @@ -139,64 +144,111 @@ void SystemIO::CollectFileInformationRecursivly(const std::string& path, } } -int hidra2::SystemIO::AddressFamilyToPosixFamily(AddressFamilies address_family) const { - return 0; +std::unique_ptr<std::tuple<std::string, SocketDescriptor>> SystemIO::InetAccept(SocketDescriptor socket_fd, IOErrors* err) const { + *err = IOErrors::kNoError; + static short family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return nullptr; + } + + sockaddr_in client_address{}; + static int client_address_size = sizeof(sockaddr_in); + + int peer_fd = ::accept(socket_fd, reinterpret_cast<sockaddr*>(&client_address), &client_address_size); + + if (peer_fd == -1) { + *err = GetLastError(); + return nullptr; + } + + std::string + address = std::string(inet_ntoa(client_address.sin_addr)) + ':' + std::to_string(client_address.sin_port); + return std::unique_ptr<std::tuple<std::string, SocketDescriptor>>(new + std::tuple<std::string, + SocketDescriptor>( + address, + peer_fd)); } -int hidra2::SystemIO::SocketTypeToPosixType(SocketTypes socket_type) const { - return 0; +void hidra2::SystemIO::InetConnect(SocketDescriptor socket_fd, const std::string& address, IOErrors* err) const { + *err = IOErrors::kNoError; + + auto host_port_tuple = SplitAddressToHostAndPort(address); + if (!host_port_tuple) { + *err = IOErrors::kInvalidAddressFormat; + return; + } + std::string host; + uint16_t port = 0; + std::tie(host, port) = *host_port_tuple; + + short family = AddressFamilyToPosixFamily(AddressFamilies::INET); + if (family == -1) { + *err = IOErrors::kUnsupportedAddressFamily; + return; + } + + sockaddr_in socket_address{}; + socket_address.sin_addr.s_addr = inet_addr(host.c_str()); + socket_address.sin_port = htons(port); + socket_address.sin_family = family; + + if (::connect(socket_fd, (struct sockaddr*) &socket_address, sizeof(socket_address)) == -1) { + *err = GetLastError(); + return; + } } -int hidra2::SystemIO::SocketProtocolToPosixProtocol(SocketProtocols socket_protocol) const { - return 0; -} - -void hidra2::SystemIO::InetBind(FileDescriptor socket_fd, const std::string& address, - IOErrors* err) const { - -} - -std::unique_ptr<std::tuple<std::string, FileDescriptor>> SystemIO::InetAccept(FileDescriptor socket_fd, -IOErrors* err) const { - return std::unique_ptr<std::tuple<std::string, FileDescriptor>>(); -} - -void hidra2::SystemIO::InetConnect(FileDescriptor socket_fd, const std::string& address, IOErrors* err) const { - -} - -size_t hidra2::SystemIO::ReceiveTimeout(FileDescriptor socket_fd, void* buf, size_t length, uint16_t timeout_in_sec, - IOErrors* err) const { - return size_t(); -} - -FileDescriptor hidra2::SystemIO::_open(const char* filename, int posix_open_flags) const { +FileDescriptor SystemIO::_open(const char* filename, int posix_open_flags) const { int fd; errno = _sopen_s(&fd, filename, posix_open_flags, _SH_DENYNO, _S_IREAD | _S_IWRITE); return fd; } -void SystemIO::_close(hidra2::FileDescriptor fd) const { - ::_close(fd); +void SystemIO::_close(FileDescriptor fd) const { + ::_close(fd); } -ssize_t SystemIO::_read(hidra2::FileDescriptor fd, void* buffer, size_t length) const { +void SystemIO::_close_socket(SocketDescriptor fd) const { + ::closesocket(fd); +} + +ssize_t SystemIO::_read(FileDescriptor fd, void* buffer, size_t length) const { return ::_read(fd, (char*)buffer, length); } -ssize_t SystemIO::_write(hidra2::FileDescriptor fd, const void* buffer, size_t length) const { +ssize_t SystemIO::_write(FileDescriptor fd, const void* buffer, size_t length) const { return ::_write(fd, (const char*)buffer, length); } -FileDescriptor SystemIO::_socket(int address_family, int socket_type, int socket_protocol) const { +SocketDescriptor SystemIO::_socket(int address_family, int socket_type, int socket_protocol) const { + static bool WSAStartupDone = false; + if (!WSAStartupDone) { + WSAStartupDone = true; + WORD wVersionRequested = MAKEWORD(2, 2); + WSADATA wsaData; + int err = WSAStartup(wVersionRequested, &wsaData); + if (err != 0) { + std::cout << "[_socket/WSAStartup] Faild to WSAStartup with version 2.2" << std::endl; + WSACleanup(); + // Do not return, since ::socket has to set an errno + } + else { + std::atexit([] { + WSACleanup(); + }); + } + } + return ::socket(address_family, socket_type, socket_protocol); } -ssize_t SystemIO::_send(FileDescriptor socket_fd, const void* buffer, size_t length) const { +ssize_t SystemIO::_send(SocketDescriptor socket_fd, const void* buffer, size_t length) const { return ::send(socket_fd, (char*)buffer, length, 0); } -ssize_t SystemIO::_recv(FileDescriptor socket_fd, void* buffer, size_t length) const { +ssize_t SystemIO::_recv(SocketDescriptor socket_fd, void* buffer, size_t length) const { return ::recv(socket_fd, (char*)buffer, length, 0); } @@ -204,7 +256,7 @@ int SystemIO::_mkdir(const char* dirname) const { return ::_mkdir(dirname); } -int SystemIO::_listen(FileDescriptor fd, int backlog) const { +int SystemIO::_listen(SocketDescriptor fd, int backlog) const { return ::listen(fd, backlog); } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index ac3b80483..0c506ba7c 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -33,7 +33,7 @@ int SendDummyData(const std::string& receiver_address, size_t number_of_byte, ui auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]); - for(uint64_t i = 0; i < iterations; i++) { + for(uint64_t i = 1; i < iterations; i++) { std::cout << "Send file " << i + 1 << "/" << iterations << std::endl; hidra2::ProducerError error; error = producer->Send(i, buffer.get(), number_of_byte); @@ -61,5 +61,7 @@ int main (int argc, char* argv[]) { << std::endl; SendDummyData(receiver_address, number_of_byte, iterations); + getchar(); + } diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 2a01e7a7e..55f460f2b 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -2,7 +2,7 @@ #include "receiver.h" int main (int argc, char* argv[]) { - static const std::string address = "0.0.0.0:4420"; + static const std::string address = "0.0.0.0:4200"; auto* receiver = new hidra2::Receiver(); diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp index bd7d16f23..397264620 100644 --- a/receiver/src/network_producer_peer.cpp +++ b/receiver/src/network_producer_peer.cpp @@ -45,7 +45,12 @@ void NetworkProducerPeer::internal_receiver_thread_() { while(is_listening_) { err = IOErrors::kNoError; - io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 1, &err); + size_t size = io->ReceiveTimeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 5, &err); + if (size == 0) { + std::cout << "size: " << size << std::endl; + } + + if(err != IOErrors::kNoError) { if(err == IOErrors::kTimeout) { @@ -77,7 +82,7 @@ void NetworkProducerPeer::internal_receiver_thread_() { } } - io->Close(socket_fd_, nullptr); + io->CloseSocket(socket_fd_, nullptr); std::cout << "[" << connection_id() << "] Disconnected." << std::endl; free(generic_request); @@ -98,7 +103,7 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque if(request->op_code >= OP_CODE_COUNT || request->op_code < 0) { std::cerr << "[" << connection_id() << "] Error invalid op_code: " << request->op_code << " force disconnect." << std::endl; - io->Close(socket_fd_, nullptr); + io->CloseSocket(socket_fd_, nullptr); return 0; } @@ -107,8 +112,8 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque auto handler_information = kRequestHandlers[request->op_code]; - assert(handler_information.request_size <= kGenericBufferSize);//Would overwrite memory - assert(handler_information.response_size <= kGenericBufferSize);//Would overwrite memory + assert(handler_information.request_size <= kGenericBufferSize);//Would overwrite arbitrary memory + assert(handler_information.response_size <= kGenericBufferSize);//Would overwrite arbitrary memory IOErrors err; //receive the rest of the message diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp index 24cbf36c1..8568de11c 100644 --- a/receiver/src/network_producer_peer_handlers.cpp +++ b/receiver/src/network_producer_peer_handlers.cpp @@ -22,6 +22,12 @@ void NetworkProducerPeer::handle_send_data_request_(NetworkProducerPeer* self, c SendDataResponse* response) { IOErrors ioErr; + if (request->file_size == 0) { + std::cerr << "[" << self->connection_id() << "] file_id: " << request->file_id << " has size of 0!" << std::endl; + response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; + return; + } + if(request->file_size > size_t(2)*size_t(1024)*size_t(1024)*size_t(1024)/*2GiByte*/) { response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; return; @@ -34,7 +40,7 @@ void NetworkProducerPeer::handle_send_data_request_(NetworkProducerPeer* self, c self->io->Skip(self->socket_fd_, request->file_size, &ioErr); if(ioErr != IOErrors::kNoError) { std::cout << "[NetworkProducerPeer] Out of sync force disconnect" << std::endl; - self->io->Close(self->socket_fd_, nullptr); + self->io->CloseSocket(self->socket_fd_, nullptr); } return; } diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 3c00f3be6..b25cedd76 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -27,7 +27,7 @@ void hidra2::Receiver::StartListener(std::string listener_address, ReceiverError io->InetBind(listener_fd, listener_address, &io_error); if(io_error != IOErrors::kNoError) { - io->Close(listener_fd, nullptr); + io->CloseSocket(listener_fd, nullptr); *err = ReceiverError::FAILED_CREATING_SOCKET; listener_running_ = false; std::cerr << "Fail to bind socket" << std::endl; @@ -36,7 +36,7 @@ void hidra2::Receiver::StartListener(std::string listener_address, ReceiverError io->Listen(listener_fd, kMaxUnacceptedConnectionsBacklog, &io_error); if(io_error != IOErrors::kNoError) { - io->Close(listener_fd, nullptr); + io->CloseSocket(listener_fd, nullptr); *err = ReceiverError::FAILED_CREATING_SOCKET; listener_running_ = false; std::cerr << "Fail to start listen" << std::endl; @@ -70,7 +70,7 @@ void hidra2::Receiver::AcceptThreadLogic() { void hidra2::Receiver::StopListener(ReceiverError* err) { listener_running_ = false; - io->Close(listener_fd_, nullptr); + io->CloseSocket(listener_fd_, nullptr); if(accept_thread_) accept_thread_->join(); accept_thread_ = nullptr; diff --git a/tests/system_io/ip_tcp_network/ip_tcp_network.cpp b/tests/system_io/ip_tcp_network/ip_tcp_network.cpp index a80256a19..b49fb2b6a 100644 --- a/tests/system_io/ip_tcp_network/ip_tcp_network.cpp +++ b/tests/system_io/ip_tcp_network/ip_tcp_network.cpp @@ -2,6 +2,7 @@ #include <system_wrappers/system_io.h> #include <chrono> #include <thread> +#include <future> #include "testing.h" @@ -13,11 +14,9 @@ using hidra2::SocketProtocols; using hidra2::FileDescriptor; using hidra2::M_AssertEq; -using namespace std::chrono_literals; - static const std::unique_ptr<SystemIO> io(new SystemIO()); static const std::string kListenAddress = "127.0.0.1:60123"; -static const size_t kSendBufferSize = 1024 * 1024 * 5; //3 MiByte +static std::promise<void> thread_started; void ExitIfErrIsNotOk(IOErrors* err, int exit_number) { if(*err != IOErrors::kNoError) @@ -34,68 +33,94 @@ std::thread* CreateEchoServerThread() { ExitIfErrIsNotOk(&err, 101); io->Listen(socket, 5, &err); ExitIfErrIsNotOk(&err, 102); - - std::cout << "[SERVER] InetAccept" << std::endl; - auto client_info_tuple = io->InetAccept(socket, &err); - ExitIfErrIsNotOk(&err, 103); - std::string client_address; - FileDescriptor client_fd; - std::tie(client_address, client_fd) = *client_info_tuple; - - - std::unique_ptr<uint8_t[]> buffer(new uint8_t[kSendBufferSize]); - std::cout << "[SERVER] Receive" << std::endl; - io->Receive(client_fd, buffer.get(), kSendBufferSize, &err); - ExitIfErrIsNotOk(&err, 104); - io->Send(client_fd, buffer.get(), kSendBufferSize, &err); - std::cout << "[SERVER] Send" << std::endl; - ExitIfErrIsNotOk(&err, 105); - - std::cout << "[SERVER] Close client_fd" << std::endl; - io->Close(client_fd, &err); - ExitIfErrIsNotOk(&err, 106); - - std::cout << "[SERVER] Close socket" << std::endl; - io->Close(socket, &err); - ExitIfErrIsNotOk(&err, 107); + thread_started.set_value(); + + int i = 0; + while (true) { + std::cout << "[SERVER][" << i << "] InetAccept" << std::endl; + auto client_info_tuple = io->InetAccept(socket, &err); + ExitIfErrIsNotOk(&err, 103); + std::string client_address; + FileDescriptor client_fd; + std::tie(client_address, client_fd) = *client_info_tuple; + + size_t max_buffer_size = 1024 * 1024;//1MiByte + std::unique_ptr<uint8_t[]> buffer(new uint8_t[max_buffer_size]); + while (true) { + size_t received = io->ReceiveTimeout(client_fd, buffer.get(), max_buffer_size, 100, &err); + if (err == IOErrors::kTimeout) { + continue; + } + if (err == IOErrors::kEndOfFile) { + io->Send(client_fd, buffer.get(), received, &err); + ExitIfErrIsNotOk(&err, 104); + break; + } + ExitIfErrIsNotOk(&err, 104); + io->Send(client_fd, buffer.get(), received, &err); + ExitIfErrIsNotOk(&err, 105); + } + + std::cout << "[SERVER][" << i << "] Close client_fd" << std::endl; + io->CloseSocket(client_fd, &err); + ExitIfErrIsNotOk(&err, 106); + } + std::cout << "[SERVER][" << i << "] Close socket" << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 107); }); } +void CheckNormal(int times, size_t size) { + IOErrors err; + std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl; + FileDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err); + ExitIfErrIsNotOk(&err, 201); + + io->ReceiveTimeout(socket, nullptr, 1, 1000*100/*100ms*/, &err); + if (err != IOErrors::kTimeout) { + ExitIfErrIsNotOk(&err, 202); + } + + for (int i = 0; i < times; i++) { + std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]); + for (size_t i = 0; i < size; i++) { + buffer[i] = rand(); + } + + std::cout << "[CLIENT] Send" << std::endl; + io->Send(socket, buffer.get(), size, &err); + ExitIfErrIsNotOk(&err, 203); + + std::unique_ptr<uint8_t[]> buffer2(new uint8_t[size]); + std::cout << "[CLIENT] Receive" << std::endl; + io->Receive(socket, buffer2.get(), size, &err); + ExitIfErrIsNotOk(&err, 204); + + std::cout << "[CLIENT] buffer check" << std::endl; + for (size_t i = 0; i < size; i++) { + if (buffer[i] != buffer2[i]) { + exit(205); + } + } + } + + std::cout << "[CLIENT] Close" << std::endl; + io->CloseSocket(socket, &err); + ExitIfErrIsNotOk(&err, 106); +} + int main(int argc, char* argv[]) { - std::thread* server_thread = CreateEchoServerThread(); - server_thread->detach(); - - std::this_thread::sleep_for(2s); // Just to make sure that the server thread starts running - - IOErrors err; - std::cout << "[CLIENT] CreateAndConnectIPTCPSocket" << std::endl; - FileDescriptor socket = io->CreateAndConnectIPTCPSocket(kListenAddress, &err); - ExitIfErrIsNotOk(&err, 201); - - std::unique_ptr<uint8_t[]> buffer(new uint8_t[kSendBufferSize]); - for(size_t i = 0; i < kSendBufferSize; i++) { - buffer[i] = (uint8_t)i; - } - - std::cout << "[CLIENT] Send" << std::endl; - io->Send(socket, buffer.get(), kSendBufferSize, &err); - ExitIfErrIsNotOk(&err, 202); - - std::unique_ptr<uint8_t[]> buffer2(new uint8_t[kSendBufferSize]); - std::cout << "[CLIENT] Receive" << std::endl; - io->Receive(socket, buffer2.get(), kSendBufferSize, &err); - ExitIfErrIsNotOk(&err, 203); - - std::cout << "[CLIENT] Close" << std::endl; - io->Close(socket, &err); - ExitIfErrIsNotOk(&err, 104); - - std::cout << "[CLIENT] buffer check" << std::endl; - for(size_t i = 0; i < kSendBufferSize; i++) { - if(buffer[i] != buffer2[i]) { - exit(205); - } - } - - return 0; + std::thread* server_thread = CreateEchoServerThread(); + server_thread->detach(); + thread_started.get_future().get();//Make sure that the server is started + + std::cout << "Check 1" << std::endl; + CheckNormal(10, 1024 * 1024 * 3); + std::cout << "Check 2" << std::endl; + CheckNormal(30, 1024 * 1024 * 3); + std::cout << "Check 3" << std::endl; + CheckNormal(2, 1024 * 1024 * 100/*100 MiByte */); + + return 0; } -- GitLab