diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go
index 61ec11360a53ebf7a9e47c11db0d655d4a9d21e7..5df85c2ddd38784896fbf0535220006415fd1287 100644
--- a/broker/src/asapo_broker/database/database.go
+++ b/broker/src/asapo_broker/database/database.go
@@ -1,7 +1,17 @@
 package database
 
+type Request struct {
+	DbName string
+	DbCollectionName string
+	GroupId string
+	Op string
+	DatasetOp bool
+	MinDatasetSize int
+	ExtraParam string
+}
+
 type Agent interface {
-	ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra string) ([]byte, error)
+	ProcessRequest(request Request) ([]byte, error)
 	Ping() error
 	Connect(string) error
 	Close()
diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go
index 3797f8afb6d9fa006fb3b30ac488d66a55c59c11..574a8aa5161a133454dccdaf6d85cdda9e4f4b7e 100644
--- a/broker/src/asapo_broker/database/mock_database.go
+++ b/broker/src/asapo_broker/database/mock_database.go
@@ -29,7 +29,7 @@ func (db *MockedDatabase) SetSettings(settings DBSettings) {
 }
 
 
-func (db *MockedDatabase) ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
-	args := db.Called(db_name, data_collection_name, group_id, op, extra_param)
+func (db *MockedDatabase) ProcessRequest(request Request) (answer []byte, err error) {
+	args := db.Called(request)
 	return args.Get(0).([]byte), args.Error(1)
 }
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 576655bb496790466c116b2d6f688c27f50459a9..5a73166b86d891c9bef7d287db75d2e2b84e007f 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -764,43 +764,37 @@ func (db *Mongodb) getSubstreams(db_name string, from string) ([]byte, error) {
 }
 
 
-func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
-	dataset := false
-	if strings.HasSuffix(op, "_dataset") {
-		dataset = true
-		op = op[:len(op)-8]
-	}
-
-	if err := db.checkDatabaseOperationPrerequisites(db_name, collection_name, group_id); err != nil {
+func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
+	if err := db.checkDatabaseOperationPrerequisites(request.DbName, request.DbCollectionName, request.GroupId); err != nil {
 		return nil, err
 	}
 
-	switch op {
+	switch request.Op {
 	case "next":
-		return db.getNextRecord(db_name, collection_name, group_id, dataset, extra_param)
+		return db.getNextRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp, request.ExtraParam)
 	case "id":
-		return db.getRecordByID(db_name, collection_name, group_id, extra_param, dataset)
+		return db.getRecordByID(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam, request.DatasetOp)
 	case "last":
-		return db.getLastRecord(db_name, collection_name, group_id, dataset)
+		return db.getLastRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp)
 	case "resetcounter":
-		return db.resetCounter(db_name, collection_name, group_id, extra_param)
+		return db.resetCounter(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam)
 	case "size":
-		return db.getSize(db_name, collection_name)
+		return db.getSize(request.DbName, request.DbCollectionName)
 	case "meta":
-		return db.getMeta(db_name, extra_param)
+		return db.getMeta(request.DbName, request.ExtraParam)
 	case "queryimages":
-		return db.queryImages(db_name, collection_name, extra_param)
+		return db.queryImages(request.DbName, request.DbCollectionName, request.ExtraParam)
 	case "substreams":
-		return db.getSubstreams(db_name,extra_param)
+		return db.getSubstreams(request.DbName,request.ExtraParam)
 	case "ackimage":
-		return db.ackRecord(db_name, collection_name, group_id, extra_param)
+		return db.ackRecord(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam)
 	case "negackimage":
-		return db.negAckRecord(db_name, group_id, extra_param)
+		return db.negAckRecord(request.DbName, request.GroupId, request.ExtraParam)
 	case "nacks":
-		return db.nacks(db_name, collection_name, group_id, extra_param)
+		return db.nacks(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam)
 	case "lastack":
-		return db.lastAck(db_name, collection_name, group_id)
+		return db.lastAck(request.DbName, request.DbCollectionName, request.GroupId)
 	}
 
-	return nil, errors.New("Wrong db operation: " + op)
+	return nil, errors.New("Wrong db operation: " + request.Op)
 }
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 8e2d97295a7f2b3e782fea6757b24b9098bd06c1..8a93623bb3490aa95eb06d139fa9567ba5a3044e 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -13,10 +13,10 @@ import (
 )
 
 type TestRecord struct {
-	ID   int               `bson:"_id" json:"_id"`
-	Meta map[string]string `bson:"meta" json:"meta"`
-	Name string            `bson:"name" json:"name"`
-	Timestamp   int64        `bson:"timestamp" json:"timestamp"`
+	ID        int               `bson:"_id" json:"_id"`
+	Meta      map[string]string `bson:"meta" json:"meta"`
+	Name      string            `bson:"name" json:"name"`
+	Timestamp int64             `bson:"timestamp" json:"timestamp"`
 }
 
 type TestDataset struct {
@@ -37,10 +37,10 @@ const metaID_str = "0"
 
 var empty_next = map[string]string{"next_substream": ""}
 
-var rec1 = TestRecord{1, empty_next, "aaa",0}
-var rec_finished = TestRecord{2, map[string]string{"next_substream": "next1"}, finish_substream_keyword,0}
-var rec2 = TestRecord{2, empty_next, "bbb",1}
-var rec3 = TestRecord{3, empty_next, "ccc",2}
+var rec1 = TestRecord{1, empty_next, "aaa", 0}
+var rec_finished = TestRecord{2, map[string]string{"next_substream": "next1"}, finish_substream_keyword, 0}
+var rec2 = TestRecord{2, empty_next, "bbb", 1}
+var rec3 = TestRecord{3, empty_next, "ccc", 2}
 
 var rec1_expect, _ = json.Marshal(rec1)
 var rec2_expect, _ = json.Marshal(rec2)
@@ -75,31 +75,31 @@ func TestMongoDBConnectOK(t *testing.T) {
 }
 
 func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(dbname, collection, "", "meta", "0")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: "0"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBQueryImagesErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(dbname, collection, "", "queryimages", "0")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: "0"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest("", collection, groupId, "next", "")
+	_, err := db.ProcessRequest(Request{DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
 }
 
 func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(dbname, "bla", groupId, "next", "")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error())
 }
@@ -107,26 +107,25 @@ func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) {
 func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(dbname, "bla", groupId, "last", "")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "last"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error())
 }
 
-func TestMongoDBGetByIdErrorWhenNonExistingDatacollectionname(t *testing.T) {
+func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(dbname, collection, groupId, "id", "2")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_substream\":\"\"}", err.Error())
 }
 
-
 func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"\"}", err.Error())
 }
@@ -135,20 +134,19 @@ func TestMongoDBGetNextOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
 
-
 func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next1\"}", err.(*DBError).Message)
@@ -158,8 +156,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.(*DBError).Message)
@@ -170,8 +168,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec1)
-	res1, _ := db.ProcessRequest(dbname, collection, groupId, "next", "")
-	res2, _ := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res1, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res2, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, string(rec1_expect), string(res1))
 	assert.Equal(t, string(rec2_expect), string(res2))
 }
@@ -191,8 +189,8 @@ func insertRecords(n int) {
 	for ind, record := range records {
 		record.ID = ind + 1
 		record.Name = string(ind)
-		if err:= db.insertRecord(dbname, collection, &record);err!=nil {
-			fmt.Println("error at insert ",ind)
+		if err := db.insertRecord(dbname, collection, &record); err != nil {
+			fmt.Println("error at insert ", ind)
 		}
 	}
 }
@@ -201,20 +199,20 @@ func getRecords(n int, resend bool) []int {
 	results := make([]int, n)
 	var wg sync.WaitGroup
 	wg.Add(n)
-	extra_param:=""
+	extra_param := ""
 	if resend {
-		extra_param="0_1"
+		extra_param = "0_1"
 	}
 	for i := 0; i < n; i++ {
 		go func() {
 			defer wg.Done()
-			res_bin, err:= db.ProcessRequest(dbname, collection, groupId, "next", extra_param)
-			if err!=nil {
-				fmt.Println("error at read ",i)
+			res_bin, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param})
+			if err != nil {
+				fmt.Println("error at read ", i)
 			}
 			var res TestRecord
 			json.Unmarshal(res_bin, &res)
-			if res.ID>0 {
+			if res.ID > 0 {
 				results[res.ID-1] = 1
 			}
 		}()
@@ -230,7 +228,7 @@ func TestMongoDBGetNextInParallel(t *testing.T) {
 	n := 100
 	insertRecords(n)
 
-	results := getRecords(n,false)
+	results := getRecords(n, false)
 
 	assert.Equal(t, n, getNOnes(results))
 }
@@ -242,26 +240,24 @@ func TestMongoDBGetNextInParallelWithResend(t *testing.T) {
 	n := 100
 	insertRecords(n)
 
-	results := getRecords(n,true)
-	results2 := getRecords(n,true)
+	results := getRecords(n, true)
+	results2 := getRecords(n, true)
 
-	assert.Equal(t, n, getNOnes(results),"first")
-	assert.Equal(t, n, getNOnes(results2),"second")
+	assert.Equal(t, n, getNOnes(results), "first")
+	assert.Equal(t, n, getNOnes(results2), "second")
 }
 
-
-
 func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 }
@@ -270,12 +266,12 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(200)
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
 	n := 100
 	insertRecords(n)
-	results := getRecords(n,false)
+	results := getRecords(n, false)
 	assert.Equal(t, n, getNOnes(results))
 }
 
@@ -283,20 +279,20 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error())
 }
 
