diff --git a/CHANGELOG.md b/CHANGELOG.md index c56a4a3309a2b283ad7cb4fa93160ed9029c7920..95d828b264d677a10a9ad1a921f399c2963bb8a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ FEATURES * Consumer API: Get last within consumer group returns message only once - +* Producer API: An option to write raw data to core filesystem directly ## 21.09.0 diff --git a/CMakeLists.txt b/CMakeLists.txt index b65a84e1e4c9d3f50a75dbae7e5709f682748138..5d614a59e25475c6a575c45be24b2611d6677ba5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,12 +5,12 @@ set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/CMakeModules/ ${PROJECT_SOURCE_DIR}/ #protocol version changes if one of the microservice API's change set (ASAPO_CONSUMER_PROTOCOL "v0.5") -set (ASAPO_PRODUCER_PROTOCOL "v0.4") +set (ASAPO_PRODUCER_PROTOCOL "v0.5") set (ASAPO_DISCOVERY_API_VER "v0.1") set (ASAPO_AUTHORIZER_API_VER "v0.2") set (ASAPO_BROKER_API_VER "v0.5") set (ASAPO_FILE_TRANSFER_SERVICE_API_VER "v0.2") -set (ASAPO_RECEIVER_API_VER "v0.4") +set (ASAPO_RECEIVER_API_VER "v0.5") set (ASAPO_RDS_API_VER "v0.1") set (DB_SCHEMA_VER "v0.1") diff --git a/PROTOCOL-VERSIONS.md b/PROTOCOL-VERSIONS.md index f91b0d2fb04d1393bab62174ebaad43ff84561ab..102219d4c772e56f6059915b94c458f4b8adc16e 100644 --- a/PROTOCOL-VERSIONS.md +++ b/PROTOCOL-VERSIONS.md @@ -1,6 +1,7 @@ ### Producer Protocol | Release | used by client | Supported by server | Status | | ------------ | ------------------- | -------------------- | ---------------- | +| v0.5 | | | In development | | v0.4 | 21.09.0 - 21.09.0 | 21.09.0 - 21.09.0 | Current version | | v0.3 | 21.06.0 - 21.06.0 | 21.06.0 - 21.09.0 | Deprecates from 01.09.2022 | | v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.09.0 | Deprecates from 01.07.2022 | diff --git a/VERSIONS.md b/VERSIONS.md index 438a72968942edef808eba10d776fe10aa0df84a..e857bfb66bf5b6580a0477c4e623f6b048fd1248 100644 --- a/VERSIONS.md +++ b/VERSIONS.md @@ -2,7 +2,7 @@ | Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment| | ------------ | ----------- | -------- | ------------------------- | --------------------- | ------- | -| 21.12.0 | No | v0.4 | 21.12.0/21.12.0 | in development | | +| 21.12.0 | No | v0.5 | 21.12.0/21.12.0 | in development | | | 21.09.0 | No | v0.4 | 21.09.0/21.09.0 | current version |beamline token for raw | | 21.06.0 | Yes | v0.3 | 21.06.0/21.09.0 | deprecates 01.09.2022 |arbitrary characters| | 21.03.3 | No | v0.2 | 21.03.2/21.09.0 | deprecates 01.07.2022 |bugfix in server| diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go index 1afe3d9da5fb11154a68c6bd0896b1601aaa8e7e..1b984a9c456a3f56e4d4793b3cecdaf9ffcc27eb 100644 --- a/authorizer/src/asapo_authorizer/server/authorize.go +++ b/authorizer/src/asapo_authorizer/server/authorize.go @@ -180,7 +180,7 @@ func alwaysAllowed(creds SourceCredentials) (common.BeamtimeMeta, bool) { if pair.BeamtimeId == creds.BeamtimeId { pair.DataSource = creds.DataSource pair.Type = creds.Type - pair.AccessTypes = []string{"read", "write"} + pair.AccessTypes = []string{"read", "write","writeraw"} return pair, true } } diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 1460af10bd07e7bf4abe0343988311e3cef53a95..e2d9d56fb275a2ca25e3c07ba76316a7a9905e93 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -144,6 +144,7 @@ enum IngestModeFlags : uint64_t { kTransferMetaDataOnly = 1 << 1, kStoreInFilesystem = 1 << 2, kStoreInDatabase = 1 << 3, + kWriteRawDataToOffline = 1 << 4, }; const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; diff --git a/discovery/src/asapo_discovery/protocols/hard_coded_producer.go b/discovery/src/asapo_discovery/protocols/hard_coded_producer.go index 9ef311c1261095631feb7996fc68f06b38a66181..3acd3b625626a4a1eb3c8ca503796949dbca243c 100644 --- a/discovery/src/asapo_discovery/protocols/hard_coded_producer.go +++ b/discovery/src/asapo_discovery/protocols/hard_coded_producer.go @@ -2,11 +2,16 @@ package protocols func GetSupportedProducerProtocols() []Protocol { return []Protocol{ + Protocol{"v0.5", + map[string]string{ + "Discovery": "v0.1", + "Receiver": "v0.5", + }, &protocolValidatorCurrent{}}, Protocol{"v0.4", map[string]string{ "Discovery": "v0.1", "Receiver": "v0.4", - }, &protocolValidatorCurrent{}}, + }, &protocolValidatorDeprecated{getTimefromDate("2022-12-01")}}, Protocol{"v0.3", map[string]string{ "Discovery": "v0.1", diff --git a/discovery/src/asapo_discovery/protocols/protocol_test.go b/discovery/src/asapo_discovery/protocols/protocol_test.go index f7933b7a848af1257f9ae8c0073c8047093fdb2e..4a419ca810c5dd20acb0d5ea157ec81aa7caea94 100644 --- a/discovery/src/asapo_discovery/protocols/protocol_test.go +++ b/discovery/src/asapo_discovery/protocols/protocol_test.go @@ -25,7 +25,8 @@ var protocolTests = []protocolTest{ // producer - {"producer", "v0.4", true, "current", "v0.4"}, + {"producer", "v0.5", true, "current", "v0.5"}, + {"producer", "v0.4", true, "deprecates", "v0.4"}, {"producer", "v0.3", true, "deprecates", "v0.3"}, {"producer", "v0.2", true, "deprecates", "v0.2"}, {"producer", "v0.1", true, "deprecates", "v0.1"}, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 8c0fda47f6e3651b6508047ce0fe6e13713e6c9f..c307cbf1d64e3a707018ca308b7d88b1e4383e3c 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -606,7 +606,7 @@ TEST_F(ProducerImplTests, GetVersionInfoWithServer) { R"({"softwareVersion":"21.06.0, build 7a9294ad","clientSupported":"no", "clientProtocol":{"versionInfo":"v0.4"}})"; EXPECT_CALL(*mock_http_client, Get_t(HasSubstr(expected_server_uri + - "/asapo-discovery/v0.1/version?client=producer&protocol=v0.4"), _, _)).WillOnce(DoAll( + "/asapo-discovery/v0.1/version?client=producer&protocol=v0.5"), _, _)).WillOnce(DoAll( SetArgPointee<1>(asapo::HttpCode::OK), SetArgPointee<2>(nullptr), Return(result))); diff --git a/producer/api/cpp/unittests/test_producer_request.cpp b/producer/api/cpp/unittests/test_producer_request.cpp index 7a48b685d332754b6c4bee9c99aefabd14b5a794..ae0e560cfa20250982c9cadd2de151a23ed33950 100644 --- a/producer/api/cpp/unittests/test_producer_request.cpp +++ b/producer/api/cpp/unittests/test_producer_request.cpp @@ -40,7 +40,7 @@ TEST(ProducerRequest, Constructor) { uint64_t expected_file_size = 1337; uint64_t expected_meta_size = 137; std::string expected_meta = "meta"; - std::string expected_api_version = "v0.4"; + std::string expected_api_version = "v0.5"; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, diff --git a/producer/api/cpp/unittests/test_receiver_discovery_service.cpp b/producer/api/cpp/unittests/test_receiver_discovery_service.cpp index c4103b8df771b76e27368df658a8678da3514541..9b8be27b06e82e02ec4f05a62d1dd81ee195dc6c 100644 --- a/producer/api/cpp/unittests/test_receiver_discovery_service.cpp +++ b/producer/api/cpp/unittests/test_receiver_discovery_service.cpp @@ -48,7 +48,7 @@ class ReceiversStatusTests : public Test { NiceMock<asapo::MockLogger> mock_logger; NiceMock<MockHttpClient>* mock_http_client; - std::string expected_endpoint{"endpoint/asapo-discovery/v0.1/asapo-receiver?protocol=v0.4"}; + std::string expected_endpoint{"endpoint/asapo-discovery/v0.1/asapo-receiver?protocol=v0.5"}; ReceiverDiscoveryService status{"endpoint", 20}; void SetUp() override { diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index fc762012bc9d27f6974625d37184df6e5528e6c6..0ac9cf4ccc1d2e5b6d8952e8ed56f84ccd5c3909 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -133,4 +133,5 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": kTransferMetaDataOnly kStoreInFilesystem kStoreInDatabase + kWriteRawDataToOffline diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 0f236b9888173fd6a06c1b29258f554ebb043fd0..1a02131818ade7a0ff13908c9cc3a6dbbdbcbb5f 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -17,6 +17,7 @@ INGEST_MODE_TRANSFER_DATA = kTransferData INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem INGEST_MODE_STORE_IN_DATABASE = kStoreInDatabase +INGEST_MODE_WRITE_RAW_DATA_TO_OFFLINE_FS = kWriteRawDataToOffline cdef extern from "numpy/ndarraytypes.h": void PyArray_ENABLEFLAGS(np.ndarray arr, int flags) diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index af41e773a067d3b546a3c471cb2d4062f2fc6912..836a04f9b650155c7c41b7620838fda23f783ea1 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -135,13 +135,4 @@ set(TEST_SOURCE_FILES_RDS unittests/receiver_data_server/request_handler/test_request_handler_factory.cpp unittests/receiver_data_server/request_handler/test_request_handler.cpp ) -gtest(${TARGET_NAME}_RDS "${TEST_SOURCE_FILES_RDS}" "${TEST_LIBRARIES}" - ${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/*.h - ${CMAKE_CURRENT_SOURCE_DIR}/src/statistics/*.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/statistics/*.h - ${CMAKE_CURRENT_SOURCE_DIR}/src/request_handler/*.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/request_handler/*.h - ${CMAKE_CURRENT_SOURCE_DIR}/src/file_processors/*.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/file_processors/*.h - ) +gtest(${TARGET_NAME}_RDS "${TEST_SOURCE_FILES_RDS}" "${TEST_LIBRARIES}" nocov) diff --git a/receiver/src/request_handler/file_processors/file_processor.cpp b/receiver/src/request_handler/file_processors/file_processor.cpp index 52dd708c236de585ceddde0af0e17d6919425dae..62c33ab7ccd74f471be0bf05c3159af37cb528c3 100644 --- a/receiver/src/request_handler/file_processors/file_processor.cpp +++ b/receiver/src/request_handler/file_processors/file_processor.cpp @@ -1,49 +1,67 @@ #include "file_processor.h" +#include <tuple> + #include "asapo/io/io_factory.h" #include "../../receiver_logger.h" #include "../../request.h" namespace asapo { -FileProcessor::FileProcessor(): io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { +FileProcessor::FileProcessor() : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } -Error GetRootFolder(const Request* request, std::string* root_folder) { - std::string root; - auto fname = request->GetFileName(); - auto pos = fname.find(asapo::kPathSeparator); - if (pos == std::string::npos) { - return ReceiverErrorTemplates::kBadRequest.Generate("cannot extract root folder from file path " + fname); - } - +Error CheckFileNameIsAbsolute(const std::string &fname) { auto posr = fname.find(".."); if (posr != std::string::npos) { return ReceiverErrorTemplates::kBadRequest.Generate("cannot use relative path in path name " + fname); } + return nullptr; +} +Error CheckFileFolderCorrespondsToRequestSourceType(const Request *request, const std::string &fname) { + auto pos = fname.find(asapo::kPathSeparator); + if (pos == std::string::npos) { + return ReceiverErrorTemplates::kBadRequest.Generate("cannot extract root folder from file path " + fname); + } std::string file_folder = fname.substr(0, pos); auto folder_by_type = GetStringFromSourceType(request->GetSourceType()); if (file_folder != folder_by_type) { - return ReceiverErrorTemplates::kBadRequest.Generate("file " + fname + " is not in " + folder_by_type + " folder"); + return ReceiverErrorTemplates::kBadRequest.Generate( + "file " + fname + " is not in " + folder_by_type + " folder"); } + return nullptr; +} - switch (request->GetSourceType()) { - case SourceType::kProcessed: - root = request->GetOfflinePath(); - break; - case SourceType::kRaw: - root = request->GetOnlinePath(); - if (root.empty()) { - return ReceiverErrorTemplates::kBadRequest.Generate("online path not available"); - } - break; +Error GetRootFolderFromRequest(const Request *request, std::string *root_folder) { + bool rawToOffline = request->GetCustomData()[asapo::kPosIngestMode] & asapo::kWriteRawDataToOffline; + if (request->GetSourceType() == SourceType::kProcessed || rawToOffline) { + *root_folder = request->GetOfflinePath(); + return nullptr; } - *root_folder = root; + *root_folder = request->GetOnlinePath(); + if (root_folder->empty()) { + return ReceiverErrorTemplates::kBadRequest.Generate("online path not available"); + } return nullptr; } +Error GetRootFolder(const Request *request, std::string *root_folder) { + auto fname = request->GetFileName(); + + auto err = CheckFileNameIsAbsolute(fname); + if (err != nullptr) { + return err; + } + + err = CheckFileFolderCorrespondsToRequestSourceType(request, fname); + if (err != nullptr) { + return err; + } + + return GetRootFolderFromRequest(request, root_folder); +} } diff --git a/receiver/unittests/request_handler/file_processors/test_file_processor.cpp b/receiver/unittests/request_handler/file_processors/test_file_processor.cpp index 5f2ac6d6acea527875fdebbfa45b8cd0e166d0c9..f8c7e562dfb89cdec4262e105e33693e5f5754cf 100644 --- a/receiver/unittests/request_handler/file_processors/test_file_processor.cpp +++ b/receiver/unittests/request_handler/file_processors/test_file_processor.cpp @@ -11,33 +11,8 @@ #include "../../receiver_mocking.h" -using ::testing::Test; -using ::testing::Return; -using ::testing::ReturnRef; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Eq; -using ::testing::Ne; -using ::testing::Mock; -using ::testing::NiceMock; -using ::testing::InSequence; -using ::testing::SetArgPointee; -using ::testing::AllOf; -using ::testing::HasSubstr; - - -using ::asapo::Error; -using ::asapo::GetRootFolder; -using ::asapo::ErrorInterface; -using ::asapo::FileDescriptor; -using ::asapo::SocketDescriptor; -using ::asapo::MockIO; -using asapo::Request; -using asapo::ReceiveFileProcessor; -using ::asapo::GenericRequestHeader; -using asapo::MockRequest; +using namespace testing; +using namespace asapo; namespace { @@ -48,8 +23,9 @@ class FileProcessorTests : public Test { NiceMock<asapo::MockLogger> mock_logger; std::string expected_offline_path = "offline"; std::string expected_online_path = "online"; - void MockRequestData(std::string fname, asapo::SourceType type); - void SetUp() override { + void MockRequestData(std::string fname, asapo::SourceType type, bool write_raw_to_core); + CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0}; + void SetUp() override { GenericRequestHeader request_header; request_header.data_id = 2; asapo::ReceiverConfig test_config; @@ -61,9 +37,9 @@ class FileProcessorTests : public Test { }; -void FileProcessorTests::MockRequestData(std::string fname, asapo::SourceType type) { +void FileProcessorTests::MockRequestData(std::string fname, asapo::SourceType type, bool write_raw_to_core) { - if (type == asapo::SourceType::kProcessed) { + if (type == asapo::SourceType::kProcessed || write_raw_to_core) { EXPECT_CALL(*mock_request, GetOfflinePath()) .WillRepeatedly(ReturnRef(expected_offline_path)); } else { @@ -71,6 +47,15 @@ void FileProcessorTests::MockRequestData(std::string fname, asapo::SourceType ty .WillRepeatedly(ReturnRef(expected_online_path)); } + if (write_raw_to_core) { + expected_custom_data[asapo::kPosIngestMode] |= kWriteRawDataToOffline; + } else { + expected_custom_data[asapo::kPosIngestMode] = kDefaultIngestMode; + } + + EXPECT_CALL(*mock_request, GetCustomData_t()).WillRepeatedly(Return(expected_custom_data)); + + EXPECT_CALL(*mock_request, GetSourceType()).WillRepeatedly(Return(type)); EXPECT_CALL(*mock_request, GetFileName()).Times(1) @@ -88,22 +73,25 @@ TEST_F(FileProcessorTests, RawWriteToRaw) { struct Test { asapo::SourceType type; + bool write_raw_to_core; std::string filename; bool error; std::string res; }; std::vector<Test> tests = { - Test{asapo::SourceType::kProcessed, repl_sep("processed/bla.text"), false, expected_offline_path}, - Test{asapo::SourceType::kProcessed, repl_sep("raw/bla.text"), true, ""}, - Test{asapo::SourceType::kProcessed, repl_sep("processed/../bla.text"), true, ""}, - Test{asapo::SourceType::kProcessed, repl_sep("bla/bla.text"), true, ""}, - Test{asapo::SourceType::kProcessed, repl_sep("bla.text"), true, ""}, - Test{asapo::SourceType::kProcessed, repl_sep("./bla.text"), true, ""}, - Test{asapo::SourceType::kRaw, repl_sep("raw/bla.text"), false, expected_online_path}, + Test{asapo::SourceType::kProcessed, false, repl_sep("processed/bla.text"), false, expected_offline_path}, + Test{asapo::SourceType::kProcessed, true, repl_sep("processed/bla.text"), false, expected_offline_path}, + Test{asapo::SourceType::kProcessed, false, repl_sep("raw/bla.text"), true, ""}, + Test{asapo::SourceType::kProcessed, false, repl_sep("processed/../bla.text"), true, ""}, + Test{asapo::SourceType::kProcessed, false, repl_sep("bla/bla.text"), true, ""}, + Test{asapo::SourceType::kProcessed, false, repl_sep("bla.text"), true, ""}, + Test{asapo::SourceType::kProcessed, false, repl_sep("./bla.text"), true, ""}, + Test{asapo::SourceType::kRaw, false, repl_sep("raw/bla.text"), false, expected_online_path}, + Test{asapo::SourceType::kRaw, true, repl_sep("raw/bla.text"), false, expected_offline_path}, }; for (auto& test : tests) { - MockRequestData(test.filename, test.type); + MockRequestData(test.filename, test.type, test.write_raw_to_core); std::string res; auto err = GetRootFolder(mock_request.get(), &res); if (test.error) { diff --git a/receiver/unittests/request_handler/file_processors/test_receive_file_processor.cpp b/receiver/unittests/request_handler/file_processors/test_receive_file_processor.cpp index 4a129be53629b96b613fbd73079116bc14fd2ffe..0fe70c057a2890bc0cb0a01fe5c8d7aabb419014 100644 --- a/receiver/unittests/request_handler/file_processors/test_receive_file_processor.cpp +++ b/receiver/unittests/request_handler/file_processors/test_receive_file_processor.cpp @@ -11,32 +11,8 @@ #include "../../receiver_mocking.h" -using ::testing::Test; -using ::testing::Return; -using ::testing::ReturnRef; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Eq; -using ::testing::Ne; -using ::testing::Mock; -using ::testing::NiceMock; -using ::testing::InSequence; -using ::testing::SetArgPointee; -using ::testing::AllOf; -using ::testing::HasSubstr; - - -using ::asapo::Error; -using ::asapo::ErrorInterface; -using ::asapo::FileDescriptor; -using ::asapo::SocketDescriptor; -using ::asapo::MockIO; -using asapo::Request; -using asapo::ReceiveFileProcessor; -using ::asapo::GenericRequestHeader; -using asapo::MockRequest; +using namespace testing; +using namespace asapo; namespace { @@ -63,6 +39,7 @@ class ReceiveFileProcessorTests : public Test { uint64_t expected_file_size = 10; bool expected_overwrite = false; std::string expected_root_folder = "root_folder"; + CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0}; std::string expected_full_path = expected_root_folder + asapo::kPathSeparator + expected_facility + asapo::kPathSeparator + "gpfs" + asapo::kPathSeparator + expected_beamline + @@ -101,6 +78,7 @@ void ReceiveFileProcessorTests::MockRequestData() { EXPECT_CALL(*mock_request, GetSourceType()).Times(2) .WillRepeatedly(Return(expected_source_type)); + EXPECT_CALL(*mock_request, GetCustomData_t()).WillRepeatedly(Return(expected_custom_data)); EXPECT_CALL(*mock_request, GetFileName()).Times(2) .WillRepeatedly(Return(expected_file_name)); diff --git a/receiver/unittests/request_handler/file_processors/test_write_file_processor.cpp b/receiver/unittests/request_handler/file_processors/test_write_file_processor.cpp index 02967f907b9120dfc063ce19ae0491ac585220b9..bead654171b2962415c58e6e9c28aa08c6c9d4b8 100644 --- a/receiver/unittests/request_handler/file_processors/test_write_file_processor.cpp +++ b/receiver/unittests/request_handler/file_processors/test_write_file_processor.cpp @@ -11,32 +11,8 @@ #include "../../receiver_mocking.h" -using ::testing::Test; -using ::testing::Return; -using ::testing::ReturnRef; -using ::testing::_; -using ::testing::DoAll; -using ::testing::SetArgReferee; -using ::testing::Gt; -using ::testing::Eq; -using ::testing::Ne; -using ::testing::Mock; -using ::testing::NiceMock; -using ::testing::InSequence; -using ::testing::SetArgPointee; -using ::testing::AllOf; -using ::testing::HasSubstr; - - -using ::asapo::Error; -using ::asapo::ErrorInterface; -using ::asapo::FileDescriptor; -using ::asapo::SocketDescriptor; -using ::asapo::MockIO; -using asapo::Request; -using asapo::WriteFileProcessor; -using ::asapo::GenericRequestHeader; -using asapo::MockRequest; +using namespace testing; +using namespace asapo; namespace { @@ -62,6 +38,7 @@ class WriteFileProcessorTests : public Test { uint64_t expected_file_size = 10; bool expected_overwrite = false; std::string expected_root_folder = "root_folder"; + CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0}; std::string expected_full_path = expected_root_folder + asapo::kPathSeparator + expected_facility + asapo::kPathSeparator + "gpfs" + asapo::kPathSeparator + expected_beamline + @@ -107,6 +84,8 @@ void WriteFileProcessorTests::MockRequestData(int times) { EXPECT_CALL(*mock_request, GetSourceType()).Times(times * 2) .WillRepeatedly(Return(expected_source_type)); + EXPECT_CALL(*mock_request, GetCustomData_t()).WillRepeatedly(Return(expected_custom_data)); + EXPECT_CALL(*mock_request, GetFileName()).Times(times * 2) .WillRepeatedly(Return(expected_file_name)); diff --git a/receiver/unittests/statistics/test_receiver_statistics.cpp b/receiver/unittests/statistics/test_receiver_statistics.cpp index 318acac37c904b146c1efe89586074a4d83f24a4..2956fb98014612bf82171a4b70cff5df6abc5441 100644 --- a/receiver/unittests/statistics/test_receiver_statistics.cpp +++ b/receiver/unittests/statistics/test_receiver_statistics.cpp @@ -73,7 +73,7 @@ void ReceiverStatisticTests::TestTimer(const StatisticEntity& entity) { auto stat = ExtractStat(); - ASSERT_THAT(stat.extra_entities[static_cast<size_t>(entity)].second, Ge(0.3)); + ASSERT_THAT(stat.extra_entities[static_cast<size_t>(entity)].second, Gt(0)); ASSERT_THAT(stat.extra_entities[static_cast<size_t>(entity)].second, Le(1.0)); } diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index fe492ed045288c807437cf76afb50225f44c06a9..aae8d4bf65ba9db9753972c71493c900cb5a8310 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,5 +1,11 @@ add_subdirectory(transfer_single_file) add_subdirectory(transfer_single_file_bypass_buffer) + +if (BUILD_PYTHON) + add_subdirectory(transfer_single_file_write_to_raw) +endif() + + add_subdirectory(transfer_datasets) #if (UNIX) #removed since monitoring willbe refactored anyway diff --git a/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f809ffc168a579e4b40c6169548832131571d99d --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/CMakeLists.txt @@ -0,0 +1,19 @@ +set(TARGET_NAME transfer-single-file-write-to-raw) + + +################################ +# Testing +################################ +if (UNIX) + get_target_property(PYTHON_LIBS python-lib-producer BINARY_DIR) +else() + get_target_property(PYTHON_LIBS asapo_producer BINARY_DIR) +endif() + + +file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/transfer.py TEST_SCRIPT ) + +add_script_test("${TARGET_NAME}-python" "${Python_EXECUTABLE} ${PYTHON_LIBS} ${TEST_SCRIPT} " nomem) + + + diff --git a/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..7e2ace2088cb63f7aa5b0315cd537bb167ccb2c6 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_linux.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +database_name=db_test +data_source=python +beamtime_id=asapo_test +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +facility=test_facility +year=2019 +receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id} +receiver_folder_online=${receiver_root_folder}/beamline/${beamline}/current + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} ${receiver_folder_online} + echo "db.dropDatabase()" | mongo ${beamtime_id}_${data_source} + influx -database ${database_name} -execute "drop series from statistics, RequestsRate" +} + +mkdir -p ${receiver_folder} +mkdir -p ${receiver_folder_online} + + +export PYTHONPATH=$2:${PYTHONPATH} + +mkdir -p ${receiver_folder} + +echo test > file1 + +$1 $3 $data_source $beamtime_id "127.0.0.1:8400" &> out +cat out + +ls -ln ${receiver_folder}/raw/python/file1 | awk '{ print $5 }'| grep 5 +ls -ln ${receiver_folder_online}/raw/python/file1 | awk '{ print $5 }'| grep 5 + diff --git a/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..5b9f488c3af7117c48fc4c56dc265c070811be20 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/check_windows.bat @@ -0,0 +1,36 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" +SET beamtime_id=asapo_test +SET data_source=python +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\test_facility\gpfs\%beamline%\2019\data\%beamtime_id%" +SET receiver_folder_online="%receiver_root_folder%\beamline\%beamline%\current" + +mkdir %receiver_folder% +mkdir %receiver_folder_online% + +set PYTHONPATH=%2 + +echo test > file1 + +"%1" "%3" %data_source% %beamtime_id% "127.0.0.1:8400" > out +type out + +FOR /F "usebackq" %%A IN ('%receiver_folder%\raw\python\file1') DO set size=%%~zA +if %size% NEQ 7 goto :error + +FOR /F "usebackq" %%A IN ('%receiver_folder_online%\raw\python\file1') DO set size=%%~zA +if %size% NEQ 7 goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +rmdir /S /Q %receiver_root_folder% +rmdir /S /Q %receiver_folder_online% +echo db.dropDatabase() | %mongo_exe% %beamtime_id%_python + + diff --git a/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/transfer.py b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/transfer.py new file mode 100644 index 0000000000000000000000000000000000000000..9e4e16a1a52c5aee3c1321069d0b1b914da0b46f --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_write_to_raw/transfer.py @@ -0,0 +1,40 @@ +from __future__ import print_function + +import asapo_producer +import sys +import time +import numpy as np +import threading +from datetime import datetime + + +lock = threading.Lock() + +data_source = sys.argv[1] +beamtime = sys.argv[2] +endpoint = sys.argv[3] + +token = "" +nthreads = 8 + +def callback(payload, err): + lock.acquire() # to print + if isinstance(err, asapo_producer.AsapoServerWarning): + print("successfuly sent, but with warning from server: ", payload, err) + elif err is not None: + print("could not sent: ", payload, err) + else: + print("successfuly sent: ", payload) + lock.release() + +producer = asapo_producer.create_producer(endpoint,'raw', beamtime, 'auto', data_source, token, nthreads, 60000) +producer.set_log_level("debug") + +# send single file +producer.send_file(1, local_path="./file1", exposed_path="raw/" + data_source + "/" + "file1", callback=callback) +producer.send_file(2, local_path="./file1", exposed_path="raw/" + data_source + "/" + "file1", + ingest_mode= asapo_producer.DEFAULT_INGEST_MODE|asapo_producer.INGEST_MODE_WRITE_RAW_DATA_TO_OFFLINE_FS, callback=callback) + +producer.wait_requests_finished(50000) + +print('Finished successfully')