diff --git a/common/cpp/include/asapo_fabric/fabric_error.h b/common/cpp/include/asapo_fabric/fabric_error.h index 40c88b6ec228261b7b9ce24fe2d23e94bbac1a64..1a8ca01061397a9d59384fe11e92505692d0c1d8 100644 --- a/common/cpp/include/asapo_fabric/fabric_error.h +++ b/common/cpp/include/asapo_fabric/fabric_error.h @@ -25,7 +25,7 @@ auto const kNotSupportedOnBuildError = FabricErrorTemplate { "This build of ASAPO does not support LibFabric", FabricErrorType::kNotSupported }; auto const kLibraryNotFoundError = FabricErrorTemplate { - "asapo-fabric, LibFabric or dependencies were not found", FabricErrorType::kLibraryNotFound + "LibFabric or dependencies were not found", FabricErrorType::kLibraryNotFound }; auto const kLibraryCompatibilityError = FabricErrorTemplate { "LibFabric was found but somehow some a function is missing", FabricErrorType::kLibraryCompatibilityError diff --git a/common/cpp/src/asapo_fabric/asapo_fabric.cpp b/common/cpp/src/asapo_fabric/asapo_fabric.cpp index 1d5c84fd2aafa821888cf6170b18cae6c7d115ee..e4612b7c754e3e4705d4d7015c68b14b298e7cf7 100644 --- a/common/cpp/src/asapo_fabric/asapo_fabric.cpp +++ b/common/cpp/src/asapo_fabric/asapo_fabric.cpp @@ -15,7 +15,7 @@ std::unique_ptr<FabricFactory> asapo::fabric::GenerateDefaultFabricFactory() { return std::unique_ptr<FabricFactory>(new FabricFactoryImpl()); } - void* handle = dlopen("libfabric.so", RTLD_LAZY); + void* handle = dlopen("libfabric.so.1", RTLD_LAZY); if (handle) { #define ADD_FABRIC_CALL(fName) do { if (!(*((void**)&gffm().fName) = dlsym(handle, #fName))) goto functionNotFoundError; } while(0) ADD_FABRIC_CALL(fi_version); diff --git a/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp index 67e56366f05834ec4c4c32b3b3e63b8a72b55901..3d46fe0425591868257d9216dc47ff0c6e3f2d15 100644 --- a/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp +++ b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp @@ -7,6 +7,7 @@ #include <netinet/in.h> #include <arpa/inet.h> #include <rdma/fi_tagged.h> +#include <iostream> #include "fabric_context_impl.h" #include "fabric_memory_region_impl.h" @@ -31,8 +32,7 @@ std::string __PRETTY_FUNCTION_TO_NAMESPACE__(const std::string& prettyFunction) } \ } while(0) // Enforce ';' -// TODO: It is super important that version 1.10 is installed, but since its not released yet we go with 1.9 -const uint32_t FabricContextImpl::kMinExpectedLibFabricVersion = FI_VERSION(1, 9); +const uint32_t FabricContextImpl::kMinExpectedLibFabricVersion = FI_VERSION(1, 11); FabricContextImpl::FabricContextImpl() : io__{ GenerateDefaultIO() }, alive_check_response_task_(this) { } diff --git a/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.cpp b/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.cpp index 75b18f1c440b5fc7afe5c868ff6789d1e9a5d088..53c48236f25b24b2c9d5ee74cff3cd37780accfd 100644 --- a/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.cpp +++ b/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.cpp @@ -49,6 +49,6 @@ void FabricSelfRequeuingTask::AfterCompletion() { } } -FabricContextImpl* FabricSelfRequeuingTask::ParentContext() { +FabricContextImpl* FabricSelfRequeuingTask::ParentContext() const { return parent_context_; } diff --git a/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.h b/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.h index 905b6f1dbb00a9669722711d898c70d970702424..0c23c3e447616d881503a3a3f4fa9e54c226a2a2 100644 --- a/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.h +++ b/common/cpp/src/asapo_fabric/common/task/fabric_self_requeuing_task.h @@ -25,7 +25,7 @@ class FabricSelfRequeuingTask : public FabricTask { void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) final; void HandleErrorCompletion(const fi_cq_err_entry* errEntry) final; protected: - FabricContextImpl* ParentContext(); + FabricContextImpl* ParentContext() const; virtual void RequeueSelf() = 0; virtual void OnCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) = 0; diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp index 44ed14d06149e743b8c8723fdbb29362539ac364..253d778b4975f56858098943c21649bcfd51e042 100644 --- a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp +++ b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp @@ -10,7 +10,7 @@ FabricHandshakeAcceptingTask::FabricHandshakeAcceptingTask(FabricServerImpl* par : FabricSelfRequeuingTask(parentServerContext) { } -FabricServerImpl* FabricHandshakeAcceptingTask::ServerContext() { +FabricServerImpl* FabricHandshakeAcceptingTask::ServerContext() const { return dynamic_cast<FabricServerImpl*>(ParentContext()); } diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h index 74ffd3742fdc8435ae927da7f953ffde8eec3832..dedf61822b10e97cd65d1035cd7fcda9728749f4 100644 --- a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h +++ b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h @@ -18,7 +18,7 @@ class FabricHandshakeAcceptingTask : public FabricSelfRequeuingTask { explicit FabricHandshakeAcceptingTask(FabricServerImpl* server); private: - FabricServerImpl* ServerContext(); + FabricServerImpl* ServerContext() const; protected: // override FabricSelfRequeuingTask void RequeueSelf() override; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 3fd91d9db21fe7e205e4af3e238e9e9638dbffc3..c53e29b06ec432e7d0bc0644c9329ab26e0a8f34 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -134,6 +134,8 @@ Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request, RequestOu httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, request.body, &response->data_output, response->data_output_size, code); break; + default: + break; } return err; } @@ -373,6 +375,10 @@ Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* dat return error; // Successfully received data and is now using a fabric client } + if (std::getenv("ASAPO_PRINT_FALLBACK_REASON")) { + std::cout << "Fallback to TCP because error: " << error << std::endl; + } + // Retry with TCP should_try_rdma_first_ = false; error = nullptr; diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index 5526d602a9f34283fe65bab55dd71418910f1665..4b26097544fcd93cf72bf89e2ff9ef88aa4e2227 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -24,6 +24,7 @@ void ReceiverDataServer::Run() { if (err == IOErrorTemplates::kTimeout) { continue; } + if (!err) { err = request_pool__->AddRequests(std::move(requests)); }