From 071cd8869fb7e75eae757c980cacba078a5342c8 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 11 Nov 2019 15:29:23 +0100 Subject: [PATCH] receiver receives file directly on disk --- common/cpp/include/common/networking.h | 1 - common/cpp/include/io/io.h | 3 +- common/cpp/include/unittests/MockIO.h | 7 + common/cpp/src/system_io/system_io.cpp | 69 +++++++-- common/cpp/src/system_io/system_io.h | 8 +- .../cluster/asapo_overwrite_vars.tfvars | 1 + .../cluster/scripts/asapo-receivers.nmd.tpl | 1 + .../cluster/scripts/asapo.auto.tfvars.in | 1 + .../docker/cluster/scripts/receiver.json.tpl | 1 + deploy/docker/cluster/scripts/templates.tf | 1 + deploy/docker/cluster/scripts/vars.tf | 2 + deploy/nomad_jobs/receiver.json.tpl | 1 + producer/api/cpp/src/request_handler_tcp.cpp | 2 +- .../unittests/test_request_handler_tcp.cpp | 2 +- receiver/CMakeLists.txt | 9 +- receiver/src/receiver_config.cpp | 1 + receiver/src/receiver_config.h | 1 + receiver/src/request.cpp | 2 +- receiver/src/request.h | 5 +- receiver/src/request_factory.cpp | 36 ++++- receiver/src/request_factory.h | 9 +- receiver/src/request_handler_file_receive.cpp | 33 ++++ receiver/src/request_handler_file_receive.h | 22 +++ .../src/request_handler_receive_metadata.cpp | 36 +++++ .../src/request_handler_receive_metadata.h | 22 +++ receiver/src/requests_dispatcher.cpp | 4 +- receiver/unittests/mock_receiver_config.cpp | 2 + receiver/unittests/receiver_mocking.h | 5 +- receiver/unittests/test_config.cpp | 6 +- receiver/unittests/test_request_factory.cpp | 64 +++++++- .../test_request_handler_file_receive.cpp | 145 ++++++++++++++++++ .../test_request_handler_receive_data.cpp | 1 - .../test_request_handler_receive_metadata.cpp | 126 +++++++++++++++ .../unittests/test_requests_dispatcher.cpp | 11 +- .../producer_receiver/CMakeLists.txt | 1 + .../CMakeLists.txt | 7 + .../check_linux.sh | 44 ++++++ .../check_windows.bat | 47 ++++++ .../settings/receiver.json.tpl.lin.in | 1 + .../settings/receiver.json.tpl.win.in | 1 + .../client_serv/ip_tcp_network.cpp | 9 +- .../receiver.json | 1 + .../receiver.json | 1 + .../python_tests/producer/receiver.json.tpl | 1 + .../manual/receiver_debug_local/receiver.json | 1 + 45 files changed, 708 insertions(+), 46 deletions(-) create mode 100644 receiver/src/request_handler_file_receive.cpp create mode 100644 receiver/src/request_handler_file_receive.h create mode 100644 receiver/src/request_handler_receive_metadata.cpp create mode 100644 receiver/src/request_handler_receive_metadata.h create mode 100644 receiver/unittests/test_request_handler_file_receive.cpp create mode 100644 receiver/unittests/test_request_handler_receive_metadata.cpp create mode 100644 tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/CMakeLists.txt create mode 100644 tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh create mode 100644 tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 2895436af..66c0c7ceb 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -29,7 +29,6 @@ enum NetworkErrorCode : uint16_t { kNetErrorNoData, kNetAuthorizationError, kNetErrorFileIdAlreadyInUse, - kNetErrorErrorInMetadata, kNetErrorAllocateStorageFailed, kNetErrorInternalServerError = 65535, }; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 7951f2dbe..92e79f10e 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -103,7 +103,8 @@ class IO { size_t length, bool create_directories) const = 0; virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const uint8_t* data, size_t length, bool create_directories) const = 0; - + virtual Error ReceiveDataToFile(SocketDescriptor socket, const std::string& root_folder, const std::string& fname, + size_t length, bool create_directories) const = 0; virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0; virtual FileData GetDataFromFile (const std::string& fname, uint64_t* fsize, Error* err) const = 0; virtual SubDirList GetSubDirectories(const std::string& path, Error* err) const = 0; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index ce0583f9a..e24eee23d 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -235,6 +235,13 @@ class MockIO : public IO { } + MOCK_CONST_METHOD5(ReceiveDataToFile_t, ErrorInterface * (SocketDescriptor socket, const std::string& root_folder, + const std::string& fname, size_t fsize, bool create_directories)); + + Error ReceiveDataToFile(SocketDescriptor socket, const std::string& root_folder, const std::string& fname, + size_t length, bool create_directories) const override { + return Error{ReceiveDataToFile_t(socket, root_folder, fname, length, create_directories)}; + } MOCK_CONST_METHOD5(WriteDataToFile_t, ErrorInterface * (const std::string& root_folder, const std::string& fname, diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index e1493c741..6bc1e55dd 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -27,7 +27,7 @@ namespace asapo { const int SystemIO::kNetBufferSize = 1024 * 1024; const int SystemIO::kWaitTimeoutMs = 1000; const size_t SystemIO::kMaxTransferChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte -const size_t SystemIO::kReadBufSize = size_t(1024) * 1024 * 50; //50MiByte +const size_t SystemIO::kReadWriteBufSize = size_t(1024) * 1024 * 50; //50MiByte @@ -146,36 +146,43 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro } } -Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, - size_t length, bool create_directories) const { +FileDescriptor SystemIO::OpenWithCreateFolders(const std::string& root_folder, const std::string& fname, + bool create_directories, Error* err) const { std::string full_name; if (!root_folder.empty()) { full_name = root_folder + kPathSeparator + fname; } else { full_name = fname; } - Error err; - auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW | IO_OPEN_MODE_SET_LENGTH_0, &err); - if (err == IOErrorTemplates::kFileNotFound && create_directories) { + auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW | IO_OPEN_MODE_SET_LENGTH_0, err); + if (*err == IOErrorTemplates::kFileNotFound && create_directories) { size_t pos = fname.rfind(kPathSeparator); if (pos == std::string::npos) { - return IOErrorTemplates::kFileNotFound.Generate(); + *err = IOErrorTemplates::kFileNotFound.Generate(full_name); + return -1; } - auto err_create = CreateDirectoryWithParents(root_folder, fname.substr(0, pos)); - if (err_create) { - return err_create; + *err = CreateDirectoryWithParents(root_folder, fname.substr(0, pos)); + if (*err) { + return -1; } - return WriteDataToFile(root_folder, fname, data, length, false); + return OpenWithCreateFolders(root_folder, fname, false, err); } + return fd; + +} + +Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const { + Error err; + auto fd = OpenWithCreateFolders(root_folder, fname, create_directories, &err); if (err) { - err->Append(full_name); return err; } Write(fd, data, length, &err); if (err) { - err->Append(full_name); + err->Append(fname); return err; } @@ -624,7 +631,7 @@ Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, s size_t total_bytes_sent = 0; - size_t buf_size = std::min(length, kReadBufSize); + size_t buf_size = std::min(length, kReadWriteBufSize); Error err; auto fd = Open(fname, IO_OPEN_MODE_READ, &err); @@ -655,5 +662,39 @@ Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, s return nullptr; } +Error SystemIO:: ReceiveDataToFile(SocketDescriptor socket, const std::string& root_folder, const std::string& fname, + size_t length, bool create_directories) const { + Error err; + auto fd = OpenWithCreateFolders(root_folder, fname, create_directories, &err); + if (err) { + return err; + } + + size_t buf_size = std::min(length, kReadWriteBufSize); + auto data_array = std::unique_ptr<uint8_t> {AllocateArray(buf_size, &err)}; + if (err != nullptr) { + return err; + } + + size_t total_bytes_written = 0; + while (total_bytes_written < length) { + auto bytes_received = Receive(socket, data_array.get(), std::min(buf_size, length - total_bytes_written), &err); + if (err != nullptr && err != ErrorTemplates::kEndOfFile) { + Close(fd, nullptr); + return err; + } + auto bytes_written = Write(fd, data_array.get(), bytes_received, &err); + if (err != nullptr) { + Close(fd, nullptr); + return err; + } + total_bytes_written += bytes_written; + } + + Close(fd, nullptr); + return nullptr; +} + + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index e0f84caf8..3d0c15252 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -26,7 +26,7 @@ class SystemIO final : public IO { private: static const int kNetBufferSize;//TODO: need to set by config static const size_t kMaxTransferChunkSize; - static const size_t kReadBufSize; + static const size_t kReadWriteBufSize; static const int kWaitTimeoutMs; @@ -80,6 +80,9 @@ class SystemIO final : public IO { Error CreateEpoolIfNeeded(SocketDescriptor master_socket) const; Error ProcessNewConnection(SocketDescriptor master_socket, std::vector<std::string>* new_connections, ListSocketDescriptors* sockets_to_listen) const; + FileDescriptor OpenWithCreateFolders(const std::string& root_folder, const std::string& fname, + bool create_directories, Error* err) const; + #endif public: ~SystemIO(); @@ -130,6 +133,9 @@ class SystemIO final : public IO { FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override; Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, size_t length, bool create_directories) const override; + Error ReceiveDataToFile(SocketDescriptor socket, const std::string& root_folder, const std::string& fname, + size_t length, bool create_directories) const override; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, size_t length, bool create_directories) const override; SubDirList GetSubDirectories(const std::string& path, Error* err) const override; diff --git a/deploy/docker/cluster/asapo_overwrite_vars.tfvars b/deploy/docker/cluster/asapo_overwrite_vars.tfvars index 83378f60e..11be9b693 100644 --- a/deploy/docker/cluster/asapo_overwrite_vars.tfvars +++ b/deploy/docker/cluster/asapo_overwrite_vars.tfvars @@ -2,6 +2,7 @@ elk_logs = true receiver_total_memory_size = 35000 receiver_dataserver_cache_size = 30 #gb +receiver_receive_to_disk_threshold = 50 # mb receiver_dataserver_nthreads = 8 grafana_total_memory_size = 2000 diff --git a/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl b/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl index af4c2f500..1b96b3f3c 100644 --- a/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl +++ b/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl @@ -78,6 +78,7 @@ job "asapo-receivers" { meta { receiver_dataserver_cache_size = "${receiver_dataserver_cache_size}" receiver_dataserver_nthreads = "${receiver_dataserver_nthreads}" + receiver_receive_to_disk_threshold = "${receiver_receive_to_disk_threshold}" } diff --git a/deploy/docker/cluster/scripts/asapo.auto.tfvars.in b/deploy/docker/cluster/scripts/asapo.auto.tfvars.in index 8f2df9e65..cfbcb6c97 100644 --- a/deploy/docker/cluster/scripts/asapo.auto.tfvars.in +++ b/deploy/docker/cluster/scripts/asapo.auto.tfvars.in @@ -17,6 +17,7 @@ receiver_total_memory_size = "2000" receiver_dataserver_cache_size = "1" #gb receiver_dataserver_nthreads = 4 +receiver_receive_to_disk_threshold = 50 #mb grafana_total_memory_size = "256" diff --git a/deploy/docker/cluster/scripts/receiver.json.tpl b/deploy/docker/cluster/scripts/receiver.json.tpl index 94eada3d8..0000e509a 100644 --- a/deploy/docker/cluster/scripts/receiver.json.tpl +++ b/deploy/docker/cluster/scripts/receiver.json.tpl @@ -18,6 +18,7 @@ }, "Tag": "{{ env "attr.unique.hostname" }}", "WriteToDisk":true, + "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }}, "WriteToDb":true, "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", "RootFolder" : "/var/lib/receiver/data" diff --git a/deploy/docker/cluster/scripts/templates.tf b/deploy/docker/cluster/scripts/templates.tf index 586d3ac9f..9eb5e0972 100644 --- a/deploy/docker/cluster/scripts/templates.tf +++ b/deploy/docker/cluster/scripts/templates.tf @@ -38,6 +38,7 @@ data "template_file" "asapo_receivers" { nomad_logs = "${var.nomad_logs}" receiver_total_memory_size = "${var.receiver_total_memory_size}" receiver_dataserver_cache_size = "${var.receiver_dataserver_cache_size}" + receiver_receive_to_disk_threshold= "${var.receiver_receive_to_disk_threshold}" receiver_dataserver_nthreads = "${var.receiver_dataserver_nthreads}" asapo_user = "${var.asapo_user}" n_receivers = "${var.n_receivers}" diff --git a/deploy/docker/cluster/scripts/vars.tf b/deploy/docker/cluster/scripts/vars.tf index 133fde9d1..a6c2814ac 100644 --- a/deploy/docker/cluster/scripts/vars.tf +++ b/deploy/docker/cluster/scripts/vars.tf @@ -34,6 +34,8 @@ variable "receiver_dataserver_cache_size" {} variable "receiver_dataserver_nthreads" {} +variable "receiver_receive_to_disk_threshold" {} + variable "grafana_total_memory_size" {} variable "influxdb_total_memory_size" {} diff --git a/deploy/nomad_jobs/receiver.json.tpl b/deploy/nomad_jobs/receiver.json.tpl index 8db2c97b3..a8c1938ef 100644 --- a/deploy/nomad_jobs/receiver.json.tpl +++ b/deploy/nomad_jobs/receiver.json.tpl @@ -18,6 +18,7 @@ }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "WriteToDisk":true, + "ReceiveToDiskThresholdMB":50, "WriteToDb":true, "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", "RootFolder" : "/var/lib/receiver/data" diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 3fc32f4fb..9aab14b4e 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -91,7 +91,7 @@ Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_hea res_err->Append(sendDataResponse.message); return res_err; } - case kNetErrorErrorInMetadata : { + case kNetErrorWrongRequest : { auto res_err = ProducerErrorTemplates::kWrongInput.Generate(); res_err->Append(sendDataResponse.message); return res_err; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 36e3d0cd1..42988f20f 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -710,7 +710,7 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { } TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfWrongMetadata) { - AssertImmediatelyCallBack(asapo::kNetErrorErrorInMetadata, asapo::ProducerErrorTemplates::kWrongInput); + AssertImmediatelyCallBack(asapo::kNetErrorWrongRequest, asapo::ProducerErrorTemplates::kWrongInput); } TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index a057bcec5..c991bcfa8 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -17,10 +17,13 @@ set(SOURCE_FILES src/receiver_data_server/tcp_server.cpp src/receiver_data_server/receiver_data_server_request.cpp src/receiver_data_server/receiver_data_server_logger.cpp - src/data_cache.cpp src/receiver_data_server/receiver_data_server_request_handler_factory.cpp + src/data_cache.cpp + src/receiver_data_server/receiver_data_server_request_handler_factory.cpp src/receiver_data_server/receiver_data_server_request_handler.cpp src/receiver_statistics.cpp src/request_handler_db_meta_write.cpp + src/request_handler_receive_metadata.cpp + src/request_handler_file_receive.cpp src/request_factory.cpp src/request_handler_db.cpp) @@ -64,16 +67,18 @@ set(TEST_SOURCE_FILES unittests/test_request.cpp unittests/test_request_factory.cpp unittests/test_request_handler_file_write.cpp + unittests/test_request_handler_file_receive.cpp unittests/test_request_handler_db_writer.cpp unittests/test_request_handler_db_meta_writer.cpp unittests/test_request_handler_db.cpp unittests/test_request_handler_authorizer.cpp + unittests/test_request_handler_receive_data.cpp + unittests/test_request_handler_receive_metadata.cpp unittests/test_statistics_sender_influx_db.cpp unittests/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp unittests/test_requests_dispatcher.cpp unittests/test_datacache.cpp - unittests/test_request_handler_receive_data.cpp ) # set(TEST_LIBRARIES "${TARGET_NAME};system_io") diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 8401555df..05db25037 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -20,6 +20,7 @@ Error ReceiverConfigFactory::SetConfig(std::string file_name) { (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || + (err = parser.GetUInt64("ReceiveToDiskThresholdMB", &config.receive_to_disk_threshold_mb)) || (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver.listen_port)) || (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver.nthreads)) || (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) || diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 831e72232..115216026 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -21,6 +21,7 @@ struct ReceiverConfig { bool use_datacache = true; uint64_t datacache_size_gb = 0; uint64_t datacache_reserved_share = 0; + uint64_t receive_to_disk_threshold_mb = 0; LogLevel log_level = LogLevel::Info; std::string tag; std::string advertise_ip; diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 24a91ed7c..7de01b565 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -135,7 +135,7 @@ void Request::UnlockDataBufferIfNeeded() { cache__->UnlockSlot(slot_meta_); } } -SocketDescriptor Request::GetSocket() { +SocketDescriptor Request::GetSocket() const { return socket_fd_; } diff --git a/receiver/src/request.h b/receiver/src/request.h index 84dedbd2e..85bf6a3a3 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -12,7 +12,8 @@ #include "request_handler_authorize.h" #include "request_handler_db_meta_write.h" #include "request_handler_receive_data.h" - +#include "request_handler_receive_metadata.h" +#include "request_handler_file_receive.h" #include "receiver_statistics.h" #include "data_cache.h" @@ -53,7 +54,7 @@ class Request { VIRTUAL const CustomRequestData& GetCustomData() const; VIRTUAL Error PrepareDataBufferAndLockIfNeeded(); VIRTUAL void UnlockDataBufferIfNeeded(); - VIRTUAL SocketDescriptor GetSocket(); + VIRTUAL SocketDescriptor GetSocket() const ; std::unique_ptr<IO> io__; DataCache* cache__ = nullptr; VIRTUAL uint64_t GetSlotId() const; diff --git a/receiver/src/request_factory.cpp b/receiver/src/request_factory.cpp index ee0f0ca83..27f31cb64 100644 --- a/receiver/src/request_factory.cpp +++ b/receiver/src/request_factory.cpp @@ -13,11 +13,37 @@ bool NeedDbHandler (const GenericRequestHeader& request_header) { return GetReceiverConfig()->write_to_db; } -Error RequestFactory::AddReceiveWriteHandlers(std::unique_ptr<Request>& request,const GenericRequestHeader& request_header) const { +bool RequestFactory::ReceiveDirectToFile(const GenericRequestHeader& request_header) const { + return request_header.data_size > GetReceiverConfig()->receive_to_disk_threshold_mb * 1024 * 1024; +} + +Error RequestFactory::AddReceiveWriteHandlers(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const { + if (ReceiveDirectToFile(request_header)) { + return AddReceiveDirectToFileHandler(request, request_header); + } else { + AddReceiveViaBufferHandlers(request, request_header); + return nullptr; + } +} + +void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const { request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); } +} + +Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const { + if (!GetReceiverConfig()->write_to_disk) { + return ReceiverErrorTemplates::kReject.Generate("reciever does not support writing to disk"); + } + if (! (request_header.custom_data[kPosIngestMode] & kStoreInFilesystem)) { + return ReceiverErrorTemplates::kBadRequest.Generate("ingest mode should include kStoreInFilesystem for large files "); + } + request->AddHandler(&request_handler_filereceive_); return nullptr; } @@ -27,8 +53,12 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, switch (request_header.op_code) { case Opcode::kOpcodeTransferData: - case Opcode::kOpcodeTransferSubsetData: { - AddReceiveWriteHandlers(request,request_header); + case Opcode::kOpcodeTransferSubsetData: { + request->AddHandler(&request_handler_receive_metadata_); + auto err = AddReceiveWriteHandlers(request, request_header); + if (err) { + return err; + } if (NeedDbHandler(request_header)) { request->AddHandler(&request_handler_dbwrite_); } diff --git a/receiver/src/request_factory.h b/receiver/src/request_factory.h index f51a5597e..350a44aac 100644 --- a/receiver/src/request_factory.h +++ b/receiver/src/request_factory.h @@ -12,13 +12,20 @@ class RequestFactory { SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept; private: Error AddHandlersToRequest(std::unique_ptr<Request>& request, const GenericRequestHeader& request_header) const; - Error AddReceiveWriteHandlers(std::unique_ptr<Request>& request,const GenericRequestHeader& request_header) const; + Error AddReceiveWriteHandlers(std::unique_ptr<Request>& request, const GenericRequestHeader& request_header) const; RequestHandlerFileWrite request_handler_filewrite_; RequestHandlerReceiveData request_handler_receivedata_; + RequestHandlerReceiveMetaData request_handler_receive_metadata_; RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionName}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; RequestHandlerAuthorize request_handler_authorize_; + RequestHandlerFileReceive request_handler_filereceive_; SharedCache cache_; + bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const; + Error AddReceiveDirectToFileHandler(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const; + void AddReceiveViaBufferHandlers(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const; }; } diff --git a/receiver/src/request_handler_file_receive.cpp b/receiver/src/request_handler_file_receive.cpp new file mode 100644 index 000000000..9475a826c --- /dev/null +++ b/receiver/src/request_handler_file_receive.cpp @@ -0,0 +1,33 @@ +#include "request_handler_file_receive.h" +#include "io/io_factory.h" +#include "request.h" +#include "receiver_logger.h" +#include "receiver_config.h" +#include "preprocessor/definitions.h" + +namespace asapo { + +Error RequestHandlerFileReceive::ProcessRequest(Request* request) const { + auto fsize = request->GetDataSize(); + auto socket = request->GetSocket(); + auto fname = request->GetFileName(); + auto root_folder = GetReceiverConfig()->root_folder + kPathSeparator + + request->GetBeamline() + kPathSeparator + + request->GetBeamtimeId(); + auto err = io__->ReceiveDataToFile(socket, root_folder, fname, (size_t) fsize, true); + if (!err) { + log__->Debug("received file of size " + std::to_string(fsize) + " to " + root_folder + kPathSeparator + fname); + } + return err; +} + +RequestHandlerFileReceive::RequestHandlerFileReceive() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { + +} + +StatisticEntity RequestHandlerFileReceive::GetStatisticEntity() const { + return StatisticEntity::kDisk; +} + + +} diff --git a/receiver/src/request_handler_file_receive.h b/receiver/src/request_handler_file_receive.h new file mode 100644 index 000000000..4e9cdb4b5 --- /dev/null +++ b/receiver/src/request_handler_file_receive.h @@ -0,0 +1,22 @@ +#ifndef ASAPO_REQUEST_HANDLER_FILE_RECEIVE_H +#define ASAPO_REQUEST_HANDLER_FILE_RECEIVE_H + +#include "request_handler.h" +#include "logger/logger.h" + +#include "io/io.h" + +namespace asapo { + +class RequestHandlerFileReceive final: public ReceiverRequestHandler { + public: + RequestHandlerFileReceive(); + StatisticEntity GetStatisticEntity() const override; + Error ProcessRequest(Request* request) const override; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_FILE_RECEIVE_H diff --git a/receiver/src/request_handler_receive_metadata.cpp b/receiver/src/request_handler_receive_metadata.cpp new file mode 100644 index 000000000..93c8d283c --- /dev/null +++ b/receiver/src/request_handler_receive_metadata.cpp @@ -0,0 +1,36 @@ +#include "request_handler_receive_metadata.h" +#include "io/io_factory.h" +#include "request.h" +#include "receiver_logger.h" +#include "receiver_config.h" +#include "preprocessor/definitions.h" + +namespace asapo { + +Error RequestHandlerReceiveMetaData::ProcessRequest(Request* request) const { + auto meta_size = request->GetMetaDataSize(); + if (meta_size == 0) { + return nullptr; + } + + Error err; + auto buf = std::unique_ptr<uint8_t[]> {new uint8_t[meta_size]}; + io__->Receive(request->GetSocket(), (void*) buf.get(), meta_size, &err); + if (err) { + return err; + } + + request->SetMetadata(std::string((char*)buf.get(), meta_size)); + return nullptr; +} + +RequestHandlerReceiveMetaData::RequestHandlerReceiveMetaData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { + +} + +StatisticEntity RequestHandlerReceiveMetaData::GetStatisticEntity() const { + return StatisticEntity::kNetwork; +} + + +} diff --git a/receiver/src/request_handler_receive_metadata.h b/receiver/src/request_handler_receive_metadata.h new file mode 100644 index 000000000..385c16c45 --- /dev/null +++ b/receiver/src/request_handler_receive_metadata.h @@ -0,0 +1,22 @@ +#ifndef ASAPO_REQUEST_HANDLER_RECEIVE_METADATA_H +#define ASAPO_REQUEST_HANDLER_RECEIVE_METADATA_H + +#include "request_handler.h" +#include "logger/logger.h" + +#include "io/io.h" + +namespace asapo { + +class RequestHandlerReceiveMetaData final: public ReceiverRequestHandler { + public: + RequestHandlerReceiveMetaData(); + StatisticEntity GetStatisticEntity() const override; + Error ProcessRequest(Request* request) const override; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_RECEIVE_METADATA_H diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp index 35e46ce00..4ac17c30d 100644 --- a/receiver/src/requests_dispatcher.cpp +++ b/receiver/src/requests_dispatcher.cpp @@ -20,8 +20,8 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) { return NetworkErrorCode::kNetErrorFileIdAlreadyInUse; } else if (err == ReceiverErrorTemplates::kAuthorizationFailure) { return NetworkErrorCode::kNetAuthorizationError; - } else if (err == DBErrorTemplates::kJsonParseError) { - return NetworkErrorCode::kNetErrorErrorInMetadata; + } else if (err == DBErrorTemplates::kJsonParseError || err == ReceiverErrorTemplates::kBadRequest) { + return NetworkErrorCode::kNetErrorWrongRequest; } else { return NetworkErrorCode::kNetErrorInternalServerError; } diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 1ddc3ed4c..8a38336d0 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -57,6 +57,8 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("ReservedShare", error_field) + std::to_string(config.datacache_reserved_share); config_string += "}"; config_string += "," + Key("AuthorizationInterval", error_field) + std::to_string(config.authorization_interval_ms); + config_string += "," + Key("ReceiveToDiskThresholdMB", + error_field) + std::to_string(config.receive_to_disk_threshold_mb); config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; config_string += "," + Key("WriteToDisk", error_field) + (config.write_to_disk ? "true" : "false"); config_string += "," + Key("WriteToDb", error_field) + (config.write_to_db ? "true" : "false"); diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index 77a5e8cb3..d16a3f526 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -54,8 +54,9 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(GetStream, const std::string & ()); MOCK_CONST_METHOD0(GetMetaData, const std::string & ()); MOCK_CONST_METHOD0(GetBeamline, const std::string & ()); - MOCK_CONST_METHOD0(GetOpCode, - asapo::Opcode ()); + MOCK_CONST_METHOD0(GetOpCode, asapo::Opcode ()); + MOCK_CONST_METHOD0(GetSocket, asapo::SocketDescriptor ()); + const asapo::CustomRequestData& GetCustomData() const override { return (asapo::CustomRequestData&) * GetCustomData_t(); }; diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index aca938cff..4147e0349 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -63,6 +63,8 @@ class ConfigTests : public Test { test_config.dataserver.nthreads = 5; test_config.discovery_server = "discovery"; test_config.advertise_ip = "0.0.0.1"; + test_config.receive_to_disk_threshold_mb = 50; + } }; @@ -95,7 +97,7 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->dataserver.tag, Eq("receiver1_ds")); ASSERT_THAT(config->discovery_server, Eq("discovery")); ASSERT_THAT(config->advertise_ip, Eq("0.0.0.1")); - + ASSERT_THAT(config->receive_to_disk_threshold_mb, Eq(50)); } @@ -105,7 +107,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { std::vector<std::string>fields {"PerformanceDbServer", "ListenPort", "DataServer", "ListenPort", "WriteToDisk", "WriteToDb", "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag", "AuthorizationServer", "AuthorizationInterval", "RootFolder", "PerformanceDbName", "LogLevel", - "NThreads", "DiscoveryServer", "AdvertiseIP"}; + "NThreads", "DiscoveryServer", "AdvertiseIP", "ReceiveToDiskThresholdMB"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/receiver/unittests/test_request_factory.cpp b/receiver/unittests/test_request_factory.cpp index 744677365..efd613811 100644 --- a/receiver/unittests/test_request_factory.cpp +++ b/receiver/unittests/test_request_factory.cpp @@ -14,6 +14,7 @@ #include "../src/request_handler_db_write.h" #include "../src/request_handler_authorize.h" #include "../src/request_handler_receive_data.h" +#include "../src/request_handler_receive_metadata.h" #include "database/database.h" @@ -82,12 +83,32 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendDataCode) { generic_request_header.op_code = code; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(5)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[3]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); + } +} + +TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendDataCodeLargeFile) { + for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) { + generic_request_header.op_code = code; + config.receive_to_disk_threshold_mb = 0; + SetReceiverConfig(config, "none"); + + generic_request_header.data_size = 1; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[1]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileReceive*>(request->GetListHandlers()[2]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } } @@ -110,7 +131,7 @@ TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInConfig) { auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(3)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } @@ -119,7 +140,7 @@ TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInRequest) { generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(3)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } @@ -131,10 +152,11 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(3)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[1]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[3]), Ne(nullptr)); } TEST_F(FactoryTests, CachePassedToRequest) { @@ -172,4 +194,32 @@ TEST_F(FactoryTests, DonNotGenerateMetadataRequestIfNoDbConfigured) { } +TEST_F(FactoryTests, DonNotGenerateRequestIfWriteToDiskNotActive) { + config.write_to_disk = false; + config.receive_to_disk_threshold_mb = 0; + + SetReceiverConfig(config, "none"); + + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + generic_request_header.data_size = 1; + + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + + ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReject)); +} + +TEST_F(FactoryTests, DonNotGenerateRequestIfIngestModeIsWrong) { + config.receive_to_disk_threshold_mb = 0; + + SetReceiverConfig(config, "none"); + + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kTransferData; + generic_request_header.data_size = 1; + + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + + ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kBadRequest)); +} + + } diff --git a/receiver/unittests/test_request_handler_file_receive.cpp b/receiver/unittests/test_request_handler_file_receive.cpp new file mode 100644 index 000000000..bd4191f2f --- /dev/null +++ b/receiver/unittests/test_request_handler_file_receive.cpp @@ -0,0 +1,145 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockLogger.h" + +#include "../src/receiver_error.h" +#include "../src/request.h" +#include "../src/request_handler.h" +#include "../src/request_handler_file_receive.h" +#include "common/networking.h" +#include "mock_receiver_config.h" +#include "preprocessor/definitions.h" + +#include "receiver_mocking.h" + +using ::testing::Test; +using ::testing::Return; +using ::testing::ReturnRef; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::testing::AllOf; +using ::testing::HasSubstr; + + +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::MockIO; +using asapo::Request; +using asapo::RequestHandlerFileReceive; +using ::asapo::GenericRequestHeader; +using asapo::MockRequest; + +namespace { + +TEST(FileReceive, Constructor) { + RequestHandlerFileReceive handler; + ASSERT_THAT(dynamic_cast<asapo::IO*>(handler.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(handler.log__), Ne(nullptr)); +} + +class FileReceiveHandlerTests : public Test { + public: + RequestHandlerFileReceive handler; + NiceMock<MockIO> mock_io; + std::unique_ptr<MockRequest> mock_request; + NiceMock<asapo::MockLogger> mock_logger; + SocketDescriptor expected_socket_id = SocketDescriptor{1}; + std::string expected_file_name = "2"; + std::string expected_beamtime_id = "beamtime_id"; + std::string expected_beamline = "beamline"; + uint64_t expected_file_size = 10; + void MockRequestData(); + void SetUp() override { + GenericRequestHeader request_header; + request_header.data_id = 2; + mock_request.reset(new MockRequest{request_header, expected_socket_id, ""}); + handler.io__ = std::unique_ptr<asapo::IO> {&mock_io}; + handler.log__ = &mock_logger; + } + void TearDown() override { + handler.io__.release(); + } + +}; + +TEST_F(FileReceiveHandlerTests, CheckStatisticEntity) { + auto entity = handler.GetStatisticEntity(); + ASSERT_THAT(entity, Eq(asapo::StatisticEntity::kDisk)); +} + +void FileReceiveHandlerTests::MockRequestData() { + EXPECT_CALL(*mock_request, GetDataSize()) + .WillOnce(Return(expected_file_size)) + ; + + EXPECT_CALL(*mock_request, GetSocket()) + .WillOnce(Return(expected_socket_id)) + ; + + EXPECT_CALL(*mock_request, GetBeamtimeId()) + .WillOnce(ReturnRef(expected_beamtime_id)) + ; + + EXPECT_CALL(*mock_request, GetBeamline()) + .WillOnce(ReturnRef(expected_beamline)) + ; + + EXPECT_CALL(*mock_request, GetFileName()) + .WillOnce(Return(expected_file_name)) + ; +} + +TEST_F(FileReceiveHandlerTests, CallsReceiveFile) { + asapo::ReceiverConfig test_config; + test_config.root_folder = "test_folder"; + + asapo::SetReceiverConfig(test_config, "none"); + + MockRequestData(); + + std::string expected_path = std::string("test_folder") + asapo::kPathSeparator + expected_beamline + + asapo::kPathSeparator + expected_beamtime_id; + + EXPECT_CALL(mock_io, ReceiveDataToFile_t(expected_socket_id, expected_path, expected_file_name, expected_file_size, + true)) + .WillOnce( + Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); + + auto err = handler.ProcessRequest(mock_request.get()); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + + +TEST_F(FileReceiveHandlerTests, WritesToLog) { + + MockRequestData(); + + EXPECT_CALL(mock_io, ReceiveDataToFile_t(_, _, _, _, _)) + .WillOnce(Return(nullptr)); + + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("received file"), + HasSubstr(expected_file_name), + HasSubstr(expected_beamtime_id), + HasSubstr(std::to_string(expected_file_size)) + ) + ) + ); + handler.ProcessRequest(mock_request.get()); +} + + +} \ No newline at end of file diff --git a/receiver/unittests/test_request_handler_receive_data.cpp b/receiver/unittests/test_request_handler_receive_data.cpp index 5a27a0836..83fd25cc6 100644 --- a/receiver/unittests/test_request_handler_receive_data.cpp +++ b/receiver/unittests/test_request_handler_receive_data.cpp @@ -147,7 +147,6 @@ TEST_F(ReceiveDataHandlerTests, HandleDoesNotReceiveDataWhenMetadataOnlyWasSent) ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(ReceiveDataHandlerTests, HandleReturnsErrorOnDataReceive) { ExpectReceiveMetaData(true); ExpectReceiveData(false); diff --git a/receiver/unittests/test_request_handler_receive_metadata.cpp b/receiver/unittests/test_request_handler_receive_metadata.cpp new file mode 100644 index 000000000..92d526504 --- /dev/null +++ b/receiver/unittests/test_request_handler_receive_metadata.cpp @@ -0,0 +1,126 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <unittests/MockIO.h> +#include "../src/connection.h" +#include "../src/receiver_error.h" +#include "../src/request.h" +#include "../src/request_handler.h" +#include "../src/request_handler_receive_metadata.h" +#include "database/database.h" +#include "unittests/MockLogger.h" + +#include "receiver_mocking.h" +#include "mock_receiver_config.h" + +using ::testing::Test; +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::GenericRequestHeader; +using ::asapo::SendDataResponse; +using ::asapo::GenericRequestHeader; +using ::asapo::GenericNetworkResponse; +using ::asapo::Opcode; +using ::asapo::Connection; +using ::asapo::MockIO; +using asapo::Request; +using asapo::MockDataCache; +using asapo::StatisticEntity; + +using asapo::ReceiverConfig; +using asapo::SetReceiverConfig; +using asapo::RequestFactory; +using asapo:: RequestHandlerReceiveMetaData; +namespace { + +TEST(ReceiveData, Constructor) { + RequestHandlerReceiveMetaData handler; + ASSERT_THAT(dynamic_cast<asapo::IO*>(handler.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(handler.log__), Ne(nullptr)); +} + +class ReceiveMetaDataHandlerTests : public Test { + public: + GenericRequestHeader generic_request_header; + asapo::SocketDescriptor socket_fd_{1}; + uint64_t data_size_ {100}; + uint64_t data_id_{15}; + uint64_t expected_slot_id{16}; + std::string expected_origin_uri = "origin_uri"; + std::string expected_metadata = "meta"; + uint64_t expected_metadata_size = expected_metadata.size(); + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; + std::unique_ptr<Request> request; + NiceMock<MockIO> mock_io; + RequestHandlerReceiveMetaData handler; + NiceMock<asapo::MockLogger> mock_logger; + + void SetUp() override { + generic_request_header.data_size = data_size_; + generic_request_header.data_id = data_id_; + generic_request_header.meta_size = expected_metadata_size; + generic_request_header.op_code = expected_op_code; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr}); + handler.io__ = std::unique_ptr<asapo::IO> {&mock_io}; + handler.log__ = &mock_logger; + } + void TearDown() override { + handler.io__.release(); + } + void ExpectReceive(uint64_t expected_size, bool ok = true); + void ExpectReceiveMetaData(bool ok = true); +}; + +ACTION_P(CopyStr, value) { + if (value.size() <= arg2 && value.size() > 0) { + memcpy(static_cast<char*>(arg1), value.c_str(), value.size()); + } +} + + +void ReceiveMetaDataHandlerTests::ExpectReceive(uint64_t expected_size, bool ok) { + EXPECT_CALL(mock_io, Receive_t(socket_fd_, _, expected_size, _)).WillOnce( + DoAll( + CopyStr(expected_metadata), + SetArgPointee<3>(ok ? nullptr : new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), + Return(0) + )); + +} + +void ReceiveMetaDataHandlerTests::ExpectReceiveMetaData(bool ok) { + ExpectReceive(expected_metadata_size, ok); +} + +TEST_F(ReceiveMetaDataHandlerTests, CheckStatisticEntity) { + auto entity = handler.GetStatisticEntity(); + ASSERT_THAT(entity, Eq(asapo::StatisticEntity::kNetwork)); +} + +TEST_F(ReceiveMetaDataHandlerTests, HandleReturnsErrorOnMetaDataReceive) { + ExpectReceiveMetaData(false); + auto err = handler.ProcessRequest(request.get()); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); +} + +TEST_F(ReceiveMetaDataHandlerTests, HandleReturnsOK) { + ExpectReceiveMetaData(true); + auto err = handler.ProcessRequest(request.get()); + ASSERT_THAT(err, Eq(nullptr)); +} + + +} diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index e5ecb5985..78b495f67 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -289,10 +289,19 @@ TEST_F(RequestsDispatcherTests, ProcessRequestReturnsMetaDataFailure) { auto err = dispatcher->ProcessRequest(request); ASSERT_THAT(err, Eq(asapo::DBErrorTemplates::kJsonParseError)); - ASSERT_THAT(response.error_code, Eq(asapo::kNetErrorErrorInMetadata)); + ASSERT_THAT(response.error_code, Eq(asapo::kNetErrorWrongRequest)); ASSERT_THAT(std::string(response.message), HasSubstr("parse")); } +TEST_F(RequestsDispatcherTests, ProcessRequestReturnsBadRequest) { + MockHandleRequest(true, asapo::ReceiverErrorTemplates::kBadRequest.Generate()); + MockSendResponse(&response, false); + + auto err = dispatcher->ProcessRequest(request); + + ASSERT_THAT(response.error_code, Eq(asapo::kNetErrorWrongRequest)); +} + } diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index 8c02b4735..e70ec3340 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(transfer_single_file) +add_subdirectory(transfer_single_file_bypass_buffer) add_subdirectory(transfer_datasets) if (UNIX) diff --git a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/CMakeLists.txt new file mode 100644 index 000000000..7b299ca60 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME transfer-single-file) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh new file mode 100644 index 000000000..cd78d6e8d --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +database_name=db_test +beamtime_id=asapo_test +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +Cleanup() { + echo cleanup +# rm -rf ${receiver_root_folder} + nomad stop receiver + nomad stop discovery + nomad stop authorizer + nomad stop nginx + nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill + echo "db.dropDatabase()" | mongo ${beamtime_id}_detector + influx -execute "drop database ${database_name}" +} + +# create db before consumer starts reading it. todo: git rid of it +echo "db.${beamtime_id}_detector.insert({dummy:1})" | mongo ${beamtime_id}_detector + +nomad run authorizer.nmd +nomad run nginx.nmd +nomad run receiver.nmd +nomad run discovery.nmd + +mkdir -p ${receiver_folder} + +sleep 1 + +$1 localhost:8400 ${beamtime_id} 60000 1 1 0 30 + +echo "db.data.find({"_id":1})" | mongo ${beamtime_id}_detector > out +cat out +cat out | grep '"buf_id" : 0' +cat out | grep user_meta + +ls -ln ${receiver_folder}/1 | awk '{ print $5 }'| grep 60000000 diff --git a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat new file mode 100644 index 000000000..2405f42c3 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat @@ -0,0 +1,47 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" + + +echo db.%beamtime_id%_detector.insert({dummy:1})" | %mongo_exe% %beamtime_id%_detector + + +c:\opt\consul\nomad run receiver.nmd +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 1 -w 100 > nul + +mkdir %receiver_folder% + +"%1" localhost:8400 %beamtime_id% 60000 1 1 0 30 + +ping 1.0.0.0 -n 1 -w 100 > nul + +FOR /F "usebackq" %%A IN ('%receiver_folder%\1') DO set size=%%~zA +if %size% NEQ 60000000 goto :error + +echo "db.data.find({"_id":1})" | mongo ${beamtime_id}_detector > out +type out +type out | findstr /c:"\"buf_id\" : 0" || goto :error +type out | findstr /c:user_meta || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop receiver +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop nginx +c:\opt\consul\nomad run nginx_kill.nmd && c:\opt\consul\nomad stop -yes -purge nginx_kill +c:\opt\consul\nomad stop authorizer +rmdir /S /Q %receiver_root_folder% +echo db.dropDatabase() | %mongo_exe% %beamtime_id%_detector + + diff --git a/tests/automatic/settings/receiver.json.tpl.lin.in b/tests/automatic/settings/receiver.json.tpl.lin.in index 86fa61c0d..3ba55dd6c 100644 --- a/tests/automatic/settings/receiver.json.tpl.lin.in +++ b/tests/automatic/settings/receiver.json.tpl.lin.in @@ -18,6 +18,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "WriteToDisk": @RECEIVER_WRITE_TO_DISK@, + "ReceiveToDiskThresholdMB":50, "WriteToDb": true, "LogLevel" : "debug", "RootFolder" : "/tmp/asapo/receiver/files" diff --git a/tests/automatic/settings/receiver.json.tpl.win.in b/tests/automatic/settings/receiver.json.tpl.win.in index 4cc81d11d..6cff01f59 100644 --- a/tests/automatic/settings/receiver.json.tpl.win.in +++ b/tests/automatic/settings/receiver.json.tpl.win.in @@ -18,6 +18,7 @@ }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "WriteToDisk": @RECEIVER_WRITE_TO_DISK@, + "ReceiveToDiskThresholdMB":50, "WriteToDb": true, "LogLevel" : "debug", "RootFolder" : "c:\\tmp\\asapo\\receiver\\files" diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp index 78a49f073..08149f98a 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp @@ -79,7 +79,8 @@ std::unique_ptr<std::thread> CreateEchoServerThread() { io->Send(client_fd, buffer.get(), received, &err); ExitIfErrIsNotOk(&err, 107); - received = io->Receive(client_fd, buffer.get(), need_to_receive_size, &err); + io->ReceiveDataToFile(client_fd, "", "received", need_to_receive_size, false); + buffer = io->GetDataFromFile("received", &need_to_receive_size, &err); io->Send(client_fd, buffer.get(), received, &err); ExitIfErrIsNotOk(&err, 108); } @@ -113,7 +114,7 @@ void CheckNormal(int times, size_t size) { buffer[i] = rand(); } - FILE* out = fopen("data", "wb"); + FILE* out = fopen("sent", "wb"); fwrite(buffer.get(), 1, size, out); fclose(out); @@ -147,7 +148,7 @@ void CheckNormal(int times, size_t size) { } std::cout << "[CLIENT] send file" << std::endl; - err = io->SendFile(socket, "data", size); + err = io->SendFile(socket, "sent", size); ExitIfErrIsNotOk(&err, 208); std::cout << "[CLIENT] receive file" << std::endl; receive_count = io->Receive(socket, buffer2.get(), size, &err); @@ -206,6 +207,8 @@ int main(int argc, char* argv[]) { if(asapo::IOErrorTemplates::kConnectionRefused != err) { ExitIfErrIsNotOk(&err, 304); } + remove("sent"); + remove("received"); return 0; } diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index 71aea67fd..2d88da1f9 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -16,6 +16,7 @@ "ReservedShare": 10 }, "WriteToDisk":true, + "ReceiveToDiskThresholdMB":50, "WriteToDb":true, "LogLevel":"info", "Tag": "test_receiver", diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index 1e1ae4d70..9d3609563 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -16,6 +16,7 @@ "ReservedShare": 10 }, "WriteToDisk":true, + "ReceiveToDiskThresholdMB":50, "WriteToDb":true, "LogLevel":"info", "Tag": "test_receiver", diff --git a/tests/manual/python_tests/producer/receiver.json.tpl b/tests/manual/python_tests/producer/receiver.json.tpl index 0eaaa9848..bb59485af 100644 --- a/tests/manual/python_tests/producer/receiver.json.tpl +++ b/tests/manual/python_tests/producer/receiver.json.tpl @@ -18,6 +18,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "WriteToDisk": true, + "ReceiveToDiskThresholdMB":50, "WriteToDb": true, "LogLevel" : "debug", "RootFolder" : "/tmp/asapo/receiver/files" diff --git a/tests/manual/receiver_debug_local/receiver.json b/tests/manual/receiver_debug_local/receiver.json index 2c36af7bf..ace0fbecf 100644 --- a/tests/manual/receiver_debug_local/receiver.json +++ b/tests/manual/receiver_debug_local/receiver.json @@ -18,6 +18,7 @@ "ListenPort": 22001, "Tag": "22001", "WriteToDisk": true, + "ReceiveToDiskThresholdMB":50, "WriteToDb": true, "LogLevel" : "debug", "RootFolder" : "/tmp/asapo/receiver/files" -- GitLab