Commit b67e1e87 authored by Sergey Yakubov

Merge branch 'develop' into feature_ASAPO-108-kubernetes-deployment

parents dbe75649 c33040a2
Showing changed files with 1065 additions and 7 deletions
......@@ -17,10 +17,11 @@ ENDIF(WIN32)
#TODO: Better way than GLOBAL PROPERTY
IF(WIN32)
find_package(Threads REQUIRED)
SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} wsock32 ws2_32)
SET(ASAPO_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} wsock32 ws2_32)
ELSEIF(UNIX)
SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES Threads::Threads)
SET(ASAPO_COMMON_IO_LIBRARIES Threads::Threads)
ENDIF(WIN32)
SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES ${ASAPO_COMMON_IO_LIBRARIES})
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
add_definitions(-DUNIT_TESTS)
......@@ -41,6 +42,8 @@ option(BUILD_PYTHON_DOCS "Uses sphinx to build the Python documentation" OFF)
option(BUILD_CONSUMER_TOOLS "Build consumer tools" OFF)
option(BUILD_EXAMPLES "Build examples" OFF)
option(ENABLE_LIBFABRIC "Enables LibFabric support for RDMA transfers" OFF)
set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/CMakeModules/)
set (ASAPO_CXX_COMMON_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/common/cpp/include)
......@@ -62,6 +65,18 @@ endif()
message (STATUS "Using Python: ${Python_EXECUTABLE}")
SET(ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_IO_LIBRARIES})
IF(ENABLE_LIBFABRIC)
find_package(LibFabric)
if(NOT LIBFABRIC_LIBRARY)
message(FATAL_ERROR "Did not find libfabric")
endif()
message(STATUS "LibFabric support enabled")
message(STATUS "LIB_FABRIC: Path: ${LIBFABRIC_LIBRARY} Include: ${LIBFABRIC_INCLUDE_DIR}")
add_definitions(-DLIBFABRIC_ENABLED)
SET(ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_FABRIC_LIBRARIES} fabric)
ENDIF()
SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_FABRIC_LIBRARIES})
# format sources
include(astyle)
......
# FindLibFabric
# -------------
#
# Tries to find LibFabric on the system
#
# Available variables
# LIBFABRIC_LIBRARY - Path to the library
# LIBFABRIC_INCLUDE_DIR - Path to the include dir
cmake_minimum_required(VERSION 2.6)
find_path(LIBFABRIC_INCLUDE_DIR fabric.h)
find_library(LIBFABRIC_LIBRARY fabric)
mark_as_advanced(LIBFABRIC_INCLUDE_DIR LIBFABRIC_LIBRARY)
......@@ -12,6 +12,8 @@ add_subdirectory(src/logger)
add_subdirectory(src/request)
add_subdirectory(src/asapo_fabric)
if(BUILD_MONGODB_CLIENTLIB)
add_subdirectory(src/database)
endif()
......
#ifndef ASAPO_FABRIC_H
#define ASAPO_FABRIC_H
#include <cstdint>
#include <string>
#include <memory>
#include <common/error.h>
#include <logger/logger.h>
#include "fabric_error.h"
namespace asapo {
namespace fabric {
typedef uint64_t FabricAddress;
typedef uint64_t FabricMessageId;
// TODO Use a serialization framework
struct MemoryRegionDetails {
uint64_t addr;
uint64_t length;
uint64_t key;
};
class FabricMemoryRegion {
public:
virtual ~FabricMemoryRegion() = default;
virtual const MemoryRegionDetails* GetDetails() const = 0;
};
class FabricContext {
public:
virtual std::string GetAddress() const = 0;
virtual std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) = 0;
virtual void Send(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, Error* error) = 0;
virtual void Recv(FabricAddress srcAddress, FabricMessageId messageId,
void* dst, size_t size, Error* error) = 0;
virtual void RdmaWrite(FabricAddress dstAddress,
const MemoryRegionDetails* details, const void* buffer, size_t size,
Error* error) = 0;
// Since RdmaRead heavily impacts performance, we will not implement it
// virtual void RdmaRead(...) = 0;
};
class FabricClient : public FabricContext {
public:
virtual ~FabricClient() = default;
/// The serverAddress must be in this format: "hostname:port"
virtual FabricAddress AddServerAddress(const std::string& serverAddress, Error* error) = 0;
};
class FabricServer : public FabricContext {
public:
virtual ~FabricServer() = default;
virtual void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) = 0;
};
class FabricFactory {
public:
/**
* Creates a new server that immediately allocates resources and listens on the given host:port
*/
virtual std::unique_ptr<FabricServer>
CreateAndBindServer(const AbstractLogger* logger, const std::string& host, uint16_t port,
Error* error) const = 0;
/**
* Allocates a proper domain as soon as the first server address is added to the client
*/
virtual std::unique_ptr<FabricClient> CreateClient(Error* error) const = 0;
};
std::unique_ptr<FabricFactory> GenerateDefaultFabricFactory();
}
}
#endif //ASAPO_FABRIC_H
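For orientation, a minimal server-side sketch against the factory API above; host, port, logger, and buffer size are illustrative and not taken from this commit:

#include <asapo_fabric/asapo_fabric.h>
#include <iostream>

using namespace asapo;
using namespace fabric;

int main() {
Error err;
auto factory = GenerateDefaultFabricFactory();
// nullptr logger is an assumption for brevity; real callers pass an AbstractLogger*.
auto server = factory->CreateAndBindServer(nullptr, "127.0.0.1", 1234, &err);
if (err) {
std::cerr << "Bind failed: " << err->Explain() << std::endl;
return 1;
}
char buffer[1024];
FabricAddress client;
FabricMessageId id;
// Blocks until any client sends a tagged message.
server->RecvAny(&client, &id, buffer, sizeof(buffer), &err);
return err == nullptr ? 0 : 1;
}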
#ifndef ASAPO_FABRIC_ERROR_H
#define ASAPO_FABRIC_ERROR_H
namespace asapo {
namespace fabric {
enum class FabricErrorType {
kNotSupported,
kOutdatedLibrary,
kInternalError, // An error that was produced by LibFabric
kInternalOperationCanceled, // An operation-canceled error produced by LibFabric
kInternalConnectionError, // This might occur when the connection is unexpectedly closed
kNoDeviceFound,
kClientNotInitialized,
kTimeout,
kConnectionRefused,
};
using FabricError = ServiceError<FabricErrorType, ErrorType::kFabricError>;
using FabricErrorTemplate = ServiceErrorTemplate<FabricErrorType, ErrorType::kFabricError>;
namespace FabricErrorTemplates {
auto const kNotSupportedOnBuildError = FabricErrorTemplate {
"This build of ASAPO does not support LibFabric", FabricErrorType::kNotSupported
};
auto const kOutdatedLibraryError = FabricErrorTemplate {
"LibFabric outdated", FabricErrorType::kOutdatedLibrary
};
auto const kInternalError = FabricErrorTemplate {
"Internal LibFabric error", FabricErrorType::kInternalError
};
auto const kInternalOperationCanceledError = FabricErrorTemplate {
"Internal LibFabric operation canceled error", FabricErrorType::kInternalOperationCanceled
};
auto const kNoDeviceFoundError = FabricErrorTemplate {
"No device was found (Check your config)", FabricErrorType::kNoDeviceFound
};
auto const kClientNotInitializedError = FabricErrorTemplate {
"The client was not initialized. Add server address first!",
FabricErrorType::kClientNotInitialized
};
auto const kTimeout = FabricErrorTemplate {
"Timeout",
FabricErrorType::kTimeout
};
auto const kConnectionRefusedError = FabricErrorTemplate {
"Connection refused",
FabricErrorType::kConnectionRefused
};
auto const kInternalConnectionError = FabricErrorTemplate {
"Connection error (maybe a disconnect?)",
FabricErrorType::kInternalConnectionError
};
}
}
}
#endif //ASAPO_FABRIC_ERROR_H
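The templates above are meant to be both generated and compared against; a minimal sketch of that idiom as used throughout this commit (the helper names are hypothetical):

#include <asapo_fabric/asapo_fabric.h>

using namespace asapo;
using namespace fabric;

// Hypothetical helper: Generate(suffix) appends ": <suffix>" to the template text
// (see ServiceErrorTemplate::Generate in common/error.h).
void FailWithContext(Error* error) {
*error = FabricErrorTemplates::kInternalError.Generate("illustrative suffix");
}

// Hypothetical helper: callers compare against the template instead of parsing strings.
bool IsTimeout(const Error& error) {
return error == FabricErrorTemplates::kTimeout;
}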
......@@ -18,6 +18,7 @@ enum class ErrorType {
kConsumerError,
kMemoryAllocationError,
kEndOfFile,
kFabricError,
};
class ErrorInterface;
......@@ -213,7 +214,7 @@ class ServiceErrorTemplate : public SimpleErrorTemplate {
}
inline Error Generate(const std::string& suffix) const noexcept override {
return Error(new ServiceError<ServiceErrorType, MainErrorType>(error_ + " :" + suffix, error_type_));
return Error(new ServiceError<ServiceErrorType, MainErrorType>(error_ + ": " + suffix, error_type_));
}
inline bool operator==(const Error& rhs) const override {
......
......@@ -47,13 +47,13 @@ struct GenericRequestHeader {
uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "",
const std::string& i_substream = ""):
op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} {
strncpy(message, i_message.c_str(), kMaxMessageSize);
strncpy(message, i_message.c_str(), kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails
strncpy(substream, i_substream.c_str(), kMaxMessageSize);
}
GenericRequestHeader(const GenericRequestHeader& header) {
op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size,
memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)),
strncpy(message, header.message, kMaxMessageSize);
strncpy(message, header.message, kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails
strncpy(substream, header.substream, kMaxMessageSize);
}
......
......@@ -92,6 +92,10 @@ class IO {
* @param err Since CloseSocket is often used in an error case, it accepts err as nullptr.
*/
virtual void CloseSocket(SocketDescriptor socket_fd, Error* err) const = 0;
virtual std::string AddressFromSocket(SocketDescriptor socket) const noexcept = 0;
virtual std::string GetHostName(Error* err) const noexcept = 0;
virtual std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostnameAndPort(
const std::string& address) const = 0;
/*
* Filesystem
......@@ -117,8 +121,6 @@ class IO {
virtual std::vector<FileInfo> FilesInFolder (const std::string& folder, Error* err) const = 0;
virtual std::string ReadFileToString (const std::string& fname, Error* err) const = 0;
virtual Error GetLastError() const = 0;
virtual std::string AddressFromSocket(SocketDescriptor socket) const noexcept = 0;
virtual std::string GetHostName(Error* err) const noexcept = 0;
virtual FileInfo GetFileInfo(const std::string& name, Error* err) const = 0;
virtual ~IO() = default;
......
#ifndef ASAPO_MOCKFABRIC_H
#define ASAPO_MOCKFABRIC_H
#include <asapo_fabric/asapo_fabric.h>
namespace asapo {
namespace fabric {
class MockFabricMemoryRegion : public FabricMemoryRegion {
MOCK_CONST_METHOD0(GetDetails, const MemoryRegionDetails * ());
};
class MockFabricContext : public FabricContext {
MOCK_CONST_METHOD0(GetAddress, std::string());
std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override {
ErrorInterface* err = nullptr;
auto data = ShareMemoryRegion_t(src, size, &err);
error->reset(err);
return std::unique_ptr<FabricMemoryRegion> {data};
}
MOCK_METHOD3(ShareMemoryRegion_t, FabricMemoryRegion * (void* src, size_t size, ErrorInterface** err));
void Send(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, Error* error) override {
ErrorInterface* err = nullptr;
Send_t(dstAddress, messageId, src, size, &err);
error->reset(err);
}
MOCK_METHOD5(Send_t, void(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, ErrorInterface** err));
void Recv(FabricAddress srcAddress, FabricMessageId messageId,
void* dst, size_t size, Error* error) override {
ErrorInterface* err = nullptr;
Recv_t(srcAddress, messageId, dst, size, &err);
error->reset(err);
}
MOCK_METHOD5(Recv_t, void(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, ErrorInterface** err));
void RdmaWrite(FabricAddress dstAddress,
const MemoryRegionDetails* details, const void* buffer, size_t size,
Error* error) override {
ErrorInterface* err = nullptr;
RdmaWrite_t(dstAddress, details, buffer, size, &err);
error->reset(err);
}
MOCK_METHOD5(RdmaWrite_t, void(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer,
size_t size, ErrorInterface** error));
};
class MockFabricClient : public MockFabricContext, public FabricClient {
FabricAddress AddServerAddress(const std::string& serverAddress, Error* error) override {
ErrorInterface* err = nullptr;
auto data = AddServerAddress_t(serverAddress, &err);
error->reset(err);
return data;
}
MOCK_METHOD2(AddServerAddress_t, FabricAddress (const std::string& serverAddress, ErrorInterface** err));
};
class MockFabricServer : public MockFabricContext, public FabricServer {
void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) override {
ErrorInterface* err = nullptr;
RecvAny_t(srcAddress, messageId, dst, size, &err);
error->reset(err);
}
MOCK_METHOD5(RecvAny_t, void(FabricAddress* srcAddress, FabricMessageId* messageId,
void* dst, size_t size, ErrorInterface** err));
};
class MockFabricFactory : public FabricFactory {
public:
std::unique_ptr<FabricServer>
CreateAndBindServer(const AbstractLogger* logger, const std::string& host, uint16_t port,
Error* error) const override {
ErrorInterface* err = nullptr;
auto data = CreateAndBindServer_t(logger, host, port, &err);
error->reset(err);
return std::unique_ptr<FabricServer> {data};
}
MOCK_CONST_METHOD4(CreateAndBindServer_t,
FabricServer * (const AbstractLogger* logger, const std::string& host,
uint16_t port, ErrorInterface** err));
std::unique_ptr<FabricClient> CreateClient(Error* error) const override {
ErrorInterface* err = nullptr;
auto data = CreateClient_t(&err);
error->reset(err);
return std::unique_ptr<FabricClient> {data};
}
MOCK_CONST_METHOD1(CreateClient_t,
FabricClient * (ErrorInterface** err));
};
}
}
#endif //ASAPO_MOCKFABRIC_H
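A sketch of how these mocks plug into a gtest/gmock test: the *_t methods expose the ErrorInterface** out-parameter to gmock, while the public overrides forward to them. The include path and test names are assumptions:

#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "mock_fabric.h" // assumed path to the mocks above

using ::testing::_;
using ::testing::ReturnNull;

TEST(MockFabricFactory, CreateClientForwardsToMock) {
asapo::fabric::MockFabricFactory mock;
// Expect the mocked *_t method; the public CreateClient wraps its result in a unique_ptr.
EXPECT_CALL(mock, CreateClient_t(_)).WillOnce(ReturnNull());
asapo::Error err;
auto client = mock.CreateClient(&err);
ASSERT_EQ(client, nullptr);
ASSERT_EQ(err, nullptr);
}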
......@@ -145,6 +145,10 @@ class MockIO : public IO {
}
MOCK_CONST_METHOD4(Send_t, size_t(SocketDescriptor socket_fd, const void* buf, size_t length, ErrorInterface** err));
MOCK_CONST_METHOD1(SplitAddressToHostnameAndPort,
std::unique_ptr<std::tuple<std::string, uint16_t>>(const std::string& address));
void Skip(SocketDescriptor socket_fd, size_t length, Error* err) const override {
ErrorInterface* error = nullptr;
Skip_t(socket_fd, length, &error);
......
set(TARGET_NAME asapo-fabric)
include_directories(include)
set(SOURCE_FILES asapo_fabric.cpp)
IF(ENABLE_LIBFABRIC)
set(SOURCE_FILES ${SOURCE_FILES}
fabric_internal_error.cpp
fabric_factory_impl.cpp
common/fabric_context_impl.cpp
common/fabric_memory_region_impl.cpp
common/task/fabric_waitable_task.cpp
common/task/fabric_self_deleting_task.cpp
common/task/fabric_self_requeuing_task.cpp
common/task/fabric_alive_check_response_task.cpp
client/fabric_client_impl.cpp
server/fabric_server_impl.cpp
server/task/fabric_recv_any_task.cpp
server/task/fabric_handshake_accepting_task.cpp
)
ELSE()
set(SOURCE_FILES ${SOURCE_FILES}
fabric_factory_not_supported.cpp
)
ENDIF()
################################
# Library
################################
add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io>)
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
#include <asapo_fabric/asapo_fabric.h>
#ifdef LIBFABRIC_ENABLED
#include "fabric_factory_impl.h"
#else
#include "fabric_factory_not_supported.h"
#endif
using namespace asapo::fabric;
std::unique_ptr<FabricFactory> asapo::fabric::GenerateDefaultFabricFactory() {
#ifdef LIBFABRIC_ENABLED
return std::unique_ptr<FabricFactory>(new FabricFactoryImpl());
#else
return std::unique_ptr<FabricFactory>(new FabricFactoryNotSupported());
#endif
}
#include "fabric_client_impl.h"
#include <rdma/fi_domain.h>
#include <cstring>
using namespace asapo;
using namespace fabric;
std::string FabricClientImpl::GetAddress() const {
if (!domain_) {
return "";
}
return FabricContextImpl::GetAddress();
}
std::unique_ptr<FabricMemoryRegion> FabricClientImpl::ShareMemoryRegion(void* src, size_t size, Error* error) {
if (!domain_) {
*error = FabricErrorTemplates::kClientNotInitializedError.Generate();
return nullptr;
}
return FabricContextImpl::ShareMemoryRegion(src, size, error);
}
void FabricClientImpl::Send(FabricAddress dstAddress, FabricMessageId messageId, const void* src, size_t size,
Error* error) {
if (!domain_) {
*error = FabricErrorTemplates::kClientNotInitializedError.Generate();
return;
}
FabricContextImpl::Send(dstAddress, messageId, src, size, error);
}
void FabricClientImpl::Recv(FabricAddress srcAddress, FabricMessageId messageId, void* dst, size_t size, Error* error) {
if (!domain_) {
*error = FabricErrorTemplates::kClientNotInitializedError.Generate();
return;
}
FabricContextImpl::Recv(srcAddress, messageId, dst, size, error);
}
void
FabricClientImpl::RdmaWrite(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer,
size_t size,
Error* error) {
if (!domain_) {
*error = FabricErrorTemplates::kClientNotInitializedError.Generate();
return;
}
FabricContextImpl::RdmaWrite(dstAddress, details, buffer, size, error);
}
FabricAddress FabricClientImpl::AddServerAddress(const std::string& serverAddress, Error* error) {
std::string hostname;
uint16_t port;
std::tie(hostname, port) = *io__->SplitAddressToHostnameAndPort(serverAddress);
std::string serverIp = io__->ResolveHostnameToIp(hostname, error);
InitIfNeeded(serverIp, error);
if (*error) {
return FI_ADDR_NOTAVAIL;
}
int result;
FabricAddress addrIdx;
result = fi_av_insertsvc(address_vector_, serverIp.c_str(), std::to_string(port).c_str(),
&addrIdx, 0, nullptr);
if (result != 1) {
*error = ErrorFromFabricInternal("fi_av_insertsvc", result);
return FI_ADDR_NOTAVAIL;
}
FabricHandshakePayload handshake {};
strcpy(handshake.hostnameAndPort, GetAddress().c_str());
RawSend(addrIdx, &handshake, sizeof(handshake), error);
if (*error) {
return 0;
}
// Zero sized payload
RawRecv(addrIdx, nullptr, 0, error);
return addrIdx;
}
void FabricClientImpl::InitIfNeeded(const std::string& targetIpHint, Error* error) {
const std::lock_guard<std::mutex> lock(initMutex_); // Released when the scope ends
if (domain_) {
return; // Was already initialized
}
InitCommon(targetIpHint, 0, error);
}
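A client-side counterpart to the code above, showing the intended call order: CreateClient defers initialization, AddServerAddress triggers InitIfNeeded plus the handshake, and only then can Send be used. Address and payload are illustrative:

#include <asapo_fabric/asapo_fabric.h>

using namespace asapo;
using namespace fabric;

void ClientSendSketch() {
Error err;
auto client = GenerateDefaultFabricFactory()->CreateClient(&err);
if (err) { return; }
// "hostname:port" format as documented in asapo_fabric.h; triggers InitIfNeeded + handshake.
FabricAddress server = client->AddServerAddress("localhost:1234", &err);
if (err) { return; }
char data[] = "hello";
client->Send(server, /*messageId=*/0, data, sizeof(data), &err);
}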
#ifndef ASAPO_FABRIC_CLIENT_IMPL_H
#define ASAPO_FABRIC_CLIENT_IMPL_H
#include <asapo_fabric/asapo_fabric.h>
#include "../common/fabric_context_impl.h"
namespace asapo {
namespace fabric {
class FabricClientImpl : public FabricClient, public FabricContextImpl {
private:
std::mutex initMutex_;
public: // Link to FabricContext
std::string GetAddress() const override;
std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override;
void Send(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, Error* error) override;
void Recv(FabricAddress srcAddress, FabricMessageId messageId,
void* dst, size_t size, Error* error) override;
void RdmaWrite(FabricAddress dstAddress,
const MemoryRegionDetails* details, const void* buffer, size_t size,
Error* error) override;
public:
FabricAddress AddServerAddress(const std::string& serverAddress, Error* error) override;
private:
void InitIfNeeded(const std::string& targetIpHint, Error* error);
};
}
}
#endif //ASAPO_FABRIC_CLIENT_IMPL_H
#include <io/io_factory.h>
#include <cstring>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_rma.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <rdma/fi_tagged.h>
#include "fabric_context_impl.h"
#include "fabric_memory_region_impl.h"
using namespace asapo;
using namespace fabric;
std::string __PRETTY_FUNCTION_TO_NAMESPACE__(const std::string& prettyFunction) {
auto functionParamBegin = prettyFunction.find('(');
auto spaceBegin = prettyFunction.substr(0, functionParamBegin).find(' ');
return prettyFunction.substr(spaceBegin + 1, functionParamBegin - spaceBegin - 1);
}
// This macro checks whether the call being made returns FI_SUCCESS. Should only be used with LibFabric functions.
// *error is set to the corresponding LibFabric error.
#define FI_OK(functionCall) \
do { \
int tmp_fi_status = functionCall; \
if(__builtin_expect(tmp_fi_status, FI_SUCCESS)) { \
std::string tmp_fi_s = #functionCall; \
*error = ErrorFromFabricInternal(__PRETTY_FUNCTION_TO_NAMESPACE__(__PRETTY_FUNCTION__) + " Line " + std::to_string(__LINE__) + ": " + tmp_fi_s.substr(0, tmp_fi_s.find('(')), tmp_fi_status);\
return; \
} \
} while(0) // Enforce ';'
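// Illustration (not part of this commit): a call such as
//   FI_OK(fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr));
// expands roughly to
//   int tmp_fi_status = fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr);
//   if (tmp_fi_status != FI_SUCCESS) {
//       *error = ErrorFromFabricInternal("<enclosing function> Line <N>: fi_fabric", tmp_fi_status);
//       return;
//   }
// so it may only be used in void functions that have an `Error* error` in scope.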
// TODO: It is super important that version 1.10 is installed, but since it's not released yet we go with 1.9
const uint32_t FabricContextImpl::kMinExpectedLibFabricVersion = FI_VERSION(1, 9);
FabricContextImpl::FabricContextImpl() : io__{ GenerateDefaultIO() }, alive_check_response_task_(this) {
}
FabricContextImpl::~FabricContextImpl() {
StopBackgroundThreads();
if (endpoint_)
fi_close(&endpoint_->fid);
if (completion_queue_)
fi_close(&completion_queue_->fid);
if (address_vector_)
fi_close(&address_vector_->fid);
if (domain_)
fi_close(&domain_->fid);
if (fabric_)
fi_close(&fabric_->fid);
if (fabric_info_)
fi_freeinfo(fabric_info_);
}
void FabricContextImpl::InitCommon(const std::string& networkIpHint, uint16_t serverListenPort, Error* error) {
const bool isServer = serverListenPort != 0;
// The server must know where the packets are coming from; FI_SOURCE allows this.
uint64_t additionalFlags = isServer ? FI_SOURCE : 0;
fi_info* hints = fi_allocinfo();
if (networkIpHint == "127.0.0.1") {
// sockets mode
hints->fabric_attr->prov_name = strdup("sockets");
hotfix_using_sockets_ = true;
} else {
// verbs mode
hints->fabric_attr->prov_name = strdup("verbs;ofi_rxm");
}
hints->ep_attr->type = FI_EP_RDM;
hints->caps = FI_TAGGED | FI_RMA | FI_DIRECTED_RECV | additionalFlags;
if (isServer) {
hints->src_addr = strdup(networkIpHint.c_str());
} else {
hints->dest_addr = strdup(networkIpHint.c_str());
}
// I've deliberately removed the FI_MR_LOCAL flag, which would force the user of the API to pre-register the
// memory that is going to be transferred via RDMA.
// Since performance tests showed roughly equal performance either way, I've removed it.
hints->domain_attr->mr_mode = FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY;// | FI_MR_LOCAL;
hints->addr_format = FI_SOCKADDR_IN;
int ret = fi_getinfo(
kMinExpectedLibFabricVersion, networkIpHint.c_str(), isServer ? std::to_string(serverListenPort).c_str() : nullptr,
additionalFlags, hints, &fabric_info_);
if (ret) {
if (ret == -FI_ENODATA) {
*error = FabricErrorTemplates::kNoDeviceFoundError.Generate();
} else {
*error = ErrorFromFabricInternal("fi_getinfo", ret);
}
fi_freeinfo(hints);
return;
}
// fprintf(stderr, fi_tostr(fabric_info_, FI_TYPE_INFO)); // Print the found fabric details
// We have to reapply the memory mode flags because they get reset
fabric_info_->domain_attr->mr_mode = hints->domain_attr->mr_mode;
// total_buffered_recv is a hint to the provider of the total available space that may be needed to buffer messages
// that are received for which there is no matching receive operation.
// fabric_info_->rx_attr->total_buffered_recv = 0;
// If something strange is happening with receive requests, we should set this to 0.
fi_freeinfo(hints);
FI_OK(fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr));
FI_OK(fi_domain(fabric_, fabric_info_, &domain_, nullptr));
fi_av_attr av_attr{};
FI_OK(fi_av_open(domain_, &av_attr, &address_vector_, nullptr));
fi_cq_attr cq_attr{};
if (serverListenPort) {
// The server must know where the data is coming from (FI_SOURCE) and what the MessageId (TAG) is.
cq_attr.format = FI_CQ_FORMAT_TAGGED;
}
cq_attr.wait_obj = FI_WAIT_UNSPEC; // Allow waiting when querying the CQ
FI_OK(fi_cq_open(domain_, &cq_attr, &completion_queue_, nullptr));
FI_OK(fi_endpoint(domain_, fabric_info_, &endpoint_, nullptr));
FI_OK(fi_ep_bind(endpoint_, &address_vector_->fid, 0));
FI_OK(fi_ep_bind(endpoint_, &completion_queue_->fid, FI_RECV | FI_SEND));
FI_OK(fi_enable(endpoint_));
StartBackgroundThreads();
}
std::string FabricContextImpl::GetAddress() const {
sockaddr_in sin{};
size_t sin_size = sizeof(sin);
fi_getname(&(endpoint_->fid), &sin, &sin_size);
// TODO Maybe expose such a function to io__
switch(sin.sin_family) {
case AF_INET:
return std::string(inet_ntoa(sin.sin_addr)) + ":" + std::to_string(ntohs(sin.sin_port));
default:
throw std::runtime_error("Unknown addr family: " + std::to_string(sin.sin_family));
}
}
std::unique_ptr<FabricMemoryRegion> FabricContextImpl::ShareMemoryRegion(void* src, size_t size, Error* error) {
fid_mr* mr{};
auto region = std::unique_ptr<FabricMemoryRegionImpl>(new FabricMemoryRegionImpl());
int ret = fi_mr_reg(domain_, src, size,
FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV,
0, 0, 0, &mr, region.get());
if (ret != 0) {
*error = ErrorFromFabricInternal("fi_mr_reg", ret);
return nullptr;
}
region->SetArguments(mr, (uint64_t)src, size);
return std::unique_ptr<FabricMemoryRegion>(region.release());
}
void FabricContextImpl::Send(FabricAddress dstAddress, FabricMessageId messageId, const void* src, size_t size,
Error* error) {
HandleFiCommandWithBasicTaskAndWait(FI_ASAPO_ADDR_NO_ALIVE_CHECK, error,
fi_tsend, src, size, nullptr, dstAddress, messageId);
}
void FabricContextImpl::Recv(FabricAddress srcAddress, FabricMessageId messageId, void* dst, size_t size,
Error* error) {
HandleFiCommandWithBasicTaskAndWait(srcAddress, error,
fi_trecv, dst, size, nullptr, srcAddress, messageId, kRecvTaggedExactMatch);
}
void FabricContextImpl::RawSend(FabricAddress dstAddress, const void* src, size_t size, Error* error) {
HandleFiCommandWithBasicTaskAndWait(FI_ASAPO_ADDR_NO_ALIVE_CHECK, error,
fi_send, src, size, nullptr, dstAddress);
}
void FabricContextImpl::RawRecv(FabricAddress srcAddress, void* dst, size_t size, Error* error) {
HandleFiCommandWithBasicTaskAndWait(FI_ASAPO_ADDR_NO_ALIVE_CHECK, error,
fi_recv, dst, size, nullptr, srcAddress);
}
void
FabricContextImpl::RdmaWrite(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer,
size_t size,
Error* error) {
HandleFiCommandWithBasicTaskAndWait(dstAddress, error,
fi_write, buffer, size, nullptr, dstAddress, details->addr, details->key);
}
void FabricContextImpl::StartBackgroundThreads() {
background_threads_running_ = true;
completion_thread_ = io__->NewThread("ASAPO/FI/CQ", [this]() {
CompletionThread();
});
alive_check_response_task_.Start();
}
void FabricContextImpl::StopBackgroundThreads() {
alive_check_response_task_.Stop(); // This has to be done before we kill the completion thread
background_threads_running_ = false;
if (completion_thread_) {
completion_thread_->join();
completion_thread_ = nullptr;
}
}
void FabricContextImpl::CompletionThread() {
Error error;
fi_cq_tagged_entry entry{};
FabricAddress tmpAddress;
while(background_threads_running_ && !error) {
ssize_t ret;
ret = fi_cq_sreadfrom(completion_queue_, &entry, 1, &tmpAddress, nullptr, 10 /*ms*/);
switch (ret) {
case -FI_EAGAIN: // No data
std::this_thread::yield();
break;
case -FI_EAVAIL: // An error is in the queue
CompletionThreadHandleErrorAvailable(&error);
break;
case 1: { // We got 1 data entry back
auto task = (FabricWaitableTask*)(entry.op_context);
if (task) {
task->HandleCompletion(&entry, tmpAddress);
} else {
error = FabricErrorTemplates::kInternalError.Generate("nullptr context from fi_cq_sreadfrom");
}
break;
}
default:
error = ErrorFromFabricInternal("Unknown error while fi_cq_readfrom", ret);
break;
}
}
if (error) {
throw std::runtime_error("ASAPO Fabric CompletionThread exited with error: " + error->Explain());
}
}
void FabricContextImpl::CompletionThreadHandleErrorAvailable(Error* error) {
fi_cq_err_entry errEntry{};
ssize_t ret = fi_cq_readerr(completion_queue_, &errEntry, 0);
if (ret != 1) {
*error = ErrorFromFabricInternal("Unknown error while fi_cq_readerr", ret);
} else {
auto task = (FabricWaitableTask*)(errEntry.op_context);
if (task) {
task->HandleErrorCompletion(&errEntry);
} else if (hotfix_using_sockets_) {
printf("[Known Sockets bug libfabric/#5795] Ignoring nullptr task!\n");
} else {
*error = FabricErrorTemplates::kInternalError.Generate("nullptr context from fi_cq_readerr");
}
}
}
bool FabricContextImpl::TargetIsAliveCheck(FabricAddress address) {
Error error;
HandleFiCommandWithBasicTaskAndWait(FI_ASAPO_ADDR_NO_ALIVE_CHECK, &error,
fi_tsend, nullptr, 0, nullptr, address, FI_ASAPO_TAG_ALIVE_CHECK);
// If the send was successful, then we are still able to communicate with the peer
return error == nullptr;
}
void FabricContextImpl::InternalWait(FabricAddress targetAddress, FabricWaitableTask* task, Error* error) {
// Check if we can simply wait for our task
task->Wait(requestTimeoutMs_, error);
if (*error == FabricErrorTemplates::kTimeout) {
if (targetAddress == FI_ASAPO_ADDR_NO_ALIVE_CHECK) {
CancelTask(task, error);
// We expect the task to fail with 'Operation canceled'
if (*error == FabricErrorTemplates::kInternalOperationCanceledError) {
// Switch it to a timeout so it's clearer what happened
*error = FabricErrorTemplates::kTimeout.Generate();
}
} else {
InternalWaitWithAliveCheck(targetAddress, task, error);
}
}
}
void FabricContextImpl::InternalWaitWithAliveCheck(FabricAddress targetAddress, FabricWaitableTask* task,
Error* error) {
// Handle advanced alive check
bool aliveCheckFailed = false;
for (uint32_t i = 0; i < maxTimeoutRetires_ && *error == FabricErrorTemplates::kTimeout; i++) {
*error = nullptr;
printf("HandleFiCommandAndWait - Tries: %d\n", i);
if (!TargetIsAliveCheck(targetAddress)) {
aliveCheckFailed = true;
break;
}
task->Wait(requestTimeoutMs_, error);
}
CancelTask(task, error);
if (aliveCheckFailed) {
*error = FabricErrorTemplates::kInternalConnectionError.Generate();
} else if(*error == FabricErrorTemplates::kInternalOperationCanceledError) {
*error = FabricErrorTemplates::kTimeout.Generate();
}
}
void FabricContextImpl::CancelTask(FabricWaitableTask* task, Error* error) {
*error = nullptr;
fi_cancel(&endpoint_->fid, task);
task->Wait(0, error); // You can probably expect a kInternalOperationCanceledError
}
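How the timeout/alive-check machinery above surfaces to a caller, as a minimal sketch (buffer size and message id are illustrative):

#include <asapo_fabric/asapo_fabric.h>

using namespace asapo;
using namespace fabric;

void RecvWithDiagnostics(FabricContext* ctx, FabricAddress peer) {
char buffer[1024];
Error err;
ctx->Recv(peer, /*messageId=*/0, buffer, sizeof(buffer), &err);
if (err == FabricErrorTemplates::kTimeout) {
// The peer still answered pings, but the expected message never arrived in time.
} else if (err == FabricErrorTemplates::kInternalConnectionError) {
// The alive check itself failed; see InternalWaitWithAliveCheck above.
}
}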
#ifndef ASAPO_FABRIC_CONTEXT_IMPL_H
#define ASAPO_FABRIC_CONTEXT_IMPL_H
#include <io/io.h>
#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>
#include <memory>
#include <asapo_fabric/asapo_fabric.h>
#include <thread>
#include "task/fabric_waitable_task.h"
#include "../fabric_internal_error.h"
#include "task/fabric_alive_check_response_task.h"
namespace asapo {
namespace fabric {
#define FI_ASAPO_ADDR_NO_ALIVE_CHECK FI_ADDR_NOTAVAIL
#define FI_ASAPO_TAG_ALIVE_CHECK ((uint64_t) -1)
/**
* TODO: State of the workarounds ("bandages") used in ASAPO to make RXM usable.
* If you read this _in the future_, there are hopefully fixes for the following topics:
* Since RXM is connectionless, we do not know when a disconnect occurs.
* - Therefore, when we try to receive data, we have added a targetAddress to HandleFiCommandAndWait,
*   which can check whether the peer still responds to pings when a timeout occurs.
*
* Another issue is that in order to send data, all addresses have to be added to an address vector;
* unfortunately, this is also required in order to respond to a request.
* - So we added a handshake procedure that sends the local address of the client to the server.
*   This could be fixed by FI_SOURCE_ERR, which automatically adds new connections to the AV
*   and would make the handshake obsolete.
*   At the time of writing, FI_SOURCE_ERR is not supported with verbs;ofi_rxm.
*/
const static uint64_t kRecvTaggedAnyMatch = ~0ULL;
const static uint64_t kRecvTaggedExactMatch = 0;
// TODO Use a serialization framework
struct FabricHandshakePayload {
// Hostnames can be up to 256 bytes long. We also need to store the port number.
char hostnameAndPort[512];
};
class FabricContextImpl : public FabricContext {
friend class FabricSelfRequeuingTask;
friend class FabricAliveCheckResponseTask;
public:
std::unique_ptr<IO> io__;
protected:
FabricAliveCheckResponseTask alive_check_response_task_;
fi_info* fabric_info_{};
fid_fabric* fabric_{};
fid_domain* domain_{};
fid_cq* completion_queue_{};
fid_av* address_vector_{};
fid_ep* endpoint_{};
uint64_t requestEnqueueTimeoutMs_ = 10000; // 10 sec for queuing a task
uint64_t requestTimeoutMs_ = 20000; // 20 sec to complete a task, otherwise a ping will be sent
uint32_t maxTimeoutRetires_ = 5; // Timeout retries; if one of them fails, the task fails with a timeout
std::unique_ptr<std::thread> completion_thread_;
bool background_threads_running_ = false;
private:
// Unfortunately when a client disconnects on sockets, a weird completion is generated. See libfabric/#5795
bool hotfix_using_sockets_ = false;
public:
explicit FabricContextImpl();
virtual ~FabricContextImpl();
static const uint32_t kMinExpectedLibFabricVersion;
std::string GetAddress() const override;
/// The memory will be shared until the result is freed
std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override;
/// With message id
void Send(FabricAddress dstAddress, FabricMessageId messageId,
const void* src, size_t size, Error* error) override;
void Recv(FabricAddress srcAddress, FabricMessageId messageId,
void* dst, size_t size, Error* error) override;
/// Without message id - No alive check!
void RawSend(FabricAddress dstAddress,
const void* src, size_t size, Error* error);
void RawRecv(FabricAddress srcAddress,
void* dst, size_t size, Error* error);
/// Rdma
void RdmaWrite(FabricAddress dstAddress,
const MemoryRegionDetails* details, const void* buffer, size_t size,
Error* error) override;
protected:
/// For a client, serverListenPort must be 0
void InitCommon(const std::string& networkIpHint, uint16_t serverListenPort, Error* error);
void StartBackgroundThreads();
void StopBackgroundThreads();
// If the targetAddress is FI_ASAPO_ADDR_NO_ALIVE_CHECK and a timeout occurs, no further ping is performed.
// An alive check is generally only necessary if you are trying to receive data or do an RDMA send.
template<class FuncType, class... ArgTypes>
inline void HandleFiCommandWithBasicTaskAndWait(FabricAddress targetAddress, Error* error,
FuncType func, ArgTypes... args) {
FabricWaitableTask task;
HandleFiCommandAndWait(targetAddress, &task, error, func, args...);
}
template<class FuncType, class... ArgTypes>
inline void HandleFiCommandAndWait(FabricAddress targetAddress, FabricWaitableTask* task, Error* error,
FuncType func, ArgTypes... args) {
HandleRawFiCommand(task, error, func, args...);
if (!(*error)) { // We successfully queued our request
InternalWait(targetAddress, task, error);
}
}
template<class FuncType, class... ArgTypes>
inline void HandleRawFiCommand(void* context, Error* error, FuncType func, ArgTypes... args) {
ssize_t ret;
// Since handling timeouts is an overhead, we first try to send the data regularly
ret = func(endpoint_, args..., context);
if (ret == -FI_EAGAIN) {
using namespace std::chrono;
using clock = std::chrono::high_resolution_clock;
auto maxTime = clock::now() + milliseconds(requestEnqueueTimeoutMs_);
do {
std::this_thread::sleep_for(milliseconds(3));
ret = func(endpoint_, args..., context);
} while (ret == -FI_EAGAIN && maxTime >= clock::now());
}
switch (-ret) {
case FI_SUCCESS:
// Success
break;
case FI_EAGAIN: // We fell through our own timeout loop
*error = FabricErrorTemplates::kTimeout.Generate();
break;
case FI_ENOENT:
*error = FabricErrorTemplates::kConnectionRefusedError.Generate();
break;
default:
*error = ErrorFromFabricInternal("HandleRawFiCommand", ret);
break;
}
}
private:
bool TargetIsAliveCheck(FabricAddress address);
void CompletionThread();
void InternalWait(FabricAddress targetAddress, FabricWaitableTask* task, Error* error);
void InternalWaitWithAliveCheck(FabricAddress targetAddress, FabricWaitableTask* task, Error* error);
void CompletionThreadHandleErrorAvailable(Error* error);
void CancelTask(FabricWaitableTask* task, Error* error);
};
}
}
#endif //ASAPO_FABRIC_CONTEXT_IMPL_H
#include "fabric_memory_region_impl.h"
using namespace asapo;
using namespace fabric;
FabricMemoryRegionImpl::~FabricMemoryRegionImpl() {
if (mr_) {
fi_close(&mr_->fid);
}
}
void FabricMemoryRegionImpl::SetArguments(fid_mr* mr, uint64_t address, uint64_t length) {
mr_ = mr;
details_.addr = address;
details_.length = length;
details_.key = fi_mr_key(mr_);
}
const MemoryRegionDetails* FabricMemoryRegionImpl::GetDetails() const {
return &details_;
}
#ifndef ASAPO_FABRIC_MEMORY_REGION_IMPL_H
#define ASAPO_FABRIC_MEMORY_REGION_IMPL_H
#include <asapo_fabric/asapo_fabric.h>
#include <rdma/fi_domain.h>
namespace asapo {
namespace fabric {
class FabricMemoryRegionImpl : public FabricMemoryRegion {
private:
fid_mr* mr_{};
MemoryRegionDetails details_{};
public:
~FabricMemoryRegionImpl() override;
void SetArguments(fid_mr* mr, uint64_t address, uint64_t length);
const MemoryRegionDetails* GetDetails() const override;
};
}
}
#endif //ASAPO_FABRIC_MEMORY_REGION_IMPL_H
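The pieces above combine into the following RDMA flow; since there is no serialization framework yet (see the TODOs), MemoryRegionDetails is sent as raw bytes. A minimal sketch, assuming both sides already know each other's FabricAddress:

#include <asapo_fabric/asapo_fabric.h>

using namespace asapo;
using namespace fabric;

// Receiver: expose a buffer for RDMA and send its details to the peer as raw bytes.
// The returned region must be kept alive for as long as the peer may write into the buffer.
std::unique_ptr<FabricMemoryRegion> ExposeBuffer(FabricContext* ctx, FabricAddress peer,
void* buffer, size_t size, Error* err) {
auto region = ctx->ShareMemoryRegion(buffer, size, err);
if (*err) { return nullptr; }
MemoryRegionDetails details = *region->GetDetails();
ctx->Send(peer, /*messageId=*/0, &details, sizeof(details), err);
return region;
}

// Sender: receive the details and write directly into the peer's buffer.
void WriteToPeer(FabricContext* ctx, FabricAddress peer, const void* data, size_t size, Error* err) {
MemoryRegionDetails details{};
ctx->Recv(peer, /*messageId=*/0, &details, sizeof(details), err);
if (*err) { return; }
ctx->RdmaWrite(peer, &details, data, size, err);
}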
#include <rdma/fi_tagged.h>
#include "fabric_alive_check_response_task.h"
#include "../fabric_context_impl.h"
using namespace asapo;
using namespace fabric;
void FabricAliveCheckResponseTask::RequeueSelf() {
Error tmpError = nullptr;
ParentContext()->HandleRawFiCommand(this, &tmpError,
fi_trecv, nullptr, 0, nullptr, FI_ADDR_UNSPEC, FI_ASAPO_TAG_ALIVE_CHECK, kRecvTaggedExactMatch);
// Error is ignored
}
void FabricAliveCheckResponseTask::OnCompletion(const fi_cq_tagged_entry*, FabricAddress) {
// We received a ping, LibFabric will automatically notify the sender about the completion.
}
void FabricAliveCheckResponseTask::OnErrorCompletion(const fi_cq_err_entry*) {
// Error is ignored
}
FabricAliveCheckResponseTask::FabricAliveCheckResponseTask(FabricContextImpl* parentContext)
: FabricSelfRequeuingTask(parentContext) {
}
#ifndef ASAPO_FABRIC_ALIVE_CHECK_RESPONSE_TASK_H
#define ASAPO_FABRIC_ALIVE_CHECK_RESPONSE_TASK_H
#include "fabric_self_requeuing_task.h"
namespace asapo {
namespace fabric {
/**
* This is the counterpart of FabricContextImpl::TargetIsAliveCheck
*/
class FabricAliveCheckResponseTask : public FabricSelfRequeuingTask {
public:
explicit FabricAliveCheckResponseTask(FabricContextImpl* parentContext);
protected:
void RequeueSelf() override;
void OnCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) override;
void OnErrorCompletion(const fi_cq_err_entry* errEntry) override;
};
}
}
#endif //ASAPO_FABRIC_ALIVE_CHECK_RESPONSE_TASK_H