Skip to content
Snippets Groups Projects
Commit c50f9eb6 authored by George Sedov's avatar George Sedov
Browse files

Add optional IB source for data in cache

parent 333837b0
No related branches found
No related tags found
No related merge requests found
Showing with 50 additions and 20 deletions
......@@ -36,6 +36,7 @@ class MessageMeta {
uint64_t size{0};
uint64_t id{0};
std::string source;
std::string ib_source;
std::string metadata;
uint64_t buf_id{0};
std::string stream; // might be "unknownStream" for older datasets
......
......@@ -69,6 +69,7 @@ std::string MessageMeta::Json() const {
"\"timestamp\":"
+ std::to_string(nanoseconds_from_epoch) + ","
"\"source\":\"" + source + "\","
"\"ib_source\":\"" + ib_source + "\","
"\"buf_id\":" + std::to_string(buf_id_int) + ","
"\"stream\":\"" + stream + "\","
"\"dataset_substream\":" + std::to_string(dataset_substream) + ","
......@@ -139,8 +140,9 @@ bool MessageMeta::SetFromJson(const std::string& json_string) {
*this = old;
return false;
}
//ignore error if ingest_mode not found
//ignore error if the following not found
parser.GetUInt64("ingest_mode", &ingest_mode);
parser.GetString("ib_source", &ib_source);
return true;
}
......
......@@ -18,11 +18,11 @@ SocketDescriptor TcpConnectionPool::Connect(const std::string& source, Error* er
return sd;
}
SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, bool* reused, Error* err) {
SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) {
std::lock_guard<std::mutex> lock{mutex_};
for (auto& connection : connections_) {
if (source == connection.uri && !connection.in_use) {
if ((optional_source == connection.uri || source == connection.uri) && !connection.in_use) {
connection.in_use = true;
*err = nullptr;
*reused = true;
......@@ -30,7 +30,11 @@ SocketDescriptor TcpConnectionPool::GetFreeConnection(const std::string& source,
}
}
auto sd = Connect(source, err);
auto sd = Connect(optional_source.empty() ? source : optional_source, err);
if (*err != nullptr && !optional_source.empty()) {
// optional source (IB connection) failed for whatever reason, fallback
sd = Connect(source, err);
}
if (*err == nullptr) {
*reused = false;
TcpConnectionInfo connection{source, sd, true};
......
......@@ -16,7 +16,7 @@ struct TcpConnectionInfo {
class TcpConnectionPool {
public:
ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err);
ASAPO_VIRTUAL SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err);
ASAPO_VIRTUAL SocketDescriptor Reconnect(SocketDescriptor sd, Error* err);
ASAPO_VIRTUAL void ReleaseConnection(SocketDescriptor sd);
ASAPO_VIRTUAL ~TcpConnectionPool() = default;
......
......@@ -104,7 +104,7 @@ Error TcpConsumerClient::ReceiveData(SocketDescriptor sd, const MessageMeta* inf
Error TcpConsumerClient::GetData(const MessageMeta* info, const std::string& request_sender_details, MessageData* data) {
Error err;
bool reused;
auto sd = connection_pool__->GetFreeConnection(info->source, &reused, &err);
auto sd = connection_pool__->GetFreeConnection(info->source, info->ib_source, &reused, &err);
if (err != nullptr) {
return err;
}
......
......@@ -24,13 +24,13 @@ class MockNetClient : public asapo::NetClient {
class MockTCPConnectionPool : public asapo::TcpConnectionPool {
public:
SocketDescriptor GetFreeConnection(const std::string& source, bool* reused, Error* err) override {
SocketDescriptor GetFreeConnection(const std::string& source, const std::string& optional_source, bool* reused, Error* err) override {
ErrorInterface* error = nullptr;
auto data = GetFreeConnection_t(source, reused, &error);
auto data = GetFreeConnection_t(source, optional_source, reused, &error);
err->reset(error);
return data;
}
MOCK_METHOD3(GetFreeConnection_t, SocketDescriptor (const std::string&, bool* reused, ErrorInterface**));
MOCK_METHOD4(GetFreeConnection_t, SocketDescriptor (const std::string&, const std::string&, bool* reused, ErrorInterface**));
SocketDescriptor Reconnect(SocketDescriptor sd, Error* err) override {
ErrorInterface* error = nullptr;
......
......@@ -81,7 +81,7 @@ TEST_F(TcpConnectioPoolTests, GetConnectionCreatesNewOne) {
ExpectSingleConnect();
Error err;
auto sd = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(reused, Eq(false));
......@@ -93,8 +93,8 @@ TEST_F(TcpConnectioPoolTests, GetTwoConnection) {
ExpectTwoConnects();
Error err;
auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(sd1, Eq(expected_sd));
......@@ -106,9 +106,9 @@ TEST_F(TcpConnectioPoolTests, GetConnectionUsesConnectionPool) {
ExpectSingleConnect();
Error err;
auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
pool.ReleaseConnection(sd1);
auto sd2 = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd2 = pool.GetFreeConnection(expected_source, "", &reused, &err);
ASSERT_THAT(reused, Eq(true));
ASSERT_THAT(err, Eq(nullptr));
......@@ -126,7 +126,7 @@ TEST_F(TcpConnectioPoolTests, CannotConnect) {
));
Error err, err1;
auto sd = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd = pool.GetFreeConnection(expected_source, "", &reused, &err);
auto sd1 = pool.Reconnect(sd, &err1);
ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kInvalidAddressFormat));
ASSERT_THAT(err1, Ne(nullptr));
......@@ -138,7 +138,7 @@ TEST_F(TcpConnectioPoolTests, CanReconnect) {
ExpectTwoConnects();
Error err;
auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err);
auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err);
pool.ReleaseConnection(sd1);
auto sd2 = pool.Reconnect(sd1, &err);
......@@ -151,7 +151,7 @@ TEST_F(TcpConnectioPoolTests, ReconnectionFails) {
ExpectTwoConnects(true);
Error err1, err2, err3;
auto sd1 = pool.GetFreeConnection(expected_source, &reused, &err1);
auto sd1 = pool.GetFreeConnection(expected_source, "", &reused, &err1);
pool.ReleaseConnection(sd1);
auto sd2 = pool.Reconnect(sd1, &err2);
auto sd3 = pool.Reconnect(sd2, &err3); // this reconnect should not work as the record has been removed
......
......@@ -77,10 +77,10 @@ class TcpClientTests : public Test {
}
void ExpectNewConnection(bool reused, bool ok) {
EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, _, _)).WillOnce(
EXPECT_CALL(mock_connection_pool, GetFreeConnection_t(expected_uri, "", _, _)).WillOnce(
DoAll(
SetArgPointee<1>(reused),
SetArgPointee<2>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
SetArgPointee<2>(reused),
SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return(ok ? expected_sd : asapo::kDisconnectedSocketDescriptor))
);
}
......
......@@ -95,6 +95,9 @@ int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache,
auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache,monitoring, kafkaClient)};
logger->Info("listening on " + address);
if (!config->dataserver.advertise_ib_uri.empty()) {
logger->Info("additional ib " + address);
}
asapo::Error err;
receiver->Listen(address, &err);
......
......@@ -3,6 +3,8 @@
#include "asapo/json_parser/json_parser.h"
#include <iostream>
#include <unistd.h>
#include <netdb.h>
namespace asapo {
......@@ -64,6 +66,22 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
}
}
// try to discover an IB address. It should be resolved by <hostname>-ib.desy.de, if exists
char hostname[250];
gethostname(hostname, 250);
std::string hostnamestr {hostname};
auto pos = hostnamestr.find('.');
if (pos != std::string::npos) {
hostnamestr = hostnamestr.substr(0, pos) + "-ib" + hostnamestr.substr(pos);
} else {
hostnamestr += "-ib";
}
auto record = gethostbyname(hostnamestr.c_str());
if (record != nullptr && record->h_addr_list != nullptr) {
pos = config.dataserver.advertise_uri.find(':');
config.dataserver.advertise_ib_uri = record->h_addr_list[0] + config.dataserver.advertise_uri.substr(pos);
}
config.dataserver.tag = config.tag + "_ds";
config.log_level = StringToLogLevel(log_level, &err);
......
......@@ -10,6 +10,7 @@ struct ReceiverDataServerConfig {
uint64_t nthreads = 0;
std::string tag;
std::string advertise_uri;
std::string advertise_ib_uri;
std::vector<std::string> network_mode;
};
......
......@@ -82,6 +82,7 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co
message_meta.buf_id = request->GetSlotId();
message_meta.stream = request->GetStream();
message_meta.source = GetReceiverConfig()->dataserver.advertise_uri;
message_meta.ib_source = GetReceiverConfig()->dataserver.advertise_ib_uri;
message_meta.metadata = request->GetMetaData();
message_meta.timestamp = std::chrono::system_clock::now();
return message_meta;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment