diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index d42d53ac4e1084331cb966e1a899d35e7727f211..bc65a16874fc24d54d26634bc1dc5fa49b1b7590 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -36,18 +36,19 @@ const std::size_t kMaxMessageSize = 1024; struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, - uint64_t i_data_size = 0, const std::string& i_message = ""): - op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size} { + uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = ""): + op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} { strncpy(message, i_message.c_str(), kMaxMessageSize); } GenericRequestHeader(const GenericRequestHeader& header) { - op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, + op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size, strncpy(message, header.message, kMaxMessageSize); } Opcode op_code; uint64_t data_id; uint64_t data_size; + uint64_t meta_size; char message[kMaxMessageSize]; }; diff --git a/common/cpp/unittests/request/test_request.cpp b/common/cpp/unittests/request/test_request.cpp index 5e07bdabdba87017e53e26ecc5f81fa8b4a7fbf9..80669cddb9de96f7cca8eb3f551ea24aafef5586 100644 --- a/common/cpp/unittests/request/test_request.cpp +++ b/common/cpp/unittests/request/test_request.cpp @@ -38,12 +38,13 @@ using asapo::GenericRequestHeader; TEST(Request, Constructor) { - GenericRequestHeader header{asapo::kOpcodeTransferData, 1, 2, "hello"}; + GenericRequestHeader header{asapo::kOpcodeTransferData, 1, 2, 3, "hello"}; GenericRequest r{header}; ASSERT_THAT(r.header.data_id, Eq(1)); ASSERT_THAT(r.header.op_code, Eq(asapo::kOpcodeTransferData)); ASSERT_THAT(r.header.data_size, Eq(2)); + ASSERT_THAT(r.header.meta_size, Eq(3)); ASSERT_THAT(r.header.message, testing::StrEq("hello")); } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index dee9462a5b32aa65bf4b0fdb79e980537fce0570..ad646a1b936ccf7699d7c328828c821494414ade 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -105,7 +105,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it for(uint64_t i = 0; i < iterations; i++) { auto buffer = CreateMemoryBuffer(number_of_byte); asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i + 1)}; - auto err = producer->SendData(event_header, std::move(buffer), &ProcessAfterSend); + std::string meta = "{\"dummy_meta\":\"test" + std::to_string(i + 1) + "\"}"; + auto err = producer->SendData(event_header, std::move(buffer), std::move(meta), &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 839c0897849c4d9dee872b888e7fb7447206d64d..6943c173a4dce3cc912e3ed2b3c49e0487740d21 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -28,9 +28,11 @@ class Producer { /*! \param event_header - A stucture with the meta information (file name, size). \param data - A pointer to the data to send + \param metadata - A string with metadata (JSON format) \return Error - Will be nullptr on success */ - virtual Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + virtual Error SendData(const EventHeader& event_header, FileData data, std::string metadata, + RequestCallback callback) = 0; //! Sends files to the receiver /*! \param event_header - A stucture with the meta information (file name, size is ignored). diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index fc52dd56104a98c3c165e4e761cfccf65cf5eb0c..b5c679a48f1c1ceb10023a108976aecab76f96f9 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -29,9 +29,9 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, uint64_t meta_size, std::string file_name) { - GenericRequestHeader request{kOpcodeTransferData, file_id, file_size, std::move(file_name)}; + GenericRequestHeader request{kOpcodeTransferData, file_id, file_size, meta_size, std::move(file_name)}; return request; } @@ -49,6 +49,7 @@ Error CheckProducerRequest(size_t file_size, size_t filename_size) { Error ProducerImpl::Send(const EventHeader& event_header, FileData data, + std::string metadata, std::string full_path, RequestCallback callback) { auto err = CheckProducerRequest((size_t)event_header.file_size, event_header.file_name.size()); @@ -57,23 +58,24 @@ Error ProducerImpl::Send(const EventHeader& event_header, return err; } - auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, event_header.file_name); + auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, + metadata.size(), event_header.file_name); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header), - std::move(data), std::move(full_path), callback} + std::move(data), std::move(metadata), std::move(full_path), callback} }); } -Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, +Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, std::string metadata, RequestCallback callback) { - return Send(event_header, std::move(data), "", callback); + return Send(event_header, std::move(data), std::move(metadata), "", callback); } Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) { - return Send(event_header, nullptr, std::move(full_path), callback); + return Send(event_header, nullptr, "", std::move(full_path), callback); } @@ -106,11 +108,11 @@ Error ProducerImpl::SetBeamtimeId(std::string beamtime_id) { } Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback callback) { - GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), "beamtime_global.meta"}; + GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0, "beamtime_global.meta"}; FileData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header), - std::move(data), "", callback} + std::move(data), "", "", callback} }); } diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 59e3f9bfb800036d16262e4eb926d8b52deaba3f..a6845b81b3f1303b0a0624858a8640560ce2ae21 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -28,7 +28,7 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) override; + Error SendData(const EventHeader& event_header, FileData data, std::string metadata, RequestCallback callback) override; Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; @@ -36,8 +36,10 @@ class ProducerImpl : public Producer { Error SendMetaData(const std::string& metadata, RequestCallback callback) override; private: - Error Send(const EventHeader& event_header, FileData data, std::string full_path, RequestCallback callback); - GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, std::string file_name); + Error Send(const EventHeader& event_header, FileData data, std::string metadata, std::string full_path, + RequestCallback callback); + GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, uint64_t meta_size, + std::string file_name); std::string beamtime_id_; }; diff --git a/producer/api/src/producer_request.cpp b/producer/api/src/producer_request.cpp index 3dde2696dd6596dc20bdf7389fb9734887e2d539..cc52482048d1f6d5e8ba0ef6357c4501e310655d 100644 --- a/producer/api/src/producer_request.cpp +++ b/producer/api/src/producer_request.cpp @@ -14,9 +14,14 @@ Error ProducerRequest::ReadDataFromFileIfNeeded(const IO* io) { ProducerRequest::ProducerRequest(std::string beamtime_id, GenericRequestHeader h, FileData data, + std::string metadata, std::string original_filepath, - RequestCallback callback) : GenericRequest(std::move(h)), beamtime_id{beamtime_id}, data{std::move(data)}, - original_filepath{std::move(original_filepath)}, callback{callback} { + RequestCallback callback) : GenericRequest(std::move(h)), + beamtime_id{std::move(beamtime_id)}, + metadata{std::move(metadata)}, + data{std::move(data)}, + original_filepath{std::move(original_filepath)}, + callback{callback} { } } \ No newline at end of file diff --git a/producer/api/src/producer_request.h b/producer/api/src/producer_request.h index d5befdb8e666df86f9bfad18ae8229ddcefae809..3f90d52fa902fb1f4203e107977dae481de1f543 100644 --- a/producer/api/src/producer_request.h +++ b/producer/api/src/producer_request.h @@ -11,9 +11,12 @@ namespace asapo { class ProducerRequest : public GenericRequest { public: - ProducerRequest(std::string beamtime_id, GenericRequestHeader header, FileData data, std::string original_filepath, + ProducerRequest(std::string beamtime_id, GenericRequestHeader header, FileData data, + std::string metadata, + std::string original_filepath, RequestCallback callback); std::string beamtime_id; + std::string metadata; FileData data; std::string original_filepath; RequestCallback callback; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 9f6d911f8ab03a43f8ddab1d1101261e70c4dbc9..43b552712134ed6d1dced472ba2e01cafe761982 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -15,7 +15,7 @@ RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service } Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) { - GenericRequestHeader header{kOpcodeAuthorize, 0, 0, beamtime_id.c_str()}; + GenericRequestHeader header{kOpcodeAuthorize, 0, 0, 0, beamtime_id.c_str()}; Error err; io__->Send(sd_, &header, sizeof(header), &err); if(err) { @@ -48,7 +48,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const return nullptr; } -Error RequestHandlerTcp::SendHeaderAndData(const ProducerRequest* request) { +Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { Error io_error; io__->Send(sd_, &(request->header), sizeof(request->header), &io_error); if(io_error) { @@ -60,7 +60,9 @@ Error RequestHandlerTcp::SendHeaderAndData(const ProducerRequest* request) { return io_error; } - return nullptr; + io__->Send(sd_, (void*) request->metadata.c_str(), (size_t)request->header.meta_size, &io_error); + return io_error; + } Error RequestHandlerTcp::ReceiveResponse() { @@ -93,7 +95,7 @@ Error RequestHandlerTcp::ReceiveResponse() { } Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request) { - auto err = SendHeaderAndData(request); + auto err = SendRequestContent(request); if (err) { return err; } diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index 2b00f7f207dbc7303e3f3fcd263dc019f08aa73c..72394d73926f5be2f70db3acde9c6457888a5d37 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -32,7 +32,7 @@ class RequestHandlerTcp: public RequestHandler { Error Authorize(const std::string& beamtime_id); Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); Error SendDataToOneOfTheReceivers(ProducerRequest* request); - Error SendHeaderAndData(const ProducerRequest*); + Error SendRequestContent(const ProducerRequest* request); Error ReceiveResponse(); Error TrySendToReceiver(const ProducerRequest* request); SocketDescriptor sd_{kDisconnectedSocketDescriptor}; diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index 813fb1f34022a2968f9f27f232da4cb978a4fd18..25065d74af1ba987f1e490d2f71803a520104cd9 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -53,7 +53,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::EventHeader event_header{1, 1, ""}; - auto err_send = producer->SendData(event_header, nullptr, nullptr); + auto err_send = producer->SendData(event_header, nullptr, "", nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index f7aacf76ef0378bd408ab7e16d35244e870e31d4..ceddea6b18089d55b4cedab519f9912355a9252a 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -29,13 +29,14 @@ using asapo::RequestPool; using asapo::ProducerRequest; -MATCHER_P5(M_CheckSendDataRequest, op_code, beamtime_id, file_id, file_size, message, +MATCHER_P6(M_CheckSendDataRequest, op_code, beamtime_id, metadata, file_id, file_size, message, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint64_t(file_size) && request->beamtime_id == beamtime_id + && request->metadata == metadata && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -66,14 +67,14 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, ""}; - auto err = producer.SendData(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, long_string}; - auto err = producer.SendData(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } @@ -81,27 +82,31 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; - auto err = producer.SendData(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } TEST_F(ProducerImplTests, OKSendingSendDataRequest) { uint64_t expected_size = 100; + uint64_t expected_meta_size = 4; uint64_t expected_id = 10; char expected_name[asapo::kMaxMessageSize] = "test_name"; std::string expected_beamtimeid = "beamtime_id"; + std::string expected_metadata = "meta"; producer.SetBeamtimeId(expected_beamtimeid); - ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, - nullptr, "", nullptr}; + ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, + expected_size, expected_meta_size, expected_name}, + nullptr, expected_metadata, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_beamtimeid, expected_id, expected_size, expected_name))).WillOnce(Return( + expected_beamtimeid, expected_metadata, + expected_id, expected_size, expected_name))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name}; - auto err = producer.SendData(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -116,7 +121,7 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { producer.SetBeamtimeId(expected_beamtimeid); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData, - expected_beamtimeid, expected_id, expected_size, "beamtime_global.meta"))).WillOnce(Return( + expected_beamtimeid, "", expected_id, expected_size, "beamtime_global.meta"))).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -131,11 +136,11 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { std::string expected_fullpath = "filename"; producer.SetBeamtimeId(expected_beamtimeid); - ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, expected_name}, - nullptr, "", nullptr}; + ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, 0, expected_name}, + nullptr, "", "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_beamtimeid, expected_id, 0, expected_name))).WillOnce(Return( + expected_beamtimeid, "", expected_id, 0, expected_name))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; diff --git a/producer/api/unittests/test_producer_request.cpp b/producer/api/unittests/test_producer_request.cpp index f56b5197e0622d1e8109c3eb51002ae76a07afcd..9e2b95479e5d8567119efbce8f587be79ea0c6bd 100644 --- a/producer/api/unittests/test_producer_request.cpp +++ b/producer/api/unittests/test_producer_request.cpp @@ -38,16 +38,21 @@ TEST(ProducerRequest, Constructor) { char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; uint64_t expected_file_id = 42; uint64_t expected_file_size = 1337; + uint64_t expected_meta_size = 137; + std::string expected_meta = "meta"; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; - asapo::ProducerRequest request{expected_beamtime_id, std::move(header), nullptr, "", nullptr}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, + expected_meta_size, expected_file_name}; + asapo::ProducerRequest request{expected_beamtime_id, std::move(header), nullptr, expected_meta, "", nullptr}; ASSERT_THAT(request.beamtime_id, Eq(expected_beamtime_id)); - ASSERT_THAT(request.header.message, testing::StrEq("test_name")); + ASSERT_THAT(request.metadata, Eq(expected_meta)); + ASSERT_THAT(request.header.message, testing::StrEq(expected_file_name)); ASSERT_THAT(request.header.data_size, Eq(expected_file_size)); ASSERT_THAT(request.header.data_id, Eq(expected_file_id)); ASSERT_THAT(request.header.op_code, Eq(expected_op_code)); + ASSERT_THAT(request.header.meta_size, Eq(expected_meta_size)); } diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index 93176e0f777a1e06578ad19ea9a5908f5a75566c..bce8b8a9255d34e81caf7ea196cee42ea5e95cc6 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -44,6 +44,7 @@ class RequestHandlerFilesystemTests : public testing::Test { uint64_t expected_file_id = 42; uint64_t expected_file_size = 1337; + uint64_t expected_meta_size = 2337; std::string expected_file_name = "test_name"; uint64_t expected_thread_id = 2; std::string expected_destination = "destination"; @@ -52,17 +53,18 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, + expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::ProducerRequest request{"", header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::ProducerRequest request_nocallback{"", header, nullptr, "", nullptr}; - asapo::ProducerRequest request_filesend{"", header, nullptr, expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr}; + asapo::ProducerRequest request_filesend{"", header, nullptr, "", expected_origin_fullpath, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 185f257dc890ea0417a8c6dc5060e104f44230e4..b5bd0daeb2039121c307fec51d91c12f6bb96c78 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -51,6 +51,9 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_file_id = 42; uint64_t expected_file_size = 1337; + uint64_t expected_meta_size = 4; + std::string expected_metadata = "meta"; + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; @@ -58,20 +61,20 @@ class RequestHandlerTcpTests : public testing::Test { asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata, expected_origin_fullpath, nullptr}; - asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, "", nullptr}; + asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -90,10 +93,15 @@ class RequestHandlerTcpTests : public testing::Test { void ExpectFailAuthorize(bool only_once = false); void ExpectOKAuthorize(bool only_once = false); void ExpectFailSendHeader(bool only_once = false); + void ExpectFailSend(uint64_t expected_size, bool only_once); void ExpectFailSendData(bool only_once = false); + void ExpectFailSendMetaData(bool only_once = false); void ExpectOKConnect(bool only_once = false); void ExpectOKSendHeader(bool only_once = false); + void ExpectOKSend(uint64_t expected_size, bool only_once); + void ExpectOKSendAll(bool only_once); void ExpectOKSendData(bool only_once = false); + void ExpectOKSendMetaData(bool only_once = false); void ExpectFailReceive(bool only_once = false); void ExpectOKReceive(bool only_once = true); void DoSingleSend(bool connect = true, bool success = true); @@ -244,10 +252,10 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { } -void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) { +void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, nullptr, (size_t) expected_file_size, _)) + EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t) expected_size, _)) .Times(1) .WillOnce( DoAll( @@ -273,6 +281,16 @@ void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) { } +void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) { + ExpectFailSend(expected_file_size, only_once); +} + +void RequestHandlerTcpTests::ExpectFailSendMetaData(bool only_once) { + ExpectFailSend(expected_meta_size, only_once); +} + + + void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { @@ -302,10 +320,16 @@ void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { } +void RequestHandlerTcpTests::ExpectOKSendAll(bool only_once) { + ExpectOKSendHeader(only_once); + ExpectOKSendData(only_once); + ExpectOKSendMetaData(only_once); +} + -void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { +void RequestHandlerTcpTests::ExpectOKSend(uint64_t expected_size, bool only_once) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, nullptr, (size_t)expected_file_size, _)) + EXPECT_CALL(mock_io, Send_t(expected_sd, _, (size_t)expected_size, _)) .Times(1) .WillOnce( DoAll( @@ -314,7 +338,16 @@ void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { )); if (only_once) break; } +} + +void RequestHandlerTcpTests::ExpectOKSendMetaData(bool only_once) { + ExpectOKSend(expected_meta_size, only_once); +} + + +void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { + ExpectOKSend(expected_file_size, only_once); } @@ -382,8 +415,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { ExpectOKAuthorize(true); } - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); if (success) { ExpectOKReceive(true); } else { @@ -543,14 +575,27 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { + ExpectOKConnect(); + ExpectOKAuthorize(); + ExpectOKSendHeader(); + ExpectOKSendData(); + ExpectFailSendMetaData(); + + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); +} + + TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). WillOnce(Return(receivers_list_single)); ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); ExpectFailReceive(true); request_handler.PrepareProcessingRequestLocked(); @@ -563,8 +608,7 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e const asapo::ProducerErrorTemplate& err_template) { ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) .InSequence(seq_receive) @@ -583,7 +627,6 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { AssertImmediatelyCallBack(asapo::kNetErrorFileIdAlreadyInUse, asapo::ProducerErrorTemplates::kFileIdAlreadyInUse); } @@ -597,8 +640,7 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfWrongMetadata) { TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); @@ -628,8 +670,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); @@ -651,8 +692,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendHeader(true); - ExpectOKSendData(true); + ExpectOKSendAll(true); ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index e515d3c26ebfdcefa76eead5a09c48b9d9e5e256..48ca98b975b9a3ab4dff7f5df8aac5344d26607e 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -54,9 +54,11 @@ class RequestHandlerTests : public Test { ReceiverDataServerRequestHandler handler_no_cache{&mock_net, nullptr, &mock_stat}; NiceMock<asapo::MockLogger> mock_logger; uint64_t expected_data_size = 1001243214; + uint64_t expected_meta_size = 100; uint64_t expected_buf_id = 12345; uint64_t expected_source_id = 11; - asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, ""}; + asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, + expected_meta_size, ""}; asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id}; uint8_t tmp; void SetUp() override { diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index 9b5c63ee5e188f615385abfb18ee4e0e14fc9c70..edacbcd6e98c558937fe808603bdcb8770617a57 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -120,7 +120,7 @@ class ConnectionTests : public Test { )); return nullptr; } else { - auto request = new Request(GenericRequestHeader{asapo::kOpcodeUnknownOp, 0, 1, ""}, 0, connected_uri, nullptr); + auto request = new Request(GenericRequestHeader{asapo::kOpcodeUnknownOp, 0, 1, 0, ""}, 0, connected_uri, nullptr); EXPECT_CALL(mock_dispatcher, GetNextRequest_t(_)) .WillOnce(DoAll( SetArgPointee<0>(nullptr),