From 0b9cdcfb39da46f668a615e3cfbc98dc4a242916 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Fri, 3 Nov 2023 16:43:10 +0100
Subject: [PATCH] Fix resend in case of partial data. Add more MongoDB tests on
 GetNext.

---
 broker/CMakeLists.txt                         |   2 +-
 broker/src/asapo_broker/database/mongodb.go   |  60 +++-
 .../src/asapo_broker/database/mongodb_test.go | 333 +++++++++++++-----
 3 files changed, 285 insertions(+), 110 deletions(-)

diff --git a/broker/CMakeLists.txt b/broker/CMakeLists.txt
index 8d4e3ca3f..9d47fd96b 100644
--- a/broker/CMakeLists.txt
+++ b/broker/CMakeLists.txt
@@ -24,4 +24,4 @@ gotest(${TARGET_NAME} "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./...")
 
 
 go_integration_test(${TARGET_NAME}-connectdb "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDBConnect")
-go_integration_test(${TARGET_NAME}-nextrecord "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDBNext")
+go_integration_test(${TARGET_NAME}-nextrecord "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDB")
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index fe5046e5e..b32961369 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -9,7 +9,6 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
@@ -182,6 +181,17 @@ func (db *Mongodb) insertRecord(dbname string, collection_name string, s interfa
 	return err
 }
 
+func (db *Mongodb) replaceRecord(dbname string, id int, collection_name string, s interface{}) error {
+	if db.client == nil {
+		return &DBError{utils.StatusServiceUnavailable, no_session_msg}
+	}
+
+	c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
+
+	_, err := c.ReplaceOne(context.TODO(), bson.M{"_id": id}, s)
+	return err
+}
+
 func (db *Mongodb) insertMeta(dbname string, s interface{}) error {
 	if db.client == nil {
 		return &DBError{utils.StatusServiceUnavailable, no_session_msg}
@@ -510,7 +520,6 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay
 		}
 		return 0, err
 	}
-
 	rlog.WithFields(map[string]interface{}{"id": res.ID}).Debug("got unprocessed message")
 	return res.ID, nil
 }
@@ -564,7 +573,6 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, params E
 		db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] = time.Now().Unix()
 		dbSessionLock.Unlock()
 	}
-
 	return record_ind, max_ind, nil
 
 }
@@ -620,13 +628,17 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
 	return r, true
 }
 
+//Return next (based on current pointer) available record from DB
 func (db *Mongodb) getNextRecordDB(request Request, currentPointer int) (map[string]interface{}, error) {
 	var res map[string]interface{}
 	filter := bson.D{{"_id", bson.D{{"$gt", currentPointer}}}}
 	opts := options.FindOne().SetSort(bson.M{"_id": 1})
 	coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
 	err := coll.FindOne(context.TODO(), filter, opts).Decode(&res)
-	return res, err
+	if err != nil {
+		return res, &DBError{utils.StatusNoData, err.Error()}
+	}
+	return res, nil
 }
 
 func (db *Mongodb) getCurrentPointerDB(request Request) (LocationPointer, error) {
@@ -635,9 +647,14 @@ func (db *Mongodb) getCurrentPointerDB(request Request) (LocationPointer, error)
 	opts := options.FindOne()
 	coll := db.client.Database(request.DbName()).Collection(pointer_collection_name)
 	err := coll.FindOne(context.TODO(), filter, opts).Decode(&curPointer)
-	return curPointer, err
+	if err != nil {
+		return curPointer, &DBError{utils.StatusTransactionInterrupted, err.Error()}
+	}
+	return curPointer, nil
 }
 
+// If value of the current pointer didn't change during processing of getNext request
+// update it to next existing record.
 func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue, nextValue int) error {
 	update := bson.M{"$set": bson.M{pointer_field_name: nextValue}}
 	opts := options.Update().SetUpsert(false)
@@ -647,6 +664,9 @@ func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue,
 	return err
 }
 
+// Return record based on current pointer, index of the record and error
+// If obtaining current pointer fails, record is corrupted or thisis end of stream: return 0 instead of record id
+// If record is missing, or incomplete return valid record id
 func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) {
 
 	nextInd, maxInd, err := db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey)
@@ -681,11 +701,11 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
 	} else {
 		// In this case we can not guess, what is the next
 		if err != nil {
-			return nil, 0, err
+			return nil, nextInd, err
 		}
 	}
 
-	// Copy value of d key to _id to keep back compatibility
+	// Copy value of message_id key to _id to keep back compatibility
 	if res["message_id"] != nil {
 		res["_id"] = res["message_id"]
 	}
@@ -697,21 +717,22 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
 		params.IdKey)
 
 	record, err := utils.MapToJson(&res)
