From a4737bcdcf6b503f3df7be1a68937ae6a6284fca Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Thu, 31 May 2018 11:43:59 +0200
Subject: [PATCH] refactor producer

---
 .../dummy_data_producer.cpp                   |   1 -
 .../api/src/receiver_discovery_service.cpp    |  11 +-
 producer/api/src/receiver_discovery_service.h |   1 +
 .../api/src/request_handler_filesystem.cpp    |   3 -
 producer/api/src/request_handler_tcp.cpp      | 101 +++++++++++-------
 producer/api/src/request_handler_tcp.h        |  19 ++--
 6 files changed, 85 insertions(+), 51 deletions(-)

diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 5add8c660..3b4a76e08 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -1,7 +1,6 @@
 #include <iostream>
 #include <chrono>
 #include <vector>
-#include <tuple>
 #include <mutex>
 #include <thread>
 
diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp
index 64e2f4227..5ca5f7315 100644
--- a/producer/api/src/receiver_discovery_service.cpp
+++ b/producer/api/src/receiver_discovery_service.cpp
@@ -47,6 +47,13 @@ Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t
 
 }
 
+void ReceiverDiscoveryService::LogUriList(const ReceiversList& uris) {
+    std::string s;
+    s = std::accumulate(std::begin(uris), std::end(uris), s);
+    log__->Debug("got receivers from " + endpoint_ + ":" + s);
+}
+
+
 void ReceiverDiscoveryService::ThreadHandler() {
     std::unique_lock<std::mutex> lock(mutex_);
     do {
@@ -59,9 +66,7 @@ void ReceiverDiscoveryService::ThreadHandler() {
             lock.lock();
             continue;
         }
-        std::string s;
-        s = std::accumulate(std::begin(uris), std::end(uris), s);
-        log__->Debug("got receivers from " + endpoint_ + ":" + s);
+        LogUriList(uris);
         lock.lock();
         max_connections_ = max_connections;
         uri_list_ = uris;
diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h
index 54b0ac09c..c9893423e 100644
--- a/producer/api/src/receiver_discovery_service.h
+++ b/producer/api/src/receiver_discovery_service.h
@@ -31,6 +31,7 @@ class ReceiverDiscoveryService {
     void ThreadHandler();
     Error UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections);
     Error ParseResponse(const std::string& responce, ReceiversList* list, uint64_t* max_connections);
+    void LogUriList(const ReceiversList& uris);
     std::string endpoint_;
     std::thread thread_;
     std::condition_variable condition_;
diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp
index 8e35164f3..93ac0b499 100644
--- a/producer/api/src/request_handler_filesystem.cpp
+++ b/producer/api/src/request_handler_filesystem.cpp
@@ -7,15 +7,12 @@
 
 namespace asapo {
 
-
 RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id):
     io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)},
     thread_id_{thread_id} {
 
 }
 
-
-
 Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) {
     std::string fullpath = destination_folder_ + "/" + request->header.file_name + ".bin";
     auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data, request->header.data_size);
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index 4708c5acb..3b5972d8a 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -22,10 +22,11 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address)
         return err;
     }
     log__->Info("connected to receiver at " + receiver_address);
+    connected_receiver_uri_ = receiver_address;
     return nullptr;
 }
 
