From f0b85aa31f45fe2b64a7a0703c9c73ecbaedebc8 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 14 Jun 2021 10:48:30 +0200
Subject: [PATCH] fix tests

---
 broker/src/asapo_broker/database/database.go  |  12 +-
 broker/src/asapo_broker/database/encoding.go  |   4 +-
 .../asapo_broker/database/encoding_test.go    |  30 +--
 broker/src/asapo_broker/database/mongodb.go   |  73 ++---
 .../src/asapo_broker/database/mongodb_test.go | 253 ++++++++++--------
 .../asapo_broker/server/get_commands_test.go  |   4 +-
 .../src/asapo_broker/server/get_meta_test.go  |   4 +-
 .../asapo_broker/server/post_op_image_test.go |   2 +-
 .../server/post_query_images_test.go          |   2 +-
 .../server/post_reset_counter_test.go         |   2 +-
 .../asapo_broker/server/process_request.go    |   2 +-
 .../server/process_request_test.go            |  12 +-
 .../api/cpp/include/asapo/producer/producer.h |   5 +-
 13 files changed, 216 insertions(+), 189 deletions(-)

diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go
index 0bb12f252..7e48a6e52 100644
--- a/broker/src/asapo_broker/database/database.go
+++ b/broker/src/asapo_broker/database/database.go
@@ -3,13 +3,13 @@ package database
 import "asapo_common/utils"
 
 type Request struct {
-	DbName string
-	DbCollectionName string
-	GroupId string
-	Op string
-	DatasetOp bool
+	DbName         string
+	Stream         string
+	GroupId        string
+	Op             string
+	DatasetOp      bool
 	MinDatasetSize int
-	ExtraParam string
+	ExtraParam     string
 }
 
 type Agent interface {
diff --git a/broker/src/asapo_broker/database/encoding.go b/broker/src/asapo_broker/database/encoding.go
index 7f0839797..6e61d95f0 100644
--- a/broker/src/asapo_broker/database/encoding.go
+++ b/broker/src/asapo_broker/database/encoding.go
@@ -85,8 +85,8 @@ func encodeRequest(request *Request) error {
 		return &DBError{utils.StatusWrongInput, "source name is too long"}
 	}
 
-	request.DbCollectionName = encodeStringForColName(request.DbCollectionName)
-	if len(request.DbCollectionName)> max_encoded_stream_size {
+	request.Stream = encodeStringForColName(request.Stream)
+	if len(request.Stream)> max_encoded_stream_size {
 		return &DBError{utils.StatusWrongInput, "stream name is too long"}
 	}
 
diff --git a/broker/src/asapo_broker/database/encoding_test.go b/broker/src/asapo_broker/database/encoding_test.go
index 1def90c99..1b018289e 100644
--- a/broker/src/asapo_broker/database/encoding_test.go
+++ b/broker/src/asapo_broker/database/encoding_test.go
@@ -18,16 +18,16 @@ func TestEncoding(t *testing.T) {
 	assert.Equal(t, sourceDecoded, source)
 
 	r := Request{
-		DbName:           source,
-		DbCollectionName: stream,
-		GroupId:          stream,
-		Op:               "",
-		DatasetOp:        false,
-		MinDatasetSize:   0,
-		ExtraParam:       "",
+		DbName:         source,
+		Stream:         stream,
+		GroupId:        stream,
+		Op:             "",
+		DatasetOp:      false,
+		MinDatasetSize: 0,
+		ExtraParam:     "",
 	}
 	err := encodeRequest(&r)
-	assert.Equal(t, r.DbCollectionName, streamEncoded)
+	assert.Equal(t, r.Stream, streamEncoded)
 	assert.Equal(t, r.GroupId, streamEncoded)
 	assert.Equal(t, r.DbName, sourceEncoded)
 
@@ -63,13 +63,13 @@ func TestEncodingTooLong(t *testing.T) {
 		group := RandomString(test.groupSize)
 		source := RandomString(test.sourceSize)
 		r := Request{
-			DbName:           source,
-			DbCollectionName: stream,
-			GroupId:          group,
-			Op:               "",
-			DatasetOp:        false,
-			MinDatasetSize:   0,
-			ExtraParam:       "",
+			DbName:         source,
+			Stream:         stream,
+			GroupId:        group,
+			Op:             "",
+			DatasetOp:      false,
+			MinDatasetSize: 0,
+			ExtraParam:     "",
 		}
 		err := encodeRequest(&r)
 		if test.ok {
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 75c885f30..af776a39a 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -182,7 +182,7 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
 }
 
 func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) {
-	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
+	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
 	q := maxIndexQuery(request, returnIncompete)
 
 	opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
@@ -211,7 +211,7 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) {
 	update := bson.M{"$set": bson.M{pointer_field_name: ind}}
 	opts := options.Update().SetUpsert(true)
 	c := db.client.Database(request.DbName).Collection(pointer_collection_name)
-	q := bson.M{"_id": request.GroupId + "_" + request.DbCollectionName}
+	q := bson.M{"_id": request.GroupId + "_" + request.Stream}
 	_, err = c.UpdateOne(context.TODO(), q, update, opts)
 	return
 }
@@ -228,7 +228,7 @@ func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (
 func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) {
 	update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
 	opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
-	q := bson.M{"_id": request.GroupId + "_" + request.DbCollectionName, pointer_field_name: bson.M{"$lt": max_ind}}
+	q := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: bson.M{"$lt": max_ind}}
 	c := db.client.Database(request.DbName).Collection(pointer_collection_name)
 
 	err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
@@ -283,7 +283,7 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool
 
 func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) {
 	q := bson.M{"_id": id}
-	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
+	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
 	err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
 	if err != nil {
 		answer := encodeAnswer(id, id_max, "")
@@ -369,7 +369,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
 		return nil, &DBError{utils.StatusWrongInput, err.Error()}
 	}
 
-	err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
+	err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
 	return []byte(""), err
 }
 
@@ -379,7 +379,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
 	if err != nil {
 		return nil, &DBError{utils.StatusWrongInput, err.Error()}
 	}
-	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
+	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
 	_, err = c.InsertOne(context.Background(), &record)
 	if err != nil {
 		if duplicateError(err) {
@@ -388,7 +388,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
 		return nil, err
 	}
 
-	c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
+	c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
 	_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
 	if err_del != nil {
 		return nil, &DBError{utils.StatusWrongInput, err.Error()}
@@ -402,7 +402,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error {
 		return &DBError{utils.StatusServiceUnavailable, no_session_msg}
 	}
 
-	if len(request.DbName) == 0 || len(request.DbCollectionName) == 0 {
+	if len(request.DbName) == 0 || len(request.Stream) == 0 {
 		return &DBError{utils.StatusWrongInput, "beamtime_id ans stream must be set"}
 	}
 
@@ -497,10 +497,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
 	}
 	tNow := time.Now().Unix()
 	dbSessionLock.Lock()
-	t := db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId]
+	t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId]
 	dbSessionLock.Unlock()
 	if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
-		record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, delayMs, nResendAttempts)
+		record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts)
 		if err != nil {
 			log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
 			logger.Debug(log_str)
@@ -514,7 +514,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
 		}
 	} else {
 		dbSessionLock.Lock()
-		db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId] = time.Now().Unix()
+		db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] = time.Now().Unix()
 		dbSessionLock.Unlock()
 	}
 
@@ -595,7 +595,7 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf
 	if !ok || !r.FinishedStream {
 		return nil
 	}
-	log_str := "reached end of stream " + request.DbCollectionName + " , next_stream: " + r.NextStream
+	log_str := "reached end of stream " + request.Stream + " , next_stream: " + r.NextStream
 	logger.Debug(log_str)
 
 	answer := encodeAnswer(r.ID-1, r.ID-1, r.NextStream)
@@ -614,7 +614,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
 	}
 
 	if err == nil {
-		err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, nextInd, request.ExtraParam)
+		err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam)
 		if err_update != nil {
 			return nil, err_update
 		}
@@ -642,7 +642,7 @@ func getSizeFilter(request Request) bson.M {
 }
 
 func (db *Mongodb) getSize(request Request) ([]byte, error) {
-	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
+	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
 
 	filter := getSizeFilter(request)
 	size, err := c.CountDocuments(context.TODO(), filter, options.Count())
@@ -669,7 +669,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
 		return []byte(""), err
 	}
 
-	c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
+	c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
 	_, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}})
 	if err_del != nil {
 		return nil, &DBError{utils.StatusWrongInput, err.Error()}
@@ -678,22 +678,32 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
 	return []byte(""), nil
 }
 
+func getMetaId(request Request) (string, error) {
+	switch request.ExtraParam {
+	case "0":
+		return "bt", nil
+	case "1":
+		return "st_" + request.Stream, nil
+	default:
+		return "", &DBError{utils.StatusWrongInput, "wrong meta type"}
+	}
+}
+
 func (db *Mongodb) getMeta(request Request) ([]byte, error) {
-	id, err := strconv.Atoi(request.ExtraParam)
+	id, err := getMetaId(request)
 	if err != nil {
 		return nil, err
 	}
-
-	var res map[string]interface{}
 	q := bson.M{"_id": id}
+	var res map[string]interface{}
 	c := db.client.Database(request.DbName).Collection(meta_collection_name)
 	err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
 	if err != nil {
-		log_str := "error getting meta with id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error()
+		log_str := "error getting meta for " + id + " in " + request.DbName + " : " + err.Error()
 		logger.Debug(log_str)
 		return nil, &DBError{utils.StatusNoData, err.Error()}
 	}
-	log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
+	log_str := "got metadata for " + id + " in " + request.DbName
 	logger.Debug(log_str)
 	return utils.MapToJson(&res)
 }
@@ -713,7 +723,7 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) {
 		return nil, &DBError{utils.StatusWrongInput, err.Error()}
 	}
 
-	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
+	c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
 	opts := options.Find()
 
 	if len(sort) > 0 {
@@ -832,14 +842,14 @@ func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
 }
 
 func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) error {
-	dataCol := data_collection_name_prefix + request.DbCollectionName
+	dataCol := data_collection_name_prefix + request.Stream
 	if errorOnNotexist {
 		exist, err := db.collectionExist(request, dataCol)
 		if err != nil {
 			return err
 		}
 		if !exist {
-			return &DBError{utils.StatusWrongInput, "stream " + request.DbCollectionName + " does not exist"}
+			return &DBError{utils.StatusWrongInput, "stream " + request.Stream + " does not exist"}
 		}
 	}
 	return db.deleteCollection(request, dataCol)
@@ -851,10 +861,10 @@ func (db *Mongodb) deleteDocumentsInCollection(request Request, collection strin
 	return err
 }
 
-func escapeQuery(query string )(res string) {
+func escapeQuery(query string) (res string) {
 	chars := `\-[]{}()*+?.,^$|#`
 	for _, char := range chars {
-		query = strings.ReplaceAll(query,string(char),`\`+string(char))
+		query = strings.ReplaceAll(query, string(char), `\`+string(char))
 	}
 	return query
 }
@@ -877,15 +887,15 @@ func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) e
 }
 
 func (db *Mongodb) deleteServiceMeta(request Request) error {
-	err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName)
+	err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.Stream)
 	if err != nil {
 		return err
 	}
-	err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName)
+	err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.Stream)
 	if err != nil {
 		return err
 	}
-	return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+escapeQuery(request.DbCollectionName)+"$")
+	return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+escapeQuery(request.Stream)+"$")
 }
 
 func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
@@ -903,7 +913,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
 		return nil, &DBError{utils.StatusWrongInput, "wrong params: " + request.ExtraParam}
 	}
 	if !*params.DeleteMeta {
-		logger.Debug("skipping delete stream meta for " + request.DbCollectionName + " in " + request.DbName)
+		logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName)
 		return nil, nil
 	}
 
@@ -917,7 +927,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
 }
 
 func (db *Mongodb) lastAck(request Request) ([]byte, error) {
-	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
+	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
 	opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
 	result := LastAck{0}
 	var q bson.M = nil
@@ -984,7 +994,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) {
 }
 
 func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) {
-	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
+	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
 
 	if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok {
 		return res, err
@@ -1004,7 +1014,6 @@ func (db *Mongodb) getStreams(request Request) ([]byte, error) {
 	return json.Marshal(&rec)
 }
 
-
 func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
 	if err := db.checkDatabaseOperationPrerequisites(request); err != nil {
 		return nil, err
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 9b4742ff9..21cfb0943 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -19,6 +19,14 @@ type TestRecord struct {
 	Timestamp int64             `bson:"timestamp" json:"timestamp"`
 }
 
+type TestRecordStreamBtMeta struct {
+	ID        string             `bson:"_id" json:"_id"`
+	Meta      string `bson:"meta" json:"meta"`
+}
+
+var recbt = TestRecordStreamBtMeta{"bt", "meta_bt"}
+var recst = TestRecordStreamBtMeta{"st_stream", "meta_st"}
+
 type TestDataset struct {
 	Timestamp int64        `bson:"timestamp" json:"timestamp"`
 	ID        int64        `bson:"_id" json:"_id"`
@@ -33,8 +41,7 @@ const collection = "stream"
 const collection2 = "stream2"
 const dbaddress = "127.0.0.1:27017"
 const groupId = "bid2a5auidddp1vl71d0"
-const metaID = 0
-const metaID_str = "0"
+const metaID = "bt"
 
 const badSymbolsDb = `/\."$`
 const badSymbolsCol = `$`
@@ -93,31 +100,31 @@ func TestMongoDBConnectOK(t *testing.T) {
 }
 
 func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: "0"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "0"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBQueryMessagesErrorWhenNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: "0"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: "0"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
 func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
 }
 
 func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: "bla", GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
 }
@@ -125,7 +132,7 @@ func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) {
 func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "last"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: "bla", GroupId: groupId, Op: "last"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
 }
@@ -133,7 +140,7 @@ func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) {
 func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
@@ -143,7 +150,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error())
 }
