Skip to content
Snippets Groups Projects
Commit c55e8225 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

[rds::fabric] Implemented FabricServerRds with real use of LibFabric

parent d2379c46
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,8 @@ set(RDS_FILES
src/receiver_data_server/net_server/rds_tcp_server.cpp
src/receiver_data_server/request_handler/receiver_data_server_request_handler_factory.cpp
src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp
src/receiver_data_server/fabric_server_rds.cpp
src/receiver_data_server/fabric_rds_request.cpp
)
......
#include "fabric_rds_request.h"
using namespace asapo;
FabricRdsRequest::FabricRdsRequest( GenericRequestHeader header,
fabric::FabricAddress sourceId, fabric::FabricMessageId messageId)
: ReceiverDataServerRequest(header, sourceId), message_id{messageId} {
}
const fabric::MemoryRegionDetails* asapo::FabricRdsRequest::GetMemoryRegion() const {
return reinterpret_cast<const fabric::MemoryRegionDetails*>(header.message);
}
#ifndef ASAPO_FABRIC_RDS_REQUEST_H
#define ASAPO_FABRIC_RDS_REQUEST_H
#include <asapo_fabric/asapo_fabric.h>
#include "receiver_data_server_request.h"
namespace asapo {
class FabricRdsRequest : public ReceiverDataServerRequest {
public:
explicit FabricRdsRequest(GenericRequestHeader header, fabric::FabricAddress source_id, fabric::FabricMessageId messageId);
fabric::FabricMessageId message_id;
const fabric::MemoryRegionDetails* GetMemoryRegion() const;
};
}
#endif //ASAPO_FABRIC_RDS_REQUEST_H
#include <io/io_factory.h>
#include <utility>
#include "fabric_server_rds.h"
#include "receiver_data_server_logger.h"
#include "fabric_rds_request.h"
using namespace asapo;
FabricServerRds::FabricServerRds(const std::string& address) : factory__(fabric::GenerateDefaultFabricFactory()) {
FabricServerRds::FabricServerRds(std::string listenAddress): factory__(fabric::GenerateDefaultFabricFactory()), io__{GenerateDefaultIO()},
log__{GetDefaultReceiverDataServerLogger()}, listenAddress_(std::move(listenAddress)) {
}
......@@ -10,21 +16,60 @@ FabricServerRds::~FabricServerRds() {
}
GenericRequests FabricServerRds::GetNewRequests(Error* err) const noexcept {
return asapo::GenericRequests();
Error FabricServerRds::Initialize() {
if (server_) {
return TextError("Server was already initialized");
}
Error err;
std::string hostname;
uint16_t port;
std::tie(hostname, port) = *io__->SplitAddressToHostnameAndPort(listenAddress_);
server_ = factory__->CreateAndBindServer(log__, hostname, port, &err);
if (err) {
return err;
}
return err;
}
Error
FabricServerRds::SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) const noexcept {
return asapo::Error();
GenericRequests FabricServerRds::GetNewRequests(Error* err) {
// TODO: Should be performance tested, just a single request is returned at a time
fabric::FabricAddress srcAddress;
fabric::FabricMessageId messageId;
GenericRequestHeader header;
server_->RecvAny(&srcAddress, &messageId, &header, sizeof(header), err);
if (err) {
return {}; // empty result
}
auto requestPtr = new FabricRdsRequest(header, srcAddress, messageId);
GenericRequests genericRequests;
genericRequests.emplace_back(GenericRequestPtr(requestPtr));
return genericRequests;
}
Error FabricServerRds::SendResponseAndSlotData(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept {
return asapo::Error();
Error FabricServerRds::SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) {
Error err;
auto fabricRequest = dynamic_cast<const FabricRdsRequest*>(request);
server_->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err);
}
void FabricServerRds::HandleAfterError(uint64_t source_id) const noexcept {
Error FabricServerRds::SendResponseAndSlotData(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response, const CacheMeta* cache_slot) {
Error err;
auto fabricRequest = dynamic_cast<const FabricRdsRequest*>(request);
server_->RdmaWrite(fabricRequest->source_id, fabricRequest->GetMemoryRegion(), cache_slot->addr, cache_slot->size,
&err);
if (err) {
return err;
}
server_->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err);
return err;
}
void FabricServerRds::HandleAfterError(uint64_t source_id) {
/* Do nothing? */
}
......@@ -7,19 +7,28 @@
namespace asapo {
class FabricServerRds : public RdsNetServer {
public:
explicit FabricServerRds(const std::string& address);
public:
explicit FabricServerRds(std::string listenAddress);
~FabricServerRds() override;
std::unique_ptr<asapo::fabric::FabricFactory> factory__; // modified in testings to mock system calls, otherwise do not touch
public: // NetServer implementation
GenericRequests GetNewRequests(Error* err) const noexcept override;
Error SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) const noexcept override;
// modified in testings to mock system calls, otherwise do not touch
std::unique_ptr<asapo::fabric::FabricFactory> factory__;
std::unique_ptr<IO> io__;
const AbstractLogger* log__;
private:
std::string listenAddress_;
std::unique_ptr<fabric::FabricServer> server_;
public: // NetServer implementation
Error Initialize() override;
GenericRequests GetNewRequests(Error* err) override;
Error SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) override;
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept override;
const CacheMeta* cache_slot) override;
void HandleAfterError(uint64_t source_id) const noexcept override;
void HandleAfterError(uint64_t source_id) override;
};
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment