From e1d0a7ab4aa49c1f6863d8668fda1f148b91e558 Mon Sep 17 00:00:00 2001 From: karnem <mikhail.karnevskiy@desy.de> Date: Thu, 7 Sep 2023 12:31:34 +0200 Subject: [PATCH] Add function to create new index: message_id. Fix stream info with respect tot his change. --- common/cpp/src/database/mongodb_client.cpp | 79 ++++++++++++++++++++-- common/cpp/src/database/mongodb_client.h | 3 + 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index c15dda360..1f5b037f1 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -87,10 +87,65 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti current_collection_name_ = collection_name; mongoc_collection_set_write_concern(current_collection_, write_concern_); + // Create index `message_id` for collection of asapo messages + // ToDo cache streams with already created indices + if (collection_name.rfind(kDBDataCollectionNamePrefix, 0) != 0){ + return nullptr; + } + CreateIndex(encoded_name); + return nullptr; } +Error MongoDBClient::CreateIndex(const std::string& collection_name) const { + + bson_t keys; + char *index_name; + bson_t *create_indexes; + bson_t reply; + char *reply_str; + bson_error_t error; + bool r; + mongoc_database_t *db; + + db = mongoc_client_get_database (client_, database_name_.c_str()); + + bson_init (&keys); + BSON_APPEND_INT32 (&keys, "message_id", 1); + index_name = mongoc_collection_keys_to_index_string (&keys); + create_indexes = BCON_NEW ("createIndexes", + BCON_UTF8 (collection_name.c_str()), + "indexes", + "[", + "{", + "key", + BCON_DOCUMENT (&keys), + "name", + BCON_UTF8 (index_name), + "unique", BCON_BOOL (true), + "}", + "]"); + + r = mongoc_database_write_command_with_opts ( + db, create_indexes, NULL , &reply, &error); + + reply_str = bson_as_json (&reply, NULL); + printf ("%s\n", reply_str); + + if (!r) { + fprintf (stderr, "Error in createIndexes: %s\n", error.message); + } + bson_free (index_name); + bson_free (reply_str); + bson_destroy (&reply); + bson_destroy (create_indexes); + // return DBErrorTemplates::kInsertError.Generate("cannot create index for collection: "+collection_name); + return nullptr; +} + + + Error MongoDBClient::TryConnectDatabase() { auto err = Ping(); if (err == nullptr) { @@ -347,6 +402,10 @@ Error MongoDBClient::InsertWithAutoId(const MessageMeta& file, auto meta_new = file; meta_new.id = id; + // Inset with auto ID + if (file.message_id == 0){ + meta_new.message_id = id; + } return Insert(current_collection_name_, meta_new, false, id_inserted); } @@ -469,7 +528,9 @@ Error MongoDBClient::InsertAsDatasetMessage(const std::string& collection, const return err; } auto query = - BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.id)), "}", "{", "messages.dataset_substream", + BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}", + "{", "message_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}", + "{", "messages.dataset_substream", "{", "$ne", BCON_INT64(static_cast<int64_t>(file.dataset_substream)), "}", "}", "]"); auto update = BCON_NEW ("$setOnInsert", "{", @@ -516,10 +577,18 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, filter = BCON_NEW ("_id", BCON_INT64(static_cast<int64_t>(id))); opts = BCON_NEW ("limit", BCON_INT64(1)); break; + case GetRecordMode::kByMessageId: + filter = BCON_NEW ("message_id", BCON_INT64(static_cast<int64_t>(id))); + opts = BCON_NEW ("limit", BCON_INT64(1)); + break; case GetRecordMode::kLast: filter = BCON_NEW (NULL); opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "_id", BCON_INT64(-1), "}"); break; + case GetRecordMode::kLastMessageId: + filter = BCON_NEW (NULL); + opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "message_id", BCON_INT64(-1), "}"); + break; case GetRecordMode::kEarliest: filter = BCON_NEW (NULL); opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "timestamp", BCON_INT64(1), "}"); @@ -553,7 +622,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, Error MongoDBClient::GetById(const std::string& collection, uint64_t id, MessageMeta* file) const { std::string record_str; - auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str); + auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kByMessageId, &record_str); if (err) { return err; } @@ -643,9 +712,9 @@ Error UpdateStreamInfoFromLastRecord(const std::string& last_record_str, return DBErrorTemplates::kJsonParseError.Generate( "UpdateStreamInfoFromLastRecord: cannot parse timestamp in response: " + last_record_str); } - if (parser.GetUInt64("_id", &id) != nullptr) { + if (parser.GetUInt64("message_id", &id) != nullptr) { return DBErrorTemplates::kJsonParseError.Generate( - "UpdateStreamInfoFromLastRecord: cannot parse _id in response: " + last_record_str); + "UpdateStreamInfoFromLastRecord: cannot parse message_id in response: " + last_record_str); } info->timestamp_lastentry = timestamp_last; @@ -671,7 +740,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str, Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* info) const { std::string last_record_str, earliest_record_str; - auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLast, &last_record_str); + auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLastMessageId, &last_record_str); if (err) { if (err == DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index d2a0621f9..e45d79521 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -35,9 +35,11 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>; enum class GetRecordMode { kById, + kByMessageId, kLast, kEarliest, kByStringId, + kLastMessageId, }; const size_t maxDbNameLength = 63; @@ -62,6 +64,7 @@ class MongoDBClient final : public Database { Error GetPersistedStreamsNumber(int* res) const override; Error PersistStream(const std::string& stream_name) const override; Error GetNextId(const std::string& collection, uint64_t* id) const; + Error CreateIndex(const std::string& collection_name) const; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; -- GitLab