diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 0b3fe71edb98273bb05e92f328ce47258a5a23fb..a057bcec510b32f15d07c54a8894792e6c597e4e 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -9,6 +9,7 @@ set(SOURCE_FILES src/receiver_config.cpp src/receiver_logger.cpp src/request_handler_db_write.cpp + src/request_handler_receive_data.cpp src/request_handler_authorize.cpp src/statistics_sender_fluentd.cpp src/requests_dispatcher.cpp @@ -72,6 +73,7 @@ set(TEST_SOURCE_FILES unittests/mock_receiver_config.cpp unittests/test_requests_dispatcher.cpp unittests/test_datacache.cpp + unittests/test_request_handler_receive_data.cpp ) # set(TEST_LIBRARIES "${TARGET_NAME};system_io") diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 32d66bb14d7a10d2168448256b62ee8e91a71b7a..1d974581bbe448ad78a44b311be1eb3b1bc33a1f 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -10,7 +10,7 @@ Request::Request(const GenericRequestHeader& header, socket_fd_{socket_fd}, origin_uri_{std::move(origin_uri)} { } -Error Request::PrepareDataBuffer() { +Error Request::PrepareDataBufferAndLockIfNeeded() { if (cache__ == nullptr) { try { data_buffer_.reset(new uint8_t[(size_t)request_header_.data_size]); @@ -31,74 +31,14 @@ Error Request::PrepareDataBuffer() { return nullptr; } -Error Request::ReceiveMetaData() { - if (request_header_.meta_size == 0) { - return nullptr; - } - - Error err; - auto buf = std::unique_ptr<uint8_t[]> {new uint8_t[(size_t)request_header_.meta_size]}; - io__->Receive(socket_fd_, (void*) buf.get(), (size_t) request_header_.meta_size, &err); - if (err) { - return err; - } - metadata_.assign((char*)buf.get(), request_header_.meta_size); - return nullptr; -} - - -bool Request::NeedReceiveData() { - return request_header_.data_size > 0 && - (request_header_.custom_data[asapo::kPosIngestMode] & asapo::kTransferData); -} - -Error Request::ReceiveData() { - if (!NeedReceiveData()) { - return nullptr; - } - - auto err = PrepareDataBuffer(); - if (err) { - return err; - } - io__->Receive(socket_fd_, GetData(), (size_t) request_header_.data_size, &err); - if (slot_meta_) { - cache__->UnlockSlot(slot_meta_); - } - return err; -} - - -Error Request::ReceiveRequestContent(ReceiverStatistics* statistics) { - statistics->StartTimer(StatisticEntity::kNetwork); - - auto err = ReceiveMetaData(); - if (err) { - return err; - } - - err = ReceiveData(); - if (err) { - return err; - } - - statistics->StopTimer(); - - return nullptr; -} - - Error Request::Handle(ReceiverStatistics* statistics) { - auto err = ReceiveRequestContent(statistics); - if (err) { - return err; - } - for (auto handler : handlers_) { statistics->StartTimer(handler->GetStatisticEntity()); auto err = handler->ProcessRequest(this); if (err) { + if (dynamic_cast<const RequestHandlerAuthorize*>(handler)); + return err; } statistics->StopTimer(); @@ -192,4 +132,21 @@ void Request::SetStream(std::string stream) { stream_ = std::move(stream); } +void Request::UnlockDataBufferIfNeeded() { + if (slot_meta_) { + cache__->UnlockSlot(slot_meta_); + } +} +SocketDescriptor Request::GetSocket() { + return socket_fd_; +} + +void Request::SetMetadata(std::string metadata) { + metadata_ = std::move(metadata); +} + +uint64_t Request::GetMetaDataSize() const { + return request_header_.meta_size; +} + } \ No newline at end of file diff --git a/receiver/src/request.h b/receiver/src/request.h index c8cf216fd48565ffb62d699c5505b19c84ed1ff0..0ab3d32ecf346b1667ad7b6897e6fb653aaa6b92 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -30,6 +30,7 @@ class Request { VIRTUAL void AddHandler(const ReceiverRequestHandler*); VIRTUAL const RequestHandlerList& GetListHandlers() const; VIRTUAL uint64_t GetDataSize() const; + VIRTUAL uint64_t GetMetaDataSize() const; VIRTUAL uint64_t GetDataID() const; VIRTUAL std::string GetFileName() const; VIRTUAL void* GetData() const; @@ -44,20 +45,18 @@ class Request { VIRTUAL const std::string& GetStream() const; VIRTUAL void SetStream(std::string stream); + VIRTUAL void SetMetadata(std::string metadata); VIRTUAL const std::string& GetBeamline() const; VIRTUAL const CustomRequestData& GetCustomData() const; - + VIRTUAL Error PrepareDataBufferAndLockIfNeeded(); + VIRTUAL void UnlockDataBufferIfNeeded(); + VIRTUAL SocketDescriptor GetSocket(); std::unique_ptr<IO> io__; DataCache* cache__ = nullptr; VIRTUAL uint64_t GetSlotId() const; private: - Error PrepareDataBuffer(); - Error ReceiveData(); - Error ReceiveMetaData(); - Error ReceiveRequestContent(ReceiverStatistics* statistics); - bool NeedReceiveData(); const GenericRequestHeader request_header_; const SocketDescriptor socket_fd_; FileData data_buffer_; diff --git a/receiver/src/request_handler_receive_data.cpp b/receiver/src/request_handler_receive_data.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d01f8566e3ab58600322a5d1281d0be4254e39b3 --- /dev/null +++ b/receiver/src/request_handler_receive_data.cpp @@ -0,0 +1,72 @@ +#include "request_handler_receive_data.h" +#include "io/io_factory.h" +#include "request.h" +#include "receiver_logger.h" +#include "receiver_config.h" +#include "preprocessor/definitions.h" + +namespace asapo { + + + +Error RequestHandlerReceiveData::ReceiveMetaData(Request* request) const { + auto meta_size = request->GetMetaDataSize(); + if (meta_size == 0) { + return nullptr; + } + + Error err; + auto buf = std::unique_ptr<uint8_t[]> {new uint8_t[meta_size]}; + io__->Receive(request->GetSocket(), (void*) buf.get(), meta_size, &err); + if (err) { + return err; + } + + request->SetMetadata(std::string((char*)buf.get(), meta_size)); + return nullptr; +} + + +bool RequestHandlerReceiveData::NeedReceiveData(const Request* request) const { + return request->GetDataSize() > 0 && + (request->GetCustomData()[asapo::kPosIngestMode] & asapo::kTransferData); +} + +Error RequestHandlerReceiveData::ReceiveData(Request* request) const { + if (!NeedReceiveData(request)) { + return nullptr; + } + auto err = request->PrepareDataBufferAndLockIfNeeded(); + if (err) { + return err; + } + io__->Receive(request->GetSocket(), request->GetData(), (size_t) request->GetDataSize(), &err); + request->UnlockDataBufferIfNeeded(); + return err; +} + + +Error RequestHandlerReceiveData::ReceiveRequestContent(Request* request) const { + auto err = ReceiveMetaData(request); + if (err) { + return err; + } + return ReceiveData(request); +} + + + +Error RequestHandlerReceiveData::ProcessRequest(Request* request) const { + return ReceiveRequestContent(request); +} + +RequestHandlerReceiveData::RequestHandlerReceiveData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { + +} + +StatisticEntity RequestHandlerReceiveData::GetStatisticEntity() const { + return StatisticEntity::kNetwork; +} + + +} diff --git a/receiver/src/request_handler_receive_data.h b/receiver/src/request_handler_receive_data.h new file mode 100644 index 0000000000000000000000000000000000000000..a208f9b4cd95b8f197c41fea0bdfdd43bec2a31d --- /dev/null +++ b/receiver/src/request_handler_receive_data.h @@ -0,0 +1,28 @@ +#ifndef ASAPO_REQUEST_HANDLER_RECEIVE_DATA_H +#define ASAPO_REQUEST_HANDLER_RECEIVE_DATA_H + +#include "request_handler.h" +#include "logger/logger.h" + +#include "io/io.h" + +namespace asapo { + +class RequestHandlerReceiveData final: public ReceiverRequestHandler { + public: + RequestHandlerReceiveData(); + StatisticEntity GetStatisticEntity() const override; + Error ProcessRequest(Request* request) const override; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; + private: + Error ReceiveData(Request* request)const; + Error ReceiveMetaData(Request* request) const; + Error ReceiveRequestContent(Request* request) const; + bool NeedReceiveData(const Request* request) const; + +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_RECEIVE_DATA_H diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 6d003b3caa39fbe1cc4d7dba5914ab3f288c294d..0c37805c7050f39b8a66bdb4015a0fc862fdb7e0 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -96,144 +96,13 @@ class RequestTests : public Test { request->io__.release(); } void ExpectFileName(std::string sended, std::string received); - void ExpectReceive(uint64_t expected_size, bool ok = true); - void ExpectReceiveData(bool ok = true); - void ExpectReceiveMetaData(bool ok = true); - void ExpectReceiveAllOK(); - - }; -ACTION_P(CopyStr, value) { - if (value.size() <= arg2 && value.size() > 0) { - memcpy(static_cast<char*>(arg1), value.c_str(), value.size()); - } -} - - -void RequestTests::ExpectReceive(uint64_t expected_size, bool ok) { - EXPECT_CALL(mock_io, Receive_t(socket_fd_, _, expected_size, _)).WillOnce( - DoAll( - CopyStr(expected_metadata), - SetArgPointee<3>(ok ? nullptr : new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), - Return(0) - )); - -} -void RequestTests::ExpectReceiveData(bool ok) { - ExpectReceive(data_size_, ok); -} -void RequestTests::ExpectReceiveMetaData(bool ok) { - ExpectReceive(expected_metadata_size, ok); -} - -void RequestTests::ExpectReceiveAllOK() { - ExpectReceiveData(true); - ExpectReceiveMetaData(true); -} - -TEST_F(RequestTests, HandleDoesNotReceiveEmptyData) { - generic_request_header.data_size = 0; - generic_request_header.meta_size = 0; - request->io__.release(); - request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); - request->io__ = std::unique_ptr<asapo::IO> {&mock_io};; - - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).Times(0); - - auto err = request->Handle(stat); - - ASSERT_THAT(err, Eq(nullptr)); -} - - -TEST_F(RequestTests, HandleDoesNotReceiveDataWhenMetadataOnlyWasSent) { - generic_request_header.data_size = 10; - generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kTransferMetaDataOnly; - request->io__.release(); - request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); - request->io__ = std::unique_ptr<asapo::IO> {&mock_io};; - - ExpectReceiveMetaData(true); - - auto err = request->Handle(stat); - - ASSERT_THAT(err, Eq(nullptr)); -} - - -TEST_F(RequestTests, HandleReturnsErrorOnDataReceive) { - ExpectReceiveMetaData(true); - ExpectReceiveData(false); - auto err = request->Handle(stat); - ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); -} - -TEST_F(RequestTests, HandleReturnsErrorOnMetaDataReceive) { - ExpectReceiveMetaData(false); - auto err = request->Handle(stat); - ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); -} - - - -TEST_F(RequestTests, HandleGetsMemoryFromCache) { - request->cache__ = &mock_cache; - asapo::CacheMeta meta; - meta.id = expected_slot_id; - EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce( - DoAll(SetArgPointee<1>(&meta), - Return(&mock_cache) - )); - - EXPECT_CALL(mock_cache, UnlockSlot(&meta)); - - request->Handle(stat); - - ASSERT_THAT(request->GetSlotId(), Eq(expected_slot_id)); -} - - -TEST_F(RequestTests, ErrorGetMemoryFromCache) { - request->cache__ = &mock_cache; - - EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce( - Return(nullptr) - ); - - EXPECT_CALL(mock_cache, UnlockSlot(_)).Times(0); - - - auto err = request->Handle(stat); - - ASSERT_THAT(request->GetSlotId(), Eq(0)); - ASSERT_THAT(err, Eq(asapo::ErrorTemplates::kMemoryAllocationError)); -} - - -TEST_F(RequestTests, HandleMeasuresTimeOnContentReceive) { - - EXPECT_CALL(mock_statistics, StartTimer_t(asapo::StatisticEntity::kNetwork)); - - ExpectReceiveAllOK(); - - EXPECT_CALL(mock_statistics, StopTimer_t()); - - request->Handle(stat); - - - ASSERT_THAT(request->GetMetaData(), Eq(expected_metadata)); - - -} - TEST_F(RequestTests, HandleProcessesRequests) { MockReqestHandler mock_request_handler; - EXPECT_CALL(mock_statistics, StartTimer_t(asapo::StatisticEntity::kNetwork)); - EXPECT_CALL(mock_request_handler, ProcessRequest_t(_)).WillOnce( Return(nullptr) ).WillOnce( @@ -245,7 +114,7 @@ TEST_F(RequestTests, HandleProcessesRequests) { EXPECT_CALL(mock_statistics, StartTimer_t(asapo::StatisticEntity::kDisk)).Times(2); - EXPECT_CALL(mock_statistics, StopTimer_t()).Times(2); + EXPECT_CALL(mock_statistics, StopTimer_t()).Times(1); auto err = request->Handle(stat); @@ -258,17 +127,6 @@ TEST_F(RequestTests, DataIsNullAtInit) { ASSERT_THAT(data, Eq(nullptr)); } -TEST_F(RequestTests, GetDataIsNotNullptr) { - - request->Handle(stat); - auto data = request->GetData(); - - - ASSERT_THAT(data, Ne(nullptr)); - - -} - TEST_F(RequestTests, GetDataID) { auto id = request->GetDataID(); diff --git a/receiver/unittests/test_request_handler_receive_data.cpp b/receiver/unittests/test_request_handler_receive_data.cpp new file mode 100644 index 0000000000000000000000000000000000000000..aff851dbf3a4cf1b77efa4607973fa769f48fbc2 --- /dev/null +++ b/receiver/unittests/test_request_handler_receive_data.cpp @@ -0,0 +1,206 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <unittests/MockIO.h> +#include "../src/connection.h" +#include "../src/receiver_error.h" +#include "../src/request.h" +#include "../src/request_handler.h" +#include "../src/request_handler_receive_data.h" +#include "database/database.h" +#include "unittests/MockLogger.h" + +#include "receiver_mocking.h" +#include "mock_receiver_config.h" + +using ::testing::Test; +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::GenericRequestHeader; +using ::asapo::SendDataResponse; +using ::asapo::GenericRequestHeader; +using ::asapo::GenericNetworkResponse; +using ::asapo::Opcode; +using ::asapo::Connection; +using ::asapo::MockIO; +using asapo::Request; +using asapo::MockDataCache; +using asapo::StatisticEntity; + +using asapo::ReceiverConfig; +using asapo::SetReceiverConfig; +using asapo::RequestFactory; +using asapo:: RequestHandlerReceiveData; +namespace { + +TEST(ReceiveData, Constructor) { + RequestHandlerReceiveData handler; + ASSERT_THAT(dynamic_cast<asapo::IO*>(handler.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(handler.log__), Ne(nullptr)); +} + +class ReceiveDataHandlerTests : public Test { + public: + GenericRequestHeader generic_request_header; + asapo::SocketDescriptor socket_fd_{1}; + uint64_t data_size_ {100}; + uint64_t data_id_{15}; + uint64_t expected_slot_id{16}; + std::string expected_origin_uri = "origin_uri"; + std::string expected_metadata = "meta"; + uint64_t expected_metadata_size = expected_metadata.size(); + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; + char expected_request_message[asapo::kMaxMessageSize] = "test_message"; + std::unique_ptr<Request> request; + NiceMock<MockIO> mock_io; + MockDataCache mock_cache; + RequestHandlerReceiveData handler; + NiceMock<asapo::MockLogger> mock_logger; + + + void SetUp() override { + generic_request_header.data_size = data_size_; + generic_request_header.data_id = data_id_; + generic_request_header.meta_size = expected_metadata_size; + generic_request_header.op_code = expected_op_code; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + strcpy(generic_request_header.message, expected_request_message); + request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr}); + handler.io__ = std::unique_ptr<asapo::IO> {&mock_io}; + handler.log__ = &mock_logger; + //ON_CALL(mock_io, Receive_t(socket_fd_, _, data_size_, _)).WillByDefault( + //DoAll(SetArgPointee<3>(nullptr), +// Return(0) +// )); + } + void TearDown() override { + handler.io__.release(); + } + void ExpectReceive(uint64_t expected_size, bool ok = true); + void ExpectReceiveData(bool ok = true); + void ExpectReceiveMetaData(bool ok = true); +}; + +ACTION_P(CopyStr, value) { + if (value.size() <= arg2 && value.size() > 0) { + memcpy(static_cast<char*>(arg1), value.c_str(), value.size()); + } +} + + +void ReceiveDataHandlerTests::ExpectReceive(uint64_t expected_size, bool ok) { + EXPECT_CALL(mock_io, Receive_t(socket_fd_, _, expected_size, _)).WillOnce( + DoAll( + CopyStr(expected_metadata), + SetArgPointee<3>(ok ? nullptr : new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), + Return(0) + )); + +} +void ReceiveDataHandlerTests::ExpectReceiveData(bool ok) { + ExpectReceive(data_size_, ok); +} +void ReceiveDataHandlerTests::ExpectReceiveMetaData(bool ok) { + ExpectReceive(expected_metadata_size, ok); +} + +TEST_F(ReceiveDataHandlerTests, CheckStatisticEntity) { + auto entity = handler.GetStatisticEntity(); + ASSERT_THAT(entity, Eq(asapo::StatisticEntity::kNetwork)); +} + + +TEST_F(ReceiveDataHandlerTests, HandleDoesNotReceiveEmptyData) { + generic_request_header.data_size = 0; + generic_request_header.meta_size = 0; + request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); + + EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).Times(0); + + auto err = handler.ProcessRequest(request.get()); + + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(ReceiveDataHandlerTests, HandleDoesNotReceiveDataWhenMetadataOnlyWasSent) { + generic_request_header.data_size = 10; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kTransferMetaDataOnly; + request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); + + ExpectReceiveMetaData(true); + + auto err = handler.ProcessRequest(request.get()); + + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(ReceiveDataHandlerTests, HandleReturnsErrorOnDataReceive) { + ExpectReceiveMetaData(true); + ExpectReceiveData(false); + auto err = handler.ProcessRequest(request.get()); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); +} + +TEST_F(ReceiveDataHandlerTests, HandleReturnsErrorOnMetaDataReceive) { + ExpectReceiveMetaData(false); + auto err = handler.ProcessRequest(request.get()); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); +} + +TEST_F(ReceiveDataHandlerTests, HandleReturnsOK) { + ExpectReceiveMetaData(true); + ExpectReceiveData(true); + auto err = handler.ProcessRequest(request.get()); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ReceiveDataHandlerTests, HandleGetsMemoryFromCache) { + request->cache__ = &mock_cache; + asapo::CacheMeta meta; + meta.id = expected_slot_id; + EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce( + DoAll(SetArgPointee<1>(&meta), + Return(&mock_cache) + )); + + EXPECT_CALL(mock_cache, UnlockSlot(&meta)); + + auto err = handler.ProcessRequest(request.get()); + + ASSERT_THAT(request->GetSlotId(), Eq(expected_slot_id)); +} + + +TEST_F(ReceiveDataHandlerTests, ErrorGetMemoryFromCache) { + request->cache__ = &mock_cache; + + EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce( + Return(nullptr) + ); + + EXPECT_CALL(mock_cache, UnlockSlot(_)).Times(0); + + + auto err = handler.ProcessRequest(request.get()); + + ASSERT_THAT(request->GetSlotId(), Eq(0)); + ASSERT_THAT(err, Eq(asapo::ErrorTemplates::kMemoryAllocationError)); +} + + + +}