diff --git a/CHANGELOG.md b/CHANGELOG.md index 45154555a098927e46742dc4c158c262ca3f210e..29826280e18ea889c514caa0fb62bbfd4342560b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ IMPROVEMENTS * Producer API - queue limits in Python, for C++ return original data in error custom data * Consumer API - add GetCurrentDatasetCount/get_current_dataset_count function with option to include or exclude incomplete datasets +* Consumer API - GetStreamList/get_stream_list - can filter finished/unfinished streams now + +BREAKING CHANGES +* Consumer API (C++ only)- GetStreamList has now extra argument StreamFilter ## 20.12.0 diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 09b17e85472e5e81472171212aee983180655879..f95736a2be9fd873dcba5be494cef11e8b1149dd 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -25,27 +24,27 @@ type ID struct { ID int `bson:"_id"` } -type ServiceRecord struct { - ID int `json:"_id"` - Name string `json:"name"` - Meta map[string]interface{} `json:"meta"` +type FinishedStreamRecord struct { + ID int `json:"_id"` + Name string `json:"name"` + Meta map[string]interface{} `json:"meta"` + NextStream string } type InProcessingRecord struct { - ID int `bson:"_id" json:"_id"` - MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` - ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelayMs int64 `bson:"delayMs" json:"delayMs"` + ID int `bson:"_id" json:"_id"` + MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` + ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } type NegAckParamsRecord struct { - ID int `bson:"_id" json:"_id"` - MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` - ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - DelayMs int64 `bson:"delayMs" json:"delayMs"` + ID int `bson:"_id" json:"_id"` + MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` + ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` + DelayMs int64 `bson:"delayMs" json:"delayMs"` } - type Nacks struct { Unacknowledged []int `json:"unacknowledged"` } @@ -74,8 +73,6 @@ const no_next_stream_keyword = "asapo_no_next" var dbSessionLock sync.Mutex - - type SizeRecord struct { Size int `bson:"size" json:"size"` } @@ -161,18 +158,24 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error { return err } -func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) +func maxIndexQuery(request Request, returnIncompete bool) bson.M { var q bson.M if request.DatasetOp && !returnIncompete { - if request.MinDatasetSize>0 { - q = bson.M{"size": bson.M{"$gte": request.MinDatasetSize}} + if request.MinDatasetSize > 0 { + q = bson.M{"$expr": bson.M{"$gte": []interface{}{bson.M{"$size": "$messages"}, request.MinDatasetSize}}} } else { q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$messages"}}}} } + q = bson.M{"$or": []interface{}{bson.M{"name": finish_stream_keyword}, q}} } else { q = nil } + return q +} + +func (db *Mongodb) 
getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) + q := maxIndexQuery(request, returnIncompete) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) var result ID @@ -184,7 +187,6 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id in return result.ID, err } - func duplicateError(err error) bool { command_error, ok := err.(mongo.CommandError) if !ok { @@ -219,7 +221,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) if err == mongo.ErrNoDocuments || duplicateError(err) { // try again without upsert - if the first error was due to missing pointer opts = options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) - if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res);err2==nil { + if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil { return nil } return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} @@ -232,58 +234,71 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) func encodeAnswer(id, id_max int, next_stream string) string { var r = struct { - Op string `json:"op"` - Id int `json:"id"` - Id_max int `json:"id_max"` + Op string `json:"op"` + Id int `json:"id"` + Id_max int `json:"id_max"` Next_stream string `json:"next_stream"` }{"get_record_by_id", id, id_max, next_stream} answer, _ := json.Marshal(&r) return string(answer) } -func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) { - var res map[string]interface{} - q := bson.M{"_id": id} +func recordContainsPartialData(request Request, rec map[string]interface{}) bool { + if !request.DatasetOp { + return false + } + + name, ok_name := rec["name"].(string) + if ok_name && name == finish_stream_keyword { + return false + } + imgs, ok1 := rec["messages"].(primitive.A) + expectedSize, ok2 := utils.InterfaceToInt64(rec["size"]) + if !ok1 || !ok2 { + return false + } + nMessages := len(imgs) + if (request.MinDatasetSize == 0 && int64(nMessages) != expectedSize) || (request.MinDatasetSize == 0 && nMessages < request.MinDatasetSize) { + return true + } + return false +} +func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) { + q := bson.M{"_id": id} c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) - err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) + err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max, "") log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error() - fmt.Println(err) logger.Debug(log_str) - return nil, &DBError{utils.StatusNoData, answer} + return res, &DBError{utils.StatusNoData, answer} } + return res, err +} - partialData := false - if request.DatasetOp { - imgs,ok1 :=res["messages"].(primitive.A) - expectedSize,ok2 := utils.InterfaceToInt64(res["size"]) - if !ok1 || !ok2 { - return nil, &DBError{utils.StatusTransactionInterrupted, "getRecordByIDRow: cannot parse database response" } - } - nMessages := len(imgs) - if (request.MinDatasetSize==0 && int64(nMessages)!=expectedSize) || (request.MinDatasetSize==0 && nMessages<request.MinDatasetSize) { - partialData = true - } +func (db *Mongodb) getRecordByIDRow(request 
Request, id, id_max int) ([]byte, error) { + res, err := db.getRecordFromDb(request, id, id_max) + if err != nil { + return nil, err } - if partialData { - log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName - logger.Debug(log_str) - } else { - log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName - logger.Debug(log_str) + if err := checkStreamFinished(request, id, id_max, res); err != nil { + return nil, err } - answer,err := utils.MapToJson(&res) - if err!=nil { - return nil,err + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName + logger.Debug(log_str) + + record, err := utils.MapToJson(&res) + if err != nil { + return nil, err } - if partialData { - return nil,&DBError{utils.StatusPartialData, string(answer)} + if recordContainsPartialData(request, res) { + return nil, &DBError{utils.StatusPartialData, string(record)} + } else { + return record, nil } - return answer,nil } func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) { @@ -297,9 +312,9 @@ func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map if err == mongo.ErrNoDocuments { return map[string]interface{}{}, nil } - return nil,err + return nil, err } - return res,nil + return res, nil } func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { @@ -308,18 +323,17 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(request,true) + max_ind, err := db.getMaxIndex(request, true) if err != nil { return nil, err } return db.getRecordByIDRow(request, id, max_ind) - } func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { input := struct { - Id int + Id int Params struct { DelayMs int } @@ -330,14 +344,13 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelayMs, 1) + err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.GroupId, input.Id, input.Params.DelayMs, 1) return []byte(""), err } - func (db *Mongodb) ackRecord(request Request) ([]byte, error) { var record ID - err := json.Unmarshal([]byte(request.ExtraParam),&record) + err := json.Unmarshal([]byte(request.ExtraParam), &record) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } @@ -368,7 +381,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { } func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(request,true) + max_ind, err := db.getMaxIndex(request, true) if err != nil { return LocationPointer{}, 0, err } @@ -386,18 +399,18 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err return curPointer, max_ind, nil } -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int,nResendAttempts int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int) (int, error) { var res InProcessingRecord opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) tNow := time.Now().UnixNano() - var update bson.M - if nResendAttempts==0 { - update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) 
,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} + var update bson.M + if nResendAttempts == 0 { + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6), "maxResendAttempts": math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} } else { - update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} + update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6), "maxResendAttempts": nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} } - q := bson.M{"delayMs": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} + q := bson.M{"delayMs": bson.M{"$lte": tNow}, "$expr": bson.M{"$lt": []string{"$resendAttempts", "$maxResendAttempts"}}} c := db.client.Database(dbname).Collection(collection_name) err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { @@ -412,9 +425,9 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay return res.ID, nil } -func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delayMs int, nResendAttempts int) error { +func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int) error { record := InProcessingRecord{ - id, nResendAttempts, 0,time.Now().UnixNano()+int64(delayMs*1e6), + id, nResendAttempts, 0, time.Now().UnixNano() + int64(delayMs*1e6), } c := db.client.Database(db_name).Collection(collection_name) @@ -434,12 +447,12 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str return err } - return db.InsertRecordToInprocess(db_name,collection_name,id,delayMs, nResendAttempts) + return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts) } func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { - var record_ind, max_ind, delayMs, nResendAttempts int + var record_ind, max_ind, delayMs, nResendAttempts int var err error if len(request.ExtraParam) != 0 { delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) @@ -451,7 +464,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs,nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs, nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) @@ -505,45 +518,58 @@ func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) { return nextInd, maxInd, nil } -func (db *Mongodb) processLastRecord(request Request, data []byte, err error) ([]byte, error) { - var r ServiceRecord - err = json.Unmarshal(data, &r) +func ExtractFinishedStreamRecord(data map[string]interface{}) (FinishedStreamRecord, bool) { + var r FinishedStreamRecord + err := utils.MapToStruct(data, &r) if err != nil || r.Name != finish_stream_keyword { - return data, err + return r, false } var next_stream string next_stream, ok := r.Meta["next_stream"].(string) if !ok { next_stream = no_next_stream_keyword } + 
r.NextStream = next_stream + return r, true +} - answer := encodeAnswer(r.ID, r.ID, next_stream) - log_str := "reached end of stream " + request.DbCollectionName + " , next_stream: " + next_stream - logger.Debug(log_str) - - +func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror error) ([]byte, error) { var err_inproc error nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, true) if err_inproc != nil { return nil, err_inproc } if nextInd != 0 { - return db.getRecordByIDRow(request, nextInd, maxInd) + return db.getRecordByIDRow(request, nextInd, maxInd) + } else { + return nil, originalerror + } +} + +func checkStreamFinished(request Request, id, id_max int, data map[string]interface{}) error { + if id != id_max { + return nil + } + r, ok := ExtractFinishedStreamRecord(data) + if !ok { + return nil } + log_str := "reached end of stream " + request.DbCollectionName + " , next_stream: " + r.NextStream + logger.Debug(log_str) - return nil, &DBError{utils.StatusNoData, answer} + answer := encodeAnswer(r.ID-1, r.ID-1, r.NextStream) + return &DBError{utils.StatusNoData, answer} } func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexes(request) if err != nil { return nil, err } data, err := db.getRecordByIDRow(request, nextInd, maxInd) - if nextInd == maxInd && GetStatusCodeFromError(err)!=utils.StatusPartialData { - data, err = db.processLastRecord(request,data, err) + if err != nil { + data, err = db.tryGetRecordFromInprocessed(request, err) } if err == nil { @@ -563,24 +589,30 @@ func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { return db.getRecordByIDRow(request, max_ind, max_ind) } -func (db *Mongodb) getSize(request Request) ([]byte, error) { - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) - var rec SizeRecord - var err error - filter:=bson.M{} - if request.ExtraParam=="false" { // do not return incomplete datasets +func getSizeFilter(request Request) bson.M { + filter := bson.M{} + if request.ExtraParam == "false" { // do not return incomplete datasets filter = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$messages"}}}} - } else if request.ExtraParam=="true" { - filter = bson.M{"$expr": bson.M{"gt": []interface{}{0, bson.M{"$size": "$messages"}}}} + } else if request.ExtraParam == "true" { + filter = bson.M{"$expr": bson.M{"$gt": []interface{}{bson.M{"$size": "$messages"}, 0}}} } + filter = bson.M{"$and": []interface{}{bson.M{"name": bson.M{"$ne": finish_stream_keyword}}, filter}} + return filter +} +func (db *Mongodb) getSize(request Request) ([]byte, error) { + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) + + filter := getSizeFilter(request) size, err := c.CountDocuments(context.TODO(), filter, options.Count()) if err != nil { if ce, ok := err.(mongo.CommandError); ok && ce.Code == 17124 { - return nil,&DBError{utils.StatusWrongInput, "no datasets found"} + return nil, &DBError{utils.StatusWrongInput, "no datasets found"} } return nil, err } + + var rec SizeRecord rec.Size = int(size) return json.Marshal(&rec) } @@ -592,7 +624,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) { } err = db.setCounter(request, id) - if err!= nil { + if err != nil { return []byte(""), err } @@ -693,10 +725,10 @@ func extractsTwoIntsFromString(from_to string) (int, int, error) { } -func (db *Mongodb) nacks(request 
Request) ([]byte, error) { +func (db *Mongodb) getNacksLimits(request Request) (int,int, error) { from, to, err := extractsTwoIntsFromString(request.ExtraParam) if err != nil { - return nil, err + return 0,0, err } if from == 0 { @@ -706,9 +738,17 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) { if to == 0 { to, err = db.getMaxIndex(request, true) if err != nil { - return nil, err + return 0,0, err } } + return from,to,nil +} + +func (db *Mongodb) nacks(request Request) ([]byte, error) { + from, to, err := db.getNacksLimits(request) + if err != nil { + return nil, err + } res := Nacks{[]int{}} if to == 0 { @@ -739,27 +779,28 @@ func (db *Mongodb) lastAck(request Request) ([]byte, error) { return utils.MapToJson(&result) } -func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) { - - c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) - +func (db *Mongodb) canAvoidDbRequest(min_index int, max_index int, c *mongo.Collection) ([]int, error, bool) { if min_index > max_index { - return []int{}, errors.New("from index is greater than to index") + return []int{}, errors.New("from index is greater than to index"), true } size, err := c.CountDocuments(context.TODO(), bson.M{}, options.Count()) if err != nil { - return []int{}, err + return []int{}, err, true } if size == 0 { - return makeRange(min_index, max_index), nil + return makeRange(min_index, max_index), nil, true } if min_index == 1 && int(size) == max_index { - return []int{}, nil + return []int{}, nil, true } + return nil, nil, false +} + +func getNacksQuery(max_index int, min_index int) []bson.D { matchStage := bson.D{{"$match", bson.D{{"_id", bson.D{{"$lt", max_index + 1}, {"$gt", min_index - 1}}}}}} groupStage := bson.D{ {"$group", bson.D{ @@ -775,30 +816,41 @@ func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, e {"$setDifference", bson.A{bson.D{{"$range", bson.A{min_index, max_index + 1}}}, "$numbers"}}, }}}, }} + return mongo.Pipeline{matchStage, groupStage, projectStage} +} - query := mongo.Pipeline{matchStage, groupStage, projectStage} - cursor, err := c.Aggregate(context.Background(), query) - type res struct { +func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) { + resp := []struct { Numbers []int - } - resp := []res{} + }{} err = cursor.All(context.Background(), &resp) if err != nil || len(resp) != 1 { return []int{}, err } - return resp[0].Numbers, nil } +func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) { + c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) + + if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c);ok{ + return res, err + } + + query := getNacksQuery(max_index, min_index) + cursor, err := c.Aggregate(context.Background(), query) + + return extractNacsFromCursor(err, cursor) +} + func (db *Mongodb) getStreams(request Request) ([]byte, error) { - rec, err := streams.getStreams(db,request.DbName,request.ExtraParam) + rec, err := streams.getStreams(db, request.DbName, request.ExtraParam) if err != nil { return db.processQueryError("get streams", request.DbName, err) } return json.Marshal(&rec) } - func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { if err := db.checkDatabaseOperationPrerequisites(request); err != nil { return nil, err diff --git 
a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index cc6119e172774995131d2abe2ea88f27e3d203db..e759b0c39553f08b6c6fcf4b3bb0b07b5000f9ee 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -149,9 +149,34 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) - assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next1\"}", err.(*DBError).Message) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) } +func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + + _,err:=db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id",ExtraParam: "2"}) + + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) +} + +func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + + res,err:= db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"}) + fmt.Println(string(res)) + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) +} + + func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -392,6 +417,18 @@ func TestMongoDBGetSize(t *testing.T) { assert.Equal(t, string(recs1_expect), string(res)) } +func TestMongoDBGetSizeWithFinishedStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"}) + assert.Nil(t, err) + var rec_expect, _ = json.Marshal(&SizeRecord{1}) + assert.Equal(t, string(rec_expect), string(res)) +} + func TestMongoDBGetSizeForDatasets(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -404,6 +441,20 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) { assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code) } + +func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_finished) + + res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"}) + + var rec_expect, _ = json.Marshal(&SizeRecord{1}) + assert.Equal(t, string(rec_expect), string(res)) +} + + func TestMongoDBGetSizeDataset(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -412,16 +463,10 @@ func TestMongoDBGetSizeDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset2_incomplete) size2_expect, _ := json.Marshal(SizeRecord{2}) - size1_expect, _ := json.Marshal(SizeRecord{1}) res, err := db.ProcessRequest(Request{DbName: dbname, 
DbCollectionName: collection, Op: "size",ExtraParam: "true"}) assert.Nil(t, err) assert.Equal(t, string(size2_expect), string(res)) - - res, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "false"}) - assert.Nil(t, err) - assert.Equal(t, string(size1_expect), string(res)) - } func TestMongoDBGetSizeNoRecords(t *testing.T) { @@ -600,6 +645,8 @@ var rec_dataset2_incomplete = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}} var rec_dataset2 = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}} var rec_dataset3 = TestDataset{3, 3, []TestRecord{rec3, rec2, rec2}} +var rec_dataset2_incomplete3 = TestDataset{2, 3, []TestRecord{rec1, rec2}} + func TestMongoDBGetDataset(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -647,59 +694,94 @@ func TestMongoDBNoDataOnNotCompletedNextDataset(t *testing.T) { assert.Equal(t, rec_dataset2_incomplete, res) } - -func TestMongoDBReturnInCompletedDataset(t *testing.T) { +func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{DbName: dbname, - DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true, MinDatasetSize: 1}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"}) assert.Nil(t, err) + var res TestDataset json.Unmarshal(res_string, &res) - assert.Equal(t, rec_dataset1_incomplete, res) + assert.Equal(t, rec_dataset1, res) } - -func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { +func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", + DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"}) assert.Nil(t, err) var res TestDataset json.Unmarshal(res_string, &res) - assert.Equal(t, rec_dataset1, res) + assert.Equal(t, rec_dataset2, res) } -func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { +func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) - db.insertRecord(dbname, collection, &rec_dataset2) + db.insertRecord(dbname, collection, &rec_dataset2_incomplete3) res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", - DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"}) + DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"}) assert.Nil(t, err) var res TestDataset json.Unmarshal(res_string, &res) + assert.Equal(t, rec_dataset1, res) +} - assert.Equal(t, rec_dataset2, res) +func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(dbname, collection, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_finished) + + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, 
GroupId: groupId, Op: "last", + DatasetOp:true,ExtraParam: "0"}) + + assert.NotNil(t, err) + if err != nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.Error()) + } } + +func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamReturnsEndofStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_finished) + + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", + DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"}) + + assert.NotNil(t, err) + if err != nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.Error()) + } +} + + func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 35b5e49d5b29d2c5726168ca1029a880d9f77695..490286271505d4a1081c8d12972533c3e2adddd9 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -41,10 +41,12 @@ class MessageMeta { struct StreamInfo { uint64_t last_id{0}; std::string name; + bool finished{false}; + std::string next_stream; std::chrono::system_clock::time_point timestamp_created; std::chrono::system_clock::time_point timestamp_lastentry; - std::string Json(bool add_last) const; - bool SetFromJson(const std::string& json_string,bool read_last); + std::string Json() const; + bool SetFromJson(const std::string &json_string); }; using StreamInfos = std::vector<StreamInfo>; diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index fc7882c29dda4c1a5add7578ca45922fb159a03b..622c671656c60c596a25faea34c2e25a6d5558f0 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -145,26 +145,31 @@ uint64_t NanosecsEpochFromTimePoint(std::chrono::system_clock::time_point time_p return (uint64_t) std::chrono::duration_cast<std::chrono::nanoseconds>(time_point.time_since_epoch()).count(); } -std::string StreamInfo::Json(bool add_last) const { +std::string StreamInfo::Json() const { auto nanoseconds_from_epoch = NanosecsEpochFromTimePoint(timestamp_created); auto nanoseconds_from_epoch_le = NanosecsEpochFromTimePoint(timestamp_lastentry); - return (add_last ? "{\"lastId\":" + std::to_string(last_id) + "," : "{") + + return ("{\"lastId\":" + std::to_string(last_id) + "," + "\"name\":\"" + name + "\",\"timestampCreated\":" + std::to_string(nanoseconds_from_epoch) - + (add_last ? std::string(",") + "\"timestampLast\":" + std::to_string(nanoseconds_from_epoch_le) : "") - + "}"; + + std::string(",") + "\"timestampLast\":" + std::to_string(nanoseconds_from_epoch_le) + + ",\"finished\":" + std::to_string(finished)+ ",\"nextStream\":\"" + next_stream) + + "\"}"; } -bool StreamInfo::SetFromJson(const std::string &json_string, bool read_last) { +bool StreamInfo::SetFromJson(const std::string &json_string) { auto old = *this; JsonStringParser parser(json_string); uint64_t id; - if ((read_last ? parser.GetUInt64("lastId", &last_id) : nullptr) || - (read_last ? 
!TimeFromJson(parser, "timestampLast", ×tamp_lastentry) : false) || + uint64_t finished_i; + if (parser.GetUInt64("lastId", &last_id) || + parser.GetUInt64("finished", &finished_i) || + parser.GetString("nextStream", &next_stream) || + !TimeFromJson(parser, "timestampLast", ×tamp_lastentry) || parser.GetString("name", &name) || !TimeFromJson(parser, "timestampCreated", ×tamp_created)) { *this = old; return false; } + finished=bool(finished_i); return true; } diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 17357f5cbf7bdb2b7404bb4d920edd37813557bd..4a46f638012bfdb5e6c304835765ca3b667cfc71 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -140,6 +140,8 @@ StreamInfo PrepareStreamInfo() { StreamInfo sinfo; sinfo.last_id = 123; sinfo.name = "test"; + sinfo.next_stream = "next"; + sinfo.finished = true; sinfo.timestamp_created = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); sinfo.timestamp_lastentry = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(2)); return sinfo; @@ -157,36 +159,24 @@ TEST(StreamInfo, ConvertFromJson) { StreamInfo result; auto sinfo = PrepareStreamInfo(); - std::string json = sinfo.Json(true); + std::string json = sinfo.Json(); - auto ok = result.SetFromJson(json,true); + auto ok = result.SetFromJson(json); ASSERT_THAT(ok, Eq(true)); ASSERT_THAT(result.last_id, sinfo.last_id); ASSERT_THAT(result.name, sinfo.name); ASSERT_THAT(result.timestamp_created, sinfo.timestamp_created); ASSERT_THAT(result.timestamp_lastentry, sinfo.timestamp_lastentry); + ASSERT_THAT(result.finished, sinfo.finished); + ASSERT_THAT(result.next_stream, sinfo.next_stream); } -TEST(StreamInfo, ConvertFromJsonWithoutID) { - StreamInfo result; - - auto sinfo = PrepareStreamInfo(); - std::string json = sinfo.Json(false); - - auto ok = result.SetFromJson(json,false); - - ASSERT_THAT(ok, Eq(true)); - ASSERT_THAT(result.name, sinfo.name); - ASSERT_THAT(result.timestamp_created, sinfo.timestamp_created); -} - - TEST(StreamInfo, ConvertFromJsonErr) { StreamInfo result; std::string json = R"({"lastId":123)"; - auto ok = result.SetFromJson(json,true); + auto ok = result.SetFromJson(json); ASSERT_THAT(ok, Eq(false)); ASSERT_THAT(result.last_id, Eq(0)); @@ -195,20 +185,12 @@ TEST(StreamInfo, ConvertFromJsonErr) { TEST(StreamInfo, ConvertToJson) { auto sinfo = PrepareStreamInfo(); - std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000})"; - auto json = sinfo.Json(true); + std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000,"finished":1,"nextStream":"next"})"; + auto json = sinfo.Json(); - ASSERT_THAT(expected_json, Eq(json)); + ASSERT_THAT(json,Eq(expected_json)); } -TEST(StreamInfo, ConvertToJsonWithoutID) { - auto sinfo = PrepareStreamInfo(); - - std::string expected_json = R"({"name":"test","timestampCreated":1000000})"; - auto json = sinfo.Json(false); - - ASSERT_THAT(expected_json, Eq(json)); -} TEST(SourceCredentials, ConvertToString) { auto sc = SourceCredentials{SourceType::kRaw,"beamtime","beamline","source","token"}; diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 02dcb68e5f4b92c264a4ffe5bfbaea0c0af6fa8c..19f632c34e508c029acf5d60982f8283609ee0eb 100644 --- 
a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -12,6 +12,12 @@ namespace asapo { +enum class StreamFilter { + kAllStreams, + kFinishedStreams, + kUnFinishedStreams +}; + class Consumer { public: //! Reset counter for the specific group. @@ -73,8 +79,8 @@ class Consumer { */ virtual NetworkConnectionType CurrentConnectionType() const = 0; - //! Get list of streams, set from to "" to get all streams - virtual StreamInfos GetStreamList(std::string from, Error* err) = 0; + //! Get list of streams with filter, set from to "" to get all streams + virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; //! Get current number of messages in stream /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 4386dd9b1f948fb54341ba1ea46c793176eb9ccd..dc8c9487e38b389a065c99ab35053c14a2367019 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -644,7 +644,7 @@ StreamInfos ParseStreamsFromResponse(std::string response, Error* err) { } for (auto stream_encoded : streams_endcoded) { StreamInfo si; - auto ok = si.SetFromJson(stream_encoded, false); + auto ok = si.SetFromJson(stream_encoded); if (!ok) { *err = TextError("cannot parse " + stream_encoded); return StreamInfos{}; @@ -654,14 +654,20 @@ StreamInfos ParseStreamsFromResponse(std::string response, Error* err) { return streams; } -StreamInfos ConsumerImpl::GetStreamList(std::string from, Error* err) { - - RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/0/streams"; - ri.post = false; - if (!from.empty()) { - ri.extra_params = "&from=" + from; +std::string filterToString(StreamFilter filter) { + switch(filter) { + case StreamFilter::kAllStreams: + return "all"; + case StreamFilter::kFinishedStreams: + return "finished"; + case StreamFilter::kUnFinishedStreams: + return "unfinished"; } +} + + +StreamInfos ConsumerImpl::GetStreamList(std::string from,StreamFilter filter, Error* err) { + RequestInfo ri = GetStreamListRequest(from, filter); auto response = BrokerRequestWithTimeout(ri, err); if (*err) { @@ -671,6 +677,17 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, Error* err) { return ParseStreamsFromResponse(std::move(response), err); } +RequestInfo ConsumerImpl::GetStreamListRequest(const std::string &from, const StreamFilter &filter) const { + RequestInfo ri; + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/0/streams"; + ri.post = false; + if (!from.empty()) { + ri.extra_params = "&from=" + from; + } + ri.extra_params +="&filter="+filterToString(filter); + return ri; +} + Error ConsumerImpl::UpdateFolderTokenIfNeeded(bool ignore_existing) { if (!folder_token_.empty() && !ignore_existing) { return nullptr; diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 27488d22dfa23acbe0b7c4c21d8761c134bc33b4..e85028a9de231338b97dd4fb2f8e2b837840eb7a 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -100,7 +100,7 @@ class ConsumerImpl final : public asapo::Consumer { Error RetrieveData(MessageMeta* info, MessageData* data) override; - StreamInfos GetStreamList(std::string from, Error* err) override; + StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) override; void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t 
resend_attempts) override; virtual void InterruptCurrentOperation() override; @@ -144,9 +144,9 @@ class ConsumerImpl final : public asapo::Consumer { Error UpdateFolderTokenIfNeeded(bool ignore_existing); uint64_t GetCurrentCount(std::string stream, bool datasets, bool include_incomplete, Error* err); + RequestInfo GetStreamListRequest(const std::string &from, const StreamFilter &filter) const; - - std::string endpoint_; + std::string endpoint_; std::string current_broker_uri_; std::string current_fts_uri_; std::string source_path_; @@ -163,6 +163,7 @@ class ConsumerImpl final : public asapo::Consumer { uint64_t delay_ms_; uint64_t resend_attempts_; std::atomic<bool> interrupt_flag_{ false}; + }; } diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index c252a3ec8eaa668fb90ce8a3582143449be1f036..36d083511342f0e6c49a84afa393bda36f717d6b 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1017,36 +1017,37 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { MockGetBrokerUri(); std::string return_streams = - R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000},{"name":"test1","timestampCreated":2000000}]})"; + std::string(R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""},)")+ + R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"}]})"; EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/streams" - + "?token=" + expected_token + "&from=stream_from", _, + + "?token=" + expected_token + "&from=stream_from&filter=all", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(return_streams))); asapo::Error err; - auto streams = consumer->GetStreamList("stream_from", &err); + auto streams = consumer->GetStreamList("stream_from",asapo::StreamFilter::kAllStreams, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(streams.size(), Eq(2)); ASSERT_THAT(streams.size(), 2); - ASSERT_THAT(streams[0].Json(false), R"({"name":"test","timestampCreated":1000000})"); - ASSERT_THAT(streams[1].Json(false), R"({"name":"test1","timestampCreated":2000000})"); + ASSERT_THAT(streams[0].Json(), R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""})"); + ASSERT_THAT(streams[1].Json(), R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"})"); } TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/streams" - + "?token=" + expected_token, _, + + "?token=" + expected_token+"&filter=finished", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return("")));; asapo::Error err; - auto streams = consumer->GetStreamList("", &err); + auto streams = consumer->GetStreamList("",asapo::StreamFilter::kFinishedStreams, &err); } void ConsumerImplTests::MockBeforeFTS(MessageData* data) { diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index cea9988b77dd09a22309a45dfb32f622dd4d21d6..279ae39e4adcf0bff444fa694d584d67b10928b8 100644 --- a/consumer/api/python/asapo_consumer.pxd 
+++ b/consumer/api/python/asapo_consumer.pxd @@ -46,8 +46,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": string data_source string user_token cppclass StreamInfo: - string Json(bool add_last_id) - bool SetFromJson(string json_str, bool read_last_id) + string Json() cdef extern from "asapo/asapo_consumer.h" namespace "asapo": cppclass NetworkConnectionType: @@ -55,6 +54,11 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": NetworkConnectionType NetworkConnectionType_kUndefined "asapo::NetworkConnectionType::kUndefined" NetworkConnectionType NetworkConnectionType_kAsapoTcp "asapo::NetworkConnectionType::kAsapoTcp" NetworkConnectionType NetworkConnectionType_kFabric "asapo::NetworkConnectionType::kFabric" + cppclass StreamFilter: + pass + StreamFilter StreamFilter_kAllStreams "asapo::StreamFilter::kAllStreams" + StreamFilter StreamFilter_kFinishedStreams "asapo::StreamFilter::kFinishedStreams" + StreamFilter StreamFilter_kUnFinishedStreams "asapo::StreamFilter::kUnFinishedStreams" cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: cdef cppclass Consumer: @@ -80,7 +84,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetLastDataset(uint64_t min_size, string stream, Error* err) DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) - vector[StreamInfo] GetStreamList(string from_stream, Error* err) + vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 9e201004045e4e12696a5edc6d7f68173cb4f8d3..913883a351bb8df6708b9dfa723c43acbd5517fd 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -104,6 +104,15 @@ cdef throw_exception(Error& err, res = None): cdef class PyConsumer: cdef unique_ptr[Consumer] c_consumer + cdef StreamFilter _filter_to_cfilter(self,filter): + if filter == "all": + return StreamFilter_kAllStreams + elif filter == "finished": + return StreamFilter_kFinishedStreams + elif filter == "unfinished": + return StreamFilter_kUnFinishedStreams + else: + raise AsapoWrongInputError("wrong filter, must be all|finished|unfinished") def _op(self, op, group_id, stream, meta_only, uint64_t id): cdef MessageMeta info cdef string b_group_id = _bytes(group_id) @@ -219,17 +228,18 @@ cdef class PyConsumer: if err: throw_exception(err) return _str(group_id) - def get_stream_list(self, from_stream = ""): + def get_stream_list(self, filter="all", from_stream = ""): cdef Error err cdef vector[StreamInfo] streams cdef string b_from_stream = _bytes(from_stream) + cdef StreamFilter stream_filter = self._filter_to_cfilter(filter) with nogil: - streams = self.c_consumer.get().GetStreamList(b_from_stream,&err) + streams = self.c_consumer.get().GetStreamList(b_from_stream,stream_filter,&err) if err: throw_exception(err) list = [] for stream in streams: - list.append(json.loads(_str(stream.Json(False)))) + list.append(json.loads(_str(stream.Json()))) return list def acknowledge(self, group_id, uint64_t id, stream = "default"): cdef string b_group_id = _bytes(group_id) diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 3717a1af89568c1e8c9b701603a143824c840007..7514daae822e24e75c12a41d8d2d898c00bad022 100644 --- 
a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -319,7 +319,7 @@ void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, Re Error err) { StreamInfoResult res; if (err == nullptr) { - auto ok = res.sinfo.SetFromJson(payload.response,true); + auto ok = res.sinfo.SetFromJson(payload.response); res.err = ok ? nullptr : ProducerErrorTemplates::kInternalServerError.Generate( std::string("cannot read JSON string from server response: ") + payload.response).release(); } else { diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index aaac948f94739b9c7bc57236264d80cc710fca2f..f35a341c933c0dd447ea318ef8b0bae1936c3e94 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -28,8 +28,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": uint8_t[] release() uint8_t[] get() cppclass StreamInfo: - string Json(bool add_last_id) - bool SetFromJson(string json_str, bool read_last_id) + string Json() cdef extern from "asapo/asapo_producer.h" namespace "asapo": cppclass RequestHandlerType: diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 436f1a4ac57129b2a57c89d49d96e836b4bd7e6e..e40d7a5c9e233f7fab8e14802e2cf828ea0f2e23 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -220,7 +220,7 @@ cdef class PyProducer: info = self.c_producer.get().GetStreamInfo(b_stream,timeout_ms,&err) if err: throw_exception(err) - return json.loads(_str(info.Json(True))) + return json.loads(_str(info.Json())) def last_stream(self, uint64_t timeout_ms = 1000): """ @@ -237,7 +237,7 @@ cdef class PyProducer: info = self.c_producer.get().GetLastStream(timeout_ms,&err) if err: throw_exception(err) - return json.loads(_str(info.Json(True))) + return json.loads(_str(info.Json())) def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): """ :param id: unique data id diff --git a/receiver/src/request_handler/request_handler_db_last_stream.cpp b/receiver/src/request_handler/request_handler_db_last_stream.cpp index ed5978b6e7f62a7241369b356824813795453ece..7e31468f565b0306b0334cc18420b43ba797f6d5 100644 --- a/receiver/src/request_handler/request_handler_db_last_stream.cpp +++ b/receiver/src/request_handler/request_handler_db_last_stream.cpp @@ -20,7 +20,7 @@ Error RequestHandlerDbLastStream::ProcessRequest(Request* request) const { if (!err) { log__->Debug(std::string{"get last stream "} + " in " + db_name_ + " at " + GetReceiverConfig()->database_uri); - request->SetResponseMessage(info.Json(true), ResponseMessageType::kInfo); + request->SetResponseMessage(info.Json(), ResponseMessageType::kInfo); } return err; } diff --git a/receiver/src/request_handler/request_handler_db_stream_info.cpp b/receiver/src/request_handler/request_handler_db_stream_info.cpp index 20221ba8c3babb5466046cde16baee96f145bfe0..65d194ccfa1f570fa51341d58e6e3b799a50528c 100644 --- a/receiver/src/request_handler/request_handler_db_stream_info.cpp +++ b/receiver/src/request_handler/request_handler_db_stream_info.cpp @@ -21,7 +21,7 @@ Error RequestHandlerDbStreamInfo::ProcessRequest(Request* request) const { log__->Debug(std::string{"get stream info from "} + col_name + " in " + db_name_ + " at " + GetReceiverConfig()->database_uri); info.name = request->GetStream(); - 
request->SetResponseMessage(info.Json(true), ResponseMessageType::kInfo); + request->SetResponseMessage(info.Json(), ResponseMessageType::kInfo); } return err; } diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh index 3fb2718ca677c12d23096a68694338b0e3911f70..9f599d42b2d01c476ecd9d33aae114cf6438e092 100644 --- a/tests/automatic/consumer/consumer_api/check_linux.sh +++ b/tests/automatic/consumer/consumer_api/check_linux.sh @@ -36,11 +36,13 @@ do echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done +echo 'db.data_stream1.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} + for i in `seq 1 5`; do echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done - +echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} echo hello1 > 1 diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 61f5b38ad4fa1bd5dabbea98700a04ff6f670f46..e72b7cb189050e158a52bdd5e581734d01a0bcbc 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -84,6 +84,14 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "GetCurrentSize no error"); M_AssertTrue(size == 10, "GetCurrentSize size"); + auto size1 = consumer->GetCurrentSize("stream1", &err); + M_AssertTrue(err == nullptr, "GetCurrentSize 1 no error"); + M_AssertTrue(size1 == 5, "GetCurrentSize 1 size"); + + auto size2 = consumer->GetCurrentSize("stream2", &err); + M_AssertTrue(err == nullptr, "GetCurrentSize 2 no error"); + M_AssertTrue(size2 == 5, "GetCurrentSize 2 size"); + err = consumer->ResetLastReadMarker(group_id,"default"); M_AssertTrue(err == nullptr, "SetLastReadMarker"); @@ -118,7 +126,6 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err != nullptr, "query5"); M_AssertTrue(messages.size() == 0, "size of query answer 5"); - //streams err = consumer->GetNext(group_id, &fi, nullptr, "stream1"); @@ -133,17 +140,18 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "GetNext stream2 no error"); M_AssertTrue(fi.name == "21", "GetNext stream2 filename"); - auto streams = consumer->GetStreamList("",&err); + auto streams = consumer->GetStreamList("",asapo::StreamFilter::kAllStreams,&err); M_AssertTrue(err == nullptr, "GetStreamList no error"); M_AssertTrue(streams.size() == 3, "streams.size"); M_AssertTrue(streams[0].name == "default", "streams0.name1"); M_AssertTrue(streams[1].name == "stream1", "streams1.name2"); M_AssertTrue(streams[2].name == "stream2", "streams2.name3"); - std::cout<<streams[0].Json(false)<<std::endl; - std::cout<<streams[1].Json(false)<<std::endl; - std::cout<<streams[2].Json(false)<<std::endl; + M_AssertTrue(streams[1].finished == true, "stream1 finished"); + M_AssertTrue(streams[1].next_stream == "next_stream", "stream1 next 
stream"); + M_AssertTrue(streams[2].finished == true, "stream2 finished"); + M_AssertTrue(streams[2].next_stream == "", "stream2 no next stream"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_created) == 0, "streams0.timestamp"); - M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) == 0, "streams0.timestamp lastentry not set"); + M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) > 0, "streams0.timestamp lastentry set"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[1].timestamp_created) == 1000, "streams1.timestamp"); M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_created) == 2000, "streams2.timestamp"); // acknowledges