diff --git a/common/cpp/include/system_wrappers/has_io.h b/common/cpp/include/system_wrappers/has_io.h index 79457b5d12157ec9000da0c486b7abb517261b51..2dd89a39ab772e2254b62df5d0618e445ffa554a 100644 --- a/common/cpp/include/system_wrappers/has_io.h +++ b/common/cpp/include/system_wrappers/has_io.h @@ -11,17 +11,12 @@ class HasIO { static IO* const kDefaultIO; IO* io; - IOUtils* io_utils; - HasIO(); + explicit HasIO(); public: - ~HasIO(); void __set_io(IO* io); IO* __get_io(); - - void __set_io_utils(IOUtils* io_utils); - IOUtils* __get_io_utils(); }; } diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h index 0328291df84f6e7aa44bb7b869e321b79cb4b901..f2a781a6929f4fdb66a129517a53367dd0cfa161 100644 --- a/common/cpp/include/system_wrappers/io.h +++ b/common/cpp/include/system_wrappers/io.h @@ -14,12 +14,14 @@ namespace hidra2 { enum class IOErrors { NO_ERROR, + BAD_FILE_NUMBER, FILE_NOT_FOUND, READ_ERROR, PERMISSIONS_DENIED, UNSUPPORTED_ADDRESS_FAMILY, INVALID_ADDRESS_FORMAT, STREAM_EOF, + ADDRESS_ALREADY_IN_USE, CONNECTION_REFUSED, CONNECTION_RESET_BY_PEER, TIMEOUT, diff --git a/common/cpp/include/system_wrappers/system_io.h b/common/cpp/include/system_wrappers/system_io.h index ae45cc88eaf147b324f620d6b2450fe62085acc2..1b1bf123846522b9cc0b737c8c74781fc707aaf2 100644 --- a/common/cpp/include/system_wrappers/system_io.h +++ b/common/cpp/include/system_wrappers/system_io.h @@ -26,7 +26,7 @@ class SystemIO final : public IO { size_t receive (FileDescriptor socket_fd, void* buf, size_t length, hidra2::IOErrors* err); - void receive_timeout (FileDescriptor socket_fd, + void receive_timeout (FileDescriptor socket_fd, void* buf, size_t length, uint16_t timeout_in_sec, diff --git a/common/cpp/src/has_io.cpp b/common/cpp/src/has_io.cpp index af8748610ab9619189fda127b869cc0926d9b5f4..932252941e7b41e3eddf5937dd6bd4bc2db7455c 100644 --- a/common/cpp/src/has_io.cpp +++ b/common/cpp/src/has_io.cpp @@ -5,11 +5,6 @@ hidra2::IO* const hidra2::HasIO::kDefaultIO = new hidra2::SystemIO(); hidra2::HasIO::HasIO() { io = kDefaultIO; - io_utils = new hidra2::IOUtils(&io); -} - -hidra2::HasIO::~HasIO() { - delete io_utils; } void hidra2::HasIO::__set_io(hidra2::IO* io) { @@ -19,11 +14,3 @@ void hidra2::HasIO::__set_io(hidra2::IO* io) { hidra2::IO* hidra2::HasIO::__get_io() { return io; } - -void hidra2::HasIO::__set_io_utils(hidra2::IOUtils* io_utils) { - this->io_utils = io_utils; -} - -hidra2::IOUtils* hidra2::HasIO::__get_io_utils() { - return io_utils; -} diff --git a/common/cpp/src/system_io_linux.cpp b/common/cpp/src/system_io_linux.cpp index 80162aa4411959d3cacf04e1af4f8139033bf051..99e592a7d8b9338027e22431c96fae079de4a19b 100644 --- a/common/cpp/src/system_io_linux.cpp +++ b/common/cpp/src/system_io_linux.cpp @@ -24,6 +24,9 @@ IOErrors IOErrorFromErrno() { case 0: err = IOErrors::NO_ERROR; break; + case EBADF: + err = IOErrors::BAD_FILE_NUMBER; + break; case ENOENT: case ENOTDIR: err = IOErrors::FILE_NOT_FOUND; @@ -34,11 +37,14 @@ IOErrors IOErrorFromErrno() { case ECONNREFUSED: err = IOErrors::CONNECTION_REFUSED; break; + case EADDRINUSE: + err = IOErrors::ADDRESS_ALREADY_IN_USE; + break; case ECONNRESET: err = IOErrors::CONNECTION_RESET_BY_PEER; - break; + break; default: - std::cout << "[TMP/IOErrorFromErrno] Unknown error code: " << errno << std::endl; + std::cout << "[TMP/IOErrorFromErrNo] Unknown error code: " << errno << std::endl; err = IOErrors::UNKNOWN_ERROR; break; } @@ -308,7 +314,7 @@ size_t hidra2::SystemIO::receive(hidra2::FileDescriptor socket_fd, void* buf, si size_t already_received = 0; while(already_received < length) { - ssize_t received_amount = ::recv(socket_fd, buf + already_received, length - already_received, 0); + ssize_t received_amount = ::recv(socket_fd, (uint8_t*)buf + already_received, length - already_received, 0); if(received_amount == 0) { *err = IOErrors::STREAM_EOF; return already_received; @@ -358,7 +364,7 @@ size_t hidra2::SystemIO::send(hidra2::FileDescriptor socket_fd, size_t already_sent = 0; while(already_sent < length) { - ssize_t send_amount = ::send(socket_fd, buf + already_sent, length - already_sent, 0); + ssize_t send_amount = ::send(socket_fd, (uint8_t*)buf + already_sent, length - already_sent, 0); if(send_amount == 0) { *err = IOErrors::STREAM_EOF; return already_sent; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 5b1b19e486e305b102737fbe17ebd7c1a4fe6443..4a4c9951f3d3b577c177ec7c7f01e5c2fa6e9860 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -4,6 +4,8 @@ #include <arpa/inet.h> #include <iostream> #include <cstring> +#include <sys/sendfile.h> +#include <fcntl.h> #include "producer_impl.h" const uint32_t hidra2::ProducerImpl::kVersion = 1; @@ -121,6 +123,9 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat size_t already_send = 0; uint64_t max_chunk_size = (uint64_t)1024*(uint64_t)1024*(uint64_t)1024*(uint64_t)2; + + int fd = ::open("/home/cpatzke/Desktop/bigfile", O_RDONLY); + while(!network_error && already_send < file_size) { size_t need_to_send = max_chunk_size; @@ -140,6 +145,7 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat std::cerr << "hidra2::ProducerImpl::send/send2 error" << std::endl; return PRODUCER_ERROR__SENDING_CHUNK_FAILED; } + io->send(client_fd_, (uint8_t*)data + already_send, need_to_send, &io_error); if(io_error != IOErrors::NO_ERROR) { std::cerr << "hidra2::ProducerImpl::send/send3 error" << std::endl; diff --git a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp index 719926bb7f96a740839b5437ac7504992f08603d..996d7c1e48c6b86c03f0e0a0250d4ec6fdacb4fd 100644 --- a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp +++ b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp @@ -7,8 +7,8 @@ #include <unistd.h> int DummyDetector::main(int argc, char **argv) { + auto producer = hidra2::Producer::create(); - std::unique_ptr<hidra2::Producer> producer = hidra2::Producer::create(); hidra2::ProducerError err = producer->connect_to_receiver("127.0.0.1:8099"); if(err) { std::cerr << "Fail to connect to receiver. ProducerError: " << err << std::endl; @@ -22,15 +22,14 @@ int DummyDetector::main(int argc, char **argv) { } */ - int fd = open("/mnt/ramdisk/bigfile", O_RDONLY); + int fd = open("/tmp/bigfile", O_RDONLY); struct stat astat {}; fstat(fd, &astat); - size_t map_size = static_cast<size_t>(ceil(float(astat.st_size)/float(getpagesize()))*getpagesize()); void *buffer = mmap(nullptr, map_size, PROT_READ, MAP_SHARED, fd, 0); - + madvise(buffer, map_size, MADV_SEQUENTIAL | MADV_WILLNEED); hidra2::ProducerError error; error = producer->send("testfile4", buffer, astat.st_size); @@ -38,7 +37,7 @@ int DummyDetector::main(int argc, char **argv) { if(error) { std::cerr << "File was not successfully deprecated_send, ErrorCode: " << error << std::endl; } else { - std::cout << "File was successfully deprecated_send." << std::endl; + std::cout << "File was successfully send." << std::endl; } munmap(buffer, map_size); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 6039279f4fe0ff7f86f26b4f4e0fb42b391ef2c3..1be529ac8b754487053c02956da4ac00e2ea96c6 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -18,7 +18,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) ################################ # Testing ################################ -#set(TEST_SOURCE_FILES unittests/test_receiver.cpp) -#set(TEST_LIBRARIES producer-api) +set(TEST_SOURCE_FILES unittests/test_receiver.cpp) +set(TEST_LIBRARIES producer-api) -#gtest(${TARGET_NAME} ${TEST_SOURCE_FILES} ${TEST_LIBRARIES}) +gtest(${TARGET_NAME} ${TEST_SOURCE_FILES} ${TEST_LIBRARIES}) diff --git a/receiver/src/file_refernce_handler.cpp b/receiver/src/file_refernce_handler.cpp index cf6b4855b361e7d47bfb591dc0a7e6dec2fbe928..72200ef6946c6bf2d84868225e9ff95321d35c4c 100644 --- a/receiver/src/file_refernce_handler.cpp +++ b/receiver/src/file_refernce_handler.cpp @@ -14,7 +14,7 @@ hidra2::FileReferenceId FileReferenceHandler::add_file(std::string filename, FileReferenceId file_ref_id = ++kGlobalFileRefernceId; - std::string full_path = filename;//TODO + std::string full_path = filename;//TODO add path prefix, and check for exploit with '/' or '..' int fd = open(full_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); if(fd == -1) { err = FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED; diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 4a1dfd1996fa3c089e7115e135292d5422bc3286..4205157ad4f3b64fc6b256b7f6670c3b9303a361 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -4,13 +4,19 @@ int main (int argc, char* argv[]) { auto* receiver = new hidra2::Receiver(); - receiver->start_listener("127.0.0.1", 8099); + hidra2::ReceiverError err; - std::cout << "started listener" << std::endl; + receiver->start_listener("127.0.0.1", 8099, &err); + if(err != hidra2::ReceiverError::NO_ERROR) { + std::cerr << "Fail to start receiver" << std::endl; + return 1; + } + std::cout << "start_listener" << std::endl; getchar(); - receiver->stop_listener(); + receiver->stop_listener(&err); + std::cout << "stop_listener" << std::endl; getchar(); return 0; diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp index 17530d797129767f39b5b37eb89b40f4d23c05b9..3bdab2f75d11835111a8ee123d7a105badda0d15 100644 --- a/receiver/src/network_producer_peer.cpp +++ b/receiver/src/network_producer_peer.cpp @@ -1,14 +1,16 @@ #include <cstring> +#include <assert.h> #include "network_producer_peer.h" namespace hidra2 { FileReferenceHandler NetworkProducerPeer::file_reference_handler; +const size_t NetworkProducerPeer::kGenericBufferSize = 1024*50;//50KiByte std::atomic<uint32_t> NetworkProducerPeer::kNetworkProducerPeerCount; const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProducerPeer::kRequestHandlers = NetworkProducerPeer::init_request_handlers(); -NetworkProducerPeer::NetworkProducerPeer(int socket_fd, std::string address) { +NetworkProducerPeer::NetworkProducerPeer(int socket_fd, std::string address) : HasIO() { address_ = std::move(address); socket_fd_ = socket_fd; connection_id_ = kNetworkProducerPeerCount++; @@ -22,21 +24,26 @@ const std::string& NetworkProducerPeer::address() const { return address_; } -void NetworkProducerPeer::start_peer_receiver() { - if(receiver_thread_) +bool NetworkProducerPeer::is_listening() const { + return is_listening_; +} + +void NetworkProducerPeer::start_peer_listener() { + if(listener_thread_ || is_listening_) return; - receiver_thread_ = new std::thread([this] { + is_listening_ = true; + listener_thread_ = new std::thread([this] { internal_receiver_thread_(); }); } void NetworkProducerPeer::internal_receiver_thread_() { - /** Must be as large as the largest request type (not including the data) */ - auto* const generic_request = (GenericNetworkRequest*) malloc(1024*50); - auto* const generic_response = (GenericNetworkResponse*) malloc(1024*50); + auto* const generic_request = (GenericNetworkRequest*) malloc(kGenericBufferSize); + auto* const generic_response = (GenericNetworkResponse*) malloc(kGenericBufferSize); + IOErrors err; - while(true) { + while(is_listening_) { err = IOErrors::NO_ERROR; io->receive_timeout(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 1, &err); @@ -47,11 +54,12 @@ void NetworkProducerPeer::internal_receiver_thread_() { } if(err == IOErrors::STREAM_EOF) { - io->deprecated_close(socket_fd_); - std::cout << "[" << connection_id() << "] Disconnected." << std::endl; + is_listening_ = false; break; } + std::cerr << "[" << connection_id() << "] Fail to receive data" << std::endl; + is_listening_ = false; break; } @@ -69,24 +77,21 @@ void NetworkProducerPeer::internal_receiver_thread_() { } } + io->deprecated_close(socket_fd_); + std::cout << "[" << connection_id() << "] Disconnected." << std::endl; + free(generic_request); free(generic_response); } -void NetworkProducerPeer::stop_peer_receiver() { - if(!receiver_thread_) +void NetworkProducerPeer::stop_peer_listener() { + is_listening_ = false; + if(!listener_thread_) return; - receiver_thread_->join(); - receiver_thread_ = nullptr; -} - -void NetworkProducerPeer::disconnect() { - stop_peer_receiver(); - - io->deprecated_close(socket_fd_); - - std::cout << "[" << connection_id() << "] Disconnected." << std::endl; + listener_thread_->join(); + delete listener_thread_; + listener_thread_ = nullptr; } size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* request, GenericNetworkResponse* response) { @@ -101,6 +106,9 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque auto handler_information = kRequestHandlers[request->op_code]; + assert(handler_information.request_size <= kGenericBufferSize);//Would overwrite memory + assert(handler_information.response_size <= kGenericBufferSize);//Would overwrite memory + IOErrors err; //receive the rest of the message io->receive_timeout(socket_fd_, request->data, handler_information.request_size - sizeof(GenericNetworkRequest), 30, &err); @@ -114,5 +122,9 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque return handler_information.response_size; } +NetworkProducerPeer::~NetworkProducerPeer() { + stop_peer_listener(); +} + } diff --git a/receiver/src/network_producer_peer.h b/receiver/src/network_producer_peer.h index 1a46e4134d4f2a793d0fb7ff81105da5e8b38f99..5ce160cf640a6de05a94046b90ac1acae8f0c480 100644 --- a/receiver/src/network_producer_peer.h +++ b/receiver/src/network_producer_peer.h @@ -23,6 +23,9 @@ class NetworkProducerPeer : HasIO { RequestHandler handler; }; private: + /** Must be as large as the largest request type (not including the data) */ + static const size_t kGenericBufferSize; + static FileReferenceHandler file_reference_handler; static const std::vector<RequestHandlerInformation> kRequestHandlers; static std::atomic<uint32_t> kNetworkProducerPeerCount; @@ -31,9 +34,10 @@ class NetworkProducerPeer : HasIO { int socket_fd_; std::string address_; - bool got_hello_ = false; + bool got_hello_ = false; - std::thread* receiver_thread_ = nullptr; + bool is_listening_ = false; + std::thread* listener_thread_ = nullptr; void internal_receiver_thread_(); size_t handle_generic_request_(GenericNetworkRequest* request, GenericNetworkResponse* response); @@ -43,17 +47,21 @@ class NetworkProducerPeer : HasIO { static void handle_send_data_chunk_request_(NetworkProducerPeer* self, const SendDataChunkRequest* request, SendDataChunkResponse* response); public: + NetworkProducerPeer &operator=(const NetworkProducerPeer &) = delete; + NetworkProducerPeer() = default; + NetworkProducerPeer(int socket_fd, std::string address); + ~NetworkProducerPeer(); static const std::vector<RequestHandlerInformation> init_request_handlers(); - uint32_t connection_id() const; - void start_peer_receiver(); - void stop_peer_receiver(); + void start_peer_listener(); + void stop_peer_listener(); + uint32_t connection_id() const; const std::string& address() const; - void disconnect(); + bool is_listening() const; }; } diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp index 225ff490c76f9d716a658349734e71e548f224f2..6abf14f704b05a0437bf2004a317e7be5adde1ea 100644 --- a/receiver/src/network_producer_peer_handlers.cpp +++ b/receiver/src/network_producer_peer_handlers.cpp @@ -78,14 +78,6 @@ void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* s const SendDataChunkRequest* request, SendDataChunkResponse* response) { IOErrors io_error; - /* - std::cout << "[CHUNK]op_code " << request->op_code << std::endl; - std::cout << "[CHUNK]request_id " << request->request_id << std::endl; - - std::cout << "[CHUNK]file_reference_id " << request->file_reference_id << std::endl; - std::cout << "[CHUNK]start_byte " << request->start_byte << std::endl; - std::cout << "[CHUNK]chunk_size " << request->chunk_size << std::endl; - */ auto file_info = self->file_reference_handler.get_file(request->file_reference_id); if(file_info == nullptr || file_info->owner != self->connection_id()) { @@ -117,7 +109,7 @@ void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* s return; } - self->io->receive_timeout(self->socket_fd_, mapped_file + map_offset, request->chunk_size, 30, &io_error); + self->io->receive_timeout(self->socket_fd_, (uint8_t*)mapped_file + map_offset, request->chunk_size, 30, &io_error); if(io_error != IOErrors::NO_ERROR) { std::cerr << "Fail to receive all the chunk data." << std::endl; response->error_code = NET_ERR__INTERNAL_SERVER_ERROR; diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 9cb79b78009e06bc0ef6b083fe3b63d2cb870e07..74256e0ed9c5f502bf802c989ae7deac628c7fa5 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -1,73 +1,82 @@ -#include <sys/socket.h> #include <cstring> #include <unistd.h> #include <arpa/inet.h> #include <iostream> -#include <common/networking.h> #include "receiver.h" #include "network_producer_peer.h" const int hidra2::Receiver::kMaxUnacceptedConnectionsBacklog = 5; -void hidra2::Receiver::start_listener(std::string listener_address, uint16_t port) { +void hidra2::Receiver::start_listener(std::string listener_address, uint16_t port, ReceiverError* err) { + *err = ReceiverError::NO_ERROR; + if(listener_running_) { - return;//Already listening + *err = ReceiverError::ALREADY_LISTEING; + return; } listener_running_ = true; - IOErrors err; + IOErrors io_error; - FileDescriptor listener_fd = io->create_socket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &err); - if(err != IOErrors::NO_ERROR) { + FileDescriptor listener_fd = io->create_socket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &io_error); + if(io_error != IOErrors::NO_ERROR) { std::cerr << "Fail to create socket" << std::endl; + return; } - io->inet_bind(listener_fd, listener_address, port, &err); - if(err != IOErrors::NO_ERROR) { + io->inet_bind(listener_fd, listener_address, port, &io_error); + if(io_error != IOErrors::NO_ERROR) { io->deprecated_close(listener_fd); std::cerr << "Fail to bind socket" << std::endl; + return; } - io->listen(listener_fd, kMaxUnacceptedConnectionsBacklog, &err); - if(err != IOErrors::NO_ERROR) { + io->listen(listener_fd, kMaxUnacceptedConnectionsBacklog, &io_error); + if(io_error != IOErrors::NO_ERROR) { io->deprecated_close(listener_fd); std::cerr << "Fail to start listen" << std::endl; + return; } listener_fd_ = listener_fd; - listener_thread_ = new std::thread([this] { - socklen_t sockaddr_in_size = sizeof(sockaddr_in); - while(listener_running_) { - std::string address; - FileDescriptor peer_fd; - - IOErrors err; - auto client_info_tuple = io->inet_accept(listener_fd_, &err); - if(err != IOErrors::NO_ERROR) { - std::cerr << "An error occurred while accepting an incoming connection" << std::endl; - return; - } - std::tie(address, peer_fd) = *client_info_tuple; - - on_new_peer(peer_fd, address); - } + accept_thread_ = new std::thread([this] { + accept_thread_logic_();//TODO add peer to some list }); } -void hidra2::Receiver::stop_listener() { +void hidra2::Receiver::accept_thread_logic_() { + while(listener_running_) { + std::string address; + FileDescriptor peer_fd; + + IOErrors io_error; + auto client_info_tuple = io->inet_accept(listener_fd_, &io_error); + if(io_error != IOErrors::NO_ERROR) { + std::cerr << "An error occurred while accepting an incoming connection" << std::endl; + return; + } + std::tie(address, peer_fd) = *client_info_tuple; + + peer_list_.push_back(on_new_peer_(peer_fd, address));//TODO remove client when disconnect + } +} + +void hidra2::Receiver::stop_listener(ReceiverError* err) { close(listener_fd_); listener_running_ = false; - if(listener_thread_) - listener_thread_->join(); - listener_thread_ = nullptr; + if(accept_thread_) + accept_thread_->join(); + accept_thread_ = nullptr; } -void hidra2::Receiver::on_new_peer(int peer_socket_fd, std::string address) { - NetworkProducerPeer peer(peer_socket_fd, address); +std::unique_ptr<hidra2::NetworkProducerPeer> hidra2::Receiver::on_new_peer_(int peer_socket_fd, std::string address) { + NetworkProducerPeer* peer = new NetworkProducerPeer(peer_socket_fd, address); + + std::cout << "[" << peer->connection_id() << "] New connection from " << address << std::endl; - std::cout << "[" << peer.connection_id() << "] New connection from " << address << std::endl; + peer->start_peer_listener(); - peer.start_peer_receiver(); + return std::unique_ptr<hidra2::NetworkProducerPeer>(peer); } diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h index 90d2fec1dd2ebe40d1912733ff0f9352b5ab1991..892f3ca872f1d698ff12a084dc890a8bea5c6f4b 100644 --- a/receiver/src/receiver.h +++ b/receiver/src/receiver.h @@ -5,16 +5,15 @@ #include <netinet/in.h> #include <thread> #include <system_wrappers/has_io.h> +#include "network_producer_peer.h" +#include <list> namespace hidra2 { -enum { - RECEIVER__ERR_OK, - RECEIVER__ERR_UNK, - RECEIVER__ERR_ADDRESS_IS_PROTECTED, - RECEIVER__ERR_BAD_FILE_DESCRIPTOR, - RECEIVER__ERR_ADDRESS_ALREADY_IN_USE, - +enum class ReceiverError { + NO_ERROR, + ALREADY_LISTEING, + FAILD_CREATING_SOCKET, }; class Receiver : HasIO { @@ -22,16 +21,19 @@ class Receiver : HasIO { static const int kMaxUnacceptedConnectionsBacklog; bool listener_running_ = false; - std::thread* listener_thread_ = nullptr; FileDescriptor listener_fd_; + std::thread* accept_thread_ = nullptr; - void on_new_peer(int peer_socket_fd, std::string address); + void accept_thread_logic_(); + std::list<std::unique_ptr<NetworkProducerPeer>> peer_list_; + std::unique_ptr<NetworkProducerPeer> on_new_peer_(int peer_socket_fd, std::string address); public: Receiver(const Receiver &) = delete; Receiver &operator=(const Receiver &) = delete; Receiver() = default; - void start_listener(std::string listener_address, uint16_t port); - void stop_listener(); + + void start_listener(std::string listener_address, uint16_t port, ReceiverError* err); + void stop_listener(ReceiverError* err); }; }