From f2f5309148bb7542ffd03d06a02051d525347706 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 15 Jun 2021 13:32:13 +0200
Subject: [PATCH] refactor mets scheme

---
 broker/src/asapo_broker/database/mongodb.go   |  8 +++-
 .../src/asapo_broker/database/mongodb_test.go |  7 +--
 common/cpp/include/asapo/database/database.h  |  2 +
 .../include/asapo/unittests/MockDatabase.h    |  5 +++
 common/cpp/src/database/mongodb_client.cpp    | 43 ++++++++++++++-----
 common/cpp/src/database/mongodb_client.h      |  7 ++-
 consumer/api/python/asapo_consumer.pyx.in     |  2 -
 .../consumer/getnext_python/check_linux.sh    |  2 +-
 .../consumer/getnext_python/check_windows.bat |  2 +-
 .../automatic/broker/get_meta/check_linux.sh  |  4 +-
 .../broker/get_meta/check_windows.bat         |  8 ++--
 .../consumer_api_python/check_linux.sh        |  4 +-
 .../consumer_api_python/check_windows.bat     |  4 +-
 tests/automatic/mongo_db/CMakeLists.txt       |  2 +-
 .../insert_retrieve_mongodb.cpp               | 15 -------
 .../mongo_db/{upsert => meta}/CMakeLists.txt  | 10 ++---
 .../{upsert => meta}/cleanup_linux.sh         |  0
 .../{upsert => meta}/cleanup_windows.bat      |  0
 .../meta_mongodb.cpp}                         | 28 +++++++++++-
 19 files changed, 99 insertions(+), 54 deletions(-)
 rename tests/automatic/mongo_db/{upsert => meta}/CMakeLists.txt (61%)
 rename tests/automatic/mongo_db/{upsert => meta}/cleanup_linux.sh (100%)
 rename tests/automatic/mongo_db/{upsert => meta}/cleanup_windows.bat (100%)
 rename tests/automatic/mongo_db/{upsert/upsert_mongodb.cpp => meta/meta_mongodb.cpp} (55%)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index af776a39a..d248f016f 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -703,9 +703,15 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) {
 		logger.Debug(log_str)
 		return nil, &DBError{utils.StatusNoData, err.Error()}
 	}
+	userMeta,ok:=res["meta"]
+	if !ok {
+		log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response"
+		logger.Error(log_str)
+		return nil, errors.New(log_str)
+	}
 	log_str := "got metadata for " + id + " in " + request.DbName
 	logger.Debug(log_str)
-	return utils.MapToJson(&res)
+	return utils.MapToJson(&userMeta)
 }
 
 func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, error) {
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 21cfb0943..e4c8f458d 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -556,7 +556,7 @@ func TestMongoDBResetCounter(t *testing.T) {
 func TestMongoDBGetMetaBtOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	rec_expect, _ := json.Marshal(recbt)
+	rec_expect, _ := json.Marshal(recbt.Meta)
 	db.insertMeta(dbname, &recbt)
 
 	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "whatever", Op: "meta", ExtraParam: "0"})
@@ -568,7 +568,7 @@ func TestMongoDBGetMetaBtOK(t *testing.T) {
 func TestMongoDBGetMetaStOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	rec_expect, _ := json.Marshal(recst)
+	rec_expect, _ := json.Marshal(recst.Meta)
 	db.insertMeta(dbname, &recst)
 
 	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "1"})
@@ -581,8 +581,9 @@ func TestMongoDBGetMetaErr(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: metaID})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "1"})
 	assert.NotNil(t, err)
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 }
 
 type MetaData struct {
diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h
index 678983382..e21af4875 100644
--- a/common/cpp/include/asapo/database/database.h
+++ b/common/cpp/include/asapo/database/database.h
@@ -26,6 +26,8 @@ class Database {
     virtual Error GetStreamInfo(const std::string& collection, StreamInfo* info) const  = 0;
     virtual Error GetLastStream(StreamInfo* info) const  = 0;
     virtual Error DeleteStream(const std::string& stream) const = 0;
+    virtual Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const = 0;
+
     virtual ~Database() = default;
 };
 
diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h
index 65631cdd8..f8c246436 100644
--- a/common/cpp/include/asapo/unittests/MockDatabase.h
+++ b/common/cpp/include/asapo/unittests/MockDatabase.h
@@ -48,6 +48,11 @@ class MockDatabase : public Database {
         return Error{GetSetById_t(collection, set_id, id, file)};
     }
 
+    Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override {
+        return Error{GetMetaFromDb_t(collection, id, res)};
+    }
+    MOCK_CONST_METHOD3(GetMetaFromDb_t, ErrorInterface * (const std::string&, const std::string&, std::string* res));
+
     MOCK_CONST_METHOD4(GetSetById_t, ErrorInterface * (const std::string&, uint64_t set_id, uint64_t id, MessageMeta*));
 
     Error GetStreamInfo(const std::string& collection, StreamInfo* info) const override {
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index 747c4ed23..4054377b7 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -67,7 +67,7 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti
 
     auto encoded_name  = EncodeColName(collection_name);
     if (encoded_name.size() > maxCollectionNameLength) {
-        return DBErrorTemplates::kWrongInput.Generate("stream name too long");
+        return DBErrorTemplates::kWrongInput.Generate("collection name too long");
     }
 
     current_collection_ = mongoc_client_get_collection(client_, database_name_.c_str(),
@@ -154,13 +154,14 @@ bson_p PrepareBsonDocument(const uint8_t* json, ssize_t len, Error* err) {
         return nullptr;
     }
 
-    auto bson = bson_new_from_json(json, len, &mongo_err);
-    if (!bson) {
+    auto bson_meta = bson_new_from_json(json, len, &mongo_err);
+    if (!bson_meta) {
         *err = DBErrorTemplates::kJsonParseError.Generate(mongo_err.message);
         return nullptr;
     }
-
-    if (!BSON_APPEND_UTF8(bson, "schema_version", GetDbSchemaVersion().c_str())) {
+    auto bson =  bson_new();
+    if (!BSON_APPEND_DOCUMENT(bson, "meta", bson_meta)
+            || !BSON_APPEND_UTF8(bson, "schema_version", GetDbSchemaVersion().c_str())) {
         *err = DBErrorTemplates::kInsertError.Generate("cannot add schema version ");
         return nullptr;
     }
@@ -322,7 +323,8 @@ Error MongoDBClient::InsertAsDatasetMessage(const std::string& collection, const
     return err;
 }
 
-Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, GetRecordMode mode,
+Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, const std::string& string_id,
+                                     GetRecordMode mode,
                                      std::string* res) const {
     if (!connected_) {
         return DBErrorTemplates::kNotConnected.Generate();
@@ -341,6 +343,10 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,
     char* str;
 
     switch (mode) {
+    case GetRecordMode::kByStringId:
+        filter = BCON_NEW ("_id", BCON_UTF8(string_id.c_str()));
+        opts = BCON_NEW ("limit", BCON_INT64(1));
+        break;
     case GetRecordMode::kById:
         filter = BCON_NEW ("_id", BCON_INT64(id));
         opts = BCON_NEW ("limit", BCON_INT64(1));
@@ -382,7 +388,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,
 
 Error MongoDBClient::GetById(const std::string& collection, uint64_t id, MessageMeta* file) const {
     std::string record_str;
-    auto err = GetRecordFromDb(collection, id, GetRecordMode::kById, &record_str);
+    auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str);
     if (err) {
         return err;
     }
@@ -398,7 +404,7 @@ Error MongoDBClient::GetDataSetById(const std::string& collection,
                                     uint64_t id,
                                     MessageMeta* file) const {
     std::string record_str;
-    auto err = GetRecordFromDb(collection, id, GetRecordMode::kById, &record_str);
+    auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str);
     if (err) {
         return err;
     }
@@ -500,7 +506,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str,
 
 Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* info) const {
     std::string last_record_str, earliest_record_str;
-    auto err = GetRecordFromDb(collection, 0, GetRecordMode::kLast, &last_record_str);
+    auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLast, &last_record_str);
     if (err) {
         if (err
                 == DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet
@@ -509,7 +515,7 @@ Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* in
         }
         return err;
     }
-    err = GetRecordFromDb(collection, 0, GetRecordMode::kEarliest, &earliest_record_str);
+    err = GetRecordFromDb(collection, 0, "", GetRecordMode::kEarliest, &earliest_record_str);
     if (err) {
         return err;
     }
@@ -646,9 +652,24 @@ Error MongoDBClient::DeleteStream(const std::string& stream) const {
         DeleteCollections(acks_col);
         std::string querystr = ".*_" + EscapeQuery(stream_encoded) + "$";
         DeleteDocumentsInCollection("current_location", querystr);
-        DeleteDocumentsInCollection("meta", "^" + EscapeQuery(stream_encoded) + "$");
     }
+    DeleteDocumentsInCollection("meta", "^" + EscapeQuery(stream_encoded) + "$");
     return err;
 }
 
+Error MongoDBClient::GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const {
+    std::string meta_str;
+    auto err = GetRecordFromDb(collection, 0, id, GetRecordMode::kByStringId, &meta_str);
+    if (err) {
+        return err;
+    }
+    auto parser = JsonStringParser(meta_str);
+    err = parser.Embedded("meta").GetRawString(res);
+    if (err) {
+        return DBErrorTemplates::kJsonParseError.Generate(
+                   "GetMetaFromDb: cannot parse database response: " + err->Explain());
+    }
+    return nullptr;
+}
+
 }
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index 517e75a64..8e6a2dda2 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -36,7 +36,8 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>;
 enum class GetRecordMode {
     kById,
     kLast,
-    kEarliest
+    kEarliest,
+    kByStringId,
 };
 
 const size_t maxDbNameLength = 63;
@@ -56,6 +57,7 @@ class MongoDBClient final : public Database {
     Error GetStreamInfo(const std::string& collection, StreamInfo* info) const override;
     Error GetLastStream(StreamInfo* info) const override;
     Error DeleteStream(const std::string& stream) const override;
+    Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override;
     ~MongoDBClient() override;
   private:
     mongoc_client_t* client_{nullptr};
@@ -73,7 +75,8 @@ class MongoDBClient final : public Database {
     Error InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const;
     Error ReplaceBsonDocument(const std::string& id, const bson_p& document, bool upsert) const;
     Error AddBsonDocumentToArray(bson_t* query, bson_t* update, bool ignore_duplicates) const;
-    Error GetRecordFromDb(const std::string& collection, uint64_t id, GetRecordMode mode, std::string* res) const;
+    Error GetRecordFromDb(const std::string& collection, uint64_t id, const std::string& string_id, GetRecordMode mode,
+                          std::string* res) const;
     Error UpdateLastStreamInfo(const char* str, StreamInfo* info) const;
     Error DeleteCollection(const std::string& name) const;
     Error DeleteCollections(const std::string& prefix) const;
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index 61f06611e..1efc446dc 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -375,7 +375,6 @@ cdef class PyConsumer:
         if err:
             throw_exception(err)
         meta = json.loads(_str(meta_str))
-        del meta['_id']
         return meta
     def get_stream_meta(self, stream = 'default'):
         cdef Error err
@@ -386,7 +385,6 @@ cdef class PyConsumer:
         if err:
             throw_exception(err)
         meta = json.loads(_str(meta_str))
-        del meta['_id']
         return meta
 
     def interrupt_current_operation(self):
diff --git a/examples/consumer/getnext_python/check_linux.sh b/examples/consumer/getnext_python/check_linux.sh
index 543137fc3..f461f7d51 100644
--- a/examples/consumer/getnext_python/check_linux.sh
+++ b/examples/consumer/getnext_python/check_linux.sh
@@ -20,7 +20,7 @@ do
 	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
-echo 'db.meta.insert({"_id":"bt","meta_test":"test"})' | mongo ${database_name}
+echo 'db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}})' | mongo ${database_name}
 
 sleep 1
 
diff --git a/examples/consumer/getnext_python/check_windows.bat b/examples/consumer/getnext_python/check_windows.bat
index f69c97c8e..9c0f94087 100644
--- a/examples/consumer/getnext_python/check_windows.bat
+++ b/examples/consumer/getnext_python/check_windows.bat
@@ -10,7 +10,7 @@ set group_id=bif31l2uiddd4r0q6b40
 for /l %%x in (1, 1, 3) do echo db.data_default.insert({"_id":%%x,"size":100,"name":"%%x","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 
-echo db.meta.insert({"_id":"bt","meta_test":"test"}) | %mongo_exe% %database_name%  || goto :error
+echo db.meta.insert({"_id":"bt","meta":{"meta_test":"test"}}) | %mongo_exe% %database_name%  || goto :error
 
 set PYTHONPATH=%1
 
diff --git a/tests/automatic/broker/get_meta/check_linux.sh b/tests/automatic/broker/get_meta/check_linux.sh
index 67186b746..c1d9e4d45 100644
--- a/tests/automatic/broker/get_meta/check_linux.sh
+++ b/tests/automatic/broker/get_meta/check_linux.sh
@@ -11,8 +11,8 @@ Cleanup() {
 	echo "db.dropDatabase()" | mongo ${database_name}
 }
 
-echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | mongo ${database_name}
-echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | mongo ${database_name}
+echo 'db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}})' | mongo ${database_name}
+echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${database_name}
 
 token=$BT_TEST_TOKEN
 
diff --git a/tests/automatic/broker/get_meta/check_windows.bat b/tests/automatic/broker/get_meta/check_windows.bat
index 42287f984..0631edfae 100644
--- a/tests/automatic/broker/get_meta/check_windows.bat
+++ b/tests/automatic/broker/get_meta/check_windows.bat
@@ -1,8 +1,8 @@
 SET database_name=data_detector
 SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
 
-echo db.meta.insert({"_id":"bt"}) | %mongo_exe% %database_name%  || goto :error
-echo db.meta.insert({"_id":"st_test"}) | %mongo_exe% %database_name%  || goto :error
+echo db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}}) | %mongo_exe% %database_name%  || goto :error
+echo db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}}) | %mongo_exe% %database_name%  || goto :error
 
 curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3 > broker
 set /P broker=< broker
@@ -10,8 +10,8 @@ set /P broker=< broker
 set token=%BT_DATA_TOKEN%
 
 
-C:\Curl\curl.exe -v  --silent %broker%/v0.2/beamtime/data/detector/default/0/meta/0?token=%token% --stderr - | findstr /c:\"_id\":\"bt\"  || goto :error
-C:\Curl\curl.exe -v  --silent %broker%/v0.2/beamtime/data/detector/test/0/meta/1?token=%token% --stderr - | findstr /c:\"_id\":\"st_test\"  || goto :error
+C:\Curl\curl.exe -v  --silent %broker%/v0.2/beamtime/data/detector/default/0/meta/0?token=%token% --stderr - | findstr /c:\"data\":\"test_bt\"  || goto :error
+C:\Curl\curl.exe -v  --silent %broker%/v0.2/beamtime/data/detector/test/0/meta/1?token=%token% --stderr - | findstr /c:\"data\":\"test_st\"  || goto :error
 C:\Curl\curl.exe -v  --silent %broker%/v0.2/beamtime/data/detector/default/0/meta/1?token=%token% --stderr - | findstr /c:"no documents"  || goto :error
 
 
diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh
index 072a55de6..04ea2c9d1 100644
--- a/tests/automatic/consumer/consumer_api_python/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh
@@ -28,8 +28,8 @@ done
 
 echo 'db.data_streamfts.insert({"_id":'1',"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 
-echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | mongo ${database_name}
-echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | mongo ${database_name}
+echo 'db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}})' | mongo ${database_name}
+echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${database_name}
 
 
 for i in `seq 1 5`;
diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat
index b1f8423eb..5af06fdd7 100644
--- a/tests/automatic/consumer/consumer_api_python/check_windows.bat
+++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat
@@ -14,8 +14,8 @@ for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name
 
 echo db.data_streamfts.insert({"_id":1,"size":0,"name":"1","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
-echo db.meta.insert({"_id":"bt","data":"test_bt"}) | %mongo_exe% %database_name%
-echo db.meta.insert({"_id":"st_test","data":"test_st"}) | %mongo_exe% %database_name%
+echo db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}}) | %mongo_exe% %database_name%  || goto :error
+echo db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}}) | %mongo_exe% %database_name%  || goto :error
 
 
 for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
