From dead52d802ee623d88937bf9befcad2c64db206d Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 17 May 2021 14:56:31 +0200
Subject: [PATCH] fix stream info for datasets bug

---
 common/cpp/src/database/mongodb_client.cpp    | 65 ++++++++++++-------
 .../insert_retrieve_dataset_mongodb.cpp       | 37 +++++++++--
 .../producer/python_api/producer_api.py       |  7 +-
 3 files changed, 77 insertions(+), 32 deletions(-)

diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index ba8969ed7..6def7c2e3 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -361,7 +361,7 @@ Error MongoDBClient::GetDataSetById(const std::string &collection,
 
 }
 
-Error   UpdateStreamInfoFromEarliestRecord(const std::string &earliest_record_str,
+Error UpdateStreamInfoFromEarliestRecord(const std::string &earliest_record_str,
                                          StreamInfo* info) {
     std::chrono::system_clock::time_point timestamp_created;
     auto parser = JsonStringParser(earliest_record_str);
@@ -382,7 +382,7 @@ Error UpdateFinishedStreamInfo(const std::string &metadata,
     auto err = parser.GetString("next_stream", &next_stream);
     if (err) {
         return DBErrorTemplates::kJsonParseError.Generate(
-            "UpdateFinishedStreamInfo: cannot parse finished strean meta response: " + metadata);
+            "UpdateFinishedStreamInfo: cannot parse finished stream meta response: " + metadata);
     }
     if (next_stream != kNoNextStreamKeyword) {
         info->next_stream = next_stream;
@@ -390,24 +390,40 @@ Error UpdateFinishedStreamInfo(const std::string &metadata,
     return nullptr;
 }
 
+Error UpdateFinishedInfo(const std::string &last_record_str, const JsonStringParser &parser, StreamInfo* info) {
+    std::string name;
+    parser.GetString("name", &name);
+    if (name != kFinishStreamKeyword) {
+        return nullptr;
+    }
+    std::string metadata;
+    if (parser.Embedded("meta").GetRawString(&metadata) != nullptr) {
+        return DBErrorTemplates::kJsonParseError.Generate(
+            "UpdateStreamInfoFromLastRecord: cannot parse metadata in response: " + last_record_str);
+    }
+    return UpdateFinishedStreamInfo(metadata, info);
+}
+
 Error UpdateStreamInfoFromLastRecord(const std::string &last_record_str,
                                      StreamInfo* info) {
-    MessageMeta last_message;
-    auto ok = last_message.SetFromJson(last_record_str);
-    if (!ok) {
+    auto parser = JsonStringParser(last_record_str);
+    std::chrono::system_clock::time_point timestamp_last;
+    uint64_t id;
+
+    if (!TimeFromJson(parser, "timestamp", &timestamp_last)) {
         return DBErrorTemplates::kJsonParseError.Generate(
-            "UpdateStreamInfoFromLastRecord: cannot parse mongodb response: " + last_record_str);
+            "UpdateStreamInfoFromLastRecord: cannot parse timestamp in response: " + last_record_str);
     }
-    info->last_id = last_message.id;
-    info->timestamp_lastentry = last_message.timestamp;
-
-    if (last_message.name == kFinishStreamKeyword) {
-        auto err = UpdateFinishedStreamInfo(last_message.metadata, info);
-        if (err) {
-            return err;
-        }
+    if (parser.GetUInt64("_id", &id) != nullptr) {
+        return DBErrorTemplates::kJsonParseError.Generate(
+            "UpdateStreamInfoFromLastRecord: cannot parse _id in response: " + last_record_str);
     }
-    return nullptr;
+
+    info->timestamp_lastentry = timestamp_last;
+    info->last_id = id;
+
+    return UpdateFinishedInfo(last_record_str,parser,info);
+
 }
 
 Error StreamInfoFromDbResponse(const std::string &last_record_str,
@@ -428,7 +444,8 @@ Error MongoDBClient::GetStreamInfo(const std::string &collection, StreamInfo* in
     std::string last_record_str, earliest_record_str;
     auto err = GetRecordFromDb(collection, 0, GetRecordMode::kLast, &last_record_str);
     if (err) {
-        if (err == DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet
+        if (err
+            == DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet
             *info = StreamInfo{};
             return nullptr;
         }
@@ -506,14 +523,13 @@ Error MongoDBClient::GetLastStream(StreamInfo* info) const {
     return err;
 }
 
-
 Error MongoDBClient::DeleteCollections(const std::string &prefix) const {
     mongoc_database_t* database;
     char** strv;
     bson_error_t error;
     std::string querystr = "^" + prefix;
     bson_t* query = BCON_NEW ("name", BCON_REGEX(querystr.c_str(), "i"));
-    bson_t* opts = BCON_NEW ("nameOnly", BCON_BOOL(true),"filter",BCON_DOCUMENT(query));
+    bson_t* opts = BCON_NEW ("nameOnly", BCON_BOOL(true), "filter", BCON_DOCUMENT(query));
     database = mongoc_client_get_database(client_, database_name_.c_str());
     Error err;
     if ((strv = mongoc_database_get_collection_names_with_opts(
@@ -540,20 +556,21 @@ Error MongoDBClient::DeleteCollection(const std::string &name) const {
     mongoc_collection_destroy(collection);
     if (!r) {
         if (error.code == 26) {
-            return DBErrorTemplates::kNoRecord.Generate("collection "+name+" not found in "+database_name_);
+            return DBErrorTemplates::kNoRecord.Generate("collection " + name + " not found in " + database_name_);
         } else {
-            return DBErrorTemplates::kDBError.Generate(std::string(error.message)+": "+std::to_string(error.code));
+            return DBErrorTemplates::kDBError.Generate(std::string(error.message) + ": " + std::to_string(error.code));
         }
     }
     return nullptr;
 }
 
-Error MongoDBClient::DeleteDocumentsInCollection(const std::string &collection_name,const std::string &querystr) const {
+Error MongoDBClient::DeleteDocumentsInCollection(const std::string &collection_name,
+                                                 const std::string &querystr) const {
     auto collection = mongoc_client_get_collection(client_, database_name_.c_str(), collection_name.c_str());
     mongoc_collection_set_write_concern(collection, write_concern_);
     bson_error_t error;
     auto query = BCON_NEW ("_id", BCON_REGEX(querystr.c_str(), "i"));
-    if (!mongoc_collection_delete_many(collection, query, NULL,NULL, &error)) {
+    if (!mongoc_collection_delete_many(collection, query, NULL, NULL, &error)) {
         return DBErrorTemplates::kDBError.Generate(error.message);
     }
     mongoc_collection_destroy(collection);
@@ -570,8 +587,8 @@ Error MongoDBClient::DeleteStream(const std::string &stream) const {
     if (err == nullptr) {
         DeleteCollections(inprocess_col);
         DeleteCollections(acks_col);
-        std::string querystr = ".*_" + stream+"$";
-        DeleteDocumentsInCollection("current_location",querystr);
+        std::string querystr = ".*_" + stream + "$";
+        DeleteDocumentsInCollection("current_location", querystr);
     }
     return err;
 }
diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
index a19a182b1..5c194b9b5 100644
--- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
@@ -49,26 +49,51 @@ int main(int argc, char* argv[]) {
         db.Connect("127.0.0.1", "data");
     }
 
-    auto err =  db.InsertAsDatasetMessage("test", fi, dataset_size, true);
+    auto err =  db.InsertAsDatasetMessage("data_test", fi, dataset_size, true);
 
 
     if (args.keyword == "DuplicateID") {
         Assert(err, "OK");
-        err =  db.InsertAsDatasetMessage("test", fi, dataset_size, true);
-        err =  db.InsertAsDatasetMessage("test", fi, dataset_size, false);
+        err =  db.InsertAsDatasetMessage("data_test", fi, dataset_size, true);
+        err =  db.InsertAsDatasetMessage("data_test", fi, dataset_size, false);
     }
 
     Assert(err, args.keyword);
 
     if (args.keyword == "OK") { // check retrieve
         asapo::MessageMeta fi_db;
-        err = db.GetDataSetById("test", fi.dataset_substream,fi.id, &fi_db);
+        err = db.GetDataSetById("data_test", fi.dataset_substream,fi.id, &fi_db);
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
-        err = db.GetDataSetById("test", 0, 0, &fi_db);
+        err = db.GetDataSetById("data_test", 0, 0, &fi_db);
         Assert(err, "No record");
-    }
 
+        asapo::StreamInfo info;
+
+        err = db.GetStreamInfo("data_test", &info);
+        M_AssertEq(nullptr, err);
+        M_AssertEq(fi.id, info.last_id);
+
+        asapo::StreamInfo info_last;
+
+        err = db.GetLastStream(&info_last);
+        M_AssertEq(nullptr, err);
+        M_AssertEq("test", info_last.name);
+        M_AssertEq(fi.id, info_last.last_id);
+        M_AssertEq(false, info_last.finished);
+
+        auto fi2 = fi;
+        fi2.id = 123;
+        fi2.timestamp = std::chrono::system_clock::now()+std::chrono::minutes(1);
+        fi2.name = asapo::kFinishStreamKeyword;
+        fi2.metadata=R"({"next_stream":"ns"})";
+        db.Insert("data_test", fi2, false);
+        err = db.GetLastStream(&info_last);
+        M_AssertEq(nullptr, err);
+        M_AssertEq("test", info_last.name);
+        M_AssertEq(fi2.id, info_last.last_id);
+        M_AssertEq(true, info_last.finished);
+    }
 
     return 0;
 }
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 2d6de1c53..dc82b2a2c 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -66,9 +66,9 @@ producer.send_file(10, local_path="./file1", exposed_path="processed/" + data_so
 
 # send datasets
 producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file2", dataset=(1, 2),
-                   user_meta='{"test_key":"test_val"}', callback=callback)
+                   user_meta='{"test_key":"test_val"}', stream="dataset_stream", callback=callback)
 producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file3", dataset=(2, 2),
-                   user_meta='{"test_key":"test_val"}', callback=callback)
+                   user_meta='{"test_key":"test_val"}', stream="dataset_stream", callback=callback)
 
 # send meta only
 producer.send_file(3, local_path="./not_exist", exposed_path="./whatever",
@@ -187,6 +187,9 @@ info = producer.stream_info('stream')
 assert_eq(info['lastId'], 3, "last id from different stream")
 assert_eq(info['finished'], True, "stream finished")
 
+info = producer.stream_info('dataset_stream')
+assert_eq(info['lastId'], 2, "last id from stream with datasets")
+
 info = producer.stream_info('not_exist')
 assert_eq(info['lastId'], 0, "last id from non existing stream")
 
-- 
GitLab