diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e1ec90109995c26b2bb9a4e8b1070a88c10b066..fb50022159f22839bc866c879ee551703d655d85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,7 +21,8 @@ BREAKING CHANGES
 * use term `message` for blob of information we send around, rename related structs, parameters, ...
 #### renaming - Producer API
 * SendData/send_data -> Send/send
-* SendFile/send_file -> SendFromFile/send_from_file
+* id_in_subset -> dataset_substream
+* subset_size -> dataset_size (and in general replace subset with dataset)
 
 #### renaming - Consumer API
 * broker -> consumer
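(Aside, not part of the patch: the producer-side renames above are easiest to see at a call site. Below is a minimal, hedged C++ sketch of announcing message 1 of a 4-message dataset. The MessageHeader constructor and the four-argument Send() overload are taken from producer/common.h and the unit tests in this diff; the producer instance, the file name, and the metadata-only ingest mode are illustrative assumptions.)

    #include "asapo/asapo_producer.h"

    // Hedged sketch: send message 1 of a 4-message dataset, metadata only.
    // `producer` is assumed to be created and connected elsewhere.
    asapo::Error SendOneDatasetMessage(asapo::Producer* producer) {
        asapo::MessageHeader header{1,             // message_id
                                    0,             // data_size (metadata-only here)
                                    "raw/part_1",  // file_name (illustrative)
                                    "",            // user_metadata
                                    1,             // dataset_substream (was id_in_subset)
                                    4};            // dataset_size      (was subset_size)
        return producer->Send(header, nullptr,
                              asapo::IngestModeFlags::kTransferMetaDataOnly, nullptr);
    }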
diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h
index 457e4756fe3f6acc77b674f5706d0b9bbc4b1087..ffbc2d6817444b7dd6b9f0c8b6908e5851754def 100644
--- a/common/cpp/include/asapo/common/data_structs.h
+++ b/common/cpp/include/asapo/common/data_structs.h
@@ -31,6 +31,7 @@ class MessageMeta {
     std::string source;
     std::string metadata;
     uint64_t buf_id{0};
+    uint64_t dataset_substream{0};
     std::string Json() const;
     bool SetFromJson(const std::string& json_string);
     std::string FullName(const std::string& base_path) const;
diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h
index 7a479c2a7f262acc5a3dc9a9a17f36ee0d2f6283..bd7f5379f302f0fbd05e423b270495e699958bb0 100644
--- a/common/cpp/include/asapo/common/networking.h
+++ b/common/cpp/include/asapo/common/networking.h
@@ -21,7 +21,7 @@ enum class NetworkConnectionType : uint32_t {
 enum Opcode : uint8_t {
     kOpcodeUnknownOp = 1,
     kOpcodeTransferData,
-    kOpcodeTransferSubsetData,
+    kOpcodeTransferDatasetData,
     kOpcodeStreamInfo,
     kOpcodeLastStream,
     kOpcodeGetBufferData,
diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h
index 289a4c8c09dac53b369524d4af1a3994aed9c9f7..d36b95322ffac117825b6fc21d81099ce5238aac 100644
--- a/common/cpp/include/asapo/database/database.h
+++ b/common/cpp/include/asapo/database/database.h
@@ -17,8 +17,8 @@ class Database {
     virtual Error Connect(const std::string& address, const std::string& database) = 0;
     virtual Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const = 0;
     virtual Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const = 0;
-    virtual Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id,
-                                 uint64_t subset_size,
+    virtual Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file,
+                                         uint64_t dataset_size,
                                  bool ignore_duplicates) const = 0;
     virtual Error GetById(const std::string& collection, uint64_t id, MessageMeta* file) const = 0;
diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h
index f3c47933d49cf6172bba3824318ec4661298d9d2..691e39af8f4b24c25216f6b5f99180dc8de4ab4d 100644
--- a/common/cpp/include/asapo/unittests/MockDatabase.h
+++ b/common/cpp/include/asapo/unittests/MockDatabase.h
@@ -19,9 +19,9 @@ class MockDatabase : public Database {
         return Error{Insert_t(collection, file, ignore_duplicates)};
     }
-    Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id,
-                         uint64_t subset_size, bool ignore_duplicates) const override {
-        return Error{InsertAsSubset_t(collection, file, subset_id, subset_size, ignore_duplicates)};
+    Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file,
+                                 uint64_t dataset_size, bool ignore_duplicates) const override {
+        return Error{InsertAsDatasetMessage_t(collection, file, dataset_size, ignore_duplicates)};
     }
@@ -29,7 +29,7 @@ class MockDatabase : public Database {
     MOCK_CONST_METHOD3(Insert_t, ErrorInterface * (const std::string&, const MessageMeta&, bool));
-    MOCK_CONST_METHOD5(InsertAsSubset_t, ErrorInterface * (const std::string&, const MessageMeta&, uint64_t, uint64_t, bool));
+    MOCK_CONST_METHOD4(InsertAsDatasetMessage_t, ErrorInterface * (const std::string&, const MessageMeta&, uint64_t, bool));
     Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override {
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 2c9b7deedb067c9f6822c3878b39b055a0a6f2d4..fc7882c29dda4c1a5add7578ca45922fb159a03b 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -62,8 +62,8 @@ std::string MessageMeta::Json() const {
                     "\"timestamp\":" + std::to_string(nanoseconds_from_epoch) + ","
                     "\"source\":\"" + source + "\","
-                    "\"buf_id\":" + std::to_string(buf_id_int)
-                    + ","
+                    "\"buf_id\":" + std::to_string(buf_id_int) + ","
+                    "\"dataset_substream\":" + std::to_string(dataset_substream) + ","
                     "\"meta\":" + (metadata.size() == 0 ?
std::string("{}") : metadata) + "}"; return s; @@ -119,6 +119,7 @@ bool MessageMeta::SetFromJson(const std::string &json_string) { parser.GetString("name", &name) || parser.GetString("source", &source) || parser.GetUInt64("buf_id", &buf_id) || + parser.GetUInt64("dataset_substream", &dataset_substream) || parser.Embedded("meta").GetRawString(&metadata) || !TimeFromJson(parser, "timestamp", ×tamp)) { *this = old; diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 5abe197d20a708c0475ce281c7ccae6c1be89ce5..442d20b1d27587a500b4aae929323fa8fe8e353e 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -237,9 +237,8 @@ Error MongoDBClient::AddBsonDocumentToArray(bson_t* query, bson_t* update, bool return err; } -Error MongoDBClient::InsertAsSubset(const std::string &collection, const MessageMeta &file, - uint64_t subset_id, - uint64_t subset_size, +Error MongoDBClient::InsertAsDatasetMessage(const std::string &collection, const MessageMeta &file, + uint64_t dataset_size, bool ignore_duplicates) const { if (!connected_) { return DBErrorTemplates::kNotConnected.Generate(); @@ -252,10 +251,10 @@ Error MongoDBClient::InsertAsSubset(const std::string &collection, const Message if (err) { return err; } - auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(subset_id), "}", "{", "messages._id", "{", "$ne", - BCON_INT64(file.id), "}", "}", "]"); + auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(file.id), "}", "{", "messages.dataset_substream", "{", "$ne", + BCON_INT64(file.dataset_substream), "}", "}", "]"); auto update = BCON_NEW ("$setOnInsert", "{", - "size", BCON_INT64(subset_size), + "size", BCON_INT64(dataset_size), "timestamp", BCON_INT64((int64_t) NanosecsEpochFromTimePoint(file.timestamp)), "}", "$addToSet", "{", @@ -348,7 +347,7 @@ Error MongoDBClient::GetDataSetById(const std::string &collection, uint64_t id_i } for (const auto &message_meta : dataset.content) { - if (message_meta.id == id_in_set) { + if (message_meta.dataset_substream == id_in_set) { *file = message_meta; return nullptr; } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index ad78011173e875f44b3efcd17b5a61f1a4f1f15b..a1b9bb5ef3a306006439170761a60ab86bfc6fed 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -44,7 +44,7 @@ class MongoDBClient final : public Database { MongoDBClient(); Error Connect(const std::string& address, const std::string& database) override; Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const override; - Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id, uint64_t subset_size, + Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, uint64_t dataset_size, bool ignore_duplicates) const override; Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override; Error GetById(const std::string& collection, uint64_t id, MessageMeta* file) const override; diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 49ba236dfb244eaf37a28d58cdc70b31ede1f0e8..c801a2324c58cdd8c278c66d867abc7c370b3646 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -29,6 +29,7 @@ 
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index ad78011173e875f44b3efcd17b5a61f1a4f1f15b..a1b9bb5ef3a306006439170761a60ab86bfc6fed 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -44,7 +44,7 @@ class MongoDBClient final : public Database {
     MongoDBClient();
     Error Connect(const std::string& address, const std::string& database) override;
     Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const override;
-    Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id, uint64_t subset_size,
+    Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, uint64_t dataset_size,
                          bool ignore_duplicates) const override;
     Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override;
     Error GetById(const std::string& collection, uint64_t id, MessageMeta* file) const override;
diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp
index 49ba236dfb244eaf37a28d58cdc70b31ede1f0e8..c801a2324c58cdd8c278c66d867abc7c370b3646 100644
--- a/common/cpp/unittests/data_structs/test_data_structs.cpp
+++ b/common/cpp/unittests/data_structs/test_data_structs.cpp
@@ -29,6 +29,7 @@ MessageMeta PrepareMessageMeta() {
     MessageMeta message_meta;
     message_meta.size = 100;
     message_meta.id = 1;
+    message_meta.dataset_substream = 3;
     message_meta.name = std::string("folder") + asapo::kPathSeparator + "test";
     message_meta.source = "host:1234";
     message_meta.buf_id = big_uint;
@@ -42,6 +43,7 @@ TEST(MessageMetaTests, Defaults) {
     ASSERT_THAT(message_meta.buf_id, Eq(0));
     ASSERT_THAT(message_meta.id, Eq(0));
+    ASSERT_THAT(message_meta.dataset_substream, Eq(0));
 }
@@ -50,10 +52,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) {
     std::string json = message_meta.Json();
     if (asapo::kPathSeparator == '/') {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"meta":{"bla":10}})"));
+                        R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"dataset_substream":3,"meta":{"bla":10}})"));
     } else {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,"meta":{"bla":10}})"));
+                        R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,"dataset_substream":3,"meta":{"bla":10}})"));
     }
 }
@@ -103,6 +105,7 @@ TEST(MessageMetaTests, CorrectConvertFromJson) {
     ASSERT_THAT(result.buf_id, Eq(message_meta.buf_id));
     ASSERT_THAT(result.source, Eq(message_meta.source));
     ASSERT_THAT(result.metadata, Eq(message_meta.metadata));
+    ASSERT_THAT(result.dataset_substream, Eq(message_meta.dataset_substream));
 }
diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h
index 913246ab29b58c438917ed85072014c84ccac7c0..bc343dfbc809ed3452a040b47e5cba35b3cc37d6 100644
--- a/consumer/api/cpp/include/asapo/consumer/consumer.h
+++ b/consumer/api/cpp/include/asapo/consumer/consumer.h
@@ -181,7 +181,7 @@ class Consumer {
   //! Get all messages matching the query.
   /*!
-    \param sql_query - query string in SQL format. Limit subset is supported
+    \param sql_query - query string in SQL format. Limit dataset is supported
     \param err - will be set in case of error, nullptr otherwise
     \return vector of message metadata matchiing to specified query.
Empty if nothing found or error */ diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 45943a99b129afa6b3c39ef4b104022d5954ea4d..0e58c1b0a0daf249ca960344e3088b177de570dd 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -44,7 +44,7 @@ while True: data, meta = consumer.get_next(group_id, meta_only=not transfer_data) print ("received: ",meta) n_recv = n_recv + 1 - producer.send_data(meta['_id'],meta['name']+"_"+stream_out ,data, + producer.send(meta['_id'],meta['name']+"_"+stream_out ,data, ingest_mode = ingest_mode, callback = callback) except asapo_consumer.AsapoEndOfStreamError: break diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index b44eae6a84f8fd6f55c53eac43179523c0905f6b..a23c2ed08c080c4efe8c59e17c5e8a51cacf75ce 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -163,8 +163,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it } else { for (uint64_t id = 0; id < messages_in_set; id++) { auto buffer = CreateMemoryBuffer(number_of_byte); - message_header.id_in_subset = id + 1; - message_header.subset_size = messages_in_set; + message_header.dataset_substream = id + 1; + message_header.dataset_size = messages_in_set; message_header.message_id = i + 1; message_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1); if (!data_source.empty()) { diff --git a/producer/api/cpp/include/asapo/producer/common.h b/producer/api/cpp/include/asapo/producer/common.h index 07dd2a110b9733876c2533225aa362ed565e6d49..5b2da89b06ec02a429928c484e516f1be3c1025b 100644 --- a/producer/api/cpp/include/asapo/producer/common.h +++ b/producer/api/cpp/include/asapo/producer/common.h @@ -30,19 +30,19 @@ struct MessageHeader { MessageHeader() {}; MessageHeader(uint64_t message_id_i, uint64_t data_size_i, std::string file_name_i, std::string user_metadata_i = "", - uint64_t id_in_subset_i = 0, - uint64_t subset_size_i = 0 ): + uint64_t dataset_substream_i = 0, + uint64_t dataset_size_i = 0 ): message_id{message_id_i}, data_size{data_size_i}, file_name{std::move(file_name_i)}, user_metadata{std::move(user_metadata_i)}, - id_in_subset{id_in_subset_i}, - subset_size{subset_size_i} {}; + dataset_substream{dataset_substream_i}, + dataset_size{dataset_size_i} {}; uint64_t message_id = 0; uint64_t data_size = 0; std::string file_name; std::string user_metadata; - uint64_t id_in_subset = 0; - uint64_t subset_size = 0; + uint64_t dataset_substream = 0; + uint64_t dataset_size = 0; }; } diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index db73edf62ef7123ad5cd510f25a3cbf563d7f7a2..66b0a5b1574111acf0b1e8ecfd36e4eb15083e05 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -80,7 +80,7 @@ class Producer { \param full_path - A full path of the file to send \return Error - Will be nullptr on success */ - virtual Error SendFromFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, + virtual Error SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) = 0; //! 
Sends message from a file to a stream @@ -89,7 +89,7 @@ class Producer { \param full_path - A full path of the file to send \return Error - Will be nullptr on success */ - virtual Error SendFromFile(const MessageHeader& message_header, std::string stream, std::string full_path, + virtual Error SendFile(const MessageHeader& message_header, std::string stream, std::string full_path, uint64_t ingest_mode, RequestCallback callback) = 0; diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 72f2ee4a37ed8aeb6dc9a272fd72ade73a3ab989..76bf9b4b20210268cddb6c9bef70fc72728595f7 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -38,10 +38,10 @@ GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const MessageHeader& uint64_t ingest_mode) { GenericRequestHeader request{kOpcodeTransferData, message_header.message_id, message_header.data_size, message_header.user_metadata.size(), message_header.file_name, stream}; - if (message_header.id_in_subset != 0) { - request.op_code = kOpcodeTransferSubsetData; - request.custom_data[kPosDataSetId] = message_header.id_in_subset; - request.custom_data[kPosDataSetSize] = message_header.subset_size; + if (message_header.dataset_substream != 0) { + request.op_code = kOpcodeTransferDatasetData; + request.custom_data[kPosDataSetId] = message_header.dataset_substream; + request.custom_data[kPosDataSetSize] = message_header.dataset_size; } request.custom_data[kPosIngestMode] = ingest_mode; return request; @@ -80,8 +80,8 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_ return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); } - if (message_header.id_in_subset > 0 && message_header.subset_size == 0) { - return ProducerErrorTemplates::kWrongInput.Generate("subset dimensions"); + if (message_header.dataset_substream > 0 && message_header.dataset_size == 0) { + return ProducerErrorTemplates::kWrongInput.Generate("dataset dimensions"); } if (message_header.message_id == 0) { @@ -161,9 +161,9 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id, return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true); } -Error ProducerImpl::SendFromFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, +Error ProducerImpl::SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) { - return SendFromFile(message_header, kDefaultStream, std::move(full_path), ingest_mode, callback); + return SendFile(message_header, kDefaultStream, std::move(full_path), ingest_mode, callback); } @@ -262,7 +262,7 @@ Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { void ProducerImpl::StopThreads__() { request_pool__->StopThreads(); } -Error ProducerImpl::SendFromFile(const MessageHeader& message_header, +Error ProducerImpl::SendFile(const MessageHeader& message_header, std::string stream, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 18e952f700458c7f0fc75b1cd5065f622b0c70b7..3e04b58e2f29847d1982d01ede072383162ae6fd 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -50,9 +50,9 @@ class ProducerImpl : public Producer { Error Send__(const MessageHeader &message_header, std::string stream, void* data, uint64_t ingest_mode, RequestCallback 
callback) override; void StopThreads__() override; - Error SendFromFile(const MessageHeader &message_header, std::string full_path, uint64_t ingest_mode, + Error SendFile(const MessageHeader &message_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; - Error SendFromFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode, + Error SendFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp index 7cc63b4f1aeaac497e939fd83187abe17af20b10..73a64c2a61052aae9d9cc68f4c717b77591e99ef 100644 --- a/producer/api/cpp/src/producer_request.cpp +++ b/producer/api/cpp/src/producer_request.cpp @@ -27,7 +27,7 @@ ProducerRequest::ProducerRequest(std::string source_credentials, } bool ProducerRequest::NeedSend() const { - if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferSubsetData) { + if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferDatasetData) { return header.custom_data[kPosIngestMode] & IngestModeFlags::kTransferData; } return true; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 09f6545f3b6a98fbeae53c108b9ebf3559872765..089dae54be6ed9efd476d179b2e8bdf538640871 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -32,8 +32,8 @@ using asapo::ProducerRequest; MATCHER_P10(M_CheckSendRequest, op_code, source_credentials, metadata, file_id, file_size, message, stream, ingest_mode, - subset_id, - subset_size, + dataset_id, + dataset_size, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code @@ -42,10 +42,10 @@ MATCHER_P10(M_CheckSendRequest, op_code, source_credentials, metadata, file_id, && request->manage_data_memory == true && request->source_credentials == source_credentials && request->metadata == metadata - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] - == uint64_t(subset_id) : true) - && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] - == uint64_t(subset_size) : true) + && (op_code == asapo::kOpcodeTransferDatasetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1] + == uint64_t(dataset_id) : true) + && (op_code == asapo::kOpcodeTransferDatasetData ? 
((asapo::GenericRequestHeader) (arg->header)).custom_data[2] + == uint64_t(dataset_size) : true) && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode) && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0 && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0; @@ -66,8 +66,8 @@ class ProducerImplTests : public testing::Test { asapo::ProducerImpl producer{"", 1, 3600000, asapo::RequestHandlerType::kTcp}; uint64_t expected_size = 100; uint64_t expected_id = 10; - uint64_t expected_subset_id = 100; - uint64_t expected_subset_size = 4; + uint64_t expected_dataset_id = 100; + uint64_t expected_dataset_size = 4; uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; char expected_name[asapo::kMaxMessageSize] = "test_name"; @@ -118,8 +118,8 @@ TEST_F(ProducerImplTests, ErrorIfFileEmpty) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } -TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { - EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset dimensions"))); +TEST_F(ProducerImplTests, ErrorIfDatasetSizeNotDefined) { + EXPECT_CALL(mock_logger, Error(testing::HasSubstr("dataset dimensions"))); asapo::MessageHeader message_header{1, 1000, "test", "", 1}; auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -275,9 +275,9 @@ TEST_F(ProducerImplTests, OKSendingStreamFinishWithNoNextStream) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { +TEST_F(ProducerImplTests, OKSendingSendDatasetDataRequest) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferSubsetData, + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferDatasetData, expected_credentials_str, expected_metadata, expected_id, @@ -285,13 +285,13 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { expected_name, asapo::kDefaultStream.c_str(), expected_ingest_mode, - expected_subset_id, - expected_subset_size), false)).WillOnce( + expected_dataset_id, + expected_dataset_size), false)).WillOnce( Return( nullptr)); asapo::MessageHeader message_header - {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size}; + {expected_id, expected_size, expected_name, expected_metadata, expected_dataset_id, expected_dataset_size}; auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); @@ -327,7 +327,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFromFile(message_header, "", expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, "", expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -339,7 +339,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, ""}; - auto err = producer.SendFromFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, 
Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -361,7 +361,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { nullptr)); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFromFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -383,7 +383,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithStream) { asapo::MessageHeader message_header{expected_id, 0, expected_name}; auto err = - producer.SendFromFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr); + producer.SendFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -421,7 +421,7 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); for (auto ingest_mode : ingest_modes) { - auto err = producer.SendFromFile(message_header, expected_fullpath, ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index d9f4a155e786944579d54840048d4f3d5d2136a6..25a88248bb4b2b3ed107704870670260d9a1699e 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -132,8 +132,8 @@ class RequestHandlerTcpTests : public testing::Test { void ExpectOKSendAll(bool only_once); void ExpectGetFileSize(bool ok); void ExpectOKSend(bool only_once = false); - void ExpectOKSendFromFile(bool only_once = false); - void ExpectFailSendFromFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false); + void ExpectOKSendFile(bool only_once = false); + void ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false); void ExpectOKSendMetaData(bool only_once = false); void ExpectFailReceive(bool only_once = false); void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError, @@ -302,7 +302,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { } } -void RequestHandlerTcpTests::ExpectFailSendFromFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) { +void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) @@ -430,7 +430,7 @@ void RequestHandlerTcpTests::ExpectOKSend(bool only_once) { ExpectOKSend(expected_file_size, only_once); } -void RequestHandlerTcpTests::ExpectOKSendFromFile(bool only_once) { +void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) { for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) .Times(1) @@ -772,13 +772,13 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ASSERT_THAT(retry, Eq(false)); } -TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithReadError) { +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) { ExpectGetFileSize(true); ExpectOKConnect(true); ExpectOKAuthorize(true); 
ExpectOKSendHeader(true); ExpectOKSendMetaData(true); - ExpectFailSendFromFile(asapo::ProducerErrorTemplates::kLocalIOError, true); + ExpectFailSendFile(asapo::ProducerErrorTemplates::kLocalIOError, true); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); @@ -790,13 +790,13 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithReadError) { } -TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithServerError) { +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) { ExpectGetFileSize(true); ExpectOKConnect(); ExpectOKAuthorize(); ExpectOKSendHeader(); ExpectOKSendMetaData(); - ExpectFailSendFromFile(asapo::ProducerErrorTemplates::kInternalServerError); + ExpectFailSendFile(asapo::ProducerErrorTemplates::kInternalServerError); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); @@ -839,7 +839,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectOKAuthorize(true); ExpectOKSendHeader(true); ExpectOKSendMetaData(true); - ExpectOKSendFromFile(true); + ExpectOKSendFile(true); ExpectOKReceive(true, asapo::kNetErrorNoError, expected_response); request_handler.PrepareProcessingRequestLocked(); diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index c09a2d7df70effa0aa69f6817839d3fc5f2693b7..78879b01d7022d2e786895f07b6c05bca4b176a8 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -65,8 +65,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": uint64_t data_size string file_name string user_metadata - uint64_t id_in_subset - uint64_t subset_size + uint64_t dataset_substream + uint64_t dataset_size cdef extern from "asapo/asapo_producer.h" namespace "asapo": struct GenericRequestHeader: @@ -95,7 +95,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: cppclass Producer: @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_ms, Error* error) - Error SendFromFile(const MessageHeader& message_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback) + Error SendFile(const MessageHeader& message_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback) Error Send__(const MessageHeader& message_header, string stream, void* data, uint64_t ingest_mode,RequestCallback callback) void StopThreads__() void SetLogLevel(LogLevel level) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index d6322ae309a2b45a35bebfb998223cf926b29651..6a7e9f64e6138b0f6b1009652f2bd46a381a0bdb 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -107,8 +107,8 @@ cdef class PyProducer: return self.c_producer.get().SetLogLevel(log_level) - def __send_np_array(self, id, exposed_path,data, user_meta=None,subset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode) + def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) if data is None: 
message_header.data_size = 0 else: @@ -126,21 +126,21 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) return - cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,subset,ingest_mode): + cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode): cdef MessageHeader message_header message_header.message_id = id message_header.file_name = _bytes(exposed_path) message_header.user_metadata = _bytes(user_meta) if user_meta!=None else "" - if subset == None: - message_header.id_in_subset = 0 - message_header.subset_size = 0 + if dataset == None: + message_header.dataset_substream = 0 + message_header.dataset_size = 0 else: - message_header.id_in_subset = subset[0] - message_header.subset_size = subset[1] + message_header.dataset_substream = dataset[0] + message_header.dataset_size = dataset[1] return message_header - def __send_bytes(self, id, exposed_path,data, user_meta=None,subset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode) + def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) message_header.data_size = len(data) err = self.c_producer.get().Send__(message_header,_bytes(stream), data_pointer_bytes(data), ingest_mode, unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, @@ -152,7 +152,7 @@ cdef class PyProducer: Py_XINCREF(<PyObject*>callback) return - def send_data(self, uint64_t id, exposed_path, data, user_meta=None, subset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ :param id: unique data id :type id: int @@ -162,8 +162,8 @@ cdef class PyProducer: :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode :param user_meta: user metadata, default None :type user_meta: JSON string - :param subset: a tuple with two int values (id in subset, subset size), default None - :type subset: tuple + :param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None + :type dataset: tuple :param stream: stream name, default "default" :type stream: string :param ingest_mode: ingest mode flag @@ -175,9 +175,9 @@ cdef class PyProducer: AsapoProducerError: actually should not happen """ if type(data) == np.ndarray or data == None: - self.__send_np_array(id,exposed_path,data,user_meta,subset,stream,ingest_mode,callback) + self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) elif type(data) == bytes: - self.__send_bytes(id,exposed_path,data,user_meta,subset,stream,ingest_mode,callback) + self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) else: raise(AsapoProducerError("wrong data type: " + str(type(data)))) def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None): @@ -234,7 +234,7 @@ cdef class PyProducer: if err: throw_exception(err) return json.loads(_str(info.Json(True))) - def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, 
subset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ :param id: unique data id :type id: int @@ -244,8 +244,8 @@ cdef class PyProducer: :type exposed_path: string :param user_meta: user metadata, default None :type user_meta: JSON string - :param subset: a tuple with two int values (subset id, subset size), default None - :type subset: tuple + :param dataset: a tuple with two int values (dataset id, dataset size), default None + :type dataset: tuple :param stream: stream name, default "default" :type stream: string :param ingest_mode: ingest mode flag @@ -258,9 +258,9 @@ cdef class PyProducer: AsapoProducerError: actually should not happen """ - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode) + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) message_header.data_size = 0 - err = self.c_producer.get().SendFromFile(message_header, _bytes(stream), _bytes(local_path), ingest_mode, + err = self.c_producer.get().SendFile(message_header, _bytes(stream), _bytes(local_path), ingest_mode, unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) if err: throw_exception(err) diff --git a/producer/event_monitor_producer/src/eventmon_config.cpp b/producer/event_monitor_producer/src/eventmon_config.cpp index eeed35561107b3c3cba05380324d9ec37d257ff3..d3d403950319b4713b90194c3276ad60b96839cb 100644 --- a/producer/event_monitor_producer/src/eventmon_config.cpp +++ b/producer/event_monitor_producer/src/eventmon_config.cpp @@ -11,30 +11,30 @@ EventMonConfigFactory::EventMonConfigFactory() : io__{GenerateDefaultIO()} { } -Error SubsetModeToEnum(const std::string& mode_str, SubSetMode* mode) { +Error DatasetModeToEnum(const std::string& mode_str, DatasetMode* mode) { if (mode_str == "batch") { - *mode = SubSetMode::kBatch; + *mode = DatasetMode::kBatch; return nullptr; } if (mode_str == "none") { - *mode = SubSetMode::kNone; + *mode = DatasetMode::kNone; return nullptr; } if (mode_str == "multisource") { - *mode = SubSetMode::kMultiSource; + *mode = DatasetMode::kMultiSource; return nullptr; } - return TextError("Wrone subset mode:" + mode_str); + return TextError("Wrone dataset mode:" + mode_str); } Error EventMonConfigFactory::ParseConfigFile(std::string file_name) { JsonFileParser parser(file_name, &io__); Error err = nullptr; - std::string subset_mode; + std::string dataset_mode; (err = parser.GetString("AsapoEndpoint", &config.asapo_endpoint)) || (err = parser.GetString("Tag", &config.tag)) || @@ -48,19 +48,19 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) { (err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) || (err = parser.GetArrayString("IgnoreExtensions", &config.ignored_extensions)) || (err = parser.GetArrayString("WhitelistExtensions", &config.whitelisted_extensions)) || - (err = parser.Embedded("Subset").GetString("Mode", &subset_mode)) || - (err = SubsetModeToEnum(subset_mode, &config.subset_mode)); + (err = parser.Embedded("Dataset").GetString("Mode", &dataset_mode)) || + (err = DatasetModeToEnum(dataset_mode, &config.dataset_mode)); if (err) { return err; } - if (config.subset_mode == SubSetMode::kBatch) { - err = parser.Embedded("Subset").GetUInt64("BatchSize", 
&config.subset_batch_size); + if (config.dataset_mode == DatasetMode::kBatch) { + err = parser.Embedded("Dataset").GetUInt64("BatchSize", &config.dataset_batch_size); } - if (config.subset_mode == SubSetMode::kMultiSource) { - err = parser.Embedded("Subset").GetUInt64("NSources", &config.subset_multisource_nsources); - err = parser.Embedded("Subset").GetUInt64("SourceId", &config.subset_multisource_sourceid); + if (config.dataset_mode == DatasetMode::kMultiSource) { + err = parser.Embedded("Dataset").GetUInt64("NSources", &config.dataset_multisource_nsources); + err = parser.Embedded("Dataset").GetUInt64("SourceId", &config.dataset_multisource_sourceid); } @@ -73,7 +73,7 @@ Error EventMonConfigFactory::CheckConfig() { (err = CheckMode()) || (err = CheckLogLevel()) || (err = CheckNThreads()) || - (err = CheckSubsets()); + (err = CheckDatasets()); //todo: check monitored folders exist? return err; @@ -113,13 +113,13 @@ Error EventMonConfigFactory::CheckNThreads() { return nullptr; } -Error EventMonConfigFactory::CheckSubsets() { - if (config.subset_mode == SubSetMode::kBatch && config.subset_batch_size < 1) { +Error EventMonConfigFactory::CheckDatasets() { + if (config.dataset_mode == DatasetMode::kBatch && config.dataset_batch_size < 1) { return TextError("Batch size should > 0"); } - if (config.subset_mode == SubSetMode::kMultiSource && config.subset_multisource_nsources < 1) { + if (config.dataset_mode == DatasetMode::kMultiSource && config.dataset_multisource_nsources < 1) { return TextError("Number of sources size should be > 0"); } diff --git a/producer/event_monitor_producer/src/eventmon_config.h b/producer/event_monitor_producer/src/eventmon_config.h index 0fe7065e745734ed46e87e6a560a3100fd8b48e5..3f404ed36b7bf5abf58969f15acfe1ebbe033958 100644 --- a/producer/event_monitor_producer/src/eventmon_config.h +++ b/producer/event_monitor_producer/src/eventmon_config.h @@ -9,7 +9,7 @@ namespace asapo { -enum class SubSetMode { +enum class DatasetMode { kNone, kBatch, kMultiSource @@ -27,10 +27,10 @@ struct EventMonConfig { std::vector<std::string> ignored_extensions; std::vector<std::string> whitelisted_extensions; bool remove_after_send = false; - SubSetMode subset_mode = SubSetMode::kNone; - uint64_t subset_batch_size = 1; - uint64_t subset_multisource_nsources = 1; - uint64_t subset_multisource_sourceid = 1; + DatasetMode dataset_mode = DatasetMode::kNone; + uint64_t dataset_batch_size = 1; + uint64_t dataset_multisource_nsources = 1; + uint64_t dataset_multisource_sourceid = 1; std::string data_source; private: std::string log_level_str; diff --git a/producer/event_monitor_producer/src/eventmon_config_factory.h b/producer/event_monitor_producer/src/eventmon_config_factory.h index 0c50df1965acfe00353357dae8602ba5b6d79395..7697238fb7822321ca87c1cdb25e168eb92b7dcf 100644 --- a/producer/event_monitor_producer/src/eventmon_config_factory.h +++ b/producer/event_monitor_producer/src/eventmon_config_factory.h @@ -16,7 +16,7 @@ class EventMonConfigFactory { Error ParseConfigFile(std::string file_name); Error CheckMode(); Error CheckLogLevel(); - Error CheckSubsets(); + Error CheckDatasets(); Error CheckNThreads(); Error CheckConfig(); }; diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 091c8b7df85da3e1d01240d124a634c012fdab63..3450ded0b6d4ad24260df3d47c687ab3ef8dff41 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -76,18 +76,18 @@ void 
SignalHandler(int signal) {
 }
 
-void HandleSubsets(asapo::MessageHeader* header) {
-    switch (GetEventMonConfig()->subset_mode) {
-    case asapo::SubSetMode::kNone:
+void HandleDatasets(asapo::MessageHeader* header) {
+    switch (GetEventMonConfig()->dataset_mode) {
+    case asapo::DatasetMode::kNone:
         return;
-    case asapo::SubSetMode::kBatch:
-        header->subset_size = GetEventMonConfig()->subset_batch_size;
-        header->id_in_subset = (header->message_id - 1) % header->subset_size + 1;
-        header->message_id = (header->message_id - 1) / header->subset_size + 1;
+    case asapo::DatasetMode::kBatch:
+        header->dataset_size = GetEventMonConfig()->dataset_batch_size;
+        header->dataset_substream = (header->message_id - 1) % header->dataset_size + 1;
+        header->message_id = (header->message_id - 1) / header->dataset_size + 1;
         break;
-    case asapo::SubSetMode::kMultiSource:
-        header->subset_size = GetEventMonConfig()->subset_multisource_nsources;
-        header->id_in_subset = GetEventMonConfig()->subset_multisource_sourceid;
+    case asapo::DatasetMode::kMultiSource:
+        header->dataset_size = GetEventMonConfig()->dataset_multisource_nsources;
+        header->dataset_substream = GetEventMonConfig()->dataset_multisource_sourceid;
         break;
     }
 }
@@ -136,8 +136,8 @@ int main (int argc, char* argv[]) {
             continue;
         }
         message_header.message_id = ++i;
-        HandleSubsets(&message_header);
-        producer->SendFromFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
+        HandleDatasets(&message_header);
+        producer->SendFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
                            message_header.file_name, asapo::kDefaultIngestMode, ProcessAfterSend);
     }
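(Aside, not part of the patch: the kBatch branch above hides an easy off-by-one, so here is a hedged, self-contained sketch of the same mapping. The index math is copied from HandleDatasets; the driver loop is illustrative. With a batch size of 3, raw events 1..6 become dataset messages 1,1,1,2,2,2 with substreams 1,2,3,1,2,3.)

    #include <cstdint>
    #include <cstdio>

    // Same index math as the asapo::DatasetMode::kBatch branch above.
    int main() {
        const uint64_t dataset_size = 3;  // batch size, illustrative
        for (uint64_t raw_id = 1; raw_id <= 6; raw_id++) {
            uint64_t dataset_substream = (raw_id - 1) % dataset_size + 1;
            uint64_t message_id = (raw_id - 1) / dataset_size + 1;
            std::printf("raw %llu -> message %llu, substream %llu\n",
                        (unsigned long long) raw_id,
                        (unsigned long long) message_id,
                        (unsigned long long) dataset_substream);
        }
        return 0;
    }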
"true" : "false"); config_string += "," + std::string("\"DataSource\":") + "\"" + config.data_source + "\""; - std::string subset_mode; - switch (config.subset_mode) { - case SubSetMode::kBatch: - subset_mode = "batch"; + std::string dataset_mode; + switch (config.dataset_mode) { + case DatasetMode::kBatch: + dataset_mode = "batch"; break; - case SubSetMode::kMultiSource: - subset_mode = "multisource"; + case DatasetMode::kMultiSource: + dataset_mode = "multisource"; break; - case SubSetMode::kNone: - subset_mode = "none"; + case DatasetMode::kNone: + dataset_mode = "none"; break; } - config_string += "," + std::string("\"Subset\":{"); - config_string += std::string("\"Mode\":") + "\"" + subset_mode + "\""; - if (config.subset_mode == SubSetMode::kBatch) { - config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.subset_batch_size); + config_string += "," + std::string("\"Dataset\":{"); + config_string += std::string("\"Mode\":") + "\"" + dataset_mode + "\""; + if (config.dataset_mode == DatasetMode::kBatch) { + config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.dataset_batch_size); } - if (config.subset_mode == SubSetMode::kMultiSource) { - config_string += "," + std::string("\"SourceId\":") + std::to_string(config.subset_multisource_sourceid); - config_string += "," + std::string("\"NSources\":") + std::to_string(config.subset_multisource_nsources); + if (config.dataset_mode == DatasetMode::kMultiSource) { + config_string += "," + std::string("\"SourceId\":") + std::to_string(config.dataset_multisource_sourceid); + config_string += "," + std::string("\"NSources\":") + std::to_string(config.dataset_multisource_nsources); } config_string += "}"; diff --git a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp index 0d4e90fea79481874a699c5cfe555067a7bac77d..f9369e4f4480b902c27d5b69ee40f6bda6b58470 100644 --- a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp +++ b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp @@ -31,7 +31,7 @@ using ::asapo::MockIO; using ::asapo::EventMonConfigFactory; using asapo::EventMonConfig; -using asapo::SubSetMode; +using asapo::DatasetMode; namespace { @@ -62,8 +62,8 @@ TEST_F(ConfigTests, ReadSettingsOK) { test_config.monitored_subfolders = {"test1", "test2"}; test_config.ignored_extensions = {"tmp", "test"}; test_config.remove_after_send = true; - test_config.subset_mode = SubSetMode::kBatch; - test_config.subset_batch_size = 9; + test_config.dataset_mode = DatasetMode::kBatch; + test_config.dataset_batch_size = 9; test_config.data_source = "source"; test_config.whitelisted_extensions = {"bla"}; @@ -82,8 +82,8 @@ TEST_F(ConfigTests, ReadSettingsOK) { ASSERT_THAT(config->root_monitored_folder, Eq("tmp")); ASSERT_THAT(config->ignored_extensions, ElementsAre("tmp", "test")); ASSERT_THAT(config->remove_after_send, Eq(true)); - ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kBatch)); - ASSERT_THAT(config->subset_batch_size, Eq(9)); + ASSERT_THAT(config->dataset_mode, Eq(DatasetMode::kBatch)); + ASSERT_THAT(config->dataset_batch_size, Eq(9)); ASSERT_THAT(config->data_source, Eq("source")); } @@ -103,17 +103,17 @@ TEST_F(ConfigTests, ReadSettingsWhiteListOK) { TEST_F(ConfigTests, ReadSettingsMultiSourceOK) { asapo::EventMonConfig test_config; - test_config.subset_mode = SubSetMode::kMultiSource; - test_config.subset_multisource_nsources = 2; - test_config.subset_multisource_sourceid = 12; + 
test_config.dataset_mode = DatasetMode::kMultiSource; + test_config.dataset_multisource_nsources = 2; + test_config.dataset_multisource_sourceid = 12; auto err = asapo::SetFolderMonConfig(test_config); auto config = asapo::GetEventMonConfig(); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kMultiSource)); - ASSERT_THAT(config->subset_multisource_nsources, Eq(2)); - ASSERT_THAT(config->subset_multisource_sourceid, Eq(12)); + ASSERT_THAT(config->dataset_mode, Eq(DatasetMode::kMultiSource)); + ASSERT_THAT(config->dataset_multisource_nsources, Eq(2)); + ASSERT_THAT(config->dataset_multisource_sourceid, Eq(12)); } @@ -135,16 +135,16 @@ TEST_F(ConfigTests, ReadSettingsChecksNthreads) { } -TEST_F(ConfigTests, ReadSettingsChecksSubsets) { +TEST_F(ConfigTests, ReadSettingsChecksDatasets) { asapo::EventMonConfig test_config; - test_config.subset_mode = SubSetMode::kBatch; - test_config.subset_batch_size = 0; + test_config.dataset_mode = DatasetMode::kBatch; + test_config.dataset_batch_size = 0; auto err = asapo::SetFolderMonConfig(test_config); ASSERT_THAT(err, Ne(nullptr)); - test_config.subset_mode = SubSetMode::kMultiSource; - test_config.subset_multisource_nsources = 0; + test_config.dataset_mode = DatasetMode::kMultiSource; + test_config.dataset_multisource_nsources = 0; err = asapo::SetFolderMonConfig(test_config); ASSERT_THAT(err, Ne(nullptr)); @@ -152,9 +152,9 @@ TEST_F(ConfigTests, ReadSettingsChecksSubsets) { } -TEST_F(ConfigTests, ReadSettingsDoesnotChecksSubsetsIfNoSubsets) { +TEST_F(ConfigTests, ReadSettingsDoesnotChecksDatasetsIfNoDatasets) { asapo::EventMonConfig test_config; - test_config.subset_batch_size = 0; + test_config.dataset_batch_size = 0; auto err = asapo::SetFolderMonConfig(test_config); ASSERT_THAT(err, Eq(nullptr)); diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index c3372ca1305a5bacab4068cac24b5899b69e87b0..fdacdd94a3c4eed52152fddc0489786f85b76818 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -51,7 +51,7 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request> &request, switch (request_header.op_code) { case Opcode::kOpcodeTransferData: - case Opcode::kOpcodeTransferSubsetData: { + case Opcode::kOpcodeTransferDatasetData: { request->AddHandler(&request_handler_receive_metadata_); auto err = AddReceiveWriteHandlers(request, request_header); if (err) { diff --git a/receiver/src/request_handler/request_handler_db_check_request.cpp b/receiver/src/request_handler/request_handler_db_check_request.cpp index 60fbe61a1ad7d6b3e5ea20c3e83c330d3eb091ec..b43347fee946e7bc5a48cbb31296afcb65487271 100644 --- a/receiver/src/request_handler/request_handler_db_check_request.cpp +++ b/receiver/src/request_handler/request_handler_db_check_request.cpp @@ -32,7 +32,7 @@ Error RequestHandlerDbCheckRequest::GetRecordFromDb(const Request* request, Mess auto id_in_set = request->GetCustomData()[1]; err = db_client__->GetDataSetById(col_name, id_in_set, id, record); if (!err) { - log__->Debug(std::string{"get subset record id "} + std::to_string(id) + " from " + col_name + " in " + + log__->Debug(std::string{"get dataset record id "} + std::to_string(id) + " from " + col_name + " in " + db_name_ + " at " + GetReceiverConfig()->database_uri); } return err; diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index 
2d61d1122cf98b57a23ee13a8c10eaae4ffd79ce..d0113286aab69657d27e4ed7c69fdd272b411855 100644
--- a/receiver/src/request_handler/request_handler_db_write.cpp
+++ b/receiver/src/request_handler/request_handler_db_write.cpp
@@ -65,13 +65,12 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const {
                          " at " + GetReceiverConfig()->database_uri);
         }
     } else {
-        auto subset_id = message_meta.id;
-        message_meta.id = request->GetCustomData()[1];
-        auto subset_size = request->GetCustomData()[2];
-        err = db_client__->InsertAsSubset(col_name, message_meta, subset_id, subset_size, false);
+        message_meta.dataset_substream = request->GetCustomData()[1];
+        auto dataset_size = request->GetCustomData()[2];
+        err = db_client__->InsertAsDatasetMessage(col_name, message_meta, dataset_size, false);
         if (!err) {
-            log__->Debug(std::string{"insert record as subset id "} + std::to_string(message_meta.id) + ", id in subset: " +
-                         std::to_string(subset_id) + " to " + col_name + " in " +
+            log__->Debug(std::string{"insert record with dataset substream "} + std::to_string(message_meta.dataset_substream) + ", id: " +
+                         std::to_string(message_meta.id) + " to " + col_name + " in " +
                          db_name_ + " at " + GetReceiverConfig()->database_uri);
         }
diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp
index bb924ba635e4939ad43a7cad0ad7f5d6a51e3c02..5f224d3d01135bacd0906518eccd10ba51c7aa72 100644
--- a/receiver/unittests/request_handler/test_request_factory.cpp
+++ b/receiver/unittests/request_handler/test_request_factory.cpp
@@ -80,7 +80,7 @@ TEST_F(FactoryTests, ErrorOnWrongCode) {
 }
 
 TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) {
-    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) {
+    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
         auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err);
@@ -96,7 +96,7 @@
 }
 
 TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) {
-    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) {
+    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
         config.receive_to_disk_threshold_mb = 0;
         SetReceiverConfig(config, "none");
diff --git a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
index 31d93693bbad712281eb8f2d2340802a4bc8478d..85c2b5da91967daf92d26146e08bd6eef6030e4f 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
@@ -82,9 +82,9 @@ class DbCheckRequestHandlerTests : public Test {
     std::string expected_metadata = "meta";
     uint64_t expected_file_size = 10;
     uint64_t expected_id = 15;
-    uint64_t expected_subset_id = 16;
-    uint64_t expected_subset_size = 2;
-    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_subset_id, expected_subset_size};
+    uint64_t expected_dataset_id = 16;
+    uint64_t expected_dataset_size = 2;
+    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_dataset_id, expected_dataset_size};
     MessageMeta expected_message_meta;
     MockFunctions mock_functions;
     int n_run = 0;
@@ -179,7 +179,7 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons
     .WillOnce(Return(op_code))
     ;
 
-    if (op_code == asapo::Opcode::kOpcodeTransferSubsetData) {
+    if (op_code == asapo::Opcode::kOpcodeTransferDatasetData) {
         EXPECT_CALL(*mock_request, GetCustomData_t())
         .WillOnce(Return(expected_custom_data))
         ;
@@ -207,8 +207,8 @@ void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool
 }
 
 void DbCheckRequestHandlerTests::MockGetSetByID(asapo::ErrorInterface* error, bool expect_compare ) {
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source, expect_compare);
-    EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_subset_id, expected_id, _)).
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferDatasetData, expected_data_source, expect_compare);
+    EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_dataset_id, expected_id, _)).
     WillOnce(DoAll(
                  SetArgPointee<3>(expected_message_meta),
                  testing::Return(error)
diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
index b4962bedc273ebade9d5d48b256d7f27e1e203e4..129e704a0cea73acb6e8d19368817a815b383c29 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
@@ -83,9 +83,9 @@ class DbWriterHandlerTests : public Test {
     std::string expected_metadata = "meta";
     uint64_t expected_file_size = 10;
     uint64_t expected_id = 15;
-    uint64_t expected_subset_id = 15;
-    uint64_t expected_subset_size = 2;
-    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_subset_id, expected_subset_size};
+    uint64_t expected_substream = 20;
+    uint64_t expected_dataset_size = 2;
+    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_substream, expected_dataset_size};
     asapo::MockHandlerDbCheckRequest mock_db_check_handler{asapo::kDBDataCollectionNamePrefix};
 
     void SetUp() override {
@@ -105,7 +105,7 @@ class DbWriterHandlerTests : public Test {
     void ExpectRequestParams(asapo::Opcode op_code, const std::string& data_source);
     void ExpectLogger();
     void ExpectDuplicatedID();
-    MessageMeta PrepareMessageMeta();
+    MessageMeta PrepareMessageMeta(bool substream = false);
     void TearDown() override {
         handler.db_client__.release();
     }
@@ -117,6 +117,7 @@ MATCHER_P(CompareMessageMeta, file, "") {
     if (arg.size != file.size) return false;
     if (arg.source != file.source) return false;
     if (arg.buf_id != file.buf_id) return false;
+    if (arg.dataset_substream != file.dataset_substream) return false;
     if (arg.name != file.name) return false;
     if (arg.id != file.id) return false;
     if (arg.metadata != file.metadata) return false;
@@ -179,7 +180,7 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     .WillOnce(Return(op_code))
     ;
 
-    if (op_code == asapo::Opcode::kOpcodeTransferSubsetData) {
+    if (op_code == asapo::Opcode::kOpcodeTransferDatasetData) {
         EXPECT_CALL(*mock_request, GetCustomData_t()).Times(2).
         WillRepeatedly(Return(expected_custom_data))
         ;
@@ -189,11 +190,14 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
 
-MessageMeta DbWriterHandlerTests::PrepareMessageMeta() {
+MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) {
     MessageMeta message_meta;
     message_meta.size = expected_file_size;
     message_meta.name = expected_file_name;
     message_meta.id = expected_id;
+    if (substream) {
+        message_meta.dataset_substream = expected_substream;
+    }
     message_meta.buf_id = expected_buf_id;
     message_meta.source = expected_host_ip + ":" + std::to_string(expected_port);
     message_meta.metadata = expected_metadata;
@@ -223,14 +227,14 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
     handler.ProcessRequest(mock_request.get());
 }
 
-TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
+TEST_F(DbWriterHandlerTests, CallsInsertDataset) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source);
-    auto message_meta = PrepareMessageMeta();
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferDatasetData, expected_data_source);
+    auto message_meta = PrepareMessageMeta(true);
 
-    EXPECT_CALL(mock_db, InsertAsSubset_t(expected_collection_name, CompareMessageMeta(message_meta), expected_subset_id,
-                                          expected_subset_size, false)).
+    EXPECT_CALL(mock_db, InsertAsDatasetMessage_t(expected_collection_name, CompareMessageMeta(message_meta),
+                                                  expected_dataset_size, false)).
     WillOnce(testing::Return(nullptr));
 
     ExpectLogger();
diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
index f999665371057705c38946bfd4d009ccf71e5358..c0858795a1bdf11a15e24bf61b52751c2b574df1 100644
--- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
+++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
@@ -18,7 +18,7 @@ class AsapoSender:
         self.ingest_mode = asapo_producer.DEFAULT_INGEST_MODE
         self._n_queued = 8
     def send(self, data, metadata):
-        self.producer.send_data(
+        self.producer.send(
             metadata['_id'], metadata['name'], data,
             ingest_mode=self.ingest_mode, callback=self._callback)
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
index 0bb40a0f2674b4e0677c0d7fb6894a5dffb693fc..488577ab4558d1e1a94d02a0e8ad2909760044c1 100644
--- a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
index 537e5e0addc6b95c960a25ad1810f01d27003b46..6714f39e689b089be8d3870a85fe58ef0639bd38 100644
--- a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
 }
diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
index 3949f00b2027540839ea983ec7032cb0ec8fd406..5f48b974a311c3bb9cc7278465c49ac3c0712f04 100644
--- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
+++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
@@ -34,7 +34,7 @@ group_id = consumer.generate_group_id()
 n_send = 10
 
 for i in range(n_send):
-    producer.send_data(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,stream = "stream", callback = callback)
+    producer.send(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,stream = "stream", callback = callback)
 
 producer.send_stream_finished_flag("stream", 10, next_stream = "next_stream", callback = callback)
 producer.wait_requests_finished(timeout)
diff --git a/tests/automatic/full_chain/simple_chain_filegen/test.json.in b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
index e0979a465208fe2479f34b3a4a96b5459346267a..fef76848b8562c9058e3dfe681b2ac7170cc2a86 100644
--- a/tests/automatic/full_chain/simple_chain_filegen/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
index eed5e4c01826102a123eca9eee5c865d50a5b913..a1053cd5b1efc23bf41039f8cd8167a62a7db213 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":false,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"batch",
    "BatchSize":3
  }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
index fae62b12f66435fe8f68e49cb975dc6e1aaeeaa5..5f32f629e2ba57f733c8dd922cac14d30e9ea26e 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"multisource",
    "SourceId":@ID@,
    "NSources":2
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
index 0bb40a0f2674b4e0677c0d7fb6894a5dffb693fc..488577ab4558d1e1a94d02a0e8ad2909760044c1 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
index 0bb40a0f2674b4e0677c0d7fb6894a5dffb693fc..488577ab4558d1e1a94d02a0e8ad2909760044c1 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
index b10952d39d6c9a18d173f957bc8bc38185fe950c..a19a182b13d51933a765f2df81e22db4660a87af 100644
--- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
@@ -37,34 +37,32 @@ int main(int argc, char* argv[]) {
     asapo::MessageMeta fi;
     fi.size = 100;
     fi.name = "relpath/1";
-    uint64_t subset_id = args.file_id;
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
-    fi.id = 10;
+    fi.id = args.file_id;
+    fi.dataset_substream = 10;
 
-    uint64_t subset_size = 2;
+    uint64_t dataset_size = 2;
 
     if (args.keyword != "Notconnected") {
         db.Connect("127.0.0.1", "data");
     }
 
-    auto err = db.InsertAsSubset("test", fi, subset_id, subset_size, true);
+    auto err = db.InsertAsDatasetMessage("test", fi, dataset_size, true);
 
     if (args.keyword == "DuplicateID") {
         Assert(err, "OK");
-        fi.id = 2;
-        err = db.InsertAsSubset("test", fi, subset_id, subset_size, true);
-//        Assert(err, "OK");
-        err = db.InsertAsSubset("test", fi, subset_id, subset_size, false);
+        err = db.InsertAsDatasetMessage("test", fi, dataset_size, true);
+        err = db.InsertAsDatasetMessage("test", fi, dataset_size, false);
     }
 
     Assert(err, args.keyword);
 
     if (args.keyword == "OK") { // check retrieve
         asapo::MessageMeta fi_db;
-        err = db.GetDataSetById("test", fi.id,subset_id, &fi_db);
+        err = db.GetDataSetById("test", fi.dataset_substream, fi.id, &fi_db);
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
         err = db.GetDataSetById("test", 0, 0, &fi_db);
diff --git a/tests/automatic/producer/file_monitor_producer/test.json.in b/tests/automatic/producer/file_monitor_producer/test.json.in
index 88f1e6e55681029ead0d8681ac229d06daf195a1..fa4d102c24c3ec03eca49b1999cd5907ddcf8fe9 100644
--- a/tests/automatic/producer/file_monitor_producer/test.json.in
+++ b/tests/automatic/producer/file_monitor_producer/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 3f6e545556547912b38aaedb5cf6184eaf7f2555..eb14bca76ed10a90c94b978bfb501a007332d1d9 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -49,10 +49,10 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
 producer.send_file(10, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file10",
                    user_meta='{"test_key":"test_val"}', callback=None)
 
-# send subsets
-producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file2", subset=(1, 2),
+# send datasets
+producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file2", dataset=(1, 2),
                    user_meta='{"test_key":"test_val"}', callback=callback)
-producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file3", subset=(2, 2),
+producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file3", dataset=(2, 2),
                    user_meta='{"test_key":"test_val"}', callback=callback)
 
 # send meta only
@@ -62,27 +62,27 @@ producer.send_file(3, local_path="./not_exist", exposed_path="./whatever",
 
 data = np.arange(10, dtype=np.float64)
 
 # send data from array
-producer.send_data(4, "processed/" + data_source + "/" + "file5", data,
+producer.send(4, "processed/" + data_source + "/" + "file5", data,
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 # send data from string
-producer.send_data(5, "processed/" + data_source + "/" + "file6", b"hello",
+producer.send(5, "processed/" + data_source + "/" + "file6", b"hello",
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 # send metadata only
-producer.send_data(6, "processed/" + data_source + "/" + "file7", None,
+producer.send(6, "processed/" + data_source + "/" + "file7", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send single file/wrong filename
 producer.send_file(1, local_path="./file2", exposed_path="processed/" + data_source + "/" + "file1", callback=callback)
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(8, "processed/" + data_source + "/" + "file8", x,
+producer.send(8, "processed/" + data_source + "/" + "file8", x,
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 try:
     x = x.T
-    producer.send_data(8, "processed/" + data_source + "/" + "file8", x,
+    producer.send(8, "processed/" + data_source + "/" + "file8", x,
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
     print(e)
@@ -91,7 +91,7 @@ else:
     sys.exit(1)
 
 try:
-    producer.send_data(0, "processed/" + data_source + "/" + "file6", b"hello",
+    producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
     print(e)
@@ -100,7 +100,7 @@ else:
     sys.exit(1)
 
 # send to another stream
-producer.send_data(1, "processed/" + data_source + "/" + "file9", None,
+producer.send(1, "processed/" + data_source + "/" + "file9", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
 
 # wait normal requests finished before sending duplicates
@@ -111,13 +111,13 @@ producer.wait_requests_finished(50000)
 producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file1",
                    user_meta='{"test_key":"test_val"}', callback=callback)
 # send metadata only once again
-producer.send_data(6, "processed/" + data_source + "/" + "file7", None,
+producer.send(6, "processed/" + data_source + "/" + "file7", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send same id different data
 producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file1",
                    user_meta='{"test_key1":"test_val"}', callback=callback)
 # send same id different data
-producer.send_data(6, "processed/" + data_source + "/" + "file8", None,
+producer.send(6, "processed/" + data_source + "/" + "file8", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send same id without writing to database, should succeed
@@ -130,7 +130,7 @@ n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
 
 # send more data to stream "stream"
-producer.send_data(2, "processed/" + data_source + "/" + "file10", None,
+producer.send(2, "processed/" + data_source + "/" + "file10", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
 
 producer.wait_requests_finished(50000)
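Taken together, the producer-side changes above are mechanical: send_data(...) becomes send(...) with an unchanged argument list, and the subset=(id_in_subset, subset_size) keyword of send_file becomes dataset=(dataset_substream, dataset_size). The following sketch summarizes the new calls; it is illustrative only and assumes an already-created producer (the create_producer signature is version-dependent and not part of this patch) and a hypothetical data_source name:

# Sketch of the renamed Python producer API; `producer` is assumed to be an
# existing asapo_producer producer instance, `data_source` a hypothetical name.
import numpy as np
import asapo_producer

def callback(header, err):
    # invoked once the receiver has processed the request
    if err is not None:
        print("send failed:", err)

def send_examples(producer, data_source):
    data = np.arange(10, dtype=np.float64)
    # was: producer.send_data(...); the arguments are unchanged
    producer.send(1, "processed/" + data_source + "/file1", data,
                  ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
    # was: subset=(id_in_subset, subset_size); now dataset=(substream, size) --
    # the two files below form one two-part dataset under message id 2
    producer.send_file(2, local_path="./file1",
                       exposed_path="processed/" + data_source + "/part1",
                       dataset=(1, 2), callback=callback)
    producer.send_file(2, local_path="./file2",
                       exposed_path="processed/" + data_source + "/part2",
                       dataset=(2, 2), callback=callback)
    producer.wait_requests_finished(10000)
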
diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py
index 3e6dc6f9500928a6262b31d9fb5e6383a59f3325..403dc1d4623bba660f17f94a428b274f5770cec5 100644
--- a/tests/manual/python_tests/producer/test.py
+++ b/tests/manual/python_tests/producer/test.py
@@ -35,9 +35,9 @@ producer.set_log_level("info")
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -46,25 +46,25 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-err = producer.send_data(5, data_source+"/"+"file6",b"hello",
+err = producer.send(5, data_source+"/"+"file6",b"hello",
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(4, data_source+"/"+"file5",x,
+producer.send(4, data_source+"/"+"file5",x,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 try:
     x = x.T
-    producer.send_data(4, data_source+"/"+"file5",x,
+    producer.send(4, data_source+"/"+"file5",x,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 except:
     pass
diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py
index 145c27e04e644560248a9d423be425784fa3a9e4..ead637fdfcfac7c07b70a9ff243e83861ac3933b 100644
--- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py
+++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py
@@ -35,9 +35,9 @@ producer.set_log_level("debug")
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -46,25 +46,25 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-err = producer.send_data(5, data_source+"/"+"file6",b"hello",
+err = producer.send(5, data_source+"/"+"file6",b"hello",
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(4, data_source+"/"+"file5",x,
+producer.send(4, data_source+"/"+"file5",x,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 try:
     x = x.T
-    producer.send_data(4, data_source+"/"+"file5",x,
+    producer.send(4, data_source+"/"+"file5",x,
                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 except:
     pass
diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py
index 0175bb3861f5b69be4860da15425ff9b8766d8ec..1c19ec7f15d22b82a85a185c6b6c32e1060bdf89 100644
--- a/tests/manual/python_tests/producer_wait_threads/producer_api.py
+++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py
@@ -32,9 +32,9 @@ producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"fi
 #send single file without callback
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None)
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -43,15 +43,15 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-producer.send_data(5, data_source+"/"+"file6",b"hello",
+producer.send(5, data_source+"/"+"file6",b"hello",
                ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 producer.wait_requests_finished(1000)
diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py
index bd6cdcc23f939163f2ace6eda1f8763d07fc25f3..75983e6b30571a4bec77ea1243d923606a4c1ae1 100644
--- a/tests/manual/python_tests/producer_wait_threads/test.py
+++ b/tests/manual/python_tests/producer_wait_threads/test.py
@@ -33,9 +33,9 @@ producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"fi
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}')
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -44,15 +44,15 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-producer.send_data(5, data_source+"/"+"file6",b"hello",
+producer.send(5, data_source+"/"+"file6",b"hello",
                ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 producer.wait_requests_finished(1)
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
index 91205ea9b00d90dc549847c83f4d17063611b7b2..be78fb225d36390b0f109a91d0665d9b10417c04 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
 }
@@ -103,7 +103,7 @@ job "asapo-filemon" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
index ec8f6d680a7ba36535864fe070d77f00a4ad0f52..07c84daeb5b311ec33d8adad5b468bcadd38c0d5 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon_batch" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"batch",
    "BatchSize": {{ keyOrDefault "monitor_batch_size" "3" }}
  }
@@ -104,7 +104,7 @@ job "asapo-filemon_batch" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"batch",
    "BatchSize": {{ keyOrDefault "monitor_batch_size" "3" }}
  }
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
index 77585d59cc128394af7101e69be9dc124c6e19af..7f7b07825033ee28c48161c07d2825ffdad7075e 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon_multisource" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"multisource",
    "SourceId": 1,
    "NSources":2
@@ -105,7 +105,7 @@ job "asapo-filemon_multisource" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"multisource",
    "SourceId": 2,
    "NSources":2
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
index 590460341e9440edfa38872e18e495089038a2cf..13c332d532289e50bacf15d355a70a2820b2ac14 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
@@ -42,7 +42,7 @@ job "asapo-produceronly" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
 }
@@ -103,7 +103,7 @@ job "asapo-produceronly" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
    "Mode":"none"
  }
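On the configuration side, the same rename applies to every folder-monitor config above: the "Subset" section becomes "Dataset" while its "Mode", "BatchSize", "SourceId" and "NSources" fields keep their names. The stdlib-only sketch below could update old, already-rendered config files; it assumes the input parses as plain JSON, so template placeholders such as @ID@ or {{ keyOrDefault ... }} must be substituted first:

# Sketch: rename the legacy "Subset" section to "Dataset" in a rendered
# folder-monitor config file. Assumes the file is valid JSON (template
# placeholders already substituted); nested fields are left untouched.
import json
import sys

def rename_subset_section(path):
    with open(path) as f:
        cfg = json.load(f)
    if "Subset" in cfg:
        cfg["Dataset"] = cfg.pop("Subset")  # keeps "Mode", "BatchSize", ... as-is
        with open(path, "w") as f:
            json.dump(cfg, f, indent=2)

if __name__ == "__main__":
    for p in sys.argv[1:]:
        rename_subset_section(p)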