diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 9ce7f962e1dd3eacbc4d9d54daefc22826d89017..2895436afb2c701aeb44f0ccb2ed0d4f247308eb 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -39,7 +39,7 @@ enum NetworkErrorCode : uint16_t { const std::size_t kMaxMessageSize = 1024; const std::size_t kNCustomParams = 3; using CustomRequestData = uint64_t[kNCustomParams]; -const std::size_t kPosInjestMode = 0; +const std::size_t kPosIngestMode = 0; const std::size_t kPosDataSetId = 1; const std::size_t kPosDataSetSize = 2; @@ -63,8 +63,8 @@ struct GenericRequestHeader { char message[kMaxMessageSize]; std::string Json() { std::string s = "{\"id\":" + std::to_string(data_id) + "," - "\"buffer\":\"" + std::string(message)+"\"" - + "}"; + "\"buffer\":\"" + std::string(message) + "\"" + + "}"; return s; }; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 729ed56194ef94a7ed3dc8540a2eaf1e9cda2705..db0a2760cb0467d3a2a6e8a156615fa19593a996 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -152,8 +152,7 @@ Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::strin full_name = fname; } Error err; - //todo:: add IO_OPEN_MODE_SET_LENGTH_0 + tests - auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); + auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW | IO_OPEN_MODE_SET_LENGTH_0, &err); if (err == IOErrorTemplates::kFileNotFound && create_directories) { size_t pos = fname.rfind(kPathSeparator); if (pos == std::string::npos) { diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index d60ad09ab04034224e55fe8a02ba107d99a95447..b87140ae9814685e0a0a86c370534ebeab0e7552 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -30,15 +30,23 @@ class Producer { \param data - A pointer to the data to send \return Error - Will be nullptr on success */ - virtual Error SendData(const EventHeader& event_header, FileData data, uint64_t injest_mode, + virtual Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) = 0; + + + //! Sends data to the receiver - same as SendData - memory should not be freed until send is finished + //! used e.g. for Python bindings + virtual Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode, + RequestCallback callback) = 0; + + //! Sends files to the receiver /*! \param event_header - A stucture with the meta information (file name, size is ignored). \param file name - A full path of the file to send \return Error - Will be nullptr on success */ - virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode, + virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) = 0; //! Sends metadata for the current beamtime to the receiver diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h index 3ceded7777a114ace0ae55d101cffc446ee887d6..a77e6c47af158042a35e641fae46fecad6da79e8 100644 --- a/producer/api/cpp/include/producer/producer_error.h +++ b/producer/api/cpp/include/producer/producer_error.h @@ -11,6 +11,8 @@ enum class ProducerErrorType { kFileTooLarge, kFileNameTooLong, kEmptyFileName, + kNoData, + kZeroDataSize, kBeamtimeIdTooLong, kBeamtimeAlreadySet, kFileIdAlreadyInUse, @@ -20,7 +22,7 @@ enum class ProducerErrorType { kInternalServerError, kCannotSendDataToReceivers, kRequestPoolIsFull, - kWrongInjestMode + kWrongIngestMode }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; @@ -33,11 +35,19 @@ auto const kConnectionNotReady = ProducerErrorTemplate { "Connection not ready", ProducerErrorType::kConnectionNotReady }; -auto const kWrongInjestMode = ProducerErrorTemplate { - "wrong injest mode", ProducerErrorType::kWrongInjestMode +auto const kWrongIngestMode = ProducerErrorTemplate { + "wrong ingest mode", ProducerErrorType::kWrongIngestMode +}; + +auto const kNoData = ProducerErrorTemplate { + "no data", ProducerErrorType::kNoData }; +auto const kZeroDataSize = ProducerErrorTemplate { + "zero data size", ProducerErrorType::kZeroDataSize +}; + auto const kErrorSubsetSize = ProducerErrorTemplate { "Error in subset size", ProducerErrorType::kErrorSubsetSize @@ -99,3 +109,4 @@ auto const kErrorInMetadata = ProducerErrorTemplate { } #endif //ASAPO_PRODUCER_ERROR_H + diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 78d5c3fb3fcb40f30cfb4ee9b7d5d8dd3e30e329..0bfc656d42b3e6f06053f1e5e358979a8b646bca 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -31,7 +31,7 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t injest_mode) { +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode) { GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size, event_header.user_metadata.size(), std::move(event_header.file_name)}; if (event_header.subset_id != 0) { @@ -39,25 +39,25 @@ GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& ev request.custom_data[kPosDataSetId] = event_header.subset_id; request.custom_data[kPosDataSetSize] = event_header.subset_size; } - request.custom_data[kPosInjestMode] = injest_mode; + request.custom_data[kPosIngestMode] = ingest_mode; return request; } -Error CheckInjestMode(uint64_t injest_mode) { - if ((injest_mode & IngestModeFlags::kTransferData) && - (injest_mode & IngestModeFlags::kTransferMetaDataOnly)) { - return ProducerErrorTemplates::kWrongInjestMode.Generate(); +Error CheckIngestMode(uint64_t ingest_mode) { + if ((ingest_mode & IngestModeFlags::kTransferData) && + (ingest_mode & IngestModeFlags::kTransferMetaDataOnly)) { + return ProducerErrorTemplates::kWrongIngestMode.Generate(); } - if (!(injest_mode & IngestModeFlags::kTransferData) && - !(injest_mode & IngestModeFlags::kTransferMetaDataOnly)) { - return ProducerErrorTemplates::kWrongInjestMode.Generate(); + if (!(ingest_mode & IngestModeFlags::kTransferData) && + !(ingest_mode & IngestModeFlags::kTransferMetaDataOnly)) { + return ProducerErrorTemplates::kWrongIngestMode.Generate(); } return nullptr; } -Error CheckProducerRequest(const EventHeader& event_header, uint64_t injest_mode) { +Error CheckProducerRequest(const EventHeader& event_header, uint64_t ingest_mode) { if ((size_t)event_header.file_size > ProducerImpl::kMaxChunkSize) { return ProducerErrorTemplates::kFileTooLarge.Generate(); } @@ -74,42 +74,61 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t injest_mode return ProducerErrorTemplates::kErrorSubsetSize.Generate(); } - return CheckInjestMode(injest_mode); + return CheckIngestMode(ingest_mode); } Error ProducerImpl::Send(const EventHeader& event_header, FileData data, std::string full_path, - uint64_t injest_mode, - RequestCallback callback) { - auto err = CheckProducerRequest(event_header, injest_mode); + uint64_t ingest_mode, + RequestCallback callback, + bool manage_data_memory) { + auto err = CheckProducerRequest(event_header, ingest_mode); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(event_header, injest_mode); + auto request_header = GenerateNextSendRequest(event_header, ingest_mode); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback} + std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback, manage_data_memory} }); } +bool WandTransferData(uint64_t ingest_mode) { + return ingest_mode & IngestModeFlags::kTransferData; +} + +Error CheckData(uint64_t ingest_mode,const EventHeader& event_header, const FileData* data) { + if (WandTransferData(ingest_mode)) { + if (*data == nullptr) { + return ProducerErrorTemplates::kNoData.Generate(); + } + if (event_header.file_size == 0) { + return ProducerErrorTemplates::kZeroDataSize.Generate(); + } + } + return nullptr; +} Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, - uint64_t injest_mode, RequestCallback callback) { - return Send(std::move(event_header), std::move(data), "", injest_mode, callback); + uint64_t ingest_mode, RequestCallback callback) { + if (auto err = CheckData(ingest_mode,event_header,&data)) { + return err; + } + return Send(std::move(event_header), std::move(data), "", ingest_mode, callback, true); } -Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode, +Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) { if (full_path.empty()) { return ProducerErrorTemplates::kEmptyFileName.Generate(); } - return Send(event_header, nullptr, std::move(full_path), injest_mode, callback); + return Send(event_header, nullptr, std::move(full_path), ingest_mode, callback, true); } @@ -148,12 +167,27 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback callback) { GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0, "beamtime_global.meta"}; - request_header.custom_data[kPosInjestMode] = asapo::IngestModeFlags::kTransferData; + request_header.custom_data[kPosIngestMode] = asapo::IngestModeFlags::kTransferData; FileData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - std::move(data), "", "", callback} + std::move(data), "", "", callback, true} }); } + +Error ProducerImpl::SendData_(const EventHeader& event_header, + void* data, + uint64_t ingest_mode, + RequestCallback callback) { + FileData data_wrapped = FileData{(uint8_t*)data}; + + if (auto err = CheckData(ingest_mode,event_header,&data_wrapped)) { + return err; + } + + + return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false); +} + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index d3eba882d8f8079d6b5a93efbab5967297c25512..e6efc2c6c2eae366e63fb34e190f6ab723be97f1 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -28,8 +28,11 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error SendData(const EventHeader& event_header, FileData data, uint64_t injest_mode, RequestCallback callback) override; - Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode, + Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override; + Error SendData_(const EventHeader& event_header, void* data , uint64_t ingest_mode, + RequestCallback callback) override; + + Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; @@ -39,9 +42,9 @@ class ProducerImpl : public Producer { Error SendMetaData(const std::string& metadata, RequestCallback callback) override; private: - Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t injest_mode, - RequestCallback callback); - GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t injest_mode); + Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode, + RequestCallback callback, bool manage_data_memory); + GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t ingest_mode); std::string source_cred_string_; }; diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp index 6a7857f33ff4ba9f733d54b438206a5e7aa8986c..0af745937e66da386cc4a25693501ee410673b9b 100644 --- a/producer/api/cpp/src/producer_request.cpp +++ b/producer/api/cpp/src/producer_request.cpp @@ -16,19 +16,27 @@ ProducerRequest::ProducerRequest(std::string source_credentials, FileData data, std::string metadata, std::string original_filepath, - RequestCallback callback) : GenericRequest(std::move(h)), + RequestCallback callback, + bool manage_data_memory) : GenericRequest(std::move(h)), source_credentials{std::move(source_credentials)}, metadata{std::move(metadata)}, data{std::move(data)}, original_filepath{std::move(original_filepath)}, - callback{callback} { + callback{callback}, + manage_data_memory{manage_data_memory} { } bool ProducerRequest::NeedSendData() const { if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferSubsetData) { - return header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData; + return header.custom_data[kPosIngestMode] & IngestModeFlags::kTransferData; } return true; } +ProducerRequest::~ProducerRequest() { + if (!manage_data_memory && data != nullptr) { + data.release(); + } +} + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_request.h b/producer/api/cpp/src/producer_request.h index dba048b346fcbc192cd925cfcfe767eaa2cdb2b7..a5001a73d9162598c1c2600f224561f62612f022 100644 --- a/producer/api/cpp/src/producer_request.h +++ b/producer/api/cpp/src/producer_request.h @@ -11,15 +11,18 @@ namespace asapo { class ProducerRequest : public GenericRequest { public: + ~ProducerRequest(); ProducerRequest(std::string source_credentials, GenericRequestHeader header, FileData data, std::string metadata, std::string original_filepath, - RequestCallback callback); + RequestCallback callback, + bool manage_data_memory); std::string source_credentials; std::string metadata; FileData data; std::string original_filepath; RequestCallback callback; + bool manage_data_memory; Error ReadDataFromFileIfNeeded(const IO* io); bool NeedSendData() const; }; diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 8e2e836024c71e5f9e89f56626ece6962b970dcd..c116c046efdaaecd4a075e2c9ea7e033ce87b471 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -66,7 +66,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::EventHeader event_header{1, 1, "test"}; - auto err_send = producer->SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err_send = producer->SendData(event_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 90a73651394a26f16cd45dbb9005401196ff1099..782a45a128dbb374913cfd860d759772f06f8389 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -28,21 +28,22 @@ using ::testing::HasSubstr; using asapo::RequestPool; using asapo::ProducerRequest; -MATCHER_P9(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, injest_mode, - subset_id, - subset_size, - "Checks if a valid GenericRequestHeader was Send") { +MATCHER_P10(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, ingest_mode, + subset_id, + subset_size, manage_data_memory, + "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code && ((asapo::GenericRequestHeader) (arg->header)).data_id == file_id && ((asapo::GenericRequestHeader) (arg->header)).data_size == uint64_t(file_size) + && request->manage_data_memory == manage_data_memory && request->source_credentials == source_credentials && request->metadata == metadata && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] == uint64_t(subset_id) : true) && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] == uint64_t(subset_size) : true) - && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosInjestMode] == uint64_t(injest_mode) + && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode) && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0; } @@ -63,7 +64,7 @@ class ProducerImplTests : public testing::Test { uint64_t expected_id = 10; uint64_t expected_subset_id = 100; uint64_t expected_subset_size = 4; - uint64_t expected_injest_mode = asapo::IngestModeFlags::kTransferData; + uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; char expected_name[asapo::kMaxMessageSize] = "test_name"; asapo::SourceCredentials expected_credentials{ @@ -78,6 +79,8 @@ class ProducerImplTests : public testing::Test { std::string expected_metadata = "meta"; std::string expected_fullpath = "filename"; + bool expected_managed_memory = true; + bool expected_unmanaged_memory = false; void SetUp() override { producer.log__ = &mock_logger; producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; @@ -91,21 +94,21 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, "test"}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, long_string}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } TEST_F(ProducerImplTests, ErrorIfFileEmpty) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, ""}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); } @@ -113,17 +116,51 @@ TEST_F(ProducerImplTests, ErrorIfFileEmpty) { TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "test", "", 1}; - auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize)); } +TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { + asapo::FileData data = asapo::FileData{new uint8_t(100) }; + asapo::EventHeader event_header{1, 0, expected_fullpath}; + auto err = producer.SendData(event_header, std::move(data), asapo::kDefaultIngestMode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kZeroDataSize)); +} + +TEST_F(ProducerImplTests, ErrorIfNoData) { + asapo::EventHeader event_header{1, 100, expected_fullpath}; + auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kNoData)); +} + +TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { + asapo::EventHeader event_header{1, 100, expected_fullpath}; + auto err = producer.SendData_(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kNoData)); +} + + +TEST_F(ProducerImplTests, OkIfNoDataWithTransferMetadataOnlyMode) { + asapo::EventHeader event_header{1, 100, expected_fullpath}; + auto err = producer.SendData(event_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ProducerImplTests, OkIfZeroSizeWithTransferMetadataOnlyMode) { + asapo::FileData data = asapo::FileData{new uint8_t(100) }; + asapo::EventHeader event_header{1, 0, expected_fullpath}; + auto err = producer.SendData(event_header, std::move(data), asapo::kTransferMetaDataOnly, nullptr); + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(ProducerImplTests, UsesDefaultStream) { producer.SetCredentials(expected_default_credentials); @@ -133,13 +170,15 @@ TEST_F(ProducerImplTests, UsesDefaultStream) { expected_id, expected_size, expected_name, - expected_injest_mode, + expected_ingest_mode, 0, - 0))).WillOnce(Return( - nullptr)); + 0, + expected_managed_memory + ))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -153,13 +192,36 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { expected_id, expected_size, expected_name, - expected_injest_mode, + expected_ingest_mode, 0, - 0))).WillOnce(Return( - nullptr)); + 0, + expected_managed_memory))).WillOnce(Return( + nullptr)); + + asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) { + producer.SetCredentials(expected_credentials); + + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, + expected_credentials_str, + expected_metadata, + expected_id, + expected_size, + expected_name, + expected_ingest_mode, + 0, + 0, + expected_unmanaged_memory + ))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); + auto err = producer.SendData_(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -169,14 +231,14 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData, expected_credentials_str, expected_metadata, expected_id, expected_size, expected_name, - expected_injest_mode, - expected_subset_id, expected_subset_size))).WillOnce( + expected_ingest_mode, + expected_subset_id, expected_subset_size, expected_managed_memory))).WillOnce( Return( nullptr)); asapo::EventHeader event_header {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; - auto err = producer.SendData(event_header, nullptr, expected_injest_mode, nullptr); + auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -185,7 +247,7 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { expected_id = 0; expected_metadata = "{\"meta\":10}"; expected_size = expected_metadata.size(); - expected_injest_mode = asapo::IngestModeFlags::kTransferData; + expected_ingest_mode = asapo::IngestModeFlags::kTransferData; producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData, @@ -194,9 +256,9 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { expected_id, expected_size, "beamtime_global.meta", - expected_injest_mode, + expected_ingest_mode, 10, - 10))).WillOnce(Return( + 10, expected_managed_memory))).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -211,19 +273,20 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); asapo::EventHeader event_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(event_header, "", expected_injest_mode, nullptr); + auto err = producer.SendFile(event_header, "", expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); } + TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); asapo::EventHeader event_header{expected_id, 0, ""}; - auto err = producer.SendFile(event_header, expected_fullpath, expected_injest_mode, nullptr); + auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); @@ -239,13 +302,13 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { expected_id, 0, expected_name, - expected_injest_mode, + expected_ingest_mode, 0, - 0))).WillOnce(Return( - nullptr)); + 0, expected_managed_memory))).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(event_header, expected_fullpath, expected_injest_mode, nullptr); + auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -269,21 +332,21 @@ TEST_F(ProducerImplTests, ErrorSettingSecondTime) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsAlreadySet)); } -TEST_F(ProducerImplTests, ErrorSendingWrongInjestMode) { +TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); asapo::EventHeader event_header{expected_id, 0, expected_name}; - auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData; - auto err = producer.SendFile(event_header, expected_fullpath, injest_mode, nullptr); + auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData; + auto err = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); - injest_mode = 0; - auto err_null = producer.SendFile(event_header, expected_fullpath, injest_mode, nullptr); + ingest_mode = 0; + auto err_null = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInjestMode)); - ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongInjestMode)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongIngestMode)); + ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongIngestMode)); } diff --git a/producer/api/cpp/unittests/test_producer_request.cpp b/producer/api/cpp/unittests/test_producer_request.cpp index 3ca0a1fe1d0b85c65ac56dfb86f748bdd9547dcb..2c93230024ebe3f90698fc0beac507760dc4d528 100644 --- a/producer/api/cpp/unittests/test_producer_request.cpp +++ b/producer/api/cpp/unittests/test_producer_request.cpp @@ -44,7 +44,7 @@ TEST(ProducerRequest, Constructor) { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name}; - asapo::ProducerRequest request{expected_source_credentials, std::move(header), nullptr, expected_meta, "", nullptr}; + asapo::ProducerRequest request{expected_source_credentials, std::move(header), nullptr, expected_meta, "", nullptr, true}; ASSERT_THAT(request.source_credentials, Eq(expected_source_credentials)); ASSERT_THAT(request.metadata, Eq(expected_meta)); @@ -56,4 +56,17 @@ TEST(ProducerRequest, Constructor) { } + +TEST(ProducerRequest, Destructor) { +// fails with data corruption if done wrong + char data_[100]; + asapo::FileData data{(uint8_t*)data_}; + asapo::GenericRequestHeader header{asapo::kOpcodeTransferData, 1, 1, 1, ""}; + asapo::ProducerRequest* request = new asapo::ProducerRequest{"", std::move(header), std::move(data), "", "", nullptr, false}; + + delete request; + + +} + } diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 603bbf49e07e1b5b9de5ec269cf473c5bda66c21..41359b4606bac54f0a0d9df154f13578bf8344c6 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -61,19 +61,19 @@ class RequestHandlerFilesystemTests : public testing::Test { called = true; callback_err = std::move(err); callback_header = header; - }}; + }, true}; - asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr}; - asapo::ProducerRequest request_filesend{"", header, nullptr, "", expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr, true}; + asapo::ProducerRequest request_filesend{"", header, nullptr, "", expected_origin_fullpath, nullptr, true}; testing::NiceMock<asapo::MockLogger> mock_logger; asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; void SetUp() override { - request.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; - request_filesend.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; - request_nocallback.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 0c9caf4bb6f30a3970c82fda3b256e8e384bca6c..dd604464733ef92c1a5cffa67e331e9afc28437a 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -72,7 +72,7 @@ class RequestHandlerTcpTests : public testing::Test { callback_called = true; callback_err = std::move(err); callback_header = header; - }}; + }, true}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata, @@ -80,10 +80,10 @@ class RequestHandlerTcpTests : public testing::Test { callback_called = true; callback_err = std::move(err); callback_header = header; - }}; + }, true}; - asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr}; + asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -118,9 +118,9 @@ class RequestHandlerTcpTests : public testing::Test { void SetUp() override { request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); - request.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; - request_filesend.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; - request_nocallback.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; ON_CALL(mock_discovery_service, RotatedUriList(_)). WillByDefault(Return(receivers_list)); @@ -721,7 +721,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } -TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresInjestMode) { +TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresIngestMode) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendHeader(true, asapo::kOpcodeTransferMetaData); @@ -729,8 +729,8 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresInjestMode) { ExpectOKSendMetaData(true); ExpectOKReceive(); - auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - request.header.custom_data[asapo::kPosInjestMode] = injest_mode; + auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + request.header.custom_data[asapo::kPosIngestMode] = ingest_mode; request.header.op_code = asapo::kOpcodeTransferMetaData; request_handler.PrepareProcessingRequestLocked(); @@ -747,14 +747,14 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { ExpectOKSendMetaData(true); ExpectOKReceive(); - auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - request.header.custom_data[asapo::kPosInjestMode] = injest_mode; + request.header.custom_data[asapo::kPosIngestMode] = ingest_mode; request_handler.PrepareProcessingRequestLocked(); auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(callback_header.custom_data[asapo::kPosInjestMode], Eq(injest_mode)); + ASSERT_THAT(callback_header.custom_data[asapo::kPosIngestMode], Eq(ingest_mode)); } TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { @@ -766,11 +766,11 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { request_handler.PrepareProcessingRequestLocked(); - EXPECT_CALL(mock_io, GetDataFromFile_t(_,_,_)).Times(0); + EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); - auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - request_filesend.header.custom_data[asapo::kPosInjestMode] = injest_mode; + request_filesend.header.custom_data[asapo::kPosIngestMode] = ingest_mode; auto err = request_handler.ProcessRequestUnlocked(&request_filesend); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index abc6b065c977dea5585eba1cbc217180461ab4bc..bfd4a043f9cb56e5d15bf2e9773394bbcc19834f 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -72,14 +72,18 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_wrappers.h" namespace "asapo": cppclass RequestCallbackCython: pass + cppclass RequestCallbackCythonMemory: + pass RequestCallback unwrap_callback(RequestCallbackCython, void*,void*) + RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory, void*,void*,void*) cdef extern from "asapo_producer.h" namespace "asapo": cppclass Producer: @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error) - Error SendFile(const EventHeader& event_header, string full_path, uint64_t injest_mode,RequestCallback callback) + Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback) + Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback) void SetLogLevel(LogLevel level) cdef extern from "asapo_producer.h" namespace "asapo": diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index d5d14af015287174d305d15cbdafd1860a50cd8c..a5987b464c5ce159774d1d8270515d58e0ad5ef0 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -6,13 +6,14 @@ cimport numpy as np import json from cpython.version cimport PY_MAJOR_VERSION from libcpp.memory cimport unique_ptr +from cpython.ref cimport PyObject,Py_XINCREF,Py_XDECREF np.import_array() -DEFAULT_INJEST_MODE = kDefaultIngestMode -INJEST_MODE_TRANSFER_DATA = kTransferData -INJEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly -INJEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem +DEFAULT_INGEST_MODE = kDefaultIngestMode +INGEST_MODE_TRANSFER_DATA = kTransferData +INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly +INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem cdef extern from "numpy/ndarraytypes.h": @@ -33,6 +34,20 @@ cdef bytes _bytes(s): else: raise TypeError("Could not convert to unicode.") + +cdef void* data_pointer_nparray(data): + if data is None: + return <void*>NULL + data_char = data.view(np.int8) + cdef char[::1] arr_memview = data_char + return <void*>&arr_memview[0] + +cdef void* data_pointer_bytes(data): + if data is None: + return <void*>NULL + cdef const unsigned char[::1] arr_memview = data + return <void*>&arr_memview[0] + cdef class PyProducer: cdef unique_ptr[Producer] c_producer def set_log_level(self,level): @@ -47,37 +62,101 @@ cdef class PyProducer: elif level == "warn" : log_level = LogLevel_Warning else: - print("wrong loglevel mode: "+level) + print("wrong loglevel mode: "+ level) return self.c_producer.get().SetLogLevel(log_level) - def send_file(self,int id,string local_path,string exposed_path,user_meta=None,subset=None,injest_mode = DEFAULT_INJEST_MODE,callback=None): + + def send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) + event_header.file_id = id + if data is None: + event_header.file_size = 0 + else: + event_header.file_size = data.nbytes + err = self.c_producer.get().SendData_(event_header, data_pointer_nparray(data), ingest_mode, + unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, + <void*>self,<void*>callback if callback != None else NULL, <void*>data)) + cdef err_str = GetErrorString(&err) + if err_str.strip(): + return err_str + else: + if data is not None: + if data.base is not None: + Py_XINCREF(<PyObject*>data.base) + else: + Py_XINCREF(<PyObject*>data) + return None + cdef EventHeader create_event_header(self,int id, exposed_path,user_meta,subset,ingest_mode): cdef EventHeader event_header event_header.file_id = id - event_header.file_size = 0 - event_header.file_name = exposed_path - event_header.user_metadata = user_meta if user_meta!=None else "" + event_header.file_name = _bytes(exposed_path) + event_header.user_metadata = _bytes(user_meta) if user_meta!=None else "" if subset == None: event_header.subset_id = 0 event_header.subset_size = 0 else: event_header.subset_id = subset[0] event_header.subset_size = subset[1] - err = self.c_producer.get().SendFile(event_header, _bytes(local_path), injest_mode, - unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) + return event_header + + def send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) + event_header.file_size = len(data) + err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode, + unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, + <void*>self,<void*>callback if callback != None else NULL, <void*>data)) cdef err_str = GetErrorString(&err) if err_str.strip(): return err_str else: + Py_XINCREF(<PyObject*>data) return None - cdef void c_callback(self,py_callback,GenericRequestHeader header, Error err) with gil: - info_str = _str(header.Json()) - info = json.loads(info_str) + + def send_data(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + if type(data) == np.ndarray or data == None: + return self.send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback) + elif type(data) == bytes: + return self.send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback) + else: + return "wrong data type: " + str(type(data)) + + def send_file(self,int id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) + event_header.file_size = 0 + err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode, + unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) cdef err_str = GetErrorString(&err) if err_str.strip(): - py_err = err_str + return err_str else: - py_err = None - py_callback(info,py_err) + return None + cdef void c_callback_python(self,py_callback, GenericRequestHeader header, string err_str): + if py_callback != None: + info_str = _str(header.Json()) + info = json.loads(info_str) + if err_str.strip(): + py_err = err_str + else: + py_err = None + py_callback(info,py_err) + + cdef void c_callback(self,py_callback, GenericRequestHeader header, Error err) with gil: + cdef err_str = GetErrorString(&err) + self.c_callback_python(py_callback,header,"") + + cdef void c_callback_ndarr(self,py_callback,nd_array,GenericRequestHeader header, Error err) with gil: + if nd_array is not None: + if nd_array.base is not None: + Py_XDECREF(<PyObject*>nd_array.base) + else: + Py_XDECREF(<PyObject*>nd_array) + self.c_callback_python(py_callback,header,GetErrorString(&err)) + + cdef void c_callback_bytesaddr(self,py_callback,bytes_array,GenericRequestHeader header, Error err) with gil: + if bytes_array is not None: + Py_XDECREF(<PyObject*>bytes_array) + self.c_callback_python(py_callback,header,GetErrorString(&err)) + @staticmethod def create_producer(endpoint,beamtime_id,stream,token,nthreads): pyProd = PyProducer() diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index ffaa4b1844274bb2e1f77f21be93149131c6b84c..1e7c44240f6b4e5216cf098ff91017a9035562c5 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -15,6 +15,7 @@ inline std::string GetErrorString(asapo::Error* err) { } using RequestCallbackCython = void (*)(void*, void*, GenericRequestHeader header, Error err); +using RequestCallbackCythonMemory = void (*)(void*, void*, void*, GenericRequestHeader header, Error err); RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) { if (py_func == NULL) { @@ -26,6 +27,14 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo return wrapper; } +RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func, + void* nd_array) { + RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { + callback(c_self, py_func, nd_array, header, std::move(err)); + }; + return wrapper; +} + } diff --git a/producer/api/python/setup.py.in b/producer/api/python/setup.py.in index b4ae7777e7977d6970f580afce553dc677c734a1..81fa3e5a0289f328993058039d895132e241b7e1 100644 --- a/producer/api/python/setup.py.in +++ b/producer/api/python/setup.py.in @@ -16,4 +16,7 @@ module = Extension("asapo_producer", ["asapo_producer.pyx"], ext_modules = cythonize([module]) -setup(ext_modules = ext_modules) +setup(ext_modules = ext_modules, + setup_requires=["numpy","cython>=0.28"], + install_requires=["numpy","cython>=0.28"] +) diff --git a/producer/api/python/source_dist_linux/setup.py.in b/producer/api/python/source_dist_linux/setup.py.in index 8d8509b7ac2131504ddf87d220cc8c1dac0ac765..3e92976f88cb472bf0b3ce2d9601c63a0efb4132 100644 --- a/producer/api/python/source_dist_linux/setup.py.in +++ b/producer/api/python/source_dist_linux/setup.py.in @@ -16,6 +16,6 @@ setup( name ="asapo_producer", ext_modules = ext_modules, version = "@ASAPO_VERSION_PYTHON@", - setup_requires=["numpy"], - install_requires=["numpy"] + setup_requires=["numpy","cython>=0.28"], + install_requires=["numpy","cython>=0.28"] ) diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index c5bcb69ff66e7f2a2e44ea1dc4c94890fde1af3e..553972a676ff52dcc3ce1088781b6f5a0c5cd09c 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -49,7 +49,7 @@ Error Request::ReceiveMetaData() { bool Request::NeedReceiveData() { return request_header_.data_size > 0 && - (request_header_.custom_data[asapo::kPosInjestMode] & asapo::kTransferData); + (request_header_.custom_data[asapo::kPosIngestMode] & asapo::kTransferData); } Error Request::ReceiveData() { diff --git a/receiver/src/request_factory.cpp b/receiver/src/request_factory.cpp index ef08ad35d797898660f7e0781f092d84712c6cb0..ce07ccf2b1879364bac81ae88ecd9fba8393f5a3 100644 --- a/receiver/src/request_factory.cpp +++ b/receiver/src/request_factory.cpp @@ -6,7 +6,7 @@ namespace asapo { bool NeedFileWriteHandler (const GenericRequestHeader& request_header) { return GetReceiverConfig()->write_to_disk && - (request_header.custom_data[kPosInjestMode] & IngestModeFlags::kStoreInFilesystem); + (request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInFilesystem); } bool NeedDbHandler (const GenericRequestHeader& request_header) { diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 33859865a2233137f6e5cd86bf3e9cbc47759a95..ee06e4d3e6cda4dd088ef942f3bc9e11f25f2a6f 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -83,7 +83,7 @@ class RequestTests : public Test { generic_request_header.data_id = data_id_; generic_request_header.meta_size = expected_metadata_size; generic_request_header.op_code = expected_op_code; - generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; strcpy(generic_request_header.message, expected_request_message); request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr}); request->io__ = std::unique_ptr<asapo::IO> {&mock_io}; @@ -149,7 +149,7 @@ TEST_F(RequestTests, HandleDoesNotReceiveEmptyData) { TEST_F(RequestTests, HandleDoesNotReceiveDataWhenMetadataOnlyWasSent) { generic_request_header.data_size = 10; - generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kTransferMetaDataOnly; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kTransferMetaDataOnly; request->io__.release(); request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); request->io__ = std::unique_ptr<asapo::IO> {&mock_io};; diff --git a/receiver/unittests/test_request_factory.cpp b/receiver/unittests/test_request_factory.cpp index c9c70754f2df7bfcf5b38ef4d415229ef934f598..a5633a9bc01a776f8dce6a2d426f02ca7b23f6e4 100644 --- a/receiver/unittests/test_request_factory.cpp +++ b/receiver/unittests/test_request_factory.cpp @@ -60,7 +60,7 @@ class FactoryTests : public Test { std::string origin_uri{"origin_uri"}; void SetUp() override { generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; - generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; config.write_to_disk = true; config.write_to_db = true; SetReceiverConfig(config, "none"); @@ -113,7 +113,7 @@ TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInConfig) { } TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInRequest) { - generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::IngestModeFlags::kTransferData; + generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData; auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(request->GetListHandlers().size(), Eq(2)); diff --git a/tests/manual/python_tests/producer/clean_db.sh b/tests/manual/python_tests/producer/clean_db.sh index 326ee510db2ef352129f53a9be80f846a463c988..1f89519f88aee0fe4571eb0a7defdb9487ecdd7d 100755 --- a/tests/manual/python_tests/producer/clean_db.sh +++ b/tests/manual/python_tests/producer/clean_db.sh @@ -3,4 +3,5 @@ beamtime_id=asapo_test1 -echo "db.dropDatabase()" | mongo ${beamtime_id}_detector +echo "db.dropDatabase()" | mongo ${beamtime_id}_python2 +echo "db.dropDatabase()" | mongo ${beamtime_id}_python3 diff --git a/tests/manual/python_tests/producer/file1 b/tests/manual/python_tests/producer/file1 index 9daeafb9864cf43055ae93beb0afd6c7d144bfa4..a5bce3fd2565d8f458555a0c6f42d0504a848bd5 100644 --- a/tests/manual/python_tests/producer/file1 +++ b/tests/manual/python_tests/producer/file1 @@ -1 +1 @@ -test +test1 diff --git a/tests/manual/python_tests/producer/run.sh b/tests/manual/python_tests/producer/run.sh index 62b5fec56935b8783c952a3d84d667b82dd27446..0a0f2a5a1f381db96e7701cec333e010c39a6bed 100755 --- a/tests/manual/python_tests/producer/run.sh +++ b/tests/manual/python_tests/producer/run.sh @@ -3,4 +3,7 @@ export PYTHONPATH=/home/yakubov/projects/asapo/cmake-build-debug/producer/api/py mkdir -p /tmp/asapo/receiver/files/test1/asapo_test1 -python test.py +python test.py python2 + + +python3 test.py python3 diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py index 0f9ed285ed7a7ddef7de7ffb2fc7a6094218ddbb..a5960680aeec645d8ba8c5137884d96fbf77cf1e 100644 --- a/tests/manual/python_tests/producer/test.py +++ b/tests/manual/python_tests/producer/test.py @@ -3,47 +3,66 @@ from __future__ import print_function import asapo_producer import sys import time - -#import threading -#lock = threading.Lock() +import numpy as np +import threading +lock = threading.Lock() endpoint = "127.0.0.1:8400" beamtime = "asapo_test1" -stream = "" +stream = sys.argv[1] token = "" nthreads = 8 + + def callback(header,err): -# lock.acquire() # just example, don't do this if not needed + lock.acquire() # to print if err is not None: print("could not sent: ",header,err) else: print ("successfuly sent: ",header) -# lock.release() + lock.release() + +def assert_err(err): + if err is not None: + print(err) + sys.exit(1) producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) -if err is not None: - print(err) - sys.exit(1) +assert_err(err) producer.set_log_level("info") #send single file -err = producer.send_file(1, local_path = "./file1", exposed_path = "file1", user_meta = '{"test_key":"test_val"}', callback = callback) -if err is not None: - print(err) - +err = producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) +assert_err(err) #send subsets -#producer.send_file(1, local_path = "./file1", exposed_path = "file1"",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback) -#producer.send_file(1, local_path = "./file1", exposed_path = "file1",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) #send meta only -err = producer.send_file(2, local_path = "./file2",exposed_path = "./file2", - injest_mode = asapo_producer.INJEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) -if err is not None: - print(err) +err = producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +assert_err(err) + +data = np.arange(10,dtype=np.float64) + +#send data from array +err = producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) +assert_err(err) + +#send data from string +err = producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) +assert_err(err) + +#send metadata only +err = producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +assert_err(err) time.sleep(1)