From 7fcf16ca6ed2618928e67d9710aae01d21a9bb6b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 14 May 2018 13:06:15 +0200
Subject: [PATCH] add logs to produxer requests, refactoring

---
 common/cpp/include/unittests/MockHttpClient.h |  8 ++--
 producer/api/CMakeLists.txt                   |  2 +-
 producer/api/src/producer_impl.cpp            |  5 +--
 producer/api/src/producer_impl.h              |  2 +-
 producer/api/src/producer_logger.cpp          | 12 ++++++
 producer/api/src/producer_logger.h            | 14 +++++++
 producer/api/src/request.cpp                  | 40 ++++++++++++------
 producer/api/src/request.h                    |  8 ++--
 producer/api/unittests/test_producer_impl.cpp |  2 +-
 producer/api/unittests/test_request.cpp       | 41 ++++++++++++++++++-
 receiver/CMakeLists.txt                       |  2 +-
 ...eceiver_logger.cpp => producer_logger.cpp} |  0
 receiver/src/statistics_sender_influx_db.cpp  |  4 +-
 13 files changed, 109 insertions(+), 31 deletions(-)
 create mode 100644 producer/api/src/producer_logger.cpp
 create mode 100644 producer/api/src/producer_logger.h
 rename receiver/src/{receiver_logger.cpp => producer_logger.cpp} (100%)