-
 func TestMongoDBgetRecordByID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1")
+
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -305,7 +301,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(dbname, collection, groupId, "id", "2")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_substream\":\"\"}", err.Error())
 }
@@ -314,7 +310,7 @@ func TestMongoDBGetRecordNext(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -325,8 +321,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection2, &rec_dataset1)
 
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
-	res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res_string, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection2, GroupId: groupId, Op: "next", DatasetOp: true})
 	var res_ds TestDataset
 	json.Unmarshal(res_string, &res_ds)
 
@@ -342,7 +338,7 @@ func TestMongoDBGetRecordID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -351,7 +347,7 @@ func TestMongoDBWrongOp(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(dbname, collection, groupId, "bla", "0")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "bla"})
 	assert.NotNil(t, err)
 }
 
@@ -361,7 +357,7 @@ func TestMongoDBGetRecordLast(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 }
@@ -372,13 +368,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 
 	db.insertRecord(dbname, collection, &rec3)
 
-	res, err = db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec3_expect), string(res))
 
@@ -391,7 +387,7 @@ func TestMongoDBGetSize(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec3)
 
-	res, err := db.ProcessRequest(dbname, collection, "", "size", "0")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(recs1_expect), string(res))
 }
@@ -400,7 +396,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 
-	res, err := db.ProcessRequest(dbname, collection, "", "size", "0")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(recs2_expect), string(res))
 }
@@ -418,7 +414,7 @@ func TestMongoPingNotConected(t *testing.T) {
 }
 
 func TestMongoDBgetRecordByIDNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(dbname, collection, "", "id", "2")
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
@@ -428,15 +424,15 @@ func TestMongoDBResetCounter(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res1))
 
-	_, err_reset := db.ProcessRequest(dbname, collection, groupId, "resetcounter", "1")
+	_, err_reset := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"})
 	assert.Nil(t, err_reset)
 
-	res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "")
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
 
 	assert.Nil(t, err2)
 	assert.Equal(t, string(rec2_expect), string(res2))
@@ -450,7 +446,7 @@ func TestMongoDBGetMetaOK(t *testing.T) {
 	rec_expect, _ := json.Marshal(recm)
 	db.insertMeta(dbname, &recm)
 
-	res, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str)
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str})
 
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec_expect), string(res))
@@ -460,7 +456,7 @@ func TestMongoDBGetMetaErr(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 
-	_, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str)
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str})
 	assert.NotNil(t, err)
 }
 
@@ -486,7 +482,7 @@ var tests = []struct {
 	res   []TestRecordMeta
 	ok    bool
 }{
-	{"_id > 0", []TestRecordMeta{recq1, recq2,recq3,recq4}, true},
+	{"_id > 0", []TestRecordMeta{recq1, recq2, recq3, recq4}, true},
 	{"meta.counter = 10", []TestRecordMeta{recq1, recq3}, true},
 	{"meta.counter = 10 ORDER BY _id DESC", []TestRecordMeta{recq3, recq1}, true},
 	{"meta.counter > 10 ORDER BY meta.counter DESC", []TestRecordMeta{recq4, recq2}, true},
@@ -535,7 +531,7 @@ func TestMongoDBQueryImagesOK(t *testing.T) {
 		//			continue
 		//		}
 
-		res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query)
+		res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query})
 		var res []TestRecordMeta
 		json.Unmarshal(res_string, &res)
 		//		fmt.Println(string(res_string))
