diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 6136e2f5ba8efea410bedfe52e30313f7bb3c4e8..d9760b2409493939046b6a94bd47e270ea811dee 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -34,7 +34,7 @@ enum NetworkErrorCode : uint16_t { //TODO need to use an serialization framework to ensure struct consistency on different computers const std::size_t kMaxMessageSize = 1024; -const std::size_t kNCustomParams = 2; +const std::size_t kNCustomParams = 3; using CustomRequestData = uint64_t[kNCustomParams]; diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 8d0e4c4d7102862fef3acd56eca7241c3d227105..856af4d5e99502c19a47402d0f32a4be3772c617 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -18,7 +18,8 @@ enum class ProducerErrorType { kAuthorizationFailed, kInternalServerError, kCannotSendDataToReceivers, - kRequestPoolIsFull + kRequestPoolIsFull, + kWrongInjestMode }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; @@ -31,6 +32,12 @@ auto const kConnectionNotReady = ProducerErrorTemplate { "Connection not ready", ProducerErrorType::kConnectionNotReady }; +auto const kWrongInjestMode = ProducerErrorTemplate { + "wrong injest mode", ProducerErrorType::kWrongInjestMode +}; + + + auto const kErrorSubsetSize = ProducerErrorTemplate { "Error in subset size", ProducerErrorType::kErrorSubsetSize }; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 17b434c9d26ab69e28e8cf205994d322205983c0..40fd02ea103e3f36eb217d3478b8a0ab2da0db44 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -8,6 +8,8 @@ #include "producer/producer_error.h" #include "producer_request_handler_factory.h" #include "producer_request.h" +#include "common/data_structs.h" + namespace asapo { @@ -29,18 +31,33 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header) { +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t injest_mode) { GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size, event_header.user_metadata.size(), std::move(event_header.file_name)}; if (event_header.subset_id != 0) { request.op_code = kOpcodeTransferSubsetData; - request.custom_data[0] = event_header.subset_id; - request.custom_data[1] = event_header.subset_size; + request.custom_data[1] = event_header.subset_id; + request.custom_data[2] = event_header.subset_size; } + request.custom_data[0] = injest_mode; return request; } -Error CheckProducerRequest(const EventHeader& event_header) { +Error CheckInjestMode(uint64_t injest_mode) { + if ((injest_mode & IngestModeFlags::kTransferData) && + (injest_mode & IngestModeFlags::kTransferMetaDataOnly)) { + return ProducerErrorTemplates::kWrongInjestMode.Generate(); + } + + if (!(injest_mode & IngestModeFlags::kTransferData) && + !(injest_mode & IngestModeFlags::kTransferMetaDataOnly)) { + return ProducerErrorTemplates::kWrongInjestMode.Generate(); + } + + return nullptr; +} + +Error CheckProducerRequest(const EventHeader& event_header, uint64_t injest_mode) { if ((size_t)event_header.file_size > ProducerImpl::kMaxChunkSize) { return ProducerErrorTemplates::kFileTooLarge.Generate(); } @@ -53,7 +70,7 @@ Error CheckProducerRequest(const EventHeader& event_header) { return ProducerErrorTemplates::kErrorSubsetSize.Generate(); } - return nullptr; + return CheckInjestMode(injest_mode); } Error ProducerImpl::Send(const EventHeader& event_header, @@ -61,13 +78,13 @@ Error ProducerImpl::Send(const EventHeader& event_header, std::string full_path, uint64_t injest_mode, RequestCallback callback) { - auto err = CheckProducerRequest(event_header); + auto err = CheckProducerRequest(event_header, injest_mode); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(event_header); + auto request_header = GenerateNextSendRequest(event_header, injest_mode); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback} diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 487943b3d73a0d4a098b3b59192decdf43eeb6e6..d3eba882d8f8079d6b5a93efbab5967297c25512 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -41,7 +41,7 @@ class ProducerImpl : public Producer { private: Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t injest_mode, RequestCallback callback); - GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header); + GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t injest_mode); std::string source_cred_string_; }; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index ada8ed2ea26e8bed257e6b64a7659661243ffd12..e8546eb9282148a7544a083d22b383bb43c7fe79 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -48,6 +48,14 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const return nullptr; } +bool NeedSendData(const ProducerRequest* request) { + if (request->header.op_code == kOpcodeTransferData || request->header.op_code == kOpcodeTransferSubsetData) { + return request->header.custom_data[0] & IngestModeFlags::kTransferData; + } + + return true; +} + Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { Error io_error; io__->Send(sd_, &(request->header), sizeof(request->header), &io_error); @@ -55,7 +63,10 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { return io_error; } - io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error); + if (NeedSendData(request)) { + io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error); + } + if(io_error) { return io_error; } diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index aa0df2348d56c574140a9703b9f390902732dd92..f2136eb002bccca0672dc99f525b26a8cccc2b13 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -28,24 +28,25 @@ using ::testing::HasSubstr; using asapo::RequestPool; using asapo::ProducerRequest; - -MATCHER_P8(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, subset_id, +MATCHER_P9(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, injest_mode, + subset_id, subset_size, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); - return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code - && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id - && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint64_t(file_size) + return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code + && ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id + && ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size) && request->source_credentials == source_credentials && request->metadata == metadata - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader)(arg->header)).custom_data[0] == - uint64_t(subset_id) : true) - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader)(arg->header)).custom_data[1] == - uint64_t(subset_size) : true) - && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] + == uint64_t(subset_id) : true) + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] + == uint64_t(subset_size) : true) + && ((op_code == asapo::kOpcodeTransferSubsetData || op_code == asapo::kOpcodeTransferData) ? + ((asapo::GenericRequestHeader) (arg->header)).custom_data[0] == uint64_t(injest_mode) : true) + && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0; } - TEST(ProducerImpl, Constructor) { asapo::ProducerImpl producer{"", 4, asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); @@ -63,16 +64,16 @@ class ProducerImplTests : public testing::Test { uint64_t expected_id = 10; uint64_t expected_subset_id = 100; uint64_t expected_subset_size = 4; + uint64_t expected_injest_mode = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInCache; char expected_name[asapo::kMaxMessageSize] = "test_name"; - asapo::SourceCredentials expected_credentials { + asapo::SourceCredentials expected_credentials{ "beamtime_id", "subname", "token" }; - asapo::SourceCredentials expected_default_credentials { + asapo::SourceCredentials expected_default_credentials{ "beamtime_id", "", "token" }; - std::string expected_credentials_str = "beamtime_id%subname%token"; std::string expected_default_credentials_str = "beamtime_id%detector%token"; @@ -102,7 +103,6 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } - TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; @@ -117,32 +117,42 @@ TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize)); } - TEST_F(ProducerImplTests, UsesDefaultStream) { producer.SetCredentials(expected_default_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_default_credentials_str, expected_metadata, - expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return( - nullptr)); + expected_default_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + expected_injest_mode, + 0, + 0))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode , nullptr); + auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(ProducerImplTests, OKSendingSendDataRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, expected_metadata, - expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return( - nullptr)); + expected_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + expected_injest_mode, + 0, + 0))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -152,17 +162,18 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData, expected_credentials_str, expected_metadata, expected_id, expected_size, expected_name, - expected_subset_id, expected_subset_size))).WillOnce(Return( - nullptr)); + expected_injest_mode, + expected_subset_id, expected_subset_size))).WillOnce( + Return( + nullptr)); - asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode , nullptr); + asapo::EventHeader event_header + {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; + auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } - - TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { expected_id = 0; expected_metadata = "{\"meta\":10}"; @@ -170,8 +181,14 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData, - expected_credentials_str, "", expected_id, - expected_size, "beamtime_global.meta", 10, 10))).WillOnce(Return( + expected_credentials_str, + "", + expected_id, + expected_size, + "beamtime_global.meta", + 0, + 10, + 10))).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -183,16 +200,22 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, "", expected_id, 0, expected_name, 0, 0))).WillOnce(Return( - nullptr)); + expected_credentials_str, + "", + expected_id, + 0, + expected_name, + expected_injest_mode, + 0, + 0))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(event_header, expected_fullpath, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendFile(event_header, expected_fullpath, expected_injest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(ProducerImplTests, ErrorSettingBeamtime) { std::string long_str(asapo::kMaxMessageSize * 10, 'a'); expected_credentials = asapo::SourceCredentials{long_str, "", ""}; @@ -212,6 +235,22 @@ TEST_F(ProducerImplTests, ErrorSettingSecondTime) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsAlreadySet)); } +TEST_F(ProducerImplTests, ErrorSendingWrongInjestMode) { + producer.SetCredentials(expected_credentials); + + EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); + + asapo::EventHeader event_header{expected_id, 0, expected_name}; + auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData; + auto err = producer.SendFile(event_header, expected_fullpath, injest_mode, nullptr); + + injest_mode = 0; + auto err_null = producer.SendFile(event_header, expected_fullpath, injest_mode, nullptr); + + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInjestMode)); + ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongInjestMode)); +} } diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index b5bd0daeb2039121c307fec51d91c12f6bb96c78..c0b9cfb4239a2653466606fa8dee59bd2d8e380c 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -43,6 +43,7 @@ TEST(RequestHandlerTcp, Constructor) { } std::string expected_auth_message = {"12345"}; +asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; class RequestHandlerTcpTests : public testing::Test { public: @@ -59,7 +60,6 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_thread_id = 2; - asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name}; bool called = false; @@ -97,7 +97,7 @@ class RequestHandlerTcpTests : public testing::Test { void ExpectFailSendData(bool only_once = false); void ExpectFailSendMetaData(bool only_once = false); void ExpectOKConnect(bool only_once = false); - void ExpectOKSendHeader(bool only_once = false); + void ExpectOKSendHeader(bool only_once = false, asapo::Opcode code = expected_op_code); void ExpectOKSend(uint64_t expected_size, bool only_once); void ExpectOKSendAll(bool only_once); void ExpectOKSendData(bool only_once = false); @@ -109,6 +109,9 @@ class RequestHandlerTcpTests : public testing::Test { void SetUp() override { request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); + request.header.custom_data[0] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[0] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[0] = asapo::kDefaultIngestMode; ON_CALL(mock_discovery_service, RotatedUriList(_)). WillByDefault(Return(receivers_list)); @@ -352,9 +355,9 @@ void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { -void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once) { +void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_op_code, expected_file_id, + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(opcode, expected_file_id, expected_file_size, expected_file_name), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( @@ -686,9 +689,6 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ASSERT_THAT(err, Eq(nullptr)); } - - - TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -707,5 +707,41 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } +TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresInjestMode) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true, asapo::kOpcodeTransferMetaData); + ExpectOKSendData(true); + ExpectOKSendMetaData(true); + ExpectOKReceive(); + + auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + request.header.custom_data[0] = injest_mode; + request.header.op_code = asapo::kOpcodeTransferMetaData; + + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendMetaData(true); + ExpectOKReceive(); + + auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + + request.header.custom_data[0] = injest_mode; + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(callback_header.custom_data[0], Eq(injest_mode)); +} + } diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp index 7540492fbace9d6b3226b793bfb37e9e33e93c3b..f1491480cdf2730910df36f019e8293d58b870b6 100644 --- a/receiver/src/request_handler_db_write.cpp +++ b/receiver/src/request_handler_db_write.cpp @@ -37,8 +37,8 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const { " at " + GetReceiverConfig()->broker_db_uri); } } else { - auto subset_id = request->GetCustomData()[0]; - auto subset_size = request->GetCustomData()[1]; + auto subset_id = request->GetCustomData()[1]; + auto subset_size = request->GetCustomData()[2]; err = db_client__->InsertAsSubset(file_info, subset_id, subset_size, true); if (!err) { log__->Debug(std::string{"insert record as subset id "} + std::to_string(subset_id) + ", id: " + diff --git a/receiver/unittests/test_request_handler_db_writer.cpp b/receiver/unittests/test_request_handler_db_writer.cpp index 67fa5bdc9cc0a41ab448321e81d61b770c1192be..5f83e996463d3edd8ad836544e3f1f582c76241f 100644 --- a/receiver/unittests/test_request_handler_db_writer.cpp +++ b/receiver/unittests/test_request_handler_db_writer.cpp @@ -73,7 +73,7 @@ class DbWriterHandlerTests : public Test { uint64_t expected_id = 15; uint64_t expected_subset_id = 15; uint64_t expected_subset_size = 2; - uint64_t expected_custom_data[2] {expected_subset_id, expected_subset_size}; + uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_subset_id, expected_subset_size}; void SetUp() override { GenericRequestHeader request_header; request_header.data_id = 2;