From 452642a3d116ac10c81bedc62b01b1c8b0ddfdfc Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 26 Nov 2019 17:28:54 +0100 Subject: [PATCH] producer can send to substreams --- common/cpp/include/common/networking.h | 6 +- producer/api/cpp/include/producer/producer.h | 30 ++++++++- producer/api/cpp/src/producer_impl.cpp | 49 +++++++++++--- producer/api/cpp/src/producer_impl.h | 14 +++- .../api/cpp/unittests/test_producer_impl.cpp | 67 +++++++++++++------ .../unittests/test_request_handler_tcp.cpp | 23 ++++--- 6 files changed, 145 insertions(+), 44 deletions(-) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 66c0c7ceb..529541618 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -44,14 +44,17 @@ const std::size_t kPosDataSetSize = 2; struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, - uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = ""): + uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "", + const std::string& i_substream = ""): op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} { strncpy(message, i_message.c_str(), kMaxMessageSize); + strncpy(substream, i_substream.c_str(), kMaxMessageSize); } GenericRequestHeader(const GenericRequestHeader& header) { op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size, memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)), strncpy(message, header.message, kMaxMessageSize); + strncpy(substream, header.substream, kMaxMessageSize); } Opcode op_code; @@ -60,6 +63,7 @@ struct GenericRequestHeader { uint64_t meta_size; CustomRequestData custom_data; char message[kMaxMessageSize]; + char substream[kMaxMessageSize]; std::string Json() { std::string s = "{\"id\":" + std::to_string(data_id) + "," "\"buffer\":\"" + std::string(message) + "\"" diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index d0daa4160..78fc4ac91 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -39,19 +39,45 @@ class Producer { virtual Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode, RequestCallback callback) = 0; + //! Sends data to the receiver + /*! + \param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)). + \param data - A pointer to the data to send + \return Error - Will be nullptr on success + */ + virtual Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode, + RequestCallback callback) = 0; + + + //! Sends data to the receiver - same as SendData - memory should not be freed until send is finished + //! used e.g. for Python bindings + virtual Error SendData__(const EventHeader& event_header, std::string substream, void* data, uint64_t ingest_mode, + RequestCallback callback) = 0; + //! Stop processing threads //! used e.g. for Python bindings virtual void StopThreads__() = 0; - //! Sends files to the receiver + //! Sends files to the default substream /*! \param event_header - A stucture with the meta information (file name, size is ignored). - \param file name - A full path of the file to send + \param full_path - A full path of the file to send \return Error - Will be nullptr on success */ virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) = 0; + //! Sends files to the substream + /*! + \param event_header - A stucture with the meta information (file name, size is ignored). + \param full_path - A full path of the file to send + \return Error - Will be nullptr on success + */ + virtual Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path, + uint64_t ingest_mode, + RequestCallback callback) = 0; + + //! Sends metadata for the current beamtime to the receiver /*! \param metadata - a JSON string with metadata diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index d2f933380..7c28af0dd 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -14,6 +14,7 @@ namespace asapo { const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s +const std::string ProducerImpl::kDefaultSubstream = "default"; ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type): @@ -30,9 +31,10 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode) { +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, std::string substream, + uint64_t ingest_mode) { GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size, - event_header.user_metadata.size(), std::move(event_header.file_name)}; + event_header.user_metadata.size(), event_header.file_name, substream}; if (event_header.subset_id != 0) { request.op_code = kOpcodeTransferSubsetData; request.custom_data[kPosDataSetId] = event_header.subset_id; @@ -73,6 +75,7 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t ingest_mode } Error ProducerImpl::Send(const EventHeader& event_header, + std::string substream, FileData data, std::string full_path, uint64_t ingest_mode, @@ -84,7 +87,7 @@ Error ProducerImpl::Send(const EventHeader& event_header, return err; } - auto request_header = GenerateNextSendRequest(event_header, ingest_mode); + auto request_header = GenerateNextSendRequest(event_header, std::move(substream), ingest_mode); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory} @@ -110,20 +113,25 @@ Error CheckData(uint64_t ingest_mode, const EventHeader& event_header, const Fil Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) { + return SendData(event_header, kDefaultSubstream, std::move(data), ingest_mode, callback); +} + +Error ProducerImpl::SendData(const EventHeader& event_header, + std::string substream, + FileData data, + uint64_t ingest_mode, + RequestCallback callback) { if (auto err = CheckData(ingest_mode, event_header, &data)) { return err; } - return Send(std::move(event_header), std::move(data), "", ingest_mode, callback, true); + return Send(event_header, std::move(substream), std::move(data), "", ingest_mode, callback, true); + } Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) { - if (full_path.empty()) { - return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); - } - - return Send(event_header, nullptr, std::move(full_path), ingest_mode, callback, true); + return SendFile(event_header, kDefaultSubstream, std::move(full_path), ingest_mode, callback); } @@ -170,8 +178,8 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca }); } - Error ProducerImpl::SendData__(const EventHeader& event_header, + std::string substream, void* data, uint64_t ingest_mode, RequestCallback callback) { @@ -181,8 +189,14 @@ Error ProducerImpl::SendData__(const EventHeader& event_header, return err; } + return Send(std::move(event_header), std::move(substream), std::move(data_wrapped), "", ingest_mode, callback, false); +} - return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false); +Error ProducerImpl::SendData__(const EventHeader& event_header, + void* data, + uint64_t ingest_mode, + RequestCallback callback) { + return SendData__(event_header, kDefaultSubstream, data, ingest_mode, callback); } uint64_t ProducerImpl::GetRequestsQueueSize() { @@ -200,5 +214,18 @@ Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { void ProducerImpl::StopThreads__() { request_pool__->StopThreads(); } +Error ProducerImpl::SendFile(const EventHeader& event_header, + std::string substream, + std::string full_path, + uint64_t ingest_mode, + RequestCallback callback) { + if (full_path.empty()) { + return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); + } + + return Send(event_header, std::move(substream), nullptr, std::move(full_path), ingest_mode, callback, true); + +} + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index e39dba3f3..3ab1ebe79 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -19,6 +19,7 @@ class ProducerImpl : public Producer { std::unique_ptr<RequestHandlerFactory> request_handler_factory_; public: static const size_t kDiscoveryServiceUpdateFrequencyMs; + static const std::string kDefaultSubstream; explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type); ProducerImpl(const ProducerImpl&) = delete; @@ -30,9 +31,16 @@ class ProducerImpl : public Producer { Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override; Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode, RequestCallback callback) override; + Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode, + RequestCallback callback) override; + Error SendData__(const EventHeader& event_header, std::string substream, void* data , uint64_t ingest_mode, + RequestCallback callback) override; void StopThreads__() override; Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; + Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path, uint64_t ingest_mode, + RequestCallback callback) override; + AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; @@ -43,9 +51,11 @@ class ProducerImpl : public Producer { Error WaitRequestsFinished(uint64_t timeout_ms) override; private: - Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode, + Error Send(const EventHeader& event_header, std::string substream, FileData data, std::string full_path, + uint64_t ingest_mode, RequestCallback callback, bool manage_data_memory); - GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode); + GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, std::string substream, + uint64_t ingest_mode); std::string source_cred_string_; }; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 0b3d35eb2..d2d73fe6d 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -28,15 +28,16 @@ using ::testing::HasSubstr; using asapo::RequestPool; using asapo::ProducerRequest; -MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, ingest_mode, +MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, substream, + ingest_mode, subset_id, - subset_size, manage_data_memory, + subset_size, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code && ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id && ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size) - && request->manage_data_memory == manage_data_memory + && request->manage_data_memory == true && request->source_credentials == source_credentials && request->metadata == metadata && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] @@ -44,7 +45,8 @@ MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_ && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] == uint64_t(subset_size) : true) && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode) - && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0; + && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0 + && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; } TEST(ProducerImpl, Constructor) { @@ -67,6 +69,8 @@ class ProducerImplTests : public testing::Test { uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; char expected_name[asapo::kMaxMessageSize] = "test_name"; + char expected_substream[asapo::kMaxMessageSize] = "test_substream"; + asapo::SourceCredentials expected_credentials{ "beamtime_id", "subname", "token" }; @@ -164,12 +168,10 @@ TEST_F(ProducerImplTests, UsesDefaultStream) { expected_id, expected_size, expected_name, + asapo::ProducerImpl::kDefaultSubstream.c_str(), expected_ingest_mode, 0, - 0, - expected_managed_memory - ))).WillOnce(Return( - nullptr)); + 0))).WillOnce(Return(nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); @@ -186,11 +188,12 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { expected_id, expected_size, expected_name, + asapo::ProducerImpl::kDefaultSubstream.c_str(), expected_ingest_mode, 0, - 0, - expected_managed_memory))).WillOnce(Return( - nullptr)); + 0 + ))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); @@ -198,7 +201,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) { +TEST_F(ProducerImplTests, OKSendingSendDataRequestWithSubstream) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, @@ -207,26 +210,27 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) { expected_id, expected_size, expected_name, + expected_substream, expected_ingest_mode, 0, - 0, - expected_unmanaged_memory + 0 ))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData__(event_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.SendData(event_header, expected_substream, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } + TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData, expected_credentials_str, expected_metadata, - expected_id, expected_size, expected_name, + expected_id, expected_size, expected_name, asapo::ProducerImpl::kDefaultSubstream.c_str(), expected_ingest_mode, - expected_subset_id, expected_subset_size, expected_managed_memory))).WillOnce( + expected_subset_id, expected_subset_size))).WillOnce( Return( nullptr)); @@ -250,9 +254,10 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { expected_id, expected_size, "beamtime_global.meta", + "", expected_ingest_mode, 10, - 10, expected_managed_memory))).WillOnce(Return( + 10))).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -296,10 +301,11 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { expected_id, 0, expected_name, + asapo::ProducerImpl::kDefaultSubstream.c_str(), expected_ingest_mode, 0, - 0, expected_managed_memory))).WillOnce(Return( - nullptr)); + 0))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); @@ -307,6 +313,27 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) { + producer.SetCredentials(expected_credentials); + + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, + expected_credentials_str, + "", + expected_id, + 0, + expected_name, + expected_substream, + expected_ingest_mode, + 0, + 0))).WillOnce(Return( + nullptr)); + + asapo::EventHeader event_header{expected_id, 0, expected_name}; + auto err = producer.SendFile(event_header, expected_substream, expected_fullpath, expected_ingest_mode, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); +} + TEST_F(ProducerImplTests, ErrorSettingBeamtime) { std::string long_str(asapo::kMaxMessageSize * 10, 'a'); expected_credentials = asapo::SourceCredentials{long_str, "", ""}; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 42988f20f..d086a932f 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -59,12 +59,15 @@ class RequestHandlerTcpTests : public testing::Test { char expected_file_name[asapo::kMaxMessageSize] = "test_name"; char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; + char expected_substream[asapo::kMaxMessageSize] = "test_substream"; uint64_t expected_thread_id = 2; asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name}; - asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, expected_file_name}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, + expected_meta_size, expected_file_name, expected_substream}; + asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, + expected_file_name, expected_substream}; bool callback_called = false; asapo::GenericRequestHeader callback_header; @@ -140,12 +143,14 @@ ACTION_P(A_WriteSendDataResponse, error_code) { strcpy(((asapo::SendDataResponse*)arg1)->message, expected_auth_message.c_str()); } -MATCHER_P4(M_CheckSendDataRequest, op_code, file_id, file_size, message, +MATCHER_P5(M_CheckSendDataRequest, op_code, file_id, file_size, message, substream, "Checks if a valid GenericRequestHeader was Send") { return ((asapo::GenericRequestHeader*)arg)->op_code == op_code && ((asapo::GenericRequestHeader*)arg)->data_id == uint64_t(file_id) && ((asapo::GenericRequestHeader*)arg)->data_size == uint64_t(file_size) - && strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0; + && strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0 + && strcmp(((asapo::GenericRequestHeader*)arg)->substream, substream) == 0; + } @@ -172,7 +177,8 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, + ""), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( @@ -209,7 +215,8 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id, + ""), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( @@ -243,7 +250,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_op_code, expected_file_id, - expected_file_size, expected_file_name), + expected_file_size, expected_file_name, expected_substream), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( @@ -408,7 +415,7 @@ void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) { void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) { for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(opcode, expected_file_id, - expected_file_size, expected_file_name), + expected_file_size, expected_file_name, expected_substream), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( -- GitLab