Skip to content
Snippets Groups Projects
Commit 1621aa77 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update logs for dispatcher

parent b97353d6
No related branches found
No related tags found
No related merge requests found
......@@ -69,6 +69,30 @@ enum NetworkErrorCode : uint16_t {
kNetErrorInternalServerError = 65535,
};
inline std::string NetworkErrorCodeToString(uint16_t code) {
switch (code) {
case kNetErrorNoError:
return "success";
case kNetErrorReauthorize:
return "reauthorize";
case kNetErrorWarning:
return "warning";
case kNetErrorWrongRequest:
return "wrong request";
case kNetErrorNotSupported:
return "not suported";
case kNetErrorNoData:
return "no data";
case kNetAuthorizationError:
return "authorization error";
case kNetErrorInternalServerError:
return "internal server error";
default:
return "unknown code";
}
}
//TODO need to use an serialization framework to ensure struct consistency on different computers
const std::size_t kMaxMessageSize = 1024;
......
......@@ -27,14 +27,6 @@ job "receiver" {
service {
name = "asapo-receiver"
port = "recv"
check {
name = "alive"
type = "tcp"
port = "recv"
interval = "10s"
timeout = "2s"
initial_status = "passing"
}
check {
name = "metrics"
type = "http"
......
......@@ -44,7 +44,7 @@ void Connection::Listen() const noexcept {
}
io__->CloseSocket(socket_fd_, nullptr);
statistics__->SendIfNeeded(true);
log__->Info(LogMessageWithFields("disconnected from ").Append("origin", HostFromUri(address_)));
log__->Info(LogMessageWithFields("disconnected from producer").Append("origin", HostFromUri(address_)));
}
......
......@@ -32,7 +32,7 @@ static void fn(struct mg_connection* c, int ev, void* ev_data, void* fn_data) {
void asapo::ReceiverMongooseServer::ListenAndServe(std::string port,
std::unique_ptr<ReceiverMetricsProvider> provider) {
struct mg_mgr mgr; // Event manager
mg_log_set(0);
mg_log_set("0");
mg_mgr_init(&mgr); // Initialise event manager
auto uri = "0.0.0.0:" + port;
if (mg_http_listen(&mgr, uri.c_str(), fn, (void*) provider.get()) == NULL) {
......
......@@ -54,7 +54,7 @@ void Receiver::ProcessConnections(Error* err) {
}
void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) {
log__->Info(LogMessageWithFields("new connection").Append("origin", HostFromUri(address)));
log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address)));
auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd),
[connection_socket_fd, address, this] {
auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag));
......
......@@ -2,13 +2,12 @@
#define ASAPO_RECEIVER_LOGGER_H
#include "asapo/logger/logger.h"
#include "request.h"
namespace asapo {
class Request;
AbstractLogger* GetDefaultReceiverLogger();
LogMessageWithFields RequestLog(std::string message, const Request* request, std::string origin);
LogMessageWithFields RequestLog(std::string message, const Request* request);
}
......
......@@ -8,10 +8,14 @@ namespace asapo {
RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address,
ReceiverStatistics* statistics, SharedCache cache) : statistics__{statistics},
io__{GenerateDefaultIO()},
log__{GetDefaultReceiverLogger()},
request_factory__{new RequestFactory{cache}},
socket_fd_{socket_fd},
producer_uri_{std::move(address)} {
log__{
GetDefaultReceiverLogger()},
request_factory__{
new RequestFactory{
cache}},
socket_fd_{socket_fd},
producer_uri_{
std::move(address)} {
}
NetworkErrorCode GetNetworkCodeFromError(const Error& err) {
......@@ -50,55 +54,56 @@ GenericNetworkResponse RequestsDispatcher::CreateResponseToRequest(const std::un
}
Error RequestsDispatcher::HandleRequest(const std::unique_ptr<Request>& request) const {
log__->Debug("processing request id " + std::to_string(request->GetDataID()) + ", opcode " +
std::to_string(request->GetOpCode()) + " from " + producer_uri_ );
log__->Debug(RequestLog("got new request", request.get()));
Error handle_err;
handle_err = request->Handle(statistics__);
if (handle_err) {
if (handle_err == ReceiverErrorTemplates::kReAuthorizationFailure) {
log__->Warning("warning processing request from " + producer_uri_ + " - " + handle_err->Explain());
log__->Warning(RequestLog("warning processing request: " + handle_err->Explain(), request.get()));
} else {
log__->Error("error processing request from " + producer_uri_ + " - " + handle_err->Explain());
log__->Error(RequestLog("error processing request: " + handle_err->Explain(), request.get()));
}
}
return handle_err;
}
Error RequestsDispatcher::SendResponse(const std::unique_ptr<Request>& request, const Error& handle_error) const {
log__->Debug("sending response to " + producer_uri_ );
Error io_err;
GenericNetworkResponse generic_response = CreateResponseToRequest(request, handle_error);
auto log = RequestLog("sending response", request.get()).
Append("response", NetworkErrorCodeToString(generic_response.error_code));
log__->Debug(log);
io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &io_err);
if (io_err) {
log__->Error("error sending response to " + producer_uri_ + " - " + io_err->Explain());
log__->Error(RequestLog("error sending response: " + io_err->Explain(), request.get()));
}
return io_err;
}
Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept {
auto handle_err = HandleRequest(request);
auto io_err = SendResponse(request, handle_err);
auto handle_err = HandleRequest(request);
auto io_err = SendResponse(request, handle_err);
return handle_err == nullptr ? std::move(io_err) : std::move(handle_err);
}
std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const noexcept {
//TODO: to be overwritten with MessagePack (or similar)
GenericRequestHeader generic_request_header;
statistics__-> StartTimer(StatisticEntity::kNetwork);
io__-> Receive(socket_fd_, &generic_request_header,
sizeof(GenericRequestHeader), err);
if(*err) {
if (*err == ErrorTemplates::kEndOfFile) {
log__->Debug("error getting next request from " + producer_uri_ + " - " + "peer has performed an orderly shutdown");
} else {
log__->Error("error getting next request from " + producer_uri_ + " - " + (*err)->Explain());
statistics__->StartTimer(StatisticEntity::kNetwork);
io__->Receive(socket_fd_, &generic_request_header,
sizeof(GenericRequestHeader), err);
if (*err) {
if (*err != ErrorTemplates::kEndOfFile) {
log__->Error(LogMessageWithFields("error getting next request: " + (*err)->Explain()).
Append("origin", HostFromUri(producer_uri_)));
}
return nullptr;
}
statistics__-> StopTimer();
statistics__->StopTimer();
auto request = request_factory__->GenerateRequest(generic_request_header, socket_fd_, producer_uri_, err);
if (*err) {
log__->Error("error processing request from " + producer_uri_ + " - " + (*err)->Explain());
log__->Error(LogMessageWithFields("error processing request: " + (*err)->Explain()).
Append("origin", HostFromUri(producer_uri_)));
}
return request;
}
......
......@@ -64,8 +64,8 @@ TEST(RequestDispatcher, Constructor) {
class MockRequest: public Request {
public:
MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd):
Request(request_header, socket_fd, "", nullptr, nullptr) {};
MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string uri = ""):
Request(request_header, socket_fd, uri, nullptr, nullptr) {};
Error Handle(ReceiverStatistics*) override {
return Error{Handle_t()};
};
......@@ -109,7 +109,7 @@ class RequestsDispatcherTests : public Test {
asapo::ReceiverConfig test_config;
GenericRequestHeader header;
MockRequest mock_request{GenericRequestHeader{}, 1};
MockRequest mock_request{GenericRequestHeader{}, 1, connected_uri};
std::unique_ptr<Request> request{&mock_request};
GenericNetworkResponse response;
void SetUp() override {
......@@ -145,25 +145,25 @@ class RequestsDispatcherTests : public Test {
Return(nullptr))
);
if (error) {
EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request"), HasSubstr(connected_uri))));
}
}
void MockHandleRequest(int error_mode, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) {
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("got new request"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_request, Handle_t()).WillOnce(
Return(error_mode > 0 ? err.release() : nullptr)
);
if (error_mode == 1) {
EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request"), HasSubstr(connected_uri))));
} else if (error_mode == 2) {
EXPECT_CALL(mock_logger, Warning(AllOf(HasSubstr("warning processing request from"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_logger, Warning(AllOf(HasSubstr("warning processing request"), HasSubstr(connected_uri))));
}
}
void MockSendResponse(GenericNetworkResponse* response, bool error ) {
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("sending response to"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("sending response"), HasSubstr(connected_uri))));
;
EXPECT_CALL(mock_io, Send_t(_, _, _, _)).WillOnce(
DoAll(SetArgPointee<3>(error ? asapo::IOErrorTemplates::kConnectionRefused.Generate().release() : nullptr),
......@@ -197,9 +197,6 @@ TEST_F(RequestsDispatcherTests, ClosedConnectionOnReceivetNextRequest) {
DoAll(SetArgPointee<3>(asapo::ErrorTemplates::kEndOfFile.Generate().release()),
Return(0))
);
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("peer has performed an orderly shutdown"),
HasSubstr("getting next request"), HasSubstr(connected_uri))));
Error err;
dispatcher->GetNextRequest(&err);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment