From a773837a17f43c746adeb9fd25aae4fa44a3cc11 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 20 Mar 2018 16:40:22 +0100 Subject: [PATCH] refactoring reciever --- common/cpp/src/system_io/system_io_linux.cpp | 3 +- receiver/CMakeLists.txt | 10 +- receiver/src/connection.cpp | 230 ++++++++++++++++++ receiver/src/connection.h | 68 ++++++ receiver/src/main.cpp | 5 +- receiver/src/network_producer_peer.cpp | 18 -- receiver/src/network_producer_peer.h | 37 --- receiver/src/network_producer_peer_impl.cpp | 161 ------------ receiver/src/network_producer_peer_impl.h | 80 ------ .../network_producer_peer_impl_functions.cpp | 39 --- .../network_producer_peer_impl_handlers.cpp | 59 ----- receiver/src/receiver.cpp | 34 ++- receiver/src/receiver.h | 7 +- receiver/src/request.cpp | 8 + receiver/src/request.h | 15 ++ receiver/src/request_handler.cpp | 9 + receiver/src/request_handler.h | 16 ++ ...ucer_peer_impl.cpp => test_connection.cpp} | 68 ++---- .../unittests/test_network_producer_peer.cpp | 15 -- receiver/unittests/test_receiver.cpp | 21 +- .../test_file_reference_handler.cpp | 20 -- receiver/unittests_OLD/test_receiver.cpp | 194 --------------- 22 files changed, 421 insertions(+), 696 deletions(-) create mode 100644 receiver/src/connection.cpp create mode 100644 receiver/src/connection.h delete mode 100644 receiver/src/network_producer_peer.cpp delete mode 100644 receiver/src/network_producer_peer.h delete mode 100644 receiver/src/network_producer_peer_impl.cpp delete mode 100644 receiver/src/network_producer_peer_impl.h delete mode 100644 receiver/src/network_producer_peer_impl_functions.cpp delete mode 100644 receiver/src/network_producer_peer_impl_handlers.cpp create mode 100644 receiver/src/request.cpp create mode 100644 receiver/src/request.h create mode 100644 receiver/src/request_handler.cpp create mode 100644 receiver/src/request_handler.h rename receiver/unittests/{test_network_producer_peer_impl.cpp => test_connection.cpp} (92%) delete mode 100644 receiver/unittests/test_network_producer_peer.cpp delete mode 100644 receiver/unittests_OLD/test_file_reference_handler.cpp delete mode 100644 receiver/unittests_OLD/test_receiver.cpp diff --git a/common/cpp/src/system_io/system_io_linux.cpp b/common/cpp/src/system_io/system_io_linux.cpp index 12eb7ac01..4ee197806 100644 --- a/common/cpp/src/system_io/system_io_linux.cpp +++ b/common/cpp/src/system_io/system_io_linux.cpp @@ -169,7 +169,6 @@ void SystemIO::CollectFileInformationRecursively(const std::string& path, void SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const { //TODO: Need to change network layer code, so everything can be NonBlocking - //TODO: consider using SO_REUSEPORT so that it could work after kill -9. Currently can exit with message port // in use and one have to wait for some time until the system cleans up the stuff //int flags; if ( @@ -179,6 +178,8 @@ void SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const ||*/ setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, (char*)&kNetBufferSize, sizeof(kNetBufferSize)) != 0 || + setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&kNetBufferSize, sizeof(kNetBufferSize)) != 0 + || setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, (char*)&kNetBufferSize, sizeof(kNetBufferSize)) != 0 ) { *err = GetLastError(); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index d65b8ebc8..f98fb80b4 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -1,11 +1,10 @@ set(TARGET_NAME receiver) set(SOURCE_FILES src/receiver.h src/receiver.cpp - src/network_producer_peer.h src/network_producer_peer.cpp - src/network_producer_peer_impl.h src/network_producer_peer_impl.cpp - src/network_producer_peer_impl_handlers.cpp - src/network_producer_peer_impl_functions.cpp + src/connection.h src/connection.cpp src/receiver_error.h + src/request.cpp + src/request_handler.cpp ) @@ -33,8 +32,7 @@ set_property(TARGET ${TARGET_NAME} PROPERTY ENABLE_EXPORTS true) # set(TEST_SOURCE_FILES unittests/test_receiver.cpp - unittests/test_network_producer_peer.cpp - unittests/test_network_producer_peer_impl.cpp + unittests/test_connection.cpp ) # set(TEST_LIBRARIES "${TARGET_NAME};system_io") diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp new file mode 100644 index 000000000..1fd54705c --- /dev/null +++ b/receiver/src/connection.cpp @@ -0,0 +1,230 @@ +#include <cstring> +#include <assert.h> +#include "connection.h" +#include "receiver_error.h" + +namespace hidra2 { + +size_t Connection::kRequestHandlerMaxBufferSize; +std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); + +const std::vector<Connection::RequestHandlerInformation> Connection::kRequestHandlers = + Connection::StaticInitRequestHandlerList(); + +Connection::Connection(SocketDescriptor socket_fd, const std::string& address) { + socket_fd_ = socket_fd; + connection_id_ = kNetworkProducerPeerImplGlobalCounter++; + address_ = address; +} + +uint32_t Connection::GetId() const { + return connection_id_; +} + +std::string Connection::GetAddress() const { + return address_; +} + +void Connection::Listen() noexcept { + + std::unique_ptr<GenericNetworkRequest> generic_request_buffer; + std::unique_ptr<GenericNetworkResponse> generic_response_buffer; + try { + generic_request_buffer.reset(reinterpret_cast<GenericNetworkRequest*> (new uint8_t[kRequestHandlerMaxBufferSize])); + generic_response_buffer.reset(reinterpret_cast<GenericNetworkResponse*> (new uint8_t[kRequestHandlerMaxBufferSize])); + } catch(...) { + std::cerr << "Failed to allocate buffer space for request and response" << std::endl; + return; + } + + Error err; + while(true) { + err = nullptr; + ProcessRequestFromProducer(generic_request_buffer.get(), generic_response_buffer.get(), &err); + if(err) { + std::cerr << "[" << GetId() << "] Error while handling request: " << err << std::endl; + break; + } + } + + io->CloseSocket(socket_fd_, nullptr); + std::cout << "[" << GetId() << "] Disconnected." << std::endl; +} + + +void Connection::ProcessRequestFromProducer(GenericNetworkRequest* request, + GenericNetworkResponse* response, + Error* err) noexcept { + io->ReceiveWithTimeout(socket_fd_, request, sizeof(GenericNetworkRequest), 50, err); + if(*err) { + if(*err == IOErrorTemplates::kTimeout) { + *err = nullptr;//Not an error in this case + } + return; + } + HandleRawRequestBuffer(request, response, err); +} + + +void Connection::HandleRawRequestBuffer(GenericNetworkRequest* request, + GenericNetworkResponse* response, + Error* err) noexcept { + std::cout << "[" << GetId() << "] Got request op_code: " << request->op_code << std::endl; + + //response will be set here and the amount to send is returned + size_t bytes_to_send = HandleGenericRequest(request, response, err); + if(*err) { + std::cerr << "[" << GetId() << "] Error occurred while handling op_code: " << request->op_code << std::endl; + return; + } + + if(bytes_to_send == 0) { + return;//No data to send + } + + io->Send(socket_fd_, response, bytes_to_send, err); +} + +size_t Connection::HandleGenericRequest(GenericNetworkRequest* request, + GenericNetworkResponse* response, Error* err) noexcept { + if(!CheckIfValidNetworkOpCode(request->op_code)) { + *err = hidra2::ReceiverErrorTemplates::kInvalidOpCode.Generate(); + return 0; + } + + response->request_id = request->request_id; + response->op_code = request->op_code; + + auto handler_information = kRequestHandlers[request->op_code]; + + static const size_t sizeof_generic_request = sizeof(GenericNetworkRequest); + + //after receiving all GenericNetworkResponse fields (did the caller already), + //we need now need to receive the rest of the request + io->Receive(socket_fd_, (uint8_t*)request + sizeof_generic_request, + handler_information.request_size - sizeof_generic_request, err); + + if(*err) { + return 0; + } + + //Invoke the request handler which sets the response + handler_information.handler(this, request, response, err); + + if(*err) { + return 0; + } + + return handler_information.response_size; +} + +Connection::~Connection() { +} + +FileDescriptor Connection::CreateAndOpenFileByFileId(uint64_t file_id, Error* err) const noexcept { + io->CreateNewDirectory("files", err); + if(*err && *err != IOErrorTemplates::kFileAlreadyExists) { + return -1; + } + return io->Open("files/" + std::to_string(file_id) + ".bin", IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, + err); +} + +bool Connection::CheckIfValidFileSize(size_t file_size) const noexcept { + return file_size != 0 && file_size <= size_t(1024) * 1024 * 1024 * 2; +} + +bool Connection::CheckIfValidNetworkOpCode(Opcode opcode) const noexcept { + return opcode < kNetOpcodeCount && opcode >= 0; +} + +void Connection::ReceiveAndSaveFile(uint64_t file_id, size_t file_size, Error* err) const noexcept { + if(!CheckIfValidFileSize(file_size)) { + *err = ErrorTemplates::kMemoryAllocationError.Generate(); + return; + } + + FileDescriptor fd = CreateAndOpenFileByFileId(file_id, err); + if(*err) { + if(*err != IOErrorTemplates::kFileAlreadyExists) { + return; //Unexpected error + } + Error skipErr;//Create a new error instance so that the original error will not be overwritten + io->Skip(socket_fd_, file_size, &skipErr);//Skip the file payload so it will not get out of sync + return; + } + + FileData buffer; + try { + buffer.reset(new uint8_t[file_size]); + } catch(std::exception& e) { + *err = ErrorTemplates::kMemoryAllocationError.Generate(); + (*err)->Append(e.what()); + return; + } + + io->Receive(socket_fd_, buffer.get(), file_size, err); + if(*err) { + return; + } + + io->Write(fd, buffer.get(), file_size, err); + if(*err) { + return; + } +} + +const std::vector<Connection::RequestHandlerInformation> +Connection::StaticInitRequestHandlerList() { + std::vector<Connection::RequestHandlerInformation> vec(kNetOpcodeCount); + + // Add new opcodes here + vec[kNetOpcodeSendData] = { + sizeof(SendDataRequest), + sizeof(SendDataResponse), + (Connection::RequestHandler)& Connection::HandleSendDataRequestInternalCaller + }; + + for(RequestHandlerInformation& handler_information : vec) { + //Adjust max size needed for a request/response-buffer + + if(handler_information.request_size > kRequestHandlerMaxBufferSize) { + kRequestHandlerMaxBufferSize = handler_information.request_size; + } + if(handler_information.response_size > kRequestHandlerMaxBufferSize) { + kRequestHandlerMaxBufferSize = handler_information.response_size; + } + } + + return vec; +} + + +void Connection::HandleSendDataRequestInternalCaller(Connection* self, + const SendDataRequest* request, + SendDataResponse* response, + Error* err) noexcept { + self->HandleSendDataRequest(request, response, err); +} + +void Connection::HandleSendDataRequest(const SendDataRequest* request, SendDataResponse* response, + Error* err) noexcept { + ReceiveAndSaveFile(request->file_id, request->file_size, err); + + if(!*err) { + response->error_code = kNetErrorNoError; + return; + } + + if(*err == IOErrorTemplates::kFileAlreadyExists) { + response->error_code = kNetErrorFileIdAlreadyInUse; + } else { + std::cout << "[" << GetId() << "] Unexpected ReceiveAndSaveFile error " << *err << std::endl; + response->error_code = kNetErrorInternalServerError; + //self->io->CloseSocket(self->socket_fd_, nullptr); TODO: Might want to close the connection? + } +} + + +} + diff --git a/receiver/src/connection.h b/receiver/src/connection.h new file mode 100644 index 000000000..ddf6c1a2a --- /dev/null +++ b/receiver/src/connection.h @@ -0,0 +1,68 @@ +#ifndef HIDRA2_NetworkProducerPeerImpl_H +#define HIDRA2_NetworkProducerPeerImpl_H + +#include <string> +#include <map> +#include <utility> +#include <thread> +#include <common/networking.h> +#include <system_wrappers/has_io.h> +#include <iostream> +#include <atomic> +#include <vector> +#include "connection.h" + +namespace hidra2 { + +class Connection : public HasIO { + public: + typedef void (* RequestHandler)(Connection* self, GenericNetworkRequest* request, + GenericNetworkResponse* response, Error* err); + struct RequestHandlerInformation { + size_t request_size; + size_t response_size; + RequestHandler handler; + }; + private: + uint32_t connection_id_; + std::string address_; + int socket_fd_; + public: + static const std::vector<RequestHandlerInformation> kRequestHandlers; + static size_t kRequestHandlerMaxBufferSize; + + static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter; + + Connection(SocketDescriptor socket_fd, const std::string& address); + ~Connection(); + + static const std::vector<RequestHandlerInformation> StaticInitRequestHandlerList(); + void Listen()noexcept; + + uint32_t GetId() const; + std::string GetAddress() const; + + void ReceiveAndSaveFile(uint64_t file_id, size_t file_size, Error* err) const noexcept; + + FileDescriptor CreateAndOpenFileByFileId(uint64_t file_id, Error* err) const noexcept; + bool CheckIfValidFileSize(size_t file_size) const noexcept; + bool CheckIfValidNetworkOpCode(Opcode opcode) const noexcept; + + private: + size_t HandleGenericRequest(GenericNetworkRequest* request, GenericNetworkResponse* response, + Error* err) noexcept; + void HandleSendDataRequest(const SendDataRequest* request, SendDataResponse* response, Error* err) noexcept; + + void ProcessRequestFromProducer(GenericNetworkRequest* request, GenericNetworkResponse* response, + Error* err) noexcept; + void HandleRawRequestBuffer(GenericNetworkRequest* request, GenericNetworkResponse* response, + Error* err) noexcept; + static void HandleSendDataRequestInternalCaller(Connection* self, + const SendDataRequest* request, + SendDataResponse* response, Error* err) noexcept; +}; + +} + + +#endif //HIDRA2_NetworkProducerPeerImpl_H diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 5cac1c7c1..3b871296c 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -8,12 +8,11 @@ int main (int argc, char* argv[]) { hidra2::Error err; - std::cout << "StartListener on " << address << std::endl; - receiver->StartListener(address, &err); + std::cout << "Listening on " << address << std::endl; + receiver->Listen(address, &err); if(err) { std::cerr << "Failed to start receiver: " << err << std::endl; return 1; } - return 0; } diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp deleted file mode 100644 index a9beec45b..000000000 --- a/receiver/src/network_producer_peer.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "network_producer_peer_impl.h" - -namespace hidra2 { - -std::unique_ptr<NetworkProducerPeer> NetworkProducerPeer::CreateNetworkProducerPeer(SocketDescriptor socket_fd, - const std::string& address) { - return std::unique_ptr<NetworkProducerPeer>(new NetworkProducerPeerImpl(socket_fd, address)); -} - -void StartNewConnection(int peer_socket_fd, const std::string& address) { - auto peer = NetworkProducerPeer::CreateNetworkProducerPeer(peer_socket_fd, address); - - std::cout << "[" << peer->GetConnectionId() << "] New connection from " << address << std::endl; - peer->StartPeerListener(); -} - - -} diff --git a/receiver/src/network_producer_peer.h b/receiver/src/network_producer_peer.h deleted file mode 100644 index fd77753cc..000000000 --- a/receiver/src/network_producer_peer.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef HIDRA2_NETWORKPRODUCERPEER_H -#define HIDRA2_NETWORKPRODUCERPEER_H - -#include <string> -#include <map> -#include <utility> -#include <thread> -#include <common/networking.h> -#include <system_wrappers/has_io.h> -#include <iostream> -#include <atomic> -#include <vector> - -namespace hidra2 { - -class NetworkProducerPeer { - public: - virtual ~NetworkProducerPeer() = default; - - virtual void StartPeerListener() = 0; - virtual void StopPeerListener() = 0; - - virtual uint32_t GetConnectionId() const = 0; - virtual std::string GetAddress() const = 0; - - virtual void ReceiveAndSaveFile(uint64_t file_id, size_t file_size, Error* err) const noexcept = 0; - - static std::unique_ptr<NetworkProducerPeer> CreateNetworkProducerPeer(SocketDescriptor socket_fd, - const std::string& address); -}; - -void StartNewConnection(int peer_socket_fd, const std::string& address); - - -} - -#endif //HIDRA2_NETWORKPRODUCERPEER_H diff --git a/receiver/src/network_producer_peer_impl.cpp b/receiver/src/network_producer_peer_impl.cpp deleted file mode 100644 index 7a09d8c69..000000000 --- a/receiver/src/network_producer_peer_impl.cpp +++ /dev/null @@ -1,161 +0,0 @@ -#include <cstring> -#include <assert.h> -#include "network_producer_peer_impl.h" -#include "receiver_error.h" - -namespace hidra2 { - -size_t NetworkProducerPeerImpl::kRequestHandlerMaxBufferSize; -std::atomic<uint32_t> NetworkProducerPeerImpl::kNetworkProducerPeerImplGlobalCounter(0); - -const std::vector<NetworkProducerPeerImpl::RequestHandlerInformation> NetworkProducerPeerImpl::kRequestHandlers = - NetworkProducerPeerImpl::StaticInitRequestHandlerList(); - -NetworkProducerPeerImpl::NetworkProducerPeerImpl(SocketDescriptor socket_fd, const std::string& address) { - socket_fd_ = socket_fd; - connection_id_ = kNetworkProducerPeerImplGlobalCounter++; - address_ = address; -} - -uint32_t NetworkProducerPeerImpl::GetConnectionId() const { - return connection_id_; -} - -std::string NetworkProducerPeerImpl::GetAddress() const { - return address_; -} - -void NetworkProducerPeerImpl::StartPeerListener() { - if(listener_thread_ || is_listening_) - return; - is_listening_ = true; - listener_thread_ = io->NewThread([this] { - InternalPeerReceiverThreadEntryPoint(); - }); -} - -void NetworkProducerPeerImpl::InternalPeerReceiverThreadEntryPoint() noexcept { - - std::unique_ptr<GenericNetworkRequest> generic_request_buffer; - std::unique_ptr<GenericNetworkResponse> generic_response_buffer; - try { - generic_request_buffer.reset(reinterpret_cast<GenericNetworkRequest*> (new uint8_t[kRequestHandlerMaxBufferSize])); - generic_response_buffer.reset(reinterpret_cast<GenericNetworkResponse*> (new uint8_t[kRequestHandlerMaxBufferSize])); - } catch(...) { - std::cerr << "Failed to allocate buffer space for request and response" << std::endl; - is_listening_ = false; - } - - Error err; - while(is_listening_) { - err = nullptr; - InternalPeerReceiverDoWork(generic_request_buffer.get(), generic_response_buffer.get(), &err); - if(err) { - std::cerr << "[" << GetConnectionId() << "] Error while handling work: " << err << std::endl; - is_listening_ = false; - } - } - - io->CloseSocket(socket_fd_, nullptr); - std::cout << "[" << GetConnectionId() << "] Disconnected." << std::endl; -} - - -void NetworkProducerPeerImpl::InternalPeerReceiverDoWork(GenericNetworkRequest* request, - GenericNetworkResponse* response, - Error* err) noexcept { - io->ReceiveWithTimeout(socket_fd_, request, sizeof(GenericNetworkRequest), 50, err); - if(*err) { - if(*err == IOErrorTemplates::kTimeout) { - *err = nullptr;//Not an error in this case - } - return; - } - HandleRawRequestBuffer(request, response, err); -} - - -void NetworkProducerPeerImpl::HandleRawRequestBuffer(GenericNetworkRequest* request, - GenericNetworkResponse* response, - Error* err) noexcept { - std::cout << "[" << GetConnectionId() << "] Got request op_code: " << request->op_code << std::endl; - - //response will be set here and the amount to send is returned - size_t bytes_to_send = HandleGenericRequest(request, response, err); - if(*err) { - std::cerr << "[" << GetConnectionId() << "] Error occurred while handling op_code: " << request->op_code << std::endl; - return; - } - - if(bytes_to_send == 0) { - return;//No data to send - } - - io->Send(socket_fd_, response, bytes_to_send, err); -} - -void NetworkProducerPeerImpl::StopPeerListener() { -// is_listening_ = false; - if(!listener_thread_) - return; - listener_thread_->join(); - listener_thread_ = nullptr; -} - -size_t NetworkProducerPeerImpl::HandleGenericRequest(GenericNetworkRequest* request, - GenericNetworkResponse* response, Error* err) noexcept { - if(!CheckIfValidNetworkOpCode(request->op_code)) { - *err = hidra2::ReceiverErrorTemplates::kInvalidOpCode.Generate(); - return 0; - } - - response->request_id = request->request_id; - response->op_code = request->op_code; - - auto handler_information = kRequestHandlers[request->op_code]; - - static const size_t sizeof_generic_request = sizeof(GenericNetworkRequest); - - //after receiving all GenericNetworkResponse fields (did the caller already), - //we need now need to receive the rest of the request - io->Receive(socket_fd_, (uint8_t*)request + sizeof_generic_request, - handler_information.request_size - sizeof_generic_request, err); - - if(*err) { - return 0; - } - - //Invoke the request handler which sets the response - handler_information.handler(this, request, response, err); - - if(*err) { - return 0; - } - - return handler_information.response_size; -} - -NetworkProducerPeerImpl::~NetworkProducerPeerImpl() { - StopPeerListener(); -} - -FileDescriptor NetworkProducerPeerImpl::CreateAndOpenFileByFileId(uint64_t file_id, Error* err) const noexcept { - io->CreateNewDirectory("files", err); - if(*err && *err != IOErrorTemplates::kFileAlreadyExists) { - return -1; - } - return io->Open("files/" + std::to_string(file_id) + ".bin", IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, - err); -} - -bool NetworkProducerPeerImpl::CheckIfValidFileSize(size_t file_size) const noexcept { - return file_size != 0 && file_size <= size_t(1024) * 1024 * 1024 * 2; -} - -bool NetworkProducerPeerImpl::CheckIfValidNetworkOpCode(Opcode opcode) const noexcept { - return opcode < kNetOpcodeCount && opcode >= 0; -} - - -} - diff --git a/receiver/src/network_producer_peer_impl.h b/receiver/src/network_producer_peer_impl.h deleted file mode 100644 index d334ed850..000000000 --- a/receiver/src/network_producer_peer_impl.h +++ /dev/null @@ -1,80 +0,0 @@ -#ifndef HIDRA2_NetworkProducerPeerImpl_H -#define HIDRA2_NetworkProducerPeerImpl_H - -#include <string> -#include <map> -#include <utility> -#include <thread> -#include <common/networking.h> -#include <system_wrappers/has_io.h> -#include <iostream> -#include <atomic> -#include <vector> -#include "network_producer_peer.h" - -namespace hidra2 { - -class NetworkProducerPeerImpl : public NetworkProducerPeer, public HasIO { - public: - typedef void (* RequestHandler)(NetworkProducerPeerImpl* self, GenericNetworkRequest* request, - GenericNetworkResponse* response, Error* err); - struct RequestHandlerInformation { - size_t request_size; - size_t response_size; - RequestHandler handler; - }; - private: - uint32_t connection_id_; - std::string address_; - int socket_fd_; - - bool is_listening_ = false; - std::unique_ptr<std::thread> listener_thread_ = nullptr; - - public: - static const std::vector<RequestHandlerInformation> kRequestHandlers; - static size_t kRequestHandlerMaxBufferSize; - - static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter; - - NetworkProducerPeerImpl(SocketDescriptor socket_fd, const std::string& address); - ~NetworkProducerPeerImpl() override; - - static const std::vector<RequestHandlerInformation> StaticInitRequestHandlerList(); - void StartPeerListener() override; - void StopPeerListener() override; - - uint32_t GetConnectionId() const override; - std::string GetAddress() const override; - - void ReceiveAndSaveFile(uint64_t file_id, size_t file_size, Error* err) const noexcept override; - - virtual FileDescriptor CreateAndOpenFileByFileId(uint64_t file_id, Error* err) const noexcept; - virtual bool CheckIfValidFileSize(size_t file_size) const noexcept; - virtual bool CheckIfValidNetworkOpCode(Opcode opcode) const noexcept; - - public: - /* - * Private functions but opened for unittest - */ - virtual size_t HandleGenericRequest(GenericNetworkRequest* request, GenericNetworkResponse* response, - Error* err) noexcept; - virtual void HandleSendDataRequest(const SendDataRequest* request, SendDataResponse* response, Error* err) noexcept; - - //Preparation for up coming thread pool implementation - virtual void InternalPeerReceiverThreadEntryPoint() noexcept; - virtual void InternalPeerReceiverDoWork(GenericNetworkRequest* request, GenericNetworkResponse* response, - Error* err) noexcept; - virtual void HandleRawRequestBuffer(GenericNetworkRequest* request, GenericNetworkResponse* response, - Error* err) noexcept; - - private: - static void HandleSendDataRequestInternalCaller(NetworkProducerPeerImpl* self, - const SendDataRequest* request, - SendDataResponse* response, Error* err) noexcept; -}; - -} - - -#endif //HIDRA2_NetworkProducerPeerImpl_H diff --git a/receiver/src/network_producer_peer_impl_functions.cpp b/receiver/src/network_producer_peer_impl_functions.cpp deleted file mode 100644 index d01f31af6..000000000 --- a/receiver/src/network_producer_peer_impl_functions.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include "network_producer_peer_impl.h" - -namespace hidra2 { -void NetworkProducerPeerImpl::ReceiveAndSaveFile(uint64_t file_id, size_t file_size, Error* err) const noexcept { - if(!CheckIfValidFileSize(file_size)) { - *err = ErrorTemplates::kMemoryAllocationError.Generate(); - return; - } - - FileDescriptor fd = CreateAndOpenFileByFileId(file_id, err); - if(*err) { - if(*err != IOErrorTemplates::kFileAlreadyExists) { - return; //Unexpected error - } - Error skipErr;//Create a new error instance so that the original error will not be overwritten - io->Skip(socket_fd_, file_size, &skipErr);//Skip the file payload so it will not get out of sync - return; - } - - FileData buffer; - try { - buffer.reset(new uint8_t[file_size]); - } catch(std::exception& e) { - *err = ErrorTemplates::kMemoryAllocationError.Generate(); - (*err)->Append(e.what()); - return; - } - - io->Receive(socket_fd_, buffer.get(), file_size, err); - if(*err) { - return; - } - - io->Write(fd, buffer.get(), file_size, err); - if(*err) { - return; - } -} -} diff --git a/receiver/src/network_producer_peer_impl_handlers.cpp b/receiver/src/network_producer_peer_impl_handlers.cpp deleted file mode 100644 index f4020d859..000000000 --- a/receiver/src/network_producer_peer_impl_handlers.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "network_producer_peer_impl.h" -#include "receiver.h" -#include <cmath> - -namespace hidra2 { - -const std::vector<NetworkProducerPeerImpl::RequestHandlerInformation> -NetworkProducerPeerImpl::StaticInitRequestHandlerList() { - std::vector<NetworkProducerPeerImpl::RequestHandlerInformation> vec(kNetOpcodeCount); - - // Add new opcodes here - vec[kNetOpcodeSendData] = { - sizeof(SendDataRequest), - sizeof(SendDataResponse), - (NetworkProducerPeerImpl::RequestHandler)& NetworkProducerPeerImpl::HandleSendDataRequestInternalCaller - }; - - for(RequestHandlerInformation& handler_information : vec) { - //Adjust max size needed for a request/response-buffer - - if(handler_information.request_size > kRequestHandlerMaxBufferSize) { - kRequestHandlerMaxBufferSize = handler_information.request_size; - } - if(handler_information.response_size > kRequestHandlerMaxBufferSize) { - kRequestHandlerMaxBufferSize = handler_information.response_size; - } - } - - return vec; -} - - -void NetworkProducerPeerImpl::HandleSendDataRequestInternalCaller(NetworkProducerPeerImpl* self, - const SendDataRequest* request, - SendDataResponse* response, - Error* err) noexcept { - self->HandleSendDataRequest(request, response, err); -} - -void NetworkProducerPeerImpl::HandleSendDataRequest(const SendDataRequest* request, SendDataResponse* response, - Error* err) noexcept { - ReceiveAndSaveFile(request->file_id, request->file_size, err); - - if(!*err) { - response->error_code = kNetErrorNoError; - return; - } - - if(*err == IOErrorTemplates::kFileAlreadyExists) { - response->error_code = kNetErrorFileIdAlreadyInUse; - } else { - std::cout << "[" << GetConnectionId() << "] Unexpected ReceiveAndSaveFile error " << *err << std::endl; - response->error_code = kNetErrorInternalServerError; - //self->io->CloseSocket(self->socket_fd_, nullptr); TODO: Might want to close the connection? - } -} - -} - diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 7f48da0ef..7746dd31a 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -2,10 +2,14 @@ #include <iostream> #include "receiver.h" #include "receiver_error.h" +#include "connection.h" -const int hidra2::Receiver::kMaxUnacceptedConnectionsBacklog = 5; +namespace hidra2 { -hidra2::Error hidra2::Receiver::PrepareListener(std::string listener_address) { + +const int Receiver::kMaxUnacceptedConnectionsBacklog = 5; + +Error Receiver::PrepareListener(std::string listener_address) { Error err = nullptr; listener_fd_ = io->CreateAndBindIPTCPSocketListener(listener_address, kMaxUnacceptedConnectionsBacklog, &err); @@ -13,19 +17,20 @@ hidra2::Error hidra2::Receiver::PrepareListener(std::string listener_address) { } -void hidra2::Receiver::StartListener(std::string listener_address, Error* err) { +void Receiver::Listen(std::string listener_address, Error* err, bool exit_after_first_connection) { *err = PrepareListener(listener_address); if(*err) { return; } while(true) { - Error err_work = nullptr; - AcceptThreadLogicWork(&err_work); + ProcessConnections(err); + if (exit_after_first_connection) break; } } -void hidra2::Receiver::AcceptThreadLogicWork(hidra2::Error* err) { +//TODO: remove error since it is not used +void Receiver::ProcessConnections(Error* err) { std::string address; FileDescriptor peer_fd; @@ -37,6 +42,21 @@ void hidra2::Receiver::AcceptThreadLogicWork(hidra2::Error* err) { return; } std::tie(address, peer_fd) = *client_info_tuple; - StartNewConnection(peer_fd, address); + StartNewConnectionInSeparateThread(peer_fd, address); +} + +void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { + auto thread = io->NewThread([connection_socket_fd, address] { + auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address)); + std::cout << "[" << connection->GetId() << "] New connection from " << address << std::endl; + connection->Listen(); + }); + + if (thread) { + thread->detach(); + } + return; + } +} \ No newline at end of file diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h index a6cd7fcec..3808f8999 100644 --- a/receiver/src/receiver.h +++ b/receiver/src/receiver.h @@ -4,7 +4,7 @@ #include <string> #include <thread> #include <system_wrappers/has_io.h> -#include "network_producer_peer.h" +#include "connection.h" #include <list> namespace hidra2 { @@ -13,6 +13,8 @@ class Receiver : public HasIO { private: FileDescriptor listener_fd_ = -1; Error PrepareListener(std::string listener_address); + void StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address); + void ProcessConnections(Error* err); public: static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config @@ -20,8 +22,7 @@ class Receiver : public HasIO { Receiver& operator=(const Receiver&) = delete; Receiver() = default; - void StartListener(std::string listener_address, Error* err); - void AcceptThreadLogicWork(Error* err); + void Listen(std::string listener_address, Error* err, bool exit_after_first_connection = false); }; } diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp new file mode 100644 index 000000000..1d0597896 --- /dev/null +++ b/receiver/src/request.cpp @@ -0,0 +1,8 @@ +// +// Created by yakubov on 20/03/18. +// + +#include "request.h" +namespace hidra2 { + +} \ No newline at end of file diff --git a/receiver/src/request.h b/receiver/src/request.h new file mode 100644 index 000000000..bb6abbf49 --- /dev/null +++ b/receiver/src/request.h @@ -0,0 +1,15 @@ +#ifndef HIDRA2_REQUEST_H +#define HIDRA2_REQUEST_H + +#include "receiver_error.h" + +namespace hidra2 { + +class Request { + Error Process(); + virtual uint64_t GetBodySize()=0; +}; + +} + +#endif //HIDRA2_REQUEST_H diff --git a/receiver/src/request_handler.cpp b/receiver/src/request_handler.cpp new file mode 100644 index 000000000..2c68d7afb --- /dev/null +++ b/receiver/src/request_handler.cpp @@ -0,0 +1,9 @@ +// +// Created by yakubov on 20/03/18. +// + +#include "request_handler.h" + +namespace hidra2 { + +} \ No newline at end of file diff --git a/receiver/src/request_handler.h b/receiver/src/request_handler.h new file mode 100644 index 000000000..2dea83399 --- /dev/null +++ b/receiver/src/request_handler.h @@ -0,0 +1,16 @@ +#ifndef HIDRA2_REQUEST_HANDLER_H +#define HIDRA2_REQUEST_HANDLER_H + +#include "request.h" +#include "receiver_error.h" + +namespace hidra2 { + + +class RequestHandler { + virtual Error ProcessRequest(const Request& request) = 0; +}; + +} + +#endif //HIDRA2_REQUEST_HANDLER_H diff --git a/receiver/unittests/test_network_producer_peer_impl.cpp b/receiver/unittests/test_connection.cpp similarity index 92% rename from receiver/unittests/test_network_producer_peer_impl.cpp rename to receiver/unittests/test_connection.cpp index 7b916bca4..586ed4a05 100644 --- a/receiver/unittests/test_network_producer_peer_impl.cpp +++ b/receiver/unittests/test_connection.cpp @@ -1,7 +1,7 @@ #include <gtest/gtest.h> #include <gmock/gmock.h> #include <unittests/MockIO.h> -#include "../src/network_producer_peer_impl.h" +#include "../src/connection.h" #include "../src/receiver_error.h" using ::testing::Return; @@ -22,42 +22,44 @@ using ::hidra2::SendDataResponse; using ::hidra2::GenericNetworkRequest; using ::hidra2::GenericNetworkResponse; using ::hidra2::Opcode; +using ::hidra2::Connection; namespace { +/* TEST(Constructor, CheckGetAddress) { std::string expected_address = "somehost:1234"; - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, expected_address); + hidra2::Connection networkProducerPeer(0, expected_address); ASSERT_THAT(networkProducerPeer.GetAddress(), Eq(expected_address)); } TEST(CheckIfValidFileSize, ZeroFileSize) { - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, ""); + hidra2::Connection networkProducerPeer(0, ""); EXPECT_FALSE(networkProducerPeer.CheckIfValidFileSize(0)); } TEST(CheckIfValidFileSize, SmallFileSize) { - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, ""); + hidra2::Connection networkProducerPeer(0, ""); EXPECT_TRUE(networkProducerPeer.CheckIfValidFileSize(1)); } TEST(CheckIfValidFileSize, OneGiByteSize) { - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, ""); + hidra2::Connection networkProducerPeer(0, ""); EXPECT_TRUE(networkProducerPeer.CheckIfValidFileSize(1024 * 1024 * 1024 * 1)); } TEST(CheckIfValidFileSize, TwoGiByteSize) { - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, ""); + hidra2::Connection networkProducerPeer(0, ""); EXPECT_TRUE(networkProducerPeer.CheckIfValidFileSize(size_t(1024) * 1024 * 1024 * 2)); } TEST(CheckIfValidFileSize, MoreThenTwoGiByteSize) { - hidra2::NetworkProducerPeerImpl networkProducerPeer(0, ""); + hidra2::Connection networkProducerPeer(0, ""); EXPECT_FALSE(networkProducerPeer.CheckIfValidFileSize(size_t(1024) * 1024 * 1024 * 2 + 1)); } TEST(CreateAndOpenFileByFileId, FolderUnknownIOError) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); hidra2::MockIO mockIO; NetworkProducerPeerImpl.SetIO__(&mockIO); @@ -74,7 +76,7 @@ TEST(CreateAndOpenFileByFileId, FolderUnknownIOError) { } TEST(CreateAndOpenFileByFileId, FolderAlreadyExsistsButFileAlreadyExists) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); hidra2::MockIO mockIO; NetworkProducerPeerImpl.SetIO__(&mockIO); @@ -106,7 +108,7 @@ TEST(CreateAndOpenFileByFileId, FolderAlreadyExsistsButFileAlreadyExists) { } TEST(CreateAndOpenFileByFileId, FolderCreatedButFileAlreadyExists) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); hidra2::MockIO mockIO; NetworkProducerPeerImpl.SetIO__(&mockIO); @@ -137,7 +139,7 @@ TEST(CreateAndOpenFileByFileId, FolderCreatedButFileAlreadyExists) { } TEST(CreateAndOpenFileByFileId, Ok) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); hidra2::MockIO mockIO; NetworkProducerPeerImpl.SetIO__(&mockIO); @@ -169,48 +171,26 @@ TEST(CreateAndOpenFileByFileId, Ok) { } TEST(CheckIfValidNetworkOpCode, NormalOpcodeSendData) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); EXPECT_TRUE(NetworkProducerPeerImpl.CheckIfValidNetworkOpCode(hidra2::Opcode::kNetOpcodeSendData)); } TEST(CheckIfValidNetworkOpCode, FalseOpcode) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); EXPECT_FALSE(NetworkProducerPeerImpl.CheckIfValidNetworkOpCode(hidra2::Opcode::kNetOpcodeCount)); } TEST(CheckIfValidNetworkOpCode, FalseOpcodeByNumber) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); EXPECT_FALSE(NetworkProducerPeerImpl.CheckIfValidNetworkOpCode((hidra2::Opcode) 90)); } TEST(CheckIfValidNetworkOpCode, FalseOpcodeByNegativNumber) { - hidra2::NetworkProducerPeerImpl NetworkProducerPeerImpl(0, ""); + hidra2::Connection NetworkProducerPeerImpl(0, ""); //Technically -1 is some big positive number since Opcode is an unsigned 8 bit number EXPECT_FALSE(NetworkProducerPeerImpl.CheckIfValidNetworkOpCode((hidra2::Opcode) - 1)); } -class ReceiveAndSaveFileMock : public hidra2::NetworkProducerPeerImpl { - public: - ReceiveAndSaveFileMock(hidra2::SocketDescriptor socket_fd, const std::string& address) : NetworkProducerPeerImpl( - socket_fd, - address) {} - - FileDescriptor CreateAndOpenFileByFileId(uint64_t file_id, Error* err) const noexcept override { - ErrorInterface* error = nullptr; - auto data = CreateAndOpenFileByFileId_t(file_id, &error); - err->reset(error); - return data; - } - MOCK_CONST_METHOD2(CreateAndOpenFileByFileId_t, FileDescriptor(uint64_t - file_id, ErrorInterface * *err)); - - bool CheckIfValidFileSize(size_t file_size) const noexcept override { - return CheckIfValidFileSize_t(file_size); - } - MOCK_CONST_METHOD1(CheckIfValidFileSize_t, bool(size_t - file_size)); - -}; class ReceiveAndSaveFileFixture : public testing::Test { public: @@ -223,10 +203,10 @@ class ReceiveAndSaveFileFixture : public testing::Test { Error err; hidra2::MockIO mockIO; - std::unique_ptr<ReceiveAndSaveFileMock> networkProducerPeer; + std::unique_ptr<Connection> networkProducerPeer; void SetUp() override { - networkProducerPeer.reset(new ReceiveAndSaveFileMock(expected_socket_descriptor, expected_address)); + networkProducerPeer.reset(new Connection(expected_socket_descriptor, expected_address)); networkProducerPeer->SetIO__(&mockIO); } }; @@ -234,7 +214,7 @@ class ReceiveAndSaveFileFixture : public testing::Test { TEST_F(ReceiveAndSaveFileFixture, CheckFileSizeError) { err = nullptr; - EXPECT_CALL(*networkProducerPeer, CheckIfValidFileSize_t(expected_file_size)) + EXPECT_CALL(*networkProducerPeer, CheckIfValidFileSize(expected_file_size)) .WillOnce( Return(false) ); @@ -801,16 +781,14 @@ TEST_F(StartPeerListenerFixture, Ok) { EXPECT_CALL(mockIO, NewThread_t(_)); - networkProducerPeer->StartPeerListener(); + networkProducerPeer->Listen(); } TEST_F(StartPeerListenerFixture, AlreadyListening) { EXPECT_CALL(mockIO, NewThread_t(_)).Times(1); - networkProducerPeer->StartPeerListener(); - - networkProducerPeer->StartPeerListener(); + networkProducerPeer->Listen(); } - +*/ } diff --git a/receiver/unittests/test_network_producer_peer.cpp b/receiver/unittests/test_network_producer_peer.cpp deleted file mode 100644 index 23f5ba810..000000000 --- a/receiver/unittests/test_network_producer_peer.cpp +++ /dev/null @@ -1,15 +0,0 @@ -#include <gtest/gtest.h> -#include <gmock/gmock.h> -#include <unittests/MockIO.h> -#include "../src/network_producer_peer.h" - -using ::testing::Ne; - -namespace { - -TEST(CreateNetworkProducerPeer, PointerIsNotNullptr) { - std::unique_ptr<hidra2::NetworkProducerPeer> producer = hidra2::NetworkProducerPeer::CreateNetworkProducerPeer(1, ""); - ASSERT_THAT(producer.get(), Ne(nullptr)); -} - -} diff --git a/receiver/unittests/test_receiver.cpp b/receiver/unittests/test_receiver.cpp index 0cc62ebf4..cd75a6438 100644 --- a/receiver/unittests/test_receiver.cpp +++ b/receiver/unittests/test_receiver.cpp @@ -3,7 +3,7 @@ #include <unittests/MockIO.h> #include "../src/receiver.h" #include "../src/receiver_error.h" -#include "../src/network_producer_peer_impl.h" +#include "../src/connection.h" using ::testing::Return; using ::testing::_; @@ -17,7 +17,7 @@ using ::testing::InSequence; using ::hidra2::Error; using ::hidra2::FileDescriptor; using ::hidra2::ErrorInterface; -using ::hidra2::NetworkProducerPeer; +using ::hidra2::Connection; using ::hidra2::SocketDescriptor; namespace { @@ -33,7 +33,7 @@ class StartListenerFixture : public testing::Test { Error err; - hidra2::MockIO mockIO; + ::testing::NiceMock<hidra2::MockIO> mockIO; hidra2::Receiver receiver; void SetUp() override { @@ -49,33 +49,38 @@ TEST_F(StartListenerFixture, CreateAndBindIPTCPSocketListenerError) { Return(0) )); - receiver.StartListener(expected_address, &err); + receiver.Listen(expected_address, &err, true); ASSERT_THAT(err, Eq(hidra2::IOErrorTemplates::kUnknownIOError)); } TEST_F(StartListenerFixture, InetAcceptConnectionError) { - EXPECT_CALL(mockIO, InetAcceptConnection_t(-1, _)) + EXPECT_CALL(mockIO, InetAcceptConnection_t(_, _)) .WillOnce(DoAll( SetArgPointee<1>(hidra2::IOErrorTemplates::kUnknownIOError.Generate().release()), Return(new std::tuple<std::string, SocketDescriptor>(expected_address, expected_socket_descriptor_client)) )); - receiver.AcceptThreadLogicWork(&err); + receiver.Listen(expected_address, &err, true); ASSERT_THAT(err, Eq(hidra2::IOErrorTemplates::kUnknownIOError)); } TEST_F(StartListenerFixture, Ok) { - EXPECT_CALL(mockIO, InetAcceptConnection_t(-1, _)) + + EXPECT_CALL(mockIO, InetAcceptConnection_t(_, _)) .WillOnce(DoAll( SetArgPointee<1>(nullptr), Return(new std::tuple<std::string, SocketDescriptor>(expected_address, expected_socket_descriptor_client)) )); + EXPECT_CALL(mockIO, NewThread_t(_)). + WillOnce( + Return(nullptr) + ); - receiver.AcceptThreadLogicWork(&err); + receiver.Listen(expected_address, &err, true); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/receiver/unittests_OLD/test_file_reference_handler.cpp b/receiver/unittests_OLD/test_file_reference_handler.cpp deleted file mode 100644 index ffd4ce74d..000000000 --- a/receiver/unittests_OLD/test_file_reference_handler.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include <gtest/gtest.h> -#include <gmock/gmock.h> -#include <system_wrappers/io.h> -#include "../src/receiver.h" - -namespace { - -using ::testing::Return; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Mock; -using ::testing::InSequence; - -TEST(FileReferenceHandler, add_file) { - EXPECT_EQ(1, 1); -} - -} diff --git a/receiver/unittests_OLD/test_receiver.cpp b/receiver/unittests_OLD/test_receiver.cpp deleted file mode 100644 index e96b9c1da..000000000 --- a/receiver/unittests_OLD/test_receiver.cpp +++ /dev/null @@ -1,194 +0,0 @@ -#include <gtest/gtest.h> -#include <gmock/gmock.h> -#include <system_wrappers/io.h> -#include "../src/receiver.h" -#include "../../common/cpp/unittests/MockIO.h" - -namespace { - -using ::testing::Return; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Mock; -using ::testing::InSequence; - -//Need to be redone completely - -// hidra2::Receiver receiver; -// hidra2::FileDescriptor expected_socket_fd = 1338; -// -//TEST(Receiver, start_Listener__CreateSocket_fail) { -// hidra2::MockIO mockIO; -// receiver.__set_io(&mockIO); -// -// InSequence sequence; -// -// std::string expected_address = "127.0.0.1"; -// uint16_t expected_port = 9876; -// -// EXPECT_CALL(mockIO, CreateSocket(hidra2::AddressFamilies::INET, hidra2::SocketTypes::STREAM, -// hidra2::SocketProtocols::IP, _)) -// .Times(1) -// .WillOnce( -// DoAll( -// testing::SetArgPointee<3>(hidra2::IOError::UNKNOWN_ERROR), -// Return(-1) -// )); -// -// hidra2::ReceiverError receiver_error; -// receiver.StartListener(expected_address, expected_port, &receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::FAILED_CREATING_SOCKET); -// -// Mock::VerifyAndClearExpectations(&mockIO); -//} -// -//TEST(Receiver, start_Listener__InetBind_fail) { -// hidra2::MockIO mockIO; -// receiver.__set_io(&mockIO); -// -// InSequence sequence; -// -// std::string expected_address = "127.0.0.1"; -// uint16_t expected_port = 9876; -// -// EXPECT_CALL(mockIO, CreateSocket(hidra2::AddressFamilies::INET, hidra2::SocketTypes::STREAM, -// hidra2::SocketProtocols::IP, _)) -// .Times(1) -// .WillOnce( -// DoAll( -// testing::SetArgPointee<3>(hidra2::IOError::NO_ERROR), -// Return(expected_socket_fd) -// )); -// -// EXPECT_CALL(mockIO, InetBind(expected_socket_fd, expected_address, expected_port, _)) -// .Times(1) -// .WillOnce(testing::SetArgPointee<3>(hidra2::IOError::ADDRESS_ALREADY_IN_USE)); -// -// EXPECT_CALL(mockIO, Close(expected_socket_fd)) -// .Times(1); -// -// hidra2::ReceiverError receiver_error; -// receiver.StartListener(expected_address, expected_port, &receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::FAILED_CREATING_SOCKET); -// -// Mock::VerifyAndClearExpectations(&mockIO); -//} -// -//TEST(Receiver, start_Listener__Listen_fail) { -// hidra2::MockIO mockIO; -// receiver.__set_io(&mockIO); -// -// InSequence sequence; -// -// std::string expected_address = "127.0.0.1"; -// uint16_t expected_port = 9876; -// -// EXPECT_CALL(mockIO, CreateSocket(hidra2::AddressFamilies::INET, hidra2::SocketTypes::STREAM, -// hidra2::SocketProtocols::IP, _)) -// .Times(1) -// .WillOnce( -// DoAll( -// testing::SetArgPointee<3>(hidra2::IOError::NO_ERROR), -// Return(expected_socket_fd) -// )); -// -// EXPECT_CALL(mockIO, InetBind(expected_socket_fd, expected_address, expected_port, _)) -// .Times(1) -// .WillOnce(testing::SetArgPointee<3>(hidra2::IOError::NO_ERROR)); -// -// EXPECT_CALL(mockIO, Listen(expected_socket_fd, receiver.kMaxUnacceptedConnectionsBacklog, _)) -// .Times(1) -// .WillOnce(testing::SetArgPointee<2>(hidra2::IOError::BAD_FILE_NUMBER)); -// -// EXPECT_CALL(mockIO, Close(expected_socket_fd, _)) -// .Times(1); -// -// hidra2::ReceiverError receiver_error; -// receiver.StartListener(expected_address, expected_port, &receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::FAILED_CREATING_SOCKET); -// -// Mock::VerifyAndClearExpectations(&mockIO); -//} -// -//TEST(Receiver, start_Listener) { -// hidra2::MockIO mockIO; -// receiver.__set_io(&mockIO); -// -// InSequence sequence; -// -// std::string expected_address = "127.0.0.1"; -// uint16_t expected_port = 9876; -// -// EXPECT_CALL(mockIO, CreateSocket(hidra2::AddressFamilies::INET, hidra2::SocketTypes::STREAM, -// hidra2::SocketProtocols::IP, _)) -// .Times(1) -// .WillOnce( -// DoAll( -// testing::SetArgPointee<3>(hidra2::IOError::NO_ERROR), -// Return(expected_socket_fd) -// )); -// -// EXPECT_CALL(mockIO, InetBind(expected_socket_fd, expected_address, expected_port, _)) -// .Times(1) -// .WillOnce(testing::SetArgPointee<3>(hidra2::IOError::NO_ERROR)); -// -// EXPECT_CALL(mockIO, Listen(expected_socket_fd, receiver.kMaxUnacceptedConnectionsBacklog, _)) -// .Times(1) -// .WillOnce(testing::SetArgPointee<2>(hidra2::IOError::NO_ERROR)); -// -// /** -// * TODO: Since StartListener will start a new thread -// * we need to mock std::thread -// */ -// EXPECT_CALL(mockIO, InetAcceptProxy(expected_socket_fd, _)) -// .Times(1) -// .WillOnce( -// DoAll( -// testing::SetArgPointee<1>(hidra2::IOError::BAD_FILE_NUMBER), -// Return(nullptr) -// )); -// -// -// hidra2::ReceiverError receiver_error; -// receiver.StartListener(expected_address, expected_port, &receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::NO_ERROR); -// -// sleep(1); //Make sure that the thread is running so InetAcceptProxy would work -// -// Mock::VerifyAndClearExpectations(&mockIO); -//} -// -//TEST(Receiver, start_Listener_already_Listening) { -// InSequence sequence; -// -// std::string expected_address = "127.0.0.1"; -// uint16_t expected_port = 9876; -// -// hidra2::ReceiverError receiver_error; -// receiver.StartListener(expected_address, expected_port, &receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::ALREADY_LISTEING); -//} -// -//TEST(Receiver, stop_Listener) { -// hidra2::MockIO mockIO; -// receiver.__set_io(&mockIO); -// -// -// EXPECT_CALL(mockIO, Close(expected_socket_fd)) -// .Times(1) -// .WillOnce( -// Return(0) -// ); -// -// -// hidra2::ReceiverError receiver_error; -// -// receiver.stop_listener(&receiver_error); -// EXPECT_EQ(receiver_error, hidra2::ReceiverError::NO_ERROR); -// -// Mock::VerifyAndClearExpectations(&mockIO); -//} - -} -- GitLab