From a291edc9e85d460aa620e44d1c17a3f1b39be717 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Fri, 18 Jun 2021 11:58:03 +0200 Subject: [PATCH] initial version of auto id --- 3d_party/mongo-c-driver/install.sh | 19 ++- CHANGELOG.md | 1 + common/cpp/src/database/mongodb_client.cpp | 154 ++++++++++++++++-- common/cpp/src/database/mongodb_client.h | 11 ++ tests/automatic/mongo_db/CMakeLists.txt | 1 + .../automatic/mongo_db/auto_id/CMakeLists.txt | 17 ++ tests/automatic/mongo_db/auto_id/auto_id.cpp | 120 ++++++++++++++ .../mongo_db/auto_id/cleanup_linux.sh | 5 + .../mongo_db/auto_id/cleanup_windows.bat | 4 + .../mongo_db/insert_retrieve/CMakeLists.txt | 2 +- .../insert_retrieve_dataset/CMakeLists.txt | 2 +- tests/automatic/mongo_db/meta/CMakeLists.txt | 2 +- 12 files changed, 316 insertions(+), 22 deletions(-) create mode 100644 tests/automatic/mongo_db/auto_id/CMakeLists.txt create mode 100644 tests/automatic/mongo_db/auto_id/auto_id.cpp create mode 100644 tests/automatic/mongo_db/auto_id/cleanup_linux.sh create mode 100644 tests/automatic/mongo_db/auto_id/cleanup_windows.bat diff --git a/3d_party/mongo-c-driver/install.sh b/3d_party/mongo-c-driver/install.sh index eba4e08f6..a76720bc4 100755 --- a/3d_party/mongo-c-driver/install.sh +++ b/3d_party/mongo-c-driver/install.sh @@ -1,12 +1,21 @@ #!/usr/bin/env bash + +set -e + +if [[ p$1 == "p" ]]; then + echo "install folder missing" + exit 1 +fi + + cd $1 -wget https://github.com/mongodb/mongo-c-driver/releases/download/1.15.2/mongo-c-driver-1.15.2.tar.gz -tar xzf mongo-c-driver-1.15.2.tar.gz -cd mongo-c-driver-1.15.2 +wget https://github.com/mongodb/mongo-c-driver/releases/download/1.17.2/mongo-c-driver-1.17.2.tar.gz +tar xzf mongo-c-driver-1.17.2.tar.gz +cd mongo-c-driver-1.17.2 -cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_SSL=OFF -DENABLE_SASL=OFF -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DMONGOC_ENABLE_STATIC=ON . -make +cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DMONGOC_ENABLE_STATIC=ON . +make -j 4 #sudo make install diff --git a/CHANGELOG.md b/CHANGELOG.md index 258270253..12ad1db73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ IMPROVEMENTS * Consumer/Producer API - allow any characters in source/stream/group names +* Consumer/Producer API - introduce stream metadata * Consumer API - an option to auto discovery of data folder when consumer client uses file transfer service (has_filesystem=False) BUG FIXES diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index d4e1de03f..74337e109 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -14,7 +14,19 @@ namespace asapo { using asapo::Database; +void +my_logger (mongoc_log_level_t log_level, + const char* log_domain, + const char* message, + void* user_data) { + /* smaller values are more important */ + if (log_level < MONGOC_LOG_LEVEL_CRITICAL) { + mongoc_log_default_handler (log_level, log_domain, message, user_data); + } +} + MongoDbInstance::MongoDbInstance() { + mongoc_log_set_handler (my_logger, NULL); mongoc_init(); } @@ -65,7 +77,7 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti mongoc_collection_destroy(current_collection_); } - auto encoded_name = EncodeColName(collection_name); + auto encoded_name = EncodeColName(collection_name); if (encoded_name.size() > maxCollectionNameLength) { return DBErrorTemplates::kWrongInput.Generate("collection name too long"); } @@ -142,7 +154,6 @@ bson_p PrepareBsonDocument(const MessageMeta& file, Error* err) { return nullptr; } - *err = nullptr; return bson_p{bson}; } @@ -156,7 +167,8 @@ bson_p PrepareUpdateDocument(const uint8_t* json, Error* err) { return nullptr; } bson_error_t mongo_err; - auto bson_meta = bson_new_from_json(reinterpret_cast<const uint8_t*>(json_flat.c_str()), json_flat.size(), &mongo_err); + auto bson_meta = + bson_new_from_json(reinterpret_cast<const uint8_t*>(json_flat.c_str()), json_flat.size(), &mongo_err); if (!bson_meta) { *err = DBErrorTemplates::kJsonParseError.Generate(mongo_err.message); return nullptr; @@ -164,7 +176,6 @@ bson_p PrepareUpdateDocument(const uint8_t* json, Error* err) { return bson_p{bson_meta}; } - bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { bson_error_t mongo_err; if (json == nullptr) { @@ -172,13 +183,12 @@ bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { return nullptr; } - auto bson_meta = bson_new_from_json(json, len, &mongo_err); if (!bson_meta) { *err = DBErrorTemplates::kJsonParseError.Generate(mongo_err.message); return nullptr; } - auto bson = bson_new(); + auto bson = bson_new(); if (!BSON_APPEND_DOCUMENT(bson, "meta", bson_meta) || !BSON_APPEND_UTF8(bson, "schema_version", GetDbSchemaVersion().c_str())) { *err = DBErrorTemplates::kInsertError.Generate("cannot add schema version "); @@ -193,13 +203,20 @@ bson_p PrepareInjestDocument(const uint8_t* json, ssize_t len, Error* err) { Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const { bson_error_t mongo_err; - if (!mongoc_collection_insert_one(current_collection_, document.get(), NULL, NULL, &mongo_err)) { + bson_p insert_opts{bson_new ()}; + + if (current_session_) { + if (!mongoc_client_session_append (current_session_, insert_opts.get(), &mongo_err)) { + return DBErrorTemplates::kInsertError.Generate(std::string("Could not add session to opts: ") + mongo_err.message); + } + } + + if (!mongoc_collection_insert_one(current_collection_, document.get(), insert_opts.get(), NULL, &mongo_err)) { if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { return ignore_duplicates ? nullptr : DBErrorTemplates::kDuplicateID.Generate(); } return DBErrorTemplates::kInsertError.Generate(mongo_err.message); } - return nullptr; } @@ -232,7 +249,7 @@ Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& do bson_free(opts); bson_free(selector); - bson_destroy (&reply); + bson_destroy(&reply); return err; } @@ -242,7 +259,7 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc bson_t* opts = BCON_NEW ("upsert", BCON_BOOL(upsert)); bson_t* selector = BCON_NEW ("_id", BCON_UTF8(id.c_str())); - bson_t* update = BCON_NEW ("$set", BCON_DOCUMENT(document.get())); + bson_t* update = BCON_NEW ("$set", BCON_DOCUMENT(document.get())); bson_t reply; Error err = nullptr; @@ -256,12 +273,117 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc bson_free(opts); bson_free(selector); + bson_destroy(&reply); + bson_destroy(update); + + return err; +} + +Error MongoDBClient::GetNextId(const std::string& stream, uint64_t* id) const { + mongoc_find_and_modify_opts_t* opts; + std::string collection_name = "auto_id_counters"; + auto collection = mongoc_client_get_collection(client_, database_name_.c_str(), collection_name.c_str()); + mongoc_collection_set_write_concern(collection, write_concern_); + + bson_t reply; + bson_error_t error; + bson_t query = BSON_INITIALIZER; + bson_t* update; + bool success; + BSON_APPEND_UTF8 (&query, "_id", stream.c_str()); + update = BCON_NEW ("$inc", "{", "curIndex", BCON_INT64(1), "}"); + opts = mongoc_find_and_modify_opts_new (); + mongoc_find_and_modify_opts_set_update (opts, update); + if (current_session_) { + bson_p extra_opts{bson_new()}; + if (!mongoc_client_session_append(current_session_, extra_opts.get(), &error)) { + return DBErrorTemplates::kInsertError.Generate( + std::string("Could not add session to opts: ") + error.message); + } + success = mongoc_find_and_modify_opts_append(opts, extra_opts.get()); + if (!success) { + return DBErrorTemplates::kInsertError.Generate(std::string( + "mongoc_find_and_modify_opts_append: cannot append options")); + } + } + mongoc_find_and_modify_opts_set_flags(opts, + mongoc_find_and_modify_flags_t(MONGOC_FIND_AND_MODIFY_UPSERT | MONGOC_FIND_AND_MODIFY_RETURN_NEW)); + success = mongoc_collection_find_and_modify_with_opts ( + collection, &query, opts, &reply, &error); + Error err; + if (success) { + bson_iter_t iter; + bson_iter_t iter_idx; + auto found = bson_iter_init(&iter, &reply) && bson_iter_find_descendant(&iter, "value.curIndex", &iter_idx) + && BSON_ITER_HOLDS_INT64(&iter_idx); + if (found) { + *id = bson_iter_int64(&iter_idx); + } else { + err = DBErrorTemplates::kInsertError.Generate(std::string("cannot extract auto id")); + } + } else { +// auto str = bson_as_relaxed_extended_json(&reply, NULL); +// printf("%s",str); +// bson_free(str); + + err = DBErrorTemplates::kInsertError.Generate(std::string("cannot get auto id:") + error.message ); + } bson_destroy (&reply); bson_destroy (update); - + bson_destroy (&query); + mongoc_find_and_modify_opts_destroy (opts); + mongoc_collection_destroy(collection); return err; } +static bool callback(mongoc_client_session_t* session, + void* ctx, + bson_t** reply, + bson_error_t* error) { + auto context = (TransactionContext*) ctx; + + uint64_t id; + context->err = context->caller->GetNextId(context->collection, &id); + if (context->err != nullptr) { + return false; + } + + context->meta.id = id; + context->err = context->caller->Insert(context->collection, context->meta, context->ignore_duplicates); + return context->err == nullptr; +} + + +Error MongoDBClient::InsertWithAutoId(const MessageMeta& file, + const std::string& collection, + bool ignore_duplicates) const { + bson_error_t error; + TransactionContext context = {this, file, collection, ignore_duplicates}; + + current_session_ = mongoc_client_start_session(client_, NULL, &error); + if (!current_session_) { + return DBErrorTemplates::kDBError.Generate(std::string("cannot start mongodb session: ") + error.message); + } + auto ret = false; + auto nret = 0; + while (!ret && nret < 100) { + ret = mongoc_client_session_with_transaction( + current_session_, callback, NULL, (void*) &context, NULL, NULL); + nret++; + } + current_session_ = nullptr; + + if (!ret) { + if (context.err) { + return std::move(context.err); + } else { + return DBErrorTemplates::kDBError.Generate("mongoc_client_session_with_transaction"); + } + } + + return nullptr; +} + Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const { if (!connected_) { return DBErrorTemplates::kNotConnected.Generate(); @@ -272,6 +394,10 @@ Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& fi return err; } + if (file.id == 0) { + return InsertWithAutoId(file, collection, ignore_duplicates); + } + auto document = PrepareBsonDocument(file, &err); if (err) { return err; @@ -317,7 +443,7 @@ Error MongoDBClient::InsertMeta(const std::string& collection, const std::string } auto id_encoded = EncodeColName(id); - auto document = PrepareBsonDocument(data, (ssize_t)size, id_encoded, mode, &err); + auto document = PrepareBsonDocument(data, (ssize_t) size, id_encoded, mode, &err); if (err) { return err; } @@ -596,7 +722,6 @@ bool MongoCollectionIsDataStream(const std::string& stream_name) { return stream_name.rfind(prefix, 0) == 0; } - Error MongoDBClient::UpdateLastStreamInfo(const char* str, StreamInfo* info) const { auto collection_name = DecodeName(str); if (!MongoCollectionIsDataStream(collection_name)) { @@ -686,7 +811,8 @@ Error MongoDBClient::DeleteCollection(const std::string& name) const { mongoc_collection_destroy(collection); if (!r) { if (error.code == 26) { - return DBErrorTemplates::kNoRecord.Generate("collection " + name + " not found in " + DecodeName(database_name_)); + return DBErrorTemplates::kNoRecord.Generate( + "collection " + name + " not found in " + DecodeName(database_name_)); } else { return DBErrorTemplates::kDBError.Generate(std::string(error.message) + ": " + std::to_string(error.code)); } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index d47ba4bc5..d3801cdb3 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -58,10 +58,12 @@ class MongoDBClient final : public Database { Error GetLastStream(StreamInfo* info) const override; Error DeleteStream(const std::string& stream) const override; Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override; + Error GetNextId(const std::string& collection, uint64_t* id) const; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; mutable mongoc_collection_t* current_collection_{nullptr}; + mutable mongoc_client_session_t* current_session_{nullptr}; mutable std::string current_collection_name_; std::string database_name_; mongoc_write_concern_t* write_concern_{nullptr}; @@ -82,6 +84,15 @@ class MongoDBClient final : public Database { Error DeleteCollection(const std::string& name) const; Error DeleteCollections(const std::string& prefix) const; Error DeleteDocumentsInCollection(const std::string& collection_name, const std::string& querystr) const; + Error InsertWithAutoId(const MessageMeta& file, const std::string& collection, bool ignore_duplicates) const; +}; + +struct TransactionContext { + const MongoDBClient* caller; + MessageMeta meta; + std::string collection; + bool ignore_duplicates; + Error err; }; } diff --git a/tests/automatic/mongo_db/CMakeLists.txt b/tests/automatic/mongo_db/CMakeLists.txt index 9357532e7..5ce635b2f 100644 --- a/tests/automatic/mongo_db/CMakeLists.txt +++ b/tests/automatic/mongo_db/CMakeLists.txt @@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures add_subdirectory(connect) add_subdirectory(insert_retrieve) +add_subdirectory(auto_id) add_subdirectory(insert_retrieve_dataset) add_subdirectory(meta) diff --git a/tests/automatic/mongo_db/auto_id/CMakeLists.txt b/tests/automatic/mongo_db/auto_id/CMakeLists.txt new file mode 100644 index 000000000..10d2242ec --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME mongo-auto-id) +set(SOURCE_FILES auto_id.cpp) + + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} test_common database) +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) + +################################ +# Testing +################################ +add_test_cleanup(${TARGET_NAME}) +add_integration_test(${TARGET_NAME} trans "trans 4 100" "1") +add_integration_test(${TARGET_NAME} seq "seq 4 100" "1") diff --git a/tests/automatic/mongo_db/auto_id/auto_id.cpp b/tests/automatic/mongo_db/auto_id/auto_id.cpp new file mode 100644 index 000000000..78c00b0b4 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/auto_id.cpp @@ -0,0 +1,120 @@ +#include <iostream> +#include <chrono> +#include <thread> +#include <atomic> + +#include "../../../common/cpp/src/database/mongodb_client.h" +#include "asapo/database/db_error.h" + +#include "testing.h" +#include "asapo/common/data_structs.h" + +using asapo::Error; + +using std::chrono::high_resolution_clock; +using std::chrono::duration_cast; +using std::chrono::duration; +using std::chrono::milliseconds; + +std::atomic<uint64_t> global_count{0}; + +enum class Mode { + kTransaction, + kUpdateCounterThenIngest +}; + +struct Args { + Mode mode; + std::string str_mode; + int n_threads; + int n_messages_per_thread; +}; + +Args GetArgs(int argc, char* argv[]) { + if (argc != 4) { + std::cout << "Wrong number of arguments" << std::endl; + exit(EXIT_FAILURE); + } + std::string mode = argv[1]; + Args args; + if (mode == "trans" ) { + args.mode = Mode::kTransaction; + } else if (mode == "seq" ) { + args.mode = Mode::kUpdateCounterThenIngest; + } else { + printf("wrong mode"); + exit(1); + } + args.str_mode = mode; + args.n_threads = atoi(argv[2]); + args.n_messages_per_thread = atoi(argv[3]); + return args; +} + +void insert(const asapo::MongoDBClient& db, const std::string& name, asapo::MessageMeta fi, const Args& args) { + auto start = fi.id; + for (int i = 0; i < args.n_messages_per_thread; i++) { + switch (args.mode) { + case Mode::kTransaction: + fi.id = 0; + break; + case Mode::kUpdateCounterThenIngest: + fi.id = start + i + 1; + break; + default: + abort(); + } + Error err = db.Insert(std::string("data_") + name, fi, false); + if (err != nullptr) { + printf("%s\n",err->Explain().c_str()); +// break; + } else { + global_count++; + } + } +} + +int main(int argc, char* argv[]) { + auto args = GetArgs(argc, argv); + auto db_name = R"(data_/ \."$)"; + + auto exec_next = [&args, db_name](int i) { + asapo::MessageMeta fi; + asapo::MongoDBClient db; + fi.size = 100; + fi.name = "relpath/1"; + fi.timestamp = std::chrono::system_clock::now(); + fi.buf_id = 18446744073709551615ull; + fi.source = "host:1234"; + fi.id = args.n_messages_per_thread * i; + db.Connect("127.0.0.1", db_name); + insert(db, "stream", fi, args); + }; + + auto t1 = high_resolution_clock::now(); + + std::vector<std::thread> threads; + for (int i = 0; i < args.n_threads; i++) { + threads.emplace_back(std::thread(exec_next, i)); + } + for (auto& thread : threads) { + thread.join(); + } + + auto messages_sent = global_count.load(); + + printf("Sent %llu messages \n", messages_sent); + M_AssertTrue(messages_sent == args.n_threads * args.n_messages_per_thread); + + auto t2 = high_resolution_clock::now(); + auto ms_int = duration_cast<milliseconds>(t2 - t1).count(); + printf("mode: %s, throughput %llu messages/sec with %d threads\n", args.str_mode.c_str(), 1000 * messages_sent / ms_int, + args.n_threads); + + asapo::MongoDBClient db; + db.Connect("127.0.0.1", db_name); + db.DeleteStream("stream"); + + + return 0; +} diff --git a/tests/automatic/mongo_db/auto_id/cleanup_linux.sh b/tests/automatic/mongo_db/auto_id/cleanup_linux.sh new file mode 100644 index 000000000..71093a257 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/cleanup_linux.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +database_name=data_%2F%20%5C%2E%22%24 + +echo "db.dropDatabase()" | mongo ${database_name} diff --git a/tests/automatic/mongo_db/auto_id/cleanup_windows.bat b/tests/automatic/mongo_db/auto_id/cleanup_windows.bat new file mode 100644 index 000000000..8a64516d4 --- /dev/null +++ b/tests/automatic/mongo_db/auto_id/cleanup_windows.bat @@ -0,0 +1,4 @@ +SET database_name=data_%%2F%%20%%5C%%2E%%22%%24 +SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" + +echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt b/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt index d1eef84c9..261380562 100644 --- a/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt +++ b/tests/automatic/mongo_db/insert_retrieve/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME insert_retrieve_mongodb) +set(TARGET_NAME mongo-insert_retrieve_mongodb) set(SOURCE_FILES insert_retrieve_mongodb.cpp) diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt b/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt index d4c09f197..b5a491984 100644 --- a/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt +++ b/tests/automatic/mongo_db/insert_retrieve_dataset/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME insert_retrieve_dataset_mongodb) +set(TARGET_NAME mongo-insert_retrieve_dataset_mongodb) set(SOURCE_FILES insert_retrieve_dataset_mongodb.cpp) diff --git a/tests/automatic/mongo_db/meta/CMakeLists.txt b/tests/automatic/mongo_db/meta/CMakeLists.txt index 609cd1581..93f413907 100644 --- a/tests/automatic/mongo_db/meta/CMakeLists.txt +++ b/tests/automatic/mongo_db/meta/CMakeLists.txt @@ -1,4 +1,4 @@ -set(TARGET_NAME meta_mongodb) +set(TARGET_NAME mongo-meta) set(SOURCE_FILES meta_mongodb.cpp) -- GitLab