From 84606c50c77e580c2717cdcb117a8fcb4df8e443 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Mon, 28 Aug 2023 13:31:52 +0200
Subject: [PATCH] Fix some of CI tests. Add message_id parameter. Fix _id and
 message_id types to int32.

---
 broker/src/asapo_broker/database/database.go  |  2 +-
 .../asapo_broker/server/get_commands_test.go  |  6 ++---
 .../server/process_request_test.go            | 25 +++++++++++--------
 .../data_structs/test_data_structs.cpp        |  5 ++--
 .../consumer/getnext_python/check_linux.sh    |  2 +-
 examples/consumer/getnext_python/getnext.py   |  2 +-
 .../test_request_handler_db_writer.cpp        |  3 ++-
 .../automatic/broker/get_last/check_linux.sh  | 25 ++++++++++---------
 .../automatic/broker/get_next/check_linux.sh  | 13 +++++-----
 .../consumer/consumer_api/check_linux.sh      |  2 +-
 .../automatic/support/getnext/check_linux.sh  |  2 +-
 11 files changed, 46 insertions(+), 41 deletions(-)

diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go
index a620d929c..56a07d25a 100644
--- a/broker/src/asapo_broker/database/database.go
+++ b/broker/src/asapo_broker/database/database.go
@@ -18,7 +18,7 @@ type Request struct {
 
 type ExtraParamId struct {
 	Id    int    `json:"id"`
-	IdKey string `json:"idKey"`
+	IdKey string `json:"id_key"`
 }
 
 type ExtraParamNext struct {
diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go
index b0ee5d04f..9ccc2ea5b 100644
--- a/broker/src/asapo_broker/server/get_commands_test.go
+++ b/broker/src/asapo_broker/server/get_commands_test.go
@@ -44,13 +44,13 @@ var testsGetCommand = []struct {
 	externalParam string
 }{
 	{"last", expectedSource, expectedStream, "", expectedStream + "/0/last", "", "0"},
-	{"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "", "1"},
+	{"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "&id=1", "{\"id\":1,\"id_key\":\"message_id\"}"},
 	{"meta", expectedSource, "default", "", "default/0/meta/0", "", "0"},
 	{"nacks", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks", "", "0_0"},
 	{"groupedlast", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast", "", ""},
-	{"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", ""},
+	{"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"},
 	{"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" +
-		expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "10000_3"},
+		expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "{\"id_key\":\"_id\",\"resend\":true,\"delay_ms\":10000,\"resend_attempts\":3}"},
 	{"size", expectedSource, expectedStream, "", expectedStream + "/size", "", ""},
 	{"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"},
 	{"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"},
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index a9c5f53f3..06571264c 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -45,7 +45,7 @@ func (a *MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error
 		}, nil
 	}
 
-	return Token{}, &AuthorizationError{errors.New("wrong or expired JWT token"),http.StatusUnauthorized}
+	return Token{}, &AuthorizationError{errors.New("wrong or expired JWT token"), http.StatusUnauthorized}
 }
 
 func prepareTestAuth() {
@@ -68,7 +68,7 @@ type request struct {
 
 func containsMatcherMap(substrings ...string) func(map[string]interface{}) bool {
 	return func(vals map[string]interface{}) bool {
-		res,_:=utils.MapToJson(vals)
+		res, _ := utils.MapToJson(vals)
 		for _, substr := range substrings {
 			if !strings.Contains(string(res), substr) {
 				return false
@@ -89,7 +89,6 @@ func containsMatcherStr(substrings ...string) func(str string) bool {
 	}
 }
 
-
 func doRequest(path string, extra_params ...string) *httptest.ResponseRecorder {
 	m := "GET"
 	if len(extra_params) > 0 {
@@ -167,7 +166,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next",
+		ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusNoData, ""})
@@ -176,7 +176,6 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request")))
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("no data or partial data")))
 
-
 	w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 
 	suite.Equal(http.StatusConflict, w.Code, "wrong database name")
@@ -184,12 +183,13 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next",
+		ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusServiceUnavailable, ""})
 
-    logger.MockLog.On("WithFields", mock.Anything)
+	logger.MockLog.On("WithFields", mock.Anything)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request")))
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot process request")))
 	ExpectReconnect(suite.mock_db)
@@ -202,7 +202,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next",
+		ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New(""))
 
@@ -220,7 +221,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next",
+		ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("WithFields", mock.Anything)
@@ -232,7 +234,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next",
+		ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("WithFields", mock.Anything)
@@ -259,7 +262,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken()
 func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() {
 	query_str := "query_string"
 
-	expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
+	expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("delete_stream")))
diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp
index 85222ab50..af3679afa 100644
--- a/common/cpp/unittests/data_structs/test_data_structs.cpp
+++ b/common/cpp/unittests/data_structs/test_data_structs.cpp
@@ -30,6 +30,7 @@ MessageMeta PrepareMessageMeta(bool includeNewStreamField = true) {
     MessageMeta message_meta;
     message_meta.size = 100;
     message_meta.id = 1;
+    message_meta.message_id = 1;  
     message_meta.dataset_substream = 3;
     message_meta.name = std::string("folder") + asapo::kPathSeparator + "test";
     message_meta.source = "host:1234";
@@ -57,10 +58,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) {
     std::string json = message_meta.Json();
     if (asapo::kPathSeparator == '/') {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})"));
+                        R"({"_id":1,"message_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})"));
     } else {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})"));
