diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 7b06404dd6ed1b6115635d5e594a2d971d607eb9..d29301909599de07885df777baac9747216cfe6c 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -36,6 +36,7 @@ class MessageMeta { uint64_t size{0}; uint64_t id{0}; std::string source; + std::string ib_source; std::string metadata; uint64_t buf_id{0}; std::string stream; // might be "unknownStream" for older datasets diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index e405a65d3f943a519a8e0cfd0046d2964a5179f7..30869d441d9c376a7637dbb239716819cf392e2a 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -69,6 +69,7 @@ std::string MessageMeta::Json() const { "\"timestamp\":" + std::to_string(nanoseconds_from_epoch) + "," "\"source\":\"" + source + "\"," + "\"ib_source\":\"" + ib_source + "\"," "\"buf_id\":" + std::to_string(buf_id_int) + "," "\"stream\":\"" + stream + "\"," "\"dataset_substream\":" + std::to_string(dataset_substream) + "," @@ -139,8 +140,9 @@ bool MessageMeta::SetFromJson(const std::string& json_string) { *this = old; return false; } - //ignore error if ingest_mode not found + //ignore error if the following not found parser.GetUInt64("ingest_mode", &ingest_mode); + parser.GetString("ib_source", &ib_source); return true; } diff --git a/consumer/api/cpp/src/tcp_connection_pool.cpp b/consumer/api/cpp/src/tcp_connection_pool.cpp index 38bc9a76242532057850002b80bc6a69ef36b019..24d93b3ec66b07c9d902b926660996e6701a45c6 100644 --- a/consumer/api/cpp/src/tcp_connection_pool.cpp +++ b/consumer/api/cpp/src/tcp_connection_pool.cpp @@ -18,11 +18,11 @@ SocketDescriptor TcpConnectionPool::Connect(const std::string& source, Error* er return sd; } -SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, bool* reused, Error* err) { +SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) { std::lock_guard<std::mutex> lock{mutex_}; for (auto& connection : connections_) { - if (source == connection.uri && !connection.in_use) { + if ((optional_source == connection.uri || source == connection.uri) && !connection.in_use) { connection.in_use = true; *err = nullptr; *reused = true; @@ -30,7 +30,11 @@ SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, } } - auto sd = Connect(source, err); + auto sd = Connect(optional_source.empty() ? source : optional_source, err); + if (*err != nullptr && !optional_source.empty()) { + // optional source (IB connection) failed for whatever reason, fallback + sd = Connect(source, err); + } if (*err == nullptr) { *reused = false; TcpConnectionInfo connection{source, sd, true}; diff --git a/consumer/api/cpp/src/tcp_connection_pool.h b/consumer/api/cpp/src/tcp_connection_pool.h index 963d92036db4f5fcbab92657a70c06fa9ab7df88..b0636d3ec8f47d3739e1586f01a919194467a60c 100644 --- a/consumer/api/cpp/src/tcp_connection_pool.h +++ b/consumer/api/cpp/src/tcp_connection_pool.h @@ -16,7 +16,7 @@ struct TcpConnectionInfo { class TcpConnectionPool { public: - ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err); + ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err); ASAPO_VIRTUAL SocketDescriptor Reconnect(SocketDescriptor sd, Error* err); ASAPO_VIRTUAL void ReleaseConnection(SocketDescriptor sd); ASAPO_VIRTUAL ~TcpConnectionPool() = default; diff --git a/consumer/api/cpp/src/tcp_consumer_client.cpp b/consumer/api/cpp/src/tcp_consumer_client.cpp index c0bf7108eab156bba4bea783371dfcc9426049df..552db7686c3a437719d4114b3b2a26c759a75d8c 100644 --- a/consumer/api/cpp/src/tcp_consumer_client.cpp +++ b/consumer/api/cpp/src/tcp_consumer_client.cpp @@ -104,7 +104,7 @@ Error TcpConsumerClient::ReceiveData(SocketDescriptor sd, const MessageMeta* inf Error TcpConsumerClient::GetData(const MessageMeta* info, const std::string& request_sender_details, MessageData* data) { Error err; bool reused; - auto sd = connection_pool__->GetFreeConnection(info->source, &reused, &err); + auto sd = connection_pool__->GetFreeConnection(info->source, info->ib_source, &reused, &err); if (err != nullptr) { return err; } diff --git a/consumer/api/cpp/unittests/mocking.h b/consumer/api/cpp/unittests/mocking.h index fa0784bc22af387094c8c8b9ef2a20fa00f68be9..e293abb0c727e8b50089fa933a9e68536fa97c24 100644 --- a/consumer/api/cpp/unittests/mocking.h +++ b/consumer/api/cpp/unittests/mocking.h @@ -24,13 +24,13 @@ class MockNetClient : public asapo::NetClient { class MockTCPConnectionPool : public asapo::TcpConnectionPool { public: - SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err) override { + SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) override { ErrorInterface* error = nullptr; - auto data = GetFreeConnection_t(source, reused, &error); + auto data = GetFreeConnection_t(source, optional_source, reused, &error); err->reset(error); return data; } - MOCK_METHOD3(GetFreeConnection_t, SocketDescriptor (const std::string&, bool* reused, ErrorInterface**)); + MOCK_METHOD4(GetFreeConnection_t, SocketDescriptor (const std::string&, const std::string&, bool* reused, ErrorInterface**)); SocketDescriptor Reconnect(SocketDescriptor sd, Error* err) override { ErrorInterface* error = nullptr; diff --git a/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp b/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp index d8e3f9f363ab84763d17efb99877f7d464ddc83c..1348e41b36c43eeea81dbb1abe04af6c54864cc9 100644 --- a/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp +++ b/consumer/api/cpp/unittests/test_tcp_connection_pool.cpp @@ -81,7 +81,7 @@ TEST_F(TcpConnectioPoolTests, GetConnectionCreatesNewOne) { ExpectSingleConnect(); Error err; - auto sd = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(reused, Eq(false)); @@ -93,8 +93,8 @@ TEST_F(TcpConnectioPoolTests, GetTwoConnection) { ExpectTwoConnects(); Error err; - auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); - auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err); + auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(sd1, Eq(expected_sd)); @@ -106,9 +106,9 @@ TEST_F(TcpConnectioPoolTests, GetConnectionUsesConnectionPool) { ExpectSingleConnect(); Error err; - auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err); pool.ReleaseConnection(sd1); - auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err); ASSERT_THAT(reused, Eq(true)); ASSERT_THAT(err, Eq(nullptr)); @@ -126,7 +126,7 @@ TEST_F(TcpConnectioPoolTests, CannotConnect) { )); Error err, err1; - auto sd = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err); auto sd1 = pool.Reconnect(sd, &err1); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kInvalidAddressFormat)); ASSERT_THAT(err1, Ne(nullptr)); @@ -138,7 +138,7 @@ TEST_F(TcpConnectioPoolTests, CanReconnect) { ExpectTwoConnects(); Error err; - auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err); + auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err); pool.ReleaseConnection(sd1); auto sd2 = pool.Reconnect(sd1, &err); @@ -151,7 +151,7 @@ TEST_F(TcpConnectioPoolTests, ReconnectionFails) { ExpectTwoConnects(true); Error err1, err2, err3; - auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err1); + auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err1); pool.ReleaseConnection(sd1); auto sd2 = pool.Reconnect(sd1, &err2); auto sd3 = pool.Reconnect(sd2, &err3); // this reconnect should not work as the record has been removed diff --git a/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp b/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp index 9c0b28f1049f2216d8ec404a7850cc0e2c359772..4c7bd4dd2e7dca03210361f42a05ccf99077fd41 100644 --- a/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp +++ b/consumer/api/cpp/unittests/test_tcp_consumer_client.cpp @@ -77,10 +77,10 @@ class TcpClientTests : public Test { } void ExpectNewConnection(bool reused, bool ok) { - EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, _, _)).WillOnce( + EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, "", _, _)).WillOnce( DoAll( - SetArgPointee<1>(reused), - SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + SetArgPointee<2>(reused), + SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), Return(ok ? expected_sd : asapo::kDisconnectedSocketDescriptor)) ); } diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index c2b9c63bde0c7fcaf1be97c6eddf09f5b6004de4..505b298c2081315973dd7bc5945b752397509604 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -95,6 +95,9 @@ int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache, auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache,monitoring, kafkaClient)}; logger->Info("listening on " + address); + if (!config->dataserver.advertise_ib_uri.empty()) { + logger->Info("additional ib " + address); + } asapo::Error err; receiver->Listen(address, &err); diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 39e40d9259cf0d1c6222171ab86e9952e5d645b3..db98730a453b0d3857092e6536fd03b1b74f93d7 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -3,6 +3,8 @@ #include "asapo/json_parser/json_parser.h" #include <iostream> +#include <unistd.h> +#include <netdb.h> namespace asapo { @@ -64,6 +66,22 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { } } + // try to discover an IB address. It should be resolved by <hostname>-ib.desy.de, if exists + char hostname[250]; + gethostname(hostname, 250); + std::string hostnamestr {hostname}; + auto pos = hostnamestr.find('.'); + if (pos != std::string::npos) { + hostnamestr = hostnamestr.substr(0, pos) + "-ib" + hostnamestr.substr(pos); + } else { + hostnamestr += "-ib"; + } + auto record = gethostbyname(hostnamestr.c_str()); + if (record != nullptr && record->h_addr_list != nullptr) { + pos = config.dataserver.advertise_uri.find(':'); + config.dataserver.advertise_ib_uri = record->h_addr_list[0] + config.dataserver.advertise_uri.substr(pos); + } + config.dataserver.tag = config.tag + "_ds"; config.log_level = StringToLogLevel(log_level, &err); diff --git a/receiver/src/receiver_data_server/receiver_data_server_config.h b/receiver/src/receiver_data_server/receiver_data_server_config.h index 208c87d96eeeb61e2cebaf6ecbb170ac42b518d4..3163c10d15de5ba98c4b032d81106daafc3bceea 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_config.h +++ b/receiver/src/receiver_data_server/receiver_data_server_config.h @@ -10,6 +10,7 @@ struct ReceiverDataServerConfig { uint64_t nthreads = 0; std::string tag; std::string advertise_uri; + std::string advertise_ib_uri; std::vector<std::string> network_mode; }; diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index e715287b026d4e698d968bb5d116841912aba7f4..070559bff28c82e1dd053b6adce461d9835c615a 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -82,6 +82,7 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co message_meta.buf_id = request->GetSlotId(); message_meta.stream = request->GetStream(); message_meta.source = GetReceiverConfig()->dataserver.advertise_uri; + message_meta.ib_source = GetReceiverConfig()->dataserver.advertise_ib_uri; message_meta.metadata = request->GetMetaData(); message_meta.timestamp = std::chrono::system_clock::now(); return message_meta;