diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 3d740dc91e80e86166e3eaf61ee99072759d334b..6909a965241af9a3d9e5f996cf3af09086f67425 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -20,6 +20,8 @@ enum Opcode : uint8_t { enum NetworkErrorCode : uint16_t { kNetErrorNoError, + kNetErrorWrongRequest, + kNetErrorNoData, kNetAuthorizationError, kNetErrorFileIdAlreadyInUse, kNetErrorAllocateStorageFailed, @@ -49,7 +51,6 @@ struct GenericRequestHeader { struct GenericNetworkResponse { Opcode op_code; - NetworkRequestId request_id; NetworkErrorCode error_code; char message[kMaxMessageSize]; }; diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index c0371b1f9da58afd586962fbb67f0a6da26d10a7..77d305f1e14ce42fa88e6b49211129759be460e6 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -13,12 +13,11 @@ set(SOURCE_FILES src/statistics_sender_fluentd.cpp src/requests_dispatcher.cpp src/receiver_data_server/receiver_data_server.cpp - src/receiver_data_server/net_server.cpp src/receiver_data_server/tcp_server.cpp src/receiver_data_server/receiver_data_server_request.cpp src/receiver_data_server/receiver_data_server_logger.cpp src/data_cache.cpp src/receiver_data_server/receiver_data_server_request_handler_factory.cpp -) + src/receiver_data_server/receiver_data_server_request_handler.cpp) ################################ @@ -74,5 +73,7 @@ gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_ set(TEST_SOURCE_FILES_RDS unittests/receiver_data_server/test_receiver_data_server.cpp unittests/receiver_data_server/test_tcp_server.cpp + unittests/receiver_data_server/test_request_handler_factory.cpp + unittests/receiver_data_server/test_request_handler.cpp ) gtest(${TARGET_NAME}_RDS "${TEST_SOURCE_FILES_RDS}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/*.h) diff --git a/receiver/src/data_cache.cpp b/receiver/src/data_cache.cpp index a8cf0e70b5342e31f9a7779a7e8c174349fc29b5..02f963e28c079dc6e7a42514d56b705d498ab3da 100644 --- a/receiver/src/data_cache.cpp +++ b/receiver/src/data_cache.cpp @@ -75,11 +75,11 @@ bool DataCache::SlotTooCloseToCurrentPointer(const CacheMeta* meta) { } // we allow to read if it was already locked - if lock come from reading - no problems, from writing -should not happen! -void* DataCache::GetSlotToReadAndLock(uint64_t id, CacheMeta** meta) { +void* DataCache::GetSlotToReadAndLock(uint64_t id, uint64_t data_size, CacheMeta** meta) { std::lock_guard<std::mutex> lock{mutex_}; for (auto& meta_rec : meta_) { if (meta_rec->id == id) { - if (SlotTooCloseToCurrentPointer(meta_rec.get())) { + if (data_size != meta_rec->size || SlotTooCloseToCurrentPointer(meta_rec.get())) { return nullptr; } meta_rec->lock++; diff --git a/receiver/src/data_cache.h b/receiver/src/data_cache.h index 24dd0d987c3481feac80ce7b7dc9f6f5c1b36df6..8d708f9c4f5de2e2db80dbf78568c264458c2c87 100644 --- a/receiver/src/data_cache.h +++ b/receiver/src/data_cache.h @@ -22,7 +22,7 @@ class DataCache { public: explicit DataCache(uint64_t cache_size_gb, float keepunlocked_ratio); VIRTUAL void* GetFreeSlotAndLock(uint64_t size, CacheMeta** meta); - void* GetSlotToReadAndLock(uint64_t id, CacheMeta** meta); + VIRTUAL void* GetSlotToReadAndLock(uint64_t id, uint64_t data_size, CacheMeta** meta); VIRTUAL bool UnlockSlot(CacheMeta* meta); ~DataCache() = default; private: diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 602d5f1b595ee98ffbd4db295c8afa92a57234d0..d375bc68667700a27663cd6fab01880b0cb69799 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -22,8 +22,8 @@ asapo::Error ReadConfigFile(int argc, char* argv[]) { std::thread StartDataServer(const asapo::ReceiverConfig* config, asapo::SharedCache cache) { static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port); - return std::thread([config] { - asapo::ReceiverDataServer data_server{dataserver_address, config->log_level, (uint8_t)config->dataserver_nthreads}; + return std::thread([config, cache] { + asapo::ReceiverDataServer data_server{dataserver_address, config->log_level, (uint8_t)config->dataserver_nthreads, cache}; data_server.Run(); }); } diff --git a/receiver/src/receiver_data_server/net_server.cpp b/receiver/src/receiver_data_server/net_server.cpp deleted file mode 100644 index dbc8718a1dc0fef2c2f281f6ce05d49e0e1ded4b..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/net_server.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "net_server.h" - -namespace asapo { - -} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/net_server.h b/receiver/src/receiver_data_server/net_server.h index 6c3fee9868e9b2a8518592251eeecebf25459f2c..f91a61ae1916478e0122de57999760beb3f8c4d2 100644 --- a/receiver/src/receiver_data_server/net_server.h +++ b/receiver/src/receiver_data_server/net_server.h @@ -10,7 +10,10 @@ namespace asapo { class NetServer { public: virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0; + virtual Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept = 0; + virtual ~NetServer() = default; + }; } diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index d2a329812f9072c520601575be249e12df2092ab..e18b396b4963f7558445013dcdc692f3cdceef26 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -5,9 +5,10 @@ namespace asapo { -ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads) : net__{new TcpServer(address)}, -log__{GetDefaultReceiverDataServerLogger()} { - request_handler_factory_.reset(new ReceiverDataServerRequestHandlerFactory()); +ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads, + SharedCache data_cache) : net__{new TcpServer(address)}, +log__{GetDefaultReceiverDataServerLogger()}, data_cache_{data_cache} { + request_handler_factory_.reset(new ReceiverDataServerRequestHandlerFactory(net__.get(), data_cache_.get())); GetDefaultReceiverDataServerLogger()->SetLogLevel(log_level); request_pool__.reset(new RequestPool{n_threads, request_handler_factory_.get(), log__}); } diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h index 26a11b19b2a0f51b6a7e6e2d58a657347ed7f17e..ff36b61304dd0eb90f4b39e99a68ea2cc364248a 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.h +++ b/receiver/src/receiver_data_server/receiver_data_server.h @@ -6,6 +6,7 @@ #include "net_server.h" #include "request/request_pool.h" #include "logger/logger.h" +#include "../data_cache.h" namespace asapo { @@ -14,12 +15,14 @@ class ReceiverDataServer { // important to create it before request_pool__ std::unique_ptr<RequestHandlerFactory> request_handler_factory_; public: - explicit ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads); + explicit ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads, SharedCache data_cache); std::unique_ptr<RequestPool> request_pool__; std::unique_ptr<NetServer> net__; const AbstractLogger* log__; void Run(); private: + SharedCache data_cache_; + }; } diff --git a/receiver/src/receiver_data_server/receiver_data_server_error.h b/receiver/src/receiver_data_server/receiver_data_server_error.h index 822fbafbd3ed407a42b58b3100477e1cb3e0bf22..5b3ddadea216b85a2945935aabdbb8b702eba4ec 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_error.h +++ b/receiver/src/receiver_data_server/receiver_data_server_error.h @@ -1,5 +1,5 @@ -#ifndef ASAPO_RECEIVER_ERROR_H -#define ASAPO_RECEIVER_ERROR_H +#ifndef ASAPO_RECEIVER_DATA_SERVER_ERROR_H +#define ASAPO_RECEIVER_DATA_SERVER_ERROR_H #include "common/error.h" @@ -7,16 +7,22 @@ namespace asapo { enum class ReceiverDataServerErrorType { kMemoryPool, + kWrongRequest }; using ReceiverDataServerErrorTemplate = ServiceErrorTemplate<ReceiverDataServerErrorType, ErrorType::kReceiverError>; -namespace ReceiverErrorTemplates { +namespace ReceiverDataServerErrorTemplates { auto const kMemoryPool = ReceiverDataServerErrorTemplate { "memory error", ReceiverDataServerErrorType::kMemoryPool }; +auto const kWrongRequest = ReceiverDataServerErrorTemplate { + "wrong request", ReceiverDataServerErrorType::kWrongRequest +}; + + }; } -#endif //ASAPO_RECEIVER_ERROR_H +#endif //ASAPO_RECEIVER_DATA_SERVER_ERROR_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.cpp b/receiver/src/receiver_data_server/receiver_data_server_request.cpp index baabf2a8875a51983322a8e6c8f38fbed366f4c6..f0e5a5deade1ffb76ecabc493fa7aee3b2e88ae4 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request.cpp @@ -3,10 +3,9 @@ namespace asapo { -ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, - const NetServer* server) : +ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t source_id) : GenericRequest(std::move(header)), - net_id{net_id}, server{server} { + source_id{source_id} { } diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.h b/receiver/src/receiver_data_server/receiver_data_server_request.h index c191325d40a1150b47b0fd7be05b334d4b236073..0a6d24f63787594009b3b9fefa4ebdf452577780 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request.h @@ -1,5 +1,5 @@ -#ifndef ASAPO_REQUEST_H -#define ASAPO_REQUEST_H +#ifndef ASAPO_RECEIVER_DATA_SERVER_REQUEST_H +#define ASAPO_RECEIVER_DATA_SERVER_REQUEST_H #include "common/networking.h" @@ -11,9 +11,8 @@ class NetServer; class ReceiverDataServerRequest : public GenericRequest { public: - explicit ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, const NetServer* server); - const uint64_t net_id; - const NetServer* server; + explicit ReceiverDataServerRequest(GenericRequestHeader header, uint64_t source_id); + const uint64_t source_id; ~ReceiverDataServerRequest() = default; }; @@ -22,4 +21,4 @@ using ReceiverDataServerRequestPtr = std::unique_ptr<ReceiverDataServerRequest>; } -#endif //ASAPO_REQUEST_H +#endif //ASAPO_RECEIVER_DATA_SERVER_REQUEST_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca7b80ca0e4392d49add9c2949568397931e5667 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp @@ -0,0 +1,86 @@ +#include "receiver_data_server_request_handler.h" + +#include "receiver_data_server_error.h" + +namespace asapo { + +ReceiverDataServerRequestHandler::ReceiverDataServerRequestHandler(const NetServer* server, + DataCache* data_cache): log__{GetDefaultReceiverDataServerLogger()}, server_{server}, + data_cache_{data_cache} { + +} + + +bool ReceiverDataServerRequestHandler::CheckRequest(const ReceiverDataServerRequest* request) { + return request->header.op_code == kOpcodeGetBufferData; +} + +Error ReceiverDataServerRequestHandler::SendData(const ReceiverDataServerRequest* request, + void* data, + CacheMeta* meta) { + auto err = SendResponce(request, kNetErrorNoError); + if (err) { + data_cache_->UnlockSlot(meta); + return err; + } + err = server_->SendData(request->source_id, data, request->header.data_size); + log__->Debug("sending data from memory cache, id:" + std::to_string(request->header.data_id)); + data_cache_->UnlockSlot(meta); + return err; +} + +void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta) { + void* buf = nullptr; + if (data_cache_) { + buf = data_cache_->GetSlotToReadAndLock(request->header.data_id, request->header.data_size, + meta); + if (!buf) { + log__->Debug("data not found in memory cache, id:" + std::to_string(request->header.data_id)); + } + + } + if (buf == nullptr) { + SendResponce(request, kNetErrorNoData); + } + return buf; +} + + +Error ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request) { + auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request); + if (!CheckRequest(receiver_request)) { + SendResponce(receiver_request, kNetErrorWrongRequest); + log__->Error("wrong request, code:" + std::to_string(receiver_request->header.op_code)); + return nullptr; + } + + CacheMeta* meta; + auto buf = GetSlot(receiver_request, &meta); + if (buf == nullptr) { + return nullptr; + } + + SendData(receiver_request, buf, meta); + return nullptr; +} + +bool ReceiverDataServerRequestHandler::ReadyProcessRequest() { + return true; // always ready +} + +void ReceiverDataServerRequestHandler::PrepareProcessingRequestLocked() { +// do nothing +} + +void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(const Error& error_from_process) { +// do nothing +} + +Error ReceiverDataServerRequestHandler::SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code) { + GenericNetworkResponse responce; + responce.op_code = kOpcodeGetBufferData; + responce.error_code = code; + return server_->SendData(request->source_id, &responce, sizeof(GenericNetworkResponse)); +} + +} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..aba072c53c22ec695b2f869928d9a0291ca37b23 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h @@ -0,0 +1,31 @@ +#ifndef ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_H +#define ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_H + +#include "request/request_handler.h" +#include "net_server.h" +#include "../data_cache.h" +#include "receiver_data_server_request.h" +#include "receiver_data_server_logger.h" + +namespace asapo { + +class ReceiverDataServerRequestHandler: public RequestHandler { + public: + explicit ReceiverDataServerRequestHandler(const NetServer* server, DataCache* data_cache); + Error ProcessRequestUnlocked(GenericRequest* request) override; + bool ReadyProcessRequest() override; + void PrepareProcessingRequestLocked() override; + void TearDownProcessingRequestLocked(const Error& error_from_process) override; + const AbstractLogger* log__; + private: + const NetServer* server_; + DataCache* data_cache_; + bool CheckRequest(const ReceiverDataServerRequest* request); + Error SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code); + Error SendData(const ReceiverDataServerRequest* request, void* data, CacheMeta* meta); + void* GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta); +}; + +} + +#endif //ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp index d28ea747160f915e7f4261c8ec6e7fa7581fbd88..be4b9fdb64bce89eb9508749d5d6818b22801eb9 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp @@ -1,9 +1,16 @@ #include "receiver_data_server_request_handler_factory.h" +#include "receiver_data_server_request_handler.h" + namespace asapo { std::unique_ptr<RequestHandler> ReceiverDataServerRequestHandlerFactory::NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) { - return std::unique_ptr<RequestHandler>(); + return std::unique_ptr<RequestHandler> {new ReceiverDataServerRequestHandler(server_, data_cache_)}; +} +ReceiverDataServerRequestHandlerFactory::ReceiverDataServerRequestHandlerFactory(const NetServer* server, + DataCache* data_cache) : server_{server}, + data_cache_{data_cache} { + } } \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h index 1176dd57deb32d07946f69d0c5df2a20d55939cc..93d7815eb58f1528fea95cdf19bf2f515fbb25d7 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h @@ -5,13 +5,18 @@ #include "request/request_handler.h" #include "preprocessor/definitions.h" +#include "net_server.h" +#include "../data_cache.h" namespace asapo { class ReceiverDataServerRequestHandlerFactory : public RequestHandlerFactory { public: + ReceiverDataServerRequestHandlerFactory (const NetServer* server, DataCache* data_cache); VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override; private: + const NetServer* server_; + DataCache* data_cache_; }; diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index ec8e7ad3e89c33189f223cc4405471e5db1cdb38..914c880a81686b91e4233799c33df8b35985f640 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -51,7 +51,7 @@ ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Err ); return nullptr; } - return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket, this}}; + return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket}}; } GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { @@ -92,4 +92,13 @@ TcpServer::~TcpServer() { } +Error TcpServer::SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept { + Error err; + io__->Send(source_id, buf, size, &err); + if (err) { + log__->Error("cannot send to worker" + err->Explain()); + } + return err; +} + } \ No newline at end of file diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index 24eb78c58e0076a652b55dea8e2e65a6c1bb072c..3cc08ee83ec5e3a85551142c4ab001c52d284fc0 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -13,7 +13,8 @@ class TcpServer : public NetServer { public: TcpServer(std::string address); ~TcpServer(); - virtual GenericRequests GetNewRequests(Error* err) const noexcept override ; + GenericRequests GetNewRequests(Error* err) const noexcept override ; + Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override; std::unique_ptr<IO> io__; const AbstractLogger* log__; private: diff --git a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h index 390a628e731e17add3c4153129d24d51aa0cccde..611a4a08e90203dd907d18a481c8b43a5d2e0b56 100644 --- a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h +++ b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h @@ -1,5 +1,5 @@ -#ifndef ASAPO_MOCK_STATISTICS_H -#define ASAPO_MOCK_STATISTICS_H +#ifndef ASAPO_RECEIVER_DATASERVER_MOCKING_H +#define ASAPO_RECEIVER_DATASERVER_MOCKING_H #include <gtest/gtest.h> #include <gmock/gmock.h> @@ -18,7 +18,7 @@ class MockNetServer : public NetServer { err->reset(error); GenericRequests res; for (const auto& preq : reqs) { - ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.net_id, preq.server}}; + ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.source_id}}; res.push_back(std::move(ptr)); } return res; @@ -26,6 +26,14 @@ class MockNetServer : public NetServer { MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** error)); + + Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override { + return Error{SendData_t(source_id, buf, size)}; + + }; + + MOCK_CONST_METHOD3(SendData_t, ErrorInterface * (uint64_t source_id, void* buf, uint64_t size)); + }; class MockPool : public RequestPool { @@ -47,4 +55,4 @@ class MockPool : public RequestPool { } -#endif //ASAPO_MOCK_STATISTICS_H +#endif //ASAPO_RECEIVER_DATASERVER_MOCKING_H diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index 9a2e4c7146de86b1cc60d4f08245fe0c5bde34c6..599451d12ae18f5064d98eceea5b3a58da3a6f84 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -37,7 +37,7 @@ using asapo::ReceiverDataServerRequest; namespace { TEST(ReceiverDataServer, Constructor) { - ReceiverDataServer data_server{"", asapo::LogLevel::Debug, 4}; + ReceiverDataServer data_server{"", asapo::LogLevel::Debug, 4, nullptr}; ASSERT_THAT(dynamic_cast<const asapo::TcpServer*>(data_server.net__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(data_server.request_pool__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(data_server.log__), Ne(nullptr)); @@ -46,7 +46,7 @@ TEST(ReceiverDataServer, Constructor) { class ReceiverDataServerTests : public Test { public: std::string expected_address = "somehost:123"; - ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug, 0}; + ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug, 0, nullptr}; asapo::MockNetServer mock_net; asapo::MockPool mock_pool; NiceMock<asapo::MockLogger> mock_logger; @@ -101,10 +101,10 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { ); EXPECT_CALL(mock_pool, AddRequests_t(_)).WillOnce( - Return(asapo::ReceiverErrorTemplates::kMemoryPool.Generate("cannot add request to pool").release()) + Return(asapo::ReceiverDataServerErrorTemplates::kMemoryPool.Generate("cannot add request to pool").release()) ); - auto errtext = asapo::ReceiverErrorTemplates::kMemoryPool.Generate("cannot add request to pool")->Explain(); + auto errtext = asapo::ReceiverDataServerErrorTemplates::kMemoryPool.Generate("cannot add request to pool")->Explain(); EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("stopped"), HasSubstr(errtext)))); diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a84efd8bd114f97907049ce2a94b58da70f41a87 --- /dev/null +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -0,0 +1,146 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + + +#include "unittests/MockLogger.h" +#include "../../src/receiver_data_server/receiver_data_server.h" +#include "../../src/receiver_data_server/receiver_data_server_request_handler.h" + +#include "../receiver_mocking.h" +#include "receiver_dataserver_mocking.h" +#include "../../src/receiver_data_server/receiver_data_server_error.h" +#include "common/io_error.h" + +using ::testing::Test; +using ::testing::Gt; +using ::testing::Ge; +using ::testing::Le; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Ref; +using ::testing::Return; +using ::testing::_; +using ::testing::SetArgPointee; +using ::testing::NiceMock; +using ::testing::HasSubstr; + + +using asapo::ReceiverDataServer; +using asapo::ReceiverDataServerRequestHandler; + + +namespace { + +MATCHER_P3(M_CheckResponce, op_code, error_code, message, + "Checks if a valid GenericNetworkResponse was used") { + return ((asapo::GenericNetworkResponse*)arg)->op_code == op_code + && ((asapo::GenericNetworkResponse*)arg)->error_code == uint64_t(error_code); +} + +TEST(RequestHandlerTest, Constructor) { + ReceiverDataServerRequestHandler handler{nullptr, nullptr}; + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(handler.log__), Ne(nullptr)); +} + + +class RequestHandlerTests : public Test { + public: + asapo::MockNetServer mock_net; + asapo::MockDataCache mock_cache; + ReceiverDataServerRequestHandler handler{&mock_net, &mock_cache}; + ReceiverDataServerRequestHandler handler_no_cache{&mock_net, nullptr}; + NiceMock<asapo::MockLogger> mock_logger; + uint64_t expected_data_size = 1001243214; + uint64_t expected_buf_id = 12345; + uint64_t expected_source_id = 11; + asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, ""}; + asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id}; + uint8_t tmp; + void SetUp() override { + handler.log__ = &mock_logger; + } + void TearDown() override { + } + void MockGetSlot(bool ok = true); + void MockSendResponce(asapo::NetworkErrorCode err_code, bool ok = true); + +}; + +void RequestHandlerTests::MockGetSlot(bool ok) { + EXPECT_CALL(mock_cache, GetSlotToReadAndLock(expected_buf_id, expected_data_size, _)).WillOnce( + Return(ok ? &tmp : nullptr) + ); +} + +void RequestHandlerTests::MockSendResponce(asapo::NetworkErrorCode err_code, bool ok) { + EXPECT_CALL(mock_net, SendData_t(expected_source_id, + M_CheckResponce(asapo::kOpcodeGetBufferData, err_code, ""), sizeof(asapo::GenericNetworkResponse))).WillOnce( + Return(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); +} + +TEST_F(RequestHandlerTests, RequestAlwaysReady) { + auto res = handler.ReadyProcessRequest(); + + ASSERT_THAT(res, Eq(true)); +} + +TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { + request.header.op_code = asapo::kOpcodeUnknownOp; + MockSendResponce(asapo::kNetErrorWrongRequest, false); + + EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request"))); + + auto err = handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) { + MockSendResponce(asapo::kNetErrorNoData, true); + + auto err = handler_no_cache.ProcessRequestUnlocked(&request); + EXPECT_CALL(mock_logger, Debug(_)).Times(0); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { + MockGetSlot(false); + MockSendResponce(asapo::kNetErrorNoData, true); + EXPECT_CALL(mock_logger, Debug(HasSubstr("not found"))); + + auto err = handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) { + MockGetSlot(true); + MockSendResponce(asapo::kNetErrorNoError, false); + EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0); + EXPECT_CALL(mock_cache, UnlockSlot(_)); + + auto err = handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); +} + + + +TEST_F(RequestHandlerTests, ProcessRequestOk) { + MockGetSlot(true); + MockSendResponce(asapo::kNetErrorNoError, true); + EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).WillOnce( + Return(nullptr) + ); + EXPECT_CALL(mock_cache, UnlockSlot(_)); + EXPECT_CALL(mock_logger, Debug(HasSubstr("sending"))); + + auto err = handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); +} + +} diff --git a/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp b/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..13e391c37085e756be00f029a7e6684019b8e16e --- /dev/null +++ b/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp @@ -0,0 +1,40 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + + +#include "unittests/MockLogger.h" +#include "../../src/receiver_data_server/receiver_data_server.h" +#include "../../src/receiver_data_server/receiver_data_server_request_handler_factory.h" +#include "../../src/receiver_data_server/receiver_data_server_request_handler.h" + + + +using ::testing::Test; +using ::testing::Gt; +using ::testing::Ge; +using ::testing::Le; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Ref; +using ::testing::Return; +using ::testing::_; +using ::testing::SetArgPointee; +using ::testing::NiceMock; +using ::testing::HasSubstr; + + +using asapo::ReceiverDataServer; +using asapo::ReceiverDataServerRequestHandlerFactory; + + +namespace { + +TEST(ReceiverDataServerRequestHandlerFactory, Constructor) { + ReceiverDataServer data_server{"", asapo::LogLevel::Debug, 4, nullptr}; + ReceiverDataServerRequestHandlerFactory factory((asapo::NetServer*)&data_server, nullptr); + auto handler = factory.NewRequestHandler(1, nullptr); + ASSERT_THAT(dynamic_cast<const asapo::ReceiverDataServerRequestHandler*>(handler.get()), Ne(nullptr)); +} + + +} diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp index f7c35a80e734244f5af7aeb3cb5003cec224d927..4014e70ba9c8bfcf6f5fcd843c3f42fc61dcdbf1 100644 --- a/receiver/unittests/receiver_data_server/test_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -224,13 +224,31 @@ TEST_F(TCPServerTests, GetNewRequestsReadOk) { ASSERT_THAT(requests.size(), Eq(3)); int i = 0; for (auto conn : expected_client_sockets) { + ASSERT_THAT(dynamic_cast<asapo::ReceiverDataServerRequest*>(requests[i].get()), Ne(nullptr)); ASSERT_THAT(requests[i]->header.data_id, Eq(conn)); ASSERT_THAT(requests[i]->header.op_code, Eq(asapo::kOpcodeGetBufferData)); -// ASSERT_THAT(requests[i]->net_id, Eq(conn)); +// ASSERT_THAT(requests[i]->source_id, Eq(conn)); // ASSERT_THAT(requests[i]->server, Eq(&tcp_server)); i++; } } +TEST_F(TCPServerTests, SendData) { + uint8_t tmp; + + EXPECT_CALL(mock_io, Send_t(1, &tmp, 10, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(1) + )); + + EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send"))); + + auto err = tcp_server.SendData(1, &tmp, 10); + + ASSERT_THAT(err, Ne(nullptr)); +} + } diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index d8d3a6efc215abd95e072ee0b5cf03d6668036ce..d74e5cb10e1a2bb1c31545c5720168ffd4d34828 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -1,5 +1,5 @@ -#ifndef ASAPO_MOCK_STATISTICS_H -#define ASAPO_MOCK_STATISTICS_H +#ifndef ASAPO_RECEIVER_MOCKING_H +#define ASAPO_RECEIVER_MOCKING_H #include <gtest/gtest.h> #include <gmock/gmock.h> @@ -69,9 +69,12 @@ class MockDataCache: public DataCache { MOCK_METHOD2(GetFreeSlotAndLock, void* (uint64_t size, CacheMeta** meta)); MOCK_METHOD1(UnlockSlot, bool(CacheMeta* meta)); + MOCK_METHOD3(GetSlotToReadAndLock, void* (uint64_t + id, uint64_t data_size, CacheMeta** meta)); + }; } -#endif //ASAPO_MOCK_STATISTICS_H +#endif //ASAPO_RECEIVER_MOCKING_H diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 2ea4e9bcc66cc5c1883c6a8739ac01ba818d65e3..eb99b2ae6c88d5f984b5052013ce6688d9a9279f 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -101,7 +101,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { std::vector<std::string>fields {"MonitorDbAddress", "ListenPort", "DataServer", "ListenPort", "WriteToDisk", "WriteToDb", "DataCache", "Use", "SizeGB", "ReservedShare", "BrokerDbAddress", "Tag", "AuthorizationServer", "AuthorizationInterval", "RootFolder", "MonitorDbName", "LogLevel", - "SourceHost","NThreads"}; + "SourceHost", "NThreads"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/receiver/unittests/test_datacache.cpp b/receiver/unittests/test_datacache.cpp index 8e7f85569bc7a6e792c03f76a30ab0d1d8d8100f..1994fdad2fe0d07d99c545e8058098f697ecc751 100644 --- a/receiver/unittests/test_datacache.cpp +++ b/receiver/unittests/test_datacache.cpp @@ -96,7 +96,7 @@ TEST_F(DataCacheTests, GetFreeSlotCannotWriteIfAlreadyWriting) { TEST_F(DataCacheTests, PrepareToReadIdNotFound) { uint64_t id; id = 0; - uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, &meta1); + uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, expected_size, &meta1); ASSERT_THAT(addr, Eq(nullptr)); ASSERT_THAT(meta1, Eq(nullptr)); } @@ -105,7 +105,7 @@ TEST_F(DataCacheTests, PrepareToReadOk) { uint64_t data_size = expected_cache_size * 0.7; uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(data_size, &meta1); - uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta2); + uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, data_size, &meta2); ASSERT_THAT(addr, Eq(ini_addr)); ASSERT_THAT(meta1, Eq(meta2)); ASSERT_THAT(meta2->size, Eq(data_size)); @@ -116,13 +116,11 @@ TEST_F(DataCacheTests, PrepareToReadFailsIfTooCloseToCurrentPointer) { auto data_size = expected_cache_size * 0.9; cache.GetFreeSlotAndLock(data_size, &meta1); - uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta2); + uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, data_size, &meta2); ASSERT_THAT(addr, Eq(nullptr)); } - - TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) { CacheMeta* meta3, *meta4, *meta5; CacheMeta* meta; @@ -132,10 +130,10 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) { cache.GetFreeSlotAndLock(10, &meta4); cache.GetFreeSlotAndLock(30, &meta5); - uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta); - uint8_t* addr2 = (uint8_t*) cache.GetSlotToReadAndLock(meta2->id, &meta); - uint8_t* addr3 = (uint8_t*) cache.GetSlotToReadAndLock(meta3->id, &meta); - uint8_t* addr4 = (uint8_t*) cache.GetSlotToReadAndLock(meta4->id, &meta); + uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, 10, &meta); + uint8_t* addr2 = (uint8_t*) cache.GetSlotToReadAndLock(meta2->id, 10, &meta); + uint8_t* addr3 = (uint8_t*) cache.GetSlotToReadAndLock(meta3->id, expected_cache_size - 30, &meta); + uint8_t* addr4 = (uint8_t*) cache.GetSlotToReadAndLock(meta4->id, 10, &meta); ASSERT_THAT(addr1, Eq(nullptr)); ASSERT_THAT(addr2, Eq(nullptr)); @@ -145,11 +143,32 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) { } +TEST_F(DataCacheTests, GetSlotToReadSizeOk) { + CacheMeta* meta; + cache.GetFreeSlotAndLock(expected_size, &meta1); + + uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, expected_size, &meta); + + ASSERT_THAT(addr1, Ne(nullptr)); + ASSERT_THAT(meta->size, Eq(expected_size)); +} + +TEST_F(DataCacheTests, GetSlotToReadWrongSize) { + CacheMeta* meta; + cache.GetFreeSlotAndLock(expected_size, &meta1); + + uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, expected_size + 1, &meta); + + ASSERT_THAT(addr1, Eq(nullptr)); +} + + + TEST_F(DataCacheTests, CannotGetFreeSlotIfNeedCleanOnebeingReaded) { CacheMeta* meta; uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(10, &meta1); - auto res = cache.GetSlotToReadAndLock(meta1->id, &meta); + auto res = cache.GetSlotToReadAndLock(meta1->id, 10, &meta); uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2); ASSERT_THAT(ini_addr, Ne(nullptr)); @@ -162,7 +181,7 @@ TEST_F(DataCacheTests, CanGetFreeSlotIfWasUnlocked) { CacheMeta* meta; cache.GetFreeSlotAndLock(10, &meta1); cache.UnlockSlot(meta1); - cache.GetSlotToReadAndLock(meta1->id, &meta); + cache.GetSlotToReadAndLock(meta1->id, 10, &meta); cache.UnlockSlot(meta); auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2); @@ -172,8 +191,8 @@ TEST_F(DataCacheTests, CanGetFreeSlotIfWasUnlocked) { TEST_F(DataCacheTests, IncreasLockForEveryRead) { CacheMeta* meta; cache.GetFreeSlotAndLock(10, &meta1); - cache.GetSlotToReadAndLock(meta1->id, &meta); - cache.GetSlotToReadAndLock(meta1->id, &meta); + cache.GetSlotToReadAndLock(meta1->id, 10, &meta); + cache.GetSlotToReadAndLock(meta1->id, 10, &meta); cache.UnlockSlot(meta); auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2); @@ -185,8 +204,8 @@ TEST_F(DataCacheTests, DecreasLockForEveryUnlock) { cache.GetFreeSlotAndLock(10, &meta1); cache.UnlockSlot(meta1); - cache.GetSlotToReadAndLock(meta1->id, &meta); - cache.GetSlotToReadAndLock(meta1->id, &meta); + cache.GetSlotToReadAndLock(meta1->id, 10, &meta); + cache.GetSlotToReadAndLock(meta1->id, 10, &meta); cache.UnlockSlot(meta); cache.UnlockSlot(meta); auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2);