+                        R"({"_id":1,"message_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})"));
     }
 }
 
diff --git a/examples/consumer/getnext_python/check_linux.sh b/examples/consumer/getnext_python/check_linux.sh
index 5e7040bd3..8ab598fce 100644
--- a/examples/consumer/getnext_python/check_linux.sh
+++ b/examples/consumer/getnext_python/check_linux.sh
@@ -17,7 +17,7 @@ Cleanup() {
 
 for i in `seq 1 3`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 echo 'db.meta.insert({"_id":"bt","meta":{"meta_test":"test"}})' | mongo ${database_name}
diff --git a/examples/consumer/getnext_python/getnext.py b/examples/consumer/getnext_python/getnext.py
index e521c98f5..8fa7efb9c 100644
--- a/examples/consumer/getnext_python/getnext.py
+++ b/examples/consumer/getnext_python/getnext.py
@@ -22,5 +22,5 @@ print ('meta: ', json.dumps(meta, indent=4, sort_keys=True))
 try:
     beamtime_meta = consumer.get_beamtime_meta()
     print ('beamtime meta: ', json.dumps(beamtime_meta, indent=4, sort_keys=True))
-except asapo_consumer.AsapoError as err:
+except Exception as err:
     print ('error getting beamtime meta: ', err)
diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
index d1f5aaa8c..c1a30692b 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
@@ -168,7 +168,8 @@ MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) {
     MessageMeta message_meta;
     message_meta.size = expected_file_size;
     message_meta.name = expected_file_name;
-    message_meta.id = expected_id;
+    message_meta.id = 0;
+    message_meta.message_id = expected_id;   
     message_meta.ingest_mode = expected_ingest_mode;
     if (substream) {
         message_meta.dataset_substream = expected_substream;
diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh
index 6f50ea116..aaf6f17f9 100644
--- a/tests/automatic/broker/get_last/check_linux.sh
+++ b/tests/automatic/broker/get_last/check_linux.sh
@@ -12,8 +12,8 @@ Cleanup() {
 	echo "db.dropDatabase()" | mongo ${database_name}
 }
 
-echo "db.data_${stream}.insert({"_id":2})" | mongo ${database_name}
-echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(2)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(1)})" | mongo ${database_name}
 
 token=$BT_DATA_TOKEN
 
@@ -22,21 +22,22 @@ echo found broker at $broker
 
 groupid=`curl -d '' --silent $broker/v0.2/creategroup`
 
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr -
 
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":2'
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":2'
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr -
 
-echo "db.data_${stream}.insert({"_id":3})" | mongo ${database_name}
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2'
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2'
 
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":3'
+echo "db.data_${stream}.insert({"_id":NumberInt(3),"message_id":NumberInt(3)})"| mongo ${database_name}
 
-echo "db.data_${stream}.insert({"_id":4})" | mongo ${database_name}
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":3'
 
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=$token --stderr - | grep '"_id":1'
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":4'
+echo "db.data_${stream}.insert({"_id":NumberInt(4),"message_id":NumberInt(4)})" | mongo ${database_name}
+
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1'
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4'
 
 #with a new group
 groupid=`curl -d '' --silent $broker/v0.2/creategroup`
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=$token --stderr - | grep '"_id":1'
-curl -v  --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":4'
\ No newline at end of file
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1'
+curl -v  --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4'
\ No newline at end of file
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index 840a0f12d..aec3b1500 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -12,20 +12,19 @@ Cleanup() {
 	echo "db.dropDatabase()" | mongo ${database_name}
 }
 
-echo "db.data_${stream}.insert({"_id":2})" | mongo ${database_name}
-echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(1)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(2)})" | mongo ${database_name}
 
 token=$BT_DATA_TOKEN
 
 broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6`
 echo found broker at $broker
 
-
 groupid=`curl -d '' --silent $broker/v0.3/creategroup`
-curl -v  --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":1'
-curl -v  --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":2'
-curl -v  --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"id_max":2'
+curl -v  --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr  | grep '"message_id":1'
+curl -v  --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr  | grep '"message_id":2'
+curl -v  --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr  | grep '"id_max":2'
 
 # with a new group
 groupid=`curl -d '' --silent $broker/v0.3/creategroup`
-curl -v  --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":1'
\ No newline at end of file
+curl -v  --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":1'
\ No newline at end of file
diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh
index 043ad990f..e24b52817 100644
--- a/tests/automatic/consumer/consumer_api/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api/check_linux.sh
@@ -18,7 +18,7 @@ Cleanup() {
 
 for i in `seq 1 10`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
+	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":'100-$i',"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
 done
 
 for i in `seq 1 5`;
diff --git a/tests/automatic/support/getnext/check_linux.sh b/tests/automatic/support/getnext/check_linux.sh
index 2cb761da0..c880d75dd 100644
--- a/tests/automatic/support/getnext/check_linux.sh
+++ b/tests/automatic/support/getnext/check_linux.sh
@@ -17,7 +17,7 @@ Cleanup() {
 
 for i in `seq 1 3`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 sleep 1
-- 
GitLab