diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 1d5660c1757e47387e2347be6fa2bd661db8eb27..54762061293d263de17b6124774c9fbbefb81134 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -119,7 +119,7 @@ test-services-linux-debug:
     - cd $CI_PROJECT_DIR
     - bash $CI_PROJECT_DIR/deploy/build_env/services-linux/run_asapo.sh
     - cd $CI_PROJECT_DIR/build
-    - ctest --no-compress-output -T Test -L all -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml
+    - ctest --no-compress-output -T Test -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml
    - pip3 install pytest
    - pytest -vv -o log_cli=true --log-cli-level=DEBUG ../tests/automatic/pytests/

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index ac3f5c643b49311d10b4adb510b1f28e46000d9e..5b3caf4fa0797caf0ad37119e4ee55bbb72f217b 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -26,7 +26,7 @@ type ID struct {

 type MessageRecord struct {
     ID        int                    `json:"_id"`
-    MessageID int                    `json:"message_id"`
+    TimeID    int                    `json:"time_id"`
     Timestamp int                    `bson:"timestamp" json:"timestamp"`
     Name      string                 `json:"name"`
     Meta      map[string]interface{} `json:"meta"`
@@ -369,10 +369,6 @@ func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int, idKey strin
         return nil, err
     }

-    // Copy value of d key to _id to keep back compatibility
-    if res["message_id"] != nil {
-        res["_id"] = res["message_id"]
-    }
     if err := checkStreamFinished(request, id, id_max, res); err != nil {
         return nil, err
     }
@@ -633,11 +629,11 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
     return r, true
 }

-//Return next (based on current pointer) available record from DB
+//Return next in time (based on current pointer) available record from DB
 func (db *Mongodb) getNextRecordDB(request Request, currentPointer int) (map[string]interface{}, error) {
     var res map[string]interface{}
-    filter := bson.D{{"_id", bson.D{{"$gt", currentPointer}}}}
-    opts := options.FindOne().SetSort(bson.M{"_id": 1})
+    filter := bson.D{{"time_id", bson.D{{"$gt", currentPointer}}}}
+    opts := options.FindOne().SetSort(bson.M{"time_id": 1})
     coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
     err := coll.FindOne(context.TODO(), filter, opts).Decode(&res)
     if err != nil {
@@ -680,9 +676,9 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
     }
     var res map[string]interface{}
     res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey)
-    // Missing Id! Since Ids are sequential, jump to the next available id
+    // Missing Id! Since time_Ids are sequential, jump to the next available id
     // Update current pointer to next available id
-    if err != nil && params.IdKey == "_id" {
+    if err != nil && params.IdKey == "time_id" {
         for {
             curPointer, err := db.getCurrentPointerDB(request)
             if err != nil {
@@ -692,7 +688,7 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
             if err != nil {
                 return nil, 0, err
             }
-            nextValue, ok := getIntVal(request, res["_id"])
+            nextValue, ok := getIntVal(request, res["time_id"])
             if ok != nil {
                 return nil, 0, errors.New("failed to next next available id. Extraction of id fails")
             }
@@ -710,10 +706,6 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
         }
     }

-    // Copy value of message_id key to _id to keep back compatibility
-    if res["message_id"] != nil {
-        res["_id"] = res["message_id"]
-    }
     if err := checkStreamFinished(request, nextInd, maxInd, res); err != nil {
         return nil, 0, err
     }
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 0b1497a5cb2d54fd37b28ed011e2960a3fd5f1e8..c471bc56065240798c2906ff0654135e93b5225e 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -22,9 +22,9 @@ type TestRecord struct {

 type TestUserRecord struct {
     ID        int64             `bson:"_id" json:"_id"`
-    MessageID int64             `bson:"message_id" json:"message_id"`
     Meta      map[string]string `bson:"meta" json:"meta"`
     Name      string            `bson:"name" json:"name"`
+    TimeID    int64             `bson:"time_id" json:"time_id"`
     Timestamp int64             `bson:"timestamp" json:"timestamp"`
 }

@@ -69,9 +69,9 @@ var rec3 = TestRecord{3, empty_next, "ccc", 2}
 var rec_finished3 = TestRecord{3, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
 var rec_finished11 = TestRecord{11, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}

-var userRec1 = TestUserRecord{1, 1, empty_next, "aaa", 0}
-var userRec2 = TestUserRecord{2, 2, empty_next, "aaa", 0}
-var userRec3 = TestUserRecord{3, 2, empty_next, "aaa", 0}
+var userRec1 = TestUserRecord{1, empty_next, "aaa", 1, 0}
+var userRec2 = TestUserRecord{2, empty_next, "bbb", 2, 0}
+var userRec3 = TestUserRecord{3, empty_next, "ccc", 3, 0}

 var rec1_expect, _ = json.Marshal(rec1)
 var rec2_expect, _ = json.Marshal(rec2)
@@ -183,10 +183,10 @@ func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) {

 func TestMongoDBGetNextIfMissingID(t *testing.T) {
     db.Connect(dbaddress)
     defer cleanup()
-    db.insertRecord(dbname, collection, &rec2)
+    db.insertRecord(dbname, collection, &userRec2)
     rec, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
-        GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
-    assert.Equal(t, string(rec2_expect), string(rec))
+        GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("time_id", false, 0, -1)})
+    assert.Equal(t, string(userRec2_expect), string(rec))
 }

@@ -250,7 +250,7 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) {
     db.insertRecord(dbname, collection, &rec1)
     db.insertRecord(dbname, collection, &rec_finished)

-    res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last"})
+    _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last"})
     assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
     assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
 }
@@ -1045,20 +1045,20 @@ func TestMongoDBGetNextUserKeyMissingID(t *testing.T) {
     db.insertRecord(dbname, collection, &userRec2)

     _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
-        GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)})
+        GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})

     assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
     assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error())
"{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error()) db.insertRecord(dbname, collection, &userRec1) rec1, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, - GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)}) + GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)}) rec2, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, - GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)}) + GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)}) - assert.Equal(t, string(userRec1_expect), string(rec1)) - assert.Equal(t, string(userRec2_expect), string(rec2)) + assert.Equal(t, string(userRec1_expect), string(rec2)) + assert.Equal(t, string(userRec2_expect), string(rec1)) } func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) { diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index c667ab9b4f4c524f13687d186a43aa26cadce21e..00b5927a1a16218b5ddc5e56b41af6e3acfe8a47 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -44,7 +44,7 @@ var testsGetCommand = []struct { externalParam string }{ {"last", expectedSource, expectedStream, "", expectedStream + "/0/last", "", "0"}, - {"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "&id=1", "{\"id\":1,\"id_key\":\"message_id\"}"}, + {"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "&id=1", "{\"id\":1,\"id_key\":\"_id\"}"}, {"meta", expectedSource, "default", "", "default/0/meta/0", "", "0"}, {"nacks", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks", "", "0_0"}, {"groupedlast", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast", "", ""}, diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go index 6fb4f652bc256bf3828bf0618da9f624949680d9..02f1bfe61b1ec3346394028fae80cfcf818eb01c 100644 --- a/broker/src/asapo_broker/server/get_id.go +++ b/broker/src/asapo_broker/server/get_id.go @@ -29,7 +29,7 @@ func getExtraParameters(r *http.Request) (string, bool) { } id_key, id_ok := vars["id_key"] if !id_ok { - id_key = "message_id" + id_key = "_id" } extraParam := database.ExtraParamId{ Id: id_int, diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index a54b7d30cc16ab7c25069f231fd364e79af506c1..cdc5a83d9e37e7b4de9fa91cf249a56e741197dc 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -35,7 +35,7 @@ class MessageMeta { std::chrono::system_clock::time_point timestamp; uint64_t size{0}; uint64_t id{0}; - uint64_t message_id{0}; + uint64_t time_id{0}; std::string source; std::string ib_source; std::string metadata; diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index e32d4c4c163d67f88d53fbab70667fd1b4bcdf41..9fb97f856a57af3c397e06f39aad03f5e003efa9 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -64,7 +64,7 @@ std::string MessageMeta::Json() const { int64_t buf_id_int = static_cast<int64_t>(buf_id); std::string s = "{\"_id\":" + std::to_string(id) + "," - "\"message_id\":" + 
+                    "\"time_id\":" + std::to_string(time_id) + ","
                     "\"size\":" + std::to_string(size) + ","
                     "\"name\":\"" + x + "\","
                     "\"timestamp\":"
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index 3723e1da50cdeeb3476020fe14185e208da08441..815c911a97350fcae692c465c40d28219fc42cea 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -87,7 +87,7 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti
     current_collection_name_ = collection_name;
     mongoc_collection_set_write_concern(current_collection_, write_concern_);

-    // Create index `message_id` for collection of asapo messages
+    // Create index `time_id` for collection of asapo messages
     // ToDo cache streams with already created indices
     if (collection_name.rfind(kDBDataCollectionNamePrefix, 0) != 0){
         return nullptr;
@@ -112,7 +112,7 @@ Error MongoDBClient::CreateIndex(const std::string& collection_name) const {
     db = mongoc_client_get_database (client_, database_name_.c_str());

     bson_init (&keys);
-    BSON_APPEND_INT32 (&keys, "message_id", 1);
+    BSON_APPEND_INT32 (&keys, "time_id", 1);
     index_name = mongoc_collection_keys_to_index_string (&keys);
     create_indexes = BCON_NEW ("createIndexes",
                                BCON_UTF8 (collection_name.c_str()),
@@ -394,17 +394,17 @@ Error MongoDBClient::GetNextId(const std::string& stream, uint64_t* id) const {

 Error MongoDBClient::InsertWithAutoId(const MessageMeta& file, uint64_t* id_inserted) const {
-    uint64_t id;
-    auto err = GetNextId(current_collection_name_, &id);
+    uint64_t time_id;
+    auto err = GetNextId(current_collection_name_, &time_id);
     if (err != nullptr) {
         return err;
     }

     auto meta_new = file;
-    meta_new.id = id;
-    // Inset with auto ID
-    if (file.message_id == 0){
-        meta_new.message_id = id;
+    meta_new.time_id = time_id;
+    // Insert meta with auto ID
+    if (file.id == 0){
+        meta_new.id = time_id;
     }

     return Insert(current_collection_name_, meta_new, false, id_inserted);
 }
@@ -420,7 +420,7 @@ Error MongoDBClient::Insert(const std::string& collection, const MessageMeta& fi
         return err;
     }

-    if (file.id == 0) {
+    if (file.time_id == 0) {
         return InsertWithAutoId(file, id_inserted);
     }

@@ -528,8 +528,8 @@ Error MongoDBClient::InsertAsDatasetMessage(const std::string& collection, const
         return err;
     }

     auto query =
-        BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}",
-                  "{", "message_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}",
+        BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.id)), "}",
+                  "{", "time_id", BCON_INT64(static_cast<int64_t>(file.id)), "}",
                   "{", "messages.dataset_substream", "{", "$ne", BCON_INT64(static_cast<int64_t>(file.dataset_substream)), "}", "}",
                   "]");
@@ -577,17 +577,17 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,
         filter = BCON_NEW ("_id", BCON_INT64(static_cast<int64_t>(id)));
         opts = BCON_NEW ("limit", BCON_INT64(1));
         break;
-    case GetRecordMode::kByMessageId:
-        filter = BCON_NEW ("message_id", BCON_INT64(static_cast<int64_t>(id)));
+    case GetRecordMode::kByTimeId:
+        filter = BCON_NEW ("time_id", BCON_INT64(static_cast<int64_t>(id)));
         opts = BCON_NEW ("limit", BCON_INT64(1));
         break;
-    case GetRecordMode::kLast:
+    case GetRecordMode::kLastId:
         filter = BCON_NEW (NULL);
         opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "_id", BCON_INT64(-1), "}");
         break;
-    case GetRecordMode::kLastMessageId:
+    case GetRecordMode::kLastTimeId:
         filter = BCON_NEW (NULL);
-        opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "message_id", BCON_INT64(-1), "}");
+        opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "time_id", BCON_INT64(-1), "}");
         break;
     case GetRecordMode::kEarliest:
         filter = BCON_NEW (NULL);
@@ -622,7 +622,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id,

 Error MongoDBClient::GetById(const std::string& collection, uint64_t id, MessageMeta* file) const {
     std::string record_str;
-    auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kByMessageId, &record_str);
+    auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str);
     if (err) {
         return err;
     }
@@ -712,9 +712,9 @@ Error UpdateStreamInfoFromLastRecord(const std::string& last_record_str,
         return DBErrorTemplates::kJsonParseError.Generate(
                    "UpdateStreamInfoFromLastRecord: cannot parse timestamp in response: " + last_record_str);
     }
-    if (parser.GetUInt64("message_id", &id) != nullptr) {
+    if (parser.GetUInt64("_id", &id) != nullptr) {
         return DBErrorTemplates::kJsonParseError.Generate(
-                   "UpdateStreamInfoFromLastRecord: cannot parse message_id in response: " + last_record_str);
+                   "UpdateStreamInfoFromLastRecord: cannot parse _id in response: " + last_record_str);
     }

     info->timestamp_lastentry = timestamp_last;
@@ -740,7 +740,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str,

 Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* info) const {
     std::string last_record_str, earliest_record_str;
-    auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLastMessageId, &last_record_str);
+    auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLastId, &last_record_str);
     if (err) {
         if (err == DBErrorTemplates::kNoRecord) {
             // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index 1681d597a0b80a5a2d2996ae4535be79c2876f42..29b14a90869387b2ea9e69785b781053dbbdd163 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -35,11 +35,11 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>;

 enum class GetRecordMode {
     kById,
-    kByMessageId,
-    kLast,
+    kByTimeId,
+    kLastId,
     kEarliest,
     kByStringId,
-    kLastMessageId,
+    kLastTimeId,
 };

 const size_t maxDbNameLength = 63;
diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp
index af3679afa3a7732254eca0fb9aad3ab79c8a5060..9d212539d10e230732ded003089288fa16a4e55e 100644
--- a/common/cpp/unittests/data_structs/test_data_structs.cpp
+++ b/common/cpp/unittests/data_structs/test_data_structs.cpp
@@ -30,7 +30,7 @@ MessageMeta PrepareMessageMeta(bool includeNewStreamField = true) {
     MessageMeta message_meta;
     message_meta.size = 100;
     message_meta.id = 1;
-    message_meta.message_id = 1;
+    message_meta.time_id = 1;
     message_meta.dataset_substream = 3;
     message_meta.name = std::string("folder") + asapo::kPathSeparator + "test";
     message_meta.source = "host:1234";
@@ -58,10 +58,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) {
     std::string json = message_meta.Json();
     if (asapo::kPathSeparator == '/') {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"message_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})"));
R"({"_id":1,"time_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } else { ASSERT_THAT(json, Eq( - R"({"_id":1,"message_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); + R"({"_id":1,"time_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } } diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 4f5da1cf198872f7ff8d1e87a6f14f9ee08f364d..71841270aa524455b7d7d01eee7fe30c250adc01 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -347,9 +347,9 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } if (op == GetMessageServerOperation::GetNextAvailable) { - ri.extra_params += "&id_key=_id"; + ri.extra_params += "&id_key=time_id"; } else { - ri.extra_params += "&id_key=message_id"; + ri.extra_params += "&id_key=_id"; } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index e0ac762566983a245bedb87ee7410ccbcb59f3fa..3922d85a58feb664f477756e4afea2d937a3a2b4 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -269,7 +269,7 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token }, "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=step&id_key=message_id"); + + "&instanceid=instance&pipelinestep=step&id_key=_id"); } TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) { @@ -278,7 +278,7 @@ TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) { asapo::SourceType::kProcessed, "instance", "", "beamtime_id", "a", "b", expected_token }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id"); + + "&instanceid=instance&pipelinestep=DefaultStep&id_key=_id"); } TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) { @@ -287,7 +287,7 @@ TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) { asapo::SourceType::kProcessed, "instance", "auto", "beamtime_id", "a", "b", expected_token }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id"); + + "&instanceid=instance&pipelinestep=DefaultStep&id_key=_id"); } /* @@ -330,7 +330,7 @@ TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded - + "&id_key=message_id", _, + + "&id_key=_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -347,7 +347,7 @@ TEST_F(ConsumerImplTests, 
                                         "/" + expected_group_id_encoded + "/groupedlast?token="
                                         + expected_token + "&instanceid=" + expected_instance_id_encoded
                                         + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id", _,
+                                        + "&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -362,7 +362,7 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) {
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" +
                                         expected_stream_encoded + "/0/last?token=" + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id", _,
+                                        + "&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -548,7 +548,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc
                                         "/stream/0/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token + "&instanceid=" + expected_instance_id_encoded
                                         + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id"
+                                        + "&id_key=_id"
                                         , _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                 SetArgPointee<1>(HttpCode::ServiceUnavailable),
                                                 SetArgPointee<2>(nullptr),
@@ -1070,7 +1070,7 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) {
                                         expected_group_id_encoded + "/next?token=" + expected_token + "&dataset=true&minsize=0"
                                         + "&instanceid=" + expected_instance_id_encoded
                                         + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id", _,
+                                        + "&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -1208,7 +1208,7 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) {
                                         expected_stream_encoded + "/0/last?token=" + expected_token + "&dataset=true&minsize=1"
                                         + "&instanceid=" + expected_instance_id_encoded
                                         + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id", _,
+                                        + "&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -1226,7 +1226,7 @@ TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) {
                                         expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token="
                                         + expected_token + "&dataset=true&minsize=1"
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&id_key=message_id", _,
+                                        + "&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
@@ -1552,7 +1552,7 @@ TEST_F(ConsumerImplTests, ResendNacks) {
                                         + expected_group_id_encoded + "/next?token=" + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded
                                         + "&pipelinestep=" + expected_pipeline_step_encoded
-                                        + "&resend_nacks=true&delay_ms=10000&resend_attempts=3&id_key=message_id", _,
+                                        + "&resend_nacks=true&delay_ms=10000&resend_attempts=3&id_key=_id", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
                                                 SetArgPointee<2>(nullptr),
diff --git a/examples/consumer/getnext_python/check_linux.sh b/examples/consumer/getnext_python/check_linux.sh
index 8ab598fcea4b1c91377d2bc0dd10c6db9b21544b..40fbadd8d95a037d92be7456fb2b997dcad3fa1c 100644
--- a/examples/consumer/getnext_python/check_linux.sh
+++ b/examples/consumer/getnext_python/check_linux.sh
@@ -17,7 +17,7 @@ Cleanup() {

 for i in `seq 1 3`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done

 echo 'db.meta.insert({"_id":"bt","meta":{"meta_test":"test"}})' | mongo ${database_name}
diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh
index 0a41099f9993f2aed10c1c90ffb58a9d295aa948..7d305a868bc9194d3afad9760c46b308d317a6cb 100644
--- a/examples/pipeline/in_to_out/check_linux.sh
+++ b/examples/pipeline/in_to_out/check_linux.sh
@@ -42,7 +42,7 @@ echo hello3 > processed/file3

 for i in `seq 1 3`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done

 sleep 1
@@ -52,7 +52,7 @@ cat out
 cat out | grep "Processed 3 file(s)"
 cat out | grep "Sent 3 file(s)"

-echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${data_source_out}
+echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${data_source_out}

 cat ${receiver_folder}/processed/file1_${data_source_out} | grep hello1
 cat ${receiver_folder}/processed/file2_${data_source_out} | grep hello2
@@ -61,4 +61,4 @@ cat ${receiver_folder}/processed/file3_${data_source_out} | grep hello3
 $1 127.0.0.1:8400 $source_path $beamtime_id $data_source_in $data_source_out2 $token 2 1000 25000 0 > out2
 cat out2
 test ! -f ${receiver_folder}/processed/file1_${data_source_out2}
-echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1
+echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1
diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh
index 6a3f447463bc13e2d2feddb01a47899c3bccc6fa..5b0bc11959845e986e6893d655a22fcb4b559860 100644
--- a/examples/pipeline/in_to_out_python/check_linux.sh
+++ b/examples/pipeline/in_to_out_python/check_linux.sh
@@ -46,7 +46,7 @@ echo hello3 > processed/file3

 for i in `seq 1 3`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done

 sleep 1
@@ -61,8 +61,8 @@ cat out | grep "Sent 5 file(s)"
 cat out | grep bt_meta
 cat out | grep st_meta

-echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name}
-echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}"
+echo "db.data_default.find({"time_id":1})" | mongo ${outdatabase_name}
+echo "db.data_default.find({"time_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}"

 cat ${receiver_folder}/processed/file1_${data_source_out} | grep hello1
 cat ${receiver_folder}/processed/file2_${data_source_out} | grep hello2
diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp
index 556ebd1797bb1e14ce5dfaaf60f53dc2b0bb37ec..869605580f1d6423b5784190bd23d04a2c853bbb 100644
--- a/receiver/src/request_handler/request_handler_db_write.cpp
+++ b/receiver/src/request_handler/request_handler_db_write.cpp
@@ -77,8 +77,8 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co
     MessageMeta message_meta;
     message_meta.name = request->GetFileName();
     message_meta.size = request->GetDataSize();
-    message_meta.id = 0;
-    message_meta.message_id = request->GetDataID();
+    message_meta.id = request->GetDataID();
+    message_meta.time_id = 0;
     message_meta.ingest_mode = request->GetIngestMode();
     message_meta.buf_id = request->GetSlotId();
     message_meta.stream = request->GetStream();
diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
index c1a30692bad8377e9952f4b8a0e81f3c79be73aa..45c3aa3ad74a3040ff1557690840c2ea1a245715 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
@@ -168,8 +168,8 @@ MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) {
     MessageMeta message_meta;
     message_meta.size = expected_file_size;
     message_meta.name = expected_file_name;
-    message_meta.id = 0;
-    message_meta.message_id = expected_id;
+    message_meta.id = expected_id;
+    message_meta.time_id = 0;
     message_meta.ingest_mode = expected_ingest_mode;
     if (substream) {
         message_meta.dataset_substream = expected_substream;
diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh
index aaf6f17f9293427d7820cfb4cb5ae14e4632ca14..86f8ca1ea0ede0f11b8627af8257a6f564a96338 100644
--- a/tests/automatic/broker/get_last/check_linux.sh
+++ b/tests/automatic/broker/get_last/check_linux.sh
@@ -12,8 +12,8 @@ Cleanup() {
     echo "db.dropDatabase()" | mongo ${database_name}
 }

-echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(2)})" | mongo ${database_name}
-echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(1)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(2),"time_id":NumberInt(2)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(1),"time_id":NumberInt(1)})" | mongo ${database_name}

 token=$BT_DATA_TOKEN

@@ -23,21 +23,21 @@ echo found broker at $broker

 groupid=`curl -d '' --silent $broker/v0.2/creategroup`

-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr -
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr -

-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2'
-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr - | grep '"_id":2'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr - | grep '"_id":2'

-echo "db.data_${stream}.insert({"_id":NumberInt(3),"message_id":NumberInt(3)})"| mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(3),"time_id":NumberInt(3)})"| mongo ${database_name}

-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":3'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr - | grep '"_id":3'

-echo "db.data_${stream}.insert({"_id":NumberInt(4),"message_id":NumberInt(4)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(4),"time_id":NumberInt(4)})" | mongo ${database_name}

-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1'
-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | grep '"_id":1'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr - | grep '"_id":4'

 #with a new group
 groupid=`curl -d '' --silent $broker/v0.2/creategroup`

-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1'
-curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4'
\ No newline at end of file
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | grep '"_id":1'
+curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=_id" --stderr - | grep '"_id":4'
\ No newline at end of file
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index aec3b15000e64ca0b2e527f2e4cb790791dc4ce6..f652887449dde3765f820af6984f2b596ed027a8 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -12,8 +12,8 @@ Cleanup() {
     echo "db.dropDatabase()" | mongo ${database_name}
 }

-echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(1)})" | mongo ${database_name}
-echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(2)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(2),"time_id":NumberInt(1)})" | mongo ${database_name}
+echo "db.data_${stream}.insert({"_id":NumberInt(1),"time_id":NumberInt(2)})" | mongo ${database_name}

 token=$BT_DATA_TOKEN

@@ -21,10 +21,10 @@ broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=
 echo found broker at $broker

 groupid=`curl -d '' --silent $broker/v0.3/creategroup`
-curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":1'
-curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":2'
-curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"id_max":2'
+curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | tee /dev/stderr | grep '"_id":1'
+curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | tee /dev/stderr | grep '"_id":2'
+curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | tee /dev/stderr | grep '"id_max":2'

 # with a new group
 groupid=`curl -d '' --silent $broker/v0.3/creategroup`
-curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":1'
\ No newline at end of file
+curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=_id" --stderr - | tee /dev/stderr | grep '"_id":1'
\ No newline at end of file
diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh
index 63d46610c70a72ba4ddcdfc4f989f9371ddb0cb1..f8a170f4c0f622aebf8ad74852b870cb523dce0d 100644
--- a/tests/automatic/consumer/consumer_api/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api/check_linux.sh
@@ -18,21 +18,21 @@ Cleanup() {

 for i in `seq 1 10`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":'10-$i',"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":'10-$i',"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
 done

 for i in `seq 1 5`;
 do
-    echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
+    echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
 done

-echo 'db.data_stream1.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} >/dev/null
+echo 'db.data_stream1.insert({"_id":NumberInt(6),"time_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} >/dev/null

 for i in `seq 1 5`;
 do
-    echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
+    echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
 done

-echo 'db.data_stream2.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} >/dev/null
+echo 'db.data_stream2.insert({"_id":NumberInt(6),"time_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} >/dev/null

 echo hello1 > 1
@@ -49,10 +49,10 @@ do
     messages=''
     for j in `seq 1 3`;
     do
-        messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null
+        messages="$messages,{"_id":$j,"time_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null
     done
     messages=${messages#?}
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null
 done

 for i in `seq 1 5`;
@@ -60,10 +60,10 @@ do
     messages=''
     for j in `seq 1 2`;
     do
-        messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null
+        messages="$messages,{"_id":$j,"time_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null
     done
     messages=${messages#?}
-    echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null
+    echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null
 done

diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh
index 5a912611948ae322de245503945fa9e6cfa6be8a..9161e5c964d44393cf28f1f702fa76009080f938 100644
--- a/tests/automatic/consumer/consumer_api_python/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh
@@ -23,10 +23,10 @@ echo -n hello1 > $source_path/1_1

 for i in `seq 1 5`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done

-echo 'db.data_streamfts.insert({"_id":NumberInt(1),"message_id":NumberInt(1),"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+echo 'db.data_streamfts.insert({"_id":NumberInt(1),"time_id":NumberInt(1),"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}

 echo 'db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}})' | mongo ${database_name}
 echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${database_name}
@@ -34,16 +34,16 @@ echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${dat

 for i in `seq 1 5`;
 do
-    echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+    echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done

 for i in `seq 1 5`;
 do
-    echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+    echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done

-echo 'db.data_stream1.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name}
-echo 'db.data_stream2.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name}
+echo 'db.data_stream1.insert({"_id":NumberInt(6),"time_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name}
+echo 'db.data_stream2.insert({"_id":NumberInt(6),"time_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name}

 sleep 1

@@ -62,10 +62,10 @@ do
     messages=''
     for j in `seq 1 3`;
     do
-        messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":0,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null
messages="$messages,{"_id":$j,"time_id":$j,"size":6,"name":'${i}_${j}',"timestamp":0,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null done messages=${messages#?} - echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null + echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null done @@ -74,10 +74,10 @@ do messages='' for j in `seq 1 2`; do - messages="$messages,{"_id":NumberInt('$j'),"message_id":NumberInt('$j'),"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" + messages="$messages,{"_id":NumberInt('$j'),"time_id":NumberInt('$j'),"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" done messages=${messages#?} - echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} + echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} done diff --git a/tests/automatic/consumer/next_multithread_broker/check_linux.sh b/tests/automatic/consumer/next_multithread_broker/check_linux.sh index 600b77dfaed3ab66eb7d2940aa6060dc2d42616e..fb0a64fb88ea092e5cd824420b098484ded1a877 100644 --- a/tests/automatic/consumer/next_multithread_broker/check_linux.sh +++ b/tests/automatic/consumer/next_multithread_broker/check_linux.sh @@ -13,7 +13,7 @@ Cleanup() { for i in `seq 1 10`; do - echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done $@ 127.0.0.1:8400 test_run 4 10 $token_test_run diff --git a/tests/automatic/mongo_db/auto_id/auto_id.cpp b/tests/automatic/mongo_db/auto_id/auto_id.cpp index 7f24aa932928825dafc79319d9551075d13f6660..6c44070a741d23fcdd0302782af3ae0cf0844606 100644 --- a/tests/automatic/mongo_db/auto_id/auto_id.cpp +++ b/tests/automatic/mongo_db/auto_id/auto_id.cpp @@ -52,14 +52,14 @@ Args GetArgs(int argc, char* argv[]) { } void Insert(const asapo::MongoDBClient& db, const std::string& name, asapo::MessageMeta fi, const Args& args) { - auto start = fi.message_id; + auto start = fi.id; for (int i = 0; i < args.n_messages_per_thread; i++) { switch (args.mode) { case Mode::kTransaction: - fi.message_id = 0; + fi.id = 0; break; case Mode::kUpdateCounterThenIngest: - fi.message_id = start + static_cast<uint64_t>(i) + 1; + fi.id = start + static_cast<uint64_t>(i) + 1; break; } uint64_t inserted_id{0}; @@ -87,8 +87,8 @@ int main(int argc, char* argv[]) { fi.timestamp = std::chrono::system_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; - fi.id = 0; - fi.message_id = static_cast<uint64_t>(args.n_messages_per_thread * i); + fi.time_id = 0; + fi.id = static_cast<uint64_t>(args.n_messages_per_thread * i); db.Connect("127.0.0.1", db_name); Insert(db, "stream", fi, args); }; diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp 
index 0642a3246fa7bd29ca49a9e02b4c04981bb98d15..fe0d4ec018b500b416209fc9b351a298c82ee680 100644
--- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp
@@ -55,8 +55,8 @@ int main(int argc, char* argv[]) {
     asapo::MessageMeta fi;
     fi.size = 100;
     fi.name = "relpath/1";
-    fi.id = 0;
-    fi.message_id = static_cast<uint64_t>(args.file_id);
+    fi.time_id = 0;
+    fi.id = static_cast<uint64_t>(args.file_id);
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
@@ -79,7 +79,7 @@ int main(int argc, char* argv[]) {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
         auto fi1 = fi;
         auto fi2 = fi;
-        fi2.message_id = 123;
+        fi2.id = 123;
         fi1.timestamp = std::chrono::system_clock::now();
         fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1);
         fi2.name = asapo::kFinishStreamKeyword;
@@ -93,7 +93,7 @@ int main(int argc, char* argv[]) {
         asapo::MessageMeta fi_db;
         asapo::MongoDBClient db_new;
         db_new.Connect("127.0.0.1", db_name);
-        err = db_new.GetById(std::string("data_") + stream_name, fi.message_id, &fi_db);
+        err = db_new.GetById(std::string("data_") + stream_name, fi.id, &fi_db);
         fi_db.id = fi.id;
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
@@ -104,11 +104,11 @@ int main(int argc, char* argv[]) {

         err = db.GetStreamInfo(std::string("data_") + stream_name, &info);
         M_AssertEq(nullptr, err);
-        M_AssertEq(fi.message_id, info.last_id);
+        M_AssertEq(fi.id, info.last_id);

         err = db.GetLastStream(&info);
         M_AssertEq(nullptr, err);
-        M_AssertEq(fi2.message_id, info.last_id);
+        M_AssertEq(fi2.id, info.last_id);
         M_AssertEq("test1", info.name);
         M_AssertEq(true, info.finished);
         M_AssertEq("ns", info.next_stream);
diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
index 9625eaf09a3a7ee79126484ce4a3f8d1992aa09a..5a235fad523e4f1b9b8dbafa20ef8942b20ea57a 100644
--- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
@@ -41,8 +41,8 @@ int main(int argc, char* argv[]) {
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
-    fi.id = 0;
-    fi.message_id = static_cast<uint64_t>(args.file_id);
+    fi.time_id = 0;
+    fi.id = static_cast<uint64_t>(args.file_id);
     fi.dataset_substream = 10;

     uint64_t dataset_size = 2;
@@ -64,8 +64,8 @@ int main(int argc, char* argv[]) {
     if (args.keyword == "OK") { // check retrieve
         asapo::MessageMeta fi_db;
-        err = db.GetDataSetById("data_test", fi.dataset_substream, fi.message_id, &fi_db);
-        fi_db.id = 0;
+        err = db.GetDataSetById("data_test", fi.dataset_substream, fi.id, &fi_db);
+        fi_db.time_id = 0;
         if (err != nullptr) {
             std::cout << "GetDataSetById failed: " << err->Explain() << std::endl;
         }
@@ -79,19 +79,19 @@ int main(int argc, char* argv[]) {

     err = db.GetStreamInfo("data_test", &info);
     M_AssertEq(nullptr, err);
-    M_AssertEq(fi.message_id, info.last_id);
+    M_AssertEq(fi.id, info.last_id);

     asapo::StreamInfo info_last;

     err = db.GetLastStream(&info_last);
     M_AssertEq(nullptr, err);
     M_AssertEq("test", info_last.name);
-    M_AssertEq(fi.message_id, info_last.last_id);
+    M_AssertEq(fi.id, info_last.last_id);
     M_AssertEq(false, info_last.finished);

     auto fi2 = fi;
     fi2.id = 123;
-    fi2.message_id = 123;
+    fi2.time_id = 123;
     fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1);
     fi2.name = asapo::kFinishStreamKeyword;
     fi2.metadata = R"({"next_stream":"ns"})";
@@ -102,7 +102,7 @@ int main(int argc, char* argv[]) {
     err = db.GetLastStream(&info_last);
     M_AssertEq(nullptr, err);
     M_AssertEq("test", info_last.name);
-    M_AssertEq(fi2.message_id, info_last.last_id);
+    M_AssertEq(fi2.id, info_last.last_id);
     M_AssertEq(true, info_last.finished);
     err = db.DeleteStream("test");
     M_AssertEq(nullptr, err);
diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
index 7c1769c59954a6b1f5675b40b409046b4f2f944a..1adb4357351474943a6a5ff3406617e31c0c409b 100644
--- a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
@@ -28,4 +28,4 @@ ls -ln ${receiver_folder}/processed/1_1 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/processed/1_2 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/processed/1_3 | awk '{ print $5 }'| grep 100000

-echo 'db.data_default.find({"messages.message_id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
+echo 'db.data_default.find({"messages._id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
diff --git a/tests/automatic/support/getnext/check_linux.sh b/tests/automatic/support/getnext/check_linux.sh
index c880d75ddd1094e7d19df1b587ffd3fc30e224c2..c243fdadb181ac7d549d0ce123a73b0349c80b86 100644
--- a/tests/automatic/support/getnext/check_linux.sh
+++ b/tests/automatic/support/getnext/check_linux.sh
@@ -17,7 +17,7 @@ Cleanup() {

 for i in `seq 1 3`;
 do
-    echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
+    echo 'db.data_default.insert({"_id":NumberInt('$i'),"time_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done

 sleep 1