From b2d2a8e9e4022a2e23b26133b16349a480bc49a9 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Thu, 17 Sep 2020 17:32:09 +0200 Subject: [PATCH] Refactored TryGetDataFromBuffer --- consumer/api/cpp/src/server_data_broker.cpp | 67 +++++++++++---------- consumer/api/cpp/src/server_data_broker.h | 1 + 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index c53e29b06..3fccd4f6e 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -356,44 +356,47 @@ bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) { return info->buf_id > 0; } +Error ServerDataBroker::CreateNetClientAndTryToGetFile(const FileInfo* info, FileData* data) { + const std::lock_guard<std::mutex> lock(net_client_mutex__); + if (net_client__) { + return nullptr; + } + if (should_try_rdma_first_) { // This will check if a rdma connection can be made and will return early if so + auto fabricClient = std::unique_ptr<NetClient>(new FabricConsumerClient()); -Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* data) { - Error error; - if (!net_client__) { - const std::lock_guard<std::mutex> lock(net_client_mutex__); - if (!net_client__) { - if (should_try_rdma_first_) { // This will check if a rdma connection can be made and will return early if so - auto fabricClient = std::unique_ptr<NetClient>(new FabricConsumerClient()); - - error = fabricClient->GetData(info, data); - - // Check if the error comes from the receiver data server (so a connection was made) - if (!error || error == RdsResponseErrorTemplates::kNetErrorNoData) { - net_client__.swap(fabricClient); - current_connection_type_ = NetworkConnectionType::kFabric; - return error; // Successfully received data and is now using a fabric client - } - - if (std::getenv("ASAPO_PRINT_FALLBACK_REASON")) { - std::cout << "Fallback to TCP because error: " << error << std::endl; - } - - // Retry with TCP - should_try_rdma_first_ = false; - error = nullptr; - } + Error error = fabricClient->GetData(info, data); - if (!should_try_rdma_first_) { - net_client__.reset(new TcpClient()); - current_connection_type_ = NetworkConnectionType::kAsapoTcp; - // If we use tcp, we can fall thought and use the normal GetData code - } + // Check if the error comes from the receiver data server (so a connection was made) + if (!error || error == RdsResponseErrorTemplates::kNetErrorNoData) { + net_client__.swap(fabricClient); + current_connection_type_ = NetworkConnectionType::kFabric; + return error; // Successfully received data and is now using a fabric client + } + + // An error occurred! + + if (std::getenv("ASAPO_PRINT_FALLBACK_REASON")) { + std::cout << "Fallback to TCP because error: " << error << std::endl; } + + // Retry with TCP + should_try_rdma_first_ = false; + } + + // Create regular tcp client + net_client__.reset(new TcpClient()); + current_connection_type_ = NetworkConnectionType::kAsapoTcp; + + return net_client__->GetData(info, data); +} + +Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* data) { + if (!net_client__) { + return CreateNetClientAndTryToGetFile(info, data); } - error = net_client__->GetData(info, data); - return error; + return net_client__->GetData(info, data); } std::string ServerDataBroker::GenerateNewGroupId(Error* err) { diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index db868e8c4..5994b51aa 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -137,6 +137,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error* err); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); + Error CreateNetClientAndTryToGetFile(const FileInfo* info, FileData* data); Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, RequestOutput* response); std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); -- GitLab