diff --git a/common/cpp/include/unittests/MockHttpClient.h b/common/cpp/include/unittests/MockHttpClient.h
index dbb346f22..d128e8a84 100644
--- a/common/cpp/include/unittests/MockHttpClient.h
+++ b/common/cpp/include/unittests/MockHttpClient.h
@@ -12,15 +12,15 @@ class MockHttpClient : public HttpClient {
   public:
     std::string Get(const std::string& uri, HttpCode* code, Error* err) const noexcept override {
         ErrorInterface* error = nullptr;
-        auto responce = Get_t(uri, code, &error);
+        auto response = Get_t(uri, code, &error);
         err->reset(error);
-        return responce;
+        return response;
     }
     std::string Post(const std::string& uri, const std::string& data, HttpCode* code, Error* err) const noexcept override {
         ErrorInterface* error = nullptr;
-        auto responce = Post_t(uri, data, code, &error);
+        auto response = Post_t(uri, data, code, &error);
         err->reset(error);
-        return responce;
+        return response;
     }
     MOCK_CONST_METHOD3(Get_t,
                        std::string(const std::string& uri, HttpCode* code, ErrorInterface** err));
diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt
index e6f0ada63..818132ff3 100644
--- a/producer/api/CMakeLists.txt
+++ b/producer/api/CMakeLists.txt
@@ -1,8 +1,8 @@
 set(TARGET_NAME producer-api)
 set(SOURCE_FILES
         src/producer.cpp
-        src/producer_impl.h
         src/producer_impl.cpp
+        src/producer_logger.cpp
         src/request.cpp
         src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h)
 
diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp
index 38f74e79d..6763e92ff 100644
--- a/producer/api/src/producer_impl.cpp
+++ b/producer/api/src/producer_impl.cpp
@@ -2,6 +2,7 @@
 #include <cstring>
 
 #include "producer_impl.h"
+#include "producer_logger.h"
 #include "io/io_factory.h"
 
 namespace  asapo {
@@ -9,9 +10,7 @@ namespace  asapo {
 const uint32_t ProducerImpl::kVersion = 1;
 const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte
 
-ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()} {
-    //todo get fluentd uri from service discovery
-    log__ = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo");
+ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()},log__{GetDefaultProducerLogger()} {
 }
 
 uint64_t ProducerImpl::GetVersion() const {
diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h
index 5710d8dd5..95d9af218 100644
--- a/producer/api/src/producer_impl.h
+++ b/producer/api/src/producer_impl.h
@@ -38,7 +38,7 @@ class ProducerImpl : public Producer {
     Error ConnectToReceiver(const std::string& receiver_address) override;
     Error Send(uint64_t file_id, const void* data, size_t file_size) override;
     std::unique_ptr<IO> io__;
-    Logger log__;
+    AbstractLogger* log__;
 };
 }
 
diff --git a/producer/api/src/producer_logger.cpp b/producer/api/src/producer_logger.cpp
new file mode 100644
index 000000000..addf22200
--- /dev/null
+++ b/producer/api/src/producer_logger.cpp
@@ -0,0 +1,12 @@
+#include "producer_logger.h"
+
+namespace asapo {
+
+
+AbstractLogger* GetDefaultProducerLogger() {
+    //todo get fluentd uri from service discovery
+    static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo");
+    return logger.get();
+}
+
+}
diff --git a/producer/api/src/producer_logger.h b/producer/api/src/producer_logger.h
new file mode 100644
index 000000000..abaf35c24
--- /dev/null
+++ b/producer/api/src/producer_logger.h
@@ -0,0 +1,14 @@
+#ifndef ASAPO_PRODUCER_LOGGER_H
+#define ASAPO_PRODUCER_LOGGER_H
+
+#include "logger/logger.h"
+
+namespace asapo {
+
+
+AbstractLogger* GetDefaultProducerLogger();
+
+}
+
+
+#endif //ASAPO_PRODUCER_LOGGER_H
diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp
index 7bd3f2bcd..bde4fcf67 100644
--- a/producer/api/src/request.cpp
+++ b/producer/api/src/request.cpp
@@ -1,11 +1,12 @@
-#include <producer/producer_error.h>
+#include "producer/producer_error.h"
 #include "request.h"
+#include "producer_logger.h"
 
 namespace asapo {
 
 Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
                  RequestCallback callback):
-    io__{io}, header_(header), data_{data}, callback_{std::move(callback)} {
+    io__{io}, log__{GetDefaultProducerLogger()},header_(header), data_{data}, callback_{std::move(callback)} {
 
 }
 
@@ -16,19 +17,22 @@ Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiv
         //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain());
         return err;
     }
+    log__->Info("connected to receiver at " + receiver_address);
     return nullptr;
 }
 
-Error Request::SendHeaderAndData(SocketDescriptor sd) {
+Error Request::SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address) {
     Error io_error;
     io__->Send(sd, &header_, sizeof(header_), &io_error);
     if(io_error) {
 // todo: add meaningful message to the io_error (here and below)
+        log__->Debug("cannot send header to " + receiver_address + " - " + io_error->Explain());
         return io_error;
     }
 
     io__->Send(sd, data_, header_.data_size, &io_error);
     if(io_error) {
+        log__->Debug("cannot send data to " + receiver_address + " - " + io_error->Explain());
         return io_error;
     }
 
@@ -36,11 +40,12 @@ Error Request::SendHeaderAndData(SocketDescriptor sd) {
 }
 
 
-Error Request::ReceiveResponce(SocketDescriptor sd) {
+Error Request::ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address) {
     Error err;
     SendDataResponse sendDataResponse;
     io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err);
     if(err != nullptr) {
+        log__->Debug("cannot receive response from " + receiver_address + " - " + err->Explain());
         return err;
     }
 
@@ -53,6 +58,22 @@ Error Request::ReceiveResponce(SocketDescriptor sd) {
     return nullptr;
 }
 
+Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address) {
+    auto err = SendHeaderAndData(sd,receiver_address);
+    if (err)  {
+        return err;
+    }
+
+    err = ReceiveResponse(sd, receiver_address);
+    if (err)  {
+        return err;
+    }
+
+    log__->Debug("successfully sent data to " + receiver_address);
+    return nullptr;
+}
+
+
 
 Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
     for (auto receiver_uri : receivers_list) {
@@ -61,20 +82,13 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
             if (err != nullptr ) continue;
         }
 
-        auto err = SendHeaderAndData(*sd);
-        if (err != nullptr )  {
-            io__->CloseSocket(*sd, nullptr);
-            *sd = kDisconnectedSocketDescriptor;
-            continue;
-        }
-
-        err = ReceiveResponce(*sd);
-
+        auto err = TrySendToReceiver(*sd,receiver_uri);
         if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse)  {
             io__->CloseSocket(*sd, nullptr);
             *sd = kDisconnectedSocketDescriptor;
             continue;
         }
+
         callback_(header_, std::move(err));
         return nullptr;
 
diff --git a/producer/api/src/request.h b/producer/api/src/request.h
index eb2985547..f12627176 100644
--- a/producer/api/src/request.h
+++ b/producer/api/src/request.h
@@ -15,11 +15,13 @@ class Request {
     explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
                      RequestCallback callback);
     Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list);
-    const asapo::IO* io__;
+    const IO* io__;
+    const AbstractLogger* log__;
   private:
     Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address);
-    Error SendHeaderAndData(SocketDescriptor sd);
-    Error ReceiveResponce(SocketDescriptor sd);
+    Error SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address);
+    Error ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address);
+    Error TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address);
     GenericNetworkRequestHeader header_;
     const void* data_;
     RequestCallback callback_;
diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp
index 768bd8d33..6eb4b2743 100644
--- a/producer/api/unittests/test_producer_impl.cpp
+++ b/producer/api/unittests/test_producer_impl.cpp
@@ -29,7 +29,7 @@ TEST(get_version, VersionAboveZero) {
 
 TEST(Producer, Logger) {
     asapo::ProducerImpl producer;
-    ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__.get()), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr));
 }
 
 /*
diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp
index 3fe3bc935..8e65926f7 100644
--- a/producer/api/unittests/test_request.cpp
+++ b/producer/api/unittests/test_request.cpp
@@ -23,6 +23,8 @@ using ::testing::Gt;
 using ::testing::Eq;
 using ::testing::Ne;
 using ::testing::Mock;
+using ::testing::AllOf;
+
 
 using ::testing::InSequence;
 using ::testing::HasSubstr;
@@ -34,6 +36,8 @@ TEST(Producer, Constructor) {
     asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}};
 
     ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request.log__), Ne(nullptr));
+
 }
 
 
@@ -72,7 +76,7 @@ class RequestTests : public testing::Test {
     void ExpectOKReceive(bool only_once = true);
 
     void SetUp() override {
-//        request.log__ = asapo::Logger {&mock_logger};
+        request.log__ = &mock_logger;
     }
     void TearDown() override {
     }
@@ -107,6 +111,7 @@ void RequestTests::ExpectFailConnect(bool only_once) {
 
 
 void RequestTests::ExpectFailSendHeader(bool only_once) {
+    int i = 0;
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id,
                                     expected_file_size),
@@ -116,13 +121,20 @@ void RequestTests::ExpectFailSendHeader(bool only_once) {
                 testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
                 Return(-1)
             ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot send header"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
+        i++;
     }
 
 }
 
 void RequestTests::ExpectFailSendData(bool only_once) {
+    int i = 0;
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _))
         .Times(1)
@@ -131,14 +143,21 @@ void RequestTests::ExpectFailSendData(bool only_once) {
                 testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
                 Return(-1)
             ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot send data"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
+        i++;
     }
 
 }
 
 
 void RequestTests::ExpectFailReceive(bool only_once) {
+    int i = 0;
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
         .Times(1)
@@ -147,8 +166,14 @@ void RequestTests::ExpectFailReceive(bool only_once) {
                 testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
                 testing::Return(-1)
             ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot receive"),
+            HasSubstr(receivers_list[i])
+                                      )
+        ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
+        i++;
     }
 
 }
@@ -195,6 +220,11 @@ void RequestTests::ExpectOKConnect(bool only_once) {
                 testing::SetArgPointee<1>(nullptr),
                 Return(expected_sds[i])
             ));
+        EXPECT_CALL(mock_logger, Info(AllOf(
+            HasSubstr("connected"),
+            HasSubstr(expected_address)
+          )
+        ));
         if (only_once) break;
         i++;
     }
@@ -202,6 +232,7 @@ void RequestTests::ExpectOKConnect(bool only_once) {
 
 
 void RequestTests::ExpectOKReceive(bool only_once) {
+    int i = 0;
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
         .WillOnce(
@@ -210,7 +241,13 @@ void RequestTests::ExpectOKReceive(bool only_once) {
                 A_WriteSendDataResponse(asapo::kNetErrorNoError),
                 testing::ReturnArg<2>()
             ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("sent data"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
         if (only_once) break;
+        i++;
     }
 }
 
@@ -271,7 +308,7 @@ TEST_F(RequestTests, ErrorWhenCannotReceiveData) {
 
 
 
-TEST_F(RequestTests, ImmediatelyRrrorIfFileAlreadyInUse) {
+TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) {
     ExpectOKConnect(true);
     ExpectOKSendHeader(true);
     ExpectOKSendData(true);
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index 33b5e7bf4..6a7663ba6 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -8,7 +8,7 @@ set(SOURCE_FILES
         src/statistics.cpp
         src/statistics_sender_influx_db.cpp
         src/receiver_config.cpp
-        src/receiver_logger.cpp
+        src/producer_logger.cpp
         src/request_handler_db_write.cpp)
 
 
diff --git a/receiver/src/receiver_logger.cpp b/receiver/src/producer_logger.cpp
similarity index 100%
rename from receiver/src/receiver_logger.cpp
rename to receiver/src/producer_logger.cpp
diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp
index 476729904..886b29313 100644
--- a/receiver/src/statistics_sender_influx_db.cpp
+++ b/receiver/src/statistics_sender_influx_db.cpp
@@ -21,7 +21,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic)
     //todo: send statistics async
     HttpCode code;
     Error err;
-    auto responce = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" +
+    auto response = httpclient__->Post(GetReceiverConfig()->monitor_db_uri + "/write?db=" +
                                        GetReceiverConfig()->monitor_db_name, StatisticsToString(statistic),
                                        &code, &err);
     std::string msg = "sending statistics to " + GetReceiverConfig()->monitor_db_name + " at " +
@@ -32,7 +32,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic)
     }
 
     if (code != HttpCode::OK && code != HttpCode::NoContent) {
-        log__->Error(msg + " - " + responce);
+        log__->Error(msg + " - " + response);
         return;
     }
 
-- 
GitLab