@@ -152,7 +159,7 @@ func TestMongoDBGetNextOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -163,8 +170,8 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -176,9 +183,9 @@ func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -192,7 +199,7 @@ func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -204,7 +211,7 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last"})
 	fmt.Println(string(res))
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -214,8 +221,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message)
@@ -226,8 +233,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec1)
-	res1, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	res2, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res1, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	res2, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, string(rec1_expect), string(res1))
 	assert.Equal(t, string(rec2_expect), string(res2))
 }
@@ -264,7 +271,7 @@ func getRecords(n int, resend bool) []int {
 	for i := 0; i < n; i++ {
 		go func() {
 			defer wg.Done()
-			res_bin, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param})
+			res_bin, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param})
 			if err != nil {
 				fmt.Println("error at read ", i)
 			}
@@ -309,13 +316,13 @@ func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 }
@@ -324,7 +331,7 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(200)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
 	n := 100
@@ -337,10 +344,10 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	insertRecords(10)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	db.dropDatabase(dbname)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error())
 }
@@ -350,7 +357,7 @@ func TestMongoDBgetRecordByID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -359,7 +366,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_stream\":\"\"}", err.Error())
 }
@@ -368,7 +375,7 @@ func TestMongoDBGetRecordNext(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -379,8 +386,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection2, &rec_dataset1)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
-	res_string, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection2, GroupId: groupId, Op: "next", DatasetOp: true})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
+	res_string, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true})
 	var res_ds TestDataset
 	json.Unmarshal(res_string, &res_ds)
 
@@ -396,7 +403,7 @@ func TestMongoDBGetRecordID(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 }
@@ -405,7 +412,7 @@ func TestMongoDBWrongOp(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "bla"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "bla"})
 	assert.NotNil(t, err)
 }
 
@@ -415,7 +422,7 @@ func TestMongoDBGetRecordLast(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 }
@@ -426,13 +433,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec2_expect), string(res))
 
 	db.insertRecord(dbname, collection, &rec3)
 
-	res, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
 
@@ -445,7 +452,7 @@ func TestMongoDBGetSize(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec3)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(recs1_expect), string(res))
 }
@@ -456,7 +463,7 @@ func TestMongoDBGetSizeWithFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"})
 	assert.Nil(t, err)
 	var rec_expect, _ = json.Marshal(&SizeRecord{1})
 	assert.Equal(t, string(rec_expect), string(res))
@@ -467,10 +474,10 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "false"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "false"})
 	assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
 
-	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
+	_, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"})
 	assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code)
 }
 
@@ -480,7 +487,7 @@ func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
+	res, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"})
 
 	var rec_expect, _ = json.Marshal(&SizeRecord{1})
 	assert.Equal(t, string(rec_expect), string(res))
@@ -495,7 +502,7 @@ func TestMongoDBGetSizeDataset(t *testing.T) {
 
 	size2_expect, _ := json.Marshal(SizeRecord{2})
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(size2_expect), string(res))
 }
@@ -504,7 +511,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(recs2_expect), string(res))
 }
@@ -522,7 +529,7 @@ func TestMongoPingNotConected(t *testing.T) {
 }
 
 func TestMongoDBgetRecordByIDNotConnected(t *testing.T) {
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"})
 	assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
 }
 
@@ -532,29 +539,39 @@ func TestMongoDBResetCounter(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res1))
 
-	_, err_reset := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"})
+	_, err_reset := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"})
 	assert.Nil(t, err_reset)
 
-	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 
 	assert.Nil(t, err2)
 	assert.Equal(t, string(rec2_expect), string(res2))
 }
 
-func TestMongoDBGetMetaOK(t *testing.T) {
-	recm := rec1
+func TestMongoDBGetMetaBtOK(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	rec_expect, _ := json.Marshal(recbt)
+	db.insertMeta(dbname, &recbt)
+
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "whatever", Op: "meta", ExtraParam: "0"})
+
+	assert.Nil(t, err)
+	assert.Equal(t, string(rec_expect), string(res))
+}
+
+func TestMongoDBGetMetaStOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
-	recm.ID = metaID
-	rec_expect, _ := json.Marshal(recm)
-	db.insertMeta(dbname, &recm)
+	rec_expect, _ := json.Marshal(recst)
+	db.insertMeta(dbname, &recst)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "1"})
 
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec_expect), string(res))
@@ -564,7 +581,7 @@ func TestMongoDBGetMetaErr(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: metaID})
 	assert.NotNil(t, err)
 }
 
@@ -639,7 +656,7 @@ func TestMongoDBQueryMessagesOK(t *testing.T) {
 		//			continue
 		//		}
 
-		res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: test.query})
+		res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: test.query})
 		var res []TestRecordMeta
 		json.Unmarshal(res_string, &res)
 		//		fmt.Println(string(res_string))
@@ -658,7 +675,7 @@ func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	for _, test := range tests {
-		res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: test.query})
+		res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: test.query})
 		var res []TestRecordMeta
 		json.Unmarshal(res_string, &res)
 		assert.Equal(t, 0, len(res))
@@ -684,7 +701,7 @@ func TestMongoDBGetDataset(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec_dataset1)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
 
 	assert.Nil(t, err)
 
@@ -700,7 +717,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
 
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
 
 	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 	var res TestDataset
@@ -715,8 +732,8 @@ func TestMongoDBNoDataOnNotCompletedNextDataset(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_dataset2_incomplete)
 
-	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
-	_, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	_, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
+	_, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true})
 
 	assert.Equal(t, utils.StatusPartialData, err1.(*DBError).Code)
 	assert.Equal(t, utils.StatusPartialData, err2.(*DBError).Code)
@@ -732,7 +749,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -749,7 +766,7 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
@@ -767,7 +784,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T)
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2_incomplete3)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
@@ -784,7 +801,7 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last",
 		DatasetOp: true, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
@@ -801,7 +818,7 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last",
 		DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
@@ -818,7 +835,7 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset3)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -833,7 +850,7 @@ func TestMongoDBGetDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
 
 	assert.Nil(t, err)
 
@@ -849,7 +866,7 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
 
 	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 
@@ -865,7 +882,7 @@ func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"})
 
 	assert.Nil(t, err)
 
@@ -912,7 +929,7 @@ func TestMongoDBListStreams(t *testing.T) {
 		}
 		var rec_streams_expect, _ = json.Marshal(test.expectedStreams)
 
-		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "0", Op: "streams", ExtraParam: test.from})
+		res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "0", Op: "streams", ExtraParam: test.from})
 		if test.ok {
 			assert.Nil(t, err, test.test)
 			assert.Equal(t, string(rec_streams_expect), string(res), test.test)
@@ -932,7 +949,7 @@ func TestMongoDBAckMessage(t *testing.T) {
 
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
 
-	request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
+	request := Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
 	res, err := db.ProcessRequest(request)
 	nacks, _ := db.getNacks(request, 0, 0)
 	assert.Nil(t, err)
@@ -969,12 +986,12 @@ func TestMongoDBNacks(t *testing.T) {
 			db.insertRecord(dbname, collection, &rec_finished11)
 		}
 		if test.ackRecords {
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"})
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"})
 		}
 
-		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString})
+		res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString})
 		if test.ok {
 			assert.Nil(t, err, test.test)
 			assert.Equal(t, test.resString, string(res), test.test)
@@ -1004,12 +1021,12 @@ func TestMongoDBLastAcks(t *testing.T) {
 			db.insertRecord(dbname, collection, &rec_finished11)
 		}
 		if test.ackRecords {
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"})
-			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"})
+			db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"})
 		}
 
-		res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "lastack"})
+		res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "lastack"})
 		assert.Nil(t, err, test.test)
 		assert.Equal(t, test.resString, string(res), test.test)
 		cleanup()
@@ -1023,8 +1040,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) {
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -1037,9 +1054,9 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	_, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	_, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -1057,10 +1074,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
 	time.Sleep(time.Second)
-	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1076,10 +1093,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) {
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
 	db.insertRecord(dbname, collection, &rec_finished3)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
 	time.Sleep(time.Second)
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
-	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1094,8 +1111,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -1108,9 +1125,9 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
-	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
 	assert.Nil(t, err2)
@@ -1124,11 +1141,11 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
 
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"})
 	assert.NotNil(t, err)
 	if err != nil {
 		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
@@ -1142,8 +1159,8 @@ func TestMongoDBAckTwiceErrors(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
-	_,err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
+	_,err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
 	assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
 }
 
@@ -1162,14 +1179,14 @@ func TestMongoDBNegAck(t *testing.T) {
 	inputParams.Params.DelayMs = 0
 
 	db.insertRecord(dbname, collection, &rec1)
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})
 	bparam, _ := json.Marshal(&inputParams)
 
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time message from negack
-	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})  // second time nothing
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
-	_, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})  // second time nothing
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack
+	_, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"})  // second time nothing
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
+	_, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing
 
 	assert.Nil(t, err)
 	assert.Equal(t, string(rec1_expect), string(res))
@@ -1188,12 +1205,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
 
-	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"})
-	res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
-	res3, err3 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"})
+	res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
+	res3, err3 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"})
 
 	assert.Nil(t, err)
 	assert.Nil(t, err1)
@@ -1223,12 +1240,12 @@ func TestDeleteStreams(t *testing.T) {
 	for _, test := range testsDeleteStream {
 		db.Connect(dbaddress)
 		db.insertRecord(dbname, encodeStringForColName(test.stream), &rec1)
-		db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "123", Op: "next"})
+		db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "123", Op: "next"})
 		query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
-		request := Request{DbName: dbname, DbCollectionName: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
+		request := Request{DbName: dbname, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
 		_, err := db.ProcessRequest(request)
 		assert.Nil(t, err, test.message)
-		_, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
+		_, err = db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
 		if test.ok {
 			rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""})
 			acks_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},acks_collection_name_prefix+test.stream)
@@ -1240,7 +1257,7 @@ func TestDeleteStreams(t *testing.T) {
 		} else {
 			assert.NotNil(t, err, test.message)
 		}
-		_, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
+		_, err = db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
 		if test.ok2 {
 			assert.Nil(t, err, test.message+" 2")
 		} else {
@@ -1271,7 +1288,7 @@ func TestMongoDBEncodingOK(t *testing.T) {
 	for _, test := range testsEncodings {
 		db.Connect(dbaddress)
 		db.insertRecord(test.dbname_indb, test.collection_indb, &rec1)
-		res, err := db.ProcessRequest(Request{DbName: test.dbname, DbCollectionName: test.collection, GroupId: test.group, Op: "next"})
+		res, err := db.ProcessRequest(Request{DbName: test.dbname, Stream: test.collection, GroupId: test.group, Op: "next"})
 		if test.ok {
 			assert.Nil(t, err, test.message)
 			assert.Equal(t, string(rec1_expect), string(res), test.message)
diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go
index 40c41c2b6..980946e49 100644
--- a/broker/src/asapo_broker/server/get_commands_test.go
+++ b/broker/src/asapo_broker/server/get_commands_test.go
@@ -59,7 +59,7 @@ var testsGetCommand = []struct {
 
 func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() {
 	for _, test := range testsGetCommand {
-		suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
+		suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
 		logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command)))
 		w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString+correctTokenSuffix+test.queryParams)
 		suite.Equal(http.StatusOK, w.Code, test.command+ " OK")
@@ -83,7 +83,7 @@ func (suite *GetCommandsTestSuite) TestGetCommandsCorrectlyProcessedEncoding() {
 		test.reqString = strings.Replace(test.reqString,test.source,encodedSource,1)
 		test.reqString = strings.Replace(test.reqString,test.stream,encodedStream,1)
 		dbname := expectedBeamtimeId + "_" + newsource
-		suite.mock_db.On("ProcessRequest", database.Request{DbName: dbname, DbCollectionName: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
+		suite.mock_db.On("ProcessRequest", database.Request{DbName: dbname, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
 		logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command)))
 		w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString+correctTokenSuffix+test.queryParams)
 		suite.Equal(http.StatusOK, w.Code, test.command+ " OK")
diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go
index 550efb653..b54a72865 100644
--- a/broker/src/asapo_broker/server/get_meta_test.go
+++ b/broker/src/asapo_broker/server/get_meta_test.go
@@ -33,9 +33,9 @@ func TestGetMetaTestSuite(t *testing.T) {
 }
 
 func (suite *GetMetaTestSuite) TestGetMetaOK() {
-	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, Op: "meta", ExtraParam: "1"}).Return([]byte(""), nil)
+	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream, Op: "meta", ExtraParam: "0"}).Return([]byte(""), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta")))
-	w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/0/meta"  + "/1" + correctTokenSuffix,"GET")
+	w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/0/meta"  + "/0" + correctTokenSuffix,"GET")
 	suite.Equal(http.StatusOK, w.Code, "meta OK")
 }
 
diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go
index fc1a2d4e7..2cc3159ee 100644
--- a/broker/src/asapo_broker/server/post_op_image_test.go
+++ b/broker/src/asapo_broker/server/post_op_image_test.go
@@ -34,7 +34,7 @@ func TestMessageOpTestSuite(t *testing.T) {
 
 func (suite *MessageOpTestSuite) TestAckMessageOpOK() {
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
-	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil)
+	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackmessage")))
 	w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str)
 	suite.Equal(http.StatusOK, w.Code, "ackmessage OK")
diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go
index 16aca9242..d51d2490a 100644
--- a/broker/src/asapo_broker/server/post_query_images_test.go
+++ b/broker/src/asapo_broker/server/post_query_images_test.go
@@ -35,7 +35,7 @@ func TestQueryTestSuite(t *testing.T) {
 func (suite *QueryTestSuite) TestQueryOK() {
 	query_str := "aaaa"
 
-	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil)
+	suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request querymessages")))
 
 	w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/0/querymessages"+correctTokenSuffix, "POST", query_str)
diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go
index 10fb4e1b0..64291bee2 100644
--- a/broker/src/asapo_broker/server/post_reset_counter_test.go
+++ b/broker/src/asapo_broker/server/post_reset_counter_test.go
@@ -33,7 +33,7 @@ func TestResetCounterTestSuite(t *testing.T) {
 }
 
 func (suite *ResetCounterTestSuite) TestResetCounterOK() {
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), nil)
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter")))
diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go
index 23fe151a8..41b6564b7 100644
--- a/broker/src/asapo_broker/server/process_request.go
+++ b/broker/src/asapo_broker/server/process_request.go
@@ -78,7 +78,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
 	request.DbName = db_name+"_"+datasource
 	request.Op = op
 	request.ExtraParam = extra_param
-	request.DbCollectionName = stream
+	request.Stream = stream
 	request.GroupId = group_id
 	if yes, minSize := datasetRequested(r); yes {
 		request.DatasetOp = true
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index 4ac81cd3b..781a7f16b 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -151,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusNoData, ""})
@@ -165,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusServiceUnavailable, ""})
@@ -181,7 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New(""))
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
@@ -196,7 +196,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
@@ -207,7 +207,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
@@ -231,7 +231,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken()
 func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() {
 	query_str := "query_string"
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
+	expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName)))
diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h
index 6d2a9535c..5b684de31 100644
--- a/producer/api/cpp/include/asapo/producer/producer.h
+++ b/producer/api/cpp/include/asapo/producer/producer.h
@@ -112,8 +112,9 @@ class Producer {
       \param callback - callback function
       \return Error - will be nullptr on success
     */
-    virtual Error DEPRECATED("obsolates 01.07.2022, use SendBeamtimeMetadata instead") SendMetadata(const std::string& metadata,
-                               RequestCallback callback)  = 0;
+    virtual Error DEPRECATED("obsolates 01.07.2022, use SendBeamtimeMetadata instead") SendMetadata(
+        const std::string& metadata,
+        RequestCallback callback)  = 0;
 
     //! Sends beamtime metadata to the receiver
     /*!
-- 
GitLab