Skip to content
Snippets Groups Projects
Commit c40fe85d authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Handle holes in _id during processing of get_next_available request.

parent 2e960380
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
......@@ -207,6 +208,21 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
return q
}
func getIntVal(request Request, val interface{}) (int32, error) {
var result int32
switch v := val.(type) {
default:
request.Logger().Debug("unexpected type %T", v)
return 0, errors.New("cannot convert value to int")
case int32:
result, _ = val.(int32)
case int64:
val, _ := val.(int64)
result = int32(val)
}
return result, nil
}
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) {
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
q := maxIndexQuery(request, returnIncompete)
......@@ -220,8 +236,8 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey stri
if err != nil {
return 0, err
}
maxId, ok := result[idKey].(int32)
if !ok {
maxId, ok := getIntVal(request, result[idKey])
if ok != nil {
return 0, errors.New("cannot get max index by " + idKey)
}
return int(maxId), nil
......@@ -604,6 +620,95 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
return r, true
}
func (db *Mongodb) getNextRecordDB(request Request, currentPointer int) (map[string]interface{}, error) {
var res map[string]interface{}
filter := bson.D{{"_id", bson.D{{"$gt", currentPointer}}}}
opts := options.FindOne().SetSort(bson.M{"_id": 1})
coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
err := coll.FindOne(context.TODO(), filter, opts).Decode(&res)
return res, err
}
func (db *Mongodb) getCurrentPointerDB(request Request) (LocationPointer, error) {
var curPointer LocationPointer
filter := bson.M{"_id": request.GroupId + "_" + request.Stream}
opts := options.FindOne()
coll := db.client.Database(request.DbName()).Collection(pointer_collection_name)
err := coll.FindOne(context.TODO(), filter, opts).Decode(&curPointer)
return curPointer, err
}
func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue, nextValue int) error {
update := bson.M{"$set": bson.M{pointer_field_name: nextValue}}
opts := options.Update().SetUpsert(false)
coll := db.client.Database(request.DbName()).Collection(pointer_collection_name)
filter := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: currentValue}
_, err := coll.UpdateOne(context.TODO(), filter, update, opts)
return err
}
func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) {
nextInd, maxInd, err := db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey)
if err != nil {
return nil, 0, err
}
var res map[string]interface{}
res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey)
// Missing Id! Since Ids are sequential, jump to the next available id
// Update current pointer to next available id
if err != nil && params.IdKey == "_id" {
for {
curPointer, err := db.getCurrentPointerDB(request)
if err != nil {
return nil, 0, err
}
res, err = db.getNextRecordDB(request, curPointer.Value)
if err != nil {
return nil, 0, err
}
nextValue, ok := getIntVal(request, res["_id"])
if ok != nil {
return nil, 0, errors.New("failed to next next available id. Extraction of id fails")
}
updateErr := db.updateCurrentPointerIfEqualDB(request, curPointer.Value, int(nextValue))
// If update fails, another broker already change the value. Go to the next one.
if updateErr == nil {
nextInd = int(nextValue)
break
}
}
} else {
// In this case we can not guess, what is the next
if err != nil {
return nil, 0, err
}
}
// Copy value of d key to _id to keep back compatibility
if res["message_id"] != nil {
res["_id"] = res["message_id"]
}
if err := checkStreamFinished(request, nextInd, maxInd, res); err != nil {
return nil, 0, err
}
request.Logger().WithFields(map[string]interface{}{"id": nextInd}).Debug("got record from db by id key: ",
params.IdKey)
record, err := utils.MapToJson(&res)
if err != nil {
return nil, 0, err
}
if recordContainsPartialData(request, res) {
return nil, 0, &DBError{utils.StatusPartialData, string(record)}
} else {
fmt.Println("Return record without error: ", string(record))
return record, nextInd, nil
}
}
func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext, originalerror error, ignoreTimeout bool) ([]byte, int, error) {
// Get next index from inprocessed collection and
// return corresponding data
......@@ -640,15 +745,24 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
if err != nil {
return nil, errors.New("fails to extract request parameters: " + err.Error())
}
request.Logger().Debug("get parameters ", params.IdKey)
request.Logger().Debug("get next by : ", params.IdKey)
nextInd, maxInd, err := db.getNextAndMaxIndexes(request, params)
var data []byte
var nextInd int
data, nextInd, err = db.getRecordFromInprocessed(request, params, nil, false)
if err != nil {
return nil, err
}
data, err := db.getRecordByIDRaw(request, nextInd, maxInd, params.IdKey)
if err != nil {
if data == nil {
data, nextInd, err = db.getRecordFromCurPointer(request, params)
if err != nil {
request.Logger().Debug("error getting record from current pointer: ", err)
return nil, err
}
}
if data == nil {
data, nextInd, err = db.getRecordFromInprocessed(request, params, err, true)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment