From de545b9af110498165617b5560daa9805e786382 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Thu, 7 Sep 2023 12:35:27 +0200
Subject: [PATCH] Fix tests by changing from _id to message_id key.

---
 .../api/cpp/unittests/test_consumer_impl.cpp  | 29 ++++++++++++-------
 examples/pipeline/in_to_out/check_linux.sh    |  4 +--
 .../pipeline/in_to_out_python/check_linux.sh  |  7 +++--
 .../next_multithread_broker/check_linux.sh    |  2 +-
 tests/automatic/mongo_db/auto_id/auto_id.cpp  |  9 +++---
 .../insert_retrieve_mongodb.cpp               | 12 ++++----
 .../insert_retrieve_dataset_mongodb.cpp       | 23 +++++++++++----
 .../producer/python_api/producer_api.py       | 12 ++++----
 .../transfer_datasets/check_linux.sh          |  2 +-
 9 files changed, 61 insertions(+), 39 deletions(-)

diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp
index 3eaa4fd7e..81972bdd9 100644
--- a/consumer/api/cpp/unittests/test_consumer_impl.cpp
+++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp
@@ -268,8 +268,8 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) {
             asapo::SourceCredentials{
                 asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token
                 },
-                "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?id_key=message_id&token=" + expected_token
-                + "&instanceid=instance&pipelinestep=step");
+                "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
+                + "&instanceid=instance&pipelinestep=step&id_key=message_id");
 }
 
 TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) {
@@ -278,7 +278,7 @@ TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) {
                 asapo::SourceType::kProcessed, "instance", "", "beamtime_id", "a", "b", expected_token
                 },
                 "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
-                + "&instanceid=instance&pipelinestep=DefaultStep");
+                + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id");
 }
 
 TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) {
@@ -287,7 +287,7 @@ TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) {
                 asapo::SourceType::kProcessed, "instance", "auto", "beamtime_id", "a", "b", expected_token
                 },
                 "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
