From eb31bdaf84964154656ff93be38a7f5c6f9be9e2 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Sat, 16 Jun 2018 23:23:16 +0200 Subject: [PATCH] start refactor connection --- receiver/CMakeLists.txt | 3 +- receiver/src/connection.cpp | 121 +----- receiver/src/connection.h | 17 +- receiver/src/request.cpp | 10 +- receiver/src/request.h | 18 +- receiver/src/requests_dispatcher.cpp | 160 +++++++ receiver/src/requests_dispatcher.h | 44 ++ receiver/unittests/test_connection.cpp | 409 +++--------------- receiver/unittests/test_request.cpp | 15 +- .../unittests/test_requests_dispatcher.cpp | 248 +++++++++++ 10 files changed, 551 insertions(+), 494 deletions(-) create mode 100644 receiver/src/requests_dispatcher.cpp create mode 100644 receiver/src/requests_dispatcher.h create mode 100644 receiver/unittests/test_requests_dispatcher.cpp diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 947315f30..20bf08ef3 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -9,7 +9,7 @@ set(SOURCE_FILES src/statistics_sender_influx_db.cpp src/receiver_config.cpp src/producer_logger.cpp - src/request_handler_db_write.cpp src/statistics_sender_fluentd.cpp src/statistics_sender_fluentd.h src/connection_authorizer.cpp src/connection_authorizer.h) + src/request_handler_db_write.cpp src/statistics_sender_fluentd.cpp src/statistics_sender_fluentd.h src/connection_authorizer.cpp src/connection_authorizer.h src/requests_dispatcher.cpp src/requests_dispatcher.h) ################################ @@ -51,6 +51,7 @@ set(TEST_SOURCE_FILES unittests/test_statistics_sender_influx_db.cpp unittests/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp + unittests/test_requests_dispatcher.cpp ) # set(TEST_LIBRARIES "${TARGET_NAME};system_io") diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index 0f71c7118..d0944de5a 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -8,52 +8,19 @@ namespace asapo { -size_t Connection::kRequestHandlerMaxBufferSize; -std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); - Connection::Connection(SocketDescriptor socket_fd, const std::string& address, - std::string receiver_tag) : request_factory__{new RequestFactory}, + std::string receiver_tag) : io__{GenerateDefaultIO()}, statistics__{new Statistics}, log__{GetDefaultReceiverLogger()}, - authorizer__{new ConnectionAuthorizer} { + requests_dispatcher__{new RequestsDispatcher{socket_fd,address,statistics__.get()}}{ socket_fd_ = socket_fd; - connection_id_ = kNetworkProducerPeerImplGlobalCounter++; address_ = address; statistics__->AddTag("connection_from", address); statistics__->AddTag("receiver_tag", std::move(receiver_tag)); - } -uint64_t Connection::GetId() const noexcept { - return connection_id_; -} -NetworkErrorCode GetNetworkCodeFromError(const Error& err) { - if (err) { - if (err == IOErrorTemplates::kFileAlreadyExists) { - return NetworkErrorCode::kNetErrorFileIdAlreadyInUse; - } else { - return NetworkErrorCode::kNetErrorInternalServerError; - } - } - return NetworkErrorCode::kNetErrorNoError; -} - -Error Connection::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { - Error err; - err = request->Handle(&statistics__); - GenericNetworkResponse generic_response; - generic_response.error_code = GetNetworkCodeFromError(err); - if (err) { - log__->Error("error while processing request from " + address_ + " - " + err->Explain()); - } - io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &err); - if (err) { - log__->Error("error sending response to " + address_ + " - " + err->Explain()); - } - return err; -} void Connection::ProcessStatisticsAfterRequest(const std::unique_ptr<Request>& request) const noexcept { statistics__->IncreaseRequestCounter(); @@ -62,74 +29,14 @@ void Connection::ProcessStatisticsAfterRequest(const std::unique_ptr<Request>& r statistics__->SendIfNeeded(); } -Error Connection::ReadAuthorizationHeaderIfNeeded() const { - if (auth_header_was_read_) return nullptr; - - Error err; - GenericRequestHeader generic_request_header; - io__->Receive(socket_fd_, &generic_request_header, sizeof(GenericRequestHeader), &err); - if (err) { - log__->Error("error receive authorization header from " + address_ + " - " + err->Explain()); - return err; - } - - if (generic_request_header.op_code != kOpcodeAuthorize) { - std::string msg= "wrong code in authorization header from " + address_; - log__->Error(msg); - return TextError(msg); - } - - beamtime_id_=std::string{generic_request_header.message}; - return nullptr; -} - -Error Connection::SendAuthorizationResponseIfNeeded(const Error& auth_err) const { - if (auth_header_was_read_) return nullptr; - - GenericNetworkResponse generic_response; - if (auth_err == nullptr) { - generic_response.error_code = kNetErrorNoError; - } else { - generic_response.error_code = kNetAuthorizationError; - strcpy(generic_response.message, auth_err->Explain().c_str()); - } - - Error send_err; - io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &send_err); - if (send_err) { - log__->Error("error sending authorization response to " + address_ + " - " + send_err->Explain()); - return send_err; - } - auth_header_was_read_ = true; - return nullptr; -} - -Error Connection::AuthorizeIfNeeded() const { - Error err = ReadAuthorizationHeaderIfNeeded(); - if (err == nullptr) { - err = authorizer__->Authorize(beamtime_id_,address_); - } - Error err_send = SendAuthorizationResponseIfNeeded(err); - - return err == nullptr ? std::move(err_send) : std::move(err); -} - void Connection::Listen() const noexcept { while (true) { - Error err = AuthorizeIfNeeded(); - if (err) { - break; - } - auto request = WaitForNewRequest(&err); - if (err) { - if (err != ErrorTemplates::kEndOfFile) { - log__->Error("error while waiting for request from " + address_ + " - " + err->Explain()); - } + Error err; + auto request = requests_dispatcher__->GetNextRequest(&err); + if (err){ break; } - if (!request) continue; //no error, but timeout - log__->Debug("processing request from " + address_); - err = ProcessRequest(request); + err = requests_dispatcher__->ProcessRequest(request); if (err) { break; } @@ -140,21 +47,7 @@ void Connection::Listen() const noexcept { log__->Info("disconnected from " + address_); } -std::unique_ptr<Request> Connection::WaitForNewRequest(Error* err) const noexcept { -//TODO: to be overwritten with MessagePack (or similar) - GenericRequestHeader generic_request_header; - statistics__-> StartTimer(StatisticEntity::kNetwork); - io__-> ReceiveWithTimeout(socket_fd_, &generic_request_header, - sizeof(GenericRequestHeader), 50, err); - if(*err) { - if(*err == IOErrorTemplates::kTimeout) { - *err = nullptr;//Not an error in this case - } - return nullptr; - } - statistics__-> StopTimer(); - return request_factory__->GenerateRequest(generic_request_header, socket_fd_, err); -} } + diff --git a/receiver/src/connection.h b/receiver/src/connection.h index 9a42bea10..beed99003 100644 --- a/receiver/src/connection.h +++ b/receiver/src/connection.h @@ -17,39 +17,28 @@ #include "statistics.h" #include "logger/logger.h" #include "connection_authorizer.h" +#include "requests_dispatcher.h" namespace asapo { class Connection { public: private: - uint32_t connection_id_; std::string address_; int socket_fd_; public: - static size_t kRequestHandlerMaxBufferSize; - static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter; Connection(SocketDescriptor socket_fd, const std::string& address, std::string receiver_tag); ~Connection() = default; void Listen() const noexcept; - uint64_t GetId() const noexcept; - std::unique_ptr<RequestFactory> request_factory__; std::unique_ptr<IO> io__; mutable std::unique_ptr<Statistics> statistics__; const AbstractLogger* log__; - std::unique_ptr<ConnectionAuthorizer> authorizer__; - private: - mutable bool auth_header_was_read_ = false; - Error ReadAuthorizationHeaderIfNeeded() const; - Error SendAuthorizationResponseIfNeeded(const Error& auth_err) const; - Error AuthorizeIfNeeded() const; - std::unique_ptr<Request> WaitForNewRequest(Error* err) const noexcept; - Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept; + std::unique_ptr<RequestsDispatcher> requests_dispatcher__; + private: void ProcessStatisticsAfterRequest(const std::unique_ptr<Request>& request) const noexcept; - mutable std::string beamtime_id_; }; } diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 1a836e9f6..85b230a07 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -29,23 +29,23 @@ Error Request::ReceiveData() { } -Error Request::Handle(std::unique_ptr<Statistics>* statistics) { +Error Request::Handle(Statistics* statistics) { Error err; if (request_header_.data_size != 0) { - (*statistics)->StartTimer(StatisticEntity::kNetwork); + statistics->StartTimer(StatisticEntity::kNetwork); auto err = ReceiveData(); if (err) { return err; } - (*statistics)->StopTimer(); + statistics->StopTimer(); } for (auto handler : handlers_) { - (*statistics)->StartTimer(handler->GetStatisticEntity()); + statistics->StartTimer(handler->GetStatisticEntity()); auto err = handler->ProcessRequest(*this); if (err) { return err; } - (*statistics)->StopTimer(); + statistics->StopTimer(); } return nullptr; } diff --git a/receiver/src/request.h b/receiver/src/request.h index 3b2f05698..3effd5fb2 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -9,22 +9,22 @@ #include "request_handler_db_write.h" #include "statistics.h" +#include "preprocessor/definitions.h" namespace asapo { using RequestHandlerList = std::vector<const RequestHandler*>; class Request { public: - virtual Error Handle(std::unique_ptr<Statistics>*); - virtual ~Request() = default; + VIRTUAL Error Handle(Statistics*); + ~Request() = default; Request(const GenericRequestHeader& request_header, SocketDescriptor socket_fd); - void AddHandler(const RequestHandler*); - const RequestHandlerList& GetListHandlers() const; - virtual uint64_t GetDataSize() const; - virtual uint64_t GetDataID() const; - virtual std::string GetFileName() const; - - virtual const FileData& GetData() const; + VIRTUAL void AddHandler(const RequestHandler*); + VIRTUAL const RequestHandlerList& GetListHandlers() const; + VIRTUAL uint64_t GetDataSize() const; + VIRTUAL uint64_t GetDataID() const; + VIRTUAL std::string GetFileName() const; + VIRTUAL const FileData& GetData() const; std::unique_ptr<IO> io__; private: Error AllocateDataBuffer(); diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp new file mode 100644 index 000000000..0cd2ab796 --- /dev/null +++ b/receiver/src/requests_dispatcher.cpp @@ -0,0 +1,160 @@ +#include "requests_dispatcher.h" +#include "request.h" +#include "io/io_factory.h" +#include "receiver_logger.h" + +namespace asapo { + +RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address, + Statistics* statistics) : statistics__{statistics}, + io__{GenerateDefaultIO()}, + log__{GetDefaultReceiverLogger()}, + request_factory__{new RequestFactory{}}, + authorizer__{new ConnectionAuthorizer}, + socket_fd_{socket_fd}, + producer_uri_{std::move(address)} { +} + +NetworkErrorCode GetNetworkCodeFromError(const Error& err) { + if (err) { + if (err == IOErrorTemplates::kFileAlreadyExists) { + return NetworkErrorCode::kNetErrorFileIdAlreadyInUse; + } else { + return NetworkErrorCode::kNetErrorInternalServerError; + } + } + return NetworkErrorCode::kNetErrorNoError; +} + + +Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { + Error err; + err = request->Handle(statistics__); + GenericNetworkResponse generic_response; + generic_response.error_code = GetNetworkCodeFromError(err); + if (err) { + log__->Error("error while processing request from " + producer_uri_ + " - " + err->Explain()); + } + io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &err); + if (err) { + log__->Error("error sending response to " + producer_uri_ + " - " + err->Explain()); + } + return err; +} + + +std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const noexcept { +//TODO: to be overwritten with MessagePack (or similar) +GenericRequestHeader generic_request_header; +statistics__->StartTimer(StatisticEntity::kNetwork); +io__->Receive(socket_fd_, &generic_request_header,sizeof(GenericRequestHeader), err); +if(*err) { +log__->Error("error getting next request from " + producer_uri_+" - "+(*err)->Explain()); +return nullptr; +} +statistics__->StopTimer(); +auto request = request_factory__->GenerateRequest(generic_request_header, socket_fd_, err); +if (*err) { +log__->Error("error processing request from " + producer_uri_+" - "+(*err)->Explain()); +} + +return request; +} + + + +/* + #include <cstring> +#include <assert.h> +#include "connection.h" +#include "receiver_error.h" +#include "io/io_factory.h" + +#include "receiver_logger.h" + +namespace asapo { + +size_t Connection::kRequestHandlerMaxBufferSize; +std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); + +Connection::Connection(SocketDescriptor socket_fd, const std::string& address, + std::string receiver_tag) : request_factory__{new RequestFactory}, + io__{GenerateDefaultIO()}, + statistics__{new Statistics}, + log__{GetDefaultReceiverLogger()}, + authorizer__{new ConnectionAuthorizer}, + requests_dispatcher__{new RequestsDispatcher}{ + socket_fd_ = socket_fd; + connection_id_ = kNetworkProducerPeerImplGlobalCounter++; + address_ = address; + statistics__->AddTag("connection_from", address); + statistics__->AddTag("receiver_tag", std::move(receiver_tag)); +} + +uint64_t Connection::GetId() const noexcept { + return connection_id_; +} + + +Error Connection::ReadAuthorizationHeaderIfNeeded() const { + if (auth_header_was_read_) return nullptr; + + Error err; + GenericRequestHeader generic_request_header; + io__->Receive(socket_fd_, &generic_request_header, sizeof(GenericRequestHeader), &err); + if (err) { + log__->Error("error receive authorization header from " + address_ + " - " + err->Explain()); + return err; + } + + if (generic_request_header.op_code != kOpcodeAuthorize) { + std::string msg= "wrong code in authorization header from " + address_; + log__->Error(msg); + return TextError(msg); + } + + beamtime_id_=std::string{generic_request_header.message}; + return nullptr; +} + +Error Connection::SendAuthorizationResponseIfNeeded(const Error& auth_err) const { + if (auth_header_was_read_) return nullptr; + + GenericNetworkResponse generic_response; + if (auth_err == nullptr) { + generic_response.error_code = kNetErrorNoError; + } else { + generic_response.error_code = kNetAuthorizationError; + strcpy(generic_response.message, auth_err->Explain().c_str()); + } + + Error send_err; + io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &send_err); + if (send_err) { + log__->Error("error sending authorization response to " + address_ + " - " + send_err->Explain()); + return send_err; + } + auth_header_was_read_ = true; + return nullptr; +} + +Error Connection::AuthorizeIfNeeded() const { + Error err = ReadAuthorizationHeaderIfNeeded(); + if (err == nullptr) { + err = authorizer__->Authorize(beamtime_id_,address_); + } + Error err_send = SendAuthorizationResponseIfNeeded(err); + + return err == nullptr ? std::move(err_send) : std::move(err); +} + + + +} + + + + + */ + +} diff --git a/receiver/src/requests_dispatcher.h b/receiver/src/requests_dispatcher.h new file mode 100644 index 000000000..9242ef8d4 --- /dev/null +++ b/receiver/src/requests_dispatcher.h @@ -0,0 +1,44 @@ +#ifndef ASAPO_REQUESTS_DISPATCHER_H +#define ASAPO_REQUESTS_DISPATCHER_H + +#include "preprocessor/definitions.h" +#include "common/error.h" +#include "request.h" +#include "io/io.h" +#include "statistics.h" +#include "logger/logger.h" +#include "connection_authorizer.h" + +namespace asapo { + +class RequestsDispatcher { + public: + RequestsDispatcher(SocketDescriptor socket_fd, std::string address, Statistics* statistics); + VIRTUAL Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept; + VIRTUAL std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept; + Statistics* statistics__; + std::unique_ptr<IO> io__; + const AbstractLogger* log__; + std::unique_ptr<RequestFactory> request_factory__; + std::unique_ptr<ConnectionAuthorizer>authorizer__; + private: + SocketDescriptor socket_fd_; + std::string producer_uri_; +}; + +} + +#endif //ASAPO_REQUESTS_DISPATCHER_H + + +/* + mutable bool auth_header_was_read_ = false; + Error ReadAuthorizationHeaderIfNeeded() const; + Error SendAuthorizationResponseIfNeeded(const Error& auth_err) const; + Error AuthorizeIfNeeded() const; + std::unique_ptr<Request> WaitForNewRequest(Error* err) const noexcept; + Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept; + void ProcessStatisticsAfterRequest(const std::unique_ptr<Request>& request) const noexcept; + mutable std::string beamtime_id_; + + */ \ No newline at end of file diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index e72948622..a4c336d16 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -11,6 +11,8 @@ #include "../src/connection_authorizer.h" #include "../src/receiver_config.h" #include "../src/receiver_config_factory.h" +#include "../src/requests_dispatcher.h" + #include "mock_receiver_config.h" @@ -58,46 +60,28 @@ namespace { TEST(Connection, Constructor) { Connection connection{0, "some_address", "some_tag"}; ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(connection.request_factory__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(connection.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::RequestsDispatcher*>(connection.requests_dispatcher__.get()), Ne(nullptr)); } -class MockRequestHandler: public Request { - public: - MockRequestHandler(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): - Request(request_header, socket_fd) {}; - Error Handle(std::unique_ptr<Statistics>* statistics) override { - return Error{Handle_t()}; - }; - MOCK_CONST_METHOD0(Handle_t, ErrorInterface * ()); -}; - - -class MockRequestFactory: public asapo::RequestFactory { - public: - std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, - SocketDescriptor socket_fd, - Error* err) const noexcept override { - ErrorInterface* error = nullptr; - auto res = GenerateRequest_t(request_header, socket_fd, &error); - err->reset(error); - return std::unique_ptr<Request> {res}; - } - - MOCK_CONST_METHOD3(GenerateRequest_t, Request * (const GenericRequestHeader&, - SocketDescriptor socket_fd, - ErrorInterface**)); - -}; -class MockAuthorizer: public asapo::ConnectionAuthorizer { +class MockDispatcher: public asapo::RequestsDispatcher { public: - Error Authorize(std::string beamtime_id,std::string uri) const noexcept override { - return Error{Authorize_t(beamtime_id,uri)}; + MockDispatcher():asapo::RequestsDispatcher(0,"",nullptr){}; + Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept override { + return Error{ProcessRequest_t(request.get())}; } - MOCK_CONST_METHOD2(Authorize_t, ErrorInterface * (std::string beamtime_id,std::string uri)); + + std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept override { + ErrorInterface* error = nullptr; + auto req = GetNextRequest_t(&error); + err->reset(error); + return std::unique_ptr<Request>{req}; + }; + + MOCK_CONST_METHOD1(ProcessRequest_t, ErrorInterface * (Request*)); + MOCK_CONST_METHOD1(GetNextRequest_t, Request * (asapo::ErrorInterface**)); }; @@ -106,357 +90,96 @@ class ConnectionTests : public Test { public: std::string connected_uri{"some_address"}; NiceMock<MockIO> mock_io; - MockRequestFactory mock_factory; + MockDispatcher mock_dispatcher; NiceMock<MockStatistics> mock_statictics; NiceMock<asapo::MockLogger> mock_logger; std::unique_ptr<Connection> connection; - NiceMock<MockAuthorizer> mock_authorizer; - Sequence seq_send; - void MockAuthorize(); - - asapo::ReceiverConfig test_config; void SetUp() override { - test_config.authorization_interval = 0; - SetReceiverConfig(test_config); connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, "some_tag"}}; connection->io__ = std::unique_ptr<asapo::IO> {&mock_io}; connection->statistics__ = std::unique_ptr<asapo::Statistics> {&mock_statictics}; - connection->request_factory__ = std::unique_ptr<asapo::RequestFactory> {&mock_factory}; connection->log__ = &mock_logger; - connection->authorizer__ = std::unique_ptr<asapo::ConnectionAuthorizer> {&mock_authorizer}; - ON_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)). - WillByDefault(DoAll(testing::SetArgPointee<4>(nullptr), - testing::Return(0))); - EXPECT_CALL(mock_io, CloseSocket_t(_, _)); + connection->requests_dispatcher__ = std::unique_ptr<asapo::RequestsDispatcher> {&mock_dispatcher}; + EXPECT_CALL(mock_io, CloseSocket_t(_,_)); EXPECT_CALL(mock_statictics, Send_t()); - ON_CALL(mock_authorizer, Authorize_t(_,_)).WillByDefault(Return(nullptr)); + EXPECT_CALL(mock_logger, Info(HasSubstr("disconnected"))); } void TearDown() override { connection->io__.release(); - connection->request_factory__.release(); - connection->authorizer__.release(); connection->statistics__.release(); + connection->requests_dispatcher__.release(); } -}; - -ACTION_P(A_WriteAuth, op_code) { - ((asapo::GenericRequestHeader*)arg1)->op_code = op_code; - strcpy(((asapo::GenericRequestHeader*)arg1)->message, "test"); -} - - -void ConnectionTests::MockAuthorize() { - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).WillOnce( - DoAll(SetArgPointee<3>(nullptr), - A_WriteAuth(asapo::kOpcodeAuthorize), - Return(0) - )); - - - EXPECT_CALL(mock_authorizer, Authorize_t(_,_)).Times(testing::AtLeast(1)).WillRepeatedly(Return(nullptr)); - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)) - .InSequence(seq_send) - .WillOnce( - DoAll(SetArgPointee<3>(nullptr), - Return(0) - )); -} - - - -TEST_F(ConnectionTests, ErrorWaitForNewRequest) { - - MockAuthorize(); - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)).Times(2). - WillOnce( - DoAll(SetArgPointee<4>(new asapo::IOError("", asapo::IOErrorType::kTimeout)), - Return(0))) - .WillOnce( - DoAll(SetArgPointee<4>(new asapo::IOError("", asapo::IOErrorType::kUnknownIOError)), - Return(0)) - ); - - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("waiting for request"), HasSubstr(connected_uri)))); - - - connection->Listen(); -} - -ACTION_P(SaveArg1ToGenericNetworkResponse, value) { - auto resp = *static_cast<const GenericNetworkResponse*>(arg1); - value->error_code = resp.error_code; - strcpy(value->message, resp.message); -} - - -TEST_F(ConnectionTests, ErrorOnReadAuthorizationHeader) { - - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).WillOnce( - DoAll(SetArgPointee<3>(new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), - Return(0) - )); - - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("receive authorization"), HasSubstr(connected_uri)))); - - - GenericNetworkResponse response; - EXPECT_CALL(mock_io, Send_t(_, _, sizeof(GenericNetworkResponse), _)) - .WillOnce( - DoAll(SetArgPointee<3>(nullptr), - SaveArg1ToGenericNetworkResponse(&response), - Return(0) - )); - - - connection->Listen(); - - ASSERT_THAT(response.error_code, Eq(asapo::NetworkErrorCode::kNetAuthorizationError)); - ASSERT_THAT(response.message, HasSubstr("Test Read Error")); -} - - - -TEST_F(ConnectionTests, ErrorOnSendingAuthorizationResponse) { - - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).WillOnce( - DoAll(SetArgPointee<3>(nullptr), - A_WriteAuth(asapo::kOpcodeAuthorize), - Return(0) - )); - - EXPECT_CALL(mock_io, Send_t(_, _, sizeof(GenericNetworkResponse), _)).WillOnce( - DoAll(SetArgPointee<3>(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)), - Return(0) - )); - - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("authorization"), HasSubstr("response"), HasSubstr(connected_uri)))); - - connection->Listen(); - -} - -TEST_F(ConnectionTests, AuthorizerReturnsOk) { - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteAuth(asapo::kOpcodeAuthorize), - testing::ReturnArg<2>() - )); - - EXPECT_CALL(mock_authorizer, Authorize_t(StrEq("test"),connected_uri)). - WillOnce( - Return(nullptr)); - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)).WillOnce( - DoAll(SetArgPointee<3>(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)), - Return(0) - )); - - connection->Listen(); - -} - -TEST_F(ConnectionTests, ErrorOnWrongAuthHeaderCode) { - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteAuth(asapo::kOpcodeTransferData), - Return(0) - )); - - EXPECT_CALL(mock_authorizer, Authorize_t(StrEq("test"),connected_uri)).Times(0); - - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("wrong"),HasSubstr("code"), HasSubstr(connected_uri)))); - - connection->Listen(); -} - -TEST_F(ConnectionTests, AuthorizerReturnsError) { - EXPECT_CALL(mock_io, Receive_t(_, _, _, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteAuth(asapo::kOpcodeAuthorize), - testing::ReturnArg<2>() - )); - - EXPECT_CALL(mock_authorizer, Authorize_t(StrEq("test"),connected_uri)). - WillOnce( - Return(new asapo::SimpleError{"auth error"})); - - connection->Listen(); -} - - -TEST_F(ConnectionTests, CallsHandleRequest) { - - GenericRequestHeader header; - auto request = new MockRequestHandler{header, 1}; - MockAuthorize(); - - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); - - EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)).WillOnce( - Return(request) - ); - - EXPECT_CALL(*request, Handle_t()).WillOnce( - Return(new asapo::SimpleError{""}) - ); - - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); - - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)) - .InSequence(seq_send) - .WillOnce( - DoAll(SetArgPointee<3>(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)), - Return(0) - )); - - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("sending response"), HasSubstr(connected_uri)))); - EXPECT_CALL(mock_logger, Info(AllOf(HasSubstr("disconnected"), HasSubstr(connected_uri)))); - - connection->Listen(); -} - -TEST_F(ConnectionTests, SendsErrorToProducer) { - - GenericRequestHeader header; - auto request = new MockRequestHandler{header, 1}; - - MockAuthorize(); - - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); + Request* MockGetNext(bool error){ + if (error ){ + EXPECT_CALL(mock_dispatcher, GetNextRequest_t(_)) + .WillOnce(DoAll( + SetArgPointee<0>(new asapo::SimpleError{"error"}), + Return(nullptr) + )); + return nullptr; + } else { + auto request = new Request(GenericRequestHeader{asapo::kOpcodeUnknownOp,0,1,""},0); + EXPECT_CALL(mock_dispatcher, GetNextRequest_t(_)) + .WillOnce(DoAll( + SetArgPointee<0>(nullptr), + Return(request) + )); + return request; + } + } - EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)).WillOnce( - Return(request) - ); + void MockProcessRequest(Request* request,bool error) { + if (error ){ + EXPECT_CALL(mock_dispatcher, ProcessRequest_t(request)) + .WillOnce( + Return(new asapo::SimpleError{"error"}) + ); + } else { + EXPECT_CALL(mock_dispatcher, ProcessRequest_t(request)) + .WillOnce( + Return(nullptr) + ); + } + } - EXPECT_CALL(*request, Handle_t()).WillOnce( - Return(new asapo::SimpleError{""}) - ); +}; - GenericNetworkResponse response; - EXPECT_CALL(mock_io, Send_t(_, _, sizeof(GenericNetworkResponse), _)) - .InSequence(seq_send) - .WillOnce( - DoAll(SetArgPointee<3>(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)), - SaveArg1ToGenericNetworkResponse(&response), - Return(0) - )); +TEST_F(ConnectionTests, ExitOnErrorsWithGetNextRequest) { + MockGetNext(true); connection->Listen(); - - ASSERT_THAT(response.error_code, Eq(asapo::NetworkErrorCode::kNetErrorInternalServerError)); - -} - -void MockExitCycle(const MockIO& mock_io) { - - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)) - .Times(testing::AtLeast(1)).WillRepeatedly( - DoAll(SetArgPointee<4>(new asapo::IOError("", asapo::IOErrorType::kUnknownIOError)), - Return(0)) - ); -} - - -void MockExitCycle(const MockIO& mock_io, MockStatistics& mock_statictics) { - EXPECT_CALL(mock_statictics, StartTimer_t(StatisticEntity::kNetwork)); - - MockExitCycle(mock_io); } -MockRequestHandler* MockWaitRequest(const MockRequestFactory& mock_factory) { - GenericRequestHeader header; - header.data_size = 1; - auto request = new MockRequestHandler{header, 1}; - EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)).WillOnce( - Return(request) - ); - return request; -} -TEST_F(ConnectionTests, FillsStatistics) { +TEST_F(ConnectionTests, ProcessStatisticsWhenOKProcessRequest) { InSequence sequence; + auto request = MockGetNext(false); - MockAuthorize(); - - EXPECT_CALL(mock_statictics, StartTimer_t(StatisticEntity::kNetwork)); - - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); - - EXPECT_CALL(mock_statictics, StopTimer_t()); - - auto request = MockWaitRequest(mock_factory); - - EXPECT_CALL(*request, Handle_t()).WillOnce( - Return(nullptr) - ); - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)) - .InSequence(seq_send) - .WillOnce( - DoAll(SetArgPointee<3>(nullptr), - Return(0) - )); + MockProcessRequest(request,false); EXPECT_CALL(mock_statictics, IncreaseRequestCounter_t()); - - EXPECT_CALL(mock_statictics, IncreaseRequestDataVolume_t(1 + sizeof(asapo::GenericRequestHeader) + - sizeof(asapo::GenericNetworkResponse))); - - + EXPECT_CALL(mock_statictics, IncreaseRequestDataVolume_t(1+ sizeof(asapo::GenericRequestHeader) + + sizeof(asapo::GenericNetworkResponse))); EXPECT_CALL(mock_statictics, SendIfNeeded_t()); - EXPECT_CALL(mock_authorizer, Authorize_t(_,_)).WillOnce(Return(nullptr)); - MockExitCycle(mock_io, mock_statictics); + MockGetNext(true); connection->Listen(); - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } -TEST_F(ConnectionTests, AuthorizedHeaderExtractedOnlyOnce) { - InSequence sequence; - - MockAuthorize(); - - EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); - auto request = MockWaitRequest(mock_factory); +TEST_F(ConnectionTests, ExitOnErrorsWithProcessRequest) { + auto request = MockGetNext(false); - EXPECT_CALL(*request, Handle_t()).WillOnce( - Return(nullptr) - ); - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)) - .WillOnce( - DoAll(SetArgPointee<3>(nullptr), - Return(0) - )); - - EXPECT_CALL(mock_authorizer, Authorize_t(_,_)).WillOnce(Return(nullptr)); - - EXPECT_CALL(mock_io, Send_t(_, _, _, _)).Times(0); - - MockExitCycle(mock_io); + MockProcessRequest(request,true); connection->Listen(); - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + } diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index f238e9891..6f75bdc28 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -70,9 +70,9 @@ class RequestTests : public Test { std::unique_ptr<Request> request; NiceMock<MockIO> mock_io; NiceMock<MockStatistics> mock_statistics; - std::unique_ptr<asapo::Statistics> stat; + asapo::Statistics* stat; void SetUp() override { - stat = std::unique_ptr<asapo::Statistics> {&mock_statistics}; + stat = &mock_statistics; generic_request_header.data_size = data_size_; generic_request_header.data_id = data_id_; request.reset(new Request{generic_request_header, socket_fd_}); @@ -84,7 +84,6 @@ class RequestTests : public Test { } void TearDown() override { request->io__.release(); - stat.release(); } }; @@ -97,7 +96,7 @@ TEST_F(RequestTests, HandleDoesNotReceiveEmptyData) { EXPECT_CALL(mock_io, Receive_t(_, _, _, _)).Times(0); - auto err = request->Handle(&stat); + auto err = request->Handle(stat); ASSERT_THAT(err, Eq(nullptr)); } @@ -108,7 +107,7 @@ TEST_F(RequestTests, HandleReturnsErrorOnDataReceive) { Return(0) )); - auto err = request->Handle(&stat); + auto err = request->Handle(stat); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kReadError)); } @@ -125,7 +124,7 @@ TEST_F(RequestTests, HandleMeasuresTimeOnDataReceive) { EXPECT_CALL(mock_statistics, StopTimer_t()); - request->Handle(&stat); + request->Handle(stat); } @@ -151,7 +150,7 @@ TEST_F(RequestTests, HandleProcessesRequests) { EXPECT_CALL(mock_statistics, StopTimer_t()).Times(2); - auto err = request->Handle(&stat); + auto err = request->Handle(stat); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); } @@ -164,7 +163,7 @@ TEST_F(RequestTests, DataIsNullAtInit) { TEST_F(RequestTests, GetDataIsNotNullptr) { - request->Handle(&stat); + request->Handle(stat); auto& data = request->GetData(); diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp new file mode 100644 index 000000000..eb54f3581 --- /dev/null +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -0,0 +1,248 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockLogger.h" +#include "../src/receiver_error.h" +#include "../src/request.h" +#include "../src/statistics.h" +#include "mock_statistics.h" +#include "../src/connection_authorizer.h" +#include "../src/receiver_config.h" +#include "../src/receiver_config_factory.h" +#include "mock_receiver_config.h" + +#include "../src/requests_dispatcher.h" + + +using ::testing::Test; +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::SaveArg; +using ::testing::SaveArgPointee; +using ::testing::InSequence; +using ::testing::HasSubstr; +using ::testing::StrEq; +using ::testing::SetArgPointee; +using ::testing::AllOf; +using testing::Sequence; + +using asapo::Error; +using asapo::ErrorInterface; +using asapo::FileDescriptor; +using asapo::SocketDescriptor; +using asapo::GenericRequestHeader; +using asapo::SendDataResponse; +using asapo::GenericRequestHeader; +using asapo::GenericNetworkResponse; +using asapo::Opcode; +using asapo::MockIO; +using asapo::MockLogger; +using asapo::Request; +using asapo::Statistics; +using asapo::StatisticEntity; +using asapo::MockStatistics; + +using asapo::ReceiverConfig; +using asapo::SetReceiverConfig; + +using asapo::SetReceiverConfig; +using asapo::RequestsDispatcher; + + +namespace { + +TEST(RequestDispatcher, Constructor) { + asapo::Statistics* stat; + RequestsDispatcher dispatcher{0, "some_address",stat}; + ASSERT_THAT(dynamic_cast<const asapo::Statistics*>(dispatcher.statistics__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::IO*>(dispatcher.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(dispatcher.request_factory__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(dispatcher.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::ConnectionAuthorizer*>(dispatcher.authorizer__.get()), Ne(nullptr)); +} + +class MockRequest: public Request { + public: + MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): + Request(request_header, socket_fd) {}; + Error Handle(Statistics* statistics) override { + return Error{Handle_t()}; + }; + MOCK_CONST_METHOD0(Handle_t, ErrorInterface * ()); +}; + + +class MockRequestFactory: public asapo::RequestFactory { + public: + std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, + SocketDescriptor socket_fd, + Error* err) const noexcept override { + ErrorInterface* error = nullptr; + auto res = GenerateRequest_t(request_header, socket_fd, &error); + err->reset(error); + return std::unique_ptr<Request> {res}; + } + + MOCK_CONST_METHOD3(GenerateRequest_t, Request * (const GenericRequestHeader&, + SocketDescriptor socket_fd, + ErrorInterface**)); + +}; + +class MockAuthorizer: public asapo::ConnectionAuthorizer { + public: + Error Authorize(std::string beamtime_id,std::string uri) const noexcept override { + return Error{Authorize_t(beamtime_id,uri)}; + } + MOCK_CONST_METHOD2(Authorize_t, ErrorInterface * (std::string beamtime_id,std::string uri)); + +}; + + +class RequestsDispatcherTests : public Test { + public: + std::unique_ptr<RequestsDispatcher> dispatcher; + std::string connected_uri{"some_address"}; + NiceMock<MockIO> mock_io; + MockRequestFactory mock_factory; + NiceMock<MockStatistics> mock_statictics; + NiceMock<asapo::MockLogger> mock_logger; + NiceMock<MockAuthorizer> mock_authorizer; + Sequence seq_send; + void MockAuthorize(); + + asapo::ReceiverConfig test_config; + GenericRequestHeader header; + std::string expected_beamtime_id="beamtime_id"; + MockRequest mock_authorize_request{GenericRequestHeader{asapo::kOpcodeAuthorize,0,0,expected_beamtime_id},1}; + + void SetUp() override { + test_config.authorization_interval = 0; + SetReceiverConfig(test_config); + dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statictics}}; + dispatcher->io__ = std::unique_ptr<asapo::IO> {&mock_io}; + dispatcher->statistics__ = &mock_statictics; + dispatcher->request_factory__ = std::unique_ptr<asapo::RequestFactory> {&mock_factory}; + dispatcher->log__ = &mock_logger; + dispatcher->authorizer__ = std::unique_ptr<asapo::ConnectionAuthorizer> {&mock_authorizer}; + + } + void TearDown() override { + dispatcher->io__.release(); + dispatcher->request_factory__.release(); + dispatcher->authorizer__.release(); + } + void MockReceiveRequest(bool error ){ + EXPECT_CALL(mock_io, Receive_t(_, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<3>(error?asapo::IOErrorTemplates::kUnknownIOError.Generate().release():nullptr), + Return(0)) + ); + + } + void MockCreateRequest(bool error ){ + EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)) + .WillOnce( + DoAll(SetArgPointee<2>(error?asapo::ReceiverErrorTemplates::kInvalidOpCode.Generate().release():nullptr), + Return(nullptr)) + ); + + } + +}; + +TEST_F(RequestsDispatcherTests, ErrorReceivetNextRequest) { + EXPECT_CALL(mock_statictics, StartTimer_t(StatisticEntity::kNetwork)); + MockReceiveRequest(true); + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting next request"), HasSubstr(connected_uri)))); + + Error err; + dispatcher->GetNextRequest(&err); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + +TEST_F(RequestsDispatcherTests, ErrorCreatetNextRequest) { + MockReceiveRequest(false); + MockCreateRequest(true); + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri)))); + + Error err; + dispatcher->GetNextRequest(&err); + + ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kInvalidOpCode)); +} + +TEST_F(RequestsDispatcherTests, OkCreatetNextRequest) { + MockReceiveRequest(false); + MockCreateRequest(false); + + Error err; + dispatcher->GetNextRequest(&err); + + ASSERT_THAT(err, Eq(nullptr)); +} + + +/* + +ACTION_P(A_WriteAuth, op_code) { + ((asapo::GenericRequestHeader*)arg1)->op_code = op_code; + strcpy(((asapo::GenericRequestHeader*)arg1)->message, "test"); +} + + +ACTION_P(SaveArg1ToGenericNetworkResponse, value) { + auto resp = *static_cast<const GenericNetworkResponse*>(arg1); + value->error_code = resp.error_code; + strcpy(value->message, resp.message); +} + + + +TEST_F(RequestsDispatcherTests, CallsHandleRequest) { + + GenericRequestHeader header; + auto request = new MockRequestHandler{header, 1}; + MockAuthorize(); + + EXPECT_CALL(mock_io, ReceiveWithTimeout_t(_, _, _, _, _)); + + EXPECT_CALL(mock_factory, GenerateRequest_t(_, _, _)).WillOnce( + Return(request) + ); + + EXPECT_CALL(*request, Handle_t()).WillOnce( + Return(new asapo::SimpleError{""}) + ); + + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); + + + EXPECT_CALL(mock_io, Send_t(_, _, _, _)) + .InSequence(seq_send) + .WillOnce( + DoAll(SetArgPointee<3>(new asapo::IOError("Test Send Error", asapo::IOErrorType::kUnknownIOError)), + Return(0) + )); + + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("sending response"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Info(AllOf(HasSubstr("disconnected"), HasSubstr(connected_uri)))); + + dispatcher->Listen(); +} + + + +*/ + +} -- GitLab