+	// Something wrong with the record
 	if err != nil {
 		return nil, 0, err
 	}
 	if recordContainsPartialData(request, res) {
-		return nil, 0, &DBError{utils.StatusPartialData, string(record)}
+		return nil, nextInd, &DBError{utils.StatusPartialData, string(record)}
 	} else {
-		fmt.Println("Return record without error: ", string(record))
 		return record, nextInd, nil
 	}
 
 }
 
-func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext, originalerror error, ignoreTimeout bool) ([]byte, int, error) {
-	// Get next index from inprocessed collection and
-	// return corresponding data
+// Get index of the record from in-process collection and
+// return corresponding record
+func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext,
+	originalerror error, ignoreTimeout bool) ([]byte, int, error) {
 	var err_inproc error
 	nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, params, ignoreTimeout)
 	if err_inproc != nil {
@@ -754,11 +775,12 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
 		return nil, err
 	}
 
+	missingInd := 0
 	if data == nil {
 		data, nextInd, err = db.getRecordFromCurPointer(request, params)
 		if err != nil {
 			request.Logger().Debug("error getting record from current pointer: ", err)
-			return nil, err
+			missingInd = nextInd
 		}
 	}
 
@@ -766,8 +788,18 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
 		data, nextInd, err = db.getRecordFromInprocessed(request, params, err, true)
 	}
 
+	// If index of the record is valid, store it in in-process table to try again next time
+	if missingInd > 0 {
+		errUpdate := db.InsertToInprocessIfNeeded(request.DbName(),
+			inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, missingInd, params)
+		if errUpdate != nil {
+			return nil, errUpdate
+		}
+	}
+
 	if err == nil {
-		err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, params)
+		err_update := db.InsertToInprocessIfNeeded(request.DbName(),
+			inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, params)
 		if err_update != nil {
 			return nil, err_update
 		}
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index e9c2fca30..8f5efe51a 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -20,6 +20,14 @@ type TestRecord struct {
 	Timestamp int64             `bson:"timestamp" json:"timestamp"`
 }
 
+type TestUserRecord struct {
+	ID        int64             `bson:"_id" json:"_id"`
+	MessageID int64             `bson:"message_id" json:"message_id"`
+	Meta      map[string]string `bson:"meta" json:"meta"`
+	Name      string            `bson:"name" json:"name"`
+	Timestamp int64             `bson:"timestamp" json:"timestamp"`
+}
+
 type TestRecordStreamBtMeta struct {
 	ID   string `bson:"_id" json:"_id"`
 	Meta string `bson:"meta" json:"meta"`
@@ -61,15 +69,32 @@ var rec3 = TestRecord{3, empty_next, "ccc", 2}
 var rec_finished3 = TestRecord{3, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
 var rec_finished11 = TestRecord{11, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
 
+var userRec1 = TestUserRecord{1, 1, empty_next, "aaa", 0}
+var userRec2 = TestUserRecord{2, 2, empty_next, "aaa", 0}
+var userRec3 = TestUserRecord{3, 2, empty_next, "aaa", 0}
+
 var rec1_expect, _ = json.Marshal(rec1)
 var rec2_expect, _ = json.Marshal(rec2)
 var rec3_expect, _ = json.Marshal(rec3)
 
+var userRec1_expect, _ = json.Marshal(userRec1)
+var userRec2_expect, _ = json.Marshal(userRec2)
+
 var recs1 = SizeRecord{3}
 var recs1_expect, _ = json.Marshal(recs1)
 var recs2 = SizeRecord{0}
 var recs2_expect, _ = json.Marshal(recs2)
 
+func getExtraParamNext(key string, resend bool, delay int, resendAttempts int) string {
+	var extraParamNext = ExtraParamNext{
+		IdKey:          key,
+		Resend:         resend,
+		DelayMs:        delay,
+		ResendAttempts: resendAttempts}
+	encoded, _ := json.Marshal(extraParamNext)
+	return string(encoded)
+}
+
 func cleanup() {
 	if db.client == nil {
 		return
@@ -126,7 +151,8 @@ func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) {
 func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "bla", GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource,
+		Stream: "bla", GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
 }
@@ -142,26 +168,34 @@ func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) {
 func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	extraParam := ExtraParamId{
+		Id:    2,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", ExtraParam: string(encoded)})
 
-	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+
 }
 
-func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
+func TestMongoDBGetNextIfMissingID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
-	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error())
+	rec, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	assert.Equal(t, string(rec2_expect), string(rec))
+
 }
 
 func TestMongoDBGetNextOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -172,8 +206,8 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -185,9 +219,9 @@ func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -199,7 +233,12 @@ func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	extraParam := ExtraParamId{
+		Id:    2,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", ExtraParam: string(encoded)})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -212,7 +251,6 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_finished)
 
 	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last"})
-	fmt.Println(string(res))
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
 }
@@ -221,8 +259,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message)
@@ -233,8 +271,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec1)
-	res1, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	res2, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res1, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	res2, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Equal(t, string(rec1_expect), string(res1))
 	assert.Equal(t, string(rec2_expect), string(res2))
 }