-                + "&instanceid=instance&pipelinestep=DefaultStep");
+                + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id");
 }
 
 /*
@@ -329,7 +329,8 @@ TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) {
                                         +
                                         expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token="
                                         + expected_token
-                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                                        + "&id_key=message_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -345,7 +346,8 @@ TEST_F(ConsumerImplTests, GetLastOnceUsesCorrectUri) {
                       + expected_stream_encoded +
                       "/" + expected_group_id_encoded + "/groupedlast?token="
                       + expected_token
-                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                      + "&id_key=message_id", _,
                       _)).WillOnce(DoAll(
                                        SetArgPointee<1>(HttpCode::OK),
                                        SetArgPointee<2>(nullptr),
@@ -359,7 +361,8 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) {
     EXPECT_CALL(mock_http_client,
                 Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + expected_stream_encoded +
                       "/0/last?token=" + expected_token
-                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                      + "&id_key=message_id", _,
                       _)).WillOnce(DoAll(
                                        SetArgPointee<1>(HttpCode::OK),
                                        SetArgPointee<2>(nullptr),
@@ -545,6 +548,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc
                                         "/stream/0/"
                                         + std::to_string(expected_dataset_id) + "?token=" + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                                        + "&id_key=message_id"
                                         , _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::ServiceUnavailable),
                                                     SetArgPointee<2>(nullptr),
@@ -1065,7 +1069,8 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) {
                                         "/stream/" +
                                         expected_group_id_encoded + "/next?token="
                                         + expected_token + "&dataset=true&minsize=0"
-                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                                        + "&id_key=message_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -1202,7 +1207,8 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) {
                       +
                       expected_stream_encoded + "/0/last?token="
                       + expected_token + "&dataset=true&minsize=1"
-                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                                        + "&id_key=message_id", _,
                       _)).WillOnce(DoAll(
                                        SetArgPointee<1>(HttpCode::OK),
                                        SetArgPointee<2>(nullptr),
@@ -1219,7 +1225,8 @@ TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) {
                       +
                       expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token="
                       + expected_token + "&dataset=true&minsize=1"
-                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
+                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+                      + "&id_key=message_id", _,
                       _)).WillOnce(DoAll(
                                        SetArgPointee<1>(HttpCode::OK),
                                        SetArgPointee<2>(nullptr),
@@ -1525,7 +1532,7 @@ TEST_F(ConsumerImplTests, ResendNacks) {
                                         + expected_group_id_encoded + "/next?token="
                                         + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _,
+                                        + "&resend_nacks=true&delay_ms=10000&resend_attempts=3&id_key=message_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh
index ceaeccdac..85c9c1660 100644
--- a/examples/pipeline/in_to_out/check_linux.sh
+++ b/examples/pipeline/in_to_out/check_linux.sh
@@ -42,7 +42,7 @@ echo hello3 > processed/file3
 
 for i in `seq 1 3`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+	echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done
 
 sleep 1
@@ -61,4 +61,4 @@ cat ${receiver_folder}/processed/file3_${data_source_out} | grep hello3
 $1 127.0.0.1:8400 $source_path $beamtime_id $data_source_in $data_source_out2 $token 2 1000 25000 0  > out2
 cat out2
 test ! -f ${receiver_folder}/processed/file1_${data_source_out2}
-echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1
+echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1
diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh
index c2d2fb48b..6a3f44746 100644
--- a/examples/pipeline/in_to_out_python/check_linux.sh
+++ b/examples/pipeline/in_to_out_python/check_linux.sh
@@ -33,7 +33,7 @@ Cleanup() {
   	echo "db.dropDatabase()" | mongo ${outdatabase_name}
   	rm -rf processed
     rm -rf ${receiver_root_folder}
-#    rm -rf out
+    rm -rf out
 
 }
 
@@ -46,7 +46,7 @@ echo hello3 > processed/file3
 
 for i in `seq 1 3`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+	echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done
 
 sleep 1
@@ -61,7 +61,8 @@ cat out | grep "Sent 5 file(s)"
 cat out | grep bt_meta
 cat out | grep st_meta
 
-echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}"
+echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name}
+echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}"
 
 cat ${receiver_folder}/processed/file1_${data_source_out} | grep hello1
 cat ${receiver_folder}/processed/file2_${data_source_out} | grep hello2
diff --git a/tests/automatic/consumer/next_multithread_broker/check_linux.sh b/tests/automatic/consumer/next_multithread_broker/check_linux.sh
index 02e486550..600b77dfa 100644
--- a/tests/automatic/consumer/next_multithread_broker/check_linux.sh
+++ b/tests/automatic/consumer/next_multithread_broker/check_linux.sh
@@ -13,7 +13,7 @@ Cleanup() {
 
 for i in `seq 1 10`;
 do
-	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 $@ 127.0.0.1:8400 test_run 4 10 $token_test_run
diff --git a/tests/automatic/mongo_db/auto_id/auto_id.cpp b/tests/automatic/mongo_db/auto_id/auto_id.cpp
index 337360aa8..7f24aa932 100644
--- a/tests/automatic/mongo_db/auto_id/auto_id.cpp
+++ b/tests/automatic/mongo_db/auto_id/auto_id.cpp
@@ -52,14 +52,14 @@ Args GetArgs(int argc, char* argv[]) {
 }
 
 void Insert(const asapo::MongoDBClient& db, const std::string& name, asapo::MessageMeta fi, const Args& args) {
-    auto start = fi.id;
+    auto start = fi.message_id;
     for (int i = 0; i < args.n_messages_per_thread; i++) {
         switch (args.mode) {
         case Mode::kTransaction:
-            fi.id = 0;
+            fi.message_id = 0;
             break;
         case Mode::kUpdateCounterThenIngest:
-            fi.id = start + static_cast<uint64_t>(i) + 1;
+            fi.message_id = start + static_cast<uint64_t>(i) + 1;
             break;
         }
         uint64_t  inserted_id{0};
@@ -87,7 +87,8 @@ int main(int argc, char* argv[]) {
         fi.timestamp = std::chrono::system_clock::now();
         fi.buf_id = 18446744073709551615ull;
         fi.source = "host:1234";
-        fi.id = static_cast<uint64_t>(args.n_messages_per_thread * i);
+        fi.id = 0;
+        fi.message_id = static_cast<uint64_t>(args.n_messages_per_thread * i);
         db.Connect("127.0.0.1", db_name);
         Insert(db, "stream", fi, args);
     };
diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
index 0beb953e4..0642a3246 100644
--- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
@@ -55,7 +55,8 @@ int main(int argc, char* argv[]) {
     asapo::MessageMeta fi;
     fi.size = 100;
     fi.name = "relpath/1";
-    fi.id = static_cast<uint64_t>(args.file_id);
+    fi.id = 0;
+    fi.message_id = static_cast<uint64_t>(args.file_id);
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
@@ -78,7 +79,7 @@ int main(int argc, char* argv[]) {
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
     auto fi1 = fi;
     auto fi2 = fi;
-    fi2.id = 123;
+    fi2.message_id = 123;
     fi1.timestamp = std::chrono::system_clock::now();
     fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1);
     fi2.name = asapo::kFinishStreamKeyword;
@@ -92,7 +93,8 @@ int main(int argc, char* argv[]) {
         asapo::MessageMeta fi_db;
         asapo::MongoDBClient db_new;
         db_new.Connect("127.0.0.1", db_name);
-        err = db_new.GetById(std::string("data_") + stream_name, fi.id, &fi_db);
+        err = db_new.GetById(std::string("data_") + stream_name, fi.message_id, &fi_db);
+        fi_db.id = fi.id;
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
         err = db_new.GetById(std::string("data_") + stream_name, 0, &fi_db);
@@ -102,11 +104,11 @@ int main(int argc, char* argv[]) {
 
         err = db.GetStreamInfo(std::string("data_") + stream_name, &info);
         M_AssertEq(nullptr, err);
-        M_AssertEq(fi.id, info.last_id);
+        M_AssertEq(fi.message_id, info.last_id);
 
         err = db.GetLastStream(&info);
         M_AssertEq(nullptr, err);
-        M_AssertEq(fi2.id, info.last_id);
+        M_AssertEq(fi2.message_id, info.last_id);
         M_AssertEq("test1", info.name);
         M_AssertEq(true, info.finished);
         M_AssertEq("ns", info.next_stream);
diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
index ee9cea6c0..9625eaf09 100644
--- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
@@ -13,6 +13,7 @@ void Assert(const Error& error, const std::string& expect) {
     } else {
         result = error->Explain();
     }
+    std::cout << "Assertion: " << result << std::endl;
     M_AssertContains(result, expect);
 }
 
@@ -40,7 +41,8 @@ int main(int argc, char* argv[]) {
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
-    fi.id = static_cast<uint64_t>(args.file_id);
+    fi.id = 0;
+    fi.message_id = static_cast<uint64_t>(args.file_id);
     fi.dataset_substream = 10;
 
     uint64_t dataset_size = 2;
@@ -62,7 +64,12 @@ int main(int argc, char* argv[]) {
 
     if (args.keyword == "OK") { // check retrieve
         asapo::MessageMeta fi_db;
-        err = db.GetDataSetById("data_test", fi.dataset_substream, fi.id, &fi_db);
+        err = db.GetDataSetById("data_test", fi.dataset_substream, fi.message_id, &fi_db);
+        fi_db.id = 0;
+        if (err != nullptr) {
+            std::cout << "GetDataSetById failed: " << err->Explain() << std::endl;
+        }
+
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
         err = db.GetDataSetById("data_test", 0, 0, &fi_db);
@@ -72,26 +79,30 @@ int main(int argc, char* argv[]) {
 
         err = db.GetStreamInfo("data_test", &info);
         M_AssertEq(nullptr, err);
-        M_AssertEq(fi.id, info.last_id);
+        M_AssertEq(fi.message_id, info.last_id);
 
         asapo::StreamInfo info_last;
 
         err = db.GetLastStream(&info_last);
         M_AssertEq(nullptr, err);
         M_AssertEq("test", info_last.name);
-        M_AssertEq(fi.id, info_last.last_id);
+        M_AssertEq(fi.message_id, info_last.last_id);
         M_AssertEq(false, info_last.finished);
 
         auto fi2 = fi;
         fi2.id = 123;
+        fi2.message_id = 123;
         fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1);
         fi2.name = asapo::kFinishStreamKeyword;
         fi2.metadata = R"({"next_stream":"ns"})";
-        db.Insert("data_test", fi2, false, nullptr);
+        err = db.Insert("data_test", fi2, false, nullptr);
+        if (err != nullptr) {
+            std::cout << "Insert failed: " << err->Explain() << std::endl;
+        }
         err = db.GetLastStream(&info_last);
         M_AssertEq(nullptr, err);
         M_AssertEq("test", info_last.name);
-        M_AssertEq(fi2.id, info_last.last_id);
+        M_AssertEq(fi2.message_id, info_last.last_id);
         M_AssertEq(true, info_last.finished);
         err = db.DeleteStream("test");
         M_AssertEq(nullptr, err);
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index beb45a12d..2a265df09 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -21,7 +21,7 @@ nthreads = 8
 def assert_eq(val, expected, name):
     print("asserting eq for " + name)
     if val != expected:
-        print("error at " + name)
+        print("assertion error at: " + name)
         print('val: ', val, ' expected: ', expected)
         sys.exit(1)
 
@@ -100,7 +100,7 @@ try:
     producer.send(8, "processed/" + data_source + "/" + "file8", x,
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
-    print(e)
+    print("Expected error: ", e)
 else:
     print("should be error sending non-cont array")
     sys.exit(1)
@@ -109,7 +109,7 @@ try:
     producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
-    print(e)
+    print("Expected error: ", e)
 else:
     print("should be error sending id 0 ")
     sys.exit(1)
@@ -166,7 +166,7 @@ producer.set_requests_queue_limits(0,1)
 try:
     producer.send(11, "processed/bla", data)
 except asapo_producer.AsapoRequestsPoolIsFull as e:
-    print(e)
+    print("Expected error: ", e)
 else:
     print("should be AsapoRequestsPoolIsFull error ")
     sys.exit(1)
@@ -225,7 +225,7 @@ producer.delete_stream('unknown_stream',error_on_not_exist = False)
 try:
     producer.delete_stream('unknown_stream',error_on_not_exist = True)
 except asapo_producer.AsapoWrongInputError as e:
-    print(e)
+    print("Expected error: ", e)
 else:
     print("should be error on delete unknown stream with flag")
     sys.exit(1)
@@ -235,7 +235,7 @@ else:
 try:
     producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, 0, 0)
 except asapo_producer.AsapoWrongInputError as e:
-    print(e)
+    print("Expected error: ", e)
 else:
     print("should be error")
     sys.exit(1)
diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
index 1adb43573..7c1769c59 100644
--- a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
@@ -28,4 +28,4 @@ ls -ln ${receiver_folder}/processed/1_1 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/processed/1_2 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/processed/1_3 | awk '{ print $5 }'| grep 100000
 
-echo 'db.data_default.find({"messages._id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
+echo 'db.data_default.find({"messages.message_id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
-- 
GitLab