diff --git a/receiver/src/receiver_data_server/net_server.h b/receiver/src/receiver_data_server/net_server.h deleted file mode 100644 index ffdaaaf003a058127e41e03e69523d539f7761d8..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/net_server.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef ASAPO_NET_SERVER_H -#define ASAPO_NET_SERVER_H - -#include "common/error.h" - -#include "request/request.h" - -namespace asapo { - -class NetServer { - public: - virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0; - virtual Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept = 0; - virtual void HandleAfterError(uint64_t source_id) const noexcept = 0; - virtual ~NetServer() = default; -}; - -} - -#endif //ASAPO_NET_SERVER_H diff --git a/receiver/src/receiver_data_server/rds_net_server.h b/receiver/src/receiver_data_server/rds_net_server.h new file mode 100644 index 0000000000000000000000000000000000000000..ee49cc4af4bedd97ef9d1416ed39143cb55859de --- /dev/null +++ b/receiver/src/receiver_data_server/rds_net_server.h @@ -0,0 +1,28 @@ +#ifndef ASAPO_RDS_NET_SERVER_H +#define ASAPO_RDS_NET_SERVER_H + +#include "../data_cache.h" +#include "common/error.h" +#include "receiver_data_server_request.h" + +namespace asapo { + +class RdsNetServer { + public: + /** + * It is very important that this function is successfully called, before any other call is is made! + */ + virtual Error Initialize() = 0; + virtual GenericRequests GetNewRequests(Error* err) = 0; + virtual Error SendResponse(const ReceiverDataServerRequest* request, + const GenericNetworkResponse* response) = 0; + virtual Error + SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response, + const CacheMeta* cache_slot) = 0; + virtual void HandleAfterError(uint64_t source_id) = 0; + virtual ~RdsNetServer() = default; +}; + +} + +#endif //ASAPO_RDS_NET_SERVER_H diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index eca610743ea0eaf3efc71e54b180095e1cc32b6a..81566167b717499b7a3d76c4bc79db6ae437f800 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -17,6 +17,13 @@ config_{config}, statistics__{new Statistics()} { } void ReceiverDataServer::Run() { + { + Error startError = net__->Initialize(); + if (startError) { + log__->Error(std::string("Error starting rds net server: ") + startError->Explain()); + return; + } + } while (true) { statistics__->SendIfNeeded(); Error err; @@ -34,4 +41,4 @@ void ReceiverDataServer::Run() { } } -} \ No newline at end of file +} diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h index c7edda3f3ab16f1c0376f489d943bd96574ea252..889ce357c5fa43ab19a3c5874e3bc3c8a81a8eae 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.h +++ b/receiver/src/receiver_data_server/receiver_data_server.h @@ -3,7 +3,7 @@ #include <memory> -#include "net_server.h" +#include "rds_net_server.h" #include "request/request_pool.h" #include "logger/logger.h" #include "../data_cache.h" @@ -21,7 +21,7 @@ class ReceiverDataServer { explicit ReceiverDataServer(std::string address, LogLevel log_level, SharedCache data_cache, const ReceiverDataCenterConfig& config); std::unique_ptr<RequestPool> request_pool__; - std::unique_ptr<NetServer> net__; + std::unique_ptr<RdsNetServer> net__; const AbstractLogger* log__; void Run(); private: diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.h b/receiver/src/receiver_data_server/receiver_data_server_request.h index 0a6d24f63787594009b3b9fefa4ebdf452577780..541e08dddf45cf259861699a97e6f941bf5d9b6b 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request.h @@ -7,7 +7,7 @@ namespace asapo { -class NetServer; +class RdsNetServer; class ReceiverDataServerRequest : public GenericRequest { public: diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp index 03d8ae764549afee64c6e3e5f4a555bcf2f67ac6..bc45e229eea4b6f21a9963488f7c2447cc91cf40 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp @@ -4,7 +4,7 @@ namespace asapo { -ReceiverDataServerRequestHandler::ReceiverDataServerRequestHandler(const NetServer* server, +ReceiverDataServerRequestHandler::ReceiverDataServerRequestHandler(RdsNetServer* server, DataCache* data_cache, Statistics* statistics): log__{GetDefaultReceiverDataServerLogger()}, statistics__{statistics}, server_{server}, data_cache_{data_cache} { @@ -15,59 +15,53 @@ bool ReceiverDataServerRequestHandler::CheckRequest(const ReceiverDataServerRequ return request->header.op_code == kOpcodeGetBufferData; } -Error ReceiverDataServerRequestHandler::SendData(const ReceiverDataServerRequest* request, - void* data, - CacheMeta* meta) { - auto err = SendResponce(request, kNetErrorNoError); - if (err) { - data_cache_->UnlockSlot(meta); - return err; - } - err = server_->SendData(request->source_id, data, request->header.data_size); - log__->Debug("sending data from memory cache, id:" + std::to_string(request->header.data_id)); - data_cache_->UnlockSlot(meta); - return err; +Error ReceiverDataServerRequestHandler::SendResponse(const ReceiverDataServerRequest* request, NetworkErrorCode code) { + GenericNetworkResponse response{}; + response.op_code = kOpcodeGetBufferData; + response.error_code = code; + return server_->SendResponse(request, &response); +} + +Error ReceiverDataServerRequestHandler::SendResponseAndSlotData(const ReceiverDataServerRequest* request, + const CacheMeta* meta) { + GenericNetworkResponse response{}; + response.op_code = kOpcodeGetBufferData; + response.error_code = kNetErrorNoError; + return server_->SendResponseAndSlotData(request, &response, + meta); } -void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta) { - void* buf = nullptr; +CacheMeta* ReceiverDataServerRequestHandler::GetSlotAndLock(const ReceiverDataServerRequest* request) { + CacheMeta* meta = nullptr; if (data_cache_) { - buf = data_cache_->GetSlotToReadAndLock(request->header.data_id, request->header.data_size, - meta); - if (!buf) { + data_cache_->GetSlotToReadAndLock(request->header.data_id, request->header.data_size, &meta); + if (!meta) { log__->Debug("data not found in memory cache, id:" + std::to_string(request->header.data_id)); } - - } - if (buf == nullptr) { - SendResponce(request, kNetErrorNoData); } - return buf; + return meta; } - bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request, bool* retry) { *retry = false; auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request); if (!CheckRequest(receiver_request)) { - SendResponce(receiver_request, kNetErrorWrongRequest); - server_->HandleAfterError(receiver_request->source_id); - log__->Error("wrong request, code:" + std::to_string(receiver_request->header.op_code)); + HandleInvalidRequest(receiver_request); return true; } - CacheMeta* meta; - auto buf = GetSlot(receiver_request, &meta); - if (buf == nullptr) { + CacheMeta* meta = GetSlotAndLock(receiver_request); + if (!meta) { + SendResponse(receiver_request, kNetErrorNoData); return true; } - SendData(receiver_request, buf, meta); - statistics__->IncreaseRequestCounter(); - statistics__->IncreaseRequestDataVolume(receiver_request->header.data_size); + HandleValidRequest(receiver_request, meta); + data_cache_->UnlockSlot(meta); return true; } + bool ReceiverDataServerRequestHandler::ReadyProcessRequest() { return true; // always ready } @@ -76,19 +70,30 @@ void ReceiverDataServerRequestHandler::PrepareProcessingRequestLocked() { // do nothing } -void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool processing_succeeded) { +void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool /*processing_succeeded*/) { // do nothing } -Error ReceiverDataServerRequestHandler::SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code) { - GenericNetworkResponse responce; - responce.op_code = kOpcodeGetBufferData; - responce.error_code = code; - return server_->SendData(request->source_id, &responce, sizeof(GenericNetworkResponse)); +void ReceiverDataServerRequestHandler::ProcessRequestTimeout(GenericRequest* /*request*/) { +// do nothing } -void ReceiverDataServerRequestHandler::ProcessRequestTimeout(GenericRequest* request) { -// do nothing +void ReceiverDataServerRequestHandler::HandleInvalidRequest(const ReceiverDataServerRequest* receiver_request) { + SendResponse(receiver_request, kNetErrorWrongRequest); + server_->HandleAfterError(receiver_request->source_id); + log__->Error("wrong request, code:" + std::to_string(receiver_request->header.op_code)); } -} \ No newline at end of file +void ReceiverDataServerRequestHandler::HandleValidRequest(const ReceiverDataServerRequest* receiver_request, + const CacheMeta* meta) { + auto err = SendResponseAndSlotData(receiver_request, meta); + if (err) { + log__->Error("failed to send slot:" + err->Explain()); + server_->HandleAfterError(receiver_request->source_id); + } else { + statistics__->IncreaseRequestCounter(); + statistics__->IncreaseRequestDataVolume(receiver_request->header.data_size); + } +} + +} diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h index 4bae2ee45e590dc1d3e699d72e90675c04cc2b9a..44abf4ea54f772b50ffac3fff13757b5a9977481 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h @@ -2,7 +2,7 @@ #define ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_H #include "request/request_handler.h" -#include "net_server.h" +#include "rds_net_server.h" #include "../data_cache.h" #include "receiver_data_server_request.h" #include "receiver_data_server_logger.h" @@ -12,7 +12,7 @@ namespace asapo { class ReceiverDataServerRequestHandler: public RequestHandler { public: - explicit ReceiverDataServerRequestHandler(const NetServer* server, DataCache* data_cache, Statistics* statistics); + explicit ReceiverDataServerRequestHandler(RdsNetServer* server, DataCache* data_cache, Statistics* statistics); bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; @@ -22,12 +22,16 @@ class ReceiverDataServerRequestHandler: public RequestHandler { const AbstractLogger* log__; Statistics* statistics__; private: - const NetServer* server_; + RdsNetServer* server_; DataCache* data_cache_; bool CheckRequest(const ReceiverDataServerRequest* request); - Error SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code); - Error SendData(const ReceiverDataServerRequest* request, void* data, CacheMeta* meta); - void* GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta); + Error SendResponse(const ReceiverDataServerRequest* request, NetworkErrorCode code); + Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const CacheMeta* meta); + CacheMeta* GetSlotAndLock(const ReceiverDataServerRequest* request); + + void HandleInvalidRequest(const ReceiverDataServerRequest* receiver_request); + + void HandleValidRequest(const ReceiverDataServerRequest* receiver_request, const CacheMeta* meta); }; } diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp index d3e259ff1ffa0940a3887eb6b1e2159fd69a33e2..d08f13a8c05089475e887f7c9ca3038ca848e28c 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp @@ -8,9 +8,9 @@ std::unique_ptr<RequestHandler> ReceiverDataServerRequestHandlerFactory::NewRequ uint64_t* shared_counter) { return std::unique_ptr<RequestHandler> {new ReceiverDataServerRequestHandler(server_, data_cache_, statistics_)}; } -ReceiverDataServerRequestHandlerFactory::ReceiverDataServerRequestHandlerFactory(const NetServer* server, +ReceiverDataServerRequestHandlerFactory::ReceiverDataServerRequestHandlerFactory(RdsNetServer* server, DataCache* data_cache, Statistics* statistics) : server_{server}, data_cache_{data_cache}, statistics_{statistics} { } -} \ No newline at end of file +} diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h index b84f80fc8b562041b78cd97f8fee5897fc8d2807..29b26dbcefafb27f74786d3371a3c6e6a60b7ca1 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h @@ -5,7 +5,7 @@ #include "request/request_handler.h" #include "preprocessor/definitions.h" -#include "net_server.h" +#include "rds_net_server.h" #include "../data_cache.h" #include "../statistics.h" @@ -13,10 +13,10 @@ namespace asapo { class ReceiverDataServerRequestHandlerFactory : public RequestHandlerFactory { public: - ReceiverDataServerRequestHandlerFactory (const NetServer* server, DataCache* data_cache, Statistics* statistics); + ReceiverDataServerRequestHandlerFactory(RdsNetServer* server, DataCache* data_cache, Statistics* statistics); VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override; private: - const NetServer* server_; + RdsNetServer* server_; DataCache* data_cache_; Statistics* statistics_; }; diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index 4a924745f9cab11081790b2b52b2fcb6b6e88776..bd8bb807c387151e60e5c25eabdbe70edbe680fa 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -9,7 +9,7 @@ namespace asapo { TcpServer::TcpServer(std::string address) : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverDataServerLogger()}, address_{std::move(address)} {} -Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept { +Error TcpServer::Initialize() { Error err; if (master_socket_ == kDisconnectedSocketDescriptor) { master_socket_ = io__->CreateAndBindIPTCPSocketListener(address_, kMaxPendingConnections, &err); @@ -18,11 +18,13 @@ Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept { } else { log__->Error("dataserver cannot listen on " + address_ + ": " + err->Explain()); } + } else { + err = TextError("Server was already initialized"); } return err; } -ListSocketDescriptors TcpServer::GetActiveSockets(Error* err) const noexcept { +ListSocketDescriptors TcpServer::GetActiveSockets(Error* err) { std::vector<std::string> new_connections; auto sockets = io__->WaitSocketsActivity(master_socket_, &sockets_to_listen_, &new_connections, err); for (auto& connection : new_connections) { @@ -31,14 +33,14 @@ ListSocketDescriptors TcpServer::GetActiveSockets(Error* err) const noexcept { return sockets; } -void TcpServer::CloseSocket(SocketDescriptor socket) const noexcept { +void TcpServer::CloseSocket(SocketDescriptor socket) { sockets_to_listen_.erase(std::remove(sockets_to_listen_.begin(), sockets_to_listen_.end(), socket), sockets_to_listen_.end()); log__->Debug("connection " + io__->AddressFromSocket(socket) + " closed"); io__->CloseSocket(socket, nullptr); } -ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { +ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Error* err) { GenericRequestHeader header; io__->Receive(socket, &header, sizeof(GenericRequestHeader), err); @@ -51,10 +53,10 @@ ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Err ); return nullptr; } - return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket}}; + return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{header, (uint64_t) socket}}; } -GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { +GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) { GenericRequests requests; for (auto client : sockets) { Error err; @@ -69,11 +71,7 @@ GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) co return requests; } -GenericRequests TcpServer::GetNewRequests(Error* err) const noexcept { - if ( (*err = InitializeMasterSocketIfNeeded()) ) { - return {}; - } - +GenericRequests TcpServer::GetNewRequests(Error* err) { auto sockets = GetActiveSockets(err); if (*err) { return {}; @@ -90,18 +88,34 @@ TcpServer::~TcpServer() { io__->CloseSocket(master_socket_, nullptr); } +void TcpServer::HandleAfterError(uint64_t source_id) { + CloseSocket(source_id); +} -Error TcpServer::SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept { +Error TcpServer::SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) { Error err; - io__->Send(source_id, buf, size, &err); + io__->Send(request->source_id, response, sizeof(*response), &err); if (err) { log__->Error("cannot send to consumer" + err->Explain()); } return err; } -void TcpServer::HandleAfterError(uint64_t source_id) const noexcept { - CloseSocket(source_id); +Error +TcpServer::SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response, + const CacheMeta* cache_slot) { + Error err; + + err = SendResponse(request, response); + if (err) { + return err; + } + + io__->Send(request->source_id, cache_slot->addr, cache_slot->size, &err); + if (err) { + log__->Error("cannot send slot to worker" + err->Explain()); + } + return err; } -} \ No newline at end of file +} diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index af4f9c6579a85e28ff8aae946f2c0c37a503147f..bf27acafc1d50ac9219cad6bd66f96c3a314352a 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -1,7 +1,7 @@ -#ifndef ASAPO_TCP_SERVER_H -#define ASAPO_TCP_SERVER_H +#ifndef ASAPO_RDS_TCP_SERVER_H +#define ASAPO_RDS_TCP_SERVER_H -#include "net_server.h" +#include "rds_net_server.h" #include "io/io.h" #include "logger/logger.h" #include "receiver_data_server_request.h" @@ -9,26 +9,31 @@ namespace asapo { const int kMaxPendingConnections = 5; -class TcpServer : public NetServer { +class TcpServer : public RdsNetServer { public: - TcpServer(std::string address); - ~TcpServer(); - GenericRequests GetNewRequests(Error* err) const noexcept override ; - Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override; - void HandleAfterError(uint64_t source_id) const noexcept override; + explicit TcpServer(std::string address); + ~TcpServer() override; + + Error Initialize() override; + + GenericRequests GetNewRequests(Error* err) override; + Error SendResponse(const ReceiverDataServerRequest* request, + const GenericNetworkResponse* response) override; + Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response, + const CacheMeta* cache_slot) override; + void HandleAfterError(uint64_t source_id) override; std::unique_ptr<IO> io__; const AbstractLogger* log__; private: - void CloseSocket(SocketDescriptor socket) const noexcept; - ListSocketDescriptors GetActiveSockets(Error* err) const noexcept; - Error InitializeMasterSocketIfNeeded() const noexcept; - ReceiverDataServerRequestPtr ReadRequest(SocketDescriptor socket, Error* err) const noexcept; - GenericRequests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; - mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; - mutable ListSocketDescriptors sockets_to_listen_; + void CloseSocket(SocketDescriptor socket); + ListSocketDescriptors GetActiveSockets(Error* err); + ReceiverDataServerRequestPtr ReadRequest(SocketDescriptor socket, Error* err) ; + GenericRequests ReadRequests(const ListSocketDescriptors& sockets); + SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; + ListSocketDescriptors sockets_to_listen_; std::string address_; }; } -#endif //ASAPO_TCP_SERVER_H +#endif //ASAPO_RDS_TCP_SERVER_H diff --git a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h index 6eacdb6c706bd8c47e13b757b2704930fee179ed..5db8ab20591ab3b28368aae72e317d326b2154e3 100644 --- a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h +++ b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h @@ -4,15 +4,20 @@ #include <gtest/gtest.h> #include <gmock/gmock.h> -#include "../../src/receiver_data_server/net_server.h" +#include "../../src/receiver_data_server/rds_net_server.h" #include "request/request_pool.h" #include "../../src/receiver_data_server/receiver_data_server_request.h" namespace asapo { -class MockNetServer : public NetServer { +class MockNetServer : public RdsNetServer { public: - GenericRequests GetNewRequests(Error* err) const noexcept override { + Error Initialize() override { + return Error{Initialize_t()}; + }; + MOCK_METHOD0(Initialize_t, ErrorInterface * ()); + + GenericRequests GetNewRequests(Error* err) override { ErrorInterface* error = nullptr; auto reqs = GetNewRequests_t(&error); err->reset(error); @@ -24,21 +29,29 @@ class MockNetServer : public NetServer { return res; } - MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** - error)); - - Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override { - return Error{SendData_t(source_id, buf, size)}; + MOCK_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** + error)); + Error SendResponse(const ReceiverDataServerRequest* request, + const GenericNetworkResponse* response) override { + return Error{SendResponse_t(request, response)}; }; + MOCK_METHOD2(SendResponse_t, ErrorInterface * (const ReceiverDataServerRequest* request, + const GenericNetworkResponse* response)); - MOCK_CONST_METHOD3(SendData_t, ErrorInterface * (uint64_t source_id, void* buf, uint64_t size)); + Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response, + const CacheMeta* cache_slot) override { + return Error{SendResponseAndSlotData_t(request, response, cache_slot)}; + }; + MOCK_METHOD3(SendResponseAndSlotData_t, ErrorInterface * (const ReceiverDataServerRequest* request, + const GenericNetworkResponse* response, + const CacheMeta* cache_slot)); - void HandleAfterError(uint64_t source_id) const noexcept override { + void HandleAfterError(uint64_t source_id) override { HandleAfterError_t(source_id); } - MOCK_CONST_METHOD1(HandleAfterError_t, void (uint64_t source_id)); + MOCK_METHOD1(HandleAfterError_t, void (uint64_t source_id)); }; class MockPool : public RequestPool { @@ -47,7 +60,7 @@ class MockPool : public RequestPool { Error AddRequests(GenericRequests requests) noexcept override { std::vector<GenericRequest> reqs; for (const auto& preq : requests) { - reqs.push_back(GenericRequest{preq->header, 0}); + reqs.emplace_back(preq->header, 0); } return Error(AddRequests_t(std::move(reqs))); diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index 50017308ce4856b558270470ff3a65d9a1431a5d..17f798369b240d4bff4a2fa0c39f6b758b71eabb 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -59,7 +59,7 @@ class ReceiverDataServerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; NiceMock<asapo::MockStatistics> mock_statistics; void SetUp() override { - data_server.net__ = std::unique_ptr<asapo::NetServer> {&mock_net}; + data_server.net__ = std::unique_ptr<asapo::RdsNetServer> {&mock_net}; data_server.request_pool__ = std::unique_ptr<asapo::RequestPool> {&mock_pool}; data_server.log__ = &mock_logger; data_server.statistics__ = std::unique_ptr<asapo::Statistics> {&mock_statistics};; diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index fa5b8d8dd1dd9e38d42eb20eb39d982e68b75564..804999e1b79723968d549e57d2ddc3abafa01c9b 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -23,15 +23,14 @@ using ::testing::_; using ::testing::SetArgPointee; using ::testing::NiceMock; using ::testing::HasSubstr; - +using ::testing::DoAll; using asapo::ReceiverDataServer; using asapo::ReceiverDataServerRequestHandler; - namespace { -MATCHER_P3(M_CheckResponce, op_code, error_code, message, +MATCHER_P3(M_CheckResponse, op_code, error_code, message, "Checks if a valid GenericNetworkResponse was used") { return ((asapo::GenericNetworkResponse*)arg)->op_code == op_code && ((asapo::GenericNetworkResponse*)arg)->error_code == uint64_t(error_code); @@ -57,6 +56,7 @@ class RequestHandlerTests : public Test { uint64_t expected_meta_size = 100; uint64_t expected_buf_id = 12345; uint64_t expected_source_id = 11; + asapo::CacheMeta expected_meta; bool retry; asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, expected_meta_size, ""}; @@ -67,22 +67,41 @@ class RequestHandlerTests : public Test { } void TearDown() override { } - void MockGetSlot(bool ok = true); - void MockSendResponce(asapo::NetworkErrorCode err_code, bool ok = true); + void MockGetSlotAndUnlockIt(bool return_without_error = true); + void MockSendResponse(asapo::NetworkErrorCode expected_response_code, bool return_without_error); + void MockSendResponseAndSlotData(asapo::NetworkErrorCode expected_response_code, bool return_without_error); + }; -void RequestHandlerTests::MockGetSlot(bool ok) { - EXPECT_CALL(mock_cache, GetSlotToReadAndLock(expected_buf_id, expected_data_size, _)).WillOnce( - Return(ok ? &tmp : nullptr) - ); +void RequestHandlerTests::MockGetSlotAndUnlockIt(bool return_without_error) { + EXPECT_CALL(mock_cache, GetSlotToReadAndLock(expected_buf_id, expected_data_size, _)).WillOnce(DoAll( + SetArgPointee<2>(return_without_error ? &expected_meta : nullptr), + Return(return_without_error ? &tmp : nullptr) + )); + if (return_without_error) { + EXPECT_CALL(mock_cache, UnlockSlot(_)); + } } -void RequestHandlerTests::MockSendResponce(asapo::NetworkErrorCode err_code, bool ok) { - EXPECT_CALL(mock_net, SendData_t(expected_source_id, - M_CheckResponce(asapo::kOpcodeGetBufferData, err_code, ""), sizeof(asapo::GenericNetworkResponse))).WillOnce( - Return(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) - ); +void RequestHandlerTests::MockSendResponse(asapo::NetworkErrorCode expected_response_code, bool return_without_error) { + EXPECT_CALL(mock_net, SendResponse_t( + &request, + M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, "") + )).WillOnce( + Return(return_without_error ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); +} + +void RequestHandlerTests::MockSendResponseAndSlotData(asapo::NetworkErrorCode expected_response_code, + bool return_without_error) { + EXPECT_CALL(mock_net, SendResponseAndSlotData_t( + &request, + M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, ""), + &expected_meta + )).WillOnce( + Return(return_without_error ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); } TEST_F(RequestHandlerTests, RequestAlwaysReady) { @@ -91,9 +110,9 @@ TEST_F(RequestHandlerTests, RequestAlwaysReady) { ASSERT_THAT(res, Eq(true)); } -TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { +TEST_F(RequestHandlerTests, ProcessRequest_WrongOpCode) { request.header.op_code = asapo::kOpcodeUnknownOp; - MockSendResponce(asapo::kNetErrorWrongRequest, false); + MockSendResponse(asapo::kNetErrorWrongRequest, false); EXPECT_CALL(mock_net, HandleAfterError_t(expected_source_id)); EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request"))); @@ -103,8 +122,8 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { ASSERT_THAT(success, Eq(true)); } -TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) { - MockSendResponce(asapo::kNetErrorNoData, true); +TEST_F(RequestHandlerTests, ProcessRequest_ReturnsNoDataWhenCacheNotUsed) { + MockSendResponse(asapo::kNetErrorNoData, true); auto success = handler_no_cache.ProcessRequestUnlocked(&request, &retry); EXPECT_CALL(mock_logger, Debug(_)).Times(0); @@ -112,9 +131,9 @@ TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) { ASSERT_THAT(success, Eq(true)); } -TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { - MockGetSlot(false); - MockSendResponce(asapo::kNetErrorNoData, true); +TEST_F(RequestHandlerTests, ProcessRequest_ReadSlotReturnsNull) { + MockGetSlotAndUnlockIt(false); + MockSendResponse(asapo::kNetErrorNoData, true); EXPECT_CALL(mock_logger, Debug(HasSubstr("not found"))); auto success = handler.ProcessRequestUnlocked(&request, &retry); @@ -122,28 +141,19 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { ASSERT_THAT(success, Eq(true)); } - -TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) { - MockGetSlot(true); - MockSendResponce(asapo::kNetErrorNoError, false); - EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0); - EXPECT_CALL(mock_cache, UnlockSlot(_)); +TEST_F(RequestHandlerTests, ProcessRequest_ReadSlotErrorSendingResponse) { + MockGetSlotAndUnlockIt(true); + MockSendResponseAndSlotData(asapo::kNetErrorNoError, false); + EXPECT_CALL(mock_net, HandleAfterError_t(_)); auto success = handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); } - - -TEST_F(RequestHandlerTests, ProcessRequestOk) { - MockGetSlot(true); - MockSendResponce(asapo::kNetErrorNoError, true); - EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).WillOnce( - Return(nullptr) - ); - EXPECT_CALL(mock_cache, UnlockSlot(_)); - EXPECT_CALL(mock_logger, Debug(HasSubstr("sending"))); +TEST_F(RequestHandlerTests, ProcessRequest_Ok) { + MockGetSlotAndUnlockIt(true); + MockSendResponseAndSlotData(asapo::kNetErrorNoError, true); EXPECT_CALL(mock_stat, IncreaseRequestCounter_t()); EXPECT_CALL(mock_stat, IncreaseRequestDataVolume_t(expected_data_size)); auto success = handler.ProcessRequestUnlocked(&request, &retry); diff --git a/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp b/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp index 770804c8f908d0d5dfbeb9074984df07a94862bb..44ecc6462c46854fc698e0d54be6cc4f77807c79 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler_factory.cpp @@ -35,7 +35,7 @@ TEST(ReceiverDataServerRequestHandlerFactory, Constructor) { config.nthreads = 4; ReceiverDataServer data_server{"", asapo::LogLevel::Debug, nullptr, config}; asapo::Statistics stat; - ReceiverDataServerRequestHandlerFactory factory((asapo::NetServer*)&data_server, nullptr, &stat); + ReceiverDataServerRequestHandlerFactory factory((asapo::RdsNetServer*)&data_server, nullptr, &stat); auto handler = factory.NewRequestHandler(1, nullptr); ASSERT_THAT(dynamic_cast<const asapo::ReceiverDataServerRequestHandler*>(handler.get()), Ne(nullptr)); } diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp index 9b8985ef3f03f68a85344fa01c398864e5f29b86..76c6caf5eee3070caeaaa600c7771490143d8bb4 100644 --- a/receiver/unittests/receiver_data_server/test_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -59,14 +59,15 @@ class TCPServerTests : public Test { void TearDown() override { tcp_server.io__.release(); } - void ExpectListenMaster(bool ok); + void ExpectTcpBind(bool ok); void WaitSockets(bool ok, ListSocketDescriptors clients = {}); - void MockReceiveRequest(bool ok ); + void MockReceiveRequest(bool ok); + void InitMasterServer(); void ExpectReceiveOk(); void ExpectReceiveRequestEof(); }; -void TCPServerTests::ExpectListenMaster(bool ok) { +void TCPServerTests::ExpectTcpBind(bool ok) { EXPECT_CALL(mock_io, CreateAndBindIPTCPSocketListener_t(expected_address, asapo::kMaxPendingConnections, _)) .WillOnce(DoAll( SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), @@ -93,24 +94,28 @@ void TCPServerTests::WaitSockets(bool ok, ListSocketDescriptors clients) { } } -TEST_F(TCPServerTests, GetNewRequestsInitializesSocket) { - Error err; - ExpectListenMaster(false); +void TCPServerTests::InitMasterServer() { + ExpectTcpBind(true); + ASSERT_THAT(tcp_server.Initialize(), Eq(nullptr)); +} - auto requests = tcp_server.GetNewRequests(&err); +TEST_F(TCPServerTests, Initialize_Error) { + ExpectTcpBind(false); + + Error err = tcp_server.Initialize(); ASSERT_THAT(err, Ne(nullptr)); - ASSERT_THAT(requests, IsEmpty()); } -TEST_F(TCPServerTests, GetNewRequestsInitializesSocketOnlyOnce) { +TEST_F(TCPServerTests, Initialize_ErrorDoubleInitialize) { Error err; - ExpectListenMaster(false); - tcp_server.GetNewRequests(&err); - tcp_server.GetNewRequests(&err); + ExpectTcpBind(true); + err = tcp_server.Initialize(); + ASSERT_THAT(err, Eq(nullptr)); -// ASSERT_THAT(err, Ne(nullptr)); + err = tcp_server.Initialize(); + ASSERT_THAT(err, Ne(nullptr)); } void TCPServerTests::MockReceiveRequest(bool ok ) { @@ -162,10 +167,9 @@ void TCPServerTests::ExpectReceiveOk() { } } - TEST_F(TCPServerTests, GetNewRequestsWaitsSocketActivitiesError) { Error err; - ExpectListenMaster(true); + InitMasterServer(); WaitSockets(false); auto requests = tcp_server.GetNewRequests(&err); @@ -176,7 +180,7 @@ TEST_F(TCPServerTests, GetNewRequestsWaitsSocketActivitiesError) { TEST_F(TCPServerTests, GetNewRequestsWaitsSocketReceiveFailure) { Error err; - ExpectListenMaster(true); + InitMasterServer(); WaitSockets(true); MockReceiveRequest(false); @@ -194,7 +198,7 @@ TEST_F(TCPServerTests, GetNewRequestsWaitsSocketReceiveFailure) { TEST_F(TCPServerTests, GetNewRequestsReadEof) { Error err; - ExpectListenMaster(true); + InitMasterServer(); WaitSockets(true); ExpectReceiveRequestEof(); @@ -212,9 +216,8 @@ TEST_F(TCPServerTests, GetNewRequestsReadEof) { } TEST_F(TCPServerTests, GetNewRequestsReadOk) { - Error - err; - ExpectListenMaster(true); + Error err; + InitMasterServer(); WaitSockets(true); ExpectReceiveOk(); @@ -234,10 +237,11 @@ TEST_F(TCPServerTests, GetNewRequestsReadOk) { } -TEST_F(TCPServerTests, SendData) { - uint8_t tmp; +TEST_F(TCPServerTests, SendResponse) { + asapo::GenericNetworkResponse tmp {}; + asapo::ReceiverDataServerRequest expectedRequest {{}, 30}; - EXPECT_CALL(mock_io, Send_t(1, &tmp, 10, _)) + EXPECT_CALL(mock_io, Send_t(30, &tmp, sizeof(asapo::GenericNetworkResponse), _)) .WillOnce( DoAll( testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), @@ -246,11 +250,80 @@ TEST_F(TCPServerTests, SendData) { EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send"))); - auto err = tcp_server.SendData(1, &tmp, 10); + auto err = tcp_server.SendResponse(&expectedRequest, &tmp); + + ASSERT_THAT(err, Ne(nullptr)); +} + +TEST_F(TCPServerTests, SendResponseAndSlotData_SendResponseError) { + asapo::GenericNetworkResponse tmp {}; + + + asapo::ReceiverDataServerRequest expectedRequest {{}, 30}; + asapo::CacheMeta expectedMeta {}; + expectedMeta.id = 20; + expectedMeta.addr = (void*)0x9234; + expectedMeta.size = 50; + expectedMeta.lock = 123; + + EXPECT_CALL(mock_io, Send_t(30, &tmp, sizeof(asapo::GenericNetworkResponse), _)) + .WillOnce(DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(0) + )); + EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send"))); + + auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta); + + ASSERT_THAT(err, Ne(nullptr)); +} + +TEST_F(TCPServerTests, SendResponseAndSlotData_SendDataError) { + asapo::GenericNetworkResponse tmp {}; + + asapo::ReceiverDataServerRequest expectedRequest {{}, 30}; + asapo::CacheMeta expectedMeta {}; + expectedMeta.id = 20; + expectedMeta.addr = (void*)0x9234; + expectedMeta.size = 50; + expectedMeta.lock = 123; + + EXPECT_CALL(mock_io, Send_t(30, &tmp, sizeof(asapo::GenericNetworkResponse), _)) + .WillOnce(Return(1)); + EXPECT_CALL(mock_io, Send_t(30, expectedMeta.addr, expectedMeta.size, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(0) + )); + + EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send"))); + + auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta); ASSERT_THAT(err, Ne(nullptr)); } +TEST_F(TCPServerTests, SendResponseAndSlotData_Ok) { + asapo::GenericNetworkResponse tmp {}; + + asapo::ReceiverDataServerRequest expectedRequest {{}, 30}; + asapo::CacheMeta expectedMeta {}; + expectedMeta.id = 20; + expectedMeta.addr = (void*)0x9234; + expectedMeta.size = 50; + expectedMeta.lock = 123; + + EXPECT_CALL(mock_io, Send_t(30, &tmp, sizeof(asapo::GenericNetworkResponse), _)) + .WillOnce(Return(1)); + EXPECT_CALL(mock_io, Send_t(30, expectedMeta.addr, expectedMeta.size, _)) + .WillOnce(Return(expectedMeta.size)); + + auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta); + + ASSERT_THAT(err, Eq(nullptr)); +} + TEST_F(TCPServerTests, HandleAfterError) { EXPECT_CALL(mock_io, CloseSocket_t(expected_client_sockets[0], _)); tcp_server.HandleAfterError(expected_client_sockets[0]); diff --git a/tests/automatic/settings/nginx.conf.tpl b/tests/automatic/settings/nginx.conf.tpl index 782384a118a67119d0970c16cee5283bf2d12f62..bf598bc5b0f782fb44e7cc127d209aaee5b407ad 100644 --- a/tests/automatic/settings/nginx.conf.tpl +++ b/tests/automatic/settings/nginx.conf.tpl @@ -1,5 +1,5 @@ worker_processes 1; - +daemon off; events { worker_connections 1024; }