@@ -264,16 +302,14 @@ func getRecords(n int, resend bool) []int {
 	results := make([]int, n)
 	var wg sync.WaitGroup
 	wg.Add(n)
-	extra_param := ""
-	if resend {
-		extra_param = "0_1"
-	}
 	for i := 0; i < n; i++ {
 		go func() {
 			defer wg.Done()
-			res_bin, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param})
+			// fmt.Println("REquest ", string(encoded))
+			res_bin, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+				GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
 			if err != nil {
-				fmt.Println("error at read ", i)
+				fmt.Println("error at read ", i, string(res_bin))
 			}
 			var res TestRecord
 			json.Unmarshal(res_bin, &res)
@@ -283,7 +319,6 @@ func getRecords(n int, resend bool) []int {
 		}()
 	}
 	wg.Wait()
-
 	return results
 }
 
@@ -316,7 +351,8 @@ func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	db.dropDatabase(dbname)
 
 	db.insertRecord(dbname, collection, &rec1)
@@ -331,7 +367,8 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(200)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	db.dropDatabase(dbname)
 
 	n := 100
@@ -344,10 +381,12 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	db.dropDatabase(dbname)
 
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
 }
@@ -356,8 +395,12 @@ func TestMongoDBgetRecordByID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
+	extraParam := ExtraParamId{
+		Id:    1,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", ExtraParam: string(encoded)})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -366,7 +409,12 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	extraParam := ExtraParamId{
+		Id:    2,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", ExtraParam: string(encoded)})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_stream\":\"\"}", err.Error())
 }
@@ -375,7 +423,8 @@ func TestMongoDBGetRecordNext(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -385,9 +434,10 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection2, &rec_dataset1)
-
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
-	res_string, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	res_string, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection2,
+		GroupId: groupId, Op: "next", DatasetOp: true, ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	var res_ds TestDataset
 	json.Unmarshal(res_string, &res_ds)
 
@@ -403,7 +453,12 @@ func TestMongoDBGetRecordID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
+	extraParam := ExtraParamId{
+		Id:    1,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", ExtraParam: string(encoded)})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -412,7 +467,8 @@ func TestMongoDBWrongOp(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "bla"})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "bla"})
 	assert.NotNil(t, err)
 }
 
@@ -438,8 +494,8 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
 	assert.Equal(t, string(rec2_expect), string(res))
 
 	db.insertRecord(dbname, collection, &rec3)
-
-	res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 
@@ -449,7 +505,7 @@ func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)}) // to check it does not influence groupedlast
 
 	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
 	assert.Nil(t, err)
@@ -590,8 +646,7 @@ func TestMongoDBResetCounter(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res1))
@@ -599,7 +654,7 @@ func TestMongoDBResetCounter(t *testing.T) {
 	_, err_reset := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"})
 	assert.Nil(t, err_reset)
 
-	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Nil(t, err2)
 	assert.Equal(t, string(rec2_expect), string(res2))
@@ -753,8 +808,8 @@ func TestMongoDBGetDataset(t *testing.T) {
 	defer cleanup()
 
 	db.insertRecord(dbname, collection, &rec_dataset1)
-
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", DatasetOp: true, ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Nil(t, err)
 
@@ -769,8 +824,8 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
 	defer cleanup()
 
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
-
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next",
+		DatasetOp: true, ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 	var res TestDataset
@@ -784,9 +839,10 @@ func TestMongoDBNoDataOnNotCompletedNextDataset(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_dataset2_incomplete)
-
-	_, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
-	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	_, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", DatasetOp: true, ExtraParam: getExtraParamNext("_id", false, 0, -1)})
+	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", DatasetOp: true, ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 
 	assert.Equal(t, utils.StatusPartialData, err1.(*DBError).Code)
 	assert.Equal(t, utils.StatusPartialData, err2.(*DBError).Code)
@@ -802,7 +858,8 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -819,7 +876,8 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last",
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
@@ -837,7 +895,8 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T)
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2_incomplete3)
 
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last",
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
@@ -854,7 +913,8 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last",
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last",
 		DatasetOp: true, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
@@ -871,7 +931,8 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last",
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
@@ -888,7 +949,8 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset3)
 
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -902,8 +964,12 @@ func TestMongoDBGetDatasetID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1)
-
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
+	extraParam := ExtraParamId{
+		Id:    1,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: string(encoded)})
 
 	assert.Nil(t, err)
 
@@ -918,8 +984,12 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
-
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
+	extraParam := ExtraParamId{
+		Id:    1,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: string(encoded)})
 
 	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 
@@ -930,12 +1000,61 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) {
 
 }
 
-func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) {
+func TestMongoDBGetNextRecoverAfterIncompleteDataset(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
+	db.insertRecord(dbname, collection, &rec_dataset2_incomplete)
+
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3),
+		DatasetOp: true, MinDatasetSize: 0})
+	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 
-	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"})
+	db.replaceRecord(dbname, 1, collection, &rec_dataset1)
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3),
+		DatasetOp: true, MinDatasetSize: 0})
+
+	assert.Nil(t, err)
+
+	var res_ds TestDataset
+	json.Unmarshal(res_string, &res_ds)
+	assert.Equal(t, rec_dataset1, res_ds)
+}
+
+func TestMongoDBGetNextUserKeyMissingID(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &userRec2)
+
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)})
+
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error())
+
+	db.insertRecord(dbname, collection, &userRec1)
+	rec1, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)})
+
+	rec2, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("message_id", true, 0, 1)})
+
+	assert.Equal(t, string(userRec1_expect), string(rec1))
+	assert.Equal(t, string(userRec2_expect), string(rec2))
+}
+
+func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
+	extraParam := ExtraParamId{
+		Id:    1,
+		IdKey: "_id"}
+	encoded, _ := json.Marshal(extraParam)
+	res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: string(encoded)})
 
 	assert.Nil(t, err)
 
@@ -981,7 +1100,8 @@ func TestMongoDBListStreams(t *testing.T) {
 		}
 		var rec_streams_expect, _ = json.Marshal(test.expectedStreams)
 
-		res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "0", Op: "streams", ExtraParam: `{"from":"` + test.from + `","filter":"","detailed":"True"}`})
+		res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "0", Op: "streams",
+			ExtraParam: `{"from":"` + test.from + `","filter":"","detailed":"True"}`})
 		if test.ok {
 			assert.Nil(t, err, test.test)
 			assert.Equal(t, string(rec_streams_expect), string(res), test.test)
@@ -1091,14 +1211,19 @@ func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
+	assert.Nil(t, err2)
 	assert.Equal(t, string(rec1_expect), string(res))
 	assert.Equal(t, string(rec1_expect), string(res1))
+	assert.Equal(t, string(rec2_expect), string(res2))
 }
 
 func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) {
@@ -1106,9 +1231,12 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -1126,10 +1254,13 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
 	time.Sleep(time.Second)
-	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1145,10 +1276,13 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) {
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec_finished3)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
 	time.Sleep(time.Second)
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
-	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 1000, 3)})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1163,8 +1297,10 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -1177,9 +1313,12 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1193,11 +1332,14 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
 
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
-	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
+	_, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection,
+		GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 3)})
 	assert.NotNil(t, err)
 	if err != nil {
 		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
@@ -1230,14 +1372,14 @@ func TestMongoDBNegAck(t *testing.T) {
 	inputParams.Params.DelayMs = 0
 
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 	bparam, _ := json.Marshal(&inputParams)
 
 	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack
-	_, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"})  // second time nothing
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)}) // first time message from negack
+	_, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})  // second time nothing
 	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
-	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing
+	_, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)}) // second time nothing
 
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -1256,12 +1398,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
 
 	db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"})
-	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res3, err3 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
+	res3, err3 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: getExtraParamNext("_id", true, 0, 1)})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -1290,7 +1432,7 @@ func TestDeleteStreams(t *testing.T) {
 	for _, test := range testsDeleteStream {
 		db.Connect(dbaddress)
 		db.insertRecord(dbname, encodeStringForColName(test.stream), &rec1)
-		db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: "123", Op: "next"})
+		db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: "123", Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 		query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
 		request := Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
 		_, err := db.ProcessRequest(request)
@@ -1337,7 +1479,8 @@ func TestMongoDBEncodingOK(t *testing.T) {
 	for _, test := range testsEncodings {
 		db.Connect(dbaddress)
 		db.insertRecord(test.dbname_indb, test.collection_indb, &rec1)
-		res, err := db.ProcessRequest(Request{Beamtime: test.beamtime, DataSource: test.datasource, Stream: test.collection, GroupId: test.group, Op: "next"})
+		res, err := db.ProcessRequest(Request{Beamtime: test.beamtime, DataSource: test.datasource, Stream: test.collection,
+			GroupId: test.group, Op: "next", ExtraParam: getExtraParamNext("_id", false, 0, -1)})
 		if test.ok {
 			assert.Nil(t, err, test.message)
 			assert.Equal(t, string(rec1_expect), string(res), test.message)
-- 
GitLab