Skip to content
Snippets Groups Projects
Commit e1d0a7ab authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add function to create new index: message_id. Fix stream info with respect tot his change.

parent c40fe85d
No related branches found
No related tags found
No related merge requests found
......@@ -87,10 +87,65 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti
current_collection_name_ = collection_name;
mongoc_collection_set_write_concern(current_collection_, write_concern_);
// Create index `message_id` for collection of asapo messages
// ToDo cache streams with already created indices
if (collection_name.rfind(kDBDataCollectionNamePrefix, 0) != 0){
return nullptr;
}
CreateIndex(encoded_name);
return nullptr;
}
Error MongoDBClient::CreateIndex(const std::string& collection_name) const {
bson_t keys;
char *index_name;
bson_t *create_indexes;
bson_t reply;
char *reply_str;
bson_error_t error;
bool r;
mongoc_database_t *db;
db = mongoc_client_get_database (client_, database_name_.c_str());
bson_init (&keys);
BSON_APPEND_INT32 (&keys, "message_id", 1);
index_name = mongoc_collection_keys_to_index_string (&keys);
create_indexes = BCON_NEW ("createIndexes",
BCON_UTF8 (collection_name.c_str()),
"indexes",
"[",
"{",
"key",
BCON_DOCUMENT (&keys),
"name",
BCON_UTF8 (index_name),
"unique", BCON_BOOL (true),
"}",
"]");
r = mongoc_database_write_command_with_opts (
db, create_indexes, NULL , &reply, &error);
reply_str = bson_as_json (&reply, NULL);
printf ("%s\n", reply_str);
if (!r) {
fprintf (stderr, "Error in createIndexes: %s\n", error.message);
}
bson_free (index_name);
bson_free (reply_str);
bson_destroy (&reply);
bson_destroy (create_indexes);
// return DBErrorTemplates::kInsertError.Generate("cannot create index for collection: "+collection_name);
return nullptr;
}
Error MongoDBClient::TryConnectDatabase() {
auto err = Ping();
if (err == nullptr) {
......@@ -347,6 +402,10 @@ Error MongoDBClient::InsertWithAutoId(const MessageMeta& file,
auto meta_new = file;
meta_new.id = id;
// Inset with auto ID
if (file.message_id == 0){
meta_new.message_id = id;
}
return Insert(current_collection_name_, meta_new, false, id_inserted);
}
......@@ -469,7 +528,9 @@ Error MongoDBClient::InsertAsDatasetMessage(const std::string& collection, const
return err;
}
auto query =
BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.id)), "}", "{", "messages.dataset_substream",
BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}",
"{", "message_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}",
"{", "messages.dataset_substream",
"{", "$ne",
BCON_INT64(static_cast<int64_t>(file.dataset_substream)), "}", "}", "]");
auto update = BCON_NEW ("$setOnInsert", "{",
......@@ -516,10 +577,18 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,
filter = BCON_NEW ("_id", BCON_INT64(static_cast<int64_t>(id)));
opts = BCON_NEW ("limit", BCON_INT64(1));
break;
case GetRecordMode::kByMessageId:
filter = BCON_NEW ("message_id", BCON_INT64(static_cast<int64_t>(id)));
opts = BCON_NEW ("limit", BCON_INT64(1));
break;
case GetRecordMode::kLast:
filter = BCON_NEW (NULL);
opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "_id", BCON_INT64(-1), "}");
break;
case GetRecordMode::kLastMessageId:
filter = BCON_NEW (NULL);
opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "message_id", BCON_INT64(-1), "}");
break;
case GetRecordMode::kEarliest:
filter = BCON_NEW (NULL);
opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "timestamp", BCON_INT64(1), "}");
......@@ -553,7 +622,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,
Error MongoDBClient::GetById(const std::string& collection, uint64_t id, MessageMeta* file) const {
std::string record_str;
auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str);
auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kByMessageId, &record_str);
if (err) {
return err;
}
......@@ -643,9 +712,9 @@ Error UpdateStreamInfoFromLastRecord(const std::string& last_record_str,
return DBErrorTemplates::kJsonParseError.Generate(
"UpdateStreamInfoFromLastRecord: cannot parse timestamp in response: " + last_record_str);
}
if (parser.GetUInt64("_id", &id) != nullptr) {
if (parser.GetUInt64("message_id", &id) != nullptr) {
return DBErrorTemplates::kJsonParseError.Generate(
"UpdateStreamInfoFromLastRecord: cannot parse _id in response: " + last_record_str);
"UpdateStreamInfoFromLastRecord: cannot parse message_id in response: " + last_record_str);
}
info->timestamp_lastentry = timestamp_last;
......@@ -671,7 +740,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str,
Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* info) const {
std::string last_record_str, earliest_record_str;
auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLast, &last_record_str);
auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLastMessageId, &last_record_str);
if (err) {
if (err
== DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet
......
......@@ -35,9 +35,11 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>;
enum class GetRecordMode {
kById,
kByMessageId,
kLast,
kEarliest,
kByStringId,
kLastMessageId,
};
const size_t maxDbNameLength = 63;
......@@ -62,6 +64,7 @@ class MongoDBClient final : public Database {
Error GetPersistedStreamsNumber(int* res) const override;
Error PersistStream(const std::string& stream_name) const override;
Error GetNextId(const std::string& collection, uint64_t* id) const;
Error CreateIndex(const std::string& collection_name) const;
~MongoDBClient() override;
private:
mongoc_client_t* client_{nullptr};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment