Skip to content
Snippets Groups Projects
Commit 7fcf16ca authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

add logs to produxer requests, refactoring

parent 8f53180b
No related branches found
No related tags found
No related merge requests found
...@@ -12,15 +12,15 @@ class MockHttpClient : public HttpClient { ...@@ -12,15 +12,15 @@ class MockHttpClient : public HttpClient {
public: public:
std::string Get(const std::string& uri, HttpCode* code, Error* err) const noexcept override { std::string Get(const std::string& uri, HttpCode* code, Error* err) const noexcept override {
ErrorInterface* error = nullptr; ErrorInterface* error = nullptr;
auto responce = Get_t(uri, code, &error); auto response = Get_t(uri, code, &error);
err->reset(error); err->reset(error);
return responce; return response;
} }
std::string Post(const std::string& uri, const std::string& data, HttpCode* code, Error* err) const noexcept override { std::string Post(const std::string& uri, const std::string& data, HttpCode* code, Error* err) const noexcept override {
ErrorInterface* error = nullptr; ErrorInterface* error = nullptr;
auto responce = Post_t(uri, data, code, &error); auto response = Post_t(uri, data, code, &error);
err->reset(error); err->reset(error);
return responce; return response;
} }
MOCK_CONST_METHOD3(Get_t, MOCK_CONST_METHOD3(Get_t,
std::string(const std::string& uri, HttpCode* code, ErrorInterface** err)); std::string(const std::string& uri, HttpCode* code, ErrorInterface** err));
......
set(TARGET_NAME producer-api) set(TARGET_NAME producer-api)
set(SOURCE_FILES set(SOURCE_FILES
src/producer.cpp src/producer.cpp
src/producer_impl.h
src/producer_impl.cpp src/producer_impl.cpp
src/producer_logger.cpp
src/request.cpp src/request.cpp
src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h) src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h)
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include <cstring> #include <cstring>
#include "producer_impl.h" #include "producer_impl.h"
#include "producer_logger.h"
#include "io/io_factory.h" #include "io/io_factory.h"
namespace asapo { namespace asapo {
...@@ -9,9 +10,7 @@ namespace asapo { ...@@ -9,9 +10,7 @@ namespace asapo {
const uint32_t ProducerImpl::kVersion = 1; const uint32_t ProducerImpl::kVersion = 1;
const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte
ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()} { ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()},log__{GetDefaultProducerLogger()} {
//todo get fluentd uri from service discovery
log__ = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo");
} }
uint64_t ProducerImpl::GetVersion() const { uint64_t ProducerImpl::GetVersion() const {
......
...@@ -38,7 +38,7 @@ class ProducerImpl : public Producer { ...@@ -38,7 +38,7 @@ class ProducerImpl : public Producer {
Error ConnectToReceiver(const std::string& receiver_address) override; Error ConnectToReceiver(const std::string& receiver_address) override;
Error Send(uint64_t file_id, const void* data, size_t file_size) override; Error Send(uint64_t file_id, const void* data, size_t file_size) override;
std::unique_ptr<IO> io__; std::unique_ptr<IO> io__;
Logger log__; AbstractLogger* log__;
}; };
} }
......
#include "producer_logger.h"
namespace asapo {
AbstractLogger* GetDefaultProducerLogger() {
//todo get fluentd uri from service discovery
static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo");
return logger.get();
}
}
#ifndef ASAPO_PRODUCER_LOGGER_H
#define ASAPO_PRODUCER_LOGGER_H
#include "logger/logger.h"
namespace asapo {
AbstractLogger* GetDefaultProducerLogger();
}
#endif //ASAPO_PRODUCER_LOGGER_H
#include <producer/producer_error.h> #include "producer/producer_error.h"
#include "request.h" #include "request.h"
#include "producer_logger.h"
namespace asapo { namespace asapo {
Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
RequestCallback callback): RequestCallback callback):
io__{io}, header_(header), data_{data}, callback_{std::move(callback)} { io__{io}, log__{GetDefaultProducerLogger()},header_(header), data_{data}, callback_{std::move(callback)} {
} }
...@@ -16,19 +17,22 @@ Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiv ...@@ -16,19 +17,22 @@ Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiv
//log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain());
return err; return err;
} }
log__->Info("connected to receiver at " + receiver_address);
return nullptr; return nullptr;
} }
Error Request::SendHeaderAndData(SocketDescriptor sd) { Error Request::SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address) {
Error io_error; Error io_error;
io__->Send(sd, &header_, sizeof(header_), &io_error); io__->Send(sd, &header_, sizeof(header_), &io_error);
if(io_error) { if(io_error) {
// todo: add meaningful message to the io_error (here and below) // todo: add meaningful message to the io_error (here and below)
log__->Debug("cannot send header to " + receiver_address + " - " + io_error->Explain());
return io_error; return io_error;
} }
io__->Send(sd, data_, header_.data_size, &io_error); io__->Send(sd, data_, header_.data_size, &io_error);
if(io_error) { if(io_error) {
log__->Debug("cannot send data to " + receiver_address + " - " + io_error->Explain());
return io_error; return io_error;
} }
...@@ -36,11 +40,12 @@ Error Request::SendHeaderAndData(SocketDescriptor sd) { ...@@ -36,11 +40,12 @@ Error Request::SendHeaderAndData(SocketDescriptor sd) {
} }
Error Request::ReceiveResponce(SocketDescriptor sd) { Error Request::ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address) {
Error err; Error err;
SendDataResponse sendDataResponse; SendDataResponse sendDataResponse;
io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err); io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err);
if(err != nullptr) { if(err != nullptr) {
log__->Debug("cannot receive response from " + receiver_address + " - " + err->Explain());
return err; return err;
} }
...@@ -53,6 +58,22 @@ Error Request::ReceiveResponce(SocketDescriptor sd) { ...@@ -53,6 +58,22 @@ Error Request::ReceiveResponce(SocketDescriptor sd) {
return nullptr; return nullptr;
} }
Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address) {
auto err = SendHeaderAndData(sd,receiver_address);
if (err) {
return err;
}
err = ReceiveResponse(sd, receiver_address);
if (err) {
return err;
}
log__->Debug("successfully sent data to " + receiver_address);
return nullptr;
}
Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
for (auto receiver_uri : receivers_list) { for (auto receiver_uri : receivers_list) {
...@@ -61,20 +82,13 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { ...@@ -61,20 +82,13 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
if (err != nullptr ) continue; if (err != nullptr ) continue;
} }
auto err = SendHeaderAndData(*sd); auto err = TrySendToReceiver(*sd,receiver_uri);
if (err != nullptr ) {
io__->CloseSocket(*sd, nullptr);
*sd = kDisconnectedSocketDescriptor;
continue;
}
err = ReceiveResponce(*sd);
if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) { if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) {
io__->CloseSocket(*sd, nullptr); io__->CloseSocket(*sd, nullptr);
*sd = kDisconnectedSocketDescriptor; *sd = kDisconnectedSocketDescriptor;
continue; continue;
} }
callback_(header_, std::move(err)); callback_(header_, std::move(err));
return nullptr; return nullptr;
......
...@@ -15,11 +15,13 @@ class Request { ...@@ -15,11 +15,13 @@ class Request {
explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
RequestCallback callback); RequestCallback callback);
Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list); Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list);
const asapo::IO* io__; const IO* io__;
const AbstractLogger* log__;
private: private:
Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address); Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address);
Error SendHeaderAndData(SocketDescriptor sd); Error SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address);
Error ReceiveResponce(SocketDescriptor sd); Error ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address);
Error TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address);
GenericNetworkRequestHeader header_; GenericNetworkRequestHeader header_;
const void* data_; const void* data_;
RequestCallback callback_; RequestCallback callback_;
......
...@@ -29,7 +29,7 @@ TEST(get_version, VersionAboveZero) { ...@@ -29,7 +29,7 @@ TEST(get_version, VersionAboveZero) {
TEST(Producer, Logger) { TEST(Producer, Logger) {
asapo::ProducerImpl producer; asapo::ProducerImpl producer;
ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr));
} }
/* /*
......
...@@ -23,6 +23,8 @@ using ::testing::Gt; ...@@ -23,6 +23,8 @@ using ::testing::Gt;
using ::testing::Eq; using ::testing::Eq;
using ::testing::Ne; using ::testing::Ne;
using ::testing::Mock; using ::testing::Mock;
using ::testing::AllOf;
using ::testing::InSequence; using ::testing::InSequence;
using ::testing::HasSubstr; using ::testing::HasSubstr;
...@@ -34,6 +36,8 @@ TEST(Producer, Constructor) { ...@@ -34,6 +36,8 @@ TEST(Producer, Constructor) {
asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}; asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}};
ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr));
ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request.log__), Ne(nullptr));
} }
...@@ -72,7 +76,7 @@ class RequestTests : public testing::Test { ...@@ -72,7 +76,7 @@ class RequestTests : public testing::Test {
void ExpectOKReceive(bool only_once = true); void ExpectOKReceive(bool only_once = true);
void SetUp() override { void SetUp() override {
// request.log__ = asapo::Logger {&mock_logger}; request.log__ = &mock_logger;
} }
void TearDown() override { void TearDown() override {
} }
...@@ -107,6 +111,7 @@ void RequestTests::ExpectFailConnect(bool only_once) { ...@@ -107,6 +111,7 @@ void RequestTests::ExpectFailConnect(bool only_once) {
void RequestTests::ExpectFailSendHeader(bool only_once) { void RequestTests::ExpectFailSendHeader(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) { for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id,
expected_file_size), expected_file_size),
...@@ -116,13 +121,20 @@ void RequestTests::ExpectFailSendHeader(bool only_once) { ...@@ -116,13 +121,20 @@ void RequestTests::ExpectFailSendHeader(bool only_once) {
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
Return(-1) Return(-1)
)); ));
EXPECT_CALL(mock_logger, Debug(AllOf(
HasSubstr("cannot send header"),
HasSubstr(receivers_list[i])
)
));
EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
if (only_once) break; if (only_once) break;
i++;
} }
} }
void RequestTests::ExpectFailSendData(bool only_once) { void RequestTests::ExpectFailSendData(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) { for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _))
.Times(1) .Times(1)
...@@ -131,14 +143,21 @@ void RequestTests::ExpectFailSendData(bool only_once) { ...@@ -131,14 +143,21 @@ void RequestTests::ExpectFailSendData(bool only_once) {
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
Return(-1) Return(-1)
)); ));
EXPECT_CALL(mock_logger, Debug(AllOf(
HasSubstr("cannot send data"),
HasSubstr(receivers_list[i])
)
));
EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
if (only_once) break; if (only_once) break;
i++;
} }
} }
void RequestTests::ExpectFailReceive(bool only_once) { void RequestTests::ExpectFailReceive(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) { for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
.Times(1) .Times(1)
...@@ -147,8 +166,14 @@ void RequestTests::ExpectFailReceive(bool only_once) { ...@@ -147,8 +166,14 @@ void RequestTests::ExpectFailReceive(bool only_once) {
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
testing::Return(-1) testing::Return(-1)
)); ));
EXPECT_CALL(mock_logger, Debug(AllOf(
HasSubstr("cannot receive"),
HasSubstr(receivers_list[i])
)
));
EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
if (only_once) break; if (only_once) break;
i++;
} }
} }
...@@ -195,6 +220,11 @@ void RequestTests::ExpectOKConnect(bool only_once) { ...@@ -195,6 +220,11 @@ void RequestTests::ExpectOKConnect(bool only_once) {
testing::SetArgPointee<1>(nullptr), testing::SetArgPointee<1>(nullptr),
Return(expected_sds[i]) Return(expected_sds[i])
)); ));
EXPECT_CALL(mock_logger, Info(AllOf(
HasSubstr("connected"),
HasSubstr(expected_address)
)
));
if (only_once) break; if (only_once) break;
i++; i++;
} }
...@@ -202,6 +232,7 @@ void RequestTests::ExpectOKConnect(bool only_once) { ...@@ -202,6 +232,7 @@ void RequestTests::ExpectOKConnect(bool only_once) {
void RequestTests::ExpectOKReceive(bool only_once) { void RequestTests::ExpectOKReceive(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) { for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
.WillOnce( .WillOnce(
...@@ -210,7 +241,13 @@ void RequestTests::ExpectOKReceive(bool only_once) { ...@@ -210,7 +241,13 @@ void RequestTests::ExpectOKReceive(bool only_once) {
A_WriteSendDataResponse(asapo::kNetErrorNoError), A_WriteSendDataResponse(asapo::kNetErrorNoError),
testing::ReturnArg<2>() testing::ReturnArg<2>()
)); ));
EXPECT_CALL(mock_logger, Debug(AllOf(
HasSubstr("sent data"),
HasSubstr(receivers_list[i])
)
));
if (only_once) break; if (only_once) break;
i++;
} }
} }
...@@ -271,7 +308,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) { ...@@ -271,7 +308,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) {
TEST_F(RequestTests, ImmediatelyRrrorIfFileAlreadyInUse) { TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) {
ExpectOKConnect(true); ExpectOKConnect(true);
ExpectOKSendHeader(true); ExpectOKSendHeader(true);
ExpectOKSendData(true); ExpectOKSendData(true);
......
...@@ -8,7 +8,7 @@ set(SOURCE_FILES ...@@ -8,7 +8,7 @@ set(SOURCE_FILES
src/statistics.cpp src/statistics.cpp
src/statistics_sender_influx_db.cpp src/statistics_sender_influx_db.cpp
src/receiver_config.cpp src/receiver_config.cpp
src/receiver_logger.cpp src/producer_logger.cpp
src/request_handler_db_write.cpp) src/request_handler_db_write.cpp)
......
File moved
...@@ -21,7 +21,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) ...@@ -21,7 +21,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic)
//todo: send statistics async //todo: send statistics async
HttpCode code; HttpCode code;
Error err; Error err;
auto responce = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" + auto response = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" +
GetReceiverConfig()->monitor_db_name, StatisticsToString(statistic), GetReceiverConfig()->monitor_db_name, StatisticsToString(statistic),
&code, &err); &code, &err);
std::string msg = "sending statistics to " + GetReceiverConfig()->monitor_db_name + " at " + std::string msg = "sending statistics to " + GetReceiverConfig()->monitor_db_name + " at " +
...@@ -32,7 +32,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) ...@@ -32,7 +32,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic)
} }
if (code != HttpCode::OK && code != HttpCode::NoContent) { if (code != HttpCode::OK && code != HttpCode::NoContent) {
log__->Error(msg + " - " + responce); log__->Error(msg + " - " + response);
return; return;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment