diff --git a/3d_party/mongo-c-driver/install.sh b/3d_party/mongo-c-driver/install.sh index eba4e08f6b6e3ecc45f7e7e3075a63b9c93b0f89..7d0f2b352617b65e2010bf0ed28a44da0a73206a 100755 --- a/3d_party/mongo-c-driver/install.sh +++ b/3d_party/mongo-c-driver/install.sh @@ -1,12 +1,21 @@ #!/usr/bin/env bash + +set -e + +if [[ p$1 == "p" ]]; then + echo "install folder missing" + exit 1 +fi + + cd $1 -wget https://github.com/mongodb/mongo-c-driver/releases/download/1.15.2/mongo-c-driver-1.15.2.tar.gz -tar xzf mongo-c-driver-1.15.2.tar.gz -cd mongo-c-driver-1.15.2 +wget https://github.com/mongodb/mongo-c-driver/releases/download/1.17.2/mongo-c-driver-1.17.2.tar.gz +tar xzf mongo-c-driver-1.17.2.tar.gz +cd mongo-c-driver-1.17.2 -cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_SSL=OFF -DENABLE_SASL=OFF -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DMONGOC_ENABLE_STATIC=ON . -make +cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_SSL=OFF -DENABLE_SASL=OFF -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DMONGOC_ENABLE_STATIC=ON . +make -j 4 #sudo make install diff --git a/CHANGELOG.md b/CHANGELOG.md index 258270253dcdf7ba8295289316473cc28211551c..3c188b0e09c768093bdd52be7c808130be703289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,12 @@ -## 21.06.0 (in progress) +## 21.06.0 + +FEATURES +* Consumer API: C client +* Producer API: An option to automatically generate message id (use sparingly, reduced performance possible) IMPROVEMENTS * Consumer/Producer API - allow any characters in source/stream/group names +* Consumer/Producer API - introduce stream metadata * Consumer API - an option to auto discovery of data folder when consumer client uses file transfer service (has_filesystem=False) BUG FIXES diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h index e21af48757b4774baf42d7435bb4b01a020c00ae..61c32e0dcec5a7867d0f6d9c87aff8fb3846657c 100644 --- a/common/cpp/include/asapo/database/database.h +++ b/common/cpp/include/asapo/database/database.h @@ -14,7 +14,8 @@ constexpr char kDBMetaCollectionName[] = "meta"; class Database { public: virtual Error Connect(const std::string& address, const std::string& database) = 0; - virtual Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const = 0; + virtual Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates, + uint64_t* id_inserted) const = 0; virtual Error InsertMeta(const std::string& collection, const std::string& id, const uint8_t* data, uint64_t size, MetaIngestMode mode) const = 0; virtual Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h index f8c2464364c7eaf7ebec9c3d26fd622a3de3542b..b8f94b260fb13f4554ea86e71f59166a9b32b4bd 100644 --- a/common/cpp/include/asapo/unittests/MockDatabase.h +++ b/common/cpp/include/asapo/unittests/MockDatabase.h @@ -15,8 +15,9 @@ class MockDatabase : public Database { return Error{Connect_t(address, database)}; } - Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const override { - return Error{Insert_t(collection, file, ignore_duplicates)}; + Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates, + uint64_t* id_inserted) const override { + return Error{Insert_t(collection, file, ignore_duplicates, id_inserted)}; } Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, @@ -25,7 +26,7 @@ class MockDatabase : public Database { } MOCK_METHOD2(Connect_t, ErrorInterface * (const std::string&, const std::string&)); - MOCK_CONST_METHOD3(Insert_t, ErrorInterface * (const std::string&, const MessageMeta&, bool)); + MOCK_CONST_METHOD4(Insert_t, ErrorInterface * (const std::string&, const MessageMeta&, bool, uint64_t*)); MOCK_CONST_METHOD4(InsertAsDatasetMessage_t, ErrorInterface * (const std::string&, const MessageMeta&, uint64_t, bool)); diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index d4e1de03f6f48e26ef6ff2c86194d3588575662d..423122f9aca51901b7792d61a30f5fd89f50001f 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -14,7 +14,19 @@ namespace asapo { using asapo::Database; +void +my_logger (mongoc_log_level_t log_level, + const char* log_domain, + const char* message, + void* user_data) { + /* smaller values are more important */ + if (log_level < MONGOC_LOG_LEVEL_CRITICAL) { + mongoc_log_default_handler (log_level, log_domain, message, user_data); + } +} + MongoDbInstance::MongoDbInstance() { + mongoc_log_set_handler (my_logger, NULL); mongoc_init(); } @@ -65,7 +77,7 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti mongoc_collection_destroy(current_collection_); } - auto encoded_name = EncodeColName(collection_name); + auto encoded_name = EncodeColName(collection_name); if (encoded_name.size() > maxCollectionNameLength) { return DBErrorTemplates::kWrongInput.Generate("collection name too long"); } @@ -142,7 +154,6 @@ bson_p PrepareBsonDocument(const MessageMeta& file, Error* err) { return nullptr; } - *err = nullptr; return bson_p{bson}; } @@ -156,7 +167,8 @@ bson_p PrepareUpdateDocument(const uint8_t* json, Error* err) { return nullptr; } bson_error_t mongo_err; - auto bson_meta = bson_new_from_json(reinterpret_cast<const uint8_t*>(json_flat.c_str()), json_flat.size(), &mongo_err); + auto bson_meta = + bson_new_from_json(reinterpret_cast<const uint8_t*>(json_flat.c_str()), json_flat.size(), &mongo_err); if (!bson_meta) { *err = DBErrorTemplates::kJsonParseError.Generate(mongo_err.message); return nullptr; @@ -164,7 +176,6 @@ bson_p PrepareUpdateDocument(const uint8_t* json, Error* err) { return bson_p{bson_meta}; } - bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { bson_error_t mongo_err; if (json == nullptr) { @@ -172,13 +183,12 @@ bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { return nullptr; } - auto bson_meta = bson_new_from_json(json, len, &mongo_err); if (!bson_meta) { *err = DBErrorTemplates::kJsonParseError.Generate(mongo_err.message); return nullptr; } - auto bson = bson_new(); + auto bson = bson_new(); if (!BSON_APPEND_DOCUMENT(bson, "meta", bson_meta) || !BSON_APPEND_UTF8(bson, "schema_version", GetDbSchemaVersion().c_str())) { *err = DBErrorTemplates::kInsertError.Generate("cannot add schema version "); @@ -193,13 +203,20 @@ bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const { bson_error_t mongo_err; - if (!mongoc_collection_insert_one(current_collection_, document.get(), NULL, NULL, &mongo_err)) { + bson_p insert_opts{bson_new ()}; + + if (current_session_) { + if (!mongoc_client_session_append (current_session_, insert_opts.get(), &mongo_err)) { + return DBErrorTemplates::kInsertError.Generate(std::string("Could not add session to opts: ") + mongo_err.message); + } + } + + if (!mongoc_collection_insert_one(current_collection_, document.get(), insert_opts.get(), NULL, &mongo_err)) { if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { return ignore_duplicates ? nullptr : DBErrorTemplates::kDuplicateID.Generate(); } return DBErrorTemplates::kInsertError.Generate(mongo_err.message); } - return nullptr; } @@ -232,7 +249,7 @@ Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& do bson_free(opts); bson_free(selector); - bson_destroy (&reply); + bson_destroy(&reply); return err; } @@ -242,7 +259,7 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc bson_t* opts = BCON_NEW ("upsert", BCON_BOOL(upsert)); bson_t* selector = BCON_NEW ("_id", BCON_UTF8(id.c_str())); - bson_t* update = BCON_NEW ("$set", BCON_DOCUMENT(document.get())); + bson_t* update = BCON_NEW ("$set", BCON_DOCUMENT(document.get())); bson_t reply; Error err = nullptr; @@ -256,13 +273,87 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc bson_free(opts); bson_free(selector); + bson_destroy(&reply); + bson_destroy(update); + + return err; +} + +Error MongoDBClient::GetNextId(const std::string& stream, uint64_t* id) const { + mongoc_find_and_modify_opts_t* opts; + std::string collection_name = "auto_id_counters"; + auto collection = mongoc_client_get_collection(client_, database_name_.c_str(), collection_name.c_str()); + mongoc_collection_set_write_concern(collection, write_concern_); + + bson_t reply; + bson_error_t error; + bson_t query = BSON_INITIALIZER; + bson_t* update; + bool success; + BSON_APPEND_UTF8 (&query, "_id", stream.c_str()); + update = BCON_NEW ("$inc", "{", "curIndex", BCON_INT64(1), "}"); + opts = mongoc_find_and_modify_opts_new (); + mongoc_find_and_modify_opts_set_update (opts, update); + if (current_session_) { + bson_p extra_opts{bson_new()}; + if (!mongoc_client_session_append(current_session_, extra_opts.get(), &error)) { + return DBErrorTemplates::kInsertError.Generate( + std::string("Could not add session to opts: ") + error.message); + } + success = mongoc_find_and_modify_opts_append(opts, extra_opts.get()); + if (!success) { + return DBErrorTemplates::kInsertError.Generate(std::string( + "mongoc_find_and_modify_opts_append: cannot append options")); + } + } + mongoc_find_and_modify_opts_set_flags(opts, + mongoc_find_and_modify_flags_t(MONGOC_FIND_AND_MODIFY_UPSERT | MONGOC_FIND_AND_MODIFY_RETURN_NEW)); + success = mongoc_collection_find_and_modify_with_opts ( + collection, &query, opts, &reply, &error); + Error err; + if (success) { + bson_iter_t iter; + bson_iter_t iter_idx; + auto found = bson_iter_init(&iter, &reply) && bson_iter_find_descendant(&iter, "value.curIndex", &iter_idx) + && BSON_ITER_HOLDS_INT64(&iter_idx); + if (found) { + *id = bson_iter_int64(&iter_idx); + } else { + err = DBErrorTemplates::kInsertError.Generate(std::string("cannot extract auto id")); + } + } else { +// auto str = bson_as_relaxed_extended_json(&reply, NULL); +// printf("%s",str); +// bson_free(str); + + err = DBErrorTemplates::kInsertError.Generate(std::string("cannot get auto id:") + error.message ); + } bson_destroy (&reply); bson_destroy (update); - + bson_destroy (&query); + mongoc_find_and_modify_opts_destroy (opts); + mongoc_collection_destroy(collection); return err; } -Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const { +Error MongoDBClient::InsertWithAutoId(const MessageMeta& file, + const std::string& collection, + uint64_t* id_inserted) const { + bson_error_t error; + + uint64_t id; + auto err = GetNextId(current_collection_name_, &id); + if (err != nullptr) { + return err; + } + + auto meta_new = file; + meta_new.id = id; + return Insert(current_collection_name_, meta_new, false, id_inserted); +} + +Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates, + uint64_t* id_inserted) const { if (!connected_) { return DBErrorTemplates::kNotConnected.Generate(); } @@ -272,12 +363,20 @@ Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& fi return err; } + if (file.id == 0) { + return InsertWithAutoId(file, collection, id_inserted); + } + auto document = PrepareBsonDocument(file, &err); if (err) { return err; } - return InsertBsonDocument(document, ignore_duplicates); + err = InsertBsonDocument(document, ignore_duplicates); + if (!err && id_inserted) { + *id_inserted = file.id; + } + return err; } MongoDBClient::~MongoDBClient() { @@ -317,7 +416,7 @@ Error MongoDBClient::InsertMeta(const std::string& collection, const std::string } auto id_encoded = EncodeColName(id); - auto document = PrepareBsonDocument(data, (ssize_t)size, id_encoded, mode, &err); + auto document = PrepareBsonDocument(data, (ssize_t) size, id_encoded, mode, &err); if (err) { return err; } @@ -596,7 +695,6 @@ bool MongoCollectionIsDataStream(const std::string& stream_name) { return stream_name.rfind(prefix, 0) == 0; } - Error MongoDBClient::UpdateLastStreamInfo(const char* str, StreamInfo* info) const { auto collection_name = DecodeName(str); if (!MongoCollectionIsDataStream(collection_name)) { @@ -686,7 +784,8 @@ Error MongoDBClient::DeleteCollection(const std::string& name) const { mongoc_collection_destroy(collection); if (!r) { if (error.code == 26) { - return DBErrorTemplates::kNoRecord.Generate("collection " + name + " not found in " + DecodeName(database_name_)); + return DBErrorTemplates::kNoRecord.Generate( + "collection " + name + " not found in " + DecodeName(database_name_)); } else { return DBErrorTemplates::kDBError.Generate(std::string(error.message) + ": " + std::to_string(error.code)); } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index d47ba4bc50c6c18ec7cec61ad3457e4b06447a4e..d97bd21f2c38fa579a64032eccb1cedddd7feeb8 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -47,7 +47,8 @@ class MongoDBClient final : public Database { public: MongoDBClient(); Error Connect(const std::string& address, const std::string& database) override; - Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const override; + Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates, + uint64_t* id_inserted) const override; Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, uint64_t dataset_size, bool ignore_duplicates) const override; Error InsertMeta(const std::string& collection, const std::string& id, const uint8_t* data, uint64_t size, @@ -58,10 +59,12 @@ class MongoDBClient final : public Database { Error GetLastStream(StreamInfo* info) const override; Error DeleteStream(const std::string& stream) const override; Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override; + Error GetNextId(const std::string& collection, uint64_t* id) const; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; mutable mongoc_collection_t* current_collection_{nullptr}; + mutable mongoc_client_session_t* current_session_{nullptr}; mutable std::string current_collection_name_; std::string database_name_; mongoc_write_concern_t* write_concern_{nullptr}; @@ -82,6 +85,15 @@ class MongoDBClient final : public Database { Error DeleteCollection(const std::string& name) const; Error DeleteCollections(const std::string& prefix) const; Error DeleteDocumentsInCollection(const std::string& collection_name, const std::string& querystr) const; + Error InsertWithAutoId(const MessageMeta& file, const std::string& collection, uint64_t* id_inserted) const; +}; + +struct TransactionContext { + const MongoDBClient* caller; + MessageMeta meta; + std::string collection; + bool ignore_duplicates; + Error err; }; } diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 0a925c23ad714463ab0be99e2d2676c6357bcd4b..a4a2e5937a388935a73e2d1a5a1666fc411ba0a5 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -1,5 +1,6 @@ #define __CONSUMER_C_INTERFACE_IMPLEMENTATION__ #include "asapo/asapo_consumer.h" + //! boolean type typedef int AsapoBool; diff --git a/consumer/tools/folder_to_db/src/folder_db_importer.cpp b/consumer/tools/folder_to_db/src/folder_db_importer.cpp index 833ade112f4e6aa09a6b19f4a732a7b6ac889230..ed0bdf12a3d7a8946f65c523ea3184745e6cfd6d 100644 --- a/consumer/tools/folder_to_db/src/folder_db_importer.cpp +++ b/consumer/tools/folder_to_db/src/folder_db_importer.cpp @@ -21,7 +21,7 @@ Error FolderToDbImporter::ConnectToDb(const std::unique_ptr<asapo::Database>& db Error FolderToDbImporter::ImportSingleFile(const std::unique_ptr<asapo::Database>& db, const MessageMeta& file) const { - return db->Insert(std::string(kDBDataCollectionNamePrefix) + "_default", file, ignore_duplicates_); + return db->Insert(std::string(kDBDataCollectionNamePrefix) + "_default", file, ignore_duplicates_, nullptr); } Error FolderToDbImporter::ImportFilelistChunk(const std::unique_ptr<asapo::Database>& db, diff --git a/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp b/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp index f14a72dd418cfac576bf864ed3b6434ce72a4442..59a46d881e99784a9dfc2c98a48ca998c059b46d 100644 --- a/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp +++ b/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp @@ -177,7 +177,7 @@ TEST_F(FolderDBConverterTests, ErrorWhenCannotGetFileList) { TEST_F(FolderDBConverterTests, PassesIgnoreDuplicates) { - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, _, true)).Times(3); + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, _, true, _)).Times(3); converter.IgnoreDuplicates(true); converter.Convert(uri, folder, db_name); @@ -186,7 +186,7 @@ TEST_F(FolderDBConverterTests, PassesIgnoreDuplicates) { TEST_F(FolderDBConverterTests, ErrorWhenCannotImportFileListToDb) { - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(_, _, _)). + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(_, _, _, _)). WillOnce(testing::Return(asapo::DBErrorTemplates::kInsertError.Generate().release())); auto error = converter.Convert(uri, folder, db_name); @@ -205,7 +205,7 @@ MATCHER_P(CompareMessageMeta, file, "") { TEST_F(FolderDBConverterTests, PassesFileListToInsert) { for (auto& file : message_metas) { - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(file), _)). + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(file), _, _)). WillOnce(testing::Return(nullptr)); } @@ -216,11 +216,11 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsert) { TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by3) { - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[0]), _)). + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[0]), _, _)). WillOnce(testing::Return(nullptr)); - EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[1]), _)). + EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[1]), _, _)). WillOnce(testing::Return(nullptr)); - EXPECT_CALL(*(mock_dbf->db[2]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[2]), _)). + EXPECT_CALL(*(mock_dbf->db[2]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[2]), _, _)). WillOnce(testing::Return(nullptr)); converter.SetNParallelTasks(3, false); @@ -230,11 +230,11 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by3) { TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by2) { - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[0]), _)). + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[0]), _, _)). WillOnce(testing::Return(nullptr)); - EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[1]), _)). + EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[1]), _, _)). WillOnce(testing::Return(nullptr)); - EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[2]), _)). + EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareMessageMeta(message_metas[2]), _, _)). WillOnce(testing::Return(nullptr)); converter.SetNParallelTasks(2, false); @@ -244,7 +244,7 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by2) { TEST_F(FolderDBConverterTests, ComputesStatistics) { - EXPECT_CALL(*mock_dbf->db[0], Insert_t(_, _, false)). + EXPECT_CALL(*mock_dbf->db[0], Insert_t(_, _, false, _)). Times(message_metas.size()). WillRepeatedly(testing::Return(nullptr)); diff --git a/examples/consumer/getnext/CMakeLists.txt b/examples/consumer/getnext/CMakeLists.txt index 6353d1706436a5acf5e9997b65436cbc4562d63f..1f6310650bf3527c71358fcf444891c343a60993 100644 --- a/examples/consumer/getnext/CMakeLists.txt +++ b/examples/consumer/getnext/CMakeLists.txt @@ -11,9 +11,6 @@ set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> ) -get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY) -add_dependencies(${TARGET_NAME} asapo-broker) - add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}") endif() diff --git a/producer/api/cpp/include/asapo/producer/common.h b/producer/api/cpp/include/asapo/producer/common.h index 5b2da89b06ec02a429928c484e516f1be3c1025b..7ea9293816fb3db9bdcbf36a6e4922706a916697 100644 --- a/producer/api/cpp/include/asapo/producer/common.h +++ b/producer/api/cpp/include/asapo/producer/common.h @@ -25,26 +25,29 @@ enum class RequestHandlerType { kFilesystem }; - struct MessageHeader { MessageHeader() {}; MessageHeader(uint64_t message_id_i, uint64_t data_size_i, std::string file_name_i, std::string user_metadata_i = "", uint64_t dataset_substream_i = 0, - uint64_t dataset_size_i = 0 ): + uint64_t dataset_size_i = 0, + bool auto_id_i = false): message_id{message_id_i}, data_size{data_size_i}, file_name{std::move(file_name_i)}, user_metadata{std::move(user_metadata_i)}, dataset_substream{dataset_substream_i}, - dataset_size{dataset_size_i} {}; + dataset_size{dataset_size_i}, + auto_id{auto_id_i} {}; uint64_t message_id = 0; uint64_t data_size = 0; std::string file_name; std::string user_metadata; uint64_t dataset_substream = 0; uint64_t dataset_size = 0; + bool auto_id = false; }; + } #endif //ASAPO_PRODUCER_COMMON_H diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index a0f2c7477e40b883bf057697c7527e83cc5f8620..26ed89d549a99c83b6da4f8e254603982da299f0 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -69,12 +69,7 @@ Error CheckIngestMode(uint64_t ingest_mode) { return nullptr; } -Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_mode, const std::string& stream) { - - if (stream.empty()) { - return ProducerErrorTemplates::kWrongInput.Generate("stream empty"); - } - +Error CheckFileNameInRequest(const MessageHeader& message_header) { if (message_header.file_name.size() > kMaxMessageSize) { return ProducerErrorTemplates::kWrongInput.Generate("too long filename"); } @@ -82,13 +77,53 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_ if (message_header.file_name.empty()) { return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); } + return nullptr; +} + +Error CheckDatasetInRequest(const MessageHeader& message_header) { + if (!message_header.dataset_substream) { + return nullptr; + } - if (message_header.dataset_substream > 0 && message_header.dataset_size == 0) { + if (message_header.dataset_size == 0) { return ProducerErrorTemplates::kWrongInput.Generate("dataset dimensions"); } - if (message_header.message_id == 0) { - return ProducerErrorTemplates::kWrongInput.Generate("message id should be positive"); + if (message_header.auto_id) { + return ProducerErrorTemplates::kWrongInput.Generate("auto id mode not implemented for datasets"); + } + + return nullptr; +} + +Error CheckMessageIdInRequest(const MessageHeader& message_header) { + if (message_header.auto_id) { + if (message_header.message_id) { + return ProducerErrorTemplates::kWrongInput.Generate("message id should be 0 for auto id mode"); + } + } else { + if (message_header.message_id == 0) { + return ProducerErrorTemplates::kWrongInput.Generate("message id should be positive"); + } + } + return nullptr; +} + +Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_mode, const std::string& stream) { + if (stream.empty()) { + return ProducerErrorTemplates::kWrongInput.Generate("stream empty"); + } + + if (auto err = CheckFileNameInRequest(message_header)) { + return err; + } + + if (auto err = CheckDatasetInRequest(message_header)) { + return err; + } + + if (auto err = CheckMessageIdInRequest(message_header)) { + return err; } return CheckIngestMode(ingest_mode); @@ -161,12 +196,12 @@ Error ProducerImpl::Send(const MessageHeader& message_header, return HandleErrorFromPool(std::move(err), manage_data_memory); } -bool WandTransferData(uint64_t ingest_mode) { +bool WantTransferData(uint64_t ingest_mode) { return ingest_mode & IngestModeFlags::kTransferData; } Error CheckData(uint64_t ingest_mode, const MessageHeader& message_header, const MessageData* data) { - if (WandTransferData(ingest_mode)) { + if (WantTransferData(ingest_mode)) { if (*data == nullptr) { return ProducerErrorTemplates::kWrongInput.Generate("need data for this ingest mode"); } diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 216ac1a73ff0046adee313392c71dcc5b22391d4..667f3723a93914d6de35647ada6caf180c7df49d 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -164,6 +164,27 @@ TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { ASSERT_THAT(err_data, Ne(nullptr)); } +TEST_F(ProducerImplTests, ErrorIfBothIdAndAutoIdSet) { + asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; + asapo::MessageHeader message_header{1, 100, expected_fullpath, "", 0, 0, true}; + auto err = producer.Send(message_header, std::move(data), asapo::kDefaultIngestMode, "default", nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); +} + +TEST_F(ProducerImplTests, OkAutoId) { + asapo::MessageHeader message_header{0, 100, expected_fullpath, "", 0, 0, true}; + auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "default", nullptr); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ProducerImplTests, ErrorIfAutoIdForSet) { + asapo::MessageHeader message_header{0, 0, expected_fullpath, "", 1, 1, true}; + auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "default", nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); +} + TEST_F(ProducerImplTests, ErrorIfNoData) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; auto err = producer.Send(message_header, nullptr, asapo::kDefaultIngestMode, "default", nullptr); diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index e50f762bc05d4b47cb920d36cc6e58e4a7f2d764..fc762012bc9d27f6974625d37184df6e5528e6c6 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -75,6 +75,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": string user_metadata uint64_t dataset_substream uint64_t dataset_size + bool auto_id + cdef extern from "asapo/asapo_producer.h" namespace "asapo": struct GenericRequestHeader: diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index f681fd636d495403c55920bb580428332bfb8b27..d0101a01b7e853d53350f2413296230c7fe6cff7 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -1,5 +1,6 @@ #distutils: language=c++ + cimport asapo_producer import numpy as np cimport numpy as np @@ -139,8 +140,8 @@ cdef class PyProducer: return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} else: return {'client': _str(client_info)} - def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) if data is None: message_header.data_size = 0 else: @@ -158,11 +159,12 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) return - cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode): + cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode,auto_id): cdef MessageHeader message_header message_header.message_id = id message_header.file_name = _bytes(exposed_path) message_header.user_metadata = _bytes(user_meta) if user_meta!=None else "" + message_header.auto_id = auto_id if dataset == None: message_header.dataset_substream = 0 message_header.dataset_size = 0 @@ -171,8 +173,8 @@ cdef class PyProducer: message_header.dataset_size = dataset[1] return message_header - def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) message_header.data_size = len(data) err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream), unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, @@ -231,7 +233,8 @@ cdef class PyProducer: throw_exception(err) if callback != None: Py_XINCREF(<PyObject*>callback) - def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): + def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, + stream = "default", callback=None, auto_id = False): """ :param id: unique data id :type id: int @@ -249,14 +252,16 @@ cdef class PyProducer: :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True + :type auto_id: Boolean :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen """ if type(data) == np.ndarray or data == None: - self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) + self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) elif type(data) == bytes: - self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) + self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) else: raise(AsapoProducerError("wrong data type: " + str(type(data)))) def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None): @@ -371,7 +376,8 @@ cdef class PyProducer: if err: throw_exception(err) return json.loads(_str(info.Json())) - def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): + def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, + ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None, auto_id = False): """ :param id: unique data id :type id: int @@ -389,13 +395,15 @@ cdef class PyProducer: :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True + :type auto_id: Boolean :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoLocalIOError: problems reading file to send AsapoProducerError: actually should not happen """ - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) message_header.data_size = 0 err = self.c_producer.get().SendFile(message_header, _bytes(local_path), ingest_mode, _bytes(stream), unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index db28504e4f66991a18f257362d8e6e76ade59795..cf53fe3088798153299209f0105df9669e0c6d4f 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -58,9 +58,10 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const { auto col_name = collection_name_prefix_ + "_" + request->GetStream(); Error err; if (op_code == Opcode::kOpcodeTransferData) { - err = db_client__->Insert(col_name, message_meta, false); + uint64_t id_inserted{0}; + err = db_client__->Insert(col_name, message_meta, false, &id_inserted); if (!err) { - log__->Debug(std::string{"insert record id "} + std::to_string(message_meta.id) + " to " + col_name + " in " + + log__->Debug(std::string{"insert record id "} + std::to_string(id_inserted) + " to " + col_name + " in " + db_name_ + " at " + GetReceiverConfig()->database_uri); } diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index d12f6ef1f4863728cb4a557698ca3eb60a3e2f0d..14e8a1ca978bd2f63297fb539cc4e5001bd04df5 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -220,7 +220,7 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source); auto message_meta = PrepareMessageMeta(); - EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareMessageMeta(message_meta), false)). + EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareMessageMeta(message_meta), false, _)). WillOnce(testing::Return(nullptr)); ExpectLogger(); @@ -246,7 +246,7 @@ void DbWriterHandlerTests::ExpectDuplicatedID() { ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source); auto message_meta = PrepareMessageMeta(); - EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareMessageMeta(message_meta), false)). + EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareMessageMeta(message_meta), false, _)). WillOnce(testing::Return(asapo::DBErrorTemplates::kDuplicateID.Generate().release())); } diff --git a/tests/automatic/mongo_db/CMakeLists.txt b/tests/automatic/mongo_db/CMakeLists.txt index 9357532e7dd02570819d5839fc5426b9dbb327b5..5ce635b2f298e8393d81ee5634c27422a82c0f3a 100644 --- a/tests/automatic/mongo_db/CMakeLists.txt +++ b/tests/automatic/mongo_db/CMakeLists.txt @@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures add_subdirectory(connect) add_subdirectory(insert_retrieve) +add_subdirectory(auto_id) add_subdirectory(insert_retrieve_dataset) add_subdirectory(meta) diff --git a/tests/automatic/mongo_db/auto_id/CMakeLists.txt b/tests/automatic/mongo_db/auto_id/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..37afeaffdc220b5d0b7593f54845225428e105dd --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME mongo-auto-id) +set(SOURCE_FILES auto_id.cpp) + + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} test_common database) +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) + +################################ +# Testing +################################ +add_test_cleanup(${TARGET_NAME}) +add_integration_test(${TARGET_NAME} trans "trans 4 100") +add_integration_test(${TARGET_NAME} seq "seq 4 100") diff --git a/tests/automatic/mongo_db/auto_id/auto_id.cpp b/tests/automatic/mongo_db/auto_id/auto_id.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b138e88f6c38c96b518b1d88f34b401895fd6f51 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/auto_id.cpp @@ -0,0 +1,124 @@ +#include <iostream> +#include <chrono> +#include <thread> +#include <atomic> + +#include "../../../common/cpp/src/database/mongodb_client.h" +#include "asapo/database/db_error.h" + +#include "testing.h" +#include "asapo/common/data_structs.h" + +using asapo::Error; + +using std::chrono::high_resolution_clock; +using std::chrono::duration_cast; +using std::chrono::duration; +using std::chrono::milliseconds; + +std::atomic<uint64_t> global_count{0}; + +enum class Mode { + kTransaction, + kUpdateCounterThenIngest +}; + +struct Args { + Mode mode; + std::string str_mode; + int n_threads; + int n_messages_per_thread; +}; + +Args GetArgs(int argc, char* argv[]) { + if (argc != 4) { + std::cout << "Wrong number of arguments" << std::endl; + exit(EXIT_FAILURE); + } + std::string mode = argv[1]; + Args args; + if (mode == "trans" ) { + args.mode = Mode::kTransaction; + } else if (mode == "seq" ) { + args.mode = Mode::kUpdateCounterThenIngest; + } else { + printf("wrong mode"); + exit(1); + } + args.str_mode = mode; + args.n_threads = atoi(argv[2]); + args.n_messages_per_thread = atoi(argv[3]); + return args; +} + +void insert(const asapo::MongoDBClient& db, const std::string& name, asapo::MessageMeta fi, const Args& args) { + auto start = fi.id; + for (int i = 0; i < args.n_messages_per_thread; i++) { + switch (args.mode) { + case Mode::kTransaction: + fi.id = 0; + break; + case Mode::kUpdateCounterThenIngest: + fi.id = start + i + 1; + break; + default: + abort(); + } + uint64_t inserted_id{0}; + Error err = db.Insert(std::string("data_") + name, fi, false, &inserted_id); + if (err != nullptr) { + printf("%s\n", err->Explain().c_str()); +// break; + } else { + if (inserted_id == 0) { + M_AssertTrue(false); + } + global_count++; + } + } +} + +int main(int argc, char* argv[]) { + auto args = GetArgs(argc, argv); + auto db_name = R"(data_/ \."$)"; + + auto exec_next = [&args, db_name](int i) { + asapo::MessageMeta fi; + asapo::MongoDBClient db; + fi.size = 100; + fi.name = "relpath/1"; + fi.timestamp = std::chrono::system_clock::now(); + fi.buf_id = 18446744073709551615ull; + fi.source = "host:1234"; + fi.id = args.n_messages_per_thread * i; + db.Connect("127.0.0.1", db_name); + insert(db, "stream", fi, args); + }; + + auto t1 = high_resolution_clock::now(); + + std::vector<std::thread> threads; + for (int i = 0; i < args.n_threads; i++) { + threads.emplace_back(std::thread(exec_next, i)); + } + for (auto& thread : threads) { + thread.join(); + } + + auto messages_sent = global_count.load(); + + printf("Sent %llu messages \n", messages_sent); + M_AssertTrue(messages_sent == args.n_threads * args.n_messages_per_thread); + + auto t2 = high_resolution_clock::now(); + auto ms_int = duration_cast<milliseconds>(t2 - t1).count(); + printf("mode: %s, throughput %llu messages/sec with %d threads\n", args.str_mode.c_str(), 1000 * messages_sent / ms_int, + args.n_threads); + + asapo::MongoDBClient db; + db.Connect("127.0.0.1", db_name); + db.DeleteStream("stream"); + + + return 0; +} diff --git a/tests/automatic/mongo_db/auto_id/cleanup_linux.sh b/tests/automatic/mongo_db/auto_id/cleanup_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..71093a2577be0c3a864a60150290b04338c6f057 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/cleanup_linux.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +database_name=data_%2F%20%5C%2E%22%24 + +echo "db.dropDatabase()" | mongo ${database_name} diff --git a/tests/automatic/mongo_db/auto_id/cleanup_windows.bat b/tests/automatic/mongo_db/auto_id/cleanup_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..8a64516d46643ebc6910c3c6b8e2248f7e779ac1 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/cleanup_windows.bat @@ -0,0 +1,4 @@ +SET database_name=data_%%2F%%20%%5C%%2E%%22%%24 +SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" + +echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt b/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt index d1eef84c9f19c8dbf8f9335bc9b9545ae7483137..261380562b2acae897dae87f4dc58eab83a3e92c 100644 --- a/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt +++ b/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME insert_retrieve_mongodb) +set(TARGET_NAME mongo-insert_retrieve_mongodb) set(SOURCE_FILES insert_retrieve_mongodb.cpp) diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index d21e7051fc7a4d70efb5aec0a0661e088a2187f1..68f790722ca2abc1d7d2219b68d560de3e02f557 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -68,11 +68,11 @@ int main(int argc, char* argv[]) { db.Connect("127.0.0.1", db_name); } - auto err = db.Insert(std::string("data_") + stream_name, fi, false); + auto err = db.Insert(std::string("data_") + stream_name, fi, false, nullptr); if (args.keyword == "DuplicateID") { Assert(err, "OK"); - err = db.Insert(std::string("data_") + stream_name, fi, false); + err = db.Insert(std::string("data_") + stream_name, fi, false, nullptr); } std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -83,8 +83,8 @@ int main(int argc, char* argv[]) { fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1); fi2.name = asapo::kFinishStreamKeyword; fi2.metadata = R"({"next_stream":"ns"})"; - db.Insert("data_test1", fi1, false); - db.Insert("data_test1", fi2, false); + db.Insert("data_test1", fi1, false, nullptr); + db.Insert("data_test1", fi2, false, nullptr); Assert(err, args.keyword); @@ -112,10 +112,10 @@ int main(int argc, char* argv[]) { M_AssertEq("ns", info.next_stream); // delete stream - db.Insert(std::string("inprocess_") + stream_name + "_blabla", fi, false); - db.Insert(std::string("inprocess_") + stream_name + "_blabla1", fi, false); - db.Insert(std::string("acks_") + stream_name + "_blabla", fi, false); - db.Insert(std::string("acks_") + stream_name + "_blabla1", fi, false); + db.Insert(std::string("inprocess_") + stream_name + "_blabla", fi, false, nullptr); + db.Insert(std::string("inprocess_") + stream_name + "_blabla1", fi, false, nullptr); + db.Insert(std::string("acks_") + stream_name + "_blabla", fi, false, nullptr); + db.Insert(std::string("acks_") + stream_name + "_blabla1", fi, false, nullptr); db.DeleteStream(stream_name); err = db.GetStreamInfo(std::string("data_") + stream_name, &info); M_AssertTrue(info.last_id == 0); @@ -140,7 +140,7 @@ int main(int argc, char* argv[]) { db1.Connect("127.0.0.1", db_name); auto long_stream_name = GenRandomString(120); - err = db1.Insert(long_stream_name, fi, true); + err = db1.Insert(long_stream_name, fi, true, nullptr); M_AssertTrue(err == asapo::DBErrorTemplates::kWrongInput); diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt b/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt index d4c09f19733a86ed3d81c341663a9f3f0cfcfac3..b5a491984694b9654b5ce34c2be743f9f577441d 100644 --- a/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt +++ b/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME insert_retrieve_dataset_mongodb) +set(TARGET_NAME mongo-insert_retrieve_dataset_mongodb) set(SOURCE_FILES insert_retrieve_dataset_mongodb.cpp) diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp index 7e2f8e58042ffe1b34348f9296c9802934bb2bfe..b6ffe373fb29c8f97f9ad5cadd368383c218ea8a 100644 --- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp @@ -87,7 +87,7 @@ int main(int argc, char* argv[]) { fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1); fi2.name = asapo::kFinishStreamKeyword; fi2.metadata = R"({"next_stream":"ns"})"; - db.Insert("data_test", fi2, false); + db.Insert("data_test", fi2, false, nullptr); err = db.GetLastStream(&info_last); M_AssertEq(nullptr, err); M_AssertEq("test", info_last.name); diff --git a/tests/automatic/mongo_db/meta/CMakeLists.txt b/tests/automatic/mongo_db/meta/CMakeLists.txt index 609cd1581b9f6d943d7599cbb43e2ff002aea0dd..93f413907362888d806eac32cd46bd6dc26eaa46 100644 --- a/tests/automatic/mongo_db/meta/CMakeLists.txt +++ b/tests/automatic/mongo_db/meta/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME meta_mongodb) +set(TARGET_NAME mongo-meta) set(SOURCE_FILES meta_mongodb.cpp) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 6f4bad1586c4f404f4c98afa3ba1157fa5efe18e..beb45a12dd9d5544496655a28f9ca709a017ba14 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -117,8 +117,8 @@ else: # wait before sending to another stream so we sure that this stream appears later producer.wait_requests_finished(50000) -# send to another stream -producer.send(1, "processed/" + data_source + "/" + "file9", None, +# send to another stream with auto id +producer.send(0, "processed/" + data_source + "/" + "file9", None,auto_id = True, ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback) # wait normal requests finished before sending duplicates @@ -153,7 +153,7 @@ n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") # send another data to stream stream -producer.send(2, "processed/" + data_source + "/" + "file10", None, +producer.send(0, "processed/" + data_source + "/" + "file10", None, auto_id = True, ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback) producer.wait_requests_finished(50000)