From c40fe85de151b5a602e4fbd20f7897274ce32ce2 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Thu, 7 Sep 2023 12:04:06 +0200
Subject: [PATCH] Handle holes in _id during processing of get_next_available
 request.

---
 broker/src/asapo_broker/database/mongodb.go | 126 +++++++++++++++++++-
 1 file changed, 120 insertions(+), 6 deletions(-)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index ce7ddcc20..fe5046e5e 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
@@ -207,6 +208,21 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
 	return q
 }
 
+func getIntVal(request Request, val interface{}) (int32, error) {
+	var result int32
+	switch v := val.(type) {
+	default:
+		request.Logger().Debug("unexpected type %T", v)
+		return 0, errors.New("cannot convert value to int")
+	case int32:
+		result, _ = val.(int32)
+	case int64:
+		val, _ := val.(int64)
+		result = int32(val)
+	}
+	return result, nil
+}
+
 func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) {
 	c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
 	q := maxIndexQuery(request, returnIncompete)
@@ -220,8 +236,8 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey stri
 	if err != nil {
 		return 0, err
 	}
-	maxId, ok := result[idKey].(int32)
-	if !ok {
+	maxId, ok := getIntVal(request, result[idKey])
+	if ok != nil {
 		return 0, errors.New("cannot get max index by " + idKey)
 	}
 	return int(maxId), nil
@@ -604,6 +620,95 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
 	return r, true
 }
 
+func (db *Mongodb) getNextRecordDB(request Request, currentPointer int) (map[string]interface{}, error) {
+	var res map[string]interface{}
+	filter := bson.D{{"_id", bson.D{{"$gt", currentPointer}}}}
+	opts := options.FindOne().SetSort(bson.M{"_id": 1})
+	coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
+	err := coll.FindOne(context.TODO(), filter, opts).Decode(&res)
+	return res, err
+}
+
+func (db *Mongodb) getCurrentPointerDB(request Request) (LocationPointer, error) {
+	var curPointer LocationPointer
+	filter := bson.M{"_id": request.GroupId + "_" + request.Stream}
+	opts := options.FindOne()
+	coll := db.client.Database(request.DbName()).Collection(pointer_collection_name)
+	err := coll.FindOne(context.TODO(), filter, opts).Decode(&curPointer)
+	return curPointer, err
+}
+
+func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue, nextValue int) error {
+	update := bson.M{"$set": bson.M{pointer_field_name: nextValue}}
+	opts := options.Update().SetUpsert(false)
+	coll := db.client.Database(request.DbName()).Collection(pointer_collection_name)
+	filter := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: currentValue}
+	_, err := coll.UpdateOne(context.TODO(), filter, update, opts)
+	return err
+}
+
+func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) {
+
+	nextInd, maxInd, err := db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey)
+	if err != nil {
+		return nil, 0, err
+	}
+	var res map[string]interface{}
+	res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey)
+	// Missing Id! Since Ids are sequential, jump to the next available id
+	// Update current pointer to next available id
+	if err != nil && params.IdKey == "_id" {
+		for {
+			curPointer, err := db.getCurrentPointerDB(request)
+			if err != nil {
+				return nil, 0, err
+			}
+			res, err = db.getNextRecordDB(request, curPointer.Value)
+			if err != nil {
+				return nil, 0, err
+			}
+			nextValue, ok := getIntVal(request, res["_id"])
+			if ok != nil {
+				return nil, 0, errors.New("failed to next next available id. Extraction of id fails")
+			}
+			updateErr := db.updateCurrentPointerIfEqualDB(request, curPointer.Value, int(nextValue))
+			// If update fails, another broker already change the value. Go to the next one.
+			if updateErr == nil {
+				nextInd = int(nextValue)
+				break
+			}
+		}
+	} else {
+		// In this case we can not guess, what is the next
+		if err != nil {
+			return nil, 0, err
+		}
+	}
+
+	// Copy value of d key to _id to keep back compatibility
+	if res["message_id"] != nil {
+		res["_id"] = res["message_id"]
+	}
+	if err := checkStreamFinished(request, nextInd, maxInd, res); err != nil {
+		return nil, 0, err
+	}
+
+	request.Logger().WithFields(map[string]interface{}{"id": nextInd}).Debug("got record from db by id key: ",
+		params.IdKey)
+
+	record, err := utils.MapToJson(&res)
+	if err != nil {
+		return nil, 0, err
+	}
+	if recordContainsPartialData(request, res) {
+		return nil, 0, &DBError{utils.StatusPartialData, string(record)}
+	} else {
+		fmt.Println("Return record without error: ", string(record))
+		return record, nextInd, nil
+	}
+
+}
+
 func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext, originalerror error, ignoreTimeout bool) ([]byte, int, error) {
 	// Get next index from inprocessed collection and
 	// return corresponding data
@@ -640,15 +745,24 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
 	if err != nil {
 		return nil, errors.New("fails to extract request parameters: " + err.Error())
 	}
-	request.Logger().Debug("get parameters ", params.IdKey)
+	request.Logger().Debug("get next by : ", params.IdKey)
 
-	nextInd, maxInd, err := db.getNextAndMaxIndexes(request, params)
+	var data []byte
+	var nextInd int
+	data, nextInd, err = db.getRecordFromInprocessed(request, params, nil, false)
 	if err != nil {
 		return nil, err
 	}
 
-	data, err := db.getRecordByIDRaw(request, nextInd, maxInd, params.IdKey)
-	if err != nil {
+	if data == nil {
+		data, nextInd, err = db.getRecordFromCurPointer(request, params)
+		if err != nil {
+			request.Logger().Debug("error getting record from current pointer: ", err)
+			return nil, err
+		}
+	}
+
+	if data == nil {
 		data, nextInd, err = db.getRecordFromInprocessed(request, params, err, true)
 	}
 
-- 
GitLab