@@ -554,7 +550,7 @@ func TestMongoDBQueryImagesOnEmptyDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	for _, test := range tests {
-		res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query)
+		res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query})
 		var res []TestRecordMeta
 		json.Unmarshal(res_string, &res)
 		assert.Equal(t, 0, len(res))
@@ -577,7 +573,7 @@ func TestMongoDBGetDataset(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec_dataset1)
 
-	res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "")
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
 
 	assert.Nil(t, err)
 
@@ -593,7 +589,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "")
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message)
@@ -608,7 +604,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0")
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -625,7 +621,7 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset3)
 
-	res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0")
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -640,7 +636,7 @@ func TestMongoDBGetDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1)
 
-	res_string, err := db.ProcessRequest(dbname, collection, groupId, "id_dataset", "1")
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"})
 
 	assert.Nil(t, err)
 
@@ -657,28 +653,29 @@ type Substream struct {
 }
 
 var testsSubstreams = []struct {
-	from string
-	substreams []Substream
+	from               string
+	substreams         []Substream
 	expectedSubstreams SubstreamsRecord
-	test       string
-	ok         bool
+	test               string
+	ok                 bool
 }{
-	{"",[]Substream{},SubstreamsRecord{[]SubstreamInfo{}}, "no substreams", true},
-	{"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0}}}, "one substream", true},
-	{"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0},SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "two substreams", true},
-	{"ss2",[]Substream{{"ss1",[]TestRecord{rec1,rec2}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "with from", true},
+	{"", []Substream{}, SubstreamsRecord{[]SubstreamInfo{}}, "no substreams", true},
+	{"", []Substream{{"ss1", []TestRecord{rec2, rec1}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1", Timestamp: 0}}}, "one substream", true},
+	{"", []Substream{{"ss1", []TestRecord{rec2, rec1}}, {"ss2", []TestRecord{rec2, rec3}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1", Timestamp: 0}, SubstreamInfo{Name: "ss2", Timestamp: 1}}}, "two substreams", true},
+	{"ss2", []Substream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss2", Timestamp: 1}}}, "with from", true},
 }
 
 func TestMongoDBListSubstreams(t *testing.T) {
 	for _, test := range testsSubstreams {
 		db.Connect(dbaddress)
 		for _, substream := range test.substreams {
-			for _,rec:= range substream.records {
+			for _, rec := range substream.records {
 				db.insertRecord(dbname, substream.name, &rec)
 			}
 		}
 		var rec_substreams_expect, _ = json.Marshal(test.expectedSubstreams)
-		res, err := db.ProcessRequest(dbname, "0", "0", "substreams", test.from)
+
+		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "0", Op: "substreams", ExtraParam: test.from})
 		if test.ok {
 			assert.Nil(t, err, test.test)
 			assert.Equal(t, string(rec_substreams_expect), string(res), test.test)
@@ -695,50 +692,51 @@ func TestMongoDBAckImage(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec1)
 	query_str := "{\"Id\":1,\"Op\":\"ackimage\"}"
-	res, err := db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str)
-	nacks,_ := db.getNacks(dbname,collection,groupId,1,1)
+
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str})
+	nacks, _ := db.getNacks(dbname, collection, groupId, 1, 1)
 	assert.Nil(t, err)
 	assert.Equal(t, "", string(res))
 	assert.Equal(t, 0, len(nacks))
 }
 
 var testsNacs = []struct {
-	rangeString string
-	resString string
+	rangeString   string
+	resString     string
 	insertRecords bool
-	ackRecords bool
-	ok         bool
-	test string
+	ackRecords    bool
+	ok            bool
+	test          string
 }{
-	{"0_0", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}",true,false,true,"whole range"},
-	{"", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}",true,false,false,"empty string range"},
-	{"0_5", "{\"unacknowledged\":[1,2,3,4,5]}",true,false,true,"to given"},
-	{"5_0", "{\"unacknowledged\":[5,6,7,8,9,10]}",true,false,true,"from given"},
-	{"3_7", "{\"unacknowledged\":[3,4,5,6,7]}",true,false,true,"range given"},
-	{"1_1", "{\"unacknowledged\":[1]}",true,false,true,"single record"},
-	{"3_1", "{\"unacknowledged\":[]}",true,false,false,"to lt from"},
-	{"0_0", "{\"unacknowledged\":[]}",false,false,true,"no records"},
-	{"0_0", "{\"unacknowledged\":[1,5,6,7,8,9,10]}",true,true,true,"skip acks"},
-	{"2_4", "{\"unacknowledged\":[]}",true,true,true,"all acknowledged"},
-	{"1_4", "{\"unacknowledged\":[1]}",true,true,true,"some acknowledged"},
+	{"0_0", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}", true, false, true, "whole range"},
+	{"", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}", true, false, false, "empty string range"},
+	{"0_5", "{\"unacknowledged\":[1,2,3,4,5]}", true, false, true, "to given"},
+	{"5_0", "{\"unacknowledged\":[5,6,7,8,9,10]}", true, false, true, "from given"},
+	{"3_7", "{\"unacknowledged\":[3,4,5,6,7]}", true, false, true, "range given"},
+	{"1_1", "{\"unacknowledged\":[1]}", true, false, true, "single record"},
+	{"3_1", "{\"unacknowledged\":[]}", true, false, false, "to lt from"},
+	{"0_0", "{\"unacknowledged\":[]}", false, false, true, "no records"},
+	{"0_0", "{\"unacknowledged\":[1,5,6,7,8,9,10]}", true, true, true, "skip acks"},
+	{"2_4", "{\"unacknowledged\":[]}", true, true, true, "all acknowledged"},
+	{"1_4", "{\"unacknowledged\":[1]}", true, true, true, "some acknowledged"},
 }
 
-
 func TestMongoDBNacks(t *testing.T) {
 	for _, test := range testsNacs {
 		db.Connect(dbaddress)
-		if test.insertRecords  {
+		if test.insertRecords {
 			insertRecords(10)
 		}
-		if (test.ackRecords) {
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}")
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}")
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}")
+		if test.ackRecords {
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}")
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}")
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}")
 		}
-		res, err := db.ProcessRequest(dbname, collection, groupId, "nacks", test.rangeString)
+
+		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString})
 		if test.ok {
 			assert.Nil(t, err, test.test)
-			assert.Equal(t, test.resString, string(res),test.test)
+			assert.Equal(t, test.resString, string(res), test.test)
 		} else {
 			assert.NotNil(t, err, test.test)
 		}
@@ -748,43 +746,43 @@ func TestMongoDBNacks(t *testing.T) {
 
 var testsLastAcs = []struct {
 	insertRecords bool
-	ackRecords bool
-	resString string
-	test string
+	ackRecords    bool
+	resString     string
+	test          string
 }{
-	{false,false,"{\"lastAckId\":0}","empty db"},
-	{true,false,"{\"lastAckId\":0}","no acks"},
-	{true,true,"{\"lastAckId\":4}","last ack 4"},
+	{false, false, "{\"lastAckId\":0}", "empty db"},
+	{true, false, "{\"lastAckId\":0}", "no acks"},
+	{true, true, "{\"lastAckId\":4}", "last ack 4"},
 }
 
-
 func TestMongoDBLastAcks(t *testing.T) {
 	for _, test := range testsLastAcs {
 		db.Connect(dbaddress)
-		if test.insertRecords  {
+		if test.insertRecords {
 			insertRecords(10)
 		}
-		if (test.ackRecords) {
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}")
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}")
-			db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}")
+		if test.ackRecords {
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}")
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}")
+			db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}")
 		}
-		res, err := db.ProcessRequest(dbname, collection, groupId, "lastack", "")
+
+		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "lastack"})
 		assert.Nil(t, err, test.test)
-		assert.Equal(t, test.resString, string(res),test.test)
+		assert.Equal(t, test.resString, string(res), test.test)
 		cleanup()
 	}
 }
 
-
 func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) {
 	db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0})
 	db.Connect(dbaddress)
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
+
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -797,14 +795,14 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
-	_, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	_, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.NotNil(t, err2)
-	if err2!=nil {
+	if err2 != nil {
 		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err2.Error())
 	}
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -817,10 +815,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
 	time.Sleep(time.Second)
-	res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -835,10 +833,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
 	time.Sleep(time.Second)
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
-	res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3")
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -847,15 +845,14 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) {
 	assert.Equal(t, string(rec2_expect), string(res2))
 }
 
-
 func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) {
 	db.SetSettings(DBSettings{ReadFromInprocessPeriod: 10})
 	db.Connect(dbaddress)
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -868,9 +865,9 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
-	res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -884,43 +881,43 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	query_str := "{\"Id\":1,\"Op\":\"ackimage\"}"
-	db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str)
-	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3")
+
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.NotNil(t, err)
-	if err!=nil {
+	if err != nil {
 		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.Error())
 	}
 }
 
-
 func TestMongoDBNegAck(t *testing.T) {
 	db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0})
 	db.Connect(dbaddress)
 	defer cleanup()
 	inputParams := struct {
-		Id int
+		Id     int
 		Params struct {
 			DelaySec int
 		}
 	}{}
 	inputParams.Id = 1
-	inputParams.Params.DelaySec=0
-
+	inputParams.Params.DelaySec = 0
 
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(dbname, collection, groupId, "next", "")
-	bparam,_:= json.Marshal(&inputParams)
-	db.ProcessRequest(dbname, collection, groupId, "negackimage", string(bparam))
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") // first time image from negack
-	_, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "") // second time nothing
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	bparam, _ := json.Marshal(&inputParams)
+
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackimage", ExtraParam: string(bparam)})
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time image from negack
+	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})  // second time nothing
 
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 	assert.NotNil(t, err1)
-	if err1!=nil {
+	if err1 != nil {
 		assert.Equal(t, utils.StatusNoData, err1.(*DBError).Code)
 		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err1.Error())
 	}
@@ -932,11 +929,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
-	res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
-	db.ProcessRequest(dbname, collection, groupId, "resetcounter", "0")
-	res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
-	res3, err3 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1")
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res3, err3 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -946,4 +944,4 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
 	assert.Equal(t, string(rec1_expect), string(res1))
 	assert.Equal(t, string(rec1_expect), string(res2))
 	assert.Equal(t, string(rec1_expect), string(res3))
-}
\ No newline at end of file
+}
diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go
index c0a4eba12b086ee9be0ecc32efc2795ae6afef28..08f789619468417d6cdce5ff85799000d813d0f5 100644
--- a/broker/src/asapo_broker/server/get_commands_test.go
+++ b/broker/src/asapo_broker/server/get_commands_test.go
@@ -56,7 +56,7 @@ var testsGetCommand = []struct {
 
 func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() {
 	for _, test := range testsGetCommand {
-		suite.mock_db.On("ProcessRequest", expectedDBName, test.substream, test.groupid, test.command, test.externalParam).Return([]byte("Hello"), nil)
+		suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: test.substream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
 		logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command)))
 		w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + test.reqString+correctTokenSuffix+test.queryParams)
 		suite.Equal(http.StatusOK, w.Code, test.command+ " OK")
diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go
index 8297dc97322cb60595b9fbb82bd3a5c046d1d986..9e588fb9a73da679beffef9598aa9aa8d5df61d1 100644
--- a/broker/src/asapo_broker/server/get_next.go
+++ b/broker/src/asapo_broker/server/get_next.go
@@ -4,8 +4,6 @@ import (
 	"net/http"
 )
 
-
-
 func extractResend(r *http.Request) (string) {
 	keys := r.URL.Query()
 	resend := keys.Get("resend_nacks")
@@ -18,7 +16,6 @@ func extractResend(r *http.Request) (string) {
 	return resend_params
 }
 
-
 func routeGetNext(w http.ResponseWriter, r *http.Request) {
 	processRequest(w, r, "next", extractResend(r), true)
 }
diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go
index fbba23069496813d34f5aae1ca30cd5387ac3c45..19d70f940d94a86d8ac9cec30848accacadf12e9 100644
--- a/broker/src/asapo_broker/server/post_op_image_test.go
+++ b/broker/src/asapo_broker/server/post_op_image_test.go
@@ -34,7 +34,7 @@ func TestImageOpTestSuite(t *testing.T) {
 
 func (suite *ImageOpTestSuite) TestAckImageOpOK() {
 	query_str := "{\"Id\":1,\"Op\":\"ackimage\"}"
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "ackimage", query_str).Return([]byte(""), nil)
+	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId: expectedGroupID, Op: "ackimage", ExtraParam: query_str}).Return([]byte(""), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackimage")))
 	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str)
 	suite.Equal(http.StatusOK, w.Code, "ackimage OK")
diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go
index 51ed9c45fd32814564fb87ccf9fc17b783897a2e..49010054bd85f9d34de110e08c680505df97dde4 100644
--- a/broker/src/asapo_broker/server/post_query_images_test.go
+++ b/broker/src/asapo_broker/server/post_query_images_test.go
@@ -34,7 +34,8 @@ func TestQueryTestSuite(t *testing.T) {
 
 func (suite *QueryTestSuite) TestQueryOK() {
 	query_str := "aaaa"
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "queryimages", query_str).Return([]byte("{}"), nil)
+
+	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream,Op: "queryimages", ExtraParam: query_str}).Return([]byte("{}"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages")))
 
 	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/0/queryimages"+correctTokenSuffix, "POST", query_str)
diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go
index bb2f2b2a224ea666e0c543046dd4264f2539ad72..d35f116a15d063dc6be8264f59ac0468b54c3ee1 100644
--- a/broker/src/asapo_broker/server/post_reset_counter_test.go
+++ b/broker/src/asapo_broker/server/post_reset_counter_test.go
@@ -33,7 +33,9 @@ func TestResetCounterTestSuite(t *testing.T) {
 }
 
 func (suite *ResetCounterTestSuite) TestResetCounterOK() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil)
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"}
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), nil)
+
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter")))
 
 	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST")
diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go
index d65dce7748b43c61ac255e9df1f18f100752bf7c..5b4bf2f3c3d09a97cb26166a53d634e487a01e50 100644
--- a/broker/src/asapo_broker/server/process_request.go
+++ b/broker/src/asapo_broker/server/process_request.go
@@ -24,8 +24,6 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string
 	return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4
 }
 
-var Sink bool
-
 func IsLetterOrNumbers(s string) bool {
 	for _, r := range s {
 		if (r < 'a' || r > 'z') && (r < 'A' || r > 'Z') && (r<'0' || r>'9') {
@@ -69,11 +67,18 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
 		return
 	}
 
-	if datasetRequested(r) {
-		op = op + "_dataset"
+	request := database.Request{}
+	request.DbName = db_name+"_"+stream
+	request.Op = op
+	request.ExtraParam = extra_param
+	request.DbCollectionName = substream
+	request.GroupId = group_id
+	if yes, minSize := datasetRequested(r); yes {
+		request.DatasetOp = true
+		request.MinDatasetSize = minSize
 	}
 
-	answer, code := processRequestInDb(db_name+"_"+stream, substream, group_id, op, extra_param)
+	answer, code := processRequestInDb(request)
 	w.WriteHeader(code)
 	w.Write(answer)
 }
@@ -110,10 +115,10 @@ func reconnectIfNeeded(db_error error) {
 	}
 }
 
-func processRequestInDb(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, code int) {
+func processRequestInDb(request database.Request) (answer []byte, code int) {
 	statistics.IncreaseCounter()
-	answer, err := db.ProcessRequest(db_name, data_collection_name, group_id, op, extra_param)
-	log_str := "processing request " + op + " in " + db_name + " at " + settings.GetDatabaseServer()
+	answer, err := db.ProcessRequest(request)
+	log_str := "processing request " + request.Op + " in " + request.DbName + " at " + settings.GetDatabaseServer()
 	if err != nil {
 		go reconnectIfNeeded(err)
 		return returnError(err, log_str)
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index 1757decba923ba6e98e2b107b2f7b6c3f168b4f1..5aa7b28fc2a622dc88a6ec6f0c02c1951517c233 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -123,7 +123,10 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""),
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"}
+
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusNoData, ""})
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next")))
@@ -134,7 +137,10 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""),
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"}
+
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusServiceUnavailable, ""})
 
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
@@ -147,7 +153,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), errors.New(""))
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"}
+
+
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New(""))
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
 
@@ -159,7 +169,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte("Hello"), nil)
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"}
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
+
+
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
 
 	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
@@ -173,8 +187,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() {
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
-	suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "").Return([]byte("Hello"), nil)
-	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName)))
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, DatasetOp:true, Op: "next"}
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
+
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
 
 	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
 }
diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go
index b8314701084a70177c0f762c0f32ca90c8b12993..28f2bc38d729c1cc14cafc6e696bdf71cfc9ccd8 100644
--- a/broker/src/asapo_broker/server/request_common.go
+++ b/broker/src/asapo_broker/server/request_common.go
@@ -4,6 +4,7 @@ import (
 	"asapo_common/logger"
 	"errors"
 	"net/http"
+	"strconv"
 )
 
 func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, err string) {
@@ -13,7 +14,7 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string,
 	w.Write([]byte(err))
 }
 
-func ValueTrue(r *http.Request, key string) bool {
+func valueTrue(r *http.Request, key string) bool {
 	val := r.URL.Query().Get(key)
 	if len(val) == 0 {
 		return false
@@ -25,8 +26,21 @@ func ValueTrue(r *http.Request, key string) bool {
 	return false
 }
 
-func datasetRequested(r *http.Request) bool {
-	return ValueTrue(r, "dataset")
+func valueInt(r *http.Request, key string) int {
+	val := r.URL.Query().Get(key)
+	if len(val) == 0 {
+		return 0
+	}
+
+	i, err := strconv.Atoi(val)
+	if err != nil {
+		return 0
+	}
+	return i
+}
+
+func datasetRequested(r *http.Request) (bool,int) {
+	return valueTrue(r, "dataset"),valueInt(r,"minsize")
 }
 
 func testAuth(r *http.Request, beamtime_id string) error {
diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp
index 51c5e1694927cdafce88f13e27a3bcae202b497e..e8c9a826e496d8b2b0569ea8fe4f1ae6a3bb0309 100644
--- a/examples/consumer/getnext_broker/getnext_broker.cpp
+++ b/examples/consumer/getnext_broker/getnext_broker.cpp
@@ -132,7 +132,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err
         bool isFirstFile = true;
         while (true) {
             if (params.datasets) {
-                auto dataset = broker->GetNextDataset(group_id, &err);
+                auto dataset = broker->GetNextDataset(group_id, 0, &err);
                 if (err == nullptr) {
                     for (auto& fi : dataset.content) {
                         (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;
diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp
index dc1ecce0fcb17335583d4581f10f59870eebbdac..fdad4330e5da16df13c0eedd617fd597f95ff8c6 100644
--- a/tests/automatic/consumer/consumer_api/consumer_api.cpp
+++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp
@@ -207,7 +207,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
     asapo::FileInfo fi;
     asapo::Error err;
 
-    auto dataset = broker->GetNextDataset(group_id, &err);
+    auto dataset = broker->GetNextDataset(group_id, 0, &err);
     if (err) {
         std::cout << err->Explain() << std::endl;
     }
@@ -223,18 +223,18 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
     M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size));
 
 
-    dataset = broker->GetLastDataset(group_id, &err);
+    dataset = broker->GetLastDataset(group_id, 0, &err);
     M_AssertTrue(err == nullptr, "GetLast no error");
     M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename");
     M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata");
 
-    dataset = broker->GetNextDataset(group_id, &err);
+    dataset = broker->GetNextDataset(group_id, 0, &err);
     M_AssertTrue(err != nullptr, "GetNextDataset2 error");
 
-    dataset = broker->GetLastDataset(group_id, &err);
+    dataset = broker->GetLastDataset(group_id,0, &err);
     M_AssertTrue(err == nullptr, "GetLastDataset2 no error");
 
-    dataset = broker->GetDatasetById(8, group_id, &err);
+    dataset = broker->GetDatasetById(8, group_id, 0, &err);
     M_AssertTrue(err == nullptr, "GetDatasetById error");
     M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename");
 
diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp
index 658af3435661e504e64bad6d4a90f0b40d9adb42..c3a285c54a0cdc6000e83ce1afe4483ccb351336 100644
--- a/tests/manual/performance_broker_receiver/getlast_broker.cpp
+++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp
@@ -81,7 +81,7 @@ std::vector<std::thread> StartThreads(const Args& params,
         while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() <
                 params.timeout_ms) {
             if (params.datasets) {
-                auto dataset = broker->GetLastDataset(group_id, &err);
+                auto dataset = broker->GetLastDataset(group_id, 0, &err);
                 if (err == nullptr) {
                     for (auto& fi : dataset.content) {
                         (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;