diff --git a/common/cpp/include/unittests/MockHttpClient.h b/common/cpp/include/unittests/MockHttpClient.h index dbb346f22ec297e5aab7a5b24039949bb628fac3..d128e8a84fd669604c99afdda2272445e434f471 100644 --- a/common/cpp/include/unittests/MockHttpClient.h +++ b/common/cpp/include/unittests/MockHttpClient.h @@ -12,15 +12,15 @@ class MockHttpClient : public HttpClient { public: std::string Get(const std::string& uri, HttpCode* code, Error* err) const noexcept override { ErrorInterface* error = nullptr; - auto responce = Get_t(uri, code, &error); + auto response = Get_t(uri, code, &error); err->reset(error); - return responce; + return response; } std::string Post(const std::string& uri, const std::string& data, HttpCode* code, Error* err) const noexcept override { ErrorInterface* error = nullptr; - auto responce = Post_t(uri, data, code, &error); + auto response = Post_t(uri, data, code, &error); err->reset(error); - return responce; + return response; } MOCK_CONST_METHOD3(Get_t, std::string(const std::string& uri, HttpCode* code, ErrorInterface** err)); diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index e6f0ada635b7bee1ea6c8068adfb6da7d806fdf6..818132ff30bfff9c03a8a8547b15915716e1bb4d 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -1,8 +1,8 @@ set(TARGET_NAME producer-api) set(SOURCE_FILES src/producer.cpp - src/producer_impl.h src/producer_impl.cpp + src/producer_logger.cpp src/request.cpp src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h) diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 38f74e79d4a4a17fb18cf51ba46b7aaed63084a7..6763e92ffffb1f32b4c4d6030862623dd045b94a 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -2,6 +2,7 @@ #include <cstring> #include "producer_impl.h" +#include "producer_logger.h" #include "io/io_factory.h" namespace asapo { @@ -9,9 +10,7 @@ namespace asapo { const uint32_t ProducerImpl::kVersion = 1; const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte -ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()} { - //todo get fluentd uri from service discovery - log__ = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); +ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()},log__{GetDefaultProducerLogger()} { } uint64_t ProducerImpl::GetVersion() const { diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 5710d8dd561b5688465b221d57fe8b88490c2348..95d9af21835ec674e93b0605b339de1ae5492ac7 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -38,7 +38,7 @@ class ProducerImpl : public Producer { Error ConnectToReceiver(const std::string& receiver_address) override; Error Send(uint64_t file_id, const void* data, size_t file_size) override; std::unique_ptr<IO> io__; - Logger log__; + AbstractLogger* log__; }; } diff --git a/producer/api/src/producer_logger.cpp b/producer/api/src/producer_logger.cpp new file mode 100644 index 0000000000000000000000000000000000000000..addf222008df69f0fb14ca0270b1b58bb7d4d16e --- /dev/null +++ b/producer/api/src/producer_logger.cpp @@ -0,0 +1,12 @@ +#include "producer_logger.h" + +namespace asapo { + + +AbstractLogger* GetDefaultProducerLogger() { + //todo get fluentd uri from service discovery + static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); + return logger.get(); +} + +} diff --git a/producer/api/src/producer_logger.h b/producer/api/src/producer_logger.h new file mode 100644 index 0000000000000000000000000000000000000000..abaf35c2490a820f75f1531f5db8595537a289f3 --- /dev/null +++ b/producer/api/src/producer_logger.h @@ -0,0 +1,14 @@ +#ifndef ASAPO_PRODUCER_LOGGER_H +#define ASAPO_PRODUCER_LOGGER_H + +#include "logger/logger.h" + +namespace asapo { + + +AbstractLogger* GetDefaultProducerLogger(); + +} + + +#endif //ASAPO_PRODUCER_LOGGER_H diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index 7bd3f2bcdb48ab25bc10367a60f9d223f24374b6..bde4fcf672fdbc88135d7b78d57eb7b7472ca35e 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -1,11 +1,12 @@ -#include <producer/producer_error.h> +#include "producer/producer_error.h" #include "request.h" +#include "producer_logger.h" namespace asapo { Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, RequestCallback callback): - io__{io}, header_(header), data_{data}, callback_{std::move(callback)} { + io__{io}, log__{GetDefaultProducerLogger()},header_(header), data_{data}, callback_{std::move(callback)} { } @@ -16,19 +17,22 @@ Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiv //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); return err; } + log__->Info("connected to receiver at " + receiver_address); return nullptr; } -Error Request::SendHeaderAndData(SocketDescriptor sd) { +Error Request::SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address) { Error io_error; io__->Send(sd, &header_, sizeof(header_), &io_error); if(io_error) { // todo: add meaningful message to the io_error (here and below) + log__->Debug("cannot send header to " + receiver_address + " - " + io_error->Explain()); return io_error; } io__->Send(sd, data_, header_.data_size, &io_error); if(io_error) { + log__->Debug("cannot send data to " + receiver_address + " - " + io_error->Explain()); return io_error; } @@ -36,11 +40,12 @@ Error Request::SendHeaderAndData(SocketDescriptor sd) { } -Error Request::ReceiveResponce(SocketDescriptor sd) { +Error Request::ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address) { Error err; SendDataResponse sendDataResponse; io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err); if(err != nullptr) { + log__->Debug("cannot receive response from " + receiver_address + " - " + err->Explain()); return err; } @@ -53,6 +58,22 @@ Error Request::ReceiveResponce(SocketDescriptor sd) { return nullptr; } +Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address) { + auto err = SendHeaderAndData(sd,receiver_address); + if (err) { + return err; + } + + err = ReceiveResponse(sd, receiver_address); + if (err) { + return err; + } + + log__->Debug("successfully sent data to " + receiver_address); + return nullptr; +} + + Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { for (auto receiver_uri : receivers_list) { @@ -61,20 +82,13 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { if (err != nullptr ) continue; } - auto err = SendHeaderAndData(*sd); - if (err != nullptr ) { - io__->CloseSocket(*sd, nullptr); - *sd = kDisconnectedSocketDescriptor; - continue; - } - - err = ReceiveResponce(*sd); - + auto err = TrySendToReceiver(*sd,receiver_uri); if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) { io__->CloseSocket(*sd, nullptr); *sd = kDisconnectedSocketDescriptor; continue; } + callback_(header_, std::move(err)); return nullptr; diff --git a/producer/api/src/request.h b/producer/api/src/request.h index eb2985547ad70285cd079dea8ab4b633e088d6d7..f12627176b0d5b74d312b98b4c0193fbc4e8e065 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -15,11 +15,13 @@ class Request { explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, RequestCallback callback); Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list); - const asapo::IO* io__; + const IO* io__; + const AbstractLogger* log__; private: Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address); - Error SendHeaderAndData(SocketDescriptor sd); - Error ReceiveResponce(SocketDescriptor sd); + Error SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address); + Error ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address); + Error TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address); GenericNetworkRequestHeader header_; const void* data_; RequestCallback callback_; diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 768bd8d331e4a5ff0347726fefe09514d9a3c5d9..6eb4b274309f28fa262a4f88b5a1722ac1714479 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -29,7 +29,7 @@ TEST(get_version, VersionAboveZero) { TEST(Producer, Logger) { asapo::ProducerImpl producer; - ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); } /* diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp index 3fe3bc93589622d324997d081923bb5d3ef06ce7..8e65926f751731b8d57c6927e653a11ffaefc2ea 100644 --- a/producer/api/unittests/test_request.cpp +++ b/producer/api/unittests/test_request.cpp @@ -23,6 +23,8 @@ using ::testing::Gt; using ::testing::Eq; using ::testing::Ne; using ::testing::Mock; +using ::testing::AllOf; + using ::testing::InSequence; using ::testing::HasSubstr; @@ -34,6 +36,8 @@ TEST(Producer, Constructor) { asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}; ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request.log__), Ne(nullptr)); + } @@ -72,7 +76,7 @@ class RequestTests : public testing::Test { void ExpectOKReceive(bool only_once = true); void SetUp() override { -// request.log__ = asapo::Logger {&mock_logger}; + request.log__ = &mock_logger; } void TearDown() override { } @@ -107,6 +111,7 @@ void RequestTests::ExpectFailConnect(bool only_once) { void RequestTests::ExpectFailSendHeader(bool only_once) { + int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, expected_file_size), @@ -116,13 +121,20 @@ void RequestTests::ExpectFailSendHeader(bool only_once) { testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), Return(-1) )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("cannot send header"), + HasSubstr(receivers_list[i]) + ) + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; + i++; } } void RequestTests::ExpectFailSendData(bool only_once) { + int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) .Times(1) @@ -131,14 +143,21 @@ void RequestTests::ExpectFailSendData(bool only_once) { testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), Return(-1) )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("cannot send data"), + HasSubstr(receivers_list[i]) + ) + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; + i++; } } void RequestTests::ExpectFailReceive(bool only_once) { + int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) .Times(1) @@ -147,8 +166,14 @@ void RequestTests::ExpectFailReceive(bool only_once) { testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), testing::Return(-1) )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("cannot receive"), + HasSubstr(receivers_list[i]) + ) + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; + i++; } } @@ -195,6 +220,11 @@ void RequestTests::ExpectOKConnect(bool only_once) { testing::SetArgPointee<1>(nullptr), Return(expected_sds[i]) )); + EXPECT_CALL(mock_logger, Info(AllOf( + HasSubstr("connected"), + HasSubstr(expected_address) + ) + )); if (only_once) break; i++; } @@ -202,6 +232,7 @@ void RequestTests::ExpectOKConnect(bool only_once) { void RequestTests::ExpectOKReceive(bool only_once) { + int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) .WillOnce( @@ -210,7 +241,13 @@ void RequestTests::ExpectOKReceive(bool only_once) { A_WriteSendDataResponse(asapo::kNetErrorNoError), testing::ReturnArg<2>() )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("sent data"), + HasSubstr(receivers_list[i]) + ) + )); if (only_once) break; + i++; } } @@ -271,7 +308,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) { -TEST_F(RequestTests, ImmediatelyRrrorIfFileAlreadyInUse) { +TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) { ExpectOKConnect(true); ExpectOKSendHeader(true); ExpectOKSendData(true); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 33b5e7bf4381ac4203da297f05f442db845df661..6a7663ba6189c7ece66a7fadaf1b0bd9560a2b3f 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -8,7 +8,7 @@ set(SOURCE_FILES src/statistics.cpp src/statistics_sender_influx_db.cpp src/receiver_config.cpp - src/receiver_logger.cpp + src/producer_logger.cpp src/request_handler_db_write.cpp) diff --git a/receiver/src/receiver_logger.cpp b/receiver/src/producer_logger.cpp similarity index 100% rename from receiver/src/receiver_logger.cpp rename to receiver/src/producer_logger.cpp diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp index 476729904cefd41cf73702edb6e98e9dfb780ee4..886b293130ec3e5086cb2138822d54f527e3574d 100644 --- a/receiver/src/statistics_sender_influx_db.cpp +++ b/receiver/src/statistics_sender_influx_db.cpp @@ -21,7 +21,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) //todo: send statistics async HttpCode code; Error err; - auto responce = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" + + auto response = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" + GetReceiverConfig()->monitor_db_name, StatisticsToString(statistic), &code, &err); std::string msg = "sending statistics to " + GetReceiverConfig()->monitor_db_name + " at " + @@ -32,7 +32,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) } if (code != HttpCode::OK && code != HttpCode::NoContent) { - log__->Error(msg + " - " + responce); + log__->Error(msg + " - " + response); return; }