diff --git a/CMakeLists.txt b/CMakeLists.txt index cbe091aba1111b693fa4a9ca90e4a13ad809a1a0..f444089d0209112a31fa486ea7da8003692bac36 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,10 +17,11 @@ ENDIF(WIN32) #TODO: Better way then GLOBAL PROPERTY IF(WIN32) find_package(Threads REQUIRED) - SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} wsock32 ws2_32) + SET(ASAPO_COMMON_IO_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} wsock32 ws2_32) ELSEIF(UNIX) - SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES Threads::Threads) + SET(ASAPO_COMMON_IO_LIBRARIES Threads::Threads) ENDIF(WIN32) +SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES ${ASAPO_COMMON_IO_LIBRARIES}) if (CMAKE_BUILD_TYPE STREQUAL "Debug") add_definitions(-DUNIT_TESTS) @@ -63,6 +64,8 @@ if ("${Python_EXECUTABLE}" STREQUAL "") endif() message (STATUS "Using Python: ${Python_EXECUTABLE}") + +SET(ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_IO_LIBRARIES}) IF(ENABLE_LIBFABRIC) find_package(LibFabric) if(NOT LIBFABRIC_LIBRARY) @@ -70,9 +73,10 @@ IF(ENABLE_LIBFABRIC) endif() message(STATUS "LibFabric support enabled") message(STATUS "LIB_FABRIC: Path: ${LIBFABRIC_LIBRARY} Include: ${LIBFABRIC_INCLUDE_DIR}") - SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_IO_LIBRARIES} libfabric) add_definitions(-DLIBFABRIC_ENABLED) + SET(ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_FABRIC_LIBRARIES} fabric) ENDIF() +SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES ${ASAPO_COMMON_FABRIC_LIBRARIES}) # format sources include(astyle) diff --git a/common/cpp/include/asapo_fabric/asapo_fabric.h b/common/cpp/include/asapo_fabric/asapo_fabric.h index 5399c3a3b5d046c9c09ce74f9fc63cfb1399dbc6..c4470bc7a1f682d8c71e2edd4093e46f52172110 100644 --- a/common/cpp/include/asapo_fabric/asapo_fabric.h +++ b/common/cpp/include/asapo_fabric/asapo_fabric.h @@ -26,9 +26,6 @@ namespace asapo { namespace fabric { class FabricContext { public: - /// If this function is not called, the default timeout is 5000 ms - virtual void SetRequestTimeout(uint64_t msTimeout) = 0; - virtual std::string GetAddress() const = 0; virtual std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) = 0; @@ -40,7 +37,7 @@ namespace asapo { namespace fabric { void* dst, size_t size, Error* error) = 0; virtual void RdmaWrite(FabricAddress dstAddress, - MemoryRegionDetails* details, const void* buffer, size_t size, + const MemoryRegionDetails* details, const void* buffer, size_t size, Error* error) = 0; // Since RdmaRead heavily impacts the performance we will not implement this @@ -53,6 +50,7 @@ namespace asapo { namespace fabric { public: virtual ~FabricClient() = default; + /// The serverAddress must be in this format: "hostname:port" virtual FabricAddress AddServerAddress(const std::string& serverAddress, Error* error) = 0; }; @@ -60,13 +58,20 @@ namespace asapo { namespace fabric { public: virtual ~FabricServer() = default; - virtual void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* src, size_t size, Error* error) = 0; + virtual void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) = 0; }; class FabricFactory { public: - virtual std::unique_ptr<FabricServer> CreateAndBindServer(Error* error) const = 0; - + /** + * Creates a new server and will immediately allocate and listen to the given host:port + */ + virtual std::unique_ptr<FabricServer> + CreateAndBindServer(const std::string& host, uint16_t port, Error* error) const = 0; + + /** + * Will allocate a proper domain as soon as the client gets his first server address added + */ virtual std::unique_ptr<FabricClient> CreateClient(Error* error) const = 0; }; diff --git a/common/cpp/include/common/error.h b/common/cpp/include/common/error.h index e2a49f858bf78b1961948d2ae73dbfec85205402..c2259b79b551f0b3a1449ce30a55443aa75951da 100644 --- a/common/cpp/include/common/error.h +++ b/common/cpp/include/common/error.h @@ -18,6 +18,7 @@ enum class ErrorType { kConsumerError, kMemoryAllocationError, kEndOfFile, + kFabricError, }; class ErrorInterface; @@ -213,7 +214,7 @@ class ServiceErrorTemplate : public SimpleErrorTemplate { } inline Error Generate(const std::string& suffix) const noexcept override { - return Error(new ServiceError<ServiceErrorType, MainErrorType>(error_ + " :" + suffix, error_type_)); + return Error(new ServiceError<ServiceErrorType, MainErrorType>(error_ + ": " + suffix, error_type_)); } inline bool operator==(const Error& rhs) const override { diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 5348d85a9414cd3642edf9400f897496dc4c4e2a..d1c79909bd44b9fe83f7833ffe461c57d8c05d99 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -47,13 +47,13 @@ struct GenericRequestHeader { uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "", const std::string& i_substream = ""): op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} { - strncpy(message, i_message.c_str(), kMaxMessageSize); + strncpy(message, i_message.c_str(), kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails strncpy(substream, i_substream.c_str(), kMaxMessageSize); } GenericRequestHeader(const GenericRequestHeader& header) { op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size, memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)), - strncpy(message, header.message, kMaxMessageSize); + strncpy(message, header.message, kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails strncpy(substream, header.substream, kMaxMessageSize); } diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index dade78385798a06a20ba647f009d2f0b7a5c153b..c5c8f60b02efeebc0d6fc0f206fdeb0c24481b17 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -92,6 +92,9 @@ class IO { * @param err Since CloseSocket if often used in an error case, it's able to accept err as nullptr. */ virtual void CloseSocket(SocketDescriptor socket_fd, Error* err) const = 0; + virtual std::string AddressFromSocket(SocketDescriptor socket) const noexcept = 0; + virtual std::string GetHostName(Error* err) const noexcept = 0; + virtual std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostnameAndPort(std::string address) const = 0; /* * Filesystem @@ -117,8 +120,6 @@ class IO { virtual std::vector<FileInfo> FilesInFolder (const std::string& folder, Error* err) const = 0; virtual std::string ReadFileToString (const std::string& fname, Error* err) const = 0; virtual Error GetLastError() const = 0; - virtual std::string AddressFromSocket(SocketDescriptor socket) const noexcept = 0; - virtual std::string GetHostName(Error* err) const noexcept = 0; virtual FileInfo GetFileInfo(const std::string& name, Error* err) const = 0; virtual ~IO() = default; diff --git a/common/cpp/src/asapo_fabric/CMakeLists.txt b/common/cpp/src/asapo_fabric/CMakeLists.txt index ad7e8b35488aa9aee1c5d083ac79b1418fac475c..3d49d2d706ffa44af6c0293bc2bef860b913532b 100644 --- a/common/cpp/src/asapo_fabric/CMakeLists.txt +++ b/common/cpp/src/asapo_fabric/CMakeLists.txt @@ -6,11 +6,20 @@ set(SOURCE_FILES asapo_fabric.cpp) IF(ENABLE_LIBFABRIC) set(SOURCE_FILES ${SOURCE_FILES} - fabric_factory_impl.cpp + fabric_error.cpp + fabric_factory_impl.cpp + common/fabric_context_impl.cpp + common/fabric_memory_region_impl.cpp + common/fabric_waitable_task.cpp + client/fabric_client_impl.cpp + server/fabric_server_impl.cpp + server/task/fabric_recv_any_task.cpp + server/task/fabric_handshake_accepting_task.cpp + server/task/fabric_self_deleting_task.cpp ) ELSE() set(SOURCE_FILES ${SOURCE_FILES} - fabric_factory_not_supported.cpp + fabric_factory_not_supported.cpp ) ENDIF() @@ -18,6 +27,6 @@ ENDIF() # Library ################################ -add_library(${TARGET_NAME} STATIC ${SOURCE_FILES}) +add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io>) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) diff --git a/common/cpp/src/asapo_fabric/client/fabric_client_impl.cpp b/common/cpp/src/asapo_fabric/client/fabric_client_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..503d52e1c14b86349b739923f7c893e9ccdd647f --- /dev/null +++ b/common/cpp/src/asapo_fabric/client/fabric_client_impl.cpp @@ -0,0 +1,91 @@ +#include "fabric_client_impl.h" +#include <rdma/fi_domain.h> +#include <cstring> + +using namespace asapo; +using namespace fabric; + +std::string FabricClientImpl::GetAddress() const { + return FabricContextImpl::GetAddress(); +} + +std::unique_ptr<FabricMemoryRegion> FabricClientImpl::ShareMemoryRegion(void* src, size_t size, Error* error) { + if (!domain_) { + *error = FabricErrorTemplates::kClientNotInitializedError.Generate(); + return nullptr; + } + return FabricContextImpl::ShareMemoryRegion(src, size, error); +} + +void FabricClientImpl::Send(FabricAddress dstAddress, FabricMessageId messageId, const void* src, size_t size, + Error* error) { + if (!domain_) { + *error = FabricErrorTemplates::kClientNotInitializedError.Generate(); + return; + } + FabricContextImpl::Send(dstAddress, messageId, src, size, error); +} + +void FabricClientImpl::Recv(FabricAddress srcAddress, FabricMessageId messageId, void* dst, size_t size, Error* error) { + if (!domain_) { + *error = FabricErrorTemplates::kClientNotInitializedError.Generate(); + return; + } + FabricContextImpl::Recv(srcAddress, messageId, dst, size, error); +} + +void +FabricClientImpl::RdmaWrite(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) { + if (!domain_) { + *error = FabricErrorTemplates::kClientNotInitializedError.Generate(); + return; + } + FabricContextImpl::RdmaWrite(dstAddress, details, buffer, size, error); +} + +FabricAddress FabricClientImpl::AddServerAddress(const std::string& serverAddress, Error* error) { + std::string hostname; + uint16_t port; + std::tie(hostname, port) = *io__->SplitAddressToHostnameAndPort(serverAddress); + std::string serverIp = io__->ResolveHostnameToIp(hostname, error); + + InitIfNeeded(serverIp, error); + if (*error) { + return FI_ADDR_NOTAVAIL; + } + + int result; + FabricAddress addrIdx; + + result = fi_av_insertsvc(address_vector_, serverIp.c_str(), std::to_string(port).c_str(), + &addrIdx, 0, nullptr); + if (result != 1) { + *error = ErrorFromFabricInternal("fi_av_insertsvc", result); + return FI_ADDR_NOTAVAIL; + } + + DBG("Inserted '" + serverIp + "' with port " + std::to_string(port) + " as id: " + std::to_string(addrIdx)); + + FabricHandshakePayload handshake {}; + strcpy(handshake.hostnameAndPort, GetAddress().c_str()); + RawSend(addrIdx, &handshake, sizeof(handshake), error); + if (*error) { + return 0; + } + + // Zero sized payload + RawRecv(addrIdx, nullptr, 0, error); + + return addrIdx; +} + +void FabricClientImpl::InitIfNeeded(const std::string& targetIpHint, Error* error) { + const std::lock_guard<std::mutex> lock(initMutex_); // Will be released when scope is cleared + + if (domain_) { + return; // Was already initialized + } + + InitCommon(targetIpHint, 0, error); +} diff --git a/common/cpp/src/asapo_fabric/client/fabric_client_impl.h b/common/cpp/src/asapo_fabric/client/fabric_client_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..eeb378750db3c8a25bfffcd277dc97352b339197 --- /dev/null +++ b/common/cpp/src/asapo_fabric/client/fabric_client_impl.h @@ -0,0 +1,35 @@ +#ifndef ASAPO_FABRIC_CLIENT_IMPL_H +#define ASAPO_FABRIC_CLIENT_IMPL_H + +#include <asapo_fabric/asapo_fabric.h> +#include "../common/fabric_context_impl.h" + +namespace asapo { namespace fabric { + +class FabricClientImpl : public FabricClient, public FabricContextImpl { +private: + std::mutex initMutex_; +public: // Link to FabricContext + std::string GetAddress() const override; + + std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override; + + void Send(FabricAddress dstAddress, FabricMessageId messageId, + const void* src, size_t size, Error* error) override; + + void Recv(FabricAddress srcAddress, FabricMessageId messageId, + void* dst, size_t size, Error* error) override; + + void RdmaWrite(FabricAddress dstAddress, + const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) override; +public: + FabricAddress AddServerAddress(const std::string& serverAddress, Error* error) override; + +private: + void InitIfNeeded(const std::string& targetIpHint, Error* error); +}; + +}} + +#endif //ASAPO_FABRIC_CLIENT_IMPL_H diff --git a/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..55a341b9a48134cda5bf186382c8e241c5bd643c --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp @@ -0,0 +1,257 @@ +#include <io/io_factory.h> +#include <cstring> +#include <rdma/fi_cm.h> +#include <rdma/fi_domain.h> +#include <rdma/fi_endpoint.h> +#include <rdma/fi_rma.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <rdma/fi_tagged.h> +#include "fabric_context_impl.h" +#include "fabric_memory_region_impl.h" + +using namespace asapo; +using namespace fabric; + +std::string __PRETTY_FUNCTION_TO_NAMESPACE__(const std::string& prettyFunction) +{ + auto functionParamBegin = prettyFunction.find('('); + auto spaceBegin = prettyFunction.substr(0, functionParamBegin).find(' '); + return prettyFunction.substr(spaceBegin + 1, functionParamBegin - spaceBegin - 1); +} + +#define FI_OK(functionCall) \ + do { \ + int tmp_fi_status = functionCall; \ + if(__builtin_expect(tmp_fi_status, FI_SUCCESS)) { \ + std::string tmp_fi_s = #functionCall; \ + *error = ErrorFromFabricInternal(__PRETTY_FUNCTION_TO_NAMESPACE__(__PRETTY_FUNCTION__) + " Line " + std::to_string(__LINE__) + ": " + tmp_fi_s.substr(0, tmp_fi_s.find('(')), tmp_fi_status);\ + return; \ + } \ + } while(0) + +const uint32_t FabricContextImpl::kMinExpectedLibFabricVersion = FI_VERSION(1, 9); + +FabricContextImpl::FabricContextImpl() : io__{ GenerateDefaultIO() } { + +} + +FabricContextImpl::~FabricContextImpl() { + StopBackgroundThreads(); + + if (endpoint_) + fi_close(&endpoint_->fid); + + if (completion_queue_) + fi_close(&completion_queue_->fid); + + if (address_vector_) + fi_close(&address_vector_->fid); + + if (domain_) + fi_close(&domain_->fid); + + if (fabric_) + fi_close(&fabric_->fid); + + if (fabric_info_) + fi_freeinfo(fabric_info_); +} + +std::string FabricContextImpl::GetAddress() const { + sockaddr_in sin{}; + size_t sin_size = sizeof(sin); + fi_getname(&(endpoint_->fid), &sin, &sin_size); + + // TODO Maybe expose such a function to io__ + switch(sin.sin_family) { + case AF_INET: + return std::string(inet_ntoa(sin.sin_addr)) + ":" + std::to_string(ntohs(sin.sin_port)); + default: + throw std::runtime_error("Unknown addr family: " + std::to_string(sin.sin_family)); + } +} + +std::unique_ptr<FabricMemoryRegion> FabricContextImpl::ShareMemoryRegion(void* src, size_t size, Error* error) { + fid_mr* mr{}; + auto region = std::unique_ptr<FabricMemoryRegionImpl>(new FabricMemoryRegionImpl()); + int ret = fi_mr_reg(domain_, src, size, + FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV, + 0, 0, 0, &mr, region.get()); + + if (ret != 0) { + *error = ErrorFromFabricInternal("fi_mr_reg", ret); + return nullptr; + } + + region->SetArguments(mr, (uint64_t)src, size); + return region; +} + +void FabricContextImpl::Send(FabricAddress dstAddress, FabricMessageId messageId, const void* src, size_t size, + Error* error) { + HandleFiCommandWithBasicTaskAndWait(fi_tsend, error, + endpoint_, src, size, nullptr, dstAddress, messageId); +} + +void FabricContextImpl::Recv(FabricAddress srcAddress, FabricMessageId messageId, void* dst, size_t size, + Error* error) { + HandleFiCommandWithBasicTaskAndWait(fi_trecv, error, + endpoint_, dst, size, nullptr, srcAddress, messageId, 0); +} + +void FabricContextImpl::RawSend(FabricAddress dstAddress, const void* src, size_t size, Error* error) { + HandleFiCommandWithBasicTaskAndWait(fi_send, error, + endpoint_, src, size, nullptr, dstAddress); +} + +void FabricContextImpl::RawRecv(FabricAddress srcAddress, void* dst, size_t size, Error* error) { + HandleFiCommandWithBasicTaskAndWait(fi_recv, error, + endpoint_, dst, size, nullptr, srcAddress); +} + +void +FabricContextImpl::RdmaWrite(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) { + HandleFiCommandWithBasicTaskAndWait(fi_write, error, + endpoint_, buffer, size, nullptr, dstAddress, details->addr, details->key); + +} + +void FabricContextImpl::InitCommon(const std::string& networkIpHint, uint16_t serverListenPort, Error* error) { + const bool isServer = serverListenPort != 0; + + // The server must know where the packages are coming from, FI_SOURCE allows this. + uint64_t additionalFlags = isServer ? FI_SOURCE : 0; + + fi_info* hints = fi_allocinfo(); + // We somehow have to know if we should allocate a dummy sockets domain or a real verbs domain + if (networkIpHint == "127.0.0.1") { + // sockets mode + hints->fabric_attr->prov_name = strdup("sockets"); + DBG("Using sockets provider. (Should only be used for integration tests)"); + } else { + // verbs mode + hints->fabric_attr->prov_name = strdup("verbs;ofi_rxm"); + hints->domain_attr->caps = FI_RMA | FI_TAGGED | FI_REMOTE_WRITE | FI_SOURCE | FI_SEND | FI_RECV; + } + + if (isServer) { + hints->src_addr = strdup(networkIpHint.c_str()); + } else { + hints->dest_addr = strdup(networkIpHint.c_str()); + } + + hints->domain_attr->mr_mode = FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY; + hints->ep_attr->type = FI_EP_RDM; + hints->addr_format = FI_SOCKADDR_IN; + + int ret = fi_getinfo( + kMinExpectedLibFabricVersion, networkIpHint.c_str(), isServer ? std::to_string(serverListenPort).c_str() : nullptr, + additionalFlags, hints, &fabric_info_); + + if (ret) { + if (ret == -FI_ENODATA) { + *error = FabricErrorTemplates::kNoDeviceFoundError.Generate(); + } else { + *error = ErrorFromFabricInternal("fi_getinfo", ret); + } + fi_freeinfo(hints); + return; + } + DBG(fi_tostr(fabric_info_, FI_TYPE_INFO)); + + // We have to reapply the memory mode because they get resetted + fabric_info_->domain_attr->mr_mode = hints->domain_attr->mr_mode; + fi_freeinfo(hints); + + FI_OK(fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr)); + FI_OK(fi_domain(fabric_, fabric_info_, &domain_, nullptr)); + + fi_av_attr av_attr{}; + FI_OK(fi_av_open(domain_, &av_attr, &address_vector_, nullptr)); + + fi_cq_attr cq_attr{}; + if (serverListenPort) { + // The server must know where the data is coming from(FI_SOURCE) and what the tag is(MessageId). + cq_attr.format = FI_CQ_FORMAT_TAGGED; + } + FI_OK(fi_cq_open(domain_, &cq_attr, &completion_queue_, nullptr)); + + FI_OK(fi_endpoint(domain_, fabric_info_, &endpoint_, nullptr)); + FI_OK(fi_ep_bind(endpoint_, &address_vector_->fid, 0)); + FI_OK(fi_ep_bind(endpoint_, &completion_queue_->fid, FI_RECV | FI_SEND)); + + FI_OK(fi_enable(endpoint_)); + + StartBackgroundThreads(); +} + +void FabricContextImpl::StartBackgroundThreads() { + background_threads_running_ = true; + + completion_thread_ = io__->NewThread("ASAPO/FI/CQ", [this]() { + CompletionThreadTask(); + }); +} + +void FabricContextImpl::StopBackgroundThreads() { + background_threads_running_ = false; + if (completion_thread_) { + DBG("Stop completion thread - wait"); + completion_thread_->join(); + completion_thread_ = nullptr; + DBG("Stop completion thread - done"); + } +} + +void FabricContextImpl::CompletionThreadTask() { + Error error; + fi_cq_tagged_entry entry; + FabricAddress tmpAddress; + while(background_threads_running_ && !error) { + ssize_t ret; + memset(&entry, 0, sizeof(entry)); + ret = fi_cq_readfrom(completion_queue_, &entry, 1, &tmpAddress); + if (ret == -FI_EAGAIN) { + //std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::yield(); + continue; // No data + } + + if (ret == -FI_EAVAIL) { + DBG("Ich habe einen error"); + fi_cq_err_entry errEntry{}; + ret = fi_cq_readerr(completion_queue_, &errEntry, 0); + DBG("After fi_cq_readerr"); + if (ret != 1) { + error = ErrorFromFabricInternal("Unknown error while fi_cq_readerr", ret); + } else { + auto task = (FabricWaitableTask*)(errEntry.op_context); + if (task) { + task->HandleErrorCompletion(&errEntry); + } else { + DBG("[WARN] errEntry.op_context without context!"); + } + } + + continue; + } + + if (ret != 1) { + error = ErrorFromFabricInternal("Unknown error while fi_cq_readfrom", ret); + break; + } + + auto task = (FabricWaitableTask*)(entry.op_context); + if (task) { + task->HandleCompletion(&entry, tmpAddress); + } else { + DBG("[WARN] entry.op_context without context!"); + } + } + + if (error) { + DBG("[Error] CompletionThreadTask exited with error: " + error->Explain()); + } +} diff --git a/common/cpp/src/asapo_fabric/common/fabric_context_impl.h b/common/cpp/src/asapo_fabric/common/fabric_context_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..ed054b0b41fc95865e7d4320a8d7dc6cc2ed9817 --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_context_impl.h @@ -0,0 +1,127 @@ +#ifndef ASAPO_FABRIC_CONTEXT_IMPL_H +#define ASAPO_FABRIC_CONTEXT_IMPL_H + +#include <io/io.h> +#include <rdma/fabric.h> +#include <memory> +#include <asapo_fabric/asapo_fabric.h> +#include <thread> +#include "fabric_waitable_task.h" +#include "../fabric_error.h" + +namespace asapo { namespace fabric { + + static auto start = std::chrono::high_resolution_clock::now(); + inline void DBG(const std::string& message) { + std::string tmpMessage = "[FABRIC DEBUG][" + std::to_string(std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::high_resolution_clock::now() - start ).count()) + "] " + message; + fprintf(stderr, "%s\n", tmpMessage.c_str()); + } + +#pragma pack(push, 1) + struct FabricHandshakePayload { + // Hostnames can be up to 256 Bytes long. We also need to store the port number. + char hostnameAndPort[512]; + }; +#pragma pack(pop) + +#define TODO_UNKNOWN_ADDRESS 0//(FI_ADDR_NOTAVAIL - 1) + + class FabricContextImpl : public FabricContext { + public: + std::unique_ptr<IO> io__; + protected: + fi_info* fabric_info_{}; + fid_fabric* fabric_{}; + fid_domain* domain_{}; + fid_cq* completion_queue_{}; + fid_av* address_vector_{}; + fid_ep* endpoint_{}; + + uint64_t requestTimeoutMs_ = 5000; + + std::unique_ptr<std::thread> completion_thread_; + bool background_threads_running_ = false; + public: + explicit FabricContextImpl(); + virtual ~FabricContextImpl(); + + static const uint32_t kMinExpectedLibFabricVersion; + + std::string GetAddress() const override; + + /// The memory will be shared until the result is freed + std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override; + + /// With message id + void Send(FabricAddress dstAddress, FabricMessageId messageId, + const void* src, size_t size, Error* error) override; + void Recv(FabricAddress srcAddress, FabricMessageId messageId, + void* dst, size_t size, Error* error) override; + + /// Without message id + void RawSend(FabricAddress dstAddress, + const void* src, size_t size, Error* error); + void RawRecv(FabricAddress srcAddress, + void* dst, size_t size, Error* error); + + /// Rdma + void RdmaWrite(FabricAddress dstAddress, + const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) override; + + protected: + /// If client serverListenPort == 0 + void InitCommon(const std::string& networkIpHint, uint16_t serverListenPort, Error* error); + + void StartBackgroundThreads(); + void StopBackgroundThreads(); + + template<class FuncType, class... ArgTypes> + inline void HandleFiCommandWithBasicTaskAndWait(FuncType func, Error* error, ArgTypes... args) { + FabricWaitableTask task; + HandleFiCommandAndWait(func, &task, error, args...); + } + + template<class FuncType, class... ArgTypes> + inline void HandleFiCommandAndWait(FuncType func, FabricWaitableTask* task, Error* error, ArgTypes... args) { + HandleFiCommand(func, task, error, args...); + if (!(*error)) { + task->Wait(error); + } + } + + template<class FuncType, class... ArgTypes> + inline void HandleFiCommand(FuncType func, void* context, Error* error, ArgTypes... args) { + ssize_t ret; + // Since handling timeouts is an overhead, we first try to send the data regularly + ret = func(args..., context); + if (ret == -FI_EAGAIN) { + fprintf(stderr, "[WARN|HandleFiCommandAndWait] Start time tracker overhead\n"); + + using namespace std::chrono; + using clock = std::chrono::high_resolution_clock; + auto maxTime = clock::now() + milliseconds(requestTimeoutMs_); + + do { + std::this_thread::sleep_for(milliseconds(3)); + ret = func(args..., context); + } while (ret == -FI_EAGAIN && maxTime >= clock::now()); + } + + if (ret != 0) { + if (ret == -FI_EAGAIN) { + *error = TextError("Timeout"); + } else { + *error = ErrorFromFabricInternal("HandleFiCommandAndWait", ret); + } + return; + } + } + + private: + void CompletionThreadTask(); + }; + +}} + +#endif //ASAPO_FABRIC_CONTEXT_IMPL_H diff --git a/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.cpp b/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fad3e15ed01e9cf646aa5bf8eef164fe2840d8d4 --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.cpp @@ -0,0 +1,18 @@ +#include "fabric_memory_region_impl.h" + +asapo::fabric::FabricMemoryRegionImpl::~FabricMemoryRegionImpl() { + if (mr_) { + fi_close(&mr_->fid); + } +} + +void asapo::fabric::FabricMemoryRegionImpl::SetArguments(fid_mr* mr, uint64_t address, uint64_t length) { + mr_ = mr; + details_.addr = address; + details_.length = length; + details_.key = fi_mr_key(mr_); +} + +asapo::fabric::MemoryRegionDetails* asapo::fabric::FabricMemoryRegionImpl::GetDetails() { + return &details_; +} diff --git a/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.h b/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..65552b82f8e2f768afaa22d59d4937be5c9a357f --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_memory_region_impl.h @@ -0,0 +1,22 @@ +#ifndef ASAPO_FABRIC_MEMORY_REGION_IMPL_H +#define ASAPO_FABRIC_MEMORY_REGION_IMPL_H + +#include <asapo_fabric/asapo_fabric.h> +#include <rdma/fi_domain.h> + +namespace asapo { namespace fabric { + class FabricMemoryRegionImpl : public FabricMemoryRegion { + private: + fid_mr* mr_{}; + MemoryRegionDetails details_{}; + public: + ~FabricMemoryRegionImpl() override; + + void SetArguments(fid_mr* mr, uint64_t address, uint64_t length); + + MemoryRegionDetails* GetDetails() override; + }; +}} + + +#endif //ASAPO_FABRIC_MEMORY_REGION_IMPL_H diff --git a/common/cpp/src/asapo_fabric/common/fabric_task.h b/common/cpp/src/asapo_fabric/common/fabric_task.h new file mode 100644 index 0000000000000000000000000000000000000000..0509bb0904d4e479a5a8f49451706a16c10f5d68 --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_task.h @@ -0,0 +1,15 @@ +#ifndef ASAPO_FABRIC_TASK_H +#define ASAPO_FABRIC_TASK_H + +#include <asapo_fabric/asapo_fabric.h> +#include <rdma/fi_eq.h> + +namespace asapo { namespace fabric { + class FabricTask { + public: + virtual void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) = 0; + virtual void HandleErrorCompletion(fi_cq_err_entry* errEntry) = 0; + }; +}} + +#endif //ASAPO_FABRIC_TASK_H diff --git a/common/cpp/src/asapo_fabric/common/fabric_waitable_task.cpp b/common/cpp/src/asapo_fabric/common/fabric_waitable_task.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0bb3ad0c200cbdd3a00a4df006fa8d2e03214fa8 --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_waitable_task.cpp @@ -0,0 +1,28 @@ +#include "fabric_waitable_task.h" +#include "../fabric_error.h" + +using namespace asapo; +using namespace fabric; + +FabricWaitableTask::FabricWaitableTask() : future_{promise_.get_future()} { + +} + +void FabricWaitableTask::HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) { + source_ = source; + promise_.set_value(); +} + +void FabricWaitableTask::HandleErrorCompletion(fi_cq_err_entry* errEntry) { + error_ = ErrorFromFabricInternal("FabricWaitableTask", -errEntry->err); + promise_.set_value(); +} + +void FabricWaitableTask::Wait(Error* error) { + future_.wait(); + error->swap(error_); +} + +FabricAddress FabricWaitableTask::GetSource() const { + return source_; +} diff --git a/common/cpp/src/asapo_fabric/common/fabric_waitable_task.h b/common/cpp/src/asapo_fabric/common/fabric_waitable_task.h new file mode 100644 index 0000000000000000000000000000000000000000..daf076149a521a9736196c59853a7da8d86eb2c7 --- /dev/null +++ b/common/cpp/src/asapo_fabric/common/fabric_waitable_task.h @@ -0,0 +1,30 @@ +#ifndef ASAPO_FABRIC_WAITABLE_TASK_H +#define ASAPO_FABRIC_WAITABLE_TASK_H + +#include <common/error.h> +#include <asapo_fabric/asapo_fabric.h> +#include <future> +#include "fabric_task.h" + +namespace asapo { namespace fabric { + class FabricWaitableTask : FabricTask { + private: + std::promise<void> promise_; + std::future<void> future_; + + Error error_; + FabricAddress source_; + public: + explicit FabricWaitableTask(); + + void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) override; + void HandleErrorCompletion(fi_cq_err_entry* errEntry) override; + + void Wait(Error* error); + + FabricAddress GetSource() const; + + }; +}} + +#endif //ASAPO_FABRIC_WAITABLE_TASK_H diff --git a/common/cpp/src/asapo_fabric/fabric_error.cpp b/common/cpp/src/asapo_fabric/fabric_error.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6f37cb151baba4761fbd2e625e8711b06fbd3fa8 --- /dev/null +++ b/common/cpp/src/asapo_fabric/fabric_error.cpp @@ -0,0 +1,8 @@ +#include "fabric_error.h" +#include <rdma/fi_errno.h> + +asapo::Error asapo::fabric::ErrorFromFabricInternal(const std::string& where, int internalStatusCode) { + std::string errText = where + ": " + fi_strerror(-internalStatusCode); + auto err = new ServiceError<FabricErrorType, ErrorType::kFabricError>(errText, FabricErrorType::kInternalError); + return Error(err); +} diff --git a/common/cpp/src/asapo_fabric/fabric_error.h b/common/cpp/src/asapo_fabric/fabric_error.h new file mode 100644 index 0000000000000000000000000000000000000000..c8e422689383d461572f57467cb5190804aa4890 --- /dev/null +++ b/common/cpp/src/asapo_fabric/fabric_error.h @@ -0,0 +1,45 @@ +#ifndef ASAPO_FABRICERRORCONVERTER_H +#define ASAPO_FABRICERRORCONVERTER_H + +#include <common/error.h> + +namespace asapo { namespace fabric { +enum class FabricErrorType { + kNotSupported, + kOutdatedLibrary, + kInternalError, // An error that was produced by LibFabric + kNoDeviceFound, + kClientNotInitialized, + kTimeout, +}; + + +using FabricError = ServiceError<FabricErrorType, ErrorType::kFabricError>; +using FabricErrorTemplate = ServiceErrorTemplate<FabricErrorType, ErrorType::kFabricError>; + +/** + * internalStatusCode must be a negative number + * (Which all libfabric api calls usually return in an error case + */ +Error ErrorFromFabricInternal(const std::string& where, int internalStatusCode); + +namespace FabricErrorTemplates { + auto const kNotSupportedOnBuildError = FabricErrorTemplate { + "This build of ASAPO does not support LibFabric", FabricErrorType::kNotSupported + }; + auto const kOutdatedLibraryError = FabricErrorTemplate { + "LibFabric outdated", FabricErrorType::kOutdatedLibrary + }; + auto const kNoDeviceFoundError = FabricErrorTemplate { + "No device was found (Check your config)", FabricErrorType::kNoDeviceFound + }; + auto const kClientNotInitializedError = FabricErrorTemplate { + "The client was not initialized. Add server address first!", + FabricErrorType::kClientNotInitialized + }; + +} + +}} + +#endif //ASAPO_FABRICERRORCONVERTER_H diff --git a/common/cpp/src/asapo_fabric/fabric_factory_impl.cpp b/common/cpp/src/asapo_fabric/fabric_factory_impl.cpp index b5c267ef8d16b863602f9e92dfadb1f6f86f7503..e6856325ab9b8ddd845550e61c7118b687d16024 100644 --- a/common/cpp/src/asapo_fabric/fabric_factory_impl.cpp +++ b/common/cpp/src/asapo_fabric/fabric_factory_impl.cpp @@ -1,5 +1,7 @@ #include "fabric_factory_impl.h" -#include "fabric_internal_impl_common.h" +#include "fabric_error.h" +#include "client/fabric_client_impl.h" +#include "server/fabric_server_impl.h" #include <rdma/fabric.h> using namespace asapo::fabric; @@ -11,35 +13,38 @@ std::string fi_version_string(uint32_t version) { bool FabricFactoryImpl::HasValidVersion(Error* error) const { auto current_version = fi_version(); - if (FI_VERSION_LT(current_version, EXPECTED_FI_VERSION)) { - std::string found_version_str = fi_version_string(current_version); - std::string expected_version_str = fi_version_string(EXPECTED_FI_VERSION); + DBG("Found LibFabric version: " + fi_version_string(current_version)); - std::string errorText = "LibFabric outdated."; - errorText += " (Found " + found_version_str + " but expected at least " + expected_version_str + ")"; + if (FI_VERSION_LT(current_version, FabricContextImpl::kMinExpectedLibFabricVersion)) { + std::string found_version_str = fi_version_string(current_version); + std::string expected_version_str = fi_version_string(FabricContextImpl::kMinExpectedLibFabricVersion); - *error = TextError(errorText); + std::string errorText = "Found " + found_version_str + " but expected at least " + expected_version_str; + *error = FabricErrorTemplates::kOutdatedLibraryError.Generate(errorText); return false; } return true; } -std::unique_ptr<FabricClient> -FabricFactoryImpl::CreateClient(Error* error) const { +std::unique_ptr<FabricServer> +FabricFactoryImpl::CreateAndBindServer(const std::string& host, uint16_t port, Error* error) const { if (!HasValidVersion(error)) { return nullptr; } - *error = TextError("This build of ASAPO does not support LibFabric."); - return nullptr; + auto server = std::unique_ptr<FabricServerImpl>(new FabricServerImpl()); + + server->InitAndStartServer(host, port, error); + + return server; } -std::unique_ptr<FabricServer> FabricFactoryImpl::CreateAndBindServer(Error* error) const { +std::unique_ptr<FabricClient> +FabricFactoryImpl::CreateClient(Error* error) const { if (!HasValidVersion(error)) { return nullptr; } - *error = TextError("This build of ASAPO does not support LibFabric."); - return nullptr; + return std::unique_ptr<FabricClient>(new FabricClientImpl()); } diff --git a/common/cpp/src/asapo_fabric/fabric_factory_impl.h b/common/cpp/src/asapo_fabric/fabric_factory_impl.h index 776d42a98cfc2fcc7c5466182f2841127ae55161..93d09b2d97f2456baab5f60ec2c16a54181e754f 100644 --- a/common/cpp/src/asapo_fabric/fabric_factory_impl.h +++ b/common/cpp/src/asapo_fabric/fabric_factory_impl.h @@ -5,9 +5,10 @@ namespace asapo { namespace fabric { class FabricFactoryImpl : public FabricFactory { + public: bool HasValidVersion(Error* error) const; - std::unique_ptr<FabricServer> CreateAndBindServer(Error* error) const override; + std::unique_ptr<FabricServer> CreateAndBindServer(const std::string& host, uint16_t port, Error* error) const override; std::unique_ptr<FabricClient> CreateClient(Error* error) const override; }; diff --git a/common/cpp/src/asapo_fabric/fabric_factory_not_supported.cpp b/common/cpp/src/asapo_fabric/fabric_factory_not_supported.cpp index 0508d997243270ca5edcf7e6a132e3d686ec109b..7ccde67150476aebddd67392ba9139bcf0f676d9 100644 --- a/common/cpp/src/asapo_fabric/fabric_factory_not_supported.cpp +++ b/common/cpp/src/asapo_fabric/fabric_factory_not_supported.cpp @@ -1,14 +1,15 @@ #include "fabric_factory_not_supported.h" +#include "fabric_error.h" using namespace asapo::fabric; -std::unique_ptr<FabricServer> asapo::fabric::FabricFactoryNotSupported::CreateAndBindServer(Error* error) const { - *error = TextError("This build of ASAPO does not support LibFabric."); +std::unique_ptr<FabricServer> asapo::fabric::FabricFactoryNotSupported::CreateAndBindServer(uint16_t port, + Error* error) const { + *error = FabricErrorTemplates::kNotSupportedOnBuildError.Generate(); return nullptr; } std::unique_ptr<FabricClient> asapo::fabric::FabricFactoryNotSupported::CreateClient(Error* error) const { - *error = TextError("This build of ASAPO does not support LibFabric."); + *error = FabricErrorTemplates::kNotSupportedOnBuildError.Generate(); return nullptr; } - diff --git a/common/cpp/src/asapo_fabric/fabric_factory_not_supported.h b/common/cpp/src/asapo_fabric/fabric_factory_not_supported.h index c701ac1a5e4f0b9d1f578385dd38056258c5b467..bac60424606630a9ae3de71e78099cb12af78348 100644 --- a/common/cpp/src/asapo_fabric/fabric_factory_not_supported.h +++ b/common/cpp/src/asapo_fabric/fabric_factory_not_supported.h @@ -5,7 +5,7 @@ namespace asapo { namespace fabric { class FabricFactoryNotSupported : public FabricFactory { - std::unique_ptr<FabricServer> CreateAndBindServer(Error* error) const override; + std::unique_ptr<FabricServer> CreateAndBindServer(uint16_t port, Error* error) const override; std::unique_ptr<FabricClient> CreateClient(Error* error) const override; }; diff --git a/common/cpp/src/asapo_fabric/fabric_internal_impl_common.h b/common/cpp/src/asapo_fabric/fabric_internal_impl_common.h deleted file mode 100644 index 73882fd0bb97987a3c38dec1173bfbd7afbffe36..0000000000000000000000000000000000000000 --- a/common/cpp/src/asapo_fabric/fabric_internal_impl_common.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef ASAPO_FABRIC_INTERNAL_IMPL_COMMON_H -#define ASAPO_FABRIC_INTERNAL_IMPL_COMMON_H - -/* - * This file contains common features used in ASAPO's integration of libfabric. - * Only include this file into *.cpp files, never in *.h files - */ - -#ifndef EXPECTED_FI_VERSION -#define EXPECTED_FI_VERSION FI_VERSION(1, 9) -#endif - -#pragma pack(push, 1) -struct HandshakePayload { - // Hostnames can be up to 256 Bytes long. We also need to store the port number. - char hostnameAndPort[512]; -}; - -#pragma pack(pop) - -#define TODO_UNKNOWN_ADDRESS (FI_ADDR_NOTAVAIL - 1) - -#endif //ASAPO_FABRIC_INTERNAL_IMPL_COMMON_H diff --git a/common/cpp/src/asapo_fabric/server/fabric_server_impl.cpp b/common/cpp/src/asapo_fabric/server/fabric_server_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dec7e7036f70ad28ba2f4922263bdb58b1180fff --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/fabric_server_impl.cpp @@ -0,0 +1,61 @@ +#include "fabric_server_impl.h" +#include "task/fabric_recv_any_task.h" +#include <rdma/fi_tagged.h> + +using namespace asapo; +using namespace fabric; + +FabricServerImpl::~FabricServerImpl() { + accepting_task_running = false; + accepting_task_->DeleteRequest(); +} + +FabricServerImpl::FabricServerImpl() : accepting_task_ {new FabricHandshakeAcceptingTask(this)} { + +} + +std::string FabricServerImpl::GetAddress() const { + return FabricContextImpl::GetAddress(); +} + +std::unique_ptr<FabricMemoryRegion> FabricServerImpl::ShareMemoryRegion(void* src, size_t size, Error* error) { + return FabricContextImpl::ShareMemoryRegion(src, size, error); +} + +void FabricServerImpl::Send(FabricAddress dstAddress, FabricMessageId messageId, const void* src, size_t size, + Error* error) { + FabricContextImpl::Send(dstAddress, messageId, src, size, error); +} + +void FabricServerImpl::Recv(FabricAddress srcAddress, FabricMessageId messageId, void* dst, size_t size, Error* error) { + FabricContextImpl::Recv(srcAddress, messageId, dst, size, error); +} + +void +FabricServerImpl::RdmaWrite(FabricAddress dstAddress, const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) { + FabricContextImpl::RdmaWrite(dstAddress, details, buffer, size, error); +} + +void +FabricServerImpl::RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) { + FabricRecvAnyTask anyTask; + + HandleFiCommandAndWait(fi_trecv, &anyTask, error, + endpoint_, dst, size, nullptr, TODO_UNKNOWN_ADDRESS, 0, ~0ULL); + + if (!(*error)) { + if (anyTask.GetSource() == FI_ADDR_NOTAVAIL) { + *error = TextError("Source address is unavailable"); + } + *messageId = anyTask.GetMessageId(); + *srcAddress = anyTask.GetSource(); + } +} + +void FabricServerImpl::InitAndStartServer(const std::string& host, uint16_t port, Error* error) { + InitCommon(host, port, error); + + accepting_task_running = true; + accepting_task_->StartRequest(); +} diff --git a/common/cpp/src/asapo_fabric/server/fabric_server_impl.h b/common/cpp/src/asapo_fabric/server/fabric_server_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..8f17042a970ad90489cd223689411833041f7ef8 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/fabric_server_impl.h @@ -0,0 +1,43 @@ +#ifndef ASAPO_FABRIC_SERVER_IMPL_H +#define ASAPO_FABRIC_SERVER_IMPL_H + +#include <asapo_fabric/asapo_fabric.h> +#include "../common/fabric_context_impl.h" +#include "../fabric_factory_impl.h" +#include "task/fabric_handshake_accepting_task.h" + +namespace asapo { namespace fabric { + +class FabricServerImpl : public FabricServer, public FabricContextImpl { + friend FabricFactoryImpl; + friend class FabricHandshakeAcceptingTask; + +private: + std::unique_ptr<FabricHandshakeAcceptingTask> accepting_task_; + bool accepting_task_running = false; + + void InitAndStartServer(const std::string& host, uint16_t port, Error* error); +public: + ~FabricServerImpl() override; + explicit FabricServerImpl(); +public: // Link to FabricContext + std::string GetAddress() const override; + + std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override; + + void Send(FabricAddress dstAddress, FabricMessageId messageId, + const void* src, size_t size, Error* error) override; + + void Recv(FabricAddress srcAddress, FabricMessageId messageId, + void* dst, size_t size, Error* error) override; + + void RdmaWrite(FabricAddress dstAddress, + const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) override; +public: + void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) override; +}; + +}} + +#endif //ASAPO_FABRIC_SERVER_IMPL_H diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c85c952476d0d3bea825e9baaa560b2d92da6a36 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp @@ -0,0 +1,76 @@ +#include <rdma/fi_endpoint.h> +#include "fabric_handshake_accepting_task.h" +#include "../fabric_server_impl.h" +#include "fabric_self_deleting_task.h" + +using namespace asapo; +using namespace fabric; + +FabricHandshakeAcceptingTask::~FabricHandshakeAcceptingTask() { + DeleteRequest(); +} + +FabricHandshakeAcceptingTask::FabricHandshakeAcceptingTask(FabricServerImpl* server) : server_{server} { +} + +void FabricHandshakeAcceptingTask::HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) { + Error error; + HandleAccept(&error); + if (error) { + OnError(&error); + return; + } + StartRequest(); +} + +void FabricHandshakeAcceptingTask::HandleErrorCompletion(fi_cq_err_entry* errEntry) { + Error error; + error = ErrorFromFabricInternal("FabricWaitableTask", -errEntry->err); + OnError(&error); + + StartRequest(); +} + +void FabricHandshakeAcceptingTask::StartRequest() { + if (server_->accepting_task_running) { + Error error; + server_->HandleFiCommand(fi_recv, this, &error, + server_->endpoint_, &handshake_payload_, sizeof(handshake_payload_), nullptr, TODO_UNKNOWN_ADDRESS); + + if (error) { + OnError(&error); + } + } +} + +void FabricHandshakeAcceptingTask::DeleteRequest() { + if (server_->endpoint_) { // The endpoint could not have been initialized + fi_cancel(&server_->endpoint_->fid, this); + } +} + +void FabricHandshakeAcceptingTask::HandleAccept(Error* error) { + std::string hostname; + uint16_t port; + std::tie(hostname, port) = *server_->io__->SplitAddressToHostnameAndPort(handshake_payload_.hostnameAndPort); + FabricAddress tmpAddr; + int ret = fi_av_insertsvc(server_->address_vector_, hostname.c_str(), std::to_string(port).c_str(), &tmpAddr, 0, nullptr); + if (ret != 1) { + *error = ErrorFromFabricInternal("fi_av_insertsvc", ret); + return; + } + DBG("[Handshake] Added as: " + std::to_string(tmpAddr)); + + + // TODO: This could slow down the whole complete queue process, maybe use another thread? :/ + // Send and forget + server_->HandleFiCommand(fi_send, new FabricSelfDeletingTask(), error, + server_->endpoint_, nullptr, 0, nullptr, tmpAddr); + if (*error) { + return; + } +} + +void FabricHandshakeAcceptingTask::OnError(Error* error) { + DBG("[Error] HandleCompletion: " + (*error)->Explain()); +} diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h new file mode 100644 index 0000000000000000000000000000000000000000..ac0e18bc6dd414a6fc04d7ab52ac3d6fa02a13d2 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.h @@ -0,0 +1,37 @@ +#ifndef ASAPO_FABRIC_HANDSHAKE_ACCEPTING_TASK_H +#define ASAPO_FABRIC_HANDSHAKE_ACCEPTING_TASK_H + +#include "../../common/fabric_task.h" +#include "../../common/fabric_context_impl.h" + +namespace asapo { namespace fabric { + +// Need forward declaration for reference inside the task +class FabricServerImpl; + +/** + * This task will automatically requeue itself + */ +class FabricHandshakeAcceptingTask : public FabricTask { + +private: + FabricServerImpl* server_; + FabricHandshakePayload handshake_payload_{}; +public: + ~FabricHandshakeAcceptingTask(); + explicit FabricHandshakeAcceptingTask(FabricServerImpl* server); + + void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) override; + void HandleErrorCompletion(fi_cq_err_entry* errEntry) override; + + void StartRequest(); + void DeleteRequest(); + +private: + void HandleAccept(Error* error); + void OnError(Error* error); +}; + +}} + +#endif //ASAPO_FABRIC_HANDSHAKE_ACCEPTING_TASK_H diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.cpp b/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3666db205fe550872ed8eadb66b65fd32d240422 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.cpp @@ -0,0 +1,18 @@ +#include "fabric_recv_any_task.h" + +using namespace asapo; +using namespace fabric; + +void FabricRecvAnyTask::HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) { + messageId_ = entry->tag; + FabricWaitableTask::HandleCompletion(entry, source); +} + +void FabricRecvAnyTask::HandleErrorCompletion(fi_cq_err_entry* errEntry) { + messageId_ = errEntry->tag; + FabricWaitableTask::HandleErrorCompletion(errEntry); +} + +FabricMessageId FabricRecvAnyTask::GetMessageId() const { + return messageId_; +} diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.h b/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.h new file mode 100644 index 0000000000000000000000000000000000000000..22238400c86afe82be6326465c791790a13cbed3 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_recv_any_task.h @@ -0,0 +1,23 @@ +#ifndef ASAPO_FABRIC_RECV_ANY_TASK_H +#define ASAPO_FABRIC_RECV_ANY_TASK_H + +#include <asapo_fabric/asapo_fabric.h> +#include <rdma/fi_eq.h> +#include "../../common/fabric_waitable_task.h" + +namespace asapo { namespace fabric { + +class FabricRecvAnyTask : public FabricWaitableTask { +private: + FabricMessageId messageId_; +public: + void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) override; + void HandleErrorCompletion(fi_cq_err_entry* errEntry) override; + + FabricMessageId GetMessageId() const; +}; + +}} + + +#endif //ASAPO_FABRIC_RECV_ANY_TASK_H diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.cpp b/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.cpp new file mode 100644 index 0000000000000000000000000000000000000000..667f45286f6b56d77c8000ee7e475894586e9a96 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.cpp @@ -0,0 +1,14 @@ +#include "fabric_self_deleting_task.h" + +void asapo::fabric::FabricSelfDeletingTask::HandleCompletion(const fi_cq_tagged_entry* entry, + asapo::fabric::FabricAddress source) { + OnDone(); +} + +void asapo::fabric::FabricSelfDeletingTask::HandleErrorCompletion(fi_cq_err_entry* errEntry) { + OnDone(); +} + +void asapo::fabric::FabricSelfDeletingTask::OnDone() { + delete this; +} diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.h b/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.h new file mode 100644 index 0000000000000000000000000000000000000000..6963f0f8cb2a3ecf7afb871bb4851750231a4ec0 --- /dev/null +++ b/common/cpp/src/asapo_fabric/server/task/fabric_self_deleting_task.h @@ -0,0 +1,20 @@ +#ifndef ASAPO_FABRIC_SELF_DELETING_TASK_H +#define ASAPO_FABRIC_SELF_DELETING_TASK_H + +#include "../../common/fabric_task.h" + +namespace asapo { namespace fabric { + +class FabricSelfDeletingTask : FabricTask { + + void HandleCompletion(const fi_cq_tagged_entry* entry, FabricAddress source) final; + void HandleErrorCompletion(fi_cq_err_entry* errEntry) final; + +private: + virtual ~FabricSelfDeletingTask() = default; + void OnDone(); +}; + +}} + +#endif //ASAPO_FABRIC_SELF_DELETING_TASK_H diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index a97bb1152d465b1d40dcc02b1e4d73ee32d327c3..e1c226abf69acf198b8992558fd82338aec4dcef 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -64,8 +64,6 @@ class SystemIO final : public IO { SocketDescriptor _accept(SocketDescriptor socket_fd, void* address, size_t* address_length) const; bool _close_socket(SocketDescriptor socket_fd) const; - std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostnameAndPort(std::string address) const; - std::unique_ptr<sockaddr_in> BuildSockaddrIn(const std::string& address, Error* err) const; /* @@ -130,6 +128,7 @@ class SystemIO final : public IO { void Skip(SocketDescriptor socket_fd, size_t length, Error* err) const override; void CloseSocket(SocketDescriptor socket_fd, Error* err) const override; std::string GetHostName(Error* err) const noexcept override; + std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostnameAndPort(std::string address) const override; /* * Filesystem diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 30598b059e866cf2fb886e6f3e9ce32167f2b2cd..11b4622c6528a19a79daa5807874d7f8093862ea 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -4,3 +4,5 @@ add_subdirectory(automatic) configure_files(${CMAKE_CURRENT_SOURCE_DIR}/manual/tests_via_nomad ${CMAKE_CURRENT_BINARY_DIR}/manual/tests_via_nomad @ONLY) add_subdirectory(manual/performance_broker_receiver) + +add_subdirectory(manual/asapo_fabric) diff --git a/tests/manual/asapo_fabric/CMakeLists.txt b/tests/manual/asapo_fabric/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..00e820551b4331705bc3c46ef5b92114df7cd7a1 --- /dev/null +++ b/tests/manual/asapo_fabric/CMakeLists.txt @@ -0,0 +1,7 @@ +GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES) + +add_executable(example-fabric-server fabric_server.cpp) +target_link_libraries(example-fabric-server asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) + +add_executable(example-fabric-client fabric_client.cpp) +target_link_libraries(example-fabric-client asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) diff --git a/tests/manual/asapo_fabric/fabric_client.cpp b/tests/manual/asapo_fabric/fabric_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..863be7219eab9d5a65dd379bd9c4f75b70146284 --- /dev/null +++ b/tests/manual/asapo_fabric/fabric_client.cpp @@ -0,0 +1,72 @@ +#include <asapo_fabric/asapo_fabric.h> +#include <iostream> +#include <common/data_structs.h> +#include <common/networking.h> + +using namespace asapo; +using namespace asapo::fabric; + +int main() { + Error error; + auto factory = GenerateDefaultFabricFactory(); + + auto client = factory->CreateClient(&error); + if (error) { + std::cerr << error << std::endl; + return 1; + } + + size_t dataBufferSize = 1024*1024*400 /*400 MiByte*/; + FileData dataBuffer = FileData{new uint8_t[dataBufferSize]}; + + auto serverAddress = client->AddServerAddress("127.0.0.1:1319", &error); + if (error) { + std::cerr << error << std::endl; + return 1; + } + + auto mr = client->ShareMemoryRegion(dataBuffer.get(), dataBufferSize, &error); + if (error) { + std::cerr << error << std::endl; + return 1; + } + + uint64_t totalTransferSize = 0; + auto start = std::chrono::high_resolution_clock::now(); + + std::cout << "Starting message loop" << std::endl; + for (FabricMessageId messageId = 0; messageId < 10 && !error; messageId++) { + GenericRequestHeader request{}; + memcpy(&request.message, mr->GetDetails(), sizeof(MemoryRegionDetails)); + client->Send(serverAddress, messageId, &request, sizeof(request), &error); + if (error) { + break; + } + + GenericNetworkResponse response{}; + client->Recv(serverAddress, messageId, &response, sizeof(response), &error); + if (error) { + break; + } + + if (strcmp((char*)dataBuffer.get(), "I (the server) wrote into your buffer.") != 0) { + error = TextError("The buffer was not written with the expected text"); + break; + } + memset(dataBuffer.get(), 0, 64); + + totalTransferSize += dataBufferSize; + } + auto end = std::chrono::high_resolution_clock::now(); + + if (error) { + std::cerr << "Client exited with error: " << error << std::endl; + return 1; + } + + auto timeTook = end - start; + std::cout << "Transferred " << (((totalTransferSize)/1024)/1024) << " MiBytes in " + << std::chrono::duration_cast<std::chrono::milliseconds>(timeTook).count() << "ms" << std::endl; + + return 0; +} diff --git a/tests/manual/asapo_fabric/fabric_server.cpp b/tests/manual/asapo_fabric/fabric_server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8dc1e6e6ab5a25c7874d2c9a90ce81e3cd0ce3d3 --- /dev/null +++ b/tests/manual/asapo_fabric/fabric_server.cpp @@ -0,0 +1,71 @@ +#include <asapo_fabric/asapo_fabric.h> +#include <iostream> +#include <io/io_factory.h> +#include <common/networking.h> + +using namespace asapo; +using namespace asapo::fabric; + +volatile bool running = false; + +void ServerThread(FabricServer* server, size_t bufferSize, FileData* buffer) { + Error error; + while(running && !error) { + FabricAddress clientAddress; + FabricMessageId messageId; + GenericRequestHeader request; + + server->RecvAny(&clientAddress, &messageId, &request, sizeof(request), &error); + if (error) { + break; + } + + std::cout << "Got a request from " << clientAddress << " id: " << messageId << std::endl; + server->RdmaWrite(clientAddress, (MemoryRegionDetails*)&request.message, buffer->get(), bufferSize, &error); + + GenericNetworkResponse response{}; + server->Send(clientAddress, messageId, &response, sizeof(response), &error); + } + + if (error) { + std::cerr << "Server thread exited with an error: " << error << std::endl; + } +} + +int main() { + Error error; + auto io = GenerateDefaultIO(); + auto factory = GenerateDefaultFabricFactory(); + + uint16_t port = 1319; + auto server = factory->CreateAndBindServer("127.0.0.1", port, &error); + if (error) { + std::cerr << error << std::endl; + return 1; + } + + size_t dataBufferSize = 1024*1024*400 /*400 MiByte*/; + FileData dataBuffer = FileData{new uint8_t[dataBufferSize]}; + strcpy((char*)dataBuffer.get(), "I (the server) wrote into your buffer."); + + running = true; + auto thread = io->NewThread("ServerThread", [&server, &dataBufferSize, &dataBuffer](){ + ServerThread(server.get(), dataBufferSize, &dataBuffer); + }); + + std::cout << "Server is listening on " << server->GetAddress() << std::endl; + std::cout << "Press Enter to stop the server." << std::endl; + + getchar(); + std::cout << "Stopping server..." << std::endl; + + running = false; + thread->join(); + + if (error) { + std::cerr << "Client exited with error: " << error << std::endl; + return 1; + } + + return 0; +}