diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index 5b5436a303e421650f044d553d6b62968defd3ba..80b3eb66beee46eaa067cc047d8feba93749d0fc 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -1,11 +1,8 @@ set(TARGET_NAME common) set(SOURCE_FILES - include/common/os.h - include/common/networking.h - include/system_wrappers/io.h - include/system_wrappers/has_io.h src/has_io.cpp - include/system_wrappers/io_utils.h src/io_utils.cpp - include/system_wrappers/system_io.h src/system_io.cpp + src/has_io.cpp + src/io_utils.cpp + src/system_io.cpp ) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 5337357cd5acf6384558ea895b07f4467b1986ae..470c55acde0d500e7629f50bcbd8d0ec001f743b 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -6,6 +6,7 @@ namespace hidra2 { typedef uint64_t FileReferenceId; +typedef uint64_t NetworkRequestId; enum OpCode : uint8_t { OP_CODE__HELLO, @@ -20,6 +21,7 @@ enum NetworkErrorCode : uint16_t { NET_ERR__UNSUPPORTED_VERSION, NET_ERR__FILENAME_ALREADY_IN_USE, NET_ERR__UNKNOWN_REFERENCE_ID, + NET_ERR__ALLOCATE_STORAGE_FAILED, NET_ERR__INTERNAL_SERVER_ERROR = 65535, }; @@ -29,14 +31,14 @@ enum NetworkErrorCode : uint16_t { * @{ */ struct GenericNetworkRequest { - OpCode op_code; - uint64_t request_id; - char data[]; + OpCode op_code; + NetworkRequestId request_id; + char data[]; }; struct GenericNetworkResponse { OpCode op_code; - uint64_t request_id; + NetworkRequestId request_id; NetworkErrorCode error_code; char data[]; }; diff --git a/common/cpp/include/system_wrappers/has_io.h b/common/cpp/include/system_wrappers/has_io.h index 95c09f51a5f63e1bdafb987ca2fb447e298d763f..47eb5466771944a4c56a4be13f344e45648736fe 100644 --- a/common/cpp/include/system_wrappers/has_io.h +++ b/common/cpp/include/system_wrappers/has_io.h @@ -2,18 +2,20 @@ #define HIDRA2_SYSTEM_WRAPPERS__HAS_IO_H #include "io.h" +#include "io_utils.h" + namespace hidra2 { class HasIO { protected: static IO* const kDefaultIO; - static IOUtils* const kDefaultIOUtils; IO* io; IOUtils* io_utils; HasIO(); public: + void __set_io(IO* io); IO* __get_io(); diff --git a/common/cpp/include/system_wrappers/io.h b/common/cpp/include/system_wrappers/io.h index 8220e5453ef8fcfe8345376ed70b2356cefe9239..fbcada18bc031aa1cc9cd2b4a53eb3ec2069dc38 100644 --- a/common/cpp/include/system_wrappers/io.h +++ b/common/cpp/include/system_wrappers/io.h @@ -3,7 +3,6 @@ #include <unistd.h> #include <sys/socket.h> -#include "io_utils.h" namespace hidra2 { diff --git a/common/cpp/include/system_wrappers/io_utils.h b/common/cpp/include/system_wrappers/io_utils.h index f5d709bdc63034f3626f81bf472ae7ccafac61be..65cdc18ed944fcbae17806c1cf6af3d8669555f8 100644 --- a/common/cpp/include/system_wrappers/io_utils.h +++ b/common/cpp/include/system_wrappers/io_utils.h @@ -1,11 +1,19 @@ #ifndef HIDRA2_SYSTEM_WRAPPERS__SYSTEM_UTIL_H #define HIDRA2_SYSTEM_WRAPPERS__SYSTEM_UTIL_H +#include "io.h" + namespace hidra2 { class IOUtils { + private: + IO** io_; public: + explicit IOUtils(IO** io); + bool is_valid_fd(int fd); + ssize_t send_in_steps(int __fd, const void* __buf, size_t __n, int __flags); + ssize_t recv_in_steps(int __fd, void* __buf, size_t __n, int __flags); }; } diff --git a/common/cpp/src/has_io.cpp b/common/cpp/src/has_io.cpp index 001e7ea604bf3b4aef79c2913b6fe223d1d2b62a..a4170df9c31630438f47d76857db2d0954085980 100644 --- a/common/cpp/src/has_io.cpp +++ b/common/cpp/src/has_io.cpp @@ -5,7 +5,7 @@ hidra2::IO* const hidra2::HasIO::kDefaultIO = new hidra2::SystemIO(); hidra2::HasIO::HasIO() { io = kDefaultIO; - //utils = new IOUtils(); + io_utils = new hidra2::IOUtils(&io); } void hidra2::HasIO::__set_io(hidra2::IO* io) { @@ -21,5 +21,5 @@ void hidra2::HasIO::__set_io_utils(hidra2::IOUtils* io_utils) { } hidra2::IOUtils* hidra2::HasIO::__get_io_utils() { - return this->io_utils; + return io_utils; } diff --git a/common/cpp/src/io_utils.cpp b/common/cpp/src/io_utils.cpp index 6b8adb0228549a74bee450c94cb59680dd04f6cc..e62834d822b05c3f181bf35c726e9a7699950bfb 100644 --- a/common/cpp/src/io_utils.cpp +++ b/common/cpp/src/io_utils.cpp @@ -3,6 +3,37 @@ #include "system_wrappers/io_utils.h" + +hidra2::IOUtils::IOUtils(hidra2::IO** io) { + io_ = io; +} + bool hidra2::IOUtils::is_valid_fd(int fd) { return fcntl(fd, F_GETFD) != -1 || errno != EBADF; } + +ssize_t hidra2::IOUtils::send_in_steps(int __fd, const void* __buf, size_t __n, int __flags) { + size_t already_send = 0; + while(already_send < __n) { + ssize_t send_amount = (*io_)->send(__fd, __buf + already_send, __n - already_send, __flags); + if(send_amount <= 0) { + return send_amount; + } + already_send += send_amount; + } + return already_send; +} + +ssize_t hidra2::IOUtils::recv_in_steps(int __fd, void* __buf, size_t __n, int __flags) { + size_t already_received = 0; + + while(already_received < __n) { + ssize_t recv_amount = (*io_)->recv(__fd, __buf + already_received, __n - already_received, __flags); + if(recv_amount <= 0) { + return recv_amount; + } + already_received += recv_amount; + } + + return already_received; +} diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 95127f2c1a92cc8feb4bd89b065fac236ceefe20..aac79920b6993963c51d9e4700cec904570044dd 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -1,5 +1,8 @@ set(TARGET_NAME producer-api) -set(SOURCE_FILES src/producer.cpp include/producer/producer.h src/producer_impl.cpp src/producer_impl.h) +set(SOURCE_FILES + src/producer.cpp + src/producer_impl.h src/producer_impl.cpp + ) ################################ diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index f085a006731d361752cf35357aae3dbdbe3f7689..f336de1edb19d14999f357b9f57381dba2109c79 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -1,5 +1,5 @@ -#ifndef HIDRA2__PRODUCER_PRODUCER_H -#define HIDRA2__PRODUCER_PRODUCER_H +#ifndef HIDRA2_PRODUCER__PRODUCER_H +#define HIDRA2_PRODUCER__PRODUCER_H #include <string> #include <memory> @@ -12,7 +12,9 @@ enum ProducerError { PRODUCER_ERROR__OK, PRODUCER_ERROR__ALREADY_CONNECTED, PRODUCER_ERROR__CONNECTION_NOT_READY, - PRODUCER_ERROR__CHUNK_PROVIDER_NOT_READY_AT_START, + PRODUCER_ERROR__SENDING_CHUNK_FAILED, + PRODUCER_ERROR__RECEIVING_SERVER_RESPONSE_FAILED, + PRODUCER_ERROR__SERVER_REPORTED_AN_ERROR, }; enum ProducerStatus { @@ -43,4 +45,4 @@ class Producer : public HasIO { }; } -#endif //HIDRA2__PRODUCER_PRODUCER_H +#endif //HIDRA2_PRODUCER__PRODUCER_H diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 2928689a3c7d8bed4f642da0918e05c44bf0412d..350f629eda12003fefa6e3f1ddaecc6c70232229 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -41,11 +41,11 @@ hidra2::ProducerError hidra2::ProducerImpl::connect_to_receiver(std::string rece helloRequest.os = (OSType)4; helloRequest.is_x64 = true; - io->send(client_fd_, &helloRequest, sizeof(helloRequest), 0); + io_utils->send_in_steps(client_fd_, &helloRequest, sizeof(helloRequest), 0); HelloResponse helloResponse; - io->recv(client_fd_, &helloResponse, sizeof(helloResponse), 0); + io_utils->recv_in_steps(client_fd_, &helloResponse, sizeof(helloResponse), 0); std::cout << "op_code: " << helloResponse.op_code << std::endl; std::cout << "request_id: " << helloResponse.request_id << std::endl; @@ -57,64 +57,11 @@ hidra2::ProducerError hidra2::ProducerImpl::connect_to_receiver(std::string rece return PRODUCER_ERROR__OK; } - std::cerr << "Fail to connect to server. Server response error code: " << helloResponse.error_code << std::endl; + std::cerr << "Fail to connect to server. NetErrorCode: " << helloResponse.error_code << std::endl; return PRODUCER_ERROR__OK; } -//TODO not our code. need to be removed. Copy&Pasted from stackoverflow -void hexDump (char *desc, void *addr, int len) { - int i; - unsigned char buff[17]; - unsigned char *pc = (unsigned char*)addr; - - // Output description if given. - if (desc != NULL) - printf ("%s:\n", desc); - - if (len == 0) { - printf(" ZERO LENGTH\n"); - return; - } - if (len < 0) { - printf(" NEGATIVE LENGTH: %i\n",len); - return; - } - - // Process every byte in the data. - for (i = 0; i < len; i++) { - // Multiple of 16 means new line (with line offset). - - if ((i % 16) == 0) { - // Just don't print ASCII for the zeroth line. - if (i != 0) - printf (" %s\n", buff); - - // Output the offset. - printf (" %04x ", i); - } - - // Now the hex code for the specific character. - printf (" %02x", pc[i]); - - // And store a printable ASCII character for later. - if ((pc[i] < 0x20) || (pc[i] > 0x7e)) - buff[i % 16] = '.'; - else - buff[i % 16] = pc[i]; - buff[(i % 16) + 1] = '\0'; - } - - // Pad out last line if not exactly 16 characters. - while ((i % 16) != 0) { - printf (" "); - i++; - } - - // And print the final ASCII bit. - printf (" %s\n", buff); -} - hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *data, uint64_t file_size) { hidra2::PrepareSendDataRequest prepareSendDataRequest; prepareSendDataRequest.op_code = OP_CODE__PREPARE_SEND_DATA; @@ -122,42 +69,67 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat prepareSendDataRequest.file_size = file_size; filename.copy((char*)&prepareSendDataRequest.filename, sizeof(prepareSendDataRequest.filename)); - //TODO Loop std::cout << "Send file: " << filename << std::endl; - io->send(client_fd_, &prepareSendDataRequest, sizeof(prepareSendDataRequest), 0); + io_utils->send_in_steps(client_fd_, &prepareSendDataRequest, sizeof(prepareSendDataRequest), 0); hidra2::PrepareSendDataResponse prepareSendDataResponse; - io->recv(client_fd_, &prepareSendDataResponse, sizeof(prepareSendDataResponse), 0); + io_utils->recv_in_steps(client_fd_, &prepareSendDataResponse, sizeof(prepareSendDataResponse), 0); + if(prepareSendDataResponse.error_code) { + std::cerr << "Server rejected metadata. NetErrorCode: " << prepareSendDataResponse.error_code << std::endl; + return PRODUCER_ERROR__SERVER_REPORTED_AN_ERROR; + } std::cout << "op_code: " << prepareSendDataResponse.op_code << std::endl; std::cout << "request_id: " << prepareSendDataResponse.request_id << std::endl; std::cout << "error_code: " << prepareSendDataResponse.error_code << std::endl; std::cout << "file_reference_id: " << prepareSendDataResponse.file_reference_id << std::endl; - hidra2::SendDataChunkRequest sendDataChunkRequest; - sendDataChunkRequest.op_code = OP_CODE__SEND_DATA_CHUNK; - sendDataChunkRequest.request_id = request_id++; - sendDataChunkRequest.start_byte = 0; - sendDataChunkRequest.chunk_size = file_size; - sendDataChunkRequest.file_reference_id = prepareSendDataResponse.file_reference_id; + NetworkErrorCode network_error = NET_ERR__NO_ERROR; + size_t already_send = 0; + uint64_t max_chunk_size = static_cast<uint64_t>(1024*1024*1024*2); - io->send(client_fd_, &sendDataChunkRequest, sizeof(sendDataChunkRequest), 0); + while(!network_error && already_send < file_size) { + size_t need_to_send = max_chunk_size; + if(double(file_size) - already_send - max_chunk_size < 0) { + need_to_send = file_size - already_send; + } - hexDump("send", data, file_size); + hidra2::SendDataChunkRequest sendDataChunkRequest; + sendDataChunkRequest.op_code = OP_CODE__SEND_DATA_CHUNK; + sendDataChunkRequest.request_id = request_id++; + sendDataChunkRequest.start_byte = already_send; + sendDataChunkRequest.chunk_size = need_to_send; + sendDataChunkRequest.file_reference_id = prepareSendDataResponse.file_reference_id; - io->send(client_fd_, data, file_size, 0); + if(io_utils->send_in_steps(client_fd_, &sendDataChunkRequest, sizeof(sendDataChunkRequest), 0) == -1) { + std::cerr << "Fail to send chunk metadata. errno: " << errno << std::endl; + return PRODUCER_ERROR__SENDING_CHUNK_FAILED; + } + if(io_utils->send_in_steps(client_fd_, data + already_send, need_to_send, 0) == -1) { + std::cerr << "Fail to send chunk data. errno: " << errno << std::endl; + return PRODUCER_ERROR__SENDING_CHUNK_FAILED; + } - hidra2::SendDataChunkResponse sendDataChunkResponse; + already_send += need_to_send; - io->recv(client_fd_, &sendDataChunkResponse, sizeof(sendDataChunkResponse), 0); + hidra2::SendDataChunkResponse sendDataChunkResponse; - std::cout << "op_code: " << sendDataChunkResponse.op_code << std::endl; - std::cout << "request_id: " << sendDataChunkResponse.request_id << std::endl; - std::cout << "error_code: " << sendDataChunkResponse.error_code << std::endl; + if(io_utils->recv_in_steps(client_fd_, &sendDataChunkResponse, sizeof(sendDataChunkResponse), 0) == -1) { + std::cout << "Failed to receive servers response. errno: " << errno << std::endl; + return PRODUCER_ERROR__RECEIVING_SERVER_RESPONSE_FAILED; + } + + if(sendDataChunkResponse.error_code) { + std::cout << "Server reported an error. NetErrorCode: " << sendDataChunkResponse.error_code; + return PRODUCER_ERROR__SERVER_REPORTED_AN_ERROR; + } + } + + return PRODUCER_ERROR__OK; } diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 21ada2860e4eb774ded245a1b395e020b1bd3c66..ceb6736cb22d6ad1955291ce4d3ec62886ef66cf 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -1,5 +1,5 @@ -#ifndef HIDRA2_PRODUCER__PRODUCERIMPL_H -#define HIDRA2_PRODUCER__PRODUCERIMPL_H +#ifndef HIDRA2_PRODUCER__PRODUCER_IMPL_H +#define HIDRA2_PRODUCER__PRODUCER_IMPL_H #include <string> #include <system_wrappers/has_io.h> @@ -27,4 +27,4 @@ class ProducerImpl : public Producer { }; } -#endif //HIDRA2_PRODUCER__PRODUCERIMPL_H +#endif //HIDRA2_PRODUCER__PRODUCER_IMPL_H diff --git a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp index 2a221cd8ac6697e2ac357d1a2b70093b369e2486..363d382a94e3ed2669cbb54d9319e56054dd269f 100644 --- a/producer/dummy-event-detector-cpp/src/dummy_detector.cpp +++ b/producer/dummy-event-detector-cpp/src/dummy_detector.cpp @@ -17,7 +17,7 @@ int DummyDetector::main(int argc, char **argv) { } */ - int fd = open("/tmp/Test.png", O_RDONLY); + int fd = open("/mnt/ramdisk/bigfile", O_RDONLY); struct stat astat{}; fstat(fd, &astat); @@ -25,8 +25,10 @@ int DummyDetector::main(int argc, char **argv) { size_t map_size = static_cast<size_t>(ceil(float(astat.st_size)/float(getpagesize()))*getpagesize()); void *buffer = mmap(nullptr, map_size, PROT_READ, MAP_SHARED, fd, 0); + + hidra2::ProducerError error; - error = producer->send("testfile", buffer, astat.st_size); + error = producer->send("testfile4", buffer, astat.st_size); if(error) { std::cerr << "File was not successfully send, ErrorCode: " << error << std::endl; @@ -34,8 +36,7 @@ int DummyDetector::main(int argc, char **argv) { std::cout << "File was successfully send." << std::endl; } - free(buffer); - /* - */ + munmap(buffer, map_size); + return 0; } diff --git a/receiver/src/file_refernce_handler.cpp b/receiver/src/file_refernce_handler.cpp index c5bd727962faa71bc02a835aeb482841115bd350..a62e840c31d36f46b43f20a6ae2e287874443520 100644 --- a/receiver/src/file_refernce_handler.cpp +++ b/receiver/src/file_refernce_handler.cpp @@ -14,24 +14,18 @@ hidra2::FileReferenceId FileReferenceHandler::add_file(std::string filename, FileReferenceId file_ref_id = ++kGlobalFileRefernceId; - int fd = open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); + std::string full_path = filename;//TODO + int fd = open(full_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); if(fd == -1) { err = FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED; return 0; } -/* - sync(); - __off_t offset = lseek(fd, file_size, SEEK_SET); - if(offset != file_size) { - err = FILE_REFERENCE_HANDLER_ERR__LSEEK_FAILED; + if(fallocate(fd, 0, 0, file_size) == -1) { + err = FILE_REFERENCE_HANDLER_ERR__ALLOCATE_STORAGE_FAILED; return 0; } - sync(); -*/ - fallocate(fd, 0, 0, file_size); - auto file_info = new FileInformation(); file_info->filename = filename; file_info->file_size = file_size; diff --git a/receiver/src/file_refernce_handler.h b/receiver/src/file_refernce_handler.h index 0c00deabb97f03ae6d53e6b9d10628eabd0ae885..662faa2adb7b7a9f56462aabe56473ccb634fe01 100644 --- a/receiver/src/file_refernce_handler.h +++ b/receiver/src/file_refernce_handler.h @@ -12,7 +12,7 @@ namespace hidra2 { enum FileReferenceHandlerError { FILE_REFERENCE_HANDLER_ERR__OK, FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED, - FILE_REFERENCE_HANDLER_ERR__LSEEK_FAILED, + FILE_REFERENCE_HANDLER_ERR__ALLOCATE_STORAGE_FAILED, }; class FileReferenceHandler : HasIO { diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index c19d80cb5aae4c73555ec524f150c5bb3d9b46dc..f1d1165713063b37d817136122a949578cb03af9 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -1,7 +1,7 @@ #include "receiver.h" int main (int argc, char* argv[]) { - hidra2::Receiver* receiver = new hidra2::Receiver(); + auto* receiver = new hidra2::Receiver(); receiver->start_listener("127.0.0.1", 8099); diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp index 13adee0df097c0ab43fa4eaddb9e166f6140e4f1..74351701a9ec29cfd05c5f52825b9c9b8ebfb764 100644 --- a/receiver/src/network_producer_peer.cpp +++ b/receiver/src/network_producer_peer.cpp @@ -53,9 +53,9 @@ void NetworkProducerPeer::internal_receiver_thread_() { break; } - if(io->recv(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 0) <= 0) { + if(io_utils->recv_in_steps(socket_fd_, generic_request, sizeof(GenericNetworkRequest), 0) <= 0) { //Disconnect - close(socket_fd_); + io->close(socket_fd_); std::cout << "[" << connection_id() << "] Disconnected." << std::endl; break; } @@ -65,7 +65,7 @@ void NetworkProducerPeer::internal_receiver_thread_() { if(bytes_to_send == 0) { continue; } - send(socket_fd_, generic_response, bytes_to_send, 0); + io_utils->send_in_steps(socket_fd_, generic_response, bytes_to_send, 0); } free(generic_request); @@ -91,6 +91,7 @@ void NetworkProducerPeer::disconnect() { size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* request, GenericNetworkResponse* response) { if(request->op_code >= OP_CODE_COUNT || request->op_code < 0) { std::cerr << "[" << connection_id() << "] Error invalid op_code: " << request->op_code << std::endl; + close(socket_fd_); return 0; } @@ -100,7 +101,7 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque auto handler_information = kRequestHandlers[request->op_code]; //receive the rest of the message - recv(socket_fd_, request->data, handler_information.request_size - sizeof(GenericNetworkRequest), 0); + io_utils->recv_in_steps(socket_fd_, request->data, handler_information.request_size - sizeof(GenericNetworkRequest), 0); handler_information.handler(this, request, response); diff --git a/receiver/src/network_producer_peer.h b/receiver/src/network_producer_peer.h index f581821a005b3554e3cc29e8bea9795c353e25d5..479a9c83b118def12b027759b74d78a96198e93a 100644 --- a/receiver/src/network_producer_peer.h +++ b/receiver/src/network_producer_peer.h @@ -31,6 +31,8 @@ class NetworkProducerPeer : HasIO { int socket_fd_; std::string address_; + bool got_hello_ = false; + std::thread* receiver_thread_ = nullptr; void internal_receiver_thread_(); diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp index 3ed9f9b23f381abdd950cc4526e1cb1d65e18d0b..f7f8bde8b67d579e3685344fceeff39da6d812ba 100644 --- a/receiver/src/network_producer_peer_handlers.cpp +++ b/receiver/src/network_producer_peer_handlers.cpp @@ -33,6 +33,14 @@ const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProduce void NetworkProducerPeer::handle_hello_request_(NetworkProducerPeer* self, const HelloRequest* request, HelloResponse* response) { + + if(self->got_hello_) { + std::cerr << "Client send hello twice." << std::endl; + self->io->close(self->socket_fd_); + return; + } + self->got_hello_ = true; + std::cout << "op_code " << request->op_code << std::endl; std::cout << "request_id " << request->request_id << std::endl; @@ -46,24 +54,21 @@ void NetworkProducerPeer::handle_hello_request_(NetworkProducerPeer* self, const void NetworkProducerPeer::handle_prepare_send_data_request_(NetworkProducerPeer* self, const PrepareSendDataRequest* request, PrepareSendDataResponse* response) { - std::cout << "[PRE]op_code " << request->op_code << std::endl; - std::cout << "[PRE]request_id " << request->request_id << std::endl; - - std::cout << "[PRE]filename " << request->filename << std::endl; - std::cout << "[PRE]file_size " << request->file_size << std::endl; - - FileReferenceHandlerError error; + FileReferenceHandlerError error = FILE_REFERENCE_HANDLER_ERR__OK; FileReferenceId reference_id = self->file_reference_handler.add_file(request->filename, request->file_size, self->connection_id(), error); if(reference_id == 0 || error) { - response->error_code = NET_ERR__INTERNAL_SERVER_ERROR; + std::cerr << "Failed to add_file. FileReferenceHandlerError: " << error << std::endl; + response->error_code = NET_ERR__ALLOCATE_STORAGE_FAILED; response->file_reference_id = 0; return; } + std::cout << "Created new file '" << request->filename << "' of size " << request->file_size << std::endl; + response->error_code = NET_ERR__NO_ERROR; response->file_reference_id = reference_id; } @@ -71,13 +76,14 @@ void NetworkProducerPeer::handle_prepare_send_data_request_(NetworkProducerPeer* void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* self, const SendDataChunkRequest* request, SendDataChunkResponse* response) { + /* std::cout << "[CHUNK]op_code " << request->op_code << std::endl; std::cout << "[CHUNK]request_id " << request->request_id << std::endl; std::cout << "[CHUNK]file_reference_id " << request->file_reference_id << std::endl; std::cout << "[CHUNK]start_byte " << request->start_byte << std::endl; std::cout << "[CHUNK]chunk_size " << request->chunk_size << std::endl; - + */ auto file_info = self->file_reference_handler.get_file(request->file_reference_id); if(file_info == nullptr || file_info->owner != self->connection_id()) { @@ -85,23 +91,32 @@ void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* s return; } - size_t map_size = static_cast<size_t>(ceil(float(request->chunk_size)/float(getpagesize()))*getpagesize()); + // Round to the next full pagesize + size_t map_start = size_t(request->start_byte/getpagesize())*getpagesize(); + size_t map_offset = request->start_byte%getpagesize(); + size_t map_size = static_cast<size_t>(ceil(float(request->chunk_size+map_offset)/float(getpagesize()))*getpagesize()); + + if(request->start_byte+request->chunk_size > file_info->file_size) { + std::cerr << "Producer is sending a lager file then excepted" << std::endl; + self->io_utils->recv_in_steps(self->socket_fd_, nullptr, request->chunk_size, 0); + return; + } void* mapped_file = mmap(nullptr, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, file_info->fd, - request->start_byte); + map_start); if(!mapped_file || mapped_file == MAP_FAILED) { - std::cerr << "Mapping a file faild" << std::endl;//TODO need to read to rest of the file into void - self->io->recv(self->socket_fd_, nullptr, request->chunk_size, 0); + std::cerr << "Mapping a file failed. errno: " << errno << std::endl; + self->io_utils->recv_in_steps(self->socket_fd_, nullptr, request->chunk_size, 0); response->error_code = NET_ERR__INTERNAL_SERVER_ERROR; return; } - if(self->io->recv(self->socket_fd_, mapped_file, request->chunk_size, 0) != request->chunk_size) { - std::cerr << "Fail to recv all the chunk data" << std::endl; + if(self->io_utils->recv_in_steps(self->socket_fd_, mapped_file + map_offset, request->chunk_size, 0) != request->chunk_size) { + std::cerr << "Fail to recv all the chunk data. errno: " << errno << std::endl; response->error_code = NET_ERR__INTERNAL_SERVER_ERROR; }