-Error RequestHandlerTcp::SendHeaderAndData(const Request* request, const std::string& receiver_address) {
+Error RequestHandlerTcp::SendHeaderAndData(const Request* request) {
     Error io_error;
     io__->Send(sd_, &(request->header), sizeof(request->header), &io_error);
     if(io_error) {
@@ -40,7 +41,7 @@ Error RequestHandlerTcp::SendHeaderAndData(const Request* request, const std::st
     return nullptr;
 }
 
-Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
+Error RequestHandlerTcp::ReceiveResponse() {
     Error err;
     SendDataResponse sendDataResponse;
     io__->Receive(sd_, &sendDataResponse, sizeof(sendDataResponse), &err);
@@ -57,69 +58,93 @@ Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
     return nullptr;
 }
 
-Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::string& receiver_address) {
-    auto err = SendHeaderAndData(request, receiver_address);
+Error RequestHandlerTcp::TrySendToReceiver(const Request* request) {
+    auto err = SendHeaderAndData(request);
     if (err)  {
         return err;
     }
 
-    err = ReceiveResponse(receiver_address);
+    err = ReceiveResponse();
     if (err)  {
         return err;
     }
 
     log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " +
-                 receiver_address);
+        connected_receiver_uri_);
     return nullptr;
 }
 
 
-void RequestHandlerTcp::UpdateReceiversUriIfNewConnection() {
-    if (sd_ != kDisconnectedSocketDescriptor)
+void RequestHandlerTcp::UpdateIfNewConnection() {
+    if (Connected())
         return;
-    receivers_list_ = discovery_service__->RotatedUriList(thread_id_);
-    last_receivers_uri_update_ = high_resolution_clock::now();
+    UpdateReceiversList();
     (*ncurrent_connections_)++;
 }
 
-bool RequestHandlerTcp::CheckForRebalance() {
-    if (sd_ == kDisconnectedSocketDescriptor)
+bool RequestHandlerTcp::UpdateReceiversList() {
+    auto thread_receivers_new = discovery_service__->RotatedUriList(thread_id_);
+    last_receivers_uri_update_ = high_resolution_clock::now();
+    if (thread_receivers_new != receivers_list_) {
+        receivers_list_ = thread_receivers_new;
+        return true;
+    }
+    return false;
+}
+
+bool RequestHandlerTcp::TimeToUpdateReceiverList() {
+    uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() -
+        last_receivers_uri_update_).count();
+    return elapsed_ms > discovery_service__->UpdateFrequency();
+}
+
+
+bool RequestHandlerTcp::Disconnected() {
+    return !Connected();
+}
+
+
+bool RequestHandlerTcp::NeedRebalance() {
+    if (Disconnected())
         return false;
 
-    auto now =  high_resolution_clock::now();
-    uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( now -
-                          last_receivers_uri_update_).count();
-    bool rebalance = false;
-    if (elapsed_ms > discovery_service__->UpdateFrequency()) {
-        auto thread_receivers_new = discovery_service__->RotatedUriList(thread_id_);
-        last_receivers_uri_update_ = now;
-        if (thread_receivers_new != receivers_list_) {
-            receivers_list_ = thread_receivers_new;
-            rebalance = true;
-        }
+    if (TimeToUpdateReceiverList()) {
+        return UpdateReceiversList();
     }
-    return rebalance;
+    return false;
+}
+
+void RequestHandlerTcp::CloseConnectionToPeformRebalance() {
+    io__->CloseSocket(sd_, nullptr);
+    log__->Info("rebalancing");
+    sd_ = kDisconnectedSocketDescriptor;
+}
+
+void RequestHandlerTcp::Disconnect() {
+    io__->CloseSocket(sd_, nullptr);
+    sd_ = kDisconnectedSocketDescriptor;
+    log__->Debug("disconnected from  " + connected_receiver_uri_);
+    connected_receiver_uri_.clear();
+}
+
+bool RequestHandlerTcp::ServerError(const Error& err) {
+    return err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse;
 }
 
 Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
-    bool rebalance = CheckForRebalance();
-    if (rebalance && sd_ != kDisconnectedSocketDescriptor) {
-        io__->CloseSocket(sd_, nullptr);
-        log__->Info("rebalancing");
-        sd_ = kDisconnectedSocketDescriptor;
+    if (NeedRebalance()) {
+        CloseConnectionToPeformRebalance();
     }
     for (auto receiver_uri : receivers_list_) {
-        if (sd_ == kDisconnectedSocketDescriptor) {
+        if (Disconnected()) {
             auto err = ConnectToReceiver(receiver_uri);
             if (err != nullptr ) continue;
         }
 
-        auto err = TrySendToReceiver(request, receiver_uri);
-        if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse)  {
-            io__->CloseSocket(sd_, nullptr);
-            sd_ = kDisconnectedSocketDescriptor;
+        auto err = TrySendToReceiver(request);
+        if (ServerError(err))  {
+            Disconnect();
             log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain());
-            log__->Debug("disconnected from  " + receiver_uri);
             continue;
         }
 
@@ -131,7 +156,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
     return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate();
 }
 
-bool RequestHandlerTcp::IsConnected() {
+bool RequestHandlerTcp::Connected() {
     return sd_ != kDisconnectedSocketDescriptor;
 }
 
@@ -140,11 +165,11 @@ bool RequestHandlerTcp::CanCreateNewConnections() {
 }
 
 bool RequestHandlerTcp::ReadyProcessRequest() {
-    return IsConnected() || CanCreateNewConnections();
+    return Connected() || CanCreateNewConnections();
 }
 
 void RequestHandlerTcp::PrepareProcessingRequestLocked() {
-    UpdateReceiversUriIfNewConnection();
+    UpdateIfNewConnection();
 }
 
 void RequestHandlerTcp::TearDownProcessingRequestLocked(const Error& error_from_process) {
diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h
index 2a631dbdd..08a02132c 100644
--- a/producer/api/src/request_handler_tcp.h
+++ b/producer/api/src/request_handler_tcp.h
@@ -30,18 +30,25 @@ class RequestHandlerTcp: public RequestHandler {
     ReceiverDiscoveryService* discovery_service__;
   private:
     Error ConnectToReceiver(const std::string& receiver_address);
-    Error SendHeaderAndData(const Request*, const std::string& receiver_address);
-    Error ReceiveResponse(const std::string& receiver_address);
-    Error TrySendToReceiver(const Request* request, const std::string& receiver_address);
+    Error SendHeaderAndData(const Request*);
+    Error ReceiveResponse();
+    Error TrySendToReceiver(const Request* request);
     SocketDescriptor sd_{kDisconnectedSocketDescriptor};
-    void UpdateReceiversUriIfNewConnection();
-    bool CheckForRebalance();
+    void UpdateIfNewConnection();
+    bool UpdateReceiversList();
+    bool TimeToUpdateReceiverList();
+    bool NeedRebalance();
+    void CloseConnectionToPeformRebalance();
+    bool Disconnected();
+    void Disconnect();
+    bool ServerError(const Error& err);
     ReceiversList receivers_list_;
     high_resolution_clock::time_point last_receivers_uri_update_;
-    bool IsConnected();
+    bool Connected();
     bool CanCreateNewConnections();
     uint64_t thread_id_;
     uint64_t* ncurrent_connections_;
+    std::string connected_receiver_uri_;
 };
 }
 
-- 
GitLab