diff --git a/CHANGELOG.md b/CHANGELOG.md index fa2fca85119a9d6a75aa44baa13a433e9c58d754..46168d3cf43e55234c5a72d6e2c790e01e23ebef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 20.09.2 + +FEATURES +* implemented possibility to send data without writing to database (no need of consecutive indexes, etc. but will not be able to consume such data) + + ## 20.09.1 FEATURES diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 981593512e060dcb2ebbf1e2d4cf940b5ddc2909..0c5baa12c4bf1815588ba447d22fe4fe78a51d2a 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -22,10 +22,6 @@ function(prepare_asapo) set(RECEIVER_USE_CACHE true) endif() - if(NOT DEFINED RECEIVER_WRITE_TO_DISK) - set(RECEIVER_WRITE_TO_DISK true) - endif() - if (WIN32) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.win.in receiver_tcp.json.tpl @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.win authorizer.json.tpl COPYONLY) diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index 66d6e1ebe0da6b9fac02f6b7dbae2fb74120475e..b44afb9833e37b0db1177edbda611999d3923da0 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -103,9 +103,10 @@ enum IngestModeFlags : uint64_t { kTransferData = 1 << 0, kTransferMetaDataOnly = 1 << 1, kStoreInFilesystem = 1 << 2, + kStoreInDatabase = 1 << 3, }; -const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem; +const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; const std::string kDefaultSubstream = "default"; diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index f2f6b0f2eb404de90825b1924a8c5493ef8591d2..c034223fc2a8c0082d924ab5ba5db797f2dfc037 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -18,8 +18,6 @@ "ReservedShare": 10 }, "Tag": "receiver", - "WriteToDisk":true, "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, - "WriteToDb":true, "LogLevel": "info" } diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl index ec2ef0969aad4b576bb5aeb6e77bfc89fcc21f1f..9f1b688a8a484091f84b5e9cac260e817dfc91a0 100644 --- a/deploy/asapo_services/scripts/receiver.json.tpl +++ b/deploy/asapo_services/scripts/receiver.json.tpl @@ -18,8 +18,6 @@ "ReservedShare": 10 }, "Tag": "{{ env "attr.unique.hostname" }}", - "WriteToDisk":true, "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }}, - "WriteToDb":true, "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}" } diff --git a/docs/sphinx/source/producer.rst b/docs/sphinx/source/producer.rst index b3985141cffca4fd1de08ef40976933fad739c5b..2aff5fc87200d6e76529110ee5a09b69a073dff8 100644 --- a/docs/sphinx/source/producer.rst +++ b/docs/sphinx/source/producer.rst @@ -13,7 +13,8 @@ Injest modes: .. data:: INGEST_MODE_TRANSFER_DATA .. data:: INGEST_MODE_TRANSFER_METADATA_ONLY .. data:: INGEST_MODE_STORE_IN_FILESYSTEM -.. data:: DEFAULT_INGEST_MODE = INGEST_MODE_TRANSFER_DATA | INGEST_MODE_STORE_IN_FILESYSTEM +.. data:: INGEST_MODE_STORE_IN_DATABASE +.. data:: DEFAULT_INGEST_MODE = INGEST_MODE_TRANSFER_DATA | INGEST_MODE_STORE_IN_FILESYSTEM | INGEST_MODE_STORE_IN_DATABASE Logger levels: diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 98adb7e921f15ae17d83547a64c4207f13f96914..6d85df00cf24e75a8ab7965f420cb89d4622aa67 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -58,6 +58,16 @@ Error CheckIngestMode(uint64_t ingest_mode) { return ProducerErrorTemplates::kWrongInput.Generate("wrong ingest mode"); } + if (ingest_mode & IngestModeFlags::kTransferData && + !(ingest_mode & (IngestModeFlags::kStoreInDatabase | IngestModeFlags::kStoreInFilesystem))) { + return ProducerErrorTemplates::kWrongInput.Generate("wrong ingest mode"); + } + + if (ingest_mode & IngestModeFlags::kTransferMetaDataOnly && + (ingest_mode & IngestModeFlags::kStoreInFilesystem)) { + return ProducerErrorTemplates::kWrongInput.Generate("wrong ingest mode"); + } + return nullptr; } @@ -204,7 +214,7 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback callback) { GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0, "beamtime_global.meta"}; - request_header.custom_data[kPosIngestMode] = asapo::IngestModeFlags::kTransferData; + request_header.custom_data[kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInDatabase; FileData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 248d4cda47c6432fcc87d90d19306dcaccc9eab7..3b31e4d84e2435a3a0a0ecd3614a00dfa1cee544 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -35,18 +35,18 @@ MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_ "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code - && ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id - && ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size) - && request->manage_data_memory == true - && request->source_credentials == source_credentials - && request->metadata == metadata - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] - == uint64_t(subset_id) : true) - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] - == uint64_t(subset_size) : true) - && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode) - && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0 - && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; + && ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id + && ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size) + && request->manage_data_memory == true + && request->source_credentials == source_credentials + && request->metadata == metadata + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] + == uint64_t(subset_id) : true) + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] + == uint64_t(subset_size) : true) + && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode) + && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0 + && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; } TEST(ProducerImpl, Constructor) { @@ -56,47 +56,47 @@ TEST(ProducerImpl, Constructor) { } class ProducerImplTests : public testing::Test { - public: - testing::NiceMock<MockDiscoveryService> service; - asapo::ProducerRequestHandlerFactory factory{&service}; - testing::NiceMock<asapo::MockLogger> mock_logger; - testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger}; - asapo::ProducerImpl producer{"", 1, 3600, asapo::RequestHandlerType::kTcp}; - uint64_t expected_size = 100; - uint64_t expected_id = 10; - uint64_t expected_subset_id = 100; - uint64_t expected_subset_size = 4; - uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - - char expected_name[asapo::kMaxMessageSize] = "test_name"; - char expected_substream[asapo::kMaxMessageSize] = "test_substream"; - std::string expected_next_substream = "next_substream"; - - asapo::SourceCredentials expected_credentials{asapo::SourceType::kRaw,"beamtime_id", "beamline", "subname", "token" - }; - asapo::SourceCredentials expected_default_credentials{ - asapo::SourceType::kProcessed,"beamtime_id", "", "", "token" - }; - - std::string expected_credentials_str = "raw%beamtime_id%beamline%subname%token"; - std::string expected_default_credentials_str = "processed%beamtime_id%auto%detector%token"; - - std::string expected_metadata = "meta"; - std::string expected_fullpath = "filename"; - bool expected_managed_memory = true; - bool expected_unmanaged_memory = false; - void SetUp() override { - producer.log__ = &mock_logger; - producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; - } - void TearDown() override { - producer.request_pool__.release(); - } + public: + testing::NiceMock<MockDiscoveryService> service; + asapo::ProducerRequestHandlerFactory factory{&service}; + testing::NiceMock<asapo::MockLogger> mock_logger; + testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger}; + asapo::ProducerImpl producer{"", 1, 3600, asapo::RequestHandlerType::kTcp}; + uint64_t expected_size = 100; + uint64_t expected_id = 10; + uint64_t expected_subset_id = 100; + uint64_t expected_subset_size = 4; + uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + + char expected_name[asapo::kMaxMessageSize] = "test_name"; + char expected_substream[asapo::kMaxMessageSize] = "test_substream"; + std::string expected_next_substream = "next_substream"; + + asapo::SourceCredentials expected_credentials{asapo::SourceType::kRaw, "beamtime_id", "beamline", "subname", "token" + }; + asapo::SourceCredentials expected_default_credentials{ + asapo::SourceType::kProcessed, "beamtime_id", "", "", "token" + }; + + std::string expected_credentials_str = "raw%beamtime_id%beamline%subname%token"; + std::string expected_default_credentials_str = "processed%beamtime_id%auto%detector%token"; + + std::string expected_metadata = "meta"; + std::string expected_fullpath = "filename"; + bool expected_managed_memory = true; + bool expected_unmanaged_memory = false; + void SetUp() override { + producer.log__ = &mock_logger; + producer.request_pool__ = std::unique_ptr<RequestPool>{&mock_pull}; + } + void TearDown() override { + producer.request_pool__.release(); + } }; TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( - asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); + asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, "test"}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); @@ -116,8 +116,6 @@ TEST_F(ProducerImplTests, ErrorIfFileEmpty) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } - - TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset dimensions"))); asapo::EventHeader event_header{1, 1000, "test", "", 1}; @@ -126,7 +124,7 @@ TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { } TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { - asapo::FileData data = asapo::FileData{new uint8_t[100] }; + asapo::FileData data = asapo::FileData{new uint8_t[100]}; asapo::EventHeader event_header{1, 0, expected_fullpath}; auto err = producer.SendData(event_header, std::move(data), asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -157,26 +155,25 @@ TEST_F(ProducerImplTests, OkIfNoDataWithTransferMetadataOnlyMode) { } TEST_F(ProducerImplTests, OkIfZeroSizeWithTransferMetadataOnlyMode) { - asapo::FileData data = asapo::FileData{new uint8_t[100] }; + asapo::FileData data = asapo::FileData{new uint8_t[100]}; asapo::EventHeader event_header{1, 0, expected_fullpath}; auto err = producer.SendData(event_header, std::move(data), asapo::kTransferMetaDataOnly, nullptr); ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(ProducerImplTests, UsesDefaultStream) { producer.SetCredentials(expected_default_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_default_credentials_str, - expected_metadata, - expected_id, - expected_size, - expected_name, - asapo::kDefaultSubstream.c_str(), - expected_ingest_mode, - 0, - 0), false)).WillOnce(Return(nullptr)); + expected_default_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + asapo::kDefaultSubstream.c_str(), + expected_ingest_mode, + 0, + 0), false)).WillOnce(Return(nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); @@ -188,17 +185,17 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - expected_metadata, - expected_id, - expected_size, - expected_name, - asapo::kDefaultSubstream.c_str(), - expected_ingest_mode, - 0, - 0 - ), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + asapo::kDefaultSubstream.c_str(), + expected_ingest_mode, + 0, + 0 + ), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); @@ -210,17 +207,17 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequestWithSubstream) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - expected_metadata, - expected_id, - expected_size, - expected_name, - expected_substream, - expected_ingest_mode, - 0, - 0 - ), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + expected_substream, + expected_ingest_mode, + 0, + 0 + ), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, expected_substream, nullptr, expected_ingest_mode, nullptr); @@ -233,19 +230,18 @@ TEST_F(ProducerImplTests, OKSendingSubstreamFinish) { std::string next_stream_meta = std::string("{\"next_substream\":") + "\"" + expected_next_substream + "\"}"; - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - next_stream_meta.c_str(), - expected_id + 1, - 0, - asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(), - expected_substream, - asapo::IngestModeFlags::kTransferMetaDataOnly, - 0, - 0 - ), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + next_stream_meta.c_str(), + expected_id + 1, + 0, + asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(), + expected_substream, + asapo::IngestModeFlags::kTransferMetaDataOnly, + 0, + 0 + ), false)).WillOnce(Return( + nullptr)); auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, expected_next_substream, nullptr); @@ -255,42 +251,45 @@ TEST_F(ProducerImplTests, OKSendingSubstreamFinish) { TEST_F(ProducerImplTests, OKSendingSubstreamFinishWithNoNextStream) { producer.SetCredentials(expected_credentials); - std::string next_stream_meta = std::string("{\"next_substream\":") + "\"" + asapo::ProducerImpl::kNoNextSubStreamKeyword - + "\"}"; - + std::string + next_stream_meta = std::string("{\"next_substream\":") + "\"" + asapo::ProducerImpl::kNoNextSubStreamKeyword + + "\"}"; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - next_stream_meta.c_str(), - expected_id + 1, - 0, - asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(), - expected_substream, - asapo::IngestModeFlags::kTransferMetaDataOnly, - 0, - 0 - ), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + next_stream_meta.c_str(), + expected_id + 1, + 0, + asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(), + expected_substream, + asapo::IngestModeFlags::kTransferMetaDataOnly, + 0, + 0 + ), false)).WillOnce(Return( + nullptr)); auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); } - - TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData, - expected_credentials_str, expected_metadata, - expected_id, expected_size, expected_name, asapo::kDefaultSubstream.c_str(), - expected_ingest_mode, - expected_subset_id, expected_subset_size), false)).WillOnce( - Return( - nullptr)); + expected_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + asapo::kDefaultSubstream.c_str(), + expected_ingest_mode, + expected_subset_id, + expected_subset_size), false)).WillOnce( + Return( + nullptr)); asapo::EventHeader event_header - {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; + {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); @@ -300,27 +299,26 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { expected_id = 0; expected_metadata = "{\"meta\":10}"; expected_size = expected_metadata.size(); - expected_ingest_mode = asapo::IngestModeFlags::kTransferData; + expected_ingest_mode = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInDatabase ; producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData, - expected_credentials_str, - "", - expected_id, - expected_size, - "beamtime_global.meta", - "", - expected_ingest_mode, - 10, - 10), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + "", + expected_id, + expected_size, + "beamtime_global.meta", + "", + expected_ingest_mode, + 10, + 10), false)).WillOnce(Return( + nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); ASSERT_THAT(err, Eq(nullptr)); } - TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { producer.SetCredentials(expected_credentials); @@ -333,7 +331,6 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { } - TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { producer.SetCredentials(expected_credentials); @@ -346,21 +343,20 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { } - TEST_F(ProducerImplTests, OKSendingSendFileRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - "", - expected_id, - 0, - expected_name, - asapo::kDefaultSubstream.c_str(), - expected_ingest_mode, - 0, - 0), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + "", + expected_id, + 0, + expected_name, + asapo::kDefaultSubstream.c_str(), + expected_ingest_mode, + 0, + 0), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); @@ -372,16 +368,16 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - "", - expected_id, - 0, - expected_name, - expected_substream, - expected_ingest_mode, - 0, - 0), false)).WillOnce(Return( - nullptr)); + expected_credentials_str, + "", + expected_id, + 0, + expected_name, + expected_substream, + expected_ingest_mode, + 0, + 0), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, expected_substream, expected_fullpath, expected_ingest_mode, nullptr); @@ -391,7 +387,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) { TEST_F(ProducerImplTests, ErrorSettingBeamtime) { std::string long_str(asapo::kMaxMessageSize * 10, 'a'); - expected_credentials = asapo::SourceCredentials{asapo::SourceType::kRaw,long_str, "", "", ""}; + expected_credentials = asapo::SourceCredentials{asapo::SourceType::kRaw, long_str, "", "", ""}; EXPECT_CALL(mock_logger, Error(testing::HasSubstr("too long"))); auto err = producer.SetCredentials(expected_credentials); @@ -402,74 +398,73 @@ TEST_F(ProducerImplTests, ErrorSettingBeamtime) { TEST_F(ProducerImplTests, ErrorSettingSecondTime) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("already"))); - producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw,"1", "", "2", "3"}); - auto err = producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw,"4", "", "5", "6"}); + producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw, "1", "", "2", "3"}); + auto err = producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw, "4", "", "5", "6"}); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { producer.SetCredentials(expected_credentials); - - EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); - asapo::EventHeader event_header{expected_id, 0, expected_name}; - auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData; - auto err = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); + uint64_t ingest_modes[] = {0, asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData, + asapo::IngestModeFlags::kTransferData, + asapo::IngestModeFlags::kStoreInDatabase, + asapo::IngestModeFlags::kStoreInFilesystem, + asapo::IngestModeFlags::kStoreInDatabase | asapo::IngestModeFlags::kStoreInFilesystem, + asapo::IngestModeFlags::kTransferMetaDataOnly + | asapo::IngestModeFlags::kStoreInFilesystem}; - ingest_mode = 0; - auto err_null = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); + EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); + for (auto ingest_mode : ingest_modes) { + auto err = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + } - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); - ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } - TEST_F(ProducerImplTests, GetQueueSize) { EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10)); - auto size = producer.GetRequestsQueueSize(); + auto size = producer.GetRequestsQueueSize(); ASSERT_THAT(size, Eq(10)); } TEST_F(ProducerImplTests, WaitRequestsFinished) { EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return( - asapo::IOErrorTemplates::kTimeout.Generate().release())); + asapo::IOErrorTemplates::kTimeout.Generate().release())); - auto err = producer.WaitRequestsFinished(100); + auto err = producer.WaitRequestsFinished(100); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } - MATCHER_P3(M_CheckGetSubstreamInfoRequest, op_code, source_credentials, substream, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code - && request->source_credentials == source_credentials - && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; + && request->source_credentials == source_credentials + && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; } - TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetSubstreamInfoRequest(asapo::kOpcodeStreamInfo, - expected_credentials_str, - expected_substream), true)).WillOnce( - Return(nullptr)); + expected_credentials_str, + expected_substream), true)).WillOnce( + Return(nullptr)); asapo::Error err; producer.GetStreamInfo(expected_substream, 1, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } - TEST(GetStreamInfoTest, GetStreamInfoTimeout) { asapo::ProducerImpl producer1{"", 1, 10, asapo::RequestHandlerType::kTcp}; asapo::Error err; - auto sinfo = producer1.GetStreamInfo(5, &err); + auto sinfo = producer1.GetStreamInfo(5, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); @@ -487,5 +482,4 @@ TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } - } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 1518fd2e4437ffb858e33f71fe25b3fa3377db8b..d0a53e6cbb9536d580f152200d3acc182c36fa7d 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -120,4 +120,5 @@ cdef extern from "asapo_producer.h" namespace "asapo": kTransferData kTransferMetaDataOnly kStoreInFilesystem + kStoreInDatabase diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 956818788d52da927a43c2c616f495543f9e52d8..3cd30bf122922e44d594271c11df1ce364f01e47 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -15,6 +15,7 @@ DEFAULT_INGEST_MODE = kDefaultIngestMode INGEST_MODE_TRANSFER_DATA = kTransferData INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem +INGEST_MODE_STORE_IN_DATABASE = kStoreInDatabase cdef extern from "numpy/ndarraytypes.h": void PyArray_ENABLEFLAGS(np.ndarray arr, int flags) diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index a3334cbe2f0ef282aace6d2621894e61342863b0..ca978b50b35754d4ed9c4da7c7452da81022bea0 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -23,8 +23,6 @@ Error ReceiverConfigFactory::SetConfig(std::string file_name) { (err = parser.GetUInt64("ReceiveToDiskThresholdMB", &config.receive_to_disk_threshold_mb)) || (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver.listen_port)) || (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver.nthreads)) || - (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) || - (err = parser.GetBool("WriteToDb", &config.write_to_db)) || (err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) || (err = parser.Embedded("DataCache").GetUInt64("SizeGB", &config.datacache_size_gb)) || (err = parser.Embedded("DataCache").GetUInt64("ReservedShare", &config.datacache_reserved_share)) || diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index e82b04258941ba8764caadcdc74e7fd99f4b1e02..2d673db43fd04f898d87cc4606f731953113cbfc 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -15,8 +15,6 @@ struct ReceiverConfig { uint64_t listen_port = 0; std::string authorization_server; uint64_t authorization_interval_ms = 0; - bool write_to_disk = false; - bool write_to_db = false; bool use_datacache = true; uint64_t datacache_size_gb = 0; uint64_t datacache_reserved_share = 0; diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 7e1d3a4c5f61dc61d18a41bf6ed68e11b361847b..c3372ca1305a5bacab4068cac24b5899b69e87b0 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -5,12 +5,12 @@ namespace asapo { bool NeedFileWriteHandler(const GenericRequestHeader &request_header) { - return GetReceiverConfig()->write_to_disk && - (request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInFilesystem); + return request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInFilesystem; } bool NeedDbHandler(const GenericRequestHeader &request_header) { - return GetReceiverConfig()->write_to_db; + return (request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInDatabase) || + (request_header.custom_data[kPosIngestMode] == asapo::IngestModeFlags::kTransferMetaDataOnly); } bool RequestFactory::ReceiveDirectToFile(const GenericRequestHeader &request_header) const { @@ -37,9 +37,6 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request> &reque Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request> &request, const GenericRequestHeader &request_header) const { - if (!GetReceiverConfig()->write_to_disk) { - return ReceiverErrorTemplates::kInternalServerError.Generate("reciever does not support writing to disk"); - } if (!(request_header.custom_data[kPosIngestMode] & kStoreInFilesystem)) { return ReceiverErrorTemplates::kBadRequest.Generate( "ingest mode should include kStoreInFilesystem for large files "); diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index e84d07cedde38fb560959808f24f89b9adf3d233..9200f61e0cbc1b70e13aa87b1f4071b674175da5 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -72,8 +72,6 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("ReceiveToDiskThresholdMB", error_field) + std::to_string(config.receive_to_disk_threshold_mb); config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; - config_string += "," + Key("WriteToDisk", error_field) + (config.write_to_disk ? "true" : "false"); - config_string += "," + Key("WriteToDb", error_field) + (config.write_to_db ? "true" : "false"); config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; config_string += "}"; diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index e1cb5b8aef43cf015c5448f8351cc92d894f58e9..aa650afcee60aa6d2c41103af5cd20534ac94e3c 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -66,8 +66,6 @@ class FactoryTests : public Test { void SetUp() override { generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - config.write_to_disk = true; - config.write_to_db = true; SetReceiverConfig(config, "none"); } void TearDown() override { @@ -126,30 +124,16 @@ TEST_F(FactoryTests, ReturnsDataRequestForAuthorizationCode) { ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); } - -TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInConfig) { - config.write_to_disk = false; - - SetReceiverConfig(config, "none"); - - auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); -} - -TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInRequest) { +TEST_F(FactoryTests, DoNotAddDiskAndDbWriterIfNotWantedInRequest) { generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(3)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { - config.write_to_db = false; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInFilesystem; SetReceiverConfig(config, "none"); @@ -171,44 +155,30 @@ TEST_F(FactoryTests, CachePassedToRequest) { } -TEST_F(FactoryTests, ReturnsMetaDataRequestOnkOpcodeTransferMetaData) { - generic_request_header.op_code = asapo::Opcode::kOpcodeTransferMetaData; +TEST_F(FactoryTests, ReturnsMetaDataRequestOnTransferMetaDataOnly) { + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferMetaDataOnly; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::Request*>(request->cache__), Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(4)); ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[1]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbMetaWrite*>(request->GetListHandlers().back()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers()[3]), Ne(nullptr)); } - -TEST_F(FactoryTests, DonNotGenerateMetadataRequestIfNoDbConfigured) { - config.write_to_db = false; - - SetReceiverConfig(config, "none"); - - +TEST_F(FactoryTests, ReturnsMetaDataRequestOnkOpcodeTransferMetaData) { + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInDatabase; generic_request_header.op_code = asapo::Opcode::kOpcodeTransferMetaData; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kInternalServerError)); -} - - -TEST_F(FactoryTests, DonNotGenerateRequestIfWriteToDiskNotActive) { - config.write_to_disk = false; - config.receive_to_disk_threshold_mb = 0; - - SetReceiverConfig(config, "none"); - - generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - generic_request_header.data_size = 1; - - auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - - ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kInternalServerError)); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::Request*>(request->cache__), Eq(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbMetaWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } TEST_F(FactoryTests, DonNotGenerateRequestIfIngestModeIsWrong) { diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 3a3c86451f06f9e15b623dfe613d2d9b874930ae..15072bf2d012eb01e4e9ed26c558db2a333098a9 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -48,8 +48,6 @@ class ConfigTests : public Test { test_config.tag = "receiver1"; test_config.performance_db_name = "db_test"; test_config.performance_db_uri = "localhost:8086"; - test_config.write_to_disk = true; - test_config.write_to_db = true; test_config.database_uri = "localhost:27017"; test_config.log_level = asapo::LogLevel::Error; test_config.authorization_interval_ms = 10000; @@ -83,8 +81,6 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->listen_port, Eq(4200)); ASSERT_THAT(config->authorization_interval_ms, Eq(10000)); ASSERT_THAT(config->authorization_server, Eq("AuthorizationServer/aa")); - ASSERT_THAT(config->write_to_disk, Eq(true)); - ASSERT_THAT(config->write_to_db, Eq(true)); ASSERT_THAT(config->log_level, Eq(asapo::LogLevel::Error)); ASSERT_THAT(config->tag, Eq("receiver1")); ASSERT_THAT(config->use_datacache, Eq(false)); @@ -105,8 +101,8 @@ TEST_F(ConfigTests, ReadSettings) { TEST_F(ConfigTests, ErrorReadSettings) { PrepareConfig(); - std::vector<std::string>fields {"PerformanceDbServer", "ListenPort", "DataServer", "ListenPort", "WriteToDisk", - "WriteToDb", "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag", + std::vector<std::string>fields {"PerformanceDbServer", "ListenPort", "DataServer", "ListenPort", + "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag", "AuthorizationServer", "AuthorizationInterval", "PerformanceDbName", "LogLevel", "NThreads", "DiscoveryServer", "AdvertiseURI", "NetworkMode", "ReceiveToDiskThresholdMB"}; diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/CMakeLists.txt index ec3f269413aaa944deca4abb18dbe3a1e789c672..682fdc6a2196ab447e5f29282a4b98ebdbac6116 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/CMakeLists.txt +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/CMakeLists.txt @@ -3,7 +3,6 @@ set(TARGET_NAME full_chain_simple_chain_withdata_fromcache) ################################ # Testing ################################ -set(RECEIVER_WRITE_TO_DISK false) prepare_asapo() if (UNIX) set (ROOT_PATH "/tmp/asapo/") diff --git a/tests/automatic/high_avail/broker_mongo_restart/CMakeLists.txt b/tests/automatic/high_avail/broker_mongo_restart/CMakeLists.txt index 244d2522065c3899e612801b4eaddc32b93491f4..826d9d19a9409532bcb194a3e8899501f11b8091 100644 --- a/tests/automatic/high_avail/broker_mongo_restart/CMakeLists.txt +++ b/tests/automatic/high_avail/broker_mongo_restart/CMakeLists.txt @@ -3,6 +3,5 @@ set(TARGET_NAME broker_mongo_restart) ################################ # Testing ################################ -set(RECEIVER_WRITE_TO_DISK false) prepare_asapo() add_script_test("${TARGET_NAME}-tcp" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> tcp" nomem) diff --git a/tests/automatic/high_avail/services_restart/CMakeLists.txt b/tests/automatic/high_avail/services_restart/CMakeLists.txt index b62a7b6f95a462ad2a2d4ab4c384907e21436892..25a2b009764f40a11a6b5745f054ec86bf240b3f 100644 --- a/tests/automatic/high_avail/services_restart/CMakeLists.txt +++ b/tests/automatic/high_avail/services_restart/CMakeLists.txt @@ -3,7 +3,6 @@ set(TARGET_NAME service_restart) ################################ # Testing ################################ -set(RECEIVER_WRITE_TO_DISK false) prepare_asapo() add_script_test("${TARGET_NAME}-all-tcp" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> broker 1000 998 tcp" nomem) add_script_test("${TARGET_NAME}-all-but-broker-tcp" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> receiver 1000 1000 tcp" nomem) diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 3916aa6f0fd94a8708884d122e2f0c013b59422d..b4fea561aa03591d754d711cb942b408a1fb0691 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -41,7 +41,7 @@ sleep 1 $1 $3 $stream $beamtime_id "127.0.0.1:8400" &> out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 12 +cat out | grep "successfuly sent" | wc -l | grep 13 cat out | grep "local i/o error" cat out | grep "already have record with same id" | wc -l | grep 4 cat out | grep "duplicate" | wc -l | grep 4 diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index d608dab96eb6756f3d525af82f16b61f0e87bb1e..a0a3537efdbfcb082e9b5dcd403e421a91214c28 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -22,7 +22,7 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 12 || goto error +echo %NUM% | findstr 13 || goto error for /F %%N in ('find /C "} wrong input: Bad request: already have record with same id" ^< "out"') do set NUM=%%N echo %NUM% | findstr 2 || goto error diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 0578283dfb085e2b5fbc3690eaa1352b9cae7f57..460a03ed12f6628d61460907106ff45c9f20dde1 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -120,6 +120,11 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + stream + producer.send_data(6, "processed/" + stream + "/" + "file8", None, ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback) +# send same id without writing to database, should success +producer.send_file(1, local_path="./file1", exposed_path="processed/" + stream + "/" + "file18", + user_meta='{"test_key1":"test_val"}', + ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_DATA | asapo_producer.INGEST_MODE_STORE_IN_FILESYSTEM,callback=callback) + producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in index 3c7ec5ef1e2e0b8d9353e5c475af2a2f39ab8e03..2138a3296e52ba3c6dd355938c021eb05edbbbf6 100644 --- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in @@ -18,8 +18,6 @@ "AuthorizationInterval": 1000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": @RECEIVER_WRITE_TO_DISK@, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in index 4414e4692d60a7f9180b0781f73a8889d9215b33..6061a8a9fd8b7438790b209e8d77c756147a629c 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in @@ -18,8 +18,6 @@ "AuthorizationInterval": 1000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": @RECEIVER_WRITE_TO_DISK@, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in index b2afbf0d4a22b17512ff7e7e09ad70c001fe29aa..f96debf98172ac42fd4ce2854d3d7a5c265b3873 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in @@ -18,8 +18,6 @@ "ReservedShare": 10 }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": @RECEIVER_WRITE_TO_DISK@, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/manual/broker_debug_local/receiver.json b/tests/manual/broker_debug_local/receiver.json index 62c11ec5a306342dd8996a409bc9e64d91f35365..8a358c98cbb01faf364b65eb56942638c3c57133 100644 --- a/tests/manual/broker_debug_local/receiver.json +++ b/tests/manual/broker_debug_local/receiver.json @@ -18,8 +18,6 @@ "AuthorizationInterval": 10000, "ListenPort": 22001, "Tag": "22001", - "WriteToDisk": true, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/manual/broker_debug_local/receiver.json.tpl b/tests/manual/broker_debug_local/receiver.json.tpl index 234e78b51f86e603e6bf5b9c7e5b184d16f4537c..02e0441d517806b27fbd6ed906cf97519188cdba 100644 --- a/tests/manual/broker_debug_local/receiver.json.tpl +++ b/tests/manual/broker_debug_local/receiver.json.tpl @@ -18,8 +18,6 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": true, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index 476958c50d503934b42d71e49055465e4fdd85a7..3b4bfd1a0223e5040fe36d04d4c07f3378b24899 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -15,9 +15,7 @@ "SizeGB": 1, "ReservedShare": 10 }, - "WriteToDisk":true, "ReceiveToDiskThresholdMB":50, - "WriteToDb":true, "LogLevel":"info", "Tag": "test_receiver" } diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index f4871c8325730d3dcfd5c0edc65090297814c7c3..485e80876e9e42b618d8ff4387bcc9899ba6cfb4 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -15,9 +15,7 @@ "SizeGB": 1, "ReservedShare": 10 }, - "WriteToDisk":true, "ReceiveToDiskThresholdMB":50, - "WriteToDb":true, "LogLevel":"info", "Tag": "test_receiver" } diff --git a/tests/manual/performance_producer_receiver/test.sh b/tests/manual/performance_producer_receiver/test.sh index aefb44b8f17776dccdbfe1acbfd8a63764620ffe..030022133d7caaecd11e3d1668767a9724542bda 100755 --- a/tests/manual/performance_producer_receiver/test.sh +++ b/tests/manual/performance_producer_receiver/test.sh @@ -58,9 +58,6 @@ cat receiver.json | then . + {value:\"${monitor_node}:${monitor_port}\"} elif .key == \"ListenPort\" then . + {value:${receiver_port}} - elif .key == \"WriteToDisk\" - then . + {value:$1} - else . end ) | from_entries" > receiver_tmp.json diff --git a/tests/manual/python_tests/producer/receiver.json.tpl b/tests/manual/python_tests/producer/receiver.json.tpl index ec637c58473d65b47548df1266e65e0185bcbee1..93539e5f794765ab6e40d2a589c3cd0d402a4569 100644 --- a/tests/manual/python_tests/producer/receiver.json.tpl +++ b/tests/manual/python_tests/producer/receiver.json.tpl @@ -18,8 +18,6 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": true, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl index ec637c58473d65b47548df1266e65e0185bcbee1..93539e5f794765ab6e40d2a589c3cd0d402a4569 100644 --- a/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl +++ b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl @@ -18,8 +18,6 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", - "WriteToDisk": true, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" } diff --git a/tests/manual/receiver_debug_local/receiver.json b/tests/manual/receiver_debug_local/receiver.json index 62c11ec5a306342dd8996a409bc9e64d91f35365..8a358c98cbb01faf364b65eb56942638c3c57133 100644 --- a/tests/manual/receiver_debug_local/receiver.json +++ b/tests/manual/receiver_debug_local/receiver.json @@ -18,8 +18,6 @@ "AuthorizationInterval": 10000, "ListenPort": 22001, "Tag": "22001", - "WriteToDisk": true, "ReceiveToDiskThresholdMB":50, - "WriteToDb": true, "LogLevel" : "debug" }