From c50f9eb66f9936b33dce94cc86a6dd20683d0eb6 Mon Sep 17 00:00:00 2001
From: George Sedov <george.sedov@desy.de>
Date: Wed, 19 Oct 2022 17:40:02 +0200
Subject: [PATCH] Add optional IB source for data in cache

---
 common/cpp/include/asapo/common/data_structs.h |  1 +
 common/cpp/src/data_structs/data_structs.cpp   |  4 +++-
 consumer/api/cpp/src/tcp_connection_pool.cpp   | 10 +++++++---
 consumer/api/cpp/src/tcp_connection_pool.h     |  2 +-
 consumer/api/cpp/src/tcp_consumer_client.cpp   |  2 +-
 consumer/api/cpp/unittests/mocking.h           |  6 +++---
 .../cpp/unittests/test_tcp_connection_pool.cpp | 16 ++++++++--------
 .../cpp/unittests/test_tcp_consumer_client.cpp |  6 +++---
 receiver/src/main.cpp                          |  3 +++
 receiver/src/receiver_config.cpp               | 18 ++++++++++++++++++
 .../receiver_data_server_config.h              |  1 +
 .../request_handler_db_write.cpp               |  1 +
 12 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h
index 7b06404dd..d29301909 100644
--- a/common/cpp/include/asapo/common/data_structs.h
+++ b/common/cpp/include/asapo/common/data_structs.h
@@ -36,6 +36,7 @@ class MessageMeta {
   uint64_t size{0};
   uint64_t id{0};
   std::string source;
+  std::string ib_source;
   std::string metadata;
   uint64_t buf_id{0};
   std::string stream; // might be "unknownStream" for older datasets
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index e405a65d3..30869d441 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -69,6 +69,7 @@ std::string MessageMeta::Json() const {
                     "\"timestamp\":"
                     + std::to_string(nanoseconds_from_epoch) + ","
                     "\"source\":\"" + source + "\","
+                    "\"ib_source\":\"" + ib_source + "\","
                     "\"buf_id\":" + std::to_string(buf_id_int) + ","
                     "\"stream\":\"" + stream + "\","
                     "\"dataset_substream\":" + std::to_string(dataset_substream) + ","
@@ -139,8 +140,9 @@ bool MessageMeta::SetFromJson(const std::string& json_string) {
         *this = old;
         return false;
     }
-    //ignore error if ingest_mode not found
+    //ignore error if the following not found
     parser.GetUInt64("ingest_mode", &ingest_mode);
+    parser.GetString("ib_source", &ib_source);
 
     return true;
 }
diff --git a/consumer/api/cpp/src/tcp_connection_pool.cpp b/consumer/api/cpp/src/tcp_connection_pool.cpp
index 38bc9a762..24d93b3ec 100644
--- a/consumer/api/cpp/src/tcp_connection_pool.cpp
+++ b/consumer/api/cpp/src/tcp_connection_pool.cpp
@@ -18,11 +18,11 @@ SocketDescriptor TcpConnectionPool::Connect(const std::string& source, Error* er
     return sd;
 }
 
-SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, bool* reused, Error* err) {
+SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) {
     std::lock_guard<std::mutex> lock{mutex_};
 
     for (auto& connection : connections_) {
-        if (source == connection.uri && !connection.in_use) {
+        if ((optional_source == connection.uri || source == connection.uri) && !connection.in_use) {
             connection.in_use = true;
             *err = nullptr;
             *reused = true;
@@ -30,7 +30,11 @@ SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source,
         }
     }
 
-    auto sd = Connect(source, err);
+    auto sd = Connect(optional_source.empty() ? source : optional_source, err);
+    if (*err != nullptr && !optional_source.empty()) {
+        // optional source (IB connection) failed for whatever reason, fallback
+        sd = Connect(source, err);
+    }
     if (*err == nullptr) {
         *reused = false;
         TcpConnectionInfo connection{source, sd, true};
diff --git a/consumer/api/cpp/src/tcp_connection_pool.h b/consumer/api/cpp/src/tcp_connection_pool.h
index 963d92036..b0636d3ec 100644
--- a/consumer/api/cpp/src/tcp_connection_pool.h
+++ b/consumer/api/cpp/src/tcp_connection_pool.h
@@ -16,7 +16,7 @@ struct TcpConnectionInfo {
 
 class TcpConnectionPool {
   public:
-    ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err);
+    ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err);
     ASAPO_VIRTUAL SocketDescriptor Reconnect(SocketDescriptor sd, Error* err);
     ASAPO_VIRTUAL  void ReleaseConnection(SocketDescriptor sd);
     ASAPO_VIRTUAL ~TcpConnectionPool() = default;
diff --git a/consumer/api/cpp/src/tcp_consumer_client.cpp b/consumer/api/cpp/src/tcp_consumer_client.cpp
index c0bf7108e..552db7686 100644
--- a/consumer/api/cpp/src/tcp_consumer_client.cpp
+++ b/consumer/api/cpp/src/tcp_consumer_client.cpp
@@ -104,7 +104,7 @@ Error TcpConsumerClient::ReceiveData(SocketDescriptor sd, const MessageMeta* inf
 Error TcpConsumerClient::GetData(const MessageMeta* info, const std::string& request_sender_details, MessageData* data) {
     Error err;
     bool reused;
-    auto sd = connection_pool__->GetFreeConnection(info->source, &reused, &err);
+    auto sd = connection_pool__->GetFreeConnection(info->source, info->ib_source, &reused, &err);
     if (err != nullptr) {
         return err;
     }
diff --git a/consumer/api/cpp/unittests/mocking.h b/consumer/api/cpp/unittests/mocking.h
index fa0784bc2..e293abb0c 100644
--- a/consumer/api/cpp/unittests/mocking.h
+++ b/consumer/api/cpp/unittests/mocking.h
@@ -24,13 +24,13 @@ class MockNetClient : public asapo::NetClient {
 class MockTCPConnectionPool : public asapo::TcpConnectionPool {
   public:
 
-    SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err) override {
+    SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) override {
         ErrorInterface* error = nullptr;
-        auto data = GetFreeConnection_t(source, reused, &error);
+        auto data = GetFreeConnection_t(source, optional_source, reused, &error);
         err->reset(error);
         return data;
     }
-    MOCK_METHOD3(GetFreeConnection_t, SocketDescriptor (const std::string&, bool* reused, ErrorInterface**));
+    MOCK_METHOD4(GetFreeConnection_t, SocketDescriptor (const std::string&, const std::string&, bool* reused, ErrorInterface**));
 
     SocketDescriptor Reconnect(SocketDescriptor sd, Error* err) override {
         ErrorInterface* error = nullptr;
diff --git a/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp b/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp
index d8e3f9f36..1348e41b3 100644
--- a/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp
+++ b/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp
@@ -81,7 +81,7 @@ TEST_F(TcpConnectioPoolTests, GetConnectionCreatesNewOne) {
     ExpectSingleConnect();
 
     Error err;
-    auto sd = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err);
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(reused, Eq(false));
@@ -93,8 +93,8 @@ TEST_F(TcpConnectioPoolTests, GetTwoConnection) {
     ExpectTwoConnects();
 
     Error err;
-    auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
-    auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
+    auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err);
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(sd1, Eq(expected_sd));
@@ -106,9 +106,9 @@ TEST_F(TcpConnectioPoolTests, GetConnectionUsesConnectionPool) {
     ExpectSingleConnect();
 
     Error err;
-    auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
     pool.ReleaseConnection(sd1);
-    auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err);
 
     ASSERT_THAT(reused, Eq(true));
     ASSERT_THAT(err, Eq(nullptr));
@@ -126,7 +126,7 @@ TEST_F(TcpConnectioPoolTests, CannotConnect) {
         ));
 
     Error err, err1;
-    auto sd = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err);
     auto sd1 = pool.Reconnect(sd, &err1);
     ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kInvalidAddressFormat));
     ASSERT_THAT(err1, Ne(nullptr));
@@ -138,7 +138,7 @@ TEST_F(TcpConnectioPoolTests, CanReconnect) {
     ExpectTwoConnects();
 
     Error err;
-    auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
+    auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
     pool.ReleaseConnection(sd1);
     auto sd2 = pool.Reconnect(sd1, &err);
 
@@ -151,7 +151,7 @@ TEST_F(TcpConnectioPoolTests, ReconnectionFails) {
     ExpectTwoConnects(true);
 
     Error err1, err2, err3;
-    auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err1);
+    auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err1);
     pool.ReleaseConnection(sd1);
     auto sd2 = pool.Reconnect(sd1, &err2);
     auto sd3 = pool.Reconnect(sd2, &err3); // this reconnect should not work as the record has been removed
diff --git a/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp b/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp
index 9c0b28f10..4c7bd4dd2 100644
--- a/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp
+++ b/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp
@@ -77,10 +77,10 @@ class TcpClientTests : public Test {
     }
 
     void ExpectNewConnection(bool reused, bool ok) {
-        EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, _, _)).WillOnce(
+        EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, "", _, _)).WillOnce(
             DoAll(
-                SetArgPointee<1>(reused),
-                SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
+                SetArgPointee<2>(reused),
+                SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
                 Return(ok ? expected_sd : asapo::kDisconnectedSocketDescriptor))
         );
     }
diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp
index c2b9c63bd..505b298c2 100644
--- a/receiver/src/main.cpp
+++ b/receiver/src/main.cpp
@@ -95,6 +95,9 @@ int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache,
     auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache,monitoring, kafkaClient)};
 
     logger->Info("listening on " + address);
+    if (!config->dataserver.advertise_ib_uri.empty()) {
+        logger->Info("additional ib " + address);
+    }
 
     asapo::Error err;
     receiver->Listen(address, &err);
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index 39e40d925..db98730a4 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -3,6 +3,8 @@
 #include "asapo/json_parser/json_parser.h"
 
 #include <iostream>
+#include <unistd.h>
+#include <netdb.h>
 
 namespace asapo {
 
@@ -64,6 +66,22 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
         }
     }
 
+    // try to discover an IB address. It should be resolved by <hostname>-ib.desy.de, if exists
+    char hostname[250];
+    gethostname(hostname, 250);
+    std::string hostnamestr {hostname};
+    auto pos = hostnamestr.find('.');
+    if (pos != std::string::npos) {
+        hostnamestr = hostnamestr.substr(0, pos) + "-ib" + hostnamestr.substr(pos);
+    } else {
+        hostnamestr += "-ib";
+    }
+    auto record = gethostbyname(hostnamestr.c_str());
+    if (record != nullptr && record->h_addr_list != nullptr) {
+        pos = config.dataserver.advertise_uri.find(':');
+        config.dataserver.advertise_ib_uri = record->h_addr_list[0] + config.dataserver.advertise_uri.substr(pos);
+    }
+
     config.dataserver.tag = config.tag + "_ds";
 
     config.log_level = StringToLogLevel(log_level, &err);
diff --git a/receiver/src/receiver_data_server/receiver_data_server_config.h b/receiver/src/receiver_data_server/receiver_data_server_config.h
index 208c87d96..3163c10d1 100644
--- a/receiver/src/receiver_data_server/receiver_data_server_config.h
+++ b/receiver/src/receiver_data_server/receiver_data_server_config.h
@@ -10,6 +10,7 @@ struct ReceiverDataServerConfig {
     uint64_t nthreads = 0;
     std::string tag;
     std::string advertise_uri;
+    std::string advertise_ib_uri;
     std::vector<std::string> network_mode;
 };
 
diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp
index e715287b0..070559bff 100644
--- a/receiver/src/request_handler/request_handler_db_write.cpp
+++ b/receiver/src/request_handler/request_handler_db_write.cpp
@@ -82,6 +82,7 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co
     message_meta.buf_id = request->GetSlotId();
     message_meta.stream = request->GetStream();
     message_meta.source = GetReceiverConfig()->dataserver.advertise_uri;
+    message_meta.ib_source = GetReceiverConfig()->dataserver.advertise_ib_uri;
     message_meta.metadata = request->GetMetaData();
     message_meta.timestamp = std::chrono::system_clock::now();
     return message_meta;
-- 
GitLab