diff --git a/tests/automatic/mongo_db/CMakeLists.txt b/tests/automatic/mongo_db/CMakeLists.txt
index e02447414..9357532e7 100644
--- a/tests/automatic/mongo_db/CMakeLists.txt
+++ b/tests/automatic/mongo_db/CMakeLists.txt
@@ -3,5 +3,5 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.7) # needed for fixtures
 add_subdirectory(connect)
 add_subdirectory(insert_retrieve)
 add_subdirectory(insert_retrieve_dataset)
-add_subdirectory(upsert)
+add_subdirectory(meta)
 
diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
index 9fdcc4cac..d21e7051f 100644
--- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
@@ -98,21 +98,6 @@ int main(int argc, char* argv[]) {
         err = db_new.GetById(std::string("data_") + stream_name, 0, &fi_db);
         Assert(err, "No record");
 
-
-// metadata
-        asapo::MetaIngestMode mode{asapo::MetaIngestOp::kInsert, false};
-        std::string meta = R"({"data":"test"})";
-        err = db_new.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
-        M_AssertEq(nullptr, err);
-        err = db_new.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
-        M_AssertTrue(err == asapo::DBErrorTemplates::kDuplicateID);
-        mode.op = asapo::MetaIngestOp::kReplace;
-        err = db_new.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
-        M_AssertEq(nullptr, err);
-        err = db_new.InsertMeta("meta", "notexist", reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
-        M_AssertTrue(err == asapo::DBErrorTemplates::kInsertError);
-
-
         asapo::StreamInfo info;
 
         err = db.GetStreamInfo(std::string("data_") + stream_name, &info);
diff --git a/tests/automatic/mongo_db/upsert/CMakeLists.txt b/tests/automatic/mongo_db/meta/CMakeLists.txt
similarity index 61%
rename from tests/automatic/mongo_db/upsert/CMakeLists.txt
rename to tests/automatic/mongo_db/meta/CMakeLists.txt
index 67061b280..609cd1581 100644
--- a/tests/automatic/mongo_db/upsert/CMakeLists.txt
+++ b/tests/automatic/mongo_db/meta/CMakeLists.txt
@@ -1,5 +1,5 @@
-set(TARGET_NAME upsert_mongodb)
-set(SOURCE_FILES upsert_mongodb.cpp)
+set(TARGET_NAME meta_mongodb)
+set(SOURCE_FILES meta_mongodb.cpp)
 
 
 ################################
@@ -12,9 +12,9 @@ target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}
 ################################
 # Testing
 ################################
-add_integration_test(${TARGET_NAME} upsertOK "OK" "OK")
-add_integration_test(${TARGET_NAME} upsertFailsWhenNotConnected
+add_integration_test(${TARGET_NAME} metaOK "OK" "OK")
+add_integration_test(${TARGET_NAME} metaFailsWhenNotConnected
         "Notconnected"
         "Notconnected")
-add_integration_test(${TARGET_NAME} upsertFailsOnWrongMeta "parseerror" "parseerror")
+add_integration_test(${TARGET_NAME} metaFailsOnWrongMeta "parseerror" "parseerror")
 add_test_cleanup(${TARGET_NAME})
diff --git a/tests/automatic/mongo_db/upsert/cleanup_linux.sh b/tests/automatic/mongo_db/meta/cleanup_linux.sh
similarity index 100%
rename from tests/automatic/mongo_db/upsert/cleanup_linux.sh
rename to tests/automatic/mongo_db/meta/cleanup_linux.sh
diff --git a/tests/automatic/mongo_db/upsert/cleanup_windows.bat b/tests/automatic/mongo_db/meta/cleanup_windows.bat
similarity index 100%
rename from tests/automatic/mongo_db/upsert/cleanup_windows.bat
rename to tests/automatic/mongo_db/meta/cleanup_windows.bat
diff --git a/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp b/tests/automatic/mongo_db/meta/meta_mongodb.cpp
similarity index 55%
rename from tests/automatic/mongo_db/upsert/upsert_mongodb.cpp
rename to tests/automatic/mongo_db/meta/meta_mongodb.cpp
index ffdbfda51..8b6555c6d 100644
--- a/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp
+++ b/tests/automatic/mongo_db/meta/meta_mongodb.cpp
@@ -3,6 +3,7 @@
 
 #include "../../../common/cpp/src/database/mongodb_client.h"
 #include "testing.h"
+#include "asapo/database/db_error.h"
 
 using asapo::Error;
 
@@ -40,6 +41,8 @@ int main(int argc, char* argv[]) {
         json = R"({"id1":{"test1":2}})";
     }
 
+    auto stream_name = R"(stream/test_/\ ."$)";
+
 
     if (args.keyword != "Notconnected") {
         db.Connect("127.0.0.1", "test");
@@ -50,16 +53,37 @@ int main(int argc, char* argv[]) {
     if (err) {
         std::cout << err->Explain() << std::endl;
     }
-
     Assert(err, args.keyword);
 
     err = db.InsertMeta("meta", "0", reinterpret_cast<const uint8_t*>(json.c_str()), json.size(), mode);
     if (err) {
         std::cout << err->Explain() << std::endl;
     }
-
     Assert(err, args.keyword);
 
+    if (args.keyword == "OK") {
+        asapo::MetaIngestMode mode{asapo::MetaIngestOp::kInsert, false};
+        std::string meta = R"({"data":"test"})";
+        err =
+            db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
+        M_AssertEq(nullptr, err);
+        err =
+            db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
+        M_AssertTrue(err == asapo::DBErrorTemplates::kDuplicateID);
+        mode.op = asapo::MetaIngestOp::kReplace;
+        err =
+            db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
+        M_AssertEq(nullptr, err);
+        err = db.InsertMeta("meta", "notexist", reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
+        M_AssertTrue(err == asapo::DBErrorTemplates::kInsertError);
+
+        std::string meta_res;
+        err = db.GetMetaFromDb("meta", "0", &meta_res);
+        M_AssertEq(meta_res, json);
+
+
+        db.DeleteStream(stream_name);
+    }
 
     return 0;
 }
-- 
GitLab