Skip to content
Snippets Groups Projects
Commit 4dc981c6 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

Dummy detector sends test file to receiver

parent c0176cf1
No related branches found
No related tags found
No related merge requests found
......@@ -8,9 +8,6 @@
const uint32_t hidra2::ProducerImpl::kVersion = 1;
hidra2::FileReferenceId hidra2::ProducerImpl::kGlobalReferenceId = 0;
hidra2::ProducerImpl::ProducerImpl() {
__set_io(ProducerImpl::kDefaultIO);
}
......@@ -65,6 +62,7 @@ hidra2::ProducerError hidra2::ProducerImpl::connect_to_receiver(std::string rece
return PRODUCER_ERROR__OK;
}
//TODO not our code. need to be removed. Copy&Pasted from stackoverflow
void hexDump (char *desc, void *addr, int len) {
int i;
unsigned char buff[17];
......@@ -121,8 +119,13 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat
hidra2::PrepareSendDataRequest prepareSendDataRequest;
prepareSendDataRequest.op_code = OP_CODE__PREPARE_SEND_DATA;
prepareSendDataRequest.request_id = request_id++;
prepareSendDataRequest.file_size = file_size;
filename.copy((char*)&prepareSendDataRequest.filename, sizeof(prepareSendDataRequest.filename));
//TODO Loop
std::cout << "Send file: " << filename << std::endl;
io->send(client_fd_, &prepareSendDataRequest, sizeof(prepareSendDataRequest), 0);
hidra2::PrepareSendDataResponse prepareSendDataResponse;
......@@ -138,8 +141,8 @@ hidra2::ProducerError hidra2::ProducerImpl::send(std::string filename, void *dat
hidra2::SendDataChunkRequest sendDataChunkRequest;
sendDataChunkRequest.op_code = OP_CODE__SEND_DATA_CHUNK;
sendDataChunkRequest.request_id = request_id++;
sendDataChunkRequest.chunk_size = 0;
sendDataChunkRequest.start_byte = 0;
sendDataChunkRequest.chunk_size = file_size;
sendDataChunkRequest.file_reference_id = prepareSendDataResponse.file_reference_id;
io->send(client_fd_, &sendDataChunkRequest, sizeof(sendDataChunkRequest), 0);
......
......@@ -10,7 +10,6 @@ class ProducerImpl : public Producer {
friend Producer;
private:
static const uint32_t kVersion;
static FileReferenceId kGlobalReferenceId;
int client_fd_ = -1;
uint64_t request_id = 0;
......
#include "dummy_detector.h"
#include <iostream>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <cmath>
int DummyDetector::main(int argc, char **argv) {
std::unique_ptr<hidra2::Producer> producer = hidra2::Producer::create();
producer->connect_to_receiver("127.0.0.1");
const size_t size = 255;
/*const size_t size = 255;
void *buffer = malloc(size);
for(char i = 0; i < 255; i++) {
static_cast<char*>(buffer)[i] = i;
for(unsigned char i = 0; i < 255; i++) {
static_cast<unsigned char*>(buffer)[i] = i;
}
*/
int fd = open("/tmp/Test.png", O_RDONLY);
struct stat astat{};
fstat(fd, &astat);
size_t map_size = static_cast<size_t>(ceil(float(astat.st_size)/float(getpagesize()))*getpagesize());
void *buffer = mmap(nullptr, map_size, PROT_READ, MAP_SHARED, fd, 0);
hidra2::ProducerError error;
error = producer->send("testfile", buffer, size);
error = producer->send("testfile", buffer, astat.st_size);
if(error) {
std::cerr << "File was not successfully send, ErrorCode: " << error << std::endl;
......
......@@ -3,6 +3,7 @@ set(SOURCE_FILES
src/main.cpp
src/receiver.h src/receiver.cpp
src/network_producer_peer.h src/network_producer_peer.cpp src/network_producer_peer_handlers.cpp
src/file_refernce_handler.h src/file_refernce_handler.cpp
)
......
#include <iostream>
#include <fcntl.h>
#include "file_refernce_handler.h"
namespace hidra2 {
std::map<FileReferenceId, std::shared_ptr<FileReferenceHandler::FileInformation>> FileReferenceHandler::kFileRefernceMap;
FileReferenceId FileReferenceHandler::kGlobalFileRefernceId = 0;
hidra2::FileReferenceId FileReferenceHandler::add_file(std::string filename,
uint64_t file_size,
uint32_t owner_connection_id,
FileReferenceHandlerError& err) {
FileReferenceId file_ref_id = ++kGlobalFileRefernceId;
int fd = open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
if(fd == -1) {
err = FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED;
return 0;
}
/*
sync();
__off_t offset = lseek(fd, file_size, SEEK_SET);
if(offset != file_size) {
err = FILE_REFERENCE_HANDLER_ERR__LSEEK_FAILED;
return 0;
}
sync();
*/
fallocate(fd, 0, 0, file_size);
auto file_info = new FileInformation();
file_info->filename = filename;
file_info->file_size = file_size;
file_info->owner = owner_connection_id;
file_info->fd = fd;
kFileRefernceMap[file_ref_id] = std::shared_ptr<FileReferenceHandler::FileInformation>(file_info);
err = FILE_REFERENCE_HANDLER_ERR__OK;
return file_ref_id;
}
std::shared_ptr<FileReferenceHandler::FileInformation> FileReferenceHandler::get_file(FileReferenceId file_reference_id) {
return kFileRefernceMap[file_reference_id];
}
void FileReferenceHandler::remove_file(FileReferenceId file_reference_id) {
if(kFileRefernceMap.count(file_reference_id) != 1) {
return; //Not found
}
auto file_info = kFileRefernceMap[file_reference_id];
close(file_info->fd);
kFileRefernceMap.erase(file_reference_id);
}
}
#ifndef HIDRA2_FILE_REFERNCE_HANDLER_H
#define HIDRA2_FILE_REFERNCE_HANDLER_H
#include <cstdint>
#include <common/networking.h>
#include <map>
#include <system_wrappers/has_io.h>
#include <memory>
namespace hidra2 {
enum FileReferenceHandlerError {
FILE_REFERENCE_HANDLER_ERR__OK,
FILE_REFERENCE_HANDLER_ERR__OPEN_FAILED,
FILE_REFERENCE_HANDLER_ERR__LSEEK_FAILED,
};
class FileReferenceHandler : HasIO {
public:
struct FileInformation {
std::string filename;
uint64_t file_size;
uint32_t owner;
int fd;
};
private:
static FileReferenceId kGlobalFileRefernceId;
static std::map<FileReferenceId, std::shared_ptr<FileReferenceHandler::FileInformation>> kFileRefernceMap;
public:
FileReferenceId add_file (std::string filename, uint64_t file_size, uint32_t owner_connection_id, FileReferenceHandlerError& err);
std::shared_ptr<FileInformation> get_file (FileReferenceId file_reference_id);
void remove_file (FileReferenceId file_reference_id);
};
}
#endif //HIDRA2_FILE_REFERNCE_HANDLER_H
......@@ -3,9 +3,10 @@
namespace hidra2 {
FileReferenceHandler NetworkProducerPeer::file_reference_handler;
std::atomic<uint32_t> NetworkProducerPeer::kNetworkProducerPeerCount;
const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProducerPeer::request_handlers = NetworkProducerPeer::init_request_handlers();
const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProducerPeer::kRequestHandlers = NetworkProducerPeer::init_request_handlers();
NetworkProducerPeer::NetworkProducerPeer(int socket_fd, std::string address) {
address_ = std::move(address);
......@@ -96,7 +97,7 @@ size_t NetworkProducerPeer::handle_generic_request_(GenericNetworkRequest* reque
response->request_id = request->request_id;
response->op_code = request->op_code;
auto handler_information = request_handlers[request->op_code];
auto handler_information = kRequestHandlers[request->op_code];
//receive the rest of the message
recv(socket_fd_, request->data, handler_information.request_size - sizeof(GenericNetworkRequest), 0);
......
......@@ -2,6 +2,7 @@
#define HIDRA2_NETWORKPRODUCERPEER_H
#include <string>
#include <map>
#include <utility>
#include <thread>
#include <common/networking.h>
......@@ -9,6 +10,7 @@
#include <iostream>
#include <atomic>
#include <vector>
#include "file_refernce_handler.h"
namespace hidra2 {
......@@ -21,8 +23,8 @@ class NetworkProducerPeer : HasIO {
RequestHandler handler;
};
private:
static const std::vector<RequestHandlerInformation> request_handlers;
static FileReferenceHandler file_reference_handler;
static const std::vector<RequestHandlerInformation> kRequestHandlers;
static std::atomic<uint32_t> kNetworkProducerPeerCount;
uint32_t connection_id_;
......@@ -39,7 +41,7 @@ class NetworkProducerPeer : HasIO {
static void handle_send_data_chunk_request_(NetworkProducerPeer* self, const SendDataChunkRequest* request, SendDataChunkResponse* response);
public:
NetworkProducerPeer(int socket_fd, std::string aHelloRequestddress);
NetworkProducerPeer(int socket_fd, std::string address);
static const std::vector<RequestHandlerInformation> init_request_handlers();
......
#include "network_producer_peer.h"
#include <sys/sendfile.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <cmath>
namespace hidra2 {
void hexDump (char *desc, void *addr, int len) {
int i;
unsigned char buff[17];
unsigned char *pc = (unsigned char*)addr;
// Output description if given.
if (desc != NULL)
printf ("%s:\n", desc);
if (len == 0) {
printf(" ZERO LENGTH\n");
return;
}
if (len < 0) {
printf(" NEGATIVE LENGTH: %i\n",len);
return;
}
// Process every byte in the data.
for (i = 0; i < len; i++) {
// Multiple of 16 means new line (with line offset).
if ((i % 16) == 0) {
// Just don't print ASCII for the zeroth line.
if (i != 0)
printf (" %s\n", buff);
// Output the offset.
printf (" %04x ", i);
}
// Now the hex code for the specific character.
printf (" %02x", pc[i]);
// And store a printable ASCII character for later.
if ((pc[i] < 0x20) || (pc[i] > 0x7e))
buff[i % 16] = '.';
else
buff[i % 16] = pc[i];
buff[(i % 16) + 1] = '\0';
}
// Pad out last line if not exactly 16 characters.
while ((i % 16) != 0) {
printf (" ");
i++;
}
// And print the final ASCII bit.
printf (" %s\n", buff);
}
const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProducerPeer::init_request_handlers() {
std::vector<NetworkProducerPeer::RequestHandlerInformation> vec(OP_CODE_COUNT);
......@@ -72,7 +24,7 @@ const std::vector<NetworkProducerPeer::RequestHandlerInformation> NetworkProduce
vec[OP_CODE__SEND_DATA_CHUNK] = {
sizeof(SendDataChunkRequest),
sizeof(SendDataChunkResponse),
(NetworkProducerPeer::RequestHandler) &NetworkProducerPeer::handle_prepare_send_data_request_
(NetworkProducerPeer::RequestHandler) &NetworkProducerPeer::handle_send_data_chunk_request_
};
return vec;
......@@ -94,33 +46,76 @@ void NetworkProducerPeer::handle_hello_request_(NetworkProducerPeer* self, const
void NetworkProducerPeer::handle_prepare_send_data_request_(NetworkProducerPeer* self, const PrepareSendDataRequest* request,
PrepareSendDataResponse* response) {
std::cout << "op_code " << request->op_code << std::endl;
std::cout << "request_id " << request->request_id << std::endl;
std::cout << "[PRE]op_code " << request->op_code << std::endl;
std::cout << "[PRE]request_id " << request->request_id << std::endl;
std::cout << "filename " << request->filename << std::endl;
std::cout << "file_size " << request->file_size << std::endl;
std::cout << "[PRE]filename " << request->filename << std::endl;
std::cout << "[PRE]file_size " << request->file_size << std::endl;
FileReferenceHandlerError error;
FileReferenceId reference_id = self->file_reference_handler.add_file(request->filename,
request->file_size,
self->connection_id(),
error);
if(reference_id == 0 || error) {
response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
response->file_reference_id = 0;
return;
}
response->error_code = NET_ERR__NO_ERROR;
response->file_reference_id = 2;
response->file_reference_id = reference_id;
}
void NetworkProducerPeer::handle_send_data_chunk_request_(NetworkProducerPeer* self,
const SendDataChunkRequest* request,
SendDataChunkResponse* response) {
std::cout << "op_code " << request->op_code << std::endl;
std::cout << "request_id " << request->request_id << std::endl;
std::cout << "[CHUNK]op_code " << request->op_code << std::endl;
std::cout << "[CHUNK]request_id " << request->request_id << std::endl;
std::cout << "[CHUNK]file_reference_id " << request->file_reference_id << std::endl;
std::cout << "[CHUNK]start_byte " << request->start_byte << std::endl;
std::cout << "[CHUNK]chunk_size " << request->chunk_size << std::endl;
auto file_info = self->file_reference_handler.get_file(request->file_reference_id);
std::cout << "file_reference_id " << request->file_reference_id << std::endl;
std::cout << "start_byte " << request->start_byte << std::endl;
std::cout << "chunk_size " << request->chunk_size << std::endl;
if(file_info == nullptr || file_info->owner != self->connection_id()) {
response->error_code = NET_ERR__UNKNOWN_REFERENCE_ID;
return;
}
void* chunk_buffer = malloc(request->chunk_size);
size_t map_size = static_cast<size_t>(ceil(float(request->chunk_size)/float(getpagesize()))*getpagesize());
self->io->recv(self->socket_fd_, chunk_buffer, request->chunk_size, 0);
void* mapped_file = mmap(nullptr,
map_size,
PROT_READ | PROT_WRITE, MAP_SHARED,
file_info->fd,
request->start_byte);
hexDump("response", chunk_buffer, request->chunk_size);
if(!mapped_file || mapped_file == MAP_FAILED) {
std::cerr << "Mapping a file faild" << std::endl;//TODO need to read to rest of the file into void
self->io->recv(self->socket_fd_, nullptr, request->chunk_size, 0);
response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
return;
}
if(self->io->recv(self->socket_fd_, mapped_file, request->chunk_size, 0) != request->chunk_size) {
std::cerr << "Fail to recv all the chunk data" << std::endl;
response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
}
if (msync(mapped_file, map_size, MS_SYNC) == -1) {
std::cerr << "Fail to sync map file" << std::endl;
response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
}
if(munmap(mapped_file, map_size) == -1) {
std::cerr << "munmap file faild" << std::endl;
response->error_code = NET_ERR__INTERNAL_SERVER_ERROR;
return;
}
free(chunk_buffer);
response->error_code = NET_ERR__NO_ERROR;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment