From 46e74294aa2b5b195d68449d88870e97014c70d6 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 9 May 2018 17:40:31 +0200 Subject: [PATCH] start working at requests --- common/cpp/include/common/networking.h | 1 - common/cpp/include/io/io.h | 1 + common/cpp/src/system_io/system_io.cpp | 4 +- producer/api/CMakeLists.txt | 4 +- producer/api/include/producer/producer.h | 6 + .../api/include/producer/producer_error.h | 6 +- producer/api/src/producer_impl.cpp | 22 +- producer/api/src/receivers_status.cpp | 5 + producer/api/src/receivers_status.h | 17 + producer/api/src/request.cpp | 85 +++++ producer/api/src/request.h | 29 ++ producer/api/src/request_pool.cpp | 7 + producer/api/src/request_pool.h | 17 + producer/api/unittests/test_producer_impl.cpp | 18 +- producer/api/unittests/test_request.cpp | 316 ++++++++++++++++++ 15 files changed, 503 insertions(+), 35 deletions(-) create mode 100644 producer/api/src/receivers_status.cpp create mode 100644 producer/api/src/receivers_status.h create mode 100644 producer/api/src/request.cpp create mode 100644 producer/api/src/request.h create mode 100644 producer/api/src/request_pool.cpp create mode 100644 producer/api/src/request_pool.h create mode 100644 producer/api/unittests/test_request.cpp diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 6d73f9f6a..724948c05 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -29,7 +29,6 @@ enum NetworkErrorCode : uint16_t { */ struct GenericNetworkRequestHeader { Opcode op_code; - NetworkRequestId request_id; uint64_t data_id; uint64_t data_size; }; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 6e4e55f98..e177bfe40 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -41,6 +41,7 @@ enum class SocketProtocols { using FileDescriptor = int; using SocketDescriptor = int; +const SocketDescriptor kDisconnectedSocketDescriptor = -1; class IO { public: diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index e8096c325..addeaa526 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -189,13 +189,13 @@ asapo::FileDescriptor asapo::SystemIO::CreateAndConnectIPTCPSocket(const std::st FileDescriptor fd = CreateSocket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, err); if(*err != nullptr) { - return -1; + return kDisconnectedSocketDescriptor; } InetConnect(fd, address, err); if (*err != nullptr) { CloseSocket(fd, nullptr); - return -1; + return kDisconnectedSocketDescriptor; } return fd; diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 58b7b5182..e6f0ada63 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -3,7 +3,8 @@ set(SOURCE_FILES src/producer.cpp src/producer_impl.h src/producer_impl.cpp - ) + src/request.cpp + src/request_pool.cpp src/receivers_status.cpp src/receivers_status.h) ################################ @@ -20,6 +21,7 @@ target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} set(TEST_SOURCE_FILES unittests/test_producer_impl.cpp unittests/test_producer.cpp + unittests/test_request.cpp ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 192e9f2f2..7408e3f78 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -3,7 +3,9 @@ #include <memory> #include <string> +#include <functional> +#include "common/networking.h" #include "producer_error.h" #include "logger/logger.h" @@ -14,6 +16,10 @@ enum class ProducerStatus { kConnected, }; + +using RequestCallback = std::function<void(GenericNetworkRequestHeader, Error)>; + + class Producer { public: //! Creates a new producer diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 720b5cf4d..fccba0c10 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -10,7 +10,8 @@ enum class ProducerErrorType { kConnectionNotReady, kFileTooLarge, kFileIdAlreadyInUse, - kUnknownServerError + kUnknownServerError, + kCannotSendDataToReceivers, }; //TODO Make a marco to create error class and error template class @@ -71,6 +72,9 @@ auto const kUnknownServerError = ProducerErrorTemplate { "Unknown server error", ProducerErrorType::kUnknownServerError }; +auto const kCannotSendDataToReceivers = ProducerErrorTemplate { + "Cannot connect/send data to receivers", ProducerErrorType::kCannotSendDataToReceivers +}; }; } diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 6e8a5acb8..38f74e79d 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -54,29 +54,11 @@ Error ProducerImpl::ConnectToReceiver(const std::string& receiver_address) { GenericNetworkRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size) { GenericNetworkRequestHeader request; request.op_code = kNetOpcodeSendData; - request.request_id = request_id_++; request.data_id = file_id; request.data_size = file_size; return request; } -Error ProducerImpl::SendHeaderAndData(const GenericNetworkRequestHeader& header, const void* data, size_t file_size) { - Error io_error; - io__->Send(client_fd_, &header, sizeof(header), &io_error); - if(io_error) { -// todo: add meaningful message to the io_error (here and below) -// std::cerr << "ProducerImpl::Send/DataRequest error" << io_error << std::endl; - return io_error; - } - - io__->Send(client_fd_, data, file_size, &io_error); - if(io_error) { -// std::cerr << "ProducerImpl::Send/data error" << io_error << std::endl; - return io_error; - } - - return nullptr; -} Error ProducerImpl::ReceiveResponce() { Error err; @@ -108,7 +90,7 @@ Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size) { auto send_data_request = GenerateNextSendRequest(file_id, file_size); - auto error = SendHeaderAndData(send_data_request, data, file_size); + /*auto error = SendHeaderAndData(send_data_request, data, file_size); if(error) { log__->Debug("error sending to " + receiver_uri_ + " - " + error->Explain()); return error; @@ -120,6 +102,8 @@ Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size) { return error; } + */ + log__->Debug("succesfully sent data to " + receiver_uri_); return nullptr; diff --git a/producer/api/src/receivers_status.cpp b/producer/api/src/receivers_status.cpp new file mode 100644 index 000000000..981a33b72 --- /dev/null +++ b/producer/api/src/receivers_status.cpp @@ -0,0 +1,5 @@ +#include "receivers_status.h" + +namespace asapo { + +} \ No newline at end of file diff --git a/producer/api/src/receivers_status.h b/producer/api/src/receivers_status.h new file mode 100644 index 000000000..9dfaba163 --- /dev/null +++ b/producer/api/src/receivers_status.h @@ -0,0 +1,17 @@ +#ifndef ASAPO_RECEIVERS_STATUS_H +#define ASAPO_RECEIVERS_STATUS_H + +#include <string> +#include <vector> + +namespace asapo { + +using ReceiversList = std::vector<std::string>; + +class ReceiversStatus { + +}; + +} + +#endif //ASAPO_RECEIVERS_STATUS_H diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp new file mode 100644 index 000000000..17b2f7e3a --- /dev/null +++ b/producer/api/src/request.cpp @@ -0,0 +1,85 @@ +#include <producer/producer_error.h> +#include "request.h" + +namespace asapo { + +Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, + RequestCallback callback): + io__{io}, header_{header}, data_{data}, callback_{std::move(callback)} { + +} + +Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address) { + Error err; + *sd = io__->CreateAndConnectIPTCPSocket(receiver_address, &err); + if(err != nullptr) { + //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); + return err; + } + return nullptr; +} + +Error Request::SendHeaderAndData(SocketDescriptor sd) { + Error io_error; + io__->Send(sd, &header_, sizeof(header_), &io_error); + if(io_error) { +// todo: add meaningful message to the io_error (here and below) + return io_error; + } + + io__->Send(sd, data_, header_.data_size, &io_error); + if(io_error) { + return io_error; + } + + return nullptr; +} + + +Error Request::ReceiveResponce(SocketDescriptor sd) { + Error err; + SendDataResponse sendDataResponse; + io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err); + if(err != nullptr) { + return err; + } + + if(sendDataResponse.error_code) { + if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) { + return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate(); + } + return ProducerErrorTemplates::kUnknownServerError.Generate(); + } + return nullptr; +} + + +Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) { + for (auto receiver_uri : receivers_list) { + if (*sd == kDisconnectedSocketDescriptor) { + auto err = ConnectToReceiver(sd, receiver_uri); + if (err != nullptr ) continue; + } + + auto err = SendHeaderAndData(*sd); + if (err != nullptr ) { + io__->CloseSocket(*sd, nullptr); + *sd = kDisconnectedSocketDescriptor; + continue; + } + + err = ReceiveResponce(*sd); + + if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) { + io__->CloseSocket(*sd, nullptr); + *sd = kDisconnectedSocketDescriptor; + continue; + } + callback_(header_, std::move(err)); + return nullptr; + + } + return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate(); +} + +} \ No newline at end of file diff --git a/producer/api/src/request.h b/producer/api/src/request.h new file mode 100644 index 000000000..eb2985547 --- /dev/null +++ b/producer/api/src/request.h @@ -0,0 +1,29 @@ +#ifndef ASAPO_REQUEST_H +#define ASAPO_REQUEST_H + +#include "io/io.h" +#include "common/error.h" +#include "receivers_status.h" +#include "common/networking.h" + +#include "producer/producer.h" + +namespace asapo { + +class Request { + public: + explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, + RequestCallback callback); + Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list); + const asapo::IO* io__; + private: + Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address); + Error SendHeaderAndData(SocketDescriptor sd); + Error ReceiveResponce(SocketDescriptor sd); + GenericNetworkRequestHeader header_; + const void* data_; + RequestCallback callback_; +}; +} + +#endif //ASAPO_REQUEST_H diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp new file mode 100644 index 000000000..d4135e6f8 --- /dev/null +++ b/producer/api/src/request_pool.cpp @@ -0,0 +1,7 @@ +#include <vector> +#include "request_pool.h" + +namespace asapo { + + +} diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h new file mode 100644 index 000000000..df6a1316b --- /dev/null +++ b/producer/api/src/request_pool.h @@ -0,0 +1,17 @@ +#ifndef ASAPO_REQUEST_POOL_H +#define ASAPO_REQUEST_POOL_H + +#include <string> +#include <vector> +#include <mutex> + +namespace asapo { + +class RequestPool { + public: + private: +}; + +} + +#endif //ASAPO_REQUEST_POOL_H diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 0376a3f3d..768bd8d33 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -32,10 +32,7 @@ TEST(Producer, Logger) { ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__.get()), Ne(nullptr)); } -/** - * ConnectToReceiver - */ - +/* class ProducerImpl : public testing::Test { public: asapo::ProducerImpl producer; @@ -132,14 +129,11 @@ TEST_F(ProducerImpl, ConnectToReceiver__already_connected) { ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kAlreadyConnected)); } -/** - * Send - */ -MATCHER_P3(M_CheckSendDataRequest, request_id, file_id, file_size, + +MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, "Checks if a valid GenericNetworkRequestHeader was Send") { return ((asapo::GenericNetworkRequestHeader*)arg)->op_code == asapo::kNetOpcodeSendData - && ((asapo::GenericNetworkRequestHeader*)arg)->request_id == request_id && ((asapo::GenericNetworkRequestHeader*)arg)->data_id == file_id && ((asapo::GenericNetworkRequestHeader*)arg)->data_size == file_size; } @@ -166,13 +160,13 @@ TEST_F(ProducerImpl, Send__file_too_large) { ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } - +/* TEST_F(ProducerImpl, Send__sendDataRequest_error) { InSequence sequence; ConnectToReceiver_DONE(expected_fd); - EXPECT_CALL(mock_io, Send_t(expected_fd, M_CheckSendDataRequest(expected_request_id, expected_file_id, + EXPECT_CALL(mock_io, Send_t(expected_fd, M_CheckSendDataRequest(expected_file_id, expected_file_size), sizeof(asapo::GenericNetworkRequestHeader), _)) .Times(1) @@ -320,4 +314,6 @@ TEST_F(ProducerImpl, SetLogLevel) { } + + */ } diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp new file mode 100644 index 000000000..3fe3bc935 --- /dev/null +++ b/producer/api/unittests/test_request.cpp @@ -0,0 +1,316 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockLogger.h" +#include "common/error.h" +#include "io/io.h" + +#include "producer/producer.h" +#include "producer/producer_error.h" + +#include "../src/request.h" +#include <common/networking.h> +#include "io/io_factory.h" + +namespace { + +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; + +using ::testing::InSequence; +using ::testing::HasSubstr; + + +TEST(Producer, Constructor) { + auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; + asapo::GenericNetworkRequestHeader header; + asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}; + + ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr)); +} + + +class RequestTests : public testing::Test { + public: + testing::NiceMock<asapo::MockIO> mock_io; + uint64_t expected_file_id = 4224; + uint64_t expected_file_size = 1337; + asapo::Opcode expected_op_code = asapo::kNetOpcodeSendData; + void* expected_file_pointer = (void*)0xC00FE; + asapo::Error callback_err; + + asapo::GenericNetworkRequestHeader header{expected_op_code, expected_file_id, expected_file_size}; + bool called = false; + asapo::GenericNetworkRequestHeader callback_header; + asapo::Request request{&mock_io, header, expected_file_pointer, [this](asapo::GenericNetworkRequestHeader header, asapo::Error err) { + called = true; + callback_err = std::move(err); + callback_header = header; + }}; + testing::NiceMock<asapo::MockLogger> mock_logger; + + asapo::SocketDescriptor sd = asapo::kDisconnectedSocketDescriptor; + std::string expected_address1 = {"127.0.0.1:9090"}; + std::string expected_address2 = {"127.0.0.1:9091"}; + asapo::ReceiversList receivers_list{expected_address1, expected_address2}; + std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; + + void ExpectFailConnect(bool only_once = false); + void ExpectFailSendHeader(bool only_once = false); + void ExpectFailSendData(bool only_once = false); + void ExpectOKConnect(bool only_once = false); + void ExpectOKSendHeader(bool only_once = false); + void ExpectOKSendData(bool only_once = false); + void ExpectFailReceive(bool only_once = false); + void ExpectOKReceive(bool only_once = true); + + void SetUp() override { +// request.log__ = asapo::Logger {&mock_logger}; + } + void TearDown() override { + } +}; + +ACTION_P(A_WriteSendDataResponse, error_code) { + ((asapo::SendDataResponse*)arg1)->op_code = asapo::kNetOpcodeSendData; + ((asapo::SendDataResponse*)arg1)->error_code = error_code; +} + + + +MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, + "Checks if a valid GenericNetworkRequestHeader was Send") { + return ((asapo::GenericNetworkRequestHeader*)arg)->op_code == asapo::kNetOpcodeSendData + && ((asapo::GenericNetworkRequestHeader*)arg)->data_id == file_id + && ((asapo::GenericNetworkRequestHeader*)arg)->data_size == file_size; +} + +void RequestTests::ExpectFailConnect(bool only_once) { + for (auto expected_address : receivers_list) { + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), + Return(asapo::kDisconnectedSocketDescriptor) + )); + if (only_once) break; + } + +} + + +void RequestTests::ExpectFailSendHeader(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, + expected_file_size), + sizeof(asapo::GenericNetworkRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + if (only_once) break; + } + +} + +void RequestTests::ExpectFailSendData(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + if (only_once) break; + } + +} + + +void RequestTests::ExpectFailReceive(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + testing::Return(-1) + )); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + if (only_once) break; + } + +} + + +void RequestTests::ExpectOKSendData(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(expected_file_size) + )); + if (only_once) break; + } + +} + + + +void RequestTests::ExpectOKSendHeader(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, + expected_file_size), + sizeof(asapo::GenericNetworkRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericNetworkRequestHeader)) + )); + if (only_once) break; + } + +} + + +void RequestTests::ExpectOKConnect(bool only_once) { + int i = 0; + for (auto expected_address : receivers_list) { + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) + .WillOnce( + DoAll( + testing::SetArgPointee<1>(nullptr), + Return(expected_sds[i]) + )); + if (only_once) break; + i++; + } +} + + +void RequestTests::ExpectOKReceive(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetErrorNoError), + testing::ReturnArg<2>() + )); + if (only_once) break; + } +} + + +TEST_F(RequestTests, TriesConnectWhenNotConnected) { + ExpectFailConnect(); + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + +TEST_F(RequestTests, DoesNotTryConnectWhenConnected) { + sd = expected_sds[0]; + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) + .Times(0); + ExpectFailSendHeader(true); + + + auto err = request.Send(&sd, asapo::ReceiversList{expected_address1}); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + + + +TEST_F(RequestTests, ErrorWhenCannotSendHeader) { + ExpectOKConnect(); + ExpectFailSendHeader(); + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + +TEST_F(RequestTests, ErrorWhenCannotSendData) { + ExpectOKConnect(); + ExpectOKSendHeader(); + ExpectFailSendData(); + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + +TEST_F(RequestTests, ErrorWhenCannotReceiveData) { + ExpectOKConnect(); + ExpectOKSendHeader(); + ExpectOKSendData(); + + ExpectFailReceive(); + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + + + + +TEST_F(RequestTests, ImmediatelyRrrorIfFileAlreadyInUse) { + ExpectOKConnect(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + + EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetErrorFileIdAlreadyInUse), + testing::ReturnArg<2>() + )); + + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); + ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(RequestTests, SendOK) { + ExpectOKConnect(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + ExpectOKReceive(); + + auto err = request.Send(&sd, receivers_list); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sd, Eq(expected_sds[0])); + ASSERT_THAT(callback_err, Eq(nullptr)); + ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); + ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); + ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); + +} + + + +} -- GitLab