diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 39e5590f058130bda5dc16da5f81652526d8aec9..73b7c9918c625c37310378b1c4e856530053d117 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -51,13 +51,13 @@ struct GenericRequestHeader { uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "", const std::string& i_substream = ""): op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} { - strncpy(message, i_message.c_str(), kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails + memcpy(message, i_message.c_str(), kMaxMessageSize); strncpy(substream, i_substream.c_str(), kMaxMessageSize); } GenericRequestHeader(const GenericRequestHeader& header) { op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size, memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)), - strncpy(message, header.message, kMaxMessageSize); // TODO must be memcpy in order to send raw MemoryDetails + memcpy(message, header.message, kMaxMessageSize); strncpy(substream, header.substream, kMaxMessageSize); } @@ -66,8 +66,8 @@ struct GenericRequestHeader { uint64_t data_size; uint64_t meta_size; CustomRequestData custom_data; - char message[kMaxMessageSize]; - char substream[kMaxMessageSize]; + char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails)*/ + char substream[kMaxMessageSize]; /* Must be a string (strcpy is used) */ std::string Json() { std::string s = "{\"id\":" + std::to_string(data_id) + "," "\"buffer\":\"" + std::string(message) + "\"" diff --git a/common/cpp/include/unittests/MockFabric.h b/common/cpp/include/unittests/MockFabric.h index f433814860e1eaaa1f566a3413242db0d9762a3b..b677df0baf15da1445a92103dbf167eb33a8f571 100644 --- a/common/cpp/include/unittests/MockFabric.h +++ b/common/cpp/include/unittests/MockFabric.h @@ -11,6 +11,7 @@ class MockFabricMemoryRegion : public FabricMemoryRegion { }; class MockFabricContext : public FabricContext { + public: MOCK_CONST_METHOD0(GetAddress, std::string()); std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override { @@ -61,6 +62,7 @@ class MockFabricClient : public MockFabricContext, public FabricClient { }; class MockFabricServer : public MockFabricContext, public FabricServer { + public: void RecvAny(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, Error* error) override { ErrorInterface* err = nullptr; RecvAny_t(srcAddress, messageId, dst, size, &err); @@ -68,6 +70,30 @@ class MockFabricServer : public MockFabricContext, public FabricServer { } MOCK_METHOD5(RecvAny_t, void(FabricAddress* srcAddress, FabricMessageId* messageId, void* dst, size_t size, ErrorInterface** err)); + public: // Link to FabricContext + std::string GetAddress() const override { + return MockFabricContext::GetAddress(); + } + + std::unique_ptr<FabricMemoryRegion> ShareMemoryRegion(void* src, size_t size, Error* error) override { + return MockFabricContext::ShareMemoryRegion(src, size, error); + } + + void Send(FabricAddress dstAddress, FabricMessageId messageId, + const void* src, size_t size, Error* error) override { + MockFabricContext::Send(dstAddress, messageId, src, size, error); + } + + void Recv(FabricAddress srcAddress, FabricMessageId messageId, + void* dst, size_t size, Error* error) override { + MockFabricContext::Recv(srcAddress, messageId, dst, size, error); + } + + void RdmaWrite(FabricAddress dstAddress, + const MemoryRegionDetails* details, const void* buffer, size_t size, + Error* error) override { + MockFabricContext::RdmaWrite(dstAddress, details, buffer, size, error); + } }; class MockFabricFactory : public FabricFactory { diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 3eb3e0c8b2d59c45b8a55871dcdbb8232da1cfe4..880f1b054eb4bcc6d72450b894793433aec6afc9 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -145,9 +145,11 @@ class MockIO : public IO { } MOCK_CONST_METHOD4(Send_t, size_t(SocketDescriptor socket_fd, const void* buf, size_t length, ErrorInterface** err)); - - MOCK_CONST_METHOD1(SplitAddressToHostnameAndPort, - std::unique_ptr<std::tuple<std::string, uint16_t>>(const std::string& address)); + std::unique_ptr<std::tuple<std::string, uint16_t>> SplitAddressToHostnameAndPort(const std::string& address) const + override { + return std::unique_ptr<std::tuple<std::string, uint16_t>>(SplitAddressToHostnameAndPort_t(address)); + } + MOCK_CONST_METHOD1(SplitAddressToHostnameAndPort_t, std::tuple<std::string, uint16_t>* (const std::string& address)); void Skip(SocketDescriptor socket_fd, size_t length, Error* err) const override { ErrorInterface* error = nullptr; diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index 829c6a7f104b49c426a5871efe49237fa486b3f4..2e3211d7b1099205a1b2adf8def2a539253561eb 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -33,7 +33,7 @@ size_t curl_write( void* ptr, size_t size, size_t nmemb, void* data_container) { if (container->bytes_received + nbytes > container->array_size) { return -1; } - memcpy(container->p_array->get()+container->bytes_received, ptr, nbytes); + memcpy(container->p_array->get() + container->bytes_received, ptr, nbytes); container->bytes_received += nbytes; break; case CurlDataMode::file: diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 0a9e6e7c66324ee1868b7e68fd033fc93f3ec7e7..1b30dc30db7f433766401f74d32ac9df778beb88 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -125,10 +125,11 @@ class ServerDataBrokerTests : public Test { )); } void MockGetServiceUri(std::string service, std::string result) { - EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/" + service), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(result))); + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/" + service), _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(result))); } void MockBeforeFTS(FileData* data); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 8148c0a56b0c2aae0eeba82ab85989000fdace90..34dbad23a24beabb518ca79531e57e968571daca 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -27,14 +27,14 @@ set(RECEIVER_CORE_FILES ) set(RDS_FILES + src/receiver_data_server/net_server/fabric_rds_request.cpp src/receiver_data_server/receiver_data_server.cpp src/receiver_data_server/receiver_data_server_request.cpp src/receiver_data_server/receiver_data_server_logger.cpp src/receiver_data_server/net_server/rds_tcp_server.cpp + src/receiver_data_server/net_server/rds_fabric_server.cpp src/receiver_data_server/request_handler/receiver_data_server_request_handler_factory.cpp src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp - src/receiver_data_server/fabric_server_rds.cpp - src/receiver_data_server/fabric_rds_request.cpp ) @@ -49,12 +49,14 @@ set(SOURCE_FILES ################################ #SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static") +GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES) add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${CURL_INCLUDE_DIRS}) -target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database) +target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database + asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES}) add_executable(${TARGET_NAME}-bin src/main.cpp) @@ -112,6 +114,7 @@ gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" set(TEST_SOURCE_FILES_RDS unittests/receiver_data_server/test_receiver_data_server.cpp + unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp unittests/receiver_data_server/net_server/test_rds_tcp_server.cpp unittests/receiver_data_server/request_handler/test_request_handler_factory.cpp unittests/receiver_data_server/request_handler/test_request_handler.cpp diff --git a/receiver/src/receiver_data_server/fabric_rds_request.cpp b/receiver/src/receiver_data_server/fabric_rds_request.cpp deleted file mode 100644 index dc42eb3be0b2837abbfb42ae8d57f2703d5eba0a..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/fabric_rds_request.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "fabric_rds_request.h" - -using namespace asapo; - -FabricRdsRequest::FabricRdsRequest( GenericRequestHeader header, - fabric::FabricAddress sourceId, fabric::FabricMessageId messageId) - : ReceiverDataServerRequest(header, sourceId), message_id{messageId} { - -} - -const fabric::MemoryRegionDetails* asapo::FabricRdsRequest::GetMemoryRegion() const { - return reinterpret_cast<const fabric::MemoryRegionDetails*>(header.message); -} - diff --git a/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp b/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0edde1019f0301655b758ddc76a82b46074ec85f --- /dev/null +++ b/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp @@ -0,0 +1,14 @@ +#include "fabric_rds_request.h" + +using namespace asapo; + +FabricRdsRequest::FabricRdsRequest(const GenericRequestHeader& header, + fabric::FabricAddress sourceId, fabric::FabricMessageId messageId) + : ReceiverDataServerRequest(header, sourceId), message_id{messageId} { + +} + +const fabric::MemoryRegionDetails* asapo::FabricRdsRequest::GetMemoryRegion() const { + return reinterpret_cast<const fabric::MemoryRegionDetails*>(header.message); +} + diff --git a/receiver/src/receiver_data_server/fabric_rds_request.h b/receiver/src/receiver_data_server/net_server/fabric_rds_request.h similarity index 62% rename from receiver/src/receiver_data_server/fabric_rds_request.h rename to receiver/src/receiver_data_server/net_server/fabric_rds_request.h index e7aee596f1675b68970169409bf4a6984549e04a..bcbd3a180b1cd80c090c408a78603a47f65ce8e3 100644 --- a/receiver/src/receiver_data_server/fabric_rds_request.h +++ b/receiver/src/receiver_data_server/net_server/fabric_rds_request.h @@ -2,13 +2,14 @@ #define ASAPO_FABRIC_RDS_REQUEST_H #include <asapo_fabric/asapo_fabric.h> -#include "receiver_data_server_request.h" +#include "../receiver_data_server_request.h" namespace asapo { class FabricRdsRequest : public ReceiverDataServerRequest { public: - explicit FabricRdsRequest(GenericRequestHeader header, fabric::FabricAddress source_id, fabric::FabricMessageId messageId); + explicit FabricRdsRequest(const GenericRequestHeader& header, fabric::FabricAddress source_id, + fabric::FabricMessageId messageId); fabric::FabricMessageId message_id; const fabric::MemoryRegionDetails* GetMemoryRegion() const; }; diff --git a/receiver/src/receiver_data_server/fabric_server_rds.cpp b/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp similarity index 58% rename from receiver/src/receiver_data_server/fabric_server_rds.cpp rename to receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp index 85140ed9c949fe4cff8adc8348b60117565f8fdd..43fa3b758de86ca129b2baa3bf408f86622a9075 100644 --- a/receiver/src/receiver_data_server/fabric_server_rds.cpp +++ b/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp @@ -1,30 +1,30 @@ #include <io/io_factory.h> #include <utility> -#include "fabric_server_rds.h" -#include "receiver_data_server_logger.h" +#include "rds_fabric_server.h" +#include "../receiver_data_server_logger.h" #include "fabric_rds_request.h" using namespace asapo; -FabricServerRds::FabricServerRds(std::string listenAddress): factory__(fabric::GenerateDefaultFabricFactory()), io__{GenerateDefaultIO()}, +RdsFabricServer::RdsFabricServer(std::string listenAddress): factory__(fabric::GenerateDefaultFabricFactory()), io__{GenerateDefaultIO()}, log__{GetDefaultReceiverDataServerLogger()}, listenAddress_(std::move(listenAddress)) { } -FabricServerRds::~FabricServerRds() { +RdsFabricServer::~RdsFabricServer() { } -Error FabricServerRds::Initialize() { - if (server_) { +Error RdsFabricServer::Initialize() { + if (server__) { return TextError("Server was already initialized"); } Error err; std::string hostname; uint16_t port; std::tie(hostname, port) = *io__->SplitAddressToHostnameAndPort(listenAddress_); - server_ = factory__->CreateAndBindServer(log__, hostname, port, &err); + server__ = factory__->CreateAndBindServer(log__, hostname, port, &err); if (err) { return err; } @@ -32,14 +32,14 @@ Error FabricServerRds::Initialize() { return err; } -GenericRequests FabricServerRds::GetNewRequests(Error* err) { +GenericRequests RdsFabricServer::GetNewRequests(Error* err) { // TODO: Should be performance tested, just a single request is returned at a time fabric::FabricAddress srcAddress; fabric::FabricMessageId messageId; GenericRequestHeader header; - server_->RecvAny(&srcAddress, &messageId, &header, sizeof(header), err); - if (err) { + server__->RecvAny(&srcAddress, &messageId, &header, sizeof(header), err); + if (*err) { return {}; // empty result } auto requestPtr = new FabricRdsRequest(header, srcAddress, messageId); @@ -49,27 +49,28 @@ GenericRequests FabricServerRds::GetNewRequests(Error* err) { return genericRequests; } -Error FabricServerRds::SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) { +Error RdsFabricServer::SendResponse(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response) { Error err; auto fabricRequest = dynamic_cast<const FabricRdsRequest*>(request); - server_->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err); + server__->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err); + return err; } -Error FabricServerRds::SendResponseAndSlotData(const ReceiverDataServerRequest* request, +Error RdsFabricServer::SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response, const CacheMeta* cache_slot) { Error err; auto fabricRequest = dynamic_cast<const FabricRdsRequest*>(request); - server_->RdmaWrite(fabricRequest->source_id, fabricRequest->GetMemoryRegion(), cache_slot->addr, cache_slot->size, - &err); + server__->RdmaWrite(fabricRequest->source_id, fabricRequest->GetMemoryRegion(), cache_slot->addr, cache_slot->size, + &err); if (err) { return err; } - server_->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err); + server__->Send(request->source_id, fabricRequest->message_id, response, sizeof(*response), &err); return err; } -void FabricServerRds::HandleAfterError(uint64_t source_id) { +void RdsFabricServer::HandleAfterError(uint64_t source_id) { /* Do nothing? */ } diff --git a/receiver/src/receiver_data_server/fabric_server_rds.h b/receiver/src/receiver_data_server/net_server/rds_fabric_server.h similarity index 83% rename from receiver/src/receiver_data_server/fabric_server_rds.h rename to receiver/src/receiver_data_server/net_server/rds_fabric_server.h index 2f51227ae8ba780f3cebb50b2482edd5c62de72f..74e30dd39ab0394a4d0bd4b9e950deaf587c465b 100644 --- a/receiver/src/receiver_data_server/fabric_server_rds.h +++ b/receiver/src/receiver_data_server/net_server/rds_fabric_server.h @@ -6,18 +6,18 @@ namespace asapo { -class FabricServerRds : public RdsNetServer { +class RdsFabricServer : public RdsNetServer { public: - explicit FabricServerRds(std::string listenAddress); - ~FabricServerRds() override; + explicit RdsFabricServer(std::string listenAddress); + ~RdsFabricServer() override; // modified in testings to mock system calls, otherwise do not touch std::unique_ptr<asapo::fabric::FabricFactory> factory__; std::unique_ptr<IO> io__; const AbstractLogger* log__; + std::unique_ptr<fabric::FabricServer> server__; private: std::string listenAddress_; - std::unique_ptr<fabric::FabricServer> server_; public: // NetServer implementation Error Initialize() override; diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.cpp b/receiver/src/receiver_data_server/receiver_data_server_request.cpp index 6c30cff9e646cc4b18e7d6ed308f1a6c4b913d68..41d1477c0f29aba325e247190c4ef431d0e3a1fd 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request.cpp @@ -4,10 +4,10 @@ namespace asapo { ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t source_id) : - GenericRequest(std::move(header), 0), + GenericRequest(header, 0), source_id{source_id} { } -} \ No newline at end of file +} diff --git a/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp b/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..699ed65d98e6d812a835ee9593ca15f4c05ed6f6 --- /dev/null +++ b/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp @@ -0,0 +1,259 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <string> +#include <unittests/MockIO.h> +#include <unittests/MockLogger.h> +#include <unittests/MockFabric.h> +#include "../../../src/receiver_data_server/net_server/rds_fabric_server.h" +#include "../../../src/receiver_data_server/net_server/fabric_rds_request.h" + +using ::testing::Ne; +using ::testing::Eq; +using ::testing::Test; +using ::testing::NiceMock; +using ::testing::DoAll; +using ::testing::SetArgPointee; +using ::testing::Return; +using ::testing::_; + +using namespace asapo; + +std::string expected_address = "somehost:123"; + +TEST(RdsFabricServer, Constructor) { + RdsFabricServer fabric_server(""); + ASSERT_THAT(dynamic_cast<asapo::IO*>(fabric_server.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(fabric_server.log__), Ne(nullptr)); +} + + +class RdsFabricServerTests : public Test { + public: + RdsFabricServer rds_server{expected_address}; + NiceMock<MockLogger> mock_logger; + MockIO mock_io; + fabric::MockFabricFactory mock_fabric_factory; + fabric::MockFabricServer mock_fabric_server; + + void SetUp() override { + rds_server.log__ = &mock_logger; + rds_server.io__ = std::unique_ptr<IO> {&mock_io}; + rds_server.factory__ = std::unique_ptr<fabric::FabricFactory> {&mock_fabric_factory}; + } + void TearDown() override { + rds_server.io__.release(); + rds_server.factory__.release(); + rds_server.server__.release(); + } + + public: + void InitServer(); +}; + +void RdsFabricServerTests::InitServer() { + EXPECT_CALL(mock_io, SplitAddressToHostnameAndPort_t(expected_address)).WillOnce(Return( + new std::tuple<std::string, uint16_t>("abc", 123) + )); + + EXPECT_CALL(mock_fabric_factory, CreateAndBindServer_t(_, "abc", 123, _)).WillOnce(DoAll( + SetArgPointee<3>(fabric::FabricErrorTemplates::kInternalError.Generate().release()), + Return(&mock_fabric_server) + )); + + Error err = rds_server.Initialize(); + + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); +} + +TEST_F(RdsFabricServerTests, Initialize_Ok) { + InitServer(); +} + +TEST_F(RdsFabricServerTests, Initialize_Error_CreateAndBindServer) { + EXPECT_CALL(mock_io, SplitAddressToHostnameAndPort_t(expected_address)).WillOnce(Return( + new std::tuple<std::string, uint16_t>("abc", 123) + )); + + EXPECT_CALL(mock_fabric_factory, CreateAndBindServer_t(_, "abc", 123, _)).WillOnce(DoAll( + SetArgPointee<3>(fabric::FabricErrorTemplates::kInternalError.Generate().release()), + Return(nullptr) + )); + + Error err = rds_server.Initialize(); + + ASSERT_THAT(rds_server.server__, Eq(nullptr)); + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); +} + +TEST_F(RdsFabricServerTests, Initialize_Error_DoubleInitialize) { + EXPECT_CALL(mock_io, SplitAddressToHostnameAndPort_t(expected_address)).WillOnce(Return( + new std::tuple<std::string, uint16_t>("abc", 123) + )); + + EXPECT_CALL(mock_fabric_factory, CreateAndBindServer_t(_, "abc", 123, _)).WillOnce(Return( + &mock_fabric_server + )); + + Error err = rds_server.Initialize(); + ASSERT_THAT(rds_server.server__, Ne(nullptr)); + ASSERT_THAT(err, Eq(nullptr)); + + err = rds_server.Initialize(); + ASSERT_THAT(rds_server.server__, Ne(nullptr)); + ASSERT_THAT(err, Ne(nullptr)); +} + +ACTION_P5(A_WriteToRecvAnyBuffer, op_code, expected_id, remote_mem_addr, remote_mem_length, remote_mem_key) { + ((GenericRequestHeader*)arg2)->op_code = op_code; + ((GenericRequestHeader*)arg2)->data_id = expected_id; + ((fabric::MemoryRegionDetails*) & ((GenericRequestHeader*)arg2)->message)->addr = remote_mem_addr; + ((fabric::MemoryRegionDetails*) & ((GenericRequestHeader*)arg2)->message)->length = remote_mem_length; + ((fabric::MemoryRegionDetails*) & ((GenericRequestHeader*)arg2)->message)->key = remote_mem_key; +} + +TEST_F(RdsFabricServerTests, GetNewRequests_Ok) { + InitServer(); + + EXPECT_CALL(mock_fabric_server, RecvAny_t(_/*&src*/, _/*&msgId*/, _/*&buf*/, sizeof(GenericRequestHeader), _/*err*/)) + .WillOnce(DoAll( + SetArgPointee<0>(542), + SetArgPointee<1>(123), + A_WriteToRecvAnyBuffer(asapo::kOpcodeGetBufferData, 30, + 90, 10, 23) + )); + + Error err; + GenericRequests requests = rds_server.GetNewRequests(&err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(requests.size(), Eq(1)); + auto req = dynamic_cast<FabricRdsRequest*>(requests[0].get()); + ASSERT_THAT(req->source_id, Eq(542)); + ASSERT_THAT(req->message_id, Eq(123)); + ASSERT_THAT(req->header.op_code, Eq(asapo::kOpcodeGetBufferData)); + ASSERT_THAT(req->header.data_id, Eq(30)); + ASSERT_THAT(req->GetMemoryRegion()->addr, Eq(90)); + ASSERT_THAT(req->GetMemoryRegion()->length, Eq(10)); + ASSERT_THAT(req->GetMemoryRegion()->key, Eq(23)); +} + + +TEST_F(RdsFabricServerTests, GetNewRequests_Error_RecvAny_InternalError) { + InitServer(); + + EXPECT_CALL(mock_fabric_server, RecvAny_t(_/*&src*/, _/*&msgId*/, _/*&buf*/, _/*bufSize*/, _/*err*/)) + .WillOnce( + SetArgPointee<4>(fabric::FabricErrorTemplates::kInternalError.Generate().release()) + ); + + Error err; + GenericRequests requests = rds_server.GetNewRequests(&err); + + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); + ASSERT_THAT(requests.size(), Eq(0)); +} + +TEST_F(RdsFabricServerTests, GetNewRequests_Error_RecvAny_Timeout) { + InitServer(); + + EXPECT_CALL(mock_fabric_server, RecvAny_t(_/*&src*/, _/*&msgId*/, _/*&buf*/, _/*bufSize*/, _/*err*/)) + .WillOnce( + SetArgPointee<4>(IOErrorTemplates::kTimeout.Generate().release()) + ); + + Error err; + GenericRequests requests = rds_server.GetNewRequests(&err); + + ASSERT_THAT(err, Eq(IOErrorTemplates::kTimeout)); + ASSERT_THAT(requests.size(), Eq(0)); +} + +TEST_F(RdsFabricServerTests, SendResponse_Ok) { + InitServer(); + + FabricRdsRequest request(GenericRequestHeader{}, 41, 87); + GenericNetworkResponse response; + + EXPECT_CALL(mock_fabric_server, Send_t(41, 87, &response, sizeof(response), _/*err*/)).Times(1); + + Error err = rds_server.SendResponse(&request, &response); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RdsFabricServerTests, SendResponse_Error_SendError) { + InitServer(); + + FabricRdsRequest request(GenericRequestHeader{}, 41, 87); + GenericNetworkResponse response; + + EXPECT_CALL(mock_fabric_server, Send_t(41, 87, &response, sizeof(response), _/*err*/)).WillOnce( + SetArgPointee<4>(fabric::FabricErrorTemplates::kInternalError.Generate().release()) + ); + + Error err = rds_server.SendResponse(&request, &response); + + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); +} + +TEST_F(RdsFabricServerTests, SendResponseAndSlotData_Ok) { + InitServer(); + + GenericRequestHeader dummyHeader{}; + FabricRdsRequest request(GenericRequestHeader{}, 41, 87); + GenericNetworkResponse response; + CacheMeta cacheSlot; + cacheSlot.addr = (void*)0xABC; + cacheSlot.size = 200; + + EXPECT_CALL(mock_fabric_server, RdmaWrite_t(41, request.GetMemoryRegion(), (void*)0xABC, 200, _/*err*/)).Times(1); + EXPECT_CALL(mock_fabric_server, Send_t(41, 87, &response, sizeof(response), _/*err*/)).Times(1); + + Error err = rds_server.SendResponseAndSlotData(&request, &response, &cacheSlot); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RdsFabricServerTests, SendResponseAndSlotData_RdmaWrite_Error) { + InitServer(); + + GenericRequestHeader dummyHeader{}; + FabricRdsRequest request(GenericRequestHeader{}, 41, 87); + GenericNetworkResponse response; + CacheMeta cacheSlot; + cacheSlot.addr = (void*)0xABC; + cacheSlot.size = 200; + + EXPECT_CALL(mock_fabric_server, RdmaWrite_t(41, request.GetMemoryRegion(), (void*)0xABC, 200, _/*err*/)).WillOnce( + SetArgPointee<4>(fabric::FabricErrorTemplates::kInternalError.Generate().release()) + ); + + Error err = rds_server.SendResponseAndSlotData(&request, &response, &cacheSlot); + + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); +} + +TEST_F(RdsFabricServerTests, SendResponseAndSlotData_Send_Error) { + InitServer(); + + GenericRequestHeader dummyHeader{}; + FabricRdsRequest request(GenericRequestHeader{}, 41, 87); + GenericNetworkResponse response; + CacheMeta cacheSlot; + cacheSlot.addr = (void*)0xABC; + cacheSlot.size = 200; + + EXPECT_CALL(mock_fabric_server, RdmaWrite_t(41, request.GetMemoryRegion(), (void*)0xABC, 200, _/*err*/)).Times(1); + EXPECT_CALL(mock_fabric_server, Send_t(41, 87, &response, sizeof(response), _/*err*/)).WillOnce( + SetArgPointee<4>(fabric::FabricErrorTemplates::kInternalError.Generate().release()) + ); + + Error err = rds_server.SendResponseAndSlotData(&request, &response, &cacheSlot); + + ASSERT_THAT(err, Eq(fabric::FabricErrorTemplates::kInternalError)); +} + +TEST_F(RdsFabricServerTests, HandleAfterError) { + InitServer(); + rds_server.HandleAfterError(2); /* Function does nothing */ +} diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index 4be9c89b3940769327d5c24ba9f29bc6048fb840..bc65296abc0ebc432a8a4f7949924004dc043fda 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -94,7 +94,7 @@ class DbWriterHandlerTests : public Test { handler.log__ = &mock_logger; mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", &mock_db_check_handler}); config.database_uri = "127.0.0.1:27017"; - config.dataserver.advertise_uri = expected_host_ip+":"+std::to_string(expected_port); + config.dataserver.advertise_uri = expected_host_ip + ":" + std::to_string(expected_port); config.dataserver.listen_port = expected_port; SetReceiverConfig(config, "none"); diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index 75bc3d65bcd9ec7b5cc6b194c888cc669669da35..760c6d94034c70ca360c7a187f77902735ec292d 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -66,15 +66,15 @@ int main(int argc, char* argv[]) { transfer = "{\"Folder\":\"" + args.folder + "\",\"FileName\":\"random\"}"; auto io = asapo::GenerateDefaultIO(); - auto fname = args.folder+asapo::kPathSeparator+"random"; - uint64_t size=0; - auto expected_data = io->GetDataFromFile(fname,&size,&err); + auto fname = args.folder + asapo::kPathSeparator + "random"; + uint64_t size = 0; + auto expected_data = io->GetDataFromFile(fname, &size, &err); M_AssertEq(nullptr, err); err = server_broker->httpclient__->Post(args.uri_fts + "/transfer", cookie, transfer, &data, size, &code); M_AssertTrue(code == asapo::HttpCode::OK); - for (uint64_t i=0;i<size;i++) { + for (uint64_t i = 0; i < size; i++) { if (expected_data[i] != data[i]) { - M_AssertTrue(false,"recieve array equal to sent array"); + M_AssertTrue(false, "recieve array equal to sent array"); } }