diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index 75974ab5b1d366de37f6b1d472b0ae822ec766fd..01a2018f7db793d9fba3e93addc7a2d9644aa808 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -176,7 +176,6 @@ void asapo_consumer_set_resend_nacs(AsapoConsumerHandle consumer, uint64_t delay_ms, uint64_t resend_attempts); -// TODO: What happend to asapo_create_source_credentials? const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data); const char* asapo_message_meta_get_name(const AsapoMessageMetaHandle md); void asapo_message_meta_get_timestamp(const AsapoMessageMetaHandle md, diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 3b85079b7879ab4e3027f1d7d1305da58b10de9f..38cfe0362721686bcb9cbdec7cf85b02d59d059f 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -427,7 +427,7 @@ cdef class __PyConsumerFactory: consumer.c_consumer.get().SetTimeout(timeout) return consumer -def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms): +def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms,instance_id='auto',pipeline_step='auto'): """ :param server_name: Server endpoint (hostname:port) :type server_name: string @@ -435,10 +435,6 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_ :type source_path: string :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data :type has_filesystem: bool - :param instance_id: instance id, can be "auto", will create a combination out of hostname and pid - :type instance_id: string - :param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then - :type pipeline_step: string :param beamline: beamline name, can be "auto" if beamtime_id is given :type beamline: string :param data_source: name of the data source that produces data @@ -449,6 +445,10 @@ def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_ :type nthreads: int :param timeout_ms: send requests timeout in milliseconds :type timeout_ms: int + :param instance_id: instance id, can be "auto" (default), will create a combination out of hostname and pid + :type instance_id: string + :param pipeline_step: pipeline step id, can be "auto" (default), "DefaultStep" is used then + :type pipeline_step: string :return: consumer object and error. (None,err) if case of error, (consumer, None) if success :rtype: Tuple with consumer object and error. """ diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index fb17f420316d78734abe5fc712d014c879876e90..97bd36d1939b81e314d6a24b7dffa1ac6b2bac68 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -38,8 +38,8 @@ TEST(RequestHandlerTcp, Constructor) { MockDiscoveryService ds; asapo::RequestHandlerTcp request{&ds, 1, nullptr}; - ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::IO *>(request.io__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger *>(request.log__), Ne(nullptr)); ASSERT_THAT(request.discovery_service__, Eq(&ds)); } @@ -47,229 +47,238 @@ std::string expected_auth_message = {"12345"}; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; class RequestHandlerTcpTests : public testing::Test { - public: - NiceMock<asapo::MockIO> mock_io; - NiceMock<MockDiscoveryService> mock_discovery_service; - - uint64_t expected_file_id = 42; - uint64_t expected_file_size = 1337; - uint64_t expected_meta_size = 4; - std::string expected_metadata = "meta"; - std::string expected_warning = "warning"; - std::string expected_response = "response"; - - char expected_file_name[asapo::kMaxMessageSize] = "test_name"; - char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; - char expected_stream[asapo::kMaxMessageSize] = "test_stream"; - - uint64_t expected_thread_id = 2; - - asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name, expected_stream}; - asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, - expected_file_name, expected_stream}; - bool callback_called = false; - asapo::GenericRequestHeader callback_header; - std::string callback_response; - uint8_t expected_callback_data = 2; - asapo::MessageData expected_data1{[this]() { - auto a = new uint8_t[expected_file_size]; - for (uint64_t i = 0; i < expected_file_size; i++) { - a[i] = expected_callback_data; - } - return a; - } - ()}; - asapo::MessageData callback_data; - - asapo::ProducerRequest request{false, expected_beamtime_id, header, std::move(expected_data1), expected_metadata, "", - [this](asapo::RequestCallbackPayload payload, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = payload.original_header; - callback_response = payload.response; - callback_data = std::move(payload.data); - }, true, 0}; - - std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - asapo::ProducerRequest request_filesend{false, expected_beamtime_id, header_fromfile, nullptr, expected_metadata, - expected_origin_fullpath, - [this](asapo::RequestCallbackPayload payload, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = payload.original_header; - callback_response = payload.response; - callback_data = std::move(payload.data); - }, true, 0}; - - asapo::ProducerRequest - request_nocallback{false, expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; - testing::NiceMock<asapo::MockLogger> mock_logger; - uint64_t n_connections{0}; - asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; - - std::string expected_address1 = {"127.0.0.1:9090"}; - std::string expected_address2 = {"127.0.0.1:9091"}; - asapo::ReceiversList receivers_list{expected_address1, expected_address2}; - asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; - - asapo::ReceiversList receivers_list_single{expected_address1}; - - std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; - - bool retry; - Sequence seq_receive[2]; - void ExpectFailConnect(bool only_once = false); - void ExpectFailAuthorize(asapo::NetworkErrorCode error_code, bool useNewCredsFormat = false); - void ExpectOKAuthorize(bool only_once = false, bool useNewCredsFormat = false); - void ExpectFailSendHeader(bool only_once = false); - void ExpectFailSend(uint64_t expected_size, bool only_once); - void ExpectFailSend(bool only_once = false); - void ExpectFailSendMetaData(bool only_once = false); - void ExpectOKConnect(bool only_once = false); - void ExpectOKSendHeader(bool only_once = false, asapo::Opcode code = expected_op_code); - void ExpectOKSend(uint64_t expected_size, bool only_once); - void ExpectOKSendAll(bool only_once); - void ExpectGetFileSize(bool ok); - void ExpectOKSend(bool only_once = false); - void ExpectOKSendFile(bool only_once = false); - void ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool client_error = false); - void ExpectOKSendMetaData(bool only_once = false); - void ExpectFailReceive(bool only_once = false); - void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError, - std::string message = ""); - void DoSingleSend(bool connect = true, bool success = true); - void AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, const asapo::ProducerErrorTemplate& err_template); - void SetUp() override { - request_handler.log__ = &mock_logger; - request_handler.io__.reset(&mock_io); - request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - ON_CALL(mock_discovery_service, RotatedUriList(_)). - WillByDefault(Return(receivers_list)); - - } - void TearDown() override { - request_handler.io__.release(); + public: + NiceMock<asapo::MockIO> mock_io; + NiceMock<MockDiscoveryService> mock_discovery_service; + + uint64_t expected_file_id = 42; + uint64_t expected_file_size = 1337; + uint64_t expected_meta_size = 4; + std::string expected_metadata = "meta"; + std::string expected_warning = "warning"; + std::string expected_response = "response"; + + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; + char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; + char expected_stream[asapo::kMaxMessageSize] = "test_stream"; + + uint64_t expected_thread_id = 2; + + asapo::Error callback_err; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, + expected_meta_size, expected_file_name, expected_stream}; + asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, + expected_file_name, expected_stream}; + bool callback_called = false; + asapo::GenericRequestHeader callback_header; + std::string callback_response; + uint8_t expected_callback_data = 2; + asapo::MessageData expected_data1{[this]() { + auto a = new uint8_t[expected_file_size]; + for (uint64_t i = 0; i < expected_file_size; i++) { + a[i] = expected_callback_data; } + return a; + } + ()}; + asapo::MessageData callback_data; + + asapo::ProducerRequest request{false, expected_beamtime_id, header, std::move(expected_data1), expected_metadata, "", + [this](asapo::RequestCallbackPayload payload, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = payload.original_header; + callback_response = payload.response; + callback_data = std::move(payload.data); + }, true, 0}; + + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; + asapo::ProducerRequest request_filesend{false, expected_beamtime_id, header_fromfile, nullptr, expected_metadata, + expected_origin_fullpath, + [this](asapo::RequestCallbackPayload payload, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = payload.original_header; + callback_response = payload.response; + callback_data = std::move(payload.data); + }, true, 0}; + + asapo::ProducerRequest + request_nocallback{false, expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; + testing::NiceMock<asapo::MockLogger> mock_logger; + uint64_t n_connections{0}; + asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; + + std::string expected_address1 = {"127.0.0.1:9090"}; + std::string expected_address2 = {"127.0.0.1:9091"}; + asapo::ReceiversList receivers_list{expected_address1, expected_address2}; + asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; + + asapo::ReceiversList receivers_list_single{expected_address1}; + + std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; + + bool retry; + Sequence seq_receive[2]; + void ExpectFailConnect(bool only_once = false); + void ExpectFailAuthorize(asapo::NetworkErrorCode error_code, bool useNewCredsFormat = false); + void ExpectOKAuthorize(bool only_once = false, bool useNewCredsFormat = false); + void ExpectFailSendHeader(bool only_once = false); + void ExpectFailSend(uint64_t expected_size, bool only_once); + void ExpectFailSend(bool only_once = false); + void ExpectFailSendMetaData(bool only_once = false); + void ExpectOKConnect(bool only_once = false); + void ExpectOKSendHeader(bool only_once = false, asapo::Opcode code = expected_op_code); + void ExpectOKSend(uint64_t expected_size, bool only_once); + void ExpectOKSendAll(bool only_once); + void ExpectGetFileSize(bool ok); + void ExpectOKSend(bool only_once = false); + void ExpectOKSendFile(bool only_once = false); + void ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false); + void ExpectOKSendMetaData(bool only_once = false); + void ExpectFailReceive(bool only_once = false); + void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError, + std::string message = ""); + void DoSingleSend(bool connect = true, bool success = true); + void AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, const asapo::ProducerErrorTemplate &err_template); + void SetUp() override { + request_handler.log__ = &mock_logger; + request_handler.io__.reset(&mock_io); + request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + ON_CALL(mock_discovery_service, RotatedUriList(_)). + WillByDefault(Return(receivers_list)); + + } + void TearDown() override { + request_handler.io__.release(); + } }; ACTION_P2(A_WriteSendResponse, error_code, message) { - ((asapo::SendResponse*) arg1)->op_code = asapo::kOpcodeTransferData; - ((asapo::SendResponse*) arg1)->error_code = error_code; - strcpy(((asapo::SendResponse*) arg1)->message, message.c_str()); + ((asapo::SendResponse *) arg1)->op_code = asapo::kOpcodeTransferData; + ((asapo::SendResponse *) arg1)->error_code = error_code; + strcpy(((asapo::SendResponse *) arg1)->message, message.c_str()); } MATCHER_P5(M_CheckSendRequest, op_code, file_id, file_size, message, stream, "Checks if a valid GenericRequestHeader was Send") { - bool matchOpcode = ((asapo::GenericRequestHeader*) arg)->op_code == op_code; - bool matchFileId = ((asapo::GenericRequestHeader*) arg)->data_id == uint64_t(file_id); - bool matchFileSize = ((asapo::GenericRequestHeader*) arg)->data_size == uint64_t(file_size); - bool matchMessage = strcmp(((asapo::GenericRequestHeader*) arg)->message, message) == 0; - bool matchStream = strcmp(((asapo::GenericRequestHeader*) arg)->stream, stream) == 0; - - return matchOpcode && matchFileId && matchFileSize && matchMessage && matchStream; - +// actually, arg may be not GenericRequestHeader, so it should exit on check of op_code. Not very good solution, maybe pass size of GenericRequestHeader and compare it + return ((asapo::GenericRequestHeader *) arg)->op_code == op_code && + ((asapo::GenericRequestHeader *) arg)->data_id == uint64_t(file_id) && + ((asapo::GenericRequestHeader *) arg)->data_size == uint64_t(file_size) && + strcmp(((asapo::GenericRequestHeader *) arg)->message, message) == 0 && + strcmp(((asapo::GenericRequestHeader *) arg)->stream, stream) == 0; } void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { - for (auto expected_address : receivers_list) { + for (auto expected_address: receivers_list) { EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), - Return(asapo::kDisconnectedSocketDescriptor) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), + Return(asapo::kDisconnectedSocketDescriptor) + )); if (only_once) EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("cannot connect"), HasSubstr(expected_address) ) - )); + )); if (only_once) break; } } -void RequestHandlerTcpTests::ExpectFailAuthorize(asapo::NetworkErrorCode error_code, bool useNewCredsFormat /*= false*/) { +void RequestHandlerTcpTests::ExpectFailAuthorize(asapo::NetworkErrorCode error_code, + bool useNewCredsFormat /*= false*/) { auto expected_sd = expected_sds[0]; EXPECT_CALL(mock_io, - Send_t(expected_sd, M_CheckSendRequest(asapo::kOpcodeAuthorize, 0, 0, useNewCredsFormat ? "new_source_credentials_format" : "", - ""), - sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); + Send_t(expected_sd, + M_CheckSendRequest(asapo::kOpcodeAuthorize, + 0, + 0, + useNewCredsFormat ? "new_source_credentials_format" : "", + ""), + sizeof(asapo::GenericRequestHeader), + _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); EXPECT_CALL(mock_io, Send_t(expected_sd, _, strlen(expected_beamtime_id), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(strlen(expected_beamtime_id)) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(strlen(expected_beamtime_id)) + )); EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendResponse), _)) - .InSequence(seq_receive[0]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendResponse(error_code, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[0]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendResponse(error_code, expected_auth_message), + testing::ReturnArg<2>() + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("disconnected"), HasSubstr(receivers_list[0]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( HasSubstr("authorization"), HasSubstr(expected_auth_message), HasSubstr(receivers_list[0]) ) - )); + )); } void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once, bool useNewCredsFormat /*= false*/) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, - Send_t(expected_sd, M_CheckSendRequest(asapo::kOpcodeAuthorize, 0, 0, useNewCredsFormat ? "new_source_credentials_format" : "", - ""), - sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); + Send_t(expected_sd, + M_CheckSendRequest(asapo::kOpcodeAuthorize, + 0, + 0, + useNewCredsFormat ? "new_source_credentials_format" : "", + ""), + sizeof(asapo::GenericRequestHeader), + _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); EXPECT_CALL(mock_io, Send_t(expected_sd, _, strlen(expected_beamtime_id), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(strlen(expected_beamtime_id)) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(strlen(expected_beamtime_id)) + )); EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendResponse(asapo::kNetErrorNoError, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendResponse(asapo::kNetErrorNoError, expected_auth_message), + testing::ReturnArg<2>() + )); if (only_once) { EXPECT_CALL(mock_logger, Info(AllOf( HasSubstr("authorized"), HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -278,30 +287,30 @@ void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once, bool useNewCredsF void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendRequest(expected_op_code, - expected_file_id, - expected_file_size, - expected_file_name, - expected_stream), + expected_file_id, + expected_file_size, + expected_file_name, + expected_stream), sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("disconnected"), HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( HasSubstr("cannot send"), HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; @@ -312,14 +321,14 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { } } -void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool client_error) { +void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) - .Times(1) - .WillOnce( - Return(err_template.Generate().release()) - ); + .Times(1) + .WillOnce( + Return(err_template.Generate().release()) + ); if (client_error) { @@ -327,11 +336,11 @@ void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTempla HasSubstr("disconnected"), HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i])) - )); + HasSubstr("cannot send"), + HasSubstr(receivers_list[i])) + )); } @@ -347,26 +356,26 @@ void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTempla void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_once) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t) expected_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + Return(-1) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("disconnected"), HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( HasSubstr("cannot send"), HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); @@ -386,25 +395,25 @@ void RequestHandlerTcpTests::ExpectFailSendMetaData(bool only_once) { void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - testing::Return(-1) - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), + testing::Return(-1) + )); EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("disconnected"), HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( HasSubstr("cannot send"), HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; i++; @@ -420,14 +429,14 @@ void RequestHandlerTcpTests::ExpectOKSendAll(bool only_once) { } void RequestHandlerTcpTests::ExpectOKSend(uint64_t expected_size, bool only_once) { - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t) expected_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return((size_t) expected_file_size) - )); + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return((size_t) expected_file_size) + )); if (only_once) break; } } @@ -441,27 +450,27 @@ void RequestHandlerTcpTests::ExpectOKSend(bool only_once) { } void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) { - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) - .Times(1) - .WillOnce(Return(nullptr)); + .Times(1) + .WillOnce(Return(nullptr)); if (only_once) break; } } void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) { - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendRequest(opcode, - expected_file_id, - expected_file_size, - expected_file_name, - expected_stream), + expected_file_id, + expected_file_size, + expected_file_name, + expected_stream), sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - Return(sizeof(asapo::GenericRequestHeader)) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); if (only_once) break; } @@ -469,19 +478,19 @@ void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode op void RequestHandlerTcpTests::ExpectOKConnect(bool only_once) { size_t i = 0; - for (auto expected_address : receivers_list) { + for (auto expected_address: receivers_list) { EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(nullptr), - Return(expected_sds[i]) - )); + .WillOnce( + DoAll( + testing::SetArgPointee<1>(nullptr), + Return(expected_sds[i]) + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("connected to"), HasSubstr(expected_address) ) - )); + )); } if (only_once) break; i++; @@ -490,21 +499,21 @@ void RequestHandlerTcpTests::ExpectOKConnect(bool only_once) { void RequestHandlerTcpTests::ExpectOKReceive(bool only_once, asapo::NetworkErrorCode code, std::string message) { size_t i = 0; - for (auto expected_sd : expected_sds) { + for (auto expected_sd: expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendResponse), _)) - .InSequence(seq_receive[i]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendResponse(code, message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[i]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendResponse(code, message), + testing::ReturnArg<2>() + )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("sent data"), HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -526,7 +535,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { if (connect) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); + WillOnce(Return(receivers_list_single)); } request_handler.PrepareProcessingRequestLocked(); @@ -624,10 +633,10 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { DoSingleSend(); EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); + WillOnce(Return(receivers_list_single)); EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) - .Times(0); + .Times(0); ExpectFailSendHeader(true); @@ -657,7 +666,7 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(asapo::ReceiversList{})); + WillOnce(Return(asapo::ReceiversList{})); EXPECT_CALL(mock_io, CloseSocket_t(_, _)); @@ -712,7 +721,7 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(receivers_list_single)); + WillOnce(Return(receivers_list_single)); ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -728,30 +737,30 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { } void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, - const asapo::ProducerErrorTemplate& err_template) { + const asapo::ProducerErrorTemplate &err_template) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendAll(true); EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendResponse), _)) - .InSequence(seq_receive[0]) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendResponse(error_code, expected_auth_message), - testing::ReturnArg<2>() - )); + .InSequence(seq_receive[0]) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendResponse(error_code, expected_auth_message), + testing::ReturnArg<2>() + )); EXPECT_CALL(mock_logger, Debug(AllOf( HasSubstr("disconnected"), HasSubstr(receivers_list[0]) ) - )); + )); EXPECT_CALL(mock_logger, Error(AllOf( HasSubstr("cannot send"), HasSubstr(receivers_list[0]) ) - )); + )); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); @@ -779,12 +788,10 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfAuthorizationFailure) { AssertImmediatelyCallBack(asapo::kNetAuthorizationError, asapo::ProducerErrorTemplates::kWrongInput); } - TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfNotSupportedfailure) { AssertImmediatelyCallBack(asapo::kNetErrorNotSupported, asapo::ProducerErrorTemplates::kUnsupportedClient); } - TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfWrongMetadata) { AssertImmediatelyCallBack(asapo::kNetErrorWrongRequest, asapo::ProducerErrorTemplates::kWrongInput); } @@ -997,9 +1004,9 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { TEST_F(RequestHandlerTcpTests, TimeoutCallsCallback) { EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("timeout"), - HasSubstr("stream")) - )); + HasSubstr("timeout"), + HasSubstr("stream")) + )); request_handler.ProcessRequestTimeoutUnlocked(&request); @@ -1016,9 +1023,9 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) { ExpectOKReceive(true, asapo::kNetErrorWarning, expected_warning); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("server"), - HasSubstr(expected_warning)) - )); + HasSubstr("server"), + HasSubstr(expected_warning)) + )); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 1a636d92a431ec6367b6744f0d394c739be7e895..bb001d97df1d92113d5040ace1bef3b5f051b94c 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -491,16 +491,12 @@ cdef class PyProducer: throw_exception(err) return pyProd -def create_producer(endpoint,type,instance_id,pipeline_step,beamtime_id,beamline,data_source,token,nthreads,timeout_ms): +def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_ms,instance_id='auto',pipeline_step='auto'): """ :param endpoint: server endpoint (url:port) :type endpoint: string :param type: source type, "raw" to write to "raw" folder in beamline filesystem,"processed" to write to "processed" folder in core filesystem :type type: string - :param instance_id: instance id, can be "auto", will create a combination out of hostname and pid - :type instance_id: string - :param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then - :type pipeline_step: string :param beamline: beamline name, can be "auto" if beamtime_id is given :type beamline: string :param data_source: name of the data source that produces data @@ -511,6 +507,10 @@ def create_producer(endpoint,type,instance_id,pipeline_step,beamtime_id,beamline :type nthreads: int :param timeout_ms: send requests timeout in milliseconds :type timeout_ms: int + :param instance_id: instance id, can be "auto" (default), will create a combination out of hostname and pid + :type instance_id: string + :param pipeline_step: pipeline step id, can be "auto" (default), "DefaultStep" is used then + :type pipeline_step: string :raises: AsapoWrongInputError: wrong input (number of threads, ,,,) AsapoProducerError: actually should not happen diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py index 1c6653284f73d28f2f9bcfd4860e493f59490c6c..485410e11255acf83f84bb082ace4a60b9f72ba7 100644 --- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py +++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py @@ -25,7 +25,7 @@ class AsapoSender: def _callback(self, header, err): print ("hello self callback") -producer = asapo_producer.create_producer(endpoint,'processed','auto','auto',beamtime,'auto', data_source, token, nthreads, 600000) +producer = asapo_producer.create_producer(endpoint,'processed',beamtime,'auto', data_source, token, nthreads, 600000) producer.set_log_level("debug") print(asapo_producer.__version__) diff --git a/tests/automatic/producer/aai/producer_aai.py b/tests/automatic/producer/aai/producer_aai.py index 2ddf0c99cd646b379abe08177d5f92b11013d72d..da575d7c727b2f6dee4dfd21c50a7c41cb755794 100644 --- a/tests/automatic/producer/aai/producer_aai.py +++ b/tests/automatic/producer/aai/producer_aai.py @@ -27,7 +27,7 @@ def callback(header,err): lock.release() -producer = asapo_producer.create_producer(endpoint,'processed','auto','auto','auto',beamline, data_source, token, nthreads, 60000) +producer = asapo_producer.create_producer(endpoint,'processed','auto',beamline, data_source, token, nthreads, 60000) producer.set_log_level("debug") diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 3ad913e58b1c5744a645ad4a633fe62d29e35639..beb45a12dd9d5544496655a28f9ca709a017ba14 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -47,7 +47,7 @@ def assert_version(version): if not ok: sys.exit(1) -producer = asapo_producer.create_producer(endpoint,'processed','auto','auto', beamtime, 'auto', data_source, token, nthreads, 60000) +producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60000) producer.set_log_level("debug") @@ -233,7 +233,7 @@ else: # create with error try: - producer = asapo_producer.create_producer(endpoint,'processed','auto','auto', beamtime, 'auto', data_source, token, 0, 0) + producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, 0, 0) except asapo_producer.AsapoWrongInputError as e: print(e) else: diff --git a/tests/automatic/settings/broker_settings.json b/tests/automatic/settings/broker_settings.json index 623f348608d822c221811adcfd00ab922fcddee0..276c5eaefa2eab50da07cd0e444196d55fece395 100644 --- a/tests/automatic/settings/broker_settings.json +++ b/tests/automatic/settings/broker_settings.json @@ -1,5 +1,6 @@ { "DatabaseServer":"127.0.0.1:27017", + "DiscoveryServer": "localhost:8400/asapo-discovery", "PerformanceDbServer": "localhost:8086", "MonitorPerformance": true, "MonitoringServerUrl":"auto",