diff --git a/producer/api/cpp/include/producer/common.h b/producer/api/cpp/include/producer/common.h index 998d74f1ae358079e00f652a7d18ddc536defb67..a1d6fd56f707ad963e9f8ba2392b3bde5ec4aeb4 100644 --- a/producer/api/cpp/include/producer/common.h +++ b/producer/api/cpp/include/producer/common.h @@ -14,6 +14,7 @@ const uint8_t kMaxProcessingThreads = 32; struct RequestCallbackPayload { GenericRequestHeader original_header; + FileData data; std::string response; }; diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp index c548f70c713a4e6fc40ac88f0b1f0373759f3d3c..0b0f027b6344d1874a258064ba3e0df269738e86 100644 --- a/producer/api/cpp/src/request_handler_filesystem.cpp +++ b/producer/api/cpp/src/request_handler_filesystem.cpp @@ -23,6 +23,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, b producer_request->data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size, &err); if (err) { + producer_request->data = nullptr; *retry = true; return false; } @@ -31,7 +32,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, b err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)producer_request->data.get(), (size_t)request->header.data_size, true, true); if (producer_request->callback) { - producer_request->callback(RequestCallbackPayload{request->header, ""}, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header, std::move(producer_request->data),""}, std::move(err)); } *retry = false; return true; diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index befcd52d4dcb7a33d802b820ca5380ea47d424f7..f2d59de51b141a3d5c87ab4033321e7e555c85a5 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -216,7 +216,7 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error, void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request, std::string response, bool* retry) { if (request->callback) { - request->callback(RequestCallbackPayload{request->header, std::move(response)}, std::move(err)); + request->callback(RequestCallbackPayload{request->header, std::move(request->data),std::move(response)}, std::move(err)); } *retry = false; } @@ -266,7 +266,6 @@ bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request, bool* re return false; } - if (NeedRebalance()) { CloseConnectionToPeformRebalance(); } @@ -304,7 +303,7 @@ void RequestHandlerTcp::ProcessRequestTimeout(GenericRequest* request) { auto err = ProducerErrorTemplates::kTimeout.Generate(err_string); if (producer_request->callback) { - producer_request->callback(RequestCallbackPayload{request->header, ""}, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header, std::move(producer_request->data),""}, std::move(err)); } } diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 687b86c06522b9a6a4032d4edd85d97695d4576d..2ee8d411828fd78d50497daaa42d069ac8965ef7 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -34,7 +34,6 @@ using ::testing::InSequence; using ::testing::HasSubstr; using ::testing::Sequence; - TEST(RequestHandlerTcp, Constructor) { MockDiscoveryService ds; asapo::RequestHandlerTcp request{&ds, 1, nullptr}; @@ -48,173 +47,185 @@ std::string expected_auth_message = {"12345"}; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; class RequestHandlerTcpTests : public testing::Test { - public: - NiceMock<asapo::MockIO> mock_io; - NiceMock<MockDiscoveryService> mock_discovery_service; - - uint64_t expected_file_id = 42; - uint64_t expected_file_size = 1337; - uint64_t expected_meta_size = 4; - std::string expected_metadata = "meta"; - std::string expected_warning = "warning"; - std::string expected_response = "response"; - - char expected_file_name[asapo::kMaxMessageSize] = "test_name"; - char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; - char expected_substream[asapo::kMaxMessageSize] = "test_substream"; - - uint64_t expected_thread_id = 2; - - asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name, expected_substream}; - asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, - expected_file_name, expected_substream}; - bool callback_called = false; - asapo::GenericRequestHeader callback_header; - std::string callback_response; - - asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::RequestCallbackPayload payload, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = payload.original_header; - callback_response = payload.response; - }, true, 0}; - - std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - asapo::ProducerRequest request_filesend{expected_beamtime_id, header_fromfile, nullptr, expected_metadata, - expected_origin_fullpath, [this](asapo::RequestCallbackPayload payload, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = payload.original_header; - callback_response = payload.response; - }, true, 0}; - - - asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; - testing::NiceMock<asapo::MockLogger> mock_logger; - uint64_t n_connections{0}; - asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; - - std::string expected_address1 = {"127.0.0.1:9090"}; - std::string expected_address2 = {"127.0.0.1:9091"}; - asapo::ReceiversList receivers_list{expected_address1, expected_address2}; - asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; - - asapo::ReceiversList receivers_list_single{expected_address1}; - - std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; - - bool retry; - Sequence seq_receive[2]; - void ExpectFailConnect(bool only_once = false); - void ExpectFailAuthorize(bool only_once = false); - void ExpectOKAuthorize(bool only_once = false); - void ExpectFailSendHeader(bool only_once = false); - void ExpectFailSend(uint64_t expected_size, bool only_once); - void ExpectFailSendData(bool only_once = false); - void ExpectFailSendMetaData(bool only_once = false); - void ExpectOKConnect(bool only_once = false); - void ExpectOKSendHeader(bool only_once = false, asapo::Opcode code = expected_op_code); - void ExpectOKSend(uint64_t expected_size, bool only_once); - void ExpectOKSendAll(bool only_once); - void ExpectGetFileSize(bool ok); - void ExpectOKSendData(bool only_once = false); - void ExpectOKSendFile(bool only_once = false); - void ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool client_error = false); - void ExpectOKSendMetaData(bool only_once = false); - void ExpectFailReceive(bool only_once = false); - void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError, - std::string message = ""); - void DoSingleSend(bool connect = true, bool success = true); - void AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, const asapo::ProducerErrorTemplate& err_template); - void SetUp() override { - request_handler.log__ = &mock_logger; - request_handler.io__.reset(&mock_io); - request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - ON_CALL(mock_discovery_service, RotatedUriList(_)). - WillByDefault(Return(receivers_list)); - - } - void TearDown() override { - request_handler.io__.release(); + public: + NiceMock<asapo::MockIO> mock_io; + NiceMock<MockDiscoveryService> mock_discovery_service; + + uint64_t expected_file_id = 42; + uint64_t expected_file_size = 1337; + uint64_t expected_meta_size = 4; + std::string expected_metadata = "meta"; + std::string expected_warning = "warning"; + std::string expected_response = "response"; + + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; + char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; + char expected_substream[asapo::kMaxMessageSize] = "test_substream"; + + uint64_t expected_thread_id = 2; + + asapo::Error callback_err; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, + expected_meta_size, expected_file_name, expected_substream}; + asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, + expected_file_name, expected_substream}; + bool callback_called = false; + asapo::GenericRequestHeader callback_header; + std::string callback_response; + uint8_t expected_callback_data = 2; + asapo::FileData expected_data{[this]() { + auto a = new uint8_t[expected_file_size]; + for (auto i = 0; i < expected_file_size; i++) { + a[i] = expected_callback_data; } + return a; + }()}; + asapo::FileData callback_data; + + asapo::ProducerRequest request{expected_beamtime_id, header, std::move(expected_data), expected_metadata, "", + [this](asapo::RequestCallbackPayload payload, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = payload.original_header; + callback_response = payload.response; + callback_data = std::move(payload.data); + }, true, 0}; + + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; + asapo::ProducerRequest request_filesend{expected_beamtime_id, header_fromfile, nullptr, expected_metadata, + expected_origin_fullpath, + [this](asapo::RequestCallbackPayload payload, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = payload.original_header; + callback_response = payload.response; + callback_data = std::move(payload.data); + }, true, 0}; + + asapo::ProducerRequest + request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; + testing::NiceMock<asapo::MockLogger> mock_logger; + uint64_t n_connections{0}; + asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; + + std::string expected_address1 = {"127.0.0.1:9090"}; + std::string expected_address2 = {"127.0.0.1:9091"}; + asapo::ReceiversList receivers_list{expected_address1, expected_address2}; + asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; + + asapo::ReceiversList receivers_list_single{expected_address1}; + + std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; + + bool retry; + Sequence seq_receive[2]; + void ExpectFailConnect(bool only_once = false); + void ExpectFailAuthorize(bool only_once = false); + void ExpectOKAuthorize(bool only_once = false); + void ExpectFailSendHeader(bool only_once = false); + void ExpectFailSend(uint64_t expected_size, bool only_once); + void ExpectFailSendData(bool only_once = false); + void ExpectFailSendMetaData(bool only_once = false); + void ExpectOKConnect(bool only_once = false); + void ExpectOKSendHeader(bool only_once = false, asapo::Opcode code = expected_op_code); + void ExpectOKSend(uint64_t expected_size, bool only_once); + void ExpectOKSendAll(bool only_once); + void ExpectGetFileSize(bool ok); + void ExpectOKSendData(bool only_once = false); + void ExpectOKSendFile(bool only_once = false); + void ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false); + void ExpectOKSendMetaData(bool only_once = false); + void ExpectFailReceive(bool only_once = false); + void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError, + std::string message = ""); + void DoSingleSend(bool connect = true, bool success = true); + void AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, const asapo::ProducerErrorTemplate &err_template); + void SetUp() override { + request_handler.log__ = &mock_logger; + request_handler.io__.reset(&mock_io); + request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + ON_CALL(mock_discovery_service, RotatedUriList(_)). + WillByDefault(Return(receivers_list)); + + } + void TearDown() override { + request_handler.io__.release(); + } }; ACTION_P2(A_WriteSendDataResponse, error_code, message) { - ((asapo::SendDataResponse*)arg1)->op_code = asapo::kOpcodeTransferData; - ((asapo::SendDataResponse*)arg1)->error_code = error_code; - strcpy(((asapo::SendDataResponse*)arg1)->message, message.c_str()); + ((asapo::SendDataResponse*) arg1)->op_code = asapo::kOpcodeTransferData; + ((asapo::SendDataResponse*) arg1)->error_code = error_code; + strcpy(((asapo::SendDataResponse*) arg1)->message, message.c_str()); } MATCHER_P5(M_CheckSendDataRequest, op_code, file_id, file_size, message, substream, "Checks if a valid GenericRequestHeader was Send") { - return ((asapo::GenericRequestHeader*)arg)->op_code == op_code - && ((asapo::GenericRequestHeader*)arg)->data_id == uint64_t(file_id) - && ((asapo::GenericRequestHeader*)arg)->data_size == uint64_t(file_size) - && strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0 - && strcmp(((asapo::GenericRequestHeader*)arg)->substream, substream) == 0; + return ((asapo::GenericRequestHeader*) arg)->op_code == op_code + && ((asapo::GenericRequestHeader*) arg)->data_id == uint64_t(file_id) + && ((asapo::GenericRequestHeader*) arg)->data_size == uint64_t(file_size) + && strcmp(((asapo::GenericRequestHeader*) arg)->message, message) == 0 + && strcmp(((asapo::GenericRequestHeader*) arg)->substream, substream) == 0; } - void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { for (auto expected_address : receivers_list) { EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), - Return(asapo::kDisconnectedSocketDescriptor) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), + Return(asapo::kDisconnectedSocketDescriptor) + )); if (only_once) EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("cannot connect"), - HasSubstr(expected_address) + HasSubstr("cannot connect"), + HasSubstr(expected_address) ) - )); + )); if (only_once) break; } } - void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, - ""), - sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); + EXPECT_CALL(mock_io, + Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, + ""), + sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetAuthorizationError, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetAuthorizationError, expected_auth_message), + testing::ReturnArg<2>() + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("authorization"), - HasSubstr(expected_auth_message), - HasSubstr(receivers_list[i]) + HasSubstr("authorization"), + HasSubstr(expected_auth_message), + HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -224,30 +235,30 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, - ""), - sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); - + EXPECT_CALL(mock_io, + Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, + ""), + sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorNoError, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetErrorNoError, expected_auth_message), + testing::ReturnArg<2>() + )); if (only_once) { EXPECT_CALL(mock_logger, Info(AllOf( - HasSubstr("authorized"), - HasSubstr(receivers_list[i]) + HasSubstr("authorized"), + HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -255,31 +266,32 @@ void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { } - - void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_op_code, expected_file_id, - expected_file_size, expected_file_name, expected_substream), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_op_code, + expected_file_id, + expected_file_size, + expected_file_name, + expected_substream), sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; @@ -290,26 +302,26 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { } } -void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool client_error) { +void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) - .Times(1) - .WillOnce( - Return(err_template.Generate().release()) - ); + .Times(1) + .WillOnce( + Return(err_template.Generate().release()) + ); if (client_error) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) ) - )); + HasSubstr("cannot send"), + HasSubstr(receivers_list[i])) + )); } @@ -323,29 +335,28 @@ void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTempla } - void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t) expected_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); @@ -363,30 +374,27 @@ void RequestHandlerTcpTests::ExpectFailSendMetaData(bool only_once) { ExpectFailSend(expected_meta_size, only_once); } - - void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - testing::Return(-1) - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + testing::Return(-1) + )); EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); - + )); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; i++; @@ -401,16 +409,15 @@ void RequestHandlerTcpTests::ExpectOKSendAll(bool only_once) { ExpectOKSendData(only_once); } - void RequestHandlerTcpTests::ExpectOKSend(uint64_t expected_size, bool only_once) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t)expected_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return((size_t) expected_file_size) - )); + EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t) expected_size, _)) + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return((size_t) expected_file_size) + )); if (only_once) break; } } @@ -419,76 +426,75 @@ void RequestHandlerTcpTests::ExpectOKSendMetaData(bool only_once) { ExpectOKSend(expected_meta_size, only_once); } - - void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { ExpectOKSend(expected_file_size, only_once); } void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t)expected_file_size)) - .Times(1) - .WillOnce(Return(nullptr)); + EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) + .Times(1) + .WillOnce(Return(nullptr)); if (only_once) break; } } void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(opcode, expected_file_id, - expected_file_size, expected_file_name, expected_substream), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(opcode, + expected_file_id, + expected_file_size, + expected_file_name, + expected_substream), sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); if (only_once) break; } } - void RequestHandlerTcpTests::ExpectOKConnect(bool only_once) { int i = 0; for (auto expected_address : receivers_list) { EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(nullptr), - Return(expected_sds[i]) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<1>(nullptr), + Return(expected_sds[i]) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("connected to"), - HasSubstr(expected_address) + HasSubstr("connected to"), + HasSubstr(expected_address) ) - )); + )); } if (only_once) break; i++; } } - void RequestHandlerTcpTests::ExpectOKReceive(bool only_once, asapo::NetworkErrorCode code, std::string message) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(code, message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(code, message), + testing::ReturnArg<2>() + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("sent data"), - HasSubstr(receivers_list[i]) + HasSubstr("sent data"), + HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -496,7 +502,7 @@ void RequestHandlerTcpTests::ExpectOKReceive(bool only_once, asapo::NetworkError } void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { - if (connect) { + if (connect) { ExpectOKConnect(true); ExpectOKAuthorize(true); } @@ -510,7 +516,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { if (connect) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); + WillOnce(Return(receivers_list_single)); } request_handler.PrepareProcessingRequestLocked(); @@ -563,7 +569,6 @@ TEST_F(RequestHandlerTcpTests, DoNotReduceConnectionNumberAtTearDownIfNoError) { ASSERT_THAT(n_connections, Eq(1)); } - TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); @@ -588,19 +593,16 @@ TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) { ASSERT_THAT(success, Eq(false)); ASSERT_THAT(retry, Eq(false)); - } - TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { DoSingleSend(); EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); - + WillOnce(Return(receivers_list_single)); EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) - .Times(0); + .Times(0); ExpectFailSendHeader(true); @@ -611,7 +613,6 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { } - TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { EXPECT_CALL(mock_io, CloseSocket_t(_, _)).Times(0); ExpectOKConnect(); @@ -626,13 +627,12 @@ TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { } - TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { DoSingleSend(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(asapo::ReceiversList{})); + WillOnce(Return(asapo::ReceiversList{})); EXPECT_CALL(mock_io, CloseSocket_t(_, _)); @@ -643,8 +643,6 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { } - - TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { ExpectOKConnect(); ExpectOKAuthorize(); @@ -658,7 +656,6 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { } - TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ExpectOKConnect(); ExpectOKAuthorize(); @@ -690,7 +687,7 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); + WillOnce(Return(receivers_list_single)); ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -706,30 +703,30 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { } void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, - const asapo::ProducerErrorTemplate& err_template) { + const asapo::ProducerErrorTemplate &err_template) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendAll(true); EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) - .InSequence(seq_receive[0]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(error_code, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[0]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(error_code, expected_auth_message), + testing::ReturnArg<2>() + )); EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[0]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[0]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[0]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[0]) ) - )); + )); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); @@ -757,12 +754,10 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfAuthorizationFailure) { AssertImmediatelyCallBack(asapo::kNetAuthorizationError, asapo::ProducerErrorTemplates::kWrongInput); } - TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfWrongMetadata) { AssertImmediatelyCallBack(asapo::kNetErrorWrongRequest, asapo::ProducerErrorTemplates::kWrongInput); } - TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -811,8 +806,6 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) { ASSERT_THAT(retry, Eq(true)); } - - TEST_F(RequestHandlerTcpTests, RetryOnReauthorize) { ExpectOKConnect(false); ExpectOKAuthorize(false); @@ -827,8 +820,6 @@ TEST_F(RequestHandlerTcpTests, RetryOnReauthorize) { ASSERT_THAT(retry, Eq(true)); } - - TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) { ExpectGetFileSize(false); @@ -842,8 +833,6 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) { } - - TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectGetFileSize(true); ExpectOKConnect(true); @@ -860,19 +849,17 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); ASSERT_THAT(callback_response, Eq(expected_response)); + ASSERT_THAT(callback_data.get(), Eq(nullptr)); ASSERT_THAT(retry, Eq(false)); } - - TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendAll(true); ExpectOKReceive(true, asapo::kNetErrorNoError, expected_response); - request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); @@ -883,6 +870,11 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); + ASSERT_THAT(callback_data, Ne(nullptr)); + for (auto i = 0; i < expected_file_size; i++) { + ASSERT_THAT(callback_data[i], Eq(expected_callback_data)); + } + ASSERT_THAT(callback_response, Eq(expected_response)); ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } @@ -907,7 +899,6 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresIngestMode) { } - TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -947,20 +938,19 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { } - TEST_F(RequestHandlerTcpTests, TimeoutCallsCallback) { EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("timeout"), - HasSubstr("substream")) - )); + HasSubstr("timeout"), + HasSubstr("substream")) + )); request_handler.ProcessRequestTimeout(&request); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kTimeout)); ASSERT_THAT(callback_called, Eq(true)); -} - + ASSERT_THAT(callback_data, Ne(nullptr)); +} TEST_F(RequestHandlerTcpTests, SendWithWarning) { ExpectOKConnect(true); @@ -969,10 +959,9 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) { ExpectOKReceive(true, asapo::kNetErrorWarning, expected_warning); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("server"), - HasSubstr(expected_warning)) - )); - + HasSubstr("server"), + HasSubstr(expected_warning)) + )); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); @@ -985,6 +974,7 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); + ASSERT_THAT(callback_data, Ne(nullptr)); ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); }