Skip to content
Snippets Groups Projects

refactor stream finish flag

Merged Mikhail Karnevskiy requested to merge refactor_stream_finish_flag into develop
@@ -9,7 +9,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
@@ -483,10 +482,13 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error, uint64) {
if err != nil {
return nil, err, 0
}
if errFinish := db.checkStreamFinishedDB(request, max_ind, params.IdKey == "message_id"); errFinish != nil {
return nil, errFinish, 0
}
data, err := db.getRecordByIDRaw(request, params.Id, max_ind, params.IdKey)
// If error to get record from DB, may be stream is already finished
if err != nil && params.Id > max_ind {
if errFinish := db.checkStreamFinishedDB(request, max_ind, params.IdKey == "message_id"); errFinish != nil {
return nil, errFinish, 0
}
}
return data, err, uint64(params.Id)
}
@@ -740,7 +742,7 @@ func (db *Mongodb) getStreamFinishedFlag(dbname string, stream string) (int, str
}
next_stream := info.NextStream
if info.NextStream == "" {
next_stream = "asapo_no_next"
next_stream = no_next_stream_keyword
}
return int(info.LastId), next_stream, nil
}
@@ -752,7+754,7 @@
return size, err
}
func (db *Mongodb) getNumberOfDatasets(dbname string, stream string, minDatasetSize int) (int64, error) {
filter := bson.M{"stream": stream}
if minDatasetSize > 0 {
filter["$expr"] = bson.M{"$gte": []interface{}{bson.M{"$size": "$messages"}, minDatasetSize}}
@@ -764,7+766,7 @@
return size, err
}
func (db *Mongodb) hasInProcessMessages(dbname string, groupId string, stream string) bool {
func (db *Mongodb) hasInProcessMessages(dbname string, groupId string, stream string) (bool, error) {
c := db.client.Database(dbname).Collection(inprocess_collection_name_prefix)
filter := bson.M{"stream": stream, "group_id": groupId,
"$expr": bson.M{"$lt": []string{"$resendAttempts", "$maxResendAttempts"}}}
var res map[string]interface{}
err := c.FindOne(context.TODO(), filter, options.FindOne()).Decode(&res)
fmt.Println(err, res)
if err != nil {
return false
if err == mongo.ErrNoDocuments {
return false, nil
}
// If request susses, there are messages to be processed
return true
// It is unknown if the stream is finished but it safe to assume that it is not because the client can just try again.
return true, err
}
// Check if stream is already finished
// If messages consumed ordered by message_id it is enough to check that currentId is already at maximum
// Otherwise one need to check that number of documents is as expected and nothing is left in inProcess
// Number of documents in case of Datasets counts with respect of their completeness
// A stream is finished when the following conditions apply
// 1. The stream finished flag is set
// 2. The stream contains as many messages (that meet the completeness criteria in case of datasets) as given by the flag
// 3. There are no unacknowledged messages waiting to be resend
func (db *Mongodb) checkStreamFinishedDB(request Request, currentId int, ordered bool) error {
fmt.Println("Check Finished")
maxId, nextStream, err := db.getStreamFinishedFlag(request.DbName(), request.Stream)
if err != nil {
return nil
}
// This is the `ordered=True` case. Because skipped message ids are never resent,
// Because skipped message ids are never resent, the stream is considered finished when the `currentId` reaches the
// `maxId` of the stream finished flag.
// the stream is considered finished when the `currentId` reaches the `maxId` of the stream finished flag.
// This shortcut does not apply to datasets, because datasets might be still incomplete.
if ordered && !request.DatasetOp {
if currentId == maxId {
answer := encodeAnswer(maxId, maxId, nextStream)
@@ -803,13 +805,11 @@ func (db *Mongodb) checkStreamFinishedDB(request Request, currentId int, ordered
var size int64
if request.DatasetOp {
fmt.Println("Check Finished DS ", request.MinDatasetSize)
size, err = db.getNumberOfDatasets(request.DbName(), request.Stream, request.MinDatasetSize)
fmt.Println(size, "SIZE", maxId, nextStream)
} else {
size, err = db.getNumberOfMessages(request.DbName(), request.Stream)
}
// Stream is not finished if size of the dataset is not estimated
// It is unknown if the stream is finished but it safe to assume that it is not because the client can just try again.
if err != nil {
return nil
}
@@ -817,7 +817,11 @@ func (db *Mongodb) checkStreamFinishedDB(request Request, currentId int, ordered
return nil
}
hasInProcess := db.hasInProcessMessages(request.DbName(), request.GroupId, request.Stream)
hasInProcess, err := db.hasInProcessMessages(request.DbName(), request.GroupId, request.Stream)
// It is unknown if the stream is finished but it safe to assume that it is not because the client can just try again.
if err != nil {
return nil
}
if hasInProcess {
return nil
}
@@ -877,6 +881,7 @@ func (db *Mongodb) getLastRecord(request Request, idKey string) ([]byte, error,
if err != nil {
return nil, err, 0
}
// If stream is finished function will return stream finish error instead of the last message
if errFinish := db.checkStreamFinishedDB(request, max_ind, idKey == "message_id"); errFinish != nil {
return nil, errFinish, 0
}
Loading