diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index c53e29b06ec432e7d0bc0644c9329ab26e0a8f34..3fccd4f6eab9711bf4e7a923ff0f14ad443e9d0e 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -356,44 +356,47 @@ bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) { return info->buf_id > 0; } +Error ServerDataBroker::CreateNetClientAndTryToGetFile(const FileInfo* info, FileData* data) { + const std::lock_guard<std::mutex> lock(net_client_mutex__); + if (net_client__) { + return nullptr; + } + if (should_try_rdma_first_) { // This will check if a rdma connection can be made and will return early if so + auto fabricClient = std::unique_ptr<NetClient>(new FabricConsumerClient()); -Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* data) { - Error error; - if (!net_client__) { - const std::lock_guard<std::mutex> lock(net_client_mutex__); - if (!net_client__) { - if (should_try_rdma_first_) { // This will check if a rdma connection can be made and will return early if so - auto fabricClient = std::unique_ptr<NetClient>(new FabricConsumerClient()); - - error = fabricClient->GetData(info, data); - - // Check if the error comes from the receiver data server (so a connection was made) - if (!error || error == RdsResponseErrorTemplates::kNetErrorNoData) { - net_client__.swap(fabricClient); - current_connection_type_ = NetworkConnectionType::kFabric; - return error; // Successfully received data and is now using a fabric client - } - - if (std::getenv("ASAPO_PRINT_FALLBACK_REASON")) { - std::cout << "Fallback to TCP because error: " << error << std::endl; - } - - // Retry with TCP - should_try_rdma_first_ = false; - error = nullptr; - } + Error error = fabricClient->GetData(info, data); - if (!should_try_rdma_first_) { - net_client__.reset(new TcpClient()); - current_connection_type_ = NetworkConnectionType::kAsapoTcp; - // If we use tcp, we can fall thought and use the normal GetData code - } + // Check if the error comes from the receiver data server (so a connection was made) + if (!error || error == RdsResponseErrorTemplates::kNetErrorNoData) { + net_client__.swap(fabricClient); + current_connection_type_ = NetworkConnectionType::kFabric; + return error; // Successfully received data and is now using a fabric client + } + + // An error occurred! + + if (std::getenv("ASAPO_PRINT_FALLBACK_REASON")) { + std::cout << "Fallback to TCP because error: " << error << std::endl; } + + // Retry with TCP + should_try_rdma_first_ = false; + } + + // Create regular tcp client + net_client__.reset(new TcpClient()); + current_connection_type_ = NetworkConnectionType::kAsapoTcp; + + return net_client__->GetData(info, data); +} + +Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* data) { + if (!net_client__) { + return CreateNetClientAndTryToGetFile(info, data); } - error = net_client__->GetData(info, data); - return error; + return net_client__->GetData(info, data); } std::string ServerDataBroker::GenerateNewGroupId(Error* err) { diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index db868e8c45c0a1203ce16df127fd7bd4f9673e6c..5994b51aa50fcebe730503cbb953ca1e13cf4e9e 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -137,6 +137,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error* err); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); + Error CreateNetClientAndTryToGetFile(const FileInfo* info, FileData* data); Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, RequestOutput* response); std::string BrokerRequestWithTimeout(RequestInfo request, Error* err);