From d5e789844a09eccabd28b938fa80e6a2f3d7c34f Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Sun, 20 May 2018 01:05:00 +0200 Subject: [PATCH] refactor, add option to producer api to write to file instead of sending via network --- common/cpp/include/common/networking.h | 21 ++- common/cpp/include/io/io.h | 1 + common/cpp/include/unittests/MockIO.h | 7 +- common/cpp/src/system_io/system_io.cpp | 9 +- common/cpp/src/system_io/system_io.h | 3 +- .../dummy_data_producer.cpp | 44 ++++-- producer/api/CMakeLists.txt | 7 +- producer/api/include/producer/common.h | 24 +++ producer/api/include/producer/producer.h | 18 +-- producer/api/src/producer.cpp | 4 +- producer/api/src/producer_impl.cpp | 29 ++-- producer/api/src/producer_impl.h | 8 +- producer/api/src/request.h | 4 +- producer/api/src/request_handler.h | 5 - producer/api/src/request_handler_factory.cpp | 12 +- producer/api/src/request_handler_factory.h | 12 +- .../api/src/request_handler_filesystem.cpp | 29 ++++ producer/api/src/request_handler_filesystem.h | 34 +++++ producer/api/src/request_handler_tcp.h | 2 +- producer/api/unittests/test_producer.cpp | 19 ++- producer/api/unittests/test_producer_impl.cpp | 34 +++-- .../test_request_handler_factory.cpp | 11 +- .../test_request_handler_filesystem.cpp | 138 ++++++++++++++++++ .../unittests/test_request_handler_tcp.cpp | 31 ++-- producer/api/unittests/test_request_pool.cpp | 10 +- receiver/src/connection.cpp | 6 +- receiver/src/request.cpp | 6 +- receiver/src/request.h | 6 +- receiver/unittests/test_connection.cpp | 18 +-- receiver/unittests/test_request.cpp | 6 +- receiver/unittests/test_request_factory.cpp | 10 +- .../test_request_handler_db_writer.cpp | 6 +- .../test_request_handler_file_write.cpp | 6 +- 33 files changed, 443 insertions(+), 137 deletions(-) create mode 100644 producer/api/include/producer/common.h create mode 100644 producer/api/src/request_handler_filesystem.cpp create mode 100644 producer/api/src/request_handler_filesystem.h create mode 100644 producer/api/unittests/test_request_handler_filesystem.cpp diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 724948c05..708743860 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -2,15 +2,17 @@ #define ASAPO_COMMON__NETWORKING_H #include <cstdint> +#include <algorithm> +#include <string> namespace asapo { typedef uint64_t NetworkRequestId; enum Opcode : uint8_t { - kNetOpcodeUnknownOp, - kNetOpcodeSendData, - kNetOpcodeCount, + kOpcodeUnknownOp, + kOpcodeTransferData, + kOpcodeCount, }; enum NetworkErrorCode : uint16_t { @@ -27,10 +29,19 @@ enum NetworkErrorCode : uint16_t { * RPC always return a response to a corresponding request * @{ */ -struct GenericNetworkRequestHeader { - Opcode op_code; + +const std::size_t kMaxFileNameSize = 1024; +struct GenericRequestHeader { + GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp,uint64_t i_data_id = 0, + uint64_t i_data_size = 0,const std::string& i_file_name = ""): + op_code{i_op_code},data_id{i_data_id},data_size{i_data_size} { + auto size = std::min(i_file_name.size()+1,kMaxFileNameSize); + memcpy(file_name, i_file_name.c_str(), size); + } + Opcode op_code; uint64_t data_id; uint64_t data_size; + char file_name[kMaxFileNameSize]; }; struct GenericNetworkResponse { diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index e177bfe40..f94e977fe 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -92,6 +92,7 @@ class IO { virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0; virtual Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const = 0; + virtual Error WriteDataToFile (const std::string& fname, const uint8_t* data, size_t length) const = 0; virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0; virtual FileData GetDataFromFile (const std::string& fname, uint64_t fsize, Error* err) const = 0; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 803ce8690..4260a5918 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -178,7 +178,12 @@ class MockIO : public IO { } - MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, uint8_t* data, size_t fsize)); + Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override { + return Error{WriteDataToFile_t(fname, data, length)}; + } + + + MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize)); void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const override { diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index addeaa526..6841f7d9d 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -122,20 +122,25 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro } } -Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { +Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const { Error err; auto fd = Open(fname, IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, &err); if (err) { return err; } - Write(fd, data.get(), length, &err); + Write(fd, data, length, &err); if (err) { return err; } Close(fd, &err); return err; + +} + +Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { + return WriteDataToFile(fname,data.get(),length); } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index ce033ff31..424ca8cd5 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -102,7 +102,8 @@ class SystemIO final : public IO { void CreateNewDirectory(const std::string& directory_name, Error* err) const; FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const; Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const; - void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, + Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const; + void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const; std::string ReadFileToString(const std::string& fname, Error* err) const; }; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 3350c099c..f5c9a6c93 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -11,17 +11,18 @@ using std::chrono::high_resolution_clock; std::mutex mutex; +int nfiles; -typedef std::tuple<std::string, size_t, uint64_t, uint64_t> ArgumentTuple; +typedef std::tuple<std::string, size_t, uint64_t, uint64_t,uint64_t> ArgumentTuple; ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { - if (argc != 5) { + if (argc != 6) { std::cout << - "Usage: " << argv[0] << " <receiver_address> <number_of_byte> <iterations> <nthreads>" + "Usage: " << argv[0] << " <destination> <number_of_byte> <iterations> <nthreads> <mode 0 -t tcp, 1 - filesystem>" << std::endl; exit(EXIT_FAILURE); } try { - return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4])); + return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]),std::stoull(argv[5])); } catch(std::exception& e) { std::cerr << "Fail to parse arguments" << std::endl; std::cerr << e.what() << std::endl; @@ -29,13 +30,16 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { } } -void work(asapo::GenericNetworkRequestHeader header, asapo::Error err) { +void work(asapo::GenericRequestHeader header, asapo::Error err) { mutex.lock(); + nfiles--; if (err) { std::cerr << "File was not successfully send: " << err << std::endl; + nfiles = 0; + mutex.unlock(); return; } - std::cerr << "File was successfully send." << header.data_id << std::endl; + // std::cerr << "File was successfully send." << header.data_id << std::endl; mutex.unlock(); } @@ -44,7 +48,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it for(uint64_t i = 0; i < iterations; i++) { // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; - auto err = producer->Send(i + 1, buffer.get(), number_of_byte, &work); + auto err = producer->Send(i + 1, buffer.get(), number_of_byte,std::to_string(i), &work); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; @@ -59,16 +63,26 @@ int main (int argc, char* argv[]) { size_t number_of_kbytes; uint64_t iterations; uint64_t nthreads; - std::tie(receiver_address, number_of_kbytes, iterations, nthreads) = ProcessCommandArguments(argc, argv); + uint64_t mode; + + std::tie(receiver_address, number_of_kbytes, iterations, nthreads,mode) = ProcessCommandArguments(argc, argv); std::cout << "receiver_address: " << receiver_address << std::endl << "Package size: " << number_of_kbytes << "k" << std::endl << "iterations: " << iterations << std::endl << "nthreads: " << nthreads << std::endl - << std::endl; + << "mode: " << mode << std::endl + << std::endl; + + nfiles = iterations; asapo::Error err; - auto producer = asapo::Producer::Create(receiver_address, nthreads, &err); + std::unique_ptr<asapo::Producer> producer; + if (mode == 0) { + producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kTcp,&err); + } else { + producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kFilesystem,&err); + } producer->EnableLocalLog(true); producer->SetLogLevel(asapo::LogLevel::Debug); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -82,6 +96,16 @@ int main (int argc, char* argv[]) { if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) { return EXIT_FAILURE; } + + while (true) { + mutex.lock(); + if (nfiles <= 0) { + mutex.unlock(); + break; + } + mutex.unlock(); + } + high_resolution_clock::time_point t2 = high_resolution_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count() / 1000.0; double size_gb = double(number_of_kbytes) * iterations / 1024.0 / 1024.0 * 8.0; diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 55d2aac86..e30da7fec 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -4,9 +4,11 @@ set(SOURCE_FILES src/producer_impl.cpp src/producer_logger.cpp src/request_handler_tcp.cpp - src/request_pool.cpp src/receiver_discovery_service.cpp + src/request_handler_filesystem.cpp + src/request_pool.cpp + src/receiver_discovery_service.cpp src/request_handler_factory.cpp - src/request.cpp) + src/request.cpp include/producer/common.h) ################################ @@ -24,6 +26,7 @@ set(TEST_SOURCE_FILES unittests/test_producer_impl.cpp unittests/test_producer.cpp unittests/test_request_handler_tcp.cpp + unittests/test_request_handler_filesystem.cpp unittests/test_request_pool.cpp unittests/test_receiver_discovery_service.cpp unittests/test_request_handler_factory.cpp diff --git a/producer/api/include/producer/common.h b/producer/api/include/producer/common.h new file mode 100644 index 000000000..2f6b6322e --- /dev/null +++ b/producer/api/include/producer/common.h @@ -0,0 +1,24 @@ +#ifndef ASAPO_PRODUCER_COMMON_H +#define ASAPO_PRODUCER_COMMON_H + +#include <cstdint> +#include <functional> + +#include "common/networking.h" +#include "common/error.h" + +namespace asapo { + +const uint8_t kMaxProcessingThreads = 32; + +using RequestCallback = std::function<void(GenericRequestHeader, Error)>; + +enum class RequestHandlerType { + kTcp, + kFilesystem +}; + + +} + +#endif //ASAPO_PRODUCER_COMMON_H diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index c1fd27ce8..0174ca8de 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -3,23 +3,12 @@ #include <memory> #include <string> -#include <functional> -#include "common/networking.h" -#include "producer_error.h" #include "logger/logger.h" +#include "producer/common.h" namespace asapo { -enum class ProducerStatus { - kDisconnected, - kConnected, -}; - -const uint8_t kMaxProcessingThreads = 32; - -using RequestCallback = std::function<void(GenericNetworkRequestHeader, Error)>; - class Producer { public: @@ -27,7 +16,8 @@ class Producer { /*! * @return A unique_ptr to a new producer instance */ - static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, Error* err); + static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type, + Error* err); virtual ~Producer() = default; @@ -38,7 +28,7 @@ class Producer { \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) = 0; + virtual Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) = 0; //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; //! Enables/Disables logs output to stdout diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp index 2736578e1..01be40c65 100644 --- a/producer/api/src/producer.cpp +++ b/producer/api/src/producer.cpp @@ -2,7 +2,7 @@ #include "producer_impl.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, - Error* err) { + asapo::RequestHandlerType type, Error* err) { if (n_processing_threads > kMaxProcessingThreads) { *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads)); return nullptr; @@ -10,7 +10,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp try { *err = nullptr; - return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads)); + return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads,type)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 3d7e84e46..e2eca87ad 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -4,6 +4,7 @@ #include "producer_impl.h" #include "producer_logger.h" #include "io/io_factory.h" +#include "producer/producer_error.h" namespace asapo { @@ -11,21 +12,27 @@ const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t( const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s -ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads): log__{GetDefaultProducerLogger()} { - discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); - request_handler_factory_.reset(new RequestHandlerFactory{RequestHandlerType::kTcp, discovery_service_.get()}); +ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type): + log__{GetDefaultProducerLogger()} { + switch (type) { + case RequestHandlerType::kTcp: + discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); + request_handler_factory_.reset(new RequestHandlerFactory{discovery_service_.get()}); + break; + case RequestHandlerType::kFilesystem: + request_handler_factory_.reset(nullptr); + request_handler_factory_.reset(new RequestHandlerFactory{endpoint}); + + } request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get()}); } -GenericNetworkRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size) { - GenericNetworkRequestHeader request; - request.op_code = kNetOpcodeSendData; - request.data_id = file_id; - request.data_size = file_size; +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name) { + GenericRequestHeader request{kOpcodeTransferData,file_id,file_size,std::move(file_name)}; return request; } -Error CheckProducerRequest(const GenericNetworkRequestHeader header) { +Error CheckProducerRequest(const GenericRequestHeader header) { if (header.data_size > ProducerImpl::kMaxChunkSize) { return ProducerErrorTemplates::kFileTooLarge.Generate(); } @@ -34,8 +41,8 @@ Error CheckProducerRequest(const GenericNetworkRequestHeader header) { } -Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) { - auto request_header = GenerateNextSendRequest(file_id, file_size); +Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name,RequestCallback callback) { + auto request_header = GenerateNextSendRequest(file_id, file_size,std::move(file_name)); auto err = CheckProducerRequest(request_header); if (err) { diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 7c48dc2f7..1706ed87a 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -21,21 +21,21 @@ class ProducerImpl : public Producer { static const size_t kMaxChunkSize; static const size_t kDiscoveryServiceUpdateFrequencyMs; - explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads); + explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type); ProducerImpl(const ProducerImpl&) = delete; ProducerImpl& operator=(const ProducerImpl&) = delete; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) override; + Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; private: - GenericNetworkRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size); + GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name); }; -Error CheckProducerRequest(const GenericNetworkRequestHeader header); +Error CheckProducerRequest(const GenericRequestHeader header); } #endif //ASAPO_PRODUCER__PRODUCER_IMPL_H diff --git a/producer/api/src/request.h b/producer/api/src/request.h index 0e9e29b27..beb9daf8e 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -2,12 +2,12 @@ #define ASAPO_PRODUCER_REQUEST_H #include "common/networking.h" -#include "producer/producer.h" +#include "producer/common.h" namespace asapo { struct Request { - GenericNetworkRequestHeader header; + GenericRequestHeader header; const void* data; RequestCallback callback; }; diff --git a/producer/api/src/request_handler.h b/producer/api/src/request_handler.h index 1e3f51836..70138c7f0 100644 --- a/producer/api/src/request_handler.h +++ b/producer/api/src/request_handler.h @@ -6,11 +6,6 @@ #include "common/error.h" #include "request.h" -#ifdef UNIT_TESTS -#define VIRTUAL virtual -#endif - - namespace asapo { class RequestHandler { diff --git a/producer/api/src/request_handler_factory.cpp b/producer/api/src/request_handler_factory.cpp index 7dedc995b..bd6ff52dd 100644 --- a/producer/api/src/request_handler_factory.cpp +++ b/producer/api/src/request_handler_factory.cpp @@ -1,6 +1,8 @@ #include "request_handler_factory.h" #include "request_handler_tcp.h" +#include "request_handler_filesystem.h" + namespace asapo { @@ -8,18 +10,22 @@ std::unique_ptr<RequestHandler> RequestHandlerFactory::NewRequestHandler(uint64_ switch (type_) { case asapo::RequestHandlerType::kTcp: return std::unique_ptr<RequestHandler> {new RequestHandlerTcp(discovery_service_, thread_id, shared_counter)}; + case asapo::RequestHandlerType::kFilesystem: + return std::unique_ptr<RequestHandler> {new RequestHandlerFilesystem(destination_folder_, thread_id)}; + } return nullptr; } -RequestHandlerFactory::RequestHandlerFactory(RequestHandlerType type, - ReceiverDiscoveryService* discovery_service): type_{type}, +RequestHandlerFactory::RequestHandlerFactory(ReceiverDiscoveryService* discovery_service): type_{RequestHandlerType::kTcp}, discovery_service_{discovery_service} { if (discovery_service_) { discovery_service_->StartCollectingData(); } +} - +RequestHandlerFactory::RequestHandlerFactory(std::string destination_folder): type_{RequestHandlerType::kFilesystem}, + destination_folder_{std::move(destination_folder)} { } diff --git a/producer/api/src/request_handler_factory.h b/producer/api/src/request_handler_factory.h index 32cffafd8..3c38fd314 100644 --- a/producer/api/src/request_handler_factory.h +++ b/producer/api/src/request_handler_factory.h @@ -11,18 +11,16 @@ namespace asapo { #define VIRTUAL virtual #endif -enum class RequestHandlerType { - kTcp, - kFilesystem -}; class RequestHandlerFactory { public: - RequestHandlerFactory(RequestHandlerType type, ReceiverDiscoveryService* discovery_service); - VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); + RequestHandlerFactory(ReceiverDiscoveryService* discovery_service); + RequestHandlerFactory(std::string destination_folder); + VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); private: RequestHandlerType type_; - ReceiverDiscoveryService* discovery_service_; + ReceiverDiscoveryService* discovery_service_{nullptr}; + std::string destination_folder_; }; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp new file mode 100644 index 000000000..f198d9a2d --- /dev/null +++ b/producer/api/src/request_handler_filesystem.cpp @@ -0,0 +1,29 @@ +#include "producer/producer_error.h" +#include "request_handler_filesystem.h" +#include "producer_logger.h" +#include "io/io_factory.h" + +#include <cstdint> + +namespace asapo { + + +RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id): + io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)}, thread_id_{thread_id} +{ + +} + + + +Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { + std::string fullpath = destination_folder_ + "/"+request->header.file_name+".bin"; + auto err = io__->WriteDataToFile(fullpath,(uint8_t*)request->data,request->header.data_size); + if (request->callback) { + request->callback(request->header, std::move(err)); + } + return nullptr; +} + + +} diff --git a/producer/api/src/request_handler_filesystem.h b/producer/api/src/request_handler_filesystem.h new file mode 100644 index 000000000..8436562fc --- /dev/null +++ b/producer/api/src/request_handler_filesystem.h @@ -0,0 +1,34 @@ +#ifndef ASAPO_REQUEST_HANDLER_FILESYSTEM_H +#define ASAPO_REQUEST_HANDLER_FILESYSTEM_H + +#include <chrono> + +#include "io/io.h" +#include "common/error.h" + +#include "producer/common.h" +#include "request_handler.h" +#include "logger/logger.h" + +using std::chrono::high_resolution_clock; + +namespace asapo { + +class RequestHandlerFilesystem: public RequestHandler { + public: + explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); + Error ProcessRequestUnlocked(const Request* request) override; + bool ReadyProcessRequest() override {return true;}; + void PrepareProcessingRequestLocked() override {}; + void TearDownProcessingRequestLocked(const Error& error_from_process) override {}; + + virtual ~RequestHandlerFilesystem() = default; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; + private: + std::string destination_folder_; + uint64_t thread_id_; +}; +} + +#endif //ASAPO_REQUEST_HANDLER_FILESYSTEM_H diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index 6ed212294..2a631dbdd 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -8,7 +8,7 @@ #include "receiver_discovery_service.h" #include "common/networking.h" -#include "producer/producer.h" +#include "producer/common.h" #include "request_handler.h" diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index fd3b4eea7..cc9cdb135 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -3,30 +3,39 @@ #include "producer/producer.h" #include "../src/producer_impl.h" + using ::testing::Ne; using ::testing::Eq; namespace { -TEST(CreateProducer, PointerIsNotNullptr) { +TEST(CreateProducer, TcpProducer) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint",4,asapo::RequestHandlerType::kTcp, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); +} +TEST(CreateProducer, FileSystemProducer) { + asapo::Error err; + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4,asapo::RequestHandlerType::kFilesystem, &err); + ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); + ASSERT_THAT(err, Eq(nullptr)); } + TEST(CreateProducer, TooManyThreads) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, + asapo::RequestHandlerType::kTcp, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Ne(nullptr)); } TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, &err); - auto err_send = producer->Send(1, nullptr, 1, nullptr); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp,&err); + auto err_send = producer->Send(1, nullptr, 1, "",nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index be807c26e..8f68ef6fe 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -3,8 +3,10 @@ #include "unittests/MockLogger.h" #include "common/error.h" -#include "producer/producer.h" +#include "producer/common.h" #include "../src/producer_impl.h" +#include "producer/producer_error.h" + #include "../src/request_pool.h" #include "../src/request_handler_tcp.h" @@ -28,8 +30,17 @@ using asapo::RequestPool; using asapo::Request; +MATCHER_P3(M_CheckSendDataRequest, file_id, file_size,file_name, + "Checks if a valid GenericRequestHeader was Send") { + return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData + && ((asapo::GenericRequestHeader*)arg)->data_id == file_id + && std::string(((asapo::GenericRequestHeader*)arg)->file_name) == file_name + && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; +} + + TEST(ProducerImpl, Constructor) { - asapo::ProducerImpl producer{"", 4}; + asapo::ProducerImpl producer{"", 4,asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); } @@ -37,10 +48,10 @@ TEST(ProducerImpl, Constructor) { class ProducerImplTests : public testing::Test { public: testing::NiceMock<MockDiscoveryService> service; - asapo::RequestHandlerFactory factory{asapo::RequestHandlerType::kTcp, &service}; + asapo::RequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPull> mock_pull{&factory}; - asapo::ProducerImpl producer{"", 1}; + asapo::ProducerImpl producer{"", 1,asapo::RequestHandlerType::kTcp}; void SetUp() override { producer.log__ = &mock_logger; producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; @@ -53,23 +64,28 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); - auto err = producer.Send(1, nullptr, 1, nullptr); + auto err = producer.Send(1, nullptr, 1, "",nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { - auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1, nullptr); + auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1,"", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } TEST_F(ProducerImplTests, OKSendingRequest) { uint64_t expected_size = 100; - Request request{asapo::GenericNetworkRequestHeader{}, nullptr, nullptr}; - EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( + uint64_t expected_id = 10; + std::string expected_name = "test_name"; + + + Request request{asapo::GenericRequestHeader{asapo::kOpcodeTransferData,expected_id,expected_size,expected_name}, nullptr, nullptr}; + + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(expected_id,expected_size,expected_name))).WillOnce(Return( nullptr)); - auto err = producer.Send(1, nullptr, expected_size, nullptr); + auto err = producer.Send(expected_id, nullptr, expected_size,expected_name, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/unittests/test_request_handler_factory.cpp b/producer/api/unittests/test_request_handler_factory.cpp index 8d3bb0b6d..cbfc57d80 100644 --- a/producer/api/unittests/test_request_handler_factory.cpp +++ b/producer/api/unittests/test_request_handler_factory.cpp @@ -5,6 +5,7 @@ #include "../src/receiver_discovery_service.h" #include "../src/request_handler_tcp.h" #include "mocking.h" +#include "../src/request_handler_filesystem.h" using ::testing::Ne; using ::testing::Eq; @@ -18,7 +19,7 @@ TEST(CreateFactory, Tcp) { MockDiscoveryService mock_discovery; EXPECT_CALL(mock_discovery, StartCollectingData()); - RequestHandlerFactory factory{asapo::RequestHandlerType::kTcp, &mock_discovery}; + RequestHandlerFactory factory{&mock_discovery}; auto handler = factory.NewRequestHandler(1, nullptr); @@ -26,6 +27,14 @@ TEST(CreateFactory, Tcp) { } +TEST(CreateFactory, Filesystem) { + RequestHandlerFactory factory{""}; + + auto handler = factory.NewRequestHandler(1, nullptr); + + ASSERT_THAT(dynamic_cast<asapo::RequestHandlerFilesystem*>(handler.get()), Ne(nullptr)); + +} } diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp new file mode 100644 index 000000000..7579e63d1 --- /dev/null +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -0,0 +1,138 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockLogger.h" +#include "common/error.h" +#include "io/io.h" + +#include "producer/common.h" +#include "producer/producer_error.h" + +#include "../src/request_handler_filesystem.h" +#include "io/io_factory.h" + +namespace { + +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::AllOf; +using testing::NiceMock; + +using ::testing::InSequence; +using ::testing::HasSubstr; + + +TEST(RequestHandlerFileSystem, Constructor) { + asapo::RequestHandlerFilesystem request_handler{"destination", 1}; + + ASSERT_THAT(dynamic_cast<const asapo::IO*>(request_handler.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request_handler.log__), Ne(nullptr)); +} + +class RequestHandlerFilesystemTests : public testing::Test { + public: + NiceMock<asapo::MockIO> mock_io; + + uint64_t expected_file_id = 42; + uint64_t expected_file_size = 1337; + std::string expected_file_name = "test_name"; + uint64_t expected_thread_id = 2; + std::string expected_destination = "destination"; + std::string expected_fullpath = expected_destination + "/" + expected_file_name+".bin"; + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; + uint8_t* expected_data_pointer = (uint8_t*)0xC00FE; + asapo::Error callback_err; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,expected_file_name}; + bool called = false; + asapo::GenericRequestHeader callback_header; + asapo::Request request{header, expected_data_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { + called = true; + callback_err = std::move(err); + callback_header = header; + }}; + + asapo::Request request_nocallback{header, expected_data_pointer, nullptr}; + testing::NiceMock<asapo::MockLogger> mock_logger; + + asapo::RequestHandlerFilesystem request_handler{expected_destination,expected_thread_id}; + + void SetUp() override { + request_handler.log__ = &mock_logger; + request_handler.io__.reset(&mock_io); + } + void TearDown() override { + request_handler.io__.release(); + } +}; + +ACTION_P(A_WriteSendDataResponse, error_code) { + ((asapo::SendDataResponse*)arg1)->op_code = asapo::kOpcodeTransferData; + ((asapo::SendDataResponse*)arg1)->error_code = error_code; +} + +MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, + "Checks if a valid GenericRequestHeader was Send") { + return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData + && ((asapo::GenericRequestHeader*)arg)->data_id == file_id + && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; +} + +TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + .WillOnce( + Return( + asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); + + + auto err = request_handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + .WillOnce( + Return( + asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); + + + auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); + + ASSERT_THAT(called, Eq(false)); + ASSERT_THAT(err, Eq(nullptr)); +} + + + +TEST_F(RequestHandlerFilesystemTests, TransferOK) { + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + .WillOnce( + Return( + nullptr) + ); + + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(callback_err, Eq(nullptr)); + ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); + ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); + ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); + ASSERT_THAT(std::string{callback_header.file_name}, Eq(std::string{header.file_name})); +} + + +} diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 009480a6e..3eb1e025f 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -6,7 +6,7 @@ #include "common/error.h" #include "io/io.h" -#include "producer/producer.h" +#include "producer/common.h" #include "producer/producer_error.h" #include "../src/request_handler_tcp.h" @@ -49,15 +49,16 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_file_id = 42; uint64_t expected_file_size = 1337; + std::string expected_file_name = "test_name"; uint64_t expected_thread_id = 2; - asapo::Opcode expected_op_code = asapo::kNetOpcodeSendData; + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; void* expected_file_pointer = (void*)0xC00FE; asapo::Error callback_err; - asapo::GenericNetworkRequestHeader header{expected_op_code, expected_file_id, expected_file_size}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,expected_file_name}; bool called = false; - asapo::GenericNetworkRequestHeader callback_header; - asapo::Request request{header, expected_file_pointer, [this](asapo::GenericNetworkRequestHeader header, asapo::Error err) { + asapo::GenericRequestHeader callback_header; + asapo::Request request{header, expected_file_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; @@ -100,15 +101,15 @@ class RequestHandlerTcpTests : public testing::Test { }; ACTION_P(A_WriteSendDataResponse, error_code) { - ((asapo::SendDataResponse*)arg1)->op_code = asapo::kNetOpcodeSendData; + ((asapo::SendDataResponse*)arg1)->op_code = asapo::kOpcodeTransferData; ((asapo::SendDataResponse*)arg1)->error_code = error_code; } MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, - "Checks if a valid GenericNetworkRequestHeader was Send") { - return ((asapo::GenericNetworkRequestHeader*)arg)->op_code == asapo::kNetOpcodeSendData - && ((asapo::GenericNetworkRequestHeader*)arg)->data_id == file_id - && ((asapo::GenericNetworkRequestHeader*)arg)->data_size == file_size; + "Checks if a valid GenericRequestHeader was Send") { + return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData + && ((asapo::GenericRequestHeader*)arg)->data_id == file_id + && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; } @@ -130,7 +131,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, expected_file_size), - sizeof(asapo::GenericNetworkRequestHeader), _)) + sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), @@ -214,11 +215,11 @@ void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once) { for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, expected_file_size), - sizeof(asapo::GenericNetworkRequestHeader), _)) + sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericNetworkRequestHeader)) + Return(sizeof(asapo::GenericRequestHeader)) )); if (only_once) break; } @@ -357,7 +358,7 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { -TEST_F(RequestHandlerTcpTests, DoNotCloseWhenRebalanceAndNotConnected) { +TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { EXPECT_CALL(mock_io, CloseSocket_t(_, _)).Times(0); ExpectOKConnect(); ExpectFailSendHeader(); @@ -470,7 +471,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); - + ASSERT_THAT(std::string{callback_header.file_name}, Eq(std::string{header.file_name})); } diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index d3b50af57..5b2c8f3bb 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -36,14 +36,14 @@ using asapo::RequestPool; using asapo::Error; using asapo::ErrorInterface; using asapo::Request; -using asapo::GenericNetworkRequestHeader; +using asapo::GenericRequestHeader; class MockRequestHandlerFactory : public asapo::RequestHandlerFactory { public: MockRequestHandlerFactory(RequestHandler* request_handler): - RequestHandlerFactory(asapo::RequestHandlerType::kTcp, nullptr) { + RequestHandlerFactory(nullptr) { request_handler_ = request_handler; } std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override { @@ -62,7 +62,7 @@ class RequestPoolTests : public testing::Test { MockRequestHandlerFactory request_handler_factory{mock_request_handler}; const uint8_t nthreads = 1; asapo::RequestPool pool {nthreads, &request_handler_factory}; - std::unique_ptr<Request> request{new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}}; + std::unique_ptr<Request> request{new Request{GenericRequestHeader{}, nullptr, nullptr}}; void SetUp() override { pool.log__ = &mock_logger; } @@ -73,7 +73,7 @@ class RequestPoolTests : public testing::Test { TEST(RequestPool, Constructor) { NiceMock<MockDiscoveryService> ds; - NiceMock<asapo::RequestHandlerFactory> request_handler_factory{asapo::RequestHandlerType::kTcp, &ds}; + NiceMock<asapo::RequestHandlerFactory> request_handler_factory{&ds}; asapo::RequestPool pool{4, &request_handler_factory}; @@ -118,7 +118,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - Request* request2 = new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}; + Request* request2 = new Request{GenericRequestHeader{}, nullptr, nullptr}; ExpectSend(mock_request_handler, 2); diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index b86a1f0ef..eb4165f6a 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -51,7 +51,7 @@ Error Connection::ProcessRequest(const std::unique_ptr<Request>& request) const void Connection::ProcessStatisticsAfterRequest(const std::unique_ptr<Request>& request) const noexcept { statistics__->IncreaseRequestCounter(); - statistics__->IncreaseRequestDataVolume(request->GetDataSize() + sizeof(GenericNetworkRequestHeader) + + statistics__->IncreaseRequestDataVolume(request->GetDataSize() + sizeof(GenericRequestHeader) + sizeof(GenericNetworkResponse)); statistics__->SendIfNeeded(); } @@ -82,9 +82,9 @@ void Connection::Listen() const noexcept { std::unique_ptr<Request> Connection::WaitForNewRequest(Error* err) const noexcept { //TODO: to be overwritten with MessagePack (or similar) - GenericNetworkRequestHeader generic_request_header; + GenericRequestHeader generic_request_header; statistics__->StartTimer(StatisticEntity::kNetwork); - io__->ReceiveWithTimeout(socket_fd_, &generic_request_header, sizeof(GenericNetworkRequestHeader), 50, err); + io__->ReceiveWithTimeout(socket_fd_, &generic_request_header, sizeof(GenericRequestHeader), 50, err); if(*err) { if(*err == IOErrorTemplates::kTimeout) { *err = nullptr;//Not an error in this case diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 3db277350..1a836e9f6 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -4,7 +4,7 @@ #include "receiver_config.h" namespace asapo { -Request::Request(const GenericNetworkRequestHeader& header, +Request::Request(const GenericRequestHeader& header, SocketDescriptor socket_fd) : io__{GenerateDefaultIO()}, request_header_(header), socket_fd_{socket_fd} { } @@ -77,12 +77,12 @@ std::string Request::GetFileName() const { return std::to_string(request_header_.data_id) + ".bin"; } -std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericNetworkRequestHeader& +std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, Error* err) const noexcept { *err = nullptr; switch (request_header.op_code) { - case Opcode::kNetOpcodeSendData: { + case Opcode::kOpcodeTransferData: { auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd}}; if (GetReceiverConfig()->write_to_disk) { diff --git a/receiver/src/request.h b/receiver/src/request.h index a8fca6768..3b2f05698 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -17,7 +17,7 @@ class Request { public: virtual Error Handle(std::unique_ptr<Statistics>*); virtual ~Request() = default; - Request(const GenericNetworkRequestHeader& request_header, SocketDescriptor socket_fd); + Request(const GenericRequestHeader& request_header, SocketDescriptor socket_fd); void AddHandler(const RequestHandler*); const RequestHandlerList& GetListHandlers() const; virtual uint64_t GetDataSize() const; @@ -29,7 +29,7 @@ class Request { private: Error AllocateDataBuffer(); Error ReceiveData(); - const GenericNetworkRequestHeader request_header_; + const GenericRequestHeader request_header_; const SocketDescriptor socket_fd_; FileData data_buffer_; RequestHandlerList handlers_; @@ -37,7 +37,7 @@ class Request { class RequestFactory { public: - virtual std::unique_ptr<Request> GenerateRequest(const GenericNetworkRequestHeader& request_header, + virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, Error* err) const noexcept; private: RequestHandlerFileWrite request_handler_filewrite_; diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index b18701ee2..45634411d 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -30,9 +30,9 @@ using asapo::Error; using asapo::ErrorInterface; using asapo::FileDescriptor; using asapo::SocketDescriptor; -using asapo::GenericNetworkRequestHeader; +using asapo::GenericRequestHeader; using asapo::SendDataResponse; -using asapo::GenericNetworkRequestHeader; +using asapo::GenericRequestHeader; using asapo::GenericNetworkResponse; using asapo::Opcode; using asapo::Connection; @@ -56,7 +56,7 @@ TEST(Connection, Constructor) { class MockRequestHandler: public Request { public: - MockRequestHandler(const GenericNetworkRequestHeader& request_header, SocketDescriptor socket_fd): + MockRequestHandler(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): Request(request_header, socket_fd) {}; Error Handle(std::unique_ptr<Statistics>* statistics) override { return Error{Handle_t()}; @@ -67,7 +67,7 @@ class MockRequestHandler: public Request { class MockRequestFactory: public asapo::RequestFactory { public: - std::unique_ptr<Request> GenerateRequest(const GenericNetworkRequestHeader& request_header, + std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, Error* err) const noexcept override { ErrorInterface* error = nullptr; @@ -76,7 +76,7 @@ class MockRequestFactory: public asapo::RequestFactory { return std::unique_ptr<Request> {res}; } - MOCK_CONST_METHOD3(GenerateRequest_t, Request * (const GenericNetworkRequestHeader&, + MOCK_CONST_METHOD3(GenerateRequest_t, Request * (const GenericRequestHeader&, SocketDescriptor socket_fd, ErrorInterface**)); @@ -138,7 +138,7 @@ ACTION_P(SaveArg1ToGenericNetworkResponse, value) { TEST_F(ConnectionTests, CallsHandleRequest) { - GenericNetworkRequestHeader header; + GenericRequestHeader header; auto request = new MockRequestHandler{header, 1}; EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); @@ -171,7 +171,7 @@ TEST_F(ConnectionTests, CallsHandleRequest) { TEST_F(ConnectionTests, SendsErrorToProducer) { - GenericNetworkRequestHeader header; + GenericRequestHeader header; auto request = new MockRequestHandler{header, 1}; EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); @@ -208,7 +208,7 @@ void MockExitCycle(const MockIO& mock_io, MockStatistics& mock_statictics) { } MockRequestHandler* MockWaitRequest(const MockRequestFactory& mock_factory) { - GenericNetworkRequestHeader header; + GenericRequestHeader header; header.data_size = 1; auto request = new MockRequestHandler{header, 1}; EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)).WillOnce( @@ -240,7 +240,7 @@ TEST_F(ConnectionTests, FillsStatistics) { EXPECT_CALL(mock_statictics, IncreaseRequestCounter_t()); - EXPECT_CALL(mock_statictics, IncreaseRequestDataVolume_t(1 + sizeof(asapo::GenericNetworkRequestHeader) + + EXPECT_CALL(mock_statictics, IncreaseRequestDataVolume_t(1 + sizeof(asapo::GenericRequestHeader) + sizeof(asapo::GenericNetworkResponse))); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index a22fec1ed..f238e9891 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -28,9 +28,9 @@ using ::asapo::Error; using ::asapo::ErrorInterface; using ::asapo::FileDescriptor; using ::asapo::SocketDescriptor; -using ::asapo::GenericNetworkRequestHeader; +using ::asapo::GenericRequestHeader; using ::asapo::SendDataResponse; -using ::asapo::GenericNetworkRequestHeader; +using ::asapo::GenericRequestHeader; using ::asapo::GenericNetworkResponse; using ::asapo::Opcode; using ::asapo::Connection; @@ -63,7 +63,7 @@ class MockReqestHandler : public asapo::RequestHandler { class RequestTests : public Test { public: - GenericNetworkRequestHeader generic_request_header; + GenericRequestHeader generic_request_header; asapo::SocketDescriptor socket_fd_{1}; uint64_t data_size_ {100}; uint64_t data_id_{15}; diff --git a/receiver/unittests/test_request_factory.cpp b/receiver/unittests/test_request_factory.cpp index 5b2bc53bd..f3a34e840 100644 --- a/receiver/unittests/test_request_factory.cpp +++ b/receiver/unittests/test_request_factory.cpp @@ -29,7 +29,7 @@ using ::testing::InSequence; using ::testing::SetArgPointee; using ::asapo::Error; using ::asapo::ErrorInterface; -using ::asapo::GenericNetworkRequestHeader; +using ::asapo::GenericRequestHeader; using ::asapo::GenericNetworkResponse; using ::asapo::Opcode; using ::asapo::Connection; @@ -50,11 +50,11 @@ class FactoryTests : public Test { public: RequestFactory factory; Error err{nullptr}; - GenericNetworkRequestHeader generic_request_header; + GenericRequestHeader generic_request_header; ReceiverConfig config; void SetUp() override { - generic_request_header.op_code = asapo::Opcode::kNetOpcodeSendData; + generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; config.write_to_disk = true; config.write_to_db = true; SetReceiverConfig(config); @@ -64,14 +64,14 @@ class FactoryTests : public Test { }; TEST_F(FactoryTests, ErrorOnWrongCode) { - generic_request_header.op_code = asapo::Opcode::kNetOpcodeUnknownOp; + generic_request_header.op_code = asapo::Opcode::kOpcodeUnknownOp; auto request = factory.GenerateRequest(generic_request_header, 1, &err); ASSERT_THAT(err, Ne(nullptr)); } TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendDataCode) { - generic_request_header.op_code = asapo::Opcode::kNetOpcodeSendData; + generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; auto request = factory.GenerateRequest(generic_request_header, 1, &err); ASSERT_THAT(err, Eq(nullptr)); diff --git a/receiver/unittests/test_request_handler_db_writer.cpp b/receiver/unittests/test_request_handler_db_writer.cpp index 4fb824ee4..465b1efbb 100644 --- a/receiver/unittests/test_request_handler_db_writer.cpp +++ b/receiver/unittests/test_request_handler_db_writer.cpp @@ -40,7 +40,7 @@ using ::asapo::SocketDescriptor; using ::asapo::MockIO; using asapo::Request; using asapo::RequestHandlerDbWrite; -using ::asapo::GenericNetworkRequestHeader; +using ::asapo::GenericRequestHeader; using asapo::MockDatabase; using asapo::RequestFactory; @@ -52,7 +52,7 @@ namespace { class MockRequestHandler: public Request { public: - MockRequestHandler(const GenericNetworkRequestHeader& request_header, SocketDescriptor socket_fd): + MockRequestHandler(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): Request(request_header, socket_fd) {}; MOCK_CONST_METHOD0(GetFileName, std::string()); @@ -70,7 +70,7 @@ class DbWriterHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; void SetUp() override { - GenericNetworkRequestHeader request_header; + GenericRequestHeader request_header; request_header.data_id = 2; handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; handler.log__ = &mock_logger; diff --git a/receiver/unittests/test_request_handler_file_write.cpp b/receiver/unittests/test_request_handler_file_write.cpp index 58fa342ca..1bd450f88 100644 --- a/receiver/unittests/test_request_handler_file_write.cpp +++ b/receiver/unittests/test_request_handler_file_write.cpp @@ -35,7 +35,7 @@ using ::asapo::SocketDescriptor; using ::asapo::MockIO; using asapo::Request; using asapo::RequestHandlerFileWrite; -using ::asapo::GenericNetworkRequestHeader; +using ::asapo::GenericRequestHeader; namespace { @@ -48,7 +48,7 @@ TEST(FileWrite, Constructor) { class MockRequestHandler: public Request { public: - MockRequestHandler(const GenericNetworkRequestHeader& request_header, SocketDescriptor socket_fd): + MockRequestHandler(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): Request(request_header, socket_fd) {}; MOCK_CONST_METHOD0(GetFileName, std::string()); @@ -66,7 +66,7 @@ class FileWriteHandlerTests : public Test { uint64_t expected_file_size = 10; void MockRequestData(); void SetUp() override { - GenericNetworkRequestHeader request_header; + GenericRequestHeader request_header; request_header.data_id = 2; mock_request.reset(new MockRequestHandler{request_header, 1}); handler.io__ = std::unique_ptr<asapo::IO> {&mock_io}; -- GitLab