From 3d8f8e4f08b00a1a3940fc3306beefe0418e1c94 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 1 Mar 2021 17:12:08 +0100 Subject: [PATCH] update stream info content for consumers --- broker/src/asapo_broker/database/mongodb.go | 81 ++++++++----- .../asapo_broker/database/mongodb_streams.go | 109 ++++++++++++++---- .../src/asapo_broker/database/mongodb_test.go | 81 +++++++------ .../src/asapo_broker/database/streams_test.go | 29 ++++- common/cpp/src/data_structs/data_structs.cpp | 6 +- .../data_structs/test_data_structs.cpp | 2 +- common/go/src/asapo_common/utils/helpers.go | 10 ++ consumer/api/cpp/src/consumer_impl.cpp | 30 +++-- consumer/api/cpp/src/consumer_impl.h | 5 +- .../api/cpp/unittests/test_consumer_impl.cpp | 8 +- consumer/api/python/asapo_consumer.pyx.in | 4 +- .../test_request_handler_db_last_stream.cpp | 2 +- .../test_request_handler_db_stream_info.cpp | 2 +- .../consumer/consumer_api/check_linux.sh | 2 +- .../consumer/consumer_api/check_windows.bat | 6 + .../consumer/consumer_api/consumer_api.cpp | 12 +- .../consumer_api_python/check_linux.sh | 4 + .../consumer_api_python/check_windows.bat | 2 + .../consumer_api_python/consumer_api.py | 11 +- 19 files changed, 289 insertions(+), 117 deletions(-) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index f95736a2b..41bf082a0 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -24,11 +24,13 @@ type ID struct { ID int `bson:"_id"` } -type FinishedStreamRecord struct { - ID int `json:"_id"` - Name string `json:"name"` - Meta map[string]interface{} `json:"meta"` - NextStream string +type MessageRecord struct { + ID int `json:"_id"` + Timestamp int `bson:"timestamp" json:"timestamp"` + Name string `json:"name"` + Meta map[string]interface{} `json:"meta"` + NextStream string + FinishedStream bool } type InProcessingRecord struct { @@ -277,7 +279,7 @@ func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[str return res, err } -func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) { +func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, error) { res, err := db.getRecordFromDb(request, id, id_max) if err != nil { return nil, err @@ -301,10 +303,10 @@ func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, er } } -func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) { +func (db *Mongodb) getRawRecordWithSort(dbname string, collection_name string, sortField string, sortOrder int) (map[string]interface{}, error) { var res map[string]interface{} c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) - opts := options.FindOne().SetSort(bson.M{"timestamp": 1}) + opts := options.FindOne().SetSort(bson.M{sortField: sortOrder}) var q bson.M = nil err := c.FindOne(context.TODO(), q, opts).Decode(&res) @@ -317,6 +319,14 @@ func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map return res, nil } +func (db *Mongodb) getLastRawRecord(dbname string, collection_name string) (map[string]interface{}, error) { + return db.getRawRecordWithSort(dbname, collection_name, "_id", -1) +} + +func (db *Mongodb) getEarliestRawRecord(dbname string, collection_name string) (map[string]interface{}, error) { + return db.getRawRecordWithSort(dbname, collection_name, "timestamp", 1) +} + func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { id, err := strconv.Atoi(request.ExtraParam) if err != nil { @@ -328,7 +338,7 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { return nil, err } - return db.getRecordByIDRow(request, id, max_ind) + return db.getRecordByIDRaw(request, id, max_ind) } func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { @@ -518,18 +528,21 @@ func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) { return nextInd, maxInd, nil } -func ExtractFinishedStreamRecord(data map[string]interface{}) (FinishedStreamRecord, bool) { - var r FinishedStreamRecord +func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) { + var r MessageRecord err := utils.MapToStruct(data, &r) - if err != nil || r.Name != finish_stream_keyword { + if err != nil { return r, false } - var next_stream string - next_stream, ok := r.Meta["next_stream"].(string) - if !ok { - next_stream = no_next_stream_keyword + r.FinishedStream = (r.Name == finish_stream_keyword) + if r.FinishedStream { + var next_stream string + next_stream, ok := r.Meta["next_stream"].(string) + if !ok { + next_stream = no_next_stream_keyword + } + r.NextStream = next_stream } - r.NextStream = next_stream return r, true } @@ -540,7 +553,7 @@ func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror er return nil, err_inproc } if nextInd != 0 { - return db.getRecordByIDRow(request, nextInd, maxInd) + return db.getRecordByIDRaw(request, nextInd, maxInd) } else { return nil, originalerror } @@ -550,8 +563,8 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf if id != id_max { return nil } - r, ok := ExtractFinishedStreamRecord(data) - if !ok { + r, ok := ExtractMessageRecord(data) + if !ok || !r.FinishedStream { return nil } log_str := "reached end of stream " + request.DbCollectionName + " , next_stream: " + r.NextStream @@ -567,7 +580,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { return nil, err } - data, err := db.getRecordByIDRow(request, nextInd, maxInd) + data, err := db.getRecordByIDRaw(request, nextInd, maxInd) if err != nil { data, err = db.tryGetRecordFromInprocessed(request, err) } @@ -586,7 +599,7 @@ func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { if err != nil { return nil, err } - return db.getRecordByIDRow(request, max_ind, max_ind) + return db.getRecordByIDRaw(request, max_ind, max_ind) } func getSizeFilter(request Request) bson.M { @@ -725,10 +738,10 @@ func extractsTwoIntsFromString(from_to string) (int, int, error) { } -func (db *Mongodb) getNacksLimits(request Request) (int,int, error) { +func (db *Mongodb) getNacksLimits(request Request) (int, int, error) { from, to, err := extractsTwoIntsFromString(request.ExtraParam) if err != nil { - return 0,0, err + return 0, 0, err } if from == 0 { @@ -736,12 +749,24 @@ func (db *Mongodb) getNacksLimits(request Request) (int,int, error) { } if to == 0 { - to, err = db.getMaxIndex(request, true) + to, err = db.getMaxLimitWithoutEndOfStream(request, err) if err != nil { - return 0,0, err + return 0, 0, err } } - return from,to,nil + return from, to, nil +} + +func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error) (int, error) { + maxInd, err := db.getMaxIndex(request, true) + if err != nil { + return 0, err + } + _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd) + if last_err != nil && maxInd > 0 { + maxInd = maxInd - 1 + } + return maxInd, nil } func (db *Mongodb) nacks(request Request) ([]byte, error) { @@ -833,7 +858,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) { func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) { c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) - if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c);ok{ + if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok { return res, err } diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index fba70330b..4c531b1b9 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -14,8 +14,12 @@ import ( ) type StreamInfo struct { - Name string `json:"name"` - Timestamp int64 `json:"timestampCreated"` + LastId int64 `json:"lastId"` + Name string `json:"name"` + Timestamp int64 `json:"timestampCreated"` + TimestampLast int64 `json:"timestampLast"` + Finished bool `json:"finished"` + NextStream string `json:"nextStream"` } type StreamsRecord struct { @@ -57,32 +61,93 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) { return rec, nil } -func updateTimestamps(db *Mongodb, db_name string, rec *StreamsRecord) { - ss,dbFound :=streams.records[db_name] +func getCurrentStreams(db_name string) []StreamInfo { + ss, dbFound := streams.records[db_name] currentStreams := []StreamInfo{} if dbFound { // sort streams by name - currentStreams=ss.Streams - sort.Slice(currentStreams,func(i, j int) bool { - return currentStreams[i].Name>=currentStreams[j].Name + currentStreams = ss.Streams + sort.Slice(currentStreams, func(i, j int) bool { + return currentStreams[i].Name >= currentStreams[j].Name }) } + return currentStreams +} + +func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int, bool) { + ind := sort.Search(len(currentStreams), func(i int) bool { + return currentStreams[i].Name >= record.Name + }) + if ind < len(currentStreams) && currentStreams[ind].Name == record.Name { + return ind, true + } + return -1, false +} + +func fillInfoFromEarliestRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error { + res, err := db.getEarliestRawRecord(db_name, record.Name) + if err != nil { + return err + } + ts, ok := utils.GetInt64FromMap(res, "timestamp") + if ok { + rec.Streams[i].Timestamp = ts + } else { + return errors.New("fillInfoFromEarliestRecord: cannot extact timestamp") + } + return nil +} + +func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error { + res, err := db.getLastRawRecord(db_name, record.Name) + if err != nil { + return err + } + mrec, ok := ExtractMessageRecord(res) + if !ok { + return errors.New("fillInfoFromLastRecord: cannot extract record") + } + + rec.Streams[i].LastId = int64(mrec.ID) + rec.Streams[i].TimestampLast = int64(mrec.Timestamp) + rec.Streams[i].Finished = mrec.FinishedStream + if mrec.FinishedStream { + rec.Streams[i].LastId = rec.Streams[i].LastId - 1 + if mrec.NextStream != no_next_stream_keyword { + rec.Streams[i].NextStream = mrec.NextStream + } + } + return nil +} + +func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) { + ind, found := findStreamAmongCurrent(currentStreams, record) + if found { + *rec = currentStreams[ind] + if currentStreams[ind].Finished { + return found, true + } + } + return found, false +} + +func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord) error { + currentStreams := getCurrentStreams(db_name) for i, record := range rec.Streams { - ind := sort.Search(len(currentStreams),func(i int) bool { - return currentStreams[i].Name>=record.Name - }) - if ind < len(currentStreams) && currentStreams[ind].Name == record.Name { // record found, just skip it - rec.Streams[i].Timestamp = currentStreams[ind].Timestamp + found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i]) + if mayContinue { continue } - res, err := db.getEarliestRecord(db_name, record.Name) - if err == nil { - ts,ok:=utils.InterfaceToInt64(res["timestamp"]) - if ok { - rec.Streams[i].Timestamp = ts + if !found { // set timestamp + if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil { + return err } } + if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update from last record (timestamp, stream finished flag) + return err + } } + return nil } func sortRecords(rec *StreamsRecord) { @@ -96,9 +161,13 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err if err != nil { return StreamsRecord{}, err } - updateTimestamps(db, db_name, &rec) + err = updateStreamInfos(db, db_name, &rec) + if err != nil { + return StreamsRecord{}, err + } + sortRecords(&rec) - if len(rec.Streams)>0 { + if len(rec.Streams) > 0 { ss.records[db_name] = rec ss.lastUpdated = time.Now().UnixNano() } @@ -107,7 +176,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err func (ss *Streams) getStreams(db *Mongodb, db_name string, from string) (StreamsRecord, error) { streamsLock.Lock() - rec, err := ss.tryGetFromCache(db_name,db.settings.UpdateStreamCachePeriodMs) + rec, err := ss.tryGetFromCache(db_name, db.settings.UpdateStreamCachePeriodMs) if err != nil { rec, err = ss.updateFromDb(db, db_name) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index e759b0c39..085690516 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -13,16 +13,17 @@ import ( ) type TestRecord struct { - ID int64 `bson:"_id" json:"_id"` + ID int64 `bson:"_id" json:"_id"` Meta map[string]string `bson:"meta" json:"meta"` Name string `bson:"name" json:"name"` Timestamp int64 `bson:"timestamp" json:"timestamp"` } type TestDataset struct { - ID int64 `bson:"_id" json:"_id"` - Size int64 `bson:"size" json:"size"` - Messages []TestRecord `bson:"messages" json:"messages"` + Timestamp int64 `bson:"timestamp" json:"timestamp"` + ID int64 `bson:"_id" json:"_id"` + Size int64 `bson:"size" json:"size"` + Messages []TestRecord `bson:"messages" json:"messages"` } var db Mongodb @@ -38,9 +39,11 @@ const metaID_str = "0" var empty_next = map[string]string{"next_stream": ""} var rec1 = TestRecord{1, empty_next, "aaa", 0} -var rec_finished = TestRecord{2, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 0} +var rec_finished = TestRecord{2, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2} var rec2 = TestRecord{2, empty_next, "bbb", 1} var rec3 = TestRecord{3, empty_next, "ccc", 2} +var rec_finished3 = TestRecord{3, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2} +var rec_finished11 = TestRecord{11, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2} var rec1_expect, _ = json.Marshal(rec1) var rec2_expect, _ = json.Marshal(rec2) @@ -158,7 +161,7 @@ func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - _,err:=db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id",ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -170,13 +173,12 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res,err:= db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"}) fmt.Println(string(res)) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) } - func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -434,27 +436,25 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "false"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "false"}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) - _, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"}) + _, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"}) assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code) } - func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"}) + res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"}) var rec_expect, _ = json.Marshal(&SizeRecord{1}) assert.Equal(t, string(rec_expect), string(res)) } - func TestMongoDBGetSizeDataset(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -464,7 +464,7 @@ func TestMongoDBGetSizeDataset(t *testing.T) { size2_expect, _ := json.Marshal(SizeRecord{2}) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"}) assert.Nil(t, err) assert.Equal(t, string(size2_expect), string(res)) } @@ -639,13 +639,13 @@ func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) { } } -var rec_dataset1 = TestDataset{1, 3, []TestRecord{rec1, rec2, rec3}} -var rec_dataset1_incomplete = TestDataset{1, 4, []TestRecord{rec1, rec2, rec3}} -var rec_dataset2_incomplete = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}} -var rec_dataset2 = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}} -var rec_dataset3 = TestDataset{3, 3, []TestRecord{rec3, rec2, rec2}} +var rec_dataset1 = TestDataset{0, 1, 3, []TestRecord{rec1, rec2, rec3}} +var rec_dataset1_incomplete = TestDataset{1, 1, 4, []TestRecord{rec1, rec2, rec3}} +var rec_dataset2_incomplete = TestDataset{2, 2, 4, []TestRecord{rec1, rec2, rec3}} +var rec_dataset2 = TestDataset{1, 2, 4, []TestRecord{rec1, rec2, rec3}} +var rec_dataset3 = TestDataset{2, 3, 3, []TestRecord{rec3, rec2, rec2}} -var rec_dataset2_incomplete3 = TestDataset{2, 3, []TestRecord{rec1, rec2}} +var rec_dataset2_incomplete3 = TestDataset{1, 2, 3, []TestRecord{rec1, rec2}} func TestMongoDBGetDataset(t *testing.T) { db.Connect(dbaddress) @@ -701,7 +701,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -719,7 +719,7 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset2) res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", - DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"}) + DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -737,7 +737,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T) db.insertRecord(dbname, collection, &rec_dataset2_incomplete3) res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", - DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"}) + DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -754,7 +754,7 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec_finished) _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", - DatasetOp:true,ExtraParam: "0"}) + DatasetOp: true, ExtraParam: "0"}) assert.NotNil(t, err) if err != nil { @@ -763,7 +763,6 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) { } } - func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamReturnsEndofStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -772,7 +771,7 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur db.insertRecord(dbname, collection, &rec_finished) _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", - DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"}) + DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"}) assert.NotNil(t, err) if err != nil { @@ -781,7 +780,6 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur } } - func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -804,7 +802,7 @@ func TestMongoDBGetDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Nil(t, err) @@ -820,7 +818,7 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) @@ -836,7 +834,7 @@ func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true,MinDatasetSize: 3,ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"}) assert.Nil(t, err) @@ -853,16 +851,20 @@ type Stream struct { } var testsStreams = []struct { - from string + from string streams []Stream expectedStreams StreamsRecord - test string - ok bool + test string + ok bool }{ {"", []Stream{}, StreamsRecord{[]StreamInfo{}}, "no streams", true}, - {"", []Stream{{"ss1", []TestRecord{rec2, rec1}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0}}}, "one stream", true}, - {"", []Stream{{"ss1", []TestRecord{rec2, rec1}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0}, StreamInfo{Name: "ss2", Timestamp: 1}}}, "two streams", true}, - {"ss2", []Stream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss2", Timestamp: 1}}}, "with from", true}, + {"", []Stream{{"ss1", []TestRecord{rec2, rec1}}}, + StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0, LastId: 2, TimestampLast: 1}}}, "one stream", true}, + {"", []Stream{{"ss1", []TestRecord{rec2, rec1}}, + {"ss2", []TestRecord{rec2, rec3}}}, + StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0, LastId: 2, TimestampLast: 1}, + StreamInfo{Name: "ss2", Timestamp: 1, LastId: 3, TimestampLast: 2}}}, "two streams", true}, + {"ss2", []Stream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss2", Timestamp: 1, LastId: 3, TimestampLast: 2}}}, "with from", true}, } func TestMongoDBListStreams(t *testing.T) { @@ -891,11 +893,13 @@ func TestMongoDBAckMessage(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} res, err := db.ProcessRequest(request) - nacks, _ := db.getNacks(request, 1, 1) + nacks, _ := db.getNacks(request, 0, 0) assert.Nil(t, err) assert.Equal(t, "", string(res)) assert.Equal(t, 0, len(nacks)) @@ -927,6 +931,7 @@ func TestMongoDBNacks(t *testing.T) { db.Connect(dbaddress) if test.insertRecords { insertRecords(10) + db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) @@ -961,6 +966,7 @@ func TestMongoDBLastAcks(t *testing.T) { db.Connect(dbaddress) if test.insertRecords { insertRecords(10) + db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) @@ -1034,6 +1040,7 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) + db.insertRecord(dbname, collection, &rec_finished3) res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go index c172adf54..9408445be 100644 --- a/broker/src/asapo_broker/database/streams_test.go +++ b/broker/src/asapo_broker/database/streams_test.go @@ -18,7 +18,7 @@ func (suite *StreamsTestSuite) SetupTest() { func (suite *StreamsTestSuite) TearDownTest() { cleanup() - streams.records= map[string]StreamsRecord{} + streams.records = map[string]StreamsRecord{} } func TestStreamsTestSuite(t *testing.T) { @@ -48,6 +48,33 @@ func (suite *StreamsTestSuite) TestStreamsUsesCache() { rec, err := streams.getStreams(&db, dbname, "") suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) + suite.Equal(false, rec.Streams[0].Finished) + suite.Equal(int64(2), rec.Streams[0].LastId) + suite.Equal(int64(1), rec.Streams[0].TimestampLast) +} + +func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() { + db.settings.UpdateStreamCachePeriodMs = 1000 + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + rec, err := streams.getStreams(&db, dbname, "") + suite.Nil(err) + suite.Equal(int64(0), rec.Streams[0].Timestamp) + suite.Equal(true, rec.Streams[0].Finished) + suite.Equal("next1", rec.Streams[0].NextStream) +} + +func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() { + db.settings.UpdateStreamCachePeriodMs = 1000 + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_finished) + rec, err := streams.getStreams(&db, dbname, "") + suite.Nil(err) + suite.Equal(int64(1), rec.Streams[0].Timestamp) + suite.Equal(int64(2), rec.Streams[0].TimestampLast) + suite.Equal(true, rec.Streams[0].Finished) + suite.Equal("next1", rec.Streams[0].NextStream) + suite.Equal(int64(1), rec.Streams[0].LastId) } func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index 622c67165..9dea46940 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -151,7 +151,7 @@ std::string StreamInfo::Json() const { return ("{\"lastId\":" + std::to_string(last_id) + "," + "\"name\":\"" + name + "\",\"timestampCreated\":" + std::to_string(nanoseconds_from_epoch) + std::string(",") + "\"timestampLast\":" + std::to_string(nanoseconds_from_epoch_le) - + ",\"finished\":" + std::to_string(finished)+ ",\"nextStream\":\"" + next_stream) + + ",\"finished\":" + (finished?"true":"false")+ ",\"nextStream\":\"" + next_stream) + "\"}"; } @@ -159,9 +159,8 @@ bool StreamInfo::SetFromJson(const std::string &json_string) { auto old = *this; JsonStringParser parser(json_string); uint64_t id; - uint64_t finished_i; if (parser.GetUInt64("lastId", &last_id) || - parser.GetUInt64("finished", &finished_i) || + parser.GetBool("finished", &finished) || parser.GetString("nextStream", &next_stream) || !TimeFromJson(parser, "timestampLast", ×tamp_lastentry) || parser.GetString("name", &name) || @@ -169,7 +168,6 @@ bool StreamInfo::SetFromJson(const std::string &json_string) { *this = old; return false; } - finished=bool(finished_i); return true; } diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 4a46f6380..e6507bd21 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -185,7 +185,7 @@ TEST(StreamInfo, ConvertFromJsonErr) { TEST(StreamInfo, ConvertToJson) { auto sinfo = PrepareStreamInfo(); - std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000,"finished":1,"nextStream":"next"})"; + std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000,"finished":true,"nextStream":"next"})"; auto json = sinfo.Json(); ASSERT_THAT(json,Eq(expected_json)); diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go index 9b7dc2093..e809d3170 100644 --- a/common/go/src/asapo_common/utils/helpers.go +++ b/common/go/src/asapo_common/utils/helpers.go @@ -25,6 +25,16 @@ func MapToJson(res interface{}) ([]byte, error) { } } +func GetInt64FromMap(s map[string]interface{}, name string) (int64,bool) { + val, ok := InterfaceToInt64(s[name]) + if ok { + return val,true + } else { + return -1, false + } +} + + func InterfaceToInt64(val interface{}) (int64, bool) { val64, ok := val.(int64) var valf64 float64 diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index dc8c9487e..42b3688a1 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -529,7 +529,8 @@ Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value, std: } uint64_t ConsumerImpl::GetCurrentSize(std::string stream, Error* err) { - return GetCurrentCount(stream,false,false,err); + auto ri = GetSizeRequestForSingleMessagesStream(stream); + return GetCurrentCount(stream,ri,err); } Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, MessageData* data, std::string stream) { @@ -843,21 +844,25 @@ void ConsumerImpl::InterruptCurrentOperation() { } uint64_t ConsumerImpl::GetCurrentDatasetCount(std::string stream, bool include_incomplete, Error* err) { - return GetCurrentCount(stream,true,include_incomplete,err); + RequestInfo ri = GetSizeRequestForDatasetStream(stream, include_incomplete); + return GetCurrentCount(stream,ri,err); } -uint64_t ConsumerImpl::GetCurrentCount(std::string stream, bool datasets, bool include_incomplete, Error* err) { - RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + - +"/" + std::move(stream) + "/size"; - if (datasets) { - ri.extra_params = std::string("&incomplete=")+(include_incomplete?"true":"false"); - } +RequestInfo ConsumerImpl::GetSizeRequestForDatasetStream(std::string &stream, bool include_incomplete) const { + RequestInfo ri = GetSizeRequestForSingleMessagesStream(stream); + ri.extra_params = std::string("&incomplete=")+(include_incomplete?"true":"false"); + return ri; +} + +uint64_t ConsumerImpl::GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err) { auto responce = BrokerRequestWithTimeout(ri, err); if (*err) { return 0; } + return ParseGetCurrentCountResponce(err, responce); +} +uint64_t ConsumerImpl::ParseGetCurrentCountResponce(Error* err, const std::string &responce) const { JsonStringParser parser(responce); uint64_t size; if ((*err = parser.GetUInt64("size", &size)) != nullptr) { @@ -866,4 +871,11 @@ uint64_t ConsumerImpl::GetCurrentCount(std::string stream, bool datasets, bool i return size; } +RequestInfo ConsumerImpl::GetSizeRequestForSingleMessagesStream(std::string &stream) const { + RequestInfo ri; + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + + +"/" + std::move(stream) + "/size"; + return ri; +} + } diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index e85028a9d..9628c8107 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -143,7 +143,7 @@ class ConsumerImpl final : public asapo::Consumer { std::string OpToUriCmd(GetMessageServerOperation op); Error UpdateFolderTokenIfNeeded(bool ignore_existing); - uint64_t GetCurrentCount(std::string stream, bool datasets, bool include_incomplete, Error* err); + uint64_t GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err); RequestInfo GetStreamListRequest(const std::string &from, const StreamFilter &filter) const; std::string endpoint_; @@ -164,6 +164,9 @@ class ConsumerImpl final : public asapo::Consumer { uint64_t resend_attempts_; std::atomic<bool> interrupt_flag_{ false}; + RequestInfo GetSizeRequestForSingleMessagesStream(std::string &stream) const; + RequestInfo GetSizeRequestForDatasetStream(std::string &stream, bool include_incomplete) const; + uint64_t ParseGetCurrentCountResponce(Error* err, const std::string &responce) const; }; } diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 36d083511..813a2d65f 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1017,8 +1017,8 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { MockGetBrokerUri(); std::string return_streams = - std::string(R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""},)")+ - R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"}]})"; + std::string(R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""},)")+ + R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"}]})"; EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/streams" + "?token=" + expected_token + "&from=stream_from&filter=all", _, @@ -1032,8 +1032,8 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(streams.size(), Eq(2)); ASSERT_THAT(streams.size(), 2); - ASSERT_THAT(streams[0].Json(), R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""})"); - ASSERT_THAT(streams[1].Json(), R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"})"); + ASSERT_THAT(streams[0].Json(), R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""})"); + ASSERT_THAT(streams[1].Json(), R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"})"); } TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) { diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 913883a35..346941f28 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -104,7 +104,7 @@ cdef throw_exception(Error& err, res = None): cdef class PyConsumer: cdef unique_ptr[Consumer] c_consumer - cdef StreamFilter _filter_to_cfilter(self,filter): + cdef StreamFilter _filter_to_cfilter(self,filter) except + : if filter == "all": return StreamFilter_kAllStreams elif filter == "finished": @@ -228,7 +228,7 @@ cdef class PyConsumer: if err: throw_exception(err) return _str(group_id) - def get_stream_list(self, filter="all", from_stream = ""): + def get_stream_list(self,from_stream = "",filter="all"): cdef Error err cdef vector[StreamInfo] streams cdef string b_from_stream = _bytes(from_stream) diff --git a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp index 2e6762cb5..9eb2310ca 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp @@ -63,7 +63,7 @@ class DbMetaLastStreamTests : public Test { ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; std::string expected_data_source = "source"; - std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000})"; + std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000,"finished":false,"nextStream":""})"; asapo::StreamInfo expected_stream_info; void SetUp() override { GenericRequestHeader request_header; diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp index 1d1d96d3d..a2828c31e 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp @@ -64,7 +64,7 @@ class DbMetaStreamInfoTests : public Test { ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; std::string expected_data_source = "source"; - std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000})"; + std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000,"finished":false,"nextStream":""})"; asapo::StreamInfo expected_stream_info; void SetUp() override { GenericRequestHeader request_header; diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh index 9f599d42b..5d4d14696 100644 --- a/tests/automatic/consumer/consumer_api/check_linux.sh +++ b/tests/automatic/consumer/consumer_api/check_linux.sh @@ -42,7 +42,7 @@ for i in `seq 1 5`; do echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done -echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} +echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} echo hello1 > 1 diff --git a/tests/automatic/consumer/consumer_api/check_windows.bat b/tests/automatic/consumer/consumer_api/check_windows.bat index 19e163518..908980541 100644 --- a/tests/automatic/consumer/consumer_api/check_windows.bat +++ b/tests/automatic/consumer/consumer_api/check_windows.bat @@ -12,7 +12,13 @@ for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":6,"nam for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +echo db.data_stream1.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}}) | %mongo_exe% %database_name% || goto :error + for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error + +echo db.data_stream2.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}}) | %mongo_exe% %database_name% || goto :error + + echo hello1 > 1 diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index e72b7cb18..91d802d18 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -143,17 +143,19 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str auto streams = consumer->GetStreamList("",asapo::StreamFilter::kAllStreams,&err); M_AssertTrue(err == nullptr, "GetStreamList no error"); M_AssertTrue(streams.size() == 3, "streams.size"); - M_AssertTrue(streams[0].name == "default", "streams0.name1"); - M_AssertTrue(streams[1].name == "stream1", "streams1.name2"); - M_AssertTrue(streams[2].name == "stream2", "streams2.name3"); + M_AssertTrue(streams[0].name == "default", "streams0.name"); + M_AssertTrue(streams[1].name == "stream1", "streams1.name"); + M_AssertTrue(streams[2].name == "stream2", "streams2.name"); M_AssertTrue(streams[1].finished == true, "stream1 finished"); - M_AssertTrue(streams[1].next_stream == "next_stream", "stream1 next stream"); + M_AssertTrue(streams[1].next_stream == "ns", "stream1 next stream"); M_AssertTrue(streams[2].finished == true, "stream2 finished"); M_AssertTrue(streams[2].next_stream == "", "stream2 no next stream"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_created) == 0, "streams0.timestamp"); - M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) > 0, "streams0.timestamp lastentry set"); + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) == 0, "streams0.timestamp lastentry"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[1].timestamp_created) == 1000, "streams1.timestamp"); + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[1].timestamp_lastentry) == 1000, "streams1.timestamp lastentry"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_created) == 2000, "streams2.timestamp"); + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_lastentry) == 2000, "streams2.timestamp lastentry"); // acknowledges auto id = consumer->GetLastAcknowledgedMessage(group_id,"default", &err); diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index 22d179ce1..aed808f5d 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -50,6 +50,10 @@ do echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done +echo 'db.data_stream1.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} +echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} + + sleep 1 export PYTHONPATH=$1:${PYTHONPATH} diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat index adcf8ce57..a6e530432 100644 --- a/tests/automatic/consumer/consumer_api_python/check_windows.bat +++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat @@ -20,6 +20,8 @@ for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +echo db.data_stream1.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}}) | %mongo_exe% %database_name% || goto :error +echo db.data_stream2.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}}) | %mongo_exe% %database_name% || goto :error mkdir %source_path% diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 4e0915726..c253db6e4 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -129,15 +129,22 @@ def check_single(consumer, group_id): _, meta = consumer.get_next(group_id, meta_only=True, stream = "stream2") assert_metaname(meta, "21", "get next stream2") - streams = consumer.get_stream_list("") + streams = consumer.get_stream_list("","all") assert_eq(len(streams), 4, "number of streams") print(streams) assert_eq(streams[0]["name"], "default", "streams_name1") + assert_eq(streams[0]["finished"], False, "streams_finished1") assert_eq(streams[1]["name"], "streamfts", "streams_name2") assert_eq(streams[2]["name"], "stream1", "streams_name2") assert_eq(streams[3]["name"], "stream2", "streams_name3") assert_eq(streams[1]["timestampCreated"], 1000, "streams_timestamp2") - + assert_eq(streams[2]["timestampLast"], 2000, "streams_timestamplast2") + assert_eq(streams[2]["finished"], True, "streams_finished2") + assert_eq(streams[2]["nextStream"], "ns", "next stream 2") + assert_eq(streams[2]["lastId"], 5, "last id stream 2") + assert_eq(streams[3]["finished"], True, "streams_finished3") + assert_eq(streams[3]["nextStream"], "", "next stream 3") + assert_eq(streams[3]["lastId"], 5, "last id stream 3") # acks try: id = consumer.get_last_acknowledged_message(group_id) -- GitLab