Skip to content
Snippets Groups Projects
Commit 452642a3 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

producer can send to substreams

parent bdfeee94
No related branches found
No related tags found
No related merge requests found
......@@ -44,14 +44,17 @@ const std::size_t kPosDataSetSize = 2;
struct GenericRequestHeader {
GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0,
uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = ""):
uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "",
const std::string& i_substream = ""):
op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} {
strncpy(message, i_message.c_str(), kMaxMessageSize);
strncpy(substream, i_substream.c_str(), kMaxMessageSize);
}
GenericRequestHeader(const GenericRequestHeader& header) {
op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size,
memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)),
strncpy(message, header.message, kMaxMessageSize);
strncpy(substream, header.substream, kMaxMessageSize);
}
Opcode op_code;
......@@ -60,6 +63,7 @@ struct GenericRequestHeader {
uint64_t meta_size;
CustomRequestData custom_data;
char message[kMaxMessageSize];
char substream[kMaxMessageSize];
std::string Json() {
std::string s = "{\"id\":" + std::to_string(data_id) + ","
"\"buffer\":\"" + std::string(message) + "\""
......
......@@ -39,19 +39,45 @@ class Producer {
virtual Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends data to the receiver
/*!
\param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)).
\param data - A pointer to the data to send
\return Error - Will be nullptr on success
*/
virtual Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends data to the receiver - same as SendData - memory should not be freed until send is finished
//! used e.g. for Python bindings
virtual Error SendData__(const EventHeader& event_header, std::string substream, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Stop processing threads
//! used e.g. for Python bindings
virtual void StopThreads__() = 0;
//! Sends files to the receiver
//! Sends files to the default substream
/*!
\param event_header - A stucture with the meta information (file name, size is ignored).
\param file name - A full path of the file to send
\param full_path - A full path of the file to send
\return Error - Will be nullptr on success
*/
virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends files to the substream
/*!
\param event_header - A stucture with the meta information (file name, size is ignored).
\param full_path - A full path of the file to send
\return Error - Will be nullptr on success
*/
virtual Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path,
uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends metadata for the current beamtime to the receiver
/*!
\param metadata - a JSON string with metadata
......
......@@ -14,6 +14,7 @@
namespace asapo {
const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s
const std::string ProducerImpl::kDefaultSubstream = "default";
ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type):
......@@ -30,9 +31,10 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a
request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__});
}
GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode) {
GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, std::string substream,
uint64_t ingest_mode) {
GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size,
event_header.user_metadata.size(), std::move(event_header.file_name)};
event_header.user_metadata.size(), event_header.file_name, substream};
if (event_header.subset_id != 0) {
request.op_code = kOpcodeTransferSubsetData;
request.custom_data[kPosDataSetId] = event_header.subset_id;
......@@ -73,6 +75,7 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t ingest_mode
}
Error ProducerImpl::Send(const EventHeader& event_header,
std::string substream,
FileData data,
std::string full_path,
uint64_t ingest_mode,
......@@ -84,7 +87,7 @@ Error ProducerImpl::Send(const EventHeader& event_header,
return err;
}
auto request_header = GenerateNextSendRequest(event_header, ingest_mode);
auto request_header = GenerateNextSendRequest(event_header, std::move(substream), ingest_mode);
return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header),
std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory}
......@@ -110,20 +113,25 @@ Error CheckData(uint64_t ingest_mode, const EventHeader& event_header, const Fil
Error ProducerImpl::SendData(const EventHeader& event_header, FileData data,
uint64_t ingest_mode, RequestCallback callback) {
return SendData(event_header, kDefaultSubstream, std::move(data), ingest_mode, callback);
}
Error ProducerImpl::SendData(const EventHeader& event_header,
std::string substream,
FileData data,
uint64_t ingest_mode,
RequestCallback callback) {
if (auto err = CheckData(ingest_mode, event_header, &data)) {
return err;
}
return Send(std::move(event_header), std::move(data), "", ingest_mode, callback, true);
return Send(event_header, std::move(substream), std::move(data), "", ingest_mode, callback, true);
}
Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) {
if (full_path.empty()) {
return ProducerErrorTemplates::kWrongInput.Generate("empty filename");
}
return Send(event_header, nullptr, std::move(full_path), ingest_mode, callback, true);
return SendFile(event_header, kDefaultSubstream, std::move(full_path), ingest_mode, callback);
}
......@@ -170,8 +178,8 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca
});
}
Error ProducerImpl::SendData__(const EventHeader& event_header,
std::string substream,
void* data,
uint64_t ingest_mode,
RequestCallback callback) {
......@@ -181,8 +189,14 @@ Error ProducerImpl::SendData__(const EventHeader& event_header,
return err;
}
return Send(std::move(event_header), std::move(substream), std::move(data_wrapped), "", ingest_mode, callback, false);
}
return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false);
Error ProducerImpl::SendData__(const EventHeader& event_header,
void* data,
uint64_t ingest_mode,
RequestCallback callback) {
return SendData__(event_header, kDefaultSubstream, data, ingest_mode, callback);
}
uint64_t ProducerImpl::GetRequestsQueueSize() {
......@@ -200,5 +214,18 @@ Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) {
void ProducerImpl::StopThreads__() {
request_pool__->StopThreads();
}
Error ProducerImpl::SendFile(const EventHeader& event_header,
std::string substream,
std::string full_path,
uint64_t ingest_mode,
RequestCallback callback) {
if (full_path.empty()) {
return ProducerErrorTemplates::kWrongInput.Generate("empty filename");
}
return Send(event_header, std::move(substream), nullptr, std::move(full_path), ingest_mode, callback, true);
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ class ProducerImpl : public Producer {
std::unique_ptr<RequestHandlerFactory> request_handler_factory_;
public:
static const size_t kDiscoveryServiceUpdateFrequencyMs;
static const std::string kDefaultSubstream;
explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type);
ProducerImpl(const ProducerImpl&) = delete;
......@@ -30,9 +31,16 @@ class ProducerImpl : public Producer {
Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override;
Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode,
RequestCallback callback) override;
Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode,
RequestCallback callback) override;
Error SendData__(const EventHeader& event_header, std::string substream, void* data , uint64_t ingest_mode,
RequestCallback callback) override;
void StopThreads__() override;
Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) override;
Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) override;
AbstractLogger* log__;
std::unique_ptr<RequestPool> request_pool__;
......@@ -43,9 +51,11 @@ class ProducerImpl : public Producer {
Error WaitRequestsFinished(uint64_t timeout_ms) override;
private:
Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode,
Error Send(const EventHeader& event_header, std::string substream, FileData data, std::string full_path,
uint64_t ingest_mode,
RequestCallback callback, bool manage_data_memory);
GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode);
GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, std::string substream,
uint64_t ingest_mode);
std::string source_cred_string_;
};
......
......@@ -28,15 +28,16 @@ using ::testing::HasSubstr;
using asapo::RequestPool;
using asapo::ProducerRequest;
MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, ingest_mode,
MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, substream,
ingest_mode,
subset_id,
subset_size, manage_data_memory,
subset_size,
"Checks if a valid GenericRequestHeader was Send") {
auto request = static_cast<ProducerRequest*>(arg);
return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code
&& ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id
&& ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size)
&& request->manage_data_memory == manage_data_memory
&& request->manage_data_memory == true
&& request->source_credentials == source_credentials
&& request->metadata == metadata
&& (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1]
......@@ -44,7 +45,8 @@ MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_
&& (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2]
== uint64_t(subset_size) : true)
&& ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode)
&& strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0;
&& strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0
&& strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0;
}
TEST(ProducerImpl, Constructor) {
......@@ -67,6 +69,8 @@ class ProducerImplTests : public testing::Test {
uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly;
char expected_name[asapo::kMaxMessageSize] = "test_name";
char expected_substream[asapo::kMaxMessageSize] = "test_substream";
asapo::SourceCredentials expected_credentials{
"beamtime_id", "subname", "token"
};
......@@ -164,12 +168,10 @@ TEST_F(ProducerImplTests, UsesDefaultStream) {
expected_id,
expected_size,
expected_name,
asapo::ProducerImpl::kDefaultSubstream.c_str(),
expected_ingest_mode,
0,
0,
expected_managed_memory
))).WillOnce(Return(
nullptr));
0))).WillOnce(Return(nullptr));
asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr);
......@@ -186,11 +188,12 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
expected_id,
expected_size,
expected_name,
asapo::ProducerImpl::kDefaultSubstream.c_str(),
expected_ingest_mode,
0,
0,
expected_managed_memory))).WillOnce(Return(
nullptr));
0
))).WillOnce(Return(
nullptr));
asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr);
......@@ -198,7 +201,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) {
TEST_F(ProducerImplTests, OKSendingSendDataRequestWithSubstream) {
producer.SetCredentials(expected_credentials);
EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
......@@ -207,26 +210,27 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) {
expected_id,
expected_size,
expected_name,
expected_substream,
expected_ingest_mode,
0,
0,
expected_unmanaged_memory
0
))).WillOnce(Return(
nullptr));
asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
auto err = producer.SendData__(event_header, nullptr, expected_ingest_mode, nullptr);
auto err = producer.SendData(event_header, expected_substream, nullptr, expected_ingest_mode, nullptr);
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
producer.SetCredentials(expected_credentials);
EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData,
expected_credentials_str, expected_metadata,
expected_id, expected_size, expected_name,
expected_id, expected_size, expected_name, asapo::ProducerImpl::kDefaultSubstream.c_str(),
expected_ingest_mode,
expected_subset_id, expected_subset_size, expected_managed_memory))).WillOnce(
expected_subset_id, expected_subset_size))).WillOnce(
Return(
nullptr));
......@@ -250,9 +254,10 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) {
expected_id,
expected_size,
"beamtime_global.meta",
"",
expected_ingest_mode,
10,
10, expected_managed_memory))).WillOnce(Return(
10))).WillOnce(Return(
nullptr));
auto err = producer.SendMetaData(expected_metadata, nullptr);
......@@ -296,10 +301,11 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
expected_id,
0,
expected_name,
asapo::ProducerImpl::kDefaultSubstream.c_str(),
expected_ingest_mode,
0,
0, expected_managed_memory))).WillOnce(Return(
nullptr));
0))).WillOnce(Return(
nullptr));
asapo::EventHeader event_header{expected_id, 0, expected_name};
auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr);
......@@ -307,6 +313,27 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) {
producer.SetCredentials(expected_credentials);
EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
expected_credentials_str,
"",
expected_id,
0,
expected_name,
expected_substream,
expected_ingest_mode,
0,
0))).WillOnce(Return(
nullptr));
asapo::EventHeader event_header{expected_id, 0, expected_name};
auto err = producer.SendFile(event_header, expected_substream, expected_fullpath, expected_ingest_mode, nullptr);
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(ProducerImplTests, ErrorSettingBeamtime) {
std::string long_str(asapo::kMaxMessageSize * 10, 'a');
expected_credentials = asapo::SourceCredentials{long_str, "", ""};
......
......@@ -59,12 +59,15 @@ class RequestHandlerTcpTests : public testing::Test {
char expected_file_name[asapo::kMaxMessageSize] = "test_name";
char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id";
char expected_substream[asapo::kMaxMessageSize] = "test_substream";
uint64_t expected_thread_id = 2;
asapo::Error callback_err;
asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name};
asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, expected_file_name};
asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,
expected_meta_size, expected_file_name, expected_substream};
asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size,
expected_file_name, expected_substream};
bool callback_called = false;
asapo::GenericRequestHeader callback_header;
......@@ -140,12 +143,14 @@ ACTION_P(A_WriteSendDataResponse, error_code) {
strcpy(((asapo::SendDataResponse*)arg1)->message, expected_auth_message.c_str());
}
MATCHER_P4(M_CheckSendDataRequest, op_code, file_id, file_size, message,
MATCHER_P5(M_CheckSendDataRequest, op_code, file_id, file_size, message, substream,
"Checks if a valid GenericRequestHeader was Send") {
return ((asapo::GenericRequestHeader*)arg)->op_code == op_code
&& ((asapo::GenericRequestHeader*)arg)->data_id == uint64_t(file_id)
&& ((asapo::GenericRequestHeader*)arg)->data_size == uint64_t(file_size)
&& strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0;
&& strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0
&& strcmp(((asapo::GenericRequestHeader*)arg)->substream, substream) == 0;
}
......@@ -172,7 +177,8 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) {
void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id),
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id,
""),
sizeof(asapo::GenericRequestHeader), _))
.WillOnce(
DoAll(
......@@ -209,7 +215,8 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) {
void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id),
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize, 0, 0, expected_beamtime_id,
""),
sizeof(asapo::GenericRequestHeader), _))
.WillOnce(
DoAll(
......@@ -243,7 +250,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) {
int i = 0;
for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_op_code, expected_file_id,
expected_file_size, expected_file_name),
expected_file_size, expected_file_name, expected_substream),
sizeof(asapo::GenericRequestHeader), _))
.WillOnce(
DoAll(
......@@ -408,7 +415,7 @@ void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) {
void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) {
for (auto expected_sd : expected_sds) {
EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(opcode, expected_file_id,
expected_file_size, expected_file_name),
expected_file_size, expected_file_name, expected_substream),
sizeof(asapo::GenericRequestHeader), _))
.WillOnce(
DoAll(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment