diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index f95736a2be9fd873dcba5be494cef11e8b1149dd..41bf082a0444c2f1248f540a4db52bd03d4ae3de 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -24,11 +24,13 @@ type ID struct {
 	ID int `bson:"_id"`
 }
 
-type FinishedStreamRecord struct {
-	ID         int                    `json:"_id"`
-	Name       string                 `json:"name"`
-	Meta       map[string]interface{} `json:"meta"`
-	NextStream string
+type MessageRecord struct {
+	ID             int                    `json:"_id"`
+	Timestamp      int                    `bson:"timestamp" json:"timestamp"`
+	Name           string                 `json:"name"`
+	Meta           map[string]interface{} `json:"meta"`
+	NextStream     string
+	FinishedStream bool
 }
 
 type InProcessingRecord struct {
@@ -277,7 +279,7 @@ func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[str
 	return res, err
 }
 
-func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) {
+func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, error) {
 	res, err := db.getRecordFromDb(request, id, id_max)
 	if err != nil {
 		return nil, err
@@ -301,10 +303,10 @@ func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, er
 	}
 }
 
-func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) {
+func (db *Mongodb) getRawRecordWithSort(dbname string, collection_name string, sortField string, sortOrder int) (map[string]interface{}, error) {
 	var res map[string]interface{}
 	c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
-	opts := options.FindOne().SetSort(bson.M{"timestamp": 1})
+	opts := options.FindOne().SetSort(bson.M{sortField: sortOrder})
 	var q bson.M = nil
 	err := c.FindOne(context.TODO(), q, opts).Decode(&res)
 
@@ -317,6 +319,14 @@ func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map
 	return res, nil
 }
 
+func (db *Mongodb) getLastRawRecord(dbname string, collection_name string) (map[string]interface{}, error) {
+	return db.getRawRecordWithSort(dbname, collection_name, "_id", -1)
+}
+
+func (db *Mongodb) getEarliestRawRecord(dbname string, collection_name string) (map[string]interface{}, error) {
+	return db.getRawRecordWithSort(dbname, collection_name, "timestamp", 1)
+}
+
 func (db *Mongodb) getRecordByID(request Request) ([]byte, error) {
 	id, err := strconv.Atoi(request.ExtraParam)
 	if err != nil {
@@ -328,7 +338,7 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error) {
 		return nil, err
 	}
 
-	return db.getRecordByIDRow(request, id, max_ind)
+	return db.getRecordByIDRaw(request, id, max_ind)
 }
 
 func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
@@ -518,18 +528,21 @@ func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) {
 	return nextInd, maxInd, nil
 }
 
-func ExtractFinishedStreamRecord(data map[string]interface{}) (FinishedStreamRecord, bool) {
-	var r FinishedStreamRecord
+func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
+	var r MessageRecord
 	err := utils.MapToStruct(data, &r)
-	if err != nil || r.Name != finish_stream_keyword {
+	if err != nil {
 		return r, false
 	}
-	var next_stream string
-	next_stream, ok := r.Meta["next_stream"].(string)
-	if !ok {
-		next_stream = no_next_stream_keyword
+	r.FinishedStream = (r.Name == finish_stream_keyword)
+	if r.FinishedStream {
+		var next_stream string
+		next_stream, ok := r.Meta["next_stream"].(string)
+		if !ok {
+			next_stream = no_next_stream_keyword
+		}
+		r.NextStream = next_stream
 	}
-	r.NextStream = next_stream
 	return r, true
 }
 
@@ -540,7 +553,7 @@ func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror er
 		return nil, err_inproc
 	}
 	if nextInd != 0 {
-		return db.getRecordByIDRow(request, nextInd, maxInd)
+		return db.getRecordByIDRaw(request, nextInd, maxInd)
 	} else {
 		return nil, originalerror
 	}
@@ -550,8 +563,8 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf
 	if id != id_max {
 		return nil
 	}
-	r, ok := ExtractFinishedStreamRecord(data)
-	if !ok {
+	r, ok := ExtractMessageRecord(data)
+	if !ok || !r.FinishedStream {
 		return nil
 	}
 	log_str := "reached end of stream " + request.DbCollectionName + " , next_stream: " + r.NextStream
@@ -567,7 +580,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
 		return nil, err
 	}
 
-	data, err := db.getRecordByIDRow(request, nextInd, maxInd)
+	data, err := db.getRecordByIDRaw(request, nextInd, maxInd)
 	if err != nil {
 		data, err = db.tryGetRecordFromInprocessed(request, err)
 	}
@@ -586,7 +599,7 @@ func (db *Mongodb) getLastRecord(request Request) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-	return db.getRecordByIDRow(request, max_ind, max_ind)
+	return db.getRecordByIDRaw(request, max_ind, max_ind)
 }
 
 func getSizeFilter(request Request) bson.M {
@@ -725,10 +738,10 @@ func extractsTwoIntsFromString(from_to string) (int, int, error) {
 
 }
 
-func (db *Mongodb) getNacksLimits(request Request) (int,int, error) {
+func (db *Mongodb) getNacksLimits(request Request) (int, int, error) {
 	from, to, err := extractsTwoIntsFromString(request.ExtraParam)
 	if err != nil {
-		return 0,0, err
+		return 0, 0, err
 	}
 
 	if from == 0 {
@@ -736,12 +749,24 @@ func (db *Mongodb) getNacksLimits(request Request) (int,int, error) {
 	}
 
 	if to == 0 {
-		to, err = db.getMaxIndex(request, true)
+		to, err = db.getMaxLimitWithoutEndOfStream(request)
 		if err != nil {
-			return 0,0, err
+			return 0, 0, err
 		}
 	}
-	return from,to,nil
+	return from, to, nil
+}
+
+func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request) (int, error) {
+	maxInd, err := db.getMaxIndex(request, true)
+	if err != nil {
+		return 0, err
+	}
+	_, last_err := db.getRecordByIDRaw(request, maxInd, maxInd)
+	if last_err != nil && maxInd > 0 {
+		maxInd = maxInd - 1
+	}
+	return maxInd, nil
 }
 
 func (db *Mongodb) nacks(request Request) ([]byte, error) {
@@ -833,7 +858,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) {
 func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) {
 	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
 
-	if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c);ok{
+	if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok {
 		return res, err
 	}
 
diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go
index fba70330b025ad6ae762c4574324073b3900a655..4c531b1b9f5d978fd5138e79c82aa7eb153ae5ff 100644
--- a/broker/src/asapo_broker/database/mongodb_streams.go
+++ b/broker/src/asapo_broker/database/mongodb_streams.go
@@ -14,8 +14,12 @@ import (
 )
 
 type StreamInfo struct {
-	Name      string `json:"name"`
-	Timestamp int64  `json:"timestampCreated"`
+	LastId        int64  `json:"lastId"`
+	Name          string `json:"name"`
+	Timestamp     int64  `json:"timestampCreated"`
+	TimestampLast int64  `json:"timestampLast"`
+	Finished      bool   `json:"finished"`
+	NextStream    string `json:"nextStream"`
 }
 
 type StreamsRecord struct {
@@ -57,32 +61,93 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) {
 	return rec, nil
 }
 
-func updateTimestamps(db *Mongodb, db_name string, rec *StreamsRecord) {
-	ss,dbFound :=streams.records[db_name]
+func getCurrentStreams(db_name string) []StreamInfo {
+	ss, dbFound := streams.records[db_name]
 	currentStreams := []StreamInfo{}
 	if dbFound {
 		// sort streams by name
-		currentStreams=ss.Streams
-		sort.Slice(currentStreams,func(i, j int) bool {
-			return currentStreams[i].Name>=currentStreams[j].Name
+		currentStreams = ss.Streams
+		sort.Slice(currentStreams, func(i, j int) bool {
+			return currentStreams[i].Name < currentStreams[j].Name
 		})
 	}
+	return currentStreams
+}
+
+func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int, bool) {
+	ind := sort.Search(len(currentStreams), func(i int) bool {
+		return currentStreams[i].Name >= record.Name
+	})
+	if ind < len(currentStreams) && currentStreams[ind].Name == record.Name {
+		return ind, true
+	}
+	return -1, false
+}
+
+func fillInfoFromEarliestRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
+	res, err := db.getEarliestRawRecord(db_name, record.Name)
+	if err != nil {
+		return err
+	}
+	ts, ok := utils.GetInt64FromMap(res, "timestamp")
+	if ok {
+		rec.Streams[i].Timestamp = ts
+	} else {
+		return errors.New("fillInfoFromEarliestRecord: cannot extract timestamp")
+	}
+	return nil
+}
+
+func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
+	res, err := db.getLastRawRecord(db_name, record.Name)
+	if err != nil {
+		return err
+	}
+	mrec, ok := ExtractMessageRecord(res)
+	if !ok {
+		return errors.New("fillInfoFromLastRecord: cannot extract record")
+	}
+
+	rec.Streams[i].LastId = int64(mrec.ID)
+	rec.Streams[i].TimestampLast = int64(mrec.Timestamp)
+	rec.Streams[i].Finished = mrec.FinishedStream
+	if mrec.FinishedStream {
+		rec.Streams[i].LastId = rec.Streams[i].LastId - 1
+		if mrec.NextStream != no_next_stream_keyword {
+			rec.Streams[i].NextStream = mrec.NextStream
+		}
+	}
+	return nil
+}
+
+func updateStreamInfoFromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) {
+	ind, found := findStreamAmongCurrent(currentStreams, record)
+	if found {
+		*rec = currentStreams[ind]
+		if currentStreams[ind].Finished {
+			return found, true
+		}
+	}
+	return found, false
+}
+
+func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord) error {
+	currentStreams := getCurrentStreams(db_name)
 	for i, record := range rec.Streams {
-		ind := sort.Search(len(currentStreams),func(i int) bool {
-			return currentStreams[i].Name>=record.Name
-		})
-		if ind < len(currentStreams) && currentStreams[ind].Name == record.Name { // record found, just skip it
-			rec.Streams[i].Timestamp = currentStreams[ind].Timestamp
+		found, mayContinue := updateStreamInfoFromCurrent(currentStreams, record, &rec.Streams[i])
+		if mayContinue {
 			continue
 		}
-		res, err := db.getEarliestRecord(db_name, record.Name)
-		if err == nil {
-			ts,ok:=utils.InterfaceToInt64(res["timestamp"])
-			if ok {
-				rec.Streams[i].Timestamp = ts
+		if !found { // set timestamp
+			if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil {
+				return err
 			}
 		}
+		if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update from last record (timestamp, stream finished flag)
+			return err
+		}
 	}
+	return nil
 }
 
 func sortRecords(rec *StreamsRecord) {
@@ -96,9 +161,13 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err
 	if err != nil {
 		return StreamsRecord{}, err
 	}
-	updateTimestamps(db, db_name, &rec)
+	err = updateStreamInfos(db, db_name, &rec)
+	if err != nil {
+		return StreamsRecord{}, err
+	}
+
 	sortRecords(&rec)
-	if len(rec.Streams)>0 {
+	if len(rec.Streams) > 0 {
 		ss.records[db_name] = rec
 		ss.lastUpdated = time.Now().UnixNano()
 	}
@@ -107,7 +176,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err
 
 func (ss *Streams) getStreams(db *Mongodb, db_name string, from string) (StreamsRecord, error) {
 	streamsLock.Lock()
-	rec, err := ss.tryGetFromCache(db_name,db.settings.UpdateStreamCachePeriodMs)
+	rec, err := ss.tryGetFromCache(db_name, db.settings.UpdateStreamCachePeriodMs)
 	if err != nil {
 		rec, err = ss.updateFromDb(db, db_name)
 	}
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index e759b0c39553f08b6c6fcf4b3bb0b07b5000f9ee..08569051628ecec2f601cb8bf83a9a2ca1a644d6 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -13,16 +13,17 @@ import (
 )
 
 type TestRecord struct {
-	ID        int64               `bson:"_id" json:"_id"`
+	ID        int64             `bson:"_id" json:"_id"`
 	Meta      map[string]string `bson:"meta" json:"meta"`
 	Name      string            `bson:"name" json:"name"`
 	Timestamp int64             `bson:"timestamp" json:"timestamp"`
 }
 
 type TestDataset struct {
-	ID     int64          `bson:"_id" json:"_id"`
-	Size   int64          `bson:"size" json:"size"`
-	Messages []TestRecord `bson:"messages" json:"messages"`
+	Timestamp int64        `bson:"timestamp" json:"timestamp"`
+	ID        int64        `bson:"_id" json:"_id"`
+	Size      int64        `bson:"size" json:"size"`
+	Messages  []TestRecord `bson:"messages" json:"messages"`
 }
 
 var db Mongodb
@@ -38,9 +39,11 @@ const metaID_str = "0"
 var empty_next = map[string]string{"next_stream": ""}
 
 var rec1 = TestRecord{1, empty_next, "aaa", 0}
-var rec_finished = TestRecord{2, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 0}
+var rec_finished = TestRecord{2, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
 var rec2 = TestRecord{2, empty_next, "bbb", 1}
 var rec3 = TestRecord{3, empty_next, "ccc", 2}
+var rec_finished3 = TestRecord{3, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
+var rec_finished11 = TestRecord{11, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2}
 
 var rec1_expect, _ = json.Marshal(rec1)
 var rec2_expect, _ = json.Marshal(rec2)
@@ -158,7 +161,7 @@ func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	_,err:=db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id",ExtraParam: "2"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"})
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
@@ -170,13 +173,12 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	res,err:= db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last"})
 	fmt.Println(string(res))
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
 }
 
-
 func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -434,27 +436,25 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec1)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "false"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "false"})
 	assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
 
-	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"})
+	_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
 	assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code)
 }
 
-
 func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_finished)
 
-	res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"})
+	res, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
 
 	var rec_expect, _ = json.Marshal(&SizeRecord{1})
 	assert.Equal(t, string(rec_expect), string(res))
 }
 
-
 func TestMongoDBGetSizeDataset(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -464,7 +464,7 @@ func TestMongoDBGetSizeDataset(t *testing.T) {
 
 	size2_expect, _ := json.Marshal(SizeRecord{2})
 
-	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size",ExtraParam: "true"})
+	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size", ExtraParam: "true"})
 	assert.Nil(t, err)
 	assert.Equal(t, string(size2_expect), string(res))
 }
@@ -639,13 +639,13 @@ func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) {
 	}
 }
 
-var rec_dataset1 = TestDataset{1, 3, []TestRecord{rec1, rec2, rec3}}
-var rec_dataset1_incomplete = TestDataset{1, 4, []TestRecord{rec1, rec2, rec3}}
-var rec_dataset2_incomplete = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}}
-var rec_dataset2 = TestDataset{2, 4, []TestRecord{rec1, rec2, rec3}}
-var rec_dataset3 = TestDataset{3, 3, []TestRecord{rec3, rec2, rec2}}
+var rec_dataset1 = TestDataset{0, 1, 3, []TestRecord{rec1, rec2, rec3}}
+var rec_dataset1_incomplete = TestDataset{1, 1, 4, []TestRecord{rec1, rec2, rec3}}
+var rec_dataset2_incomplete = TestDataset{2, 2, 4, []TestRecord{rec1, rec2, rec3}}
+var rec_dataset2 = TestDataset{1, 2, 4, []TestRecord{rec1, rec2, rec3}}
+var rec_dataset3 = TestDataset{2, 3, 3, []TestRecord{rec3, rec2, rec2}}
 
-var rec_dataset2_incomplete3 = TestDataset{2, 3, []TestRecord{rec1, rec2}}
+var rec_dataset2_incomplete3 = TestDataset{1, 2, 3, []TestRecord{rec1, rec2}}
 
 func TestMongoDBGetDataset(t *testing.T) {
 	db.Connect(dbaddress)
@@ -701,7 +701,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset1)
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -719,7 +719,7 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_dataset2)
 
 	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
-		DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"})
+		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -737,7 +737,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T)
 	db.insertRecord(dbname, collection, &rec_dataset2_incomplete3)
 
 	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
-		DatasetOp:true,MinDatasetSize: 3,ExtraParam: "0"})
+		DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"})
 
 	assert.Nil(t, err)
 
@@ -754,7 +754,7 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec_finished)
 
 	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
-		DatasetOp:true,ExtraParam: "0"})
+		DatasetOp: true, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
 	if err != nil {
@@ -763,7 +763,6 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) {
 	}
 }
 
-
 func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamReturnsEndofStream(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -772,7 +771,7 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur
 	db.insertRecord(dbname, collection, &rec_finished)
 
 	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
-		DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"})
+		DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"})
 
 	assert.NotNil(t, err)
 	if err != nil {
@@ -781,7 +780,6 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur
 	}
 }
 
-
 func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -804,7 +802,7 @@ func TestMongoDBGetDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
 
 	assert.Nil(t, err)
 
@@ -820,7 +818,7 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"})
 
 	assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
 
@@ -836,7 +834,7 @@ func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) {
 	defer cleanup()
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 
-	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true,MinDatasetSize: 3,ExtraParam: "1"})
+	res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"})
 
 	assert.Nil(t, err)
 
@@ -853,16 +851,20 @@ type Stream struct {
 }
 
 var testsStreams = []struct {
-	from               string
+	from            string
 	streams         []Stream
 	expectedStreams StreamsRecord
-	test               string
-	ok                 bool
+	test            string
+	ok              bool
 }{
 	{"", []Stream{}, StreamsRecord{[]StreamInfo{}}, "no streams", true},
-	{"", []Stream{{"ss1", []TestRecord{rec2, rec1}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0}}}, "one stream", true},
-	{"", []Stream{{"ss1", []TestRecord{rec2, rec1}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0}, StreamInfo{Name: "ss2", Timestamp: 1}}}, "two streams", true},
-	{"ss2", []Stream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss2", Timestamp: 1}}}, "with from", true},
+	{"", []Stream{{"ss1", []TestRecord{rec2, rec1}}},
+		StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0, LastId: 2, TimestampLast: 1}}}, "one stream", true},
+	{"", []Stream{{"ss1", []TestRecord{rec2, rec1}},
+		{"ss2", []TestRecord{rec2, rec3}}},
+		StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1", Timestamp: 0, LastId: 2, TimestampLast: 1},
+			StreamInfo{Name: "ss2", Timestamp: 1, LastId: 3, TimestampLast: 2}}}, "two streams", true},
+	{"ss2", []Stream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss2", Timestamp: 1, LastId: 3, TimestampLast: 2}}}, "with from", true},
 }
 
 func TestMongoDBListStreams(t *testing.T) {
@@ -891,11 +893,13 @@ func TestMongoDBAckMessage(t *testing.T) {
 	defer cleanup()
 
 	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec_finished)
+
 	query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
 
 	request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}
 	res, err := db.ProcessRequest(request)
-	nacks, _ := db.getNacks(request, 1, 1)
+	nacks, _ := db.getNacks(request, 0, 0)
 	assert.Nil(t, err)
 	assert.Equal(t, "", string(res))
 	assert.Equal(t, 0, len(nacks))
@@ -927,6 +931,7 @@ func TestMongoDBNacks(t *testing.T) {
 		db.Connect(dbaddress)
 		if test.insertRecords {
 			insertRecords(10)
+			db.insertRecord(dbname, collection, &rec_finished11)
 		}
 		if test.ackRecords {
 			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
@@ -961,6 +966,7 @@ func TestMongoDBLastAcks(t *testing.T) {
 		db.Connect(dbaddress)
 		if test.insertRecords {
 			insertRecords(10)
+			db.insertRecord(dbname, collection, &rec_finished11)
 		}
 		if test.ackRecords {
 			db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"})
@@ -1034,6 +1040,7 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) {
 	defer cleanup()
 	err := db.insertRecord(dbname, collection, &rec1)
 	db.insertRecord(dbname, collection, &rec2)
+	db.insertRecord(dbname, collection, &rec_finished3)
 	res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
 	time.Sleep(time.Second)
 	res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"})
diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go
index c172adf5483c2de061f64f55021663f74988a34b..9408445be8c7667c5e25a520dbfeb97eddf4f36f 100644
--- a/broker/src/asapo_broker/database/streams_test.go
+++ b/broker/src/asapo_broker/database/streams_test.go
@@ -18,7 +18,7 @@ func (suite *StreamsTestSuite) SetupTest() {
 
 func (suite *StreamsTestSuite) TearDownTest() {
 	cleanup()
-	streams.records= map[string]StreamsRecord{}
+	streams.records = map[string]StreamsRecord{}
 }
 
 func TestStreamsTestSuite(t *testing.T) {
@@ -48,6 +48,33 @@ func (suite *StreamsTestSuite) TestStreamsUsesCache() {
 	rec, err := streams.getStreams(&db, dbname, "")
 	suite.Nil(err)
 	suite.Equal(int64(1), rec.Streams[0].Timestamp)
+	suite.Equal(false, rec.Streams[0].Finished)
+	suite.Equal(int64(2), rec.Streams[0].LastId)
+	suite.Equal(int64(1), rec.Streams[0].TimestampLast)
+}
+
+func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() {
+	db.settings.UpdateStreamCachePeriodMs = 1000
+	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec_finished)
+	rec, err := streams.getStreams(&db, dbname, "")
+	suite.Nil(err)
+	suite.Equal(int64(0), rec.Streams[0].Timestamp)
+	suite.Equal(true, rec.Streams[0].Finished)
+	suite.Equal("next1", rec.Streams[0].NextStream)
+}
+
+func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() {
+	db.settings.UpdateStreamCachePeriodMs = 1000
+	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
+	db.insertRecord(dbname, collection, &rec_finished)
+	rec, err := streams.getStreams(&db, dbname, "")
+	suite.Nil(err)
+	suite.Equal(int64(1), rec.Streams[0].Timestamp)
+	suite.Equal(int64(2), rec.Streams[0].TimestampLast)
+	suite.Equal(true, rec.Streams[0].Finished)
+	suite.Equal("next1", rec.Streams[0].NextStream)
+	suite.Equal(int64(1), rec.Streams[0].LastId)
 }
 
 func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() {
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 622c671656c60c596a25faea34c2e25a6d5558f0..9dea46940fe52b975f7047cac6cf8615da17d253 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -151,7 +151,7 @@ std::string StreamInfo::Json() const {
     return ("{\"lastId\":" + std::to_string(last_id) + ","  +
         "\"name\":\"" + name + "\",\"timestampCreated\":" + std::to_string(nanoseconds_from_epoch)
         + std::string(",") + "\"timestampLast\":" + std::to_string(nanoseconds_from_epoch_le)
-        + ",\"finished\":" + std::to_string(finished)+ ",\"nextStream\":\"" + next_stream)
+        + ",\"finished\":" + (finished?"true":"false")+ ",\"nextStream\":\"" + next_stream)
         + "\"}";
 }
 
@@ -159,9 +159,8 @@ bool StreamInfo::SetFromJson(const std::string &json_string) {
     auto old = *this;
     JsonStringParser parser(json_string);
     uint64_t id;
-    uint64_t finished_i;
     if (parser.GetUInt64("lastId", &last_id) ||
-        parser.GetUInt64("finished", &finished_i) ||
+        parser.GetBool("finished", &finished) ||
         parser.GetString("nextStream", &next_stream) ||
         !TimeFromJson(parser, "timestampLast", &timestamp_lastentry) ||
         parser.GetString("name", &name) ||
@@ -169,7 +168,6 @@ bool StreamInfo::SetFromJson(const std::string &json_string) {
         *this = old;
         return false;
     }
-    finished=bool(finished_i);
     return true;
 }
 
diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp
index 4a46f638012bfdb5e6c304835765ca3b667cfc71..e6507bd21cf36d5c8cde8258a7070519ab3de026 100644
--- a/common/cpp/unittests/data_structs/test_data_structs.cpp
+++ b/common/cpp/unittests/data_structs/test_data_structs.cpp
@@ -185,7 +185,7 @@ TEST(StreamInfo, ConvertFromJsonErr) {
 TEST(StreamInfo, ConvertToJson) {
     auto sinfo = PrepareStreamInfo();
 
-    std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000,"finished":1,"nextStream":"next"})";
+    std::string expected_json = R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":2000000,"finished":true,"nextStream":"next"})";
     auto json = sinfo.Json();
 
     ASSERT_THAT(json,Eq(expected_json));
diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go
index 9b7dc20936f9c4da8d56825b2cafcc2ea317dcf9..e809d31705b3505f704f12e8ca6eb31878e16805 100644
--- a/common/go/src/asapo_common/utils/helpers.go
+++ b/common/go/src/asapo_common/utils/helpers.go
@@ -25,6 +25,16 @@ func MapToJson(res interface{}) ([]byte, error) {
 	}
 }
 
+func GetInt64FromMap(s map[string]interface{}, name string) (int64, bool) {
+	val, ok := InterfaceToInt64(s[name])
+	if ok {
+		return val, true
+	} else {
+		return -1, false
+	}
+}
+
+
 func InterfaceToInt64(val interface{}) (int64, bool) {
 	val64, ok := val.(int64)
 	var valf64 float64
diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp
index dc8c9487e38b389a065c99ab35053c14a2367019..42b3688a1a558e66cdba73dafa0e63bfdd4f8c3b 100644
--- a/consumer/api/cpp/src/consumer_impl.cpp
+++ b/consumer/api/cpp/src/consumer_impl.cpp
@@ -529,7 +529,8 @@ Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value, std:
 }
 
 uint64_t ConsumerImpl::GetCurrentSize(std::string stream, Error* err) {
-    return GetCurrentCount(stream,false,false,err);
+    auto ri = GetSizeRequestForSingleMessagesStream(stream);
+    return GetCurrentCount(stream,ri,err);
 }
 
 Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, MessageData* data, std::string stream) {
@@ -843,21 +844,25 @@ void ConsumerImpl::InterruptCurrentOperation() {
 }
 
 uint64_t ConsumerImpl::GetCurrentDatasetCount(std::string stream, bool include_incomplete, Error* err) {
-    return GetCurrentCount(stream,true,include_incomplete,err);
+    RequestInfo ri = GetSizeRequestForDatasetStream(stream, include_incomplete);
+    return GetCurrentCount(stream,ri,err);
 }
 
-uint64_t ConsumerImpl::GetCurrentCount(std::string stream, bool datasets, bool include_incomplete, Error* err) {
-    RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
-        +"/" + std::move(stream) + "/size";
-    if (datasets) {
-        ri.extra_params = std::string("&incomplete=")+(include_incomplete?"true":"false");
-    }
+RequestInfo ConsumerImpl::GetSizeRequestForDatasetStream(std::string &stream, bool include_incomplete) const {
+    RequestInfo ri = GetSizeRequestForSingleMessagesStream(stream);
+    ri.extra_params = std::string("&incomplete=")+(include_incomplete?"true":"false");
+    return ri;
+}
+
+uint64_t ConsumerImpl::GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err) {
     auto responce = BrokerRequestWithTimeout(ri, err);
     if (*err) {
         return 0;
     }
+    return ParseGetCurrentCountResponce(err, responce);
+}
 
+uint64_t ConsumerImpl::ParseGetCurrentCountResponce(Error* err, const std::string &responce) const {
     JsonStringParser parser(responce);
     uint64_t size;
     if ((*err = parser.GetUInt64("size", &size)) != nullptr) {
@@ -866,4 +871,11 @@ uint64_t ConsumerImpl::GetCurrentCount(std::string stream, bool datasets, bool i
     return size;
 }
 
+RequestInfo ConsumerImpl::GetSizeRequestForSingleMessagesStream(std::string &stream) const {
+    RequestInfo ri;
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
+        +"/" + std::move(stream) + "/size";
+    return ri;
+}
+
 }
diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h
index e85028a9de231338b97dd4fb2f8e2b837840eb7a..9628c810760c03a364677519473756bed77f3e8f 100644
--- a/consumer/api/cpp/src/consumer_impl.h
+++ b/consumer/api/cpp/src/consumer_impl.h
@@ -143,7 +143,7 @@ class ConsumerImpl final : public asapo::Consumer {
     std::string OpToUriCmd(GetMessageServerOperation op);
     Error UpdateFolderTokenIfNeeded(bool ignore_existing);
 
-    uint64_t GetCurrentCount(std::string stream, bool datasets, bool include_incomplete, Error* err);
+    uint64_t GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err);
     RequestInfo GetStreamListRequest(const std::string &from, const StreamFilter &filter) const;
 
     std::string endpoint_;
@@ -164,6 +164,9 @@ class ConsumerImpl final : public asapo::Consumer {
     uint64_t resend_attempts_;
     std::atomic<bool> interrupt_flag_{ false};
 
+  RequestInfo GetSizeRequestForSingleMessagesStream(std::string &stream) const;
+  RequestInfo GetSizeRequestForDatasetStream(std::string &stream, bool include_incomplete) const;
+  uint64_t ParseGetCurrentCountResponce(Error* err, const std::string &responce) const;
 };
 
 }
diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp
index 36d083511342f0e6c49a84afa393bda36f717d6b..813a2d65f4144c5c2dfdcf6e7cf04284f35bc916 100644
--- a/consumer/api/cpp/unittests/test_consumer_impl.cpp
+++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp
@@ -1017,8 +1017,8 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) {
 TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) {
     MockGetBrokerUri();
     std::string return_streams =
-        std::string(R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""},)")+
-        R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"}]})";
+        std::string(R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""},)")+
+        R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"}]})";
     EXPECT_CALL(mock_http_client,
                 Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/streams"
                           + "?token=" + expected_token + "&from=stream_from&filter=all", _,
@@ -1032,8 +1032,8 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) {
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(streams.size(), Eq(2));
     ASSERT_THAT(streams.size(), 2);
-    ASSERT_THAT(streams[0].Json(), R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":0,"nextStream":""})");
-    ASSERT_THAT(streams[1].Json(), R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":1,"nextStream":"next"})");
+    ASSERT_THAT(streams[0].Json(), R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""})");
+    ASSERT_THAT(streams[1].Json(), R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"})");
 }
 
 TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) {
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index 913883a351bb8df6708b9dfa723c43acbd5517fd..346941f2885d3a88637c021fae2a44f402f2bab0 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -104,7 +104,7 @@ cdef throw_exception(Error& err, res = None):
 
 cdef class PyConsumer:
     cdef unique_ptr[Consumer] c_consumer
-    cdef StreamFilter _filter_to_cfilter(self,filter):
+    cdef StreamFilter _filter_to_cfilter(self,filter) except + :
         if filter == "all":
             return StreamFilter_kAllStreams
         elif filter == "finished":
@@ -228,7 +228,7 @@ cdef class PyConsumer:
         if err:
             throw_exception(err)
         return _str(group_id)
-    def get_stream_list(self, filter="all", from_stream = ""):
+    def get_stream_list(self,from_stream = "",filter="all"):
         cdef Error err
         cdef vector[StreamInfo] streams
         cdef string b_from_stream = _bytes(from_stream)
diff --git a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
index 2e6762cb541f123ce9b529e28b23bbf648792934..9eb2310cacfa5da4097b788d42f609c19b944e25 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
@@ -63,7 +63,7 @@ class DbMetaLastStreamTests : public Test {
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
     std::string expected_data_source = "source";
-    std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000})";
+    std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000,"finished":false,"nextStream":""})";
     asapo::StreamInfo expected_stream_info;
     void SetUp() override {
         GenericRequestHeader request_header;
diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
index 1d1d96d3d2cf47a84928f603fd5660af329b4804..a2828c31eafab387f1f373bc8ffa35e531a75cfa 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
@@ -64,7 +64,7 @@ class DbMetaStreamInfoTests : public Test {
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
     std::string expected_data_source = "source";
-    std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000})";
+    std::string info_str = R"({"lastId":10,"name":"stream","timestampCreated":1000000,"timestampLast":2000000,"finished":false,"nextStream":""})";
     asapo::StreamInfo expected_stream_info;
     void SetUp() override {
         GenericRequestHeader request_header;
diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh
index 9f599d42b2d01c476ecd9d33aae114cf6438e092..5d4d146969cb33f931535e976356e66dd24a718c 100644
--- a/tests/automatic/consumer/consumer_api/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api/check_linux.sh
@@ -42,7 +42,7 @@ for i in `seq 1 5`;
 do
 	echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
-echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name}
+echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name}
 
 echo hello1 > 1
 
diff --git a/tests/automatic/consumer/consumer_api/check_windows.bat b/tests/automatic/consumer/consumer_api/check_windows.bat
index 19e163518f92fa6ea3bfc202608cd8b4e33c7174..908980541836aede7a5e8685efdd137c87896052 100644
--- a/tests/automatic/consumer/consumer_api/check_windows.bat
+++ b/tests/automatic/consumer/consumer_api/check_windows.bat
@@ -12,7 +12,13 @@ for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":6,"nam
 
 for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
+echo db.data_stream1.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}}) | %mongo_exe% %database_name%  || goto :error
+
 for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+
+echo db.data_stream2.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}}) | %mongo_exe% %database_name%  || goto :error
+
+
 echo hello1 > 1
 
 
diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp
index e72b7cb189050e158a52bdd5e581734d01a0bcbc..91d802d18e75632333eaac2a0fa657047faa8eee 100644
--- a/tests/automatic/consumer/consumer_api/consumer_api.cpp
+++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp
@@ -143,17 +143,19 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str
     auto streams = consumer->GetStreamList("",asapo::StreamFilter::kAllStreams,&err);
     M_AssertTrue(err == nullptr, "GetStreamList no error");
     M_AssertTrue(streams.size() == 3, "streams.size");
-    M_AssertTrue(streams[0].name == "default", "streams0.name1");
-    M_AssertTrue(streams[1].name == "stream1", "streams1.name2");
-    M_AssertTrue(streams[2].name == "stream2", "streams2.name3");
+    M_AssertTrue(streams[0].name == "default", "streams0.name");
+    M_AssertTrue(streams[1].name == "stream1", "streams1.name");
+    M_AssertTrue(streams[2].name == "stream2", "streams2.name");
     M_AssertTrue(streams[1].finished == true, "stream1 finished");
-    M_AssertTrue(streams[1].next_stream == "next_stream", "stream1 next stream");
+    M_AssertTrue(streams[1].next_stream == "ns", "stream1 next stream");
     M_AssertTrue(streams[2].finished == true, "stream2 finished");
     M_AssertTrue(streams[2].next_stream == "", "stream2 no next stream");
     M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_created) == 0, "streams0.timestamp");
-    M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) > 0, "streams0.timestamp lastentry  set");
+    M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[0].timestamp_lastentry) == 0, "streams0.timestamp lastentry");
     M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[1].timestamp_created) == 1000, "streams1.timestamp");
+    M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[1].timestamp_lastentry) == 1000, "streams1.timestamp lastentry");
     M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_created) == 2000, "streams2.timestamp");
+    M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_lastentry) == 2000, "streams2.timestamp lastentry");
 // acknowledges
 
     auto id = consumer->GetLastAcknowledgedMessage(group_id,"default", &err);
diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh
index 22d179ce11e85172925902031dbfafe5c3147e19..aed808f5de44e225a8bb83dfc9e926098d9ec6c8 100644
--- a/tests/automatic/consumer/consumer_api_python/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh
@@ -50,6 +50,10 @@ do
 	echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
+echo 'db.data_stream1.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name}
+echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name}
+
+
 sleep 1
 
 export PYTHONPATH=$1:${PYTHONPATH}
diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat
index adcf8ce57d735d5b6da75b464aabe61bc82d0cc1..a6e530432635fb23c88e4dad1b179f3f8283ddde 100644
--- a/tests/automatic/consumer/consumer_api_python/check_windows.bat
+++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat
@@ -20,6 +20,8 @@ for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name
 
 for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
+echo db.data_stream1.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_stream2.insert({"_id":6,"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}}) | %mongo_exe% %database_name%  || goto :error
 
 mkdir %source_path%
 
diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py
index 4e091572648f136c7f83c38e86c7df8af83c23ba..c253db6e4768426b6b9d83e69453a939f015ff0c 100644
--- a/tests/automatic/consumer/consumer_api_python/consumer_api.py
+++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py
@@ -129,15 +129,22 @@ def check_single(consumer, group_id):
     _, meta = consumer.get_next(group_id, meta_only=True, stream = "stream2")
     assert_metaname(meta, "21", "get next stream2")
 
-    streams = consumer.get_stream_list("")
+    streams = consumer.get_stream_list("","all")
     assert_eq(len(streams), 4, "number of streams")
     print(streams)
     assert_eq(streams[0]["name"], "default", "streams_name1")
+    assert_eq(streams[0]["finished"], False, "streams_finished1")
     assert_eq(streams[1]["name"], "streamfts", "streams_name2")
     assert_eq(streams[2]["name"], "stream1", "streams_name2")
     assert_eq(streams[3]["name"], "stream2", "streams_name3")
     assert_eq(streams[1]["timestampCreated"], 1000, "streams_timestamp2")
-
+    assert_eq(streams[2]["timestampLast"], 2000, "streams_timestamplast2")
+    assert_eq(streams[2]["finished"], True, "streams_finished2")
+    assert_eq(streams[2]["nextStream"], "ns", "next stream 2")
+    assert_eq(streams[2]["lastId"], 5, "last id stream 2")
+    assert_eq(streams[3]["finished"], True, "streams_finished3")
+    assert_eq(streams[3]["nextStream"], "", "next stream 3")
+    assert_eq(streams[3]["lastId"], 5, "last id stream 3")
     # acks
     try:
         id = consumer.get_last_acknowledged_message(group_id)