From cf052c91be3fe3efbcfeb4ee1dc7f09ca8c7bf3f Mon Sep 17 00:00:00 2001
From: Carsten Patzke <carsten.patzke@desy.de>
Date: Tue, 12 Dec 2017 13:16:51 +0100
Subject: [PATCH] Use new IO layer in producer and receiver

---
 common/cpp/include/system_wrappers/has_io.h   |  7 +-
 common/cpp/include/system_wrappers/io.h       |  2 +
 .../cpp/include/system_wrappers/system_io.h   |  2 +-
 common/cpp/src/has_io.cpp                     | 13 ---
 common/cpp/src/system_io_linux.cpp            | 14 +++-
 producer/api/src/producer_impl.cpp            |  6 ++
 .../src/dummy_detector.cpp                    |  9 +--
 receiver/CMakeLists.txt                       |  6 +-
 receiver/src/file_refernce_handler.cpp        |  2 +-
 receiver/src/main.cpp                         | 12 ++-
 receiver/src/network_producer_peer.cpp        | 56 +++++++------
 receiver/src/network_producer_peer.h          | 20 +++--
 .../src/network_producer_peer_handlers.cpp    | 10 +--
 receiver/src/receiver.cpp                     | 79 +++++++++++--------
 receiver/src/receiver.h                       | 24 +++---
 15 files changed, 143 insertions(+), 119 deletions(-)

diff --git a/common/cpp/include/system_wrappers/has_io.h b/common/cpp/include/system_wrappers/has_io.h
index 79457b5d1..2dd89a39a 100644
--- a/common/cpp/include/system_wrappers/has_io.h
+++ b/common/cpp/include/system_wrappers/has_io.h
@@ -11,17 +11,12 @@ class HasIO {
     static IO* const kDefaultIO;
 
     IO* io;
-    IOUtils* io_utils;
 
-    HasIO();
+    explicit HasIO();
   public:
-    ~HasIO();
 
     void __set_io(IO* io);
     IO* __get_io();
-
-    void __set_io_utils(IOUtils* io_utils);
-    IOUtils* __get_io_utils();
 };
 
 }
diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h
index 0328291df..f2a781a69 100644
--- a/common/cpp/include/system_wrappers/io.h
+++ b/common/cpp/include/system_wrappers/io.h
@@ -14,12 +14,14 @@ namespace hidra2 {
 
 enum class IOErrors {
     NO_ERROR,
+    BAD_FILE_NUMBER,
     FILE_NOT_FOUND,
     READ_ERROR,
     PERMISSIONS_DENIED,
     UNSUPPORTED_ADDRESS_FAMILY,
     INVALID_ADDRESS_FORMAT,
     STREAM_EOF,
+    ADDRESS_ALREADY_IN_USE,
     CONNECTION_REFUSED,
     CONNECTION_RESET_BY_PEER,
     TIMEOUT,
diff --git a/common/cpp/include/system_wrappers/system_io.h b/common/cpp/include/system_wrappers/system_io.h
index ae45cc88e..1b1bf1238 100644
--- a/common/cpp/include/system_wrappers/system_io.h
+++ b/common/cpp/include/system_wrappers/system_io.h
@@ -26,7 +26,7 @@ class SystemIO final : public IO {
 
 
     size_t          receive         (FileDescriptor socket_fd, void* buf, size_t length, hidra2::IOErrors* err);
-    void            receive_timeout (FileDescriptor socket_fd,
+    void receive_timeout            (FileDescriptor socket_fd,
                                      void* buf,
                                      size_t length,
                                      uint16_t timeout_in_sec,
diff --git a/common/cpp/src/has_io.cpp b/common/cpp/src/has_io.cpp
index af8748610..932252941 100644
--- a/common/cpp/src/has_io.cpp
+++ b/common/cpp/src/has_io.cpp
@@ -5,11 +5,6 @@ hidra2::IO* const hidra2::HasIO::kDefaultIO = new hidra2::SystemIO();
 
 hidra2::HasIO::HasIO() {
     io = kDefaultIO;
-    io_utils = new hidra2::IOUtils(&io);
-}
-
-hidra2::HasIO::~HasIO() {
-    delete io_utils;
 }
 
 void hidra2::HasIO::__set_io(hidra2::IO* io) {
@@ -19,11 +14,3 @@ void hidra2::HasIO::__set_io(hidra2::IO* io) {
 hidra2::IO* hidra2::HasIO::__get_io() {
     return io;
 }
-
-void hidra2::HasIO::__set_io_utils(hidra2::IOUtils* io_utils) {
-    this->io_utils = io_utils;
-}
-
-hidra2::IOUtils* hidra2::HasIO::__get_io_utils() {
-    return io_utils;
-}
diff --git a/common/cpp/src/system_io_linux.cpp b/common/cpp/src/system_io_linux.cpp
index 80162aa44..99e592a7d 100644
--- a/common/cpp/src/system_io_linux.cpp
+++ b/common/cpp/src/system_io_linux.cpp
@@ -24,6 +24,9 @@ IOErrors IOErrorFromErrno() {
     case 0:
         err = IOErrors::NO_ERROR;
         break;
+    case EBADF:
+        err = IOErrors::BAD_FILE_NUMBER;
+        break;
     case ENOENT:
     case ENOTDIR:
         err = IOErrors::FILE_NOT_FOUND;
@@ -34,11 +37,14 @@ IOErrors IOErrorFromErrno() {
     case ECONNREFUSED:
         err = IOErrors::CONNECTION_REFUSED;
         break;
+    case EADDRINUSE:
+        err = IOErrors::ADDRESS_ALREADY_IN_USE;
+        break;
     case ECONNRESET:
         err = IOErrors::CONNECTION_RESET_BY_PEER;
-            break;
+        break;
     default:
-        std::cout << "[TMP/IOErrorFromErrno] Unknown error code: " << errno << std::endl;
+        std::cout << "[TMP/IOErrorFromErrNo] Unknown error code: " << errno << std::endl;
         err = IOErrors::UNKNOWN_ERROR;
         break;
     }
@@ -308,7 +314,7 @@ size_t hidra2::SystemIO::receive(hidra2::FileDescriptor socket_fd, void* buf, si
     size_t already_received = 0;
 
     while(already_received < length) {
-        ssize_t received_amount = ::recv(socket_fd, buf + already_received, length - already_received, 0);
+        ssize_t received_amount = ::recv(socket_fd, (uint8_t*)buf + already_received, length - already_received, 0);
         if(received_amount == 0) {
             *err = IOErrors::STREAM_EOF;
             return already_received;
@@ -358,7 +364,7 @@ size_t hidra2::SystemIO::send(hidra2::FileDescriptor socket_fd,
     size_t already_sent = 0;
 
     while(already_sent < length) {
-        ssize_t send_amount = ::send(socket_fd, buf + already_sent, length - already_sent, 0);
+        ssize_t send_amount = ::send(socket_fd, (uint8_t*)buf + already_sent, length - already_sent, 0);
         if(send_amount == 0) {
             *err = IOErrors::STREAM_EOF;
             return already_sent;
diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp
index 5b1b19e48..4a4c9951f 100644
--- a/producer/api/src/producer_impl.cpp
+++ b/producer/api/src/producer_impl.cpp
@@ -4,6 +4,8 @@
 #include <arpa/inet.h>
 #include <iostream>
 #include <cstring>
+#include <sys/sendfile.h>
+#include <fcntl.h>
 #include "producer_impl.h"
 
 const uint32_t hidra2::ProducerImpl::kVersion = 1;
@@ -121,6 +123,9 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat
     size_t already_send = 0;
     uint64_t max_chunk_size = (uint64_t)1024*(uint64_t)1024*(uint64_t)1024*(uint64_t)2;
 
+
+    int fd = ::open("/home/cpatzke/Desktop/bigfile", O_RDONLY);
+
     while(!network_error && already_send < file_size) {
         size_t need_to_send = max_chunk_size;
 
@@ -140,6 +145,7 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat
             std::cerr << "hidra2::ProducerImpl::send/send2 error" << std::endl;
             return PRODUCER_ERROR__SENDING_CHUNK_FAILED;
         }
+
         io->send(client_fd_, (uint8_t*)data + already_send, need_to_send, &io_error);
         if(io_error != IOErrors::NO_ERROR) {
             std::cerr << "hidra2::ProducerImpl::send/send3 error" << std::endl;
diff --git a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp
index 719926bb7..996d7c1e4 100644
--- a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp
+++ b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp
@@ -7,8 +7,8 @@
 #include <unistd.h>
 
 int DummyDetector::main(int argc, char **argv) {
+    auto producer = hidra2::Producer::create();
 
-    std::unique_ptr<hidra2::Producer> producer = hidra2::Producer::create();
     hidra2::ProducerError err = producer->connect_to_receiver("127.0.0.1:8099");
     if(err) {
         std::cerr << "Fail to connect to receiver. ProducerError: " << err << std::endl;
@@ -22,15 +22,14 @@ int DummyDetector::main(int argc, char **argv) {
     }
      */
 
-    int fd = open("/mnt/ramdisk/bigfile", O_RDONLY);
+    int fd = open("/tmp/bigfile", O_RDONLY);
     struct stat astat {};
     fstat(fd, &astat);
 
-
     size_t map_size = static_cast<size_t>(ceil(float(astat.st_size)/float(getpagesize()))*getpagesize());
     void *buffer = mmap(nullptr, map_size, PROT_READ, MAP_SHARED, fd, 0);
 
-
+    madvise(buffer, map_size, MADV_SEQUENTIAL | MADV_WILLNEED);
 
     hidra2::ProducerError error;
     error = producer->send("testfile4", buffer, astat.st_size);
@@ -38,7 +37,7 @@ int DummyDetector::main(int argc, char **argv) {
     if(error) {
         std::cerr << "File was not successfully deprecated_send, ErrorCode: " << error << std::endl;
     } else {
-        std::cout << "File was successfully deprecated_send." << std::endl;
+        std::cout << "File was successfully send." << std::endl;
     }
 
     munmap(buffer, map_size);
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index 6039279f4..1be529ac8 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -18,7 +18,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
 ################################
 # Testing
 ################################
-#set(TEST_SOURCE_FILES unittests/test_receiver.cpp)
-#set(TEST_LIBRARIES producer-api)
+set(TEST_SOURCE_FILES unittests/test_receiver.cpp)
+set(TEST_LIBRARIES producer-api)
 
-#gtest(${TARGET_NAME} ${TEST_SOURCE_FILES} ${TEST_LIBRARIES})
+gtest(${TARGET_NAME} ${TEST_SOURCE_FILES} ${TEST_LIBRARIES})
diff --git a/receiver/src/file_refernce_handler.cpp b/receiver/src/file_refernce_handler.cpp
index cf6b4855b..72200ef69 100644
--- a/receiver/src/file_refernce_handler.cpp
+++ b/receiver/src/file_refernce_handler.cpp
@@ -14,7 +14,7 @@ hidra2::FileReferenceId FileReferenceHandler::add_file(std::string filename,
 
     FileReferenceId file_ref_id = ++kGlobalFileRefernceId;
 
-    std::string full_path = filename;//TODO
+    std::string full_path = filename;//TODO add path prefix, and check for exploit with '/' or '..'
     int fd = open(full_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
     if(fd == -1) {
         err = FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED;
diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp
index 4a1dfd199..4205157ad 100644
--- a/receiver/src/main.cpp
+++ b/receiver/src/main.cpp
@@ -4,13 +4,19 @@
 int main (int argc, char* argv[]) {
     auto* receiver = new hidra2::Receiver();
 
-    receiver->start_listener("127.0.0.1", 8099);
+    hidra2::ReceiverError err;
 
-    std::cout << "started listener" << std::endl;
+    receiver->start_listener("127.0.0.1", 8099, &err);
+    if(err != hidra2::ReceiverError::NO_ERROR) {
+        std::cerr << "Fail to start receiver" << std::endl;
+        return 1;
+    }
+    std::cout << "start_listener" << std::endl;
 
     getchar();
 
-    receiver->stop_listener();
+    receiver->stop_listener(&err);
+    std::cout << "stop_listener" << std::endl;
     getchar();
 
     return 0;
diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp
index 17530d797..3bdab2f75 100644
--- a/receiver/src/network_producer_peer.cpp
+++ b/receiver/src/network_producer_peer.cpp
@@ -1,14 +1,16 @@
 #include <cstring>
+#include <assert.h>
 #include "network_producer_peer.h"
 
 namespace hidra2 {
 
 FileReferenceHandler NetworkProducerPeer::file_reference_handler;
+const size_t NetworkProducerPeer::kGenericBufferSize = 1024*50;//50KiByte
 std::atomic<uint32_t> NetworkProducerPeer::kNetworkProducerPeerCount;
 
 const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProducerPeer::kRequestHandlers = NetworkProducerPeer::init_request_handlers();
 
-NetworkProducerPeer::NetworkProducerPeer(int socket_fd, std::string address)  {
+NetworkProducerPeer::NetworkProducerPeer(int socket_fd, std::string address) : HasIO() {
     address_ = std::move(address);
     socket_fd_ = socket_fd;
     connection_id_ = kNetworkProducerPeerCount++;
@@ -22,21 +24,26 @@ const std::string& NetworkProducerPeer::address() const {
     return address_;
 }
 
-void NetworkProducerPeer::start_peer_receiver() {
-    if(receiver_thread_)
+bool NetworkProducerPeer::is_listening() const {
+    return is_listening_;
+}
+
+void NetworkProducerPeer::start_peer_listener() {
+    if(listener_thread_ || is_listening_)
         return;
-    receiver_thread_ = new std::thread([this] {
+    is_listening_ = true;
+    listener_thread_ = new std::thread([this] {
         internal_receiver_thread_();
     });
 }
 
 void NetworkProducerPeer::internal_receiver_thread_() {
-    /** Must be as large as the largest request type (not including the data) */
-    auto* const generic_request = (GenericNetworkRequest*) malloc(1024*50);
-    auto* const generic_response = (GenericNetworkResponse*) malloc(1024*50);
+    auto* const generic_request = (GenericNetworkRequest*) malloc(kGenericBufferSize);
+    auto* const generic_response = (GenericNetworkResponse*) malloc(kGenericBufferSize);
+
 
     IOErrors err;
-    while(true) {
+    while(is_listening_) {
         err = IOErrors::NO_ERROR;
 
         io->receive_timeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 1, &err);
@@ -47,11 +54,12 @@ void NetworkProducerPeer::internal_receiver_thread_() {
             }
 
             if(err == IOErrors::STREAM_EOF) {
-                io->deprecated_close(socket_fd_);
-                std::cout << "[" << connection_id() << "] Disconnected." << std::endl;
+                is_listening_ = false;
                 break;
             }
+
             std::cerr << "[" << connection_id() << "] Fail to receive data" << std::endl;
+            is_listening_ = false;
             break;
         }
 
@@ -69,24 +77,21 @@ void NetworkProducerPeer::internal_receiver_thread_() {
         }
     }
 
+    io->deprecated_close(socket_fd_);
+    std::cout << "[" << connection_id() << "] Disconnected." << std::endl;
+
     free(generic_request);
     free(generic_response);
 
 }
 
-void NetworkProducerPeer::stop_peer_receiver() {
-    if(!receiver_thread_)
+void NetworkProducerPeer::stop_peer_listener() {
+    is_listening_ = false;
+    if(!listener_thread_)
         return;
-    receiver_thread_->join();
-    receiver_thread_ = nullptr;
-}
-
-void NetworkProducerPeer::disconnect() {
-    stop_peer_receiver();
-
-    io->deprecated_close(socket_fd_);
-
-    std::cout << "[" << connection_id() << "] Disconnected." << std::endl;
+    listener_thread_->join();
+    delete listener_thread_;
+    listener_thread_ = nullptr;
 }
 
 size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* request, GenericNetworkResponse* response) {
@@ -101,6 +106,9 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque
 
     auto handler_information = kRequestHandlers[request->op_code];
 
+    assert(handler_information.request_size <= kGenericBufferSize);//Would overwrite memory
+    assert(handler_information.response_size <= kGenericBufferSize);//Would overwrite memory
+
     IOErrors err;
     //receive the rest of the message
     io->receive_timeout(socket_fd_, request->data, handler_information.request_size - sizeof(GenericNetworkRequest), 30, &err);
@@ -114,5 +122,9 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque
     return handler_information.response_size;
 }
 
+NetworkProducerPeer::~NetworkProducerPeer() {
+    stop_peer_listener();
+}
+
 }
 
diff --git a/receiver/src/network_producer_peer.h b/receiver/src/network_producer_peer.h
index 1a46e4134..5ce160cf6 100644
--- a/receiver/src/network_producer_peer.h
+++ b/receiver/src/network_producer_peer.h
@@ -23,6 +23,9 @@ class NetworkProducerPeer : HasIO {
         RequestHandler handler;
     };
   private:
+    /** Must be as large as the largest request type (not including the data) */
+    static const size_t kGenericBufferSize;
+
     static FileReferenceHandler file_reference_handler;
     static const std::vector<RequestHandlerInformation> kRequestHandlers;
     static std::atomic<uint32_t> kNetworkProducerPeerCount;
@@ -31,9 +34,10 @@ class NetworkProducerPeer : HasIO {
     int             socket_fd_;
     std::string     address_;
 
-    bool got_hello_ = false;
+    bool            got_hello_ = false;
 
-    std::thread*    receiver_thread_ = nullptr;
+    bool            is_listening_ = false;
+    std::thread*    listener_thread_ = nullptr;
 
     void internal_receiver_thread_();
     size_t handle_generic_request_(GenericNetworkRequest* request, GenericNetworkResponse* response);
@@ -43,17 +47,21 @@ class NetworkProducerPeer : HasIO {
     static void handle_send_data_chunk_request_(NetworkProducerPeer* self, const SendDataChunkRequest* request, SendDataChunkResponse* response);
 
   public:
+    NetworkProducerPeer &operator=(const NetworkProducerPeer &) = delete;
+    NetworkProducerPeer() = default;
+
     NetworkProducerPeer(int socket_fd, std::string address);
+    ~NetworkProducerPeer();
 
     static const std::vector<RequestHandlerInformation> init_request_handlers();
 
-    uint32_t connection_id() const;
 
-    void start_peer_receiver();
-    void stop_peer_receiver();
+    void start_peer_listener();
+    void stop_peer_listener();
 
+    uint32_t connection_id() const;
     const std::string& address() const;
-    void disconnect();
+    bool is_listening() const;
 };
 
 }
diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp
index 225ff490c..6abf14f70 100644
--- a/receiver/src/network_producer_peer_handlers.cpp
+++ b/receiver/src/network_producer_peer_handlers.cpp
@@ -78,14 +78,6 @@ void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* s
                                                           const SendDataChunkRequest* request,
                                                           SendDataChunkResponse* response) {
     IOErrors io_error;
-    /*
-    std::cout << "[CHUNK]op_code " << request->op_code << std::endl;
-    std::cout << "[CHUNK]request_id " << request->request_id << std::endl;
-
-    std::cout << "[CHUNK]file_reference_id " << request->file_reference_id << std::endl;
-    std::cout << "[CHUNK]start_byte " << request->start_byte << std::endl;
-    std::cout << "[CHUNK]chunk_size " << request->chunk_size << std::endl;
-    */
     auto file_info = self->file_reference_handler.get_file(request->file_reference_id);
 
     if(file_info == nullptr || file_info->owner != self->connection_id()) {
@@ -117,7 +109,7 @@ void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* s
         return;
     }
 
-    self->io->receive_timeout(self->socket_fd_, mapped_file + map_offset, request->chunk_size, 30, &io_error);
+    self->io->receive_timeout(self->socket_fd_, (uint8_t*)mapped_file + map_offset, request->chunk_size, 30, &io_error);
     if(io_error != IOErrors::NO_ERROR) {
         std::cerr << "Fail to receive all the chunk data." << std::endl;
         response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp
index 9cb79b780..74256e0ed 100644
--- a/receiver/src/receiver.cpp
+++ b/receiver/src/receiver.cpp
@@ -1,73 +1,82 @@
-#include <sys/socket.h>
 #include <cstring>
 #include <unistd.h>
 #include <arpa/inet.h>
 #include <iostream>
-#include <common/networking.h>
 #include "receiver.h"
 #include "network_producer_peer.h"
 
 const int hidra2::Receiver::kMaxUnacceptedConnectionsBacklog = 5;
 
-void hidra2::Receiver::start_listener(std::string listener_address, uint16_t port) {
+void hidra2::Receiver::start_listener(std::string listener_address, uint16_t port, ReceiverError* err) {
+    *err = ReceiverError::NO_ERROR;
+
     if(listener_running_) {
-        return;//Already listening
+        *err = ReceiverError::ALREADY_LISTEING;
+        return;
     }
     listener_running_ = true;
 
-    IOErrors err;
+    IOErrors io_error;
 
-    FileDescriptor listener_fd = io->create_socket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &err);
-    if(err != IOErrors::NO_ERROR) {
+    FileDescriptor listener_fd = io->create_socket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &io_error);
+    if(io_error != IOErrors::NO_ERROR) {
         std::cerr << "Fail to create socket" << std::endl;
+        return;
     }
 
-    io->inet_bind(listener_fd, listener_address, port, &err);
-    if(err != IOErrors::NO_ERROR) {
+    io->inet_bind(listener_fd, listener_address, port, &io_error);
+    if(io_error != IOErrors::NO_ERROR) {
         io->deprecated_close(listener_fd);
         std::cerr << "Fail to bind socket" << std::endl;
+        return;
     }
 
-    io->listen(listener_fd, kMaxUnacceptedConnectionsBacklog, &err);
-    if(err != IOErrors::NO_ERROR) {
+    io->listen(listener_fd, kMaxUnacceptedConnectionsBacklog, &io_error);
+    if(io_error != IOErrors::NO_ERROR) {
         io->deprecated_close(listener_fd);
         std::cerr << "Fail to start listen" << std::endl;
+        return;
     }
 
     listener_fd_ = listener_fd;
 
-    listener_thread_ = new std::thread([this] {
-        socklen_t sockaddr_in_size  = sizeof(sockaddr_in);
-        while(listener_running_) {
-            std::string address;
-            FileDescriptor peer_fd;
-
-            IOErrors err;
-            auto client_info_tuple = io->inet_accept(listener_fd_, &err);
-            if(err != IOErrors::NO_ERROR) {
-                std::cerr << "An error occurred while accepting an incoming connection" << std::endl;
-                return;
-            }
-            std::tie(address, peer_fd) = *client_info_tuple;
-
-            on_new_peer(peer_fd, address);
-        }
+    accept_thread_ = new std::thread([this] {
+        accept_thread_logic_();//TODO add peer to some list
     });
 
 }
 
-void hidra2::Receiver::stop_listener() {
+void hidra2::Receiver::accept_thread_logic_() {
+    while(listener_running_) {
+        std::string address;
+        FileDescriptor peer_fd;
+
+        IOErrors io_error;
+        auto client_info_tuple = io->inet_accept(listener_fd_, &io_error);
+        if(io_error != IOErrors::NO_ERROR) {
+            std::cerr << "An error occurred while accepting an incoming connection" << std::endl;
+            return;
+        }
+        std::tie(address, peer_fd) = *client_info_tuple;
+
+        peer_list_.push_back(on_new_peer_(peer_fd, address));//TODO remove client when disconnect
+    }
+}
+
+void hidra2::Receiver::stop_listener(ReceiverError* err) {
     close(listener_fd_);
     listener_running_ = false;
-    if(listener_thread_)
-        listener_thread_->join();
-    listener_thread_ = nullptr;
+    if(accept_thread_)
+        accept_thread_->join();
+    accept_thread_ = nullptr;
 }
 
-void hidra2::Receiver::on_new_peer(int peer_socket_fd, std::string address) {
-    NetworkProducerPeer peer(peer_socket_fd, address);
+std::unique_ptr<hidra2::NetworkProducerPeer> hidra2::Receiver::on_new_peer_(int peer_socket_fd, std::string address) {
+    NetworkProducerPeer* peer = new NetworkProducerPeer(peer_socket_fd, address);
+
+    std::cout << "[" << peer->connection_id() << "] New connection from " << address << std::endl;
 
-    std::cout << "[" << peer.connection_id() << "] New connection from " << address << std::endl;
+    peer->start_peer_listener();
 
-    peer.start_peer_receiver();
+    return std::unique_ptr<hidra2::NetworkProducerPeer>(peer);
 }
diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h
index 90d2fec1d..892f3ca87 100644
--- a/receiver/src/receiver.h
+++ b/receiver/src/receiver.h
@@ -5,16 +5,15 @@
 #include <netinet/in.h>
 #include <thread>
 #include <system_wrappers/has_io.h>
+#include "network_producer_peer.h"
+#include <list>
 
 namespace hidra2 {
 
-enum {
-    RECEIVER__ERR_OK,
-    RECEIVER__ERR_UNK,
-    RECEIVER__ERR_ADDRESS_IS_PROTECTED,
-    RECEIVER__ERR_BAD_FILE_DESCRIPTOR,
-    RECEIVER__ERR_ADDRESS_ALREADY_IN_USE,
-
+enum class ReceiverError {
+    NO_ERROR,
+    ALREADY_LISTEING,
+    FAILD_CREATING_SOCKET,
 };
 
 class Receiver : HasIO {
@@ -22,16 +21,19 @@ class Receiver : HasIO {
     static const int kMaxUnacceptedConnectionsBacklog;
 
     bool listener_running_ = false;
-    std::thread* listener_thread_ = nullptr;
     FileDescriptor listener_fd_;
+    std::thread* accept_thread_ = nullptr;
 
-    void on_new_peer(int peer_socket_fd, std::string address);
+    void accept_thread_logic_();
+    std::list<std::unique_ptr<NetworkProducerPeer>> peer_list_;
+    std::unique_ptr<NetworkProducerPeer> on_new_peer_(int peer_socket_fd, std::string address);
   public:
     Receiver(const Receiver &) = delete;
     Receiver &operator=(const Receiver &) = delete;
     Receiver() = default;
-    void start_listener(std::string listener_address, uint16_t port);
-    void stop_listener();
+
+    void start_listener(std::string listener_address, uint16_t port, ReceiverError* err);
+    void stop_listener(ReceiverError* err);
 };
 
 }
-- 
GitLab