diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h index 09009e49c59f33a42285d84bcd6be6007561bd34..bfce369d63d4535cdb276c49be20286c7e217cc7 100644 --- a/common/cpp/include/asapo/common/networking.h +++ b/common/cpp/include/asapo/common/networking.h @@ -69,6 +69,30 @@ enum NetworkErrorCode : uint16_t { kNetErrorInternalServerError = 65535, }; +inline std::string NetworkErrorCodeToString(uint16_t code) { + switch (code) { + case kNetErrorNoError: + return "success"; + case kNetErrorReauthorize: + return "reauthorize"; + case kNetErrorWarning: + return "warning"; + case kNetErrorWrongRequest: + return "wrong request"; + case kNetErrorNotSupported: + return "not suported"; + case kNetErrorNoData: + return "no data"; + case kNetAuthorizationError: + return "authorization error"; + case kNetErrorInternalServerError: + return "internal server error"; + default: + return "unknown code"; + } +} + + //TODO need to use an serialization framework to ensure struct consistency on different computers const std::size_t kMaxMessageSize = 1024; diff --git a/config/nomad/receiver_tcp.nmd.in b/config/nomad/receiver_tcp.nmd.in index 07bc0a4798abf454cde610ddbe352402b2b641b3..b8a5bb56238ddfdd25b07eb1e44e2c1e8bebd488 100644 --- a/config/nomad/receiver_tcp.nmd.in +++ b/config/nomad/receiver_tcp.nmd.in @@ -27,14 +27,6 @@ job "receiver" { service { name = "asapo-receiver" port = "recv" - check { - name = "alive" - type = "tcp" - port = "recv" - interval = "10s" - timeout = "2s" - initial_status = "passing" - } check { name = "metrics" type = "http" diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index 2770d0d89ef5bed286f27f3446164f47f19b5331..bd1a6d9c6395c9ba67a6c53dd419b97d057ef73d 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -44,7 +44,7 @@ void Connection::Listen() const noexcept { } io__->CloseSocket(socket_fd_, nullptr); statistics__->SendIfNeeded(true); - log__->Info(LogMessageWithFields("disconnected from ").Append("origin", HostFromUri(address_))); + log__->Info(LogMessageWithFields("disconnected from producer").Append("origin", HostFromUri(address_))); } diff --git a/receiver/src/metrics/receiver_mongoose_server.cpp b/receiver/src/metrics/receiver_mongoose_server.cpp index cce3b191873ca15b54cff465971a43fce469cb88..d705bea0c795f8819b8bdb37f3fd7ac240ce8b51 100644 --- a/receiver/src/metrics/receiver_mongoose_server.cpp +++ b/receiver/src/metrics/receiver_mongoose_server.cpp @@ -32,7 +32,7 @@ static void fn(struct mg_connection* c, int ev, void* ev_data, void* fn_data) { void asapo::ReceiverMongooseServer::ListenAndServe(std::string port, std::unique_ptr<ReceiverMetricsProvider> provider) { struct mg_mgr mgr; // Event manager - mg_log_set(0); + mg_log_set("0"); mg_mgr_init(&mgr); // Initialise event manager auto uri = "0.0.0.0:" + port; if (mg_http_listen(&mgr, uri.c_str(), fn, (void*) provider.get()) == NULL) { diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 203eb13ac90ffdd6901071bfc847059fdedfec90..4e6f2352e78867c3456917c10ef92da5b702976c 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -54,7 +54,7 @@ void Receiver::ProcessConnections(Error* err) { } void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { - log__->Info(LogMessageWithFields("new connection").Append("origin", HostFromUri(address))); + log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address))); auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd), [connection_socket_fd, address, this] { auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag)); diff --git a/receiver/src/receiver_logger.h b/receiver/src/receiver_logger.h index 9df4e29433a25720da29eebc1d4f4eb531dabca4..49385f52b9597e929f85d1107e0a30177e415314 100644 --- a/receiver/src/receiver_logger.h +++ b/receiver/src/receiver_logger.h @@ -2,13 +2,12 @@ #define ASAPO_RECEIVER_LOGGER_H #include "asapo/logger/logger.h" +#include "request.h" namespace asapo { -class Request; - AbstractLogger* GetDefaultReceiverLogger(); -LogMessageWithFields RequestLog(std::string message, const Request* request, std::string origin); +LogMessageWithFields RequestLog(std::string message, const Request* request); } diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index ad009aba5addd6dbc66bec875df4900554c3905f..d9bb0642c901743c834e2d4eca57c4c08dabb56e 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -8,10 +8,14 @@ namespace asapo { RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedCache cache) : statistics__{statistics}, io__{GenerateDefaultIO()}, - log__{GetDefaultReceiverLogger()}, - request_factory__{new RequestFactory{cache}}, - socket_fd_{socket_fd}, -producer_uri_{std::move(address)} { + log__{ + GetDefaultReceiverLogger()}, +request_factory__{ + new RequestFactory{ + cache}}, +socket_fd_{socket_fd}, +producer_uri_{ + std::move(address)} { } NetworkErrorCode GetNetworkCodeFromError(const Error& err) { @@ -50,55 +54,56 @@ GenericNetworkResponse RequestsDispatcher::CreateResponseToRequest(const std::un } Error RequestsDispatcher::HandleRequest(const std::unique_ptr<Request>& request) const { - log__->Debug("processing request id " + std::to_string(request->GetDataID()) + ", opcode " + - std::to_string(request->GetOpCode()) + " from " + producer_uri_ ); + log__->Debug(RequestLog("got new request", request.get())); Error handle_err; handle_err = request->Handle(statistics__); if (handle_err) { if (handle_err == ReceiverErrorTemplates::kReAuthorizationFailure) { - log__->Warning("warning processing request from " + producer_uri_ + " - " + handle_err->Explain()); + log__->Warning(RequestLog("warning processing request: " + handle_err->Explain(), request.get())); } else { - log__->Error("error processing request from " + producer_uri_ + " - " + handle_err->Explain()); + log__->Error(RequestLog("error processing request: " + handle_err->Explain(), request.get())); } } return handle_err; } Error RequestsDispatcher::SendResponse(const std::unique_ptr<Request>& request, const Error& handle_error) const { - log__->Debug("sending response to " + producer_uri_ ); Error io_err; GenericNetworkResponse generic_response = CreateResponseToRequest(request, handle_error); + auto log = RequestLog("sending response", request.get()). + Append("response", NetworkErrorCodeToString(generic_response.error_code)); + log__->Debug(log); io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &io_err); if (io_err) { - log__->Error("error sending response to " + producer_uri_ + " - " + io_err->Explain()); + log__->Error(RequestLog("error sending response: " + io_err->Explain(), request.get())); } return io_err; } Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { - auto handle_err = HandleRequest(request); - auto io_err = SendResponse(request, handle_err); + auto handle_err = HandleRequest(request); + auto io_err = SendResponse(request, handle_err); return handle_err == nullptr ? std::move(io_err) : std::move(handle_err); } std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const noexcept { //TODO: to be overwritten with MessagePack (or similar) GenericRequestHeader generic_request_header; - statistics__-> StartTimer(StatisticEntity::kNetwork); - io__-> Receive(socket_fd_, &generic_request_header, - sizeof(GenericRequestHeader), err); - if(*err) { - if (*err == ErrorTemplates::kEndOfFile) { - log__->Debug("error getting next request from " + producer_uri_ + " - " + "peer has performed an orderly shutdown"); - } else { - log__->Error("error getting next request from " + producer_uri_ + " - " + (*err)->Explain()); + statistics__->StartTimer(StatisticEntity::kNetwork); + io__->Receive(socket_fd_, &generic_request_header, + sizeof(GenericRequestHeader), err); + if (*err) { + if (*err != ErrorTemplates::kEndOfFile) { + log__->Error(LogMessageWithFields("error getting next request: " + (*err)->Explain()). + Append("origin", HostFromUri(producer_uri_))); } return nullptr; } - statistics__-> StopTimer(); + statistics__->StopTimer(); auto request = request_factory__->GenerateRequest(generic_request_header, socket_fd_, producer_uri_, err); if (*err) { - log__->Error("error processing request from " + producer_uri_ + " - " + (*err)->Explain()); + log__->Error(LogMessageWithFields("error processing request: " + (*err)->Explain()). + Append("origin", HostFromUri(producer_uri_))); } return request; } diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index aad695db0406141df911288ce466a8400b66722b..978a662f38d083609ff513ca83f07b283e0a2389 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -64,8 +64,8 @@ TEST(RequestDispatcher, Constructor) { class MockRequest: public Request { public: - MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd): - Request(request_header, socket_fd, "", nullptr, nullptr) {}; + MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string uri = ""): + Request(request_header, socket_fd, uri, nullptr, nullptr) {}; Error Handle(ReceiverStatistics*) override { return Error{Handle_t()}; }; @@ -109,7 +109,7 @@ class RequestsDispatcherTests : public Test { asapo::ReceiverConfig test_config; GenericRequestHeader header; - MockRequest mock_request{GenericRequestHeader{}, 1}; + MockRequest mock_request{GenericRequestHeader{}, 1, connected_uri}; std::unique_ptr<Request> request{&mock_request}; GenericNetworkResponse response; void SetUp() override { @@ -145,25 +145,25 @@ class RequestsDispatcherTests : public Test { Return(nullptr)) ); if (error) { - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request"), HasSubstr(connected_uri)))); } } void MockHandleRequest(int error_mode, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) { - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("got new request"), HasSubstr(connected_uri)))); EXPECT_CALL(mock_request, Handle_t()).WillOnce( Return(error_mode > 0 ? err.release() : nullptr) ); if (error_mode == 1) { - EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request"), HasSubstr(connected_uri)))); } else if (error_mode == 2) { - EXPECT_CALL(mock_logger, Warning(AllOf(HasSubstr("warning processing request from"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Warning(AllOf(HasSubstr("warning processing request"), HasSubstr(connected_uri)))); } } void MockSendResponse(GenericNetworkResponse* response, bool error ) { - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("sending response to"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("sending response"), HasSubstr(connected_uri)))); ; EXPECT_CALL(mock_io, Send_t(_, _, _, _)).WillOnce( DoAll(SetArgPointee<3>(error ? asapo::IOErrorTemplates::kConnectionRefused.Generate().release() : nullptr), @@ -197,9 +197,6 @@ TEST_F(RequestsDispatcherTests, ClosedConnectionOnReceivetNextRequest) { DoAll(SetArgPointee<3>(asapo::ErrorTemplates::kEndOfFile.Generate().release()), Return(0)) ); - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("peer has performed an orderly shutdown"), - HasSubstr("getting next request"), HasSubstr(connected_uri)))); - Error err; dispatcher->GetNextRequest(&err);