From 312957789801a08da0351a910a1f4e14982185d8 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 21 Dec 2020 16:12:10 +0100
Subject: [PATCH] =?UTF-8?q?start=C2=A0renaming=20stream=20to=20data=20sour?=
 =?UTF-8?q?ce?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 CHANGELOG.md                                  |  4 +
 .../src/asapo_authorizer/server/authorize.go  | 10 +--
 .../asapo_authorizer/server/authorize_test.go | 74 +++++++++----------
 .../src/asapo_authorizer/server/server.go     |  2 +-
 broker/src/asapo_broker/server/listroutes.go  | 22 +++---
 .../asapo_broker/server/process_request.go    |  8 +-
 .../server/process_request_test.go            |  2 +-
 .../cpp/include/asapo/common/data_structs.h   |  8 +-
 consumer/api/cpp/src/server_data_broker.cpp   | 26 +++----
 .../api/cpp/unittests/test_server_broker.cpp  | 66 ++++++++---------
 consumer/api/python/asapo_consumer.pxd        |  2 +-
 consumer/api/python/asapo_consumer.pyx.in     |  8 +-
 producer/api/cpp/src/producer_impl.cpp        |  4 +-
 producer/api/python/asapo_producer.pxd        |  2 +-
 producer/api/python/asapo_producer.pyx.in     | 12 +--
 receiver/src/request.cpp                      |  8 +-
 receiver/src/request.h                        |  6 +-
 .../request_handler_authorize.cpp             |  6 +-
 .../request_handler_authorize.h               |  2 +-
 .../request_handler/request_handler_db.cpp    |  4 +-
 receiver/unittests/receiver_mocking.h         |  4 +-
 .../test_request_handler_authorizer.cpp       | 14 ++--
 .../test_request_handler_db.cpp               | 10 +--
 .../test_request_handler_db_check_request.cpp | 10 +--
 .../test_request_handler_db_last_stream.cpp   |  6 +-
 .../test_request_handler_db_meta_writer.cpp   |  8 +-
 .../test_request_handler_db_stream_info.cpp   |  6 +-
 .../test_request_handler_db_writer.cpp        | 22 +++---
 receiver/unittests/test_request.cpp           |  4 +-
 29 files changed, 182 insertions(+), 178 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d5eafefd..e6b3d8800 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,10 @@ BREAKING CHANGES
 * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before
 * Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions
 * Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) 
+    ####  renaming - Producer API
+* stream -> data_source, substream -> stream
+    ####  renaming - Consumer API
+*
 
 BUG FIXES
 * fix memory leak bug in Python consumer library (lead to problems when creating many consumer instances)
diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go
index 394b652a7..edcf0b703 100644
--- a/authorizer/src/asapo_authorizer/server/authorize.go
+++ b/authorizer/src/asapo_authorizer/server/authorize.go
@@ -13,7 +13,7 @@ import (
 type SourceCredentials struct {
 	BeamtimeId string
 	Beamline   string
-	Stream     string
+	DataSource     string
 	Token      string
 	Type 	   string
 }
@@ -30,8 +30,8 @@ func getSourceCredentials(request authorizationRequest) (SourceCredentials, erro
 		return SourceCredentials{}, errors.New("cannot get source credentials from " + request.SourceCredentials)
 	}
 	creds := SourceCredentials{vals[1], vals[2], vals[3], vals[4],vals[0]}
-	if creds.Stream == "" {
-		creds.Stream = "detector"
+	if creds.DataSource == "" {
+		creds.DataSource = "detector"
 	}
 
 	if creds.Beamline == "" {
@@ -124,7 +124,7 @@ func findBeamtimeMetaFromBeamline(beamline string) (beamtimeMeta, error) {
 func alwaysAllowed(creds SourceCredentials) (beamtimeMeta, bool) {
 	for _, pair := range settings.AlwaysAllowedBeamtimes {
 		if pair.BeamtimeId == creds.BeamtimeId {
-			pair.Stream = creds.Stream
+			pair.DataSource = creds.DataSource
 			pair.Type = creds.Type
 			return pair, true
 		}
@@ -198,7 +198,7 @@ func findMeta(creds SourceCredentials) (beamtimeMeta, error) {
 		return beamtimeMeta{}, err
 	}
 
-	meta.Stream = creds.Stream
+	meta.DataSource = creds.DataSource
 	meta.Type = creds.Type
 
 	return meta, nil
diff --git a/authorizer/src/asapo_authorizer/server/authorize_test.go b/authorizer/src/asapo_authorizer/server/authorize_test.go
index d3b8e3629..4085b1b5b 100644
--- a/authorizer/src/asapo_authorizer/server/authorize_test.go
+++ b/authorizer/src/asapo_authorizer/server/authorize_test.go
@@ -58,16 +58,16 @@ var credTests = [] struct {
 	ok bool
 	message string
 } {
-	{"processed%asapo_test%auto%%", SourceCredentials{"asapo_test","auto","detector","","processed"},true,"auto beamline, stream and no token"},
-	{"processed%asapo_test%auto%%token", SourceCredentials{"asapo_test","auto","detector","token","processed"},true,"auto beamline, stream"},
-	{"processed%asapo_test%auto%stream%", SourceCredentials{"asapo_test","auto","stream","","processed"},true,"auto beamline, no token"},
-	{"processed%asapo_test%auto%stream%token", SourceCredentials{"asapo_test","auto","stream","token","processed"},true,"auto beamline,stream, token"},
-	{"processed%asapo_test%beamline%stream%token", SourceCredentials{"asapo_test","beamline","stream","token","processed"},true,"all set"},
-	{"processed%auto%beamline%stream%token", SourceCredentials{"auto","beamline","stream","token","processed"},true,"auto beamtime"},
-	{"raw%auto%auto%stream%token", SourceCredentials{},false,"auto beamtime and beamline"},
-	{"raw%%beamline%stream%token", SourceCredentials{"auto","beamline","stream","token","raw"},true,"empty beamtime"},
-	{"raw%asapo_test%%stream%token", SourceCredentials{"asapo_test","auto","stream","token","raw"},true,"empty bealine"},
-	{"raw%%%stream%token", SourceCredentials{},false,"both empty"},
+	{"processed%asapo_test%auto%%", SourceCredentials{"asapo_test","auto","detector","","processed"},true,"auto beamline, source and no token"},
+	{"processed%asapo_test%auto%%token", SourceCredentials{"asapo_test","auto","detector","token","processed"},true,"auto beamline, source"},
+	{"processed%asapo_test%auto%source%", SourceCredentials{"asapo_test","auto","source","","processed"},true,"auto beamline, no token"},
+	{"processed%asapo_test%auto%source%token", SourceCredentials{"asapo_test","auto","source","token","processed"},true,"auto beamline,source, token"},
+	{"processed%asapo_test%beamline%source%token", SourceCredentials{"asapo_test","beamline","source","token","processed"},true,"all set"},
+	{"processed%auto%beamline%source%token", SourceCredentials{"auto","beamline","source","token","processed"},true,"auto beamtime"},
+	{"raw%auto%auto%source%token", SourceCredentials{},false,"auto beamtime and beamline"},
+	{"raw%%beamline%source%token", SourceCredentials{"auto","beamline","source","token","raw"},true,"empty beamtime"},
+	{"raw%asapo_test%%source%token", SourceCredentials{"asapo_test","auto","source","token","raw"},true,"empty bealine"},
+	{"raw%%%source%token", SourceCredentials{},false,"both empty"},
 }
 
 func TestSplitCreds(t *testing.T) {
@@ -150,46 +150,46 @@ var authTests = [] struct {
 	source_type string
 	beamtime_id string
 	beamline string
-	stream string
+	dataSource string
 	token string
 	originHost string
 	status int
 	message string
 	answer string
 }{
-	{"processed","test","auto","stream", prepareToken("test"),"127.0.0.2",http.StatusOK,"user stream with correct token",
-		`{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
-	{"processed","test_online","auto","stream", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"with online path, processed type",
-		`{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"","source-type":"processed"}`},
-	{"processed","test1","auto","stream", prepareToken("test1"),"127.0.0.1",http.StatusUnauthorized,"correct token, beamtime not found",
+	{"processed","test","auto","dataSource", prepareToken("test"),"127.0.0.2",http.StatusOK,"user source with correct token",
+		`{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
+	{"processed","test_online","auto","dataSource", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"with online path, processed type",
+		`{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"","source-type":"processed"}`},
+	{"processed","test1","auto","dataSource", prepareToken("test1"),"127.0.0.1",http.StatusUnauthorized,"correct token, beamtime not found",
 		""},
-	{"processed","test","auto","stream", prepareToken("wrong"),"127.0.0.1",http.StatusUnauthorized,"user stream with wrong token",
+	{"processed","test","auto","dataSource", prepareToken("wrong"),"127.0.0.1",http.StatusUnauthorized,"user source with wrong token",
 		""},
-	{"processed","test","bl1","stream", prepareToken("test"),"127.0.0.1",http.StatusOK,"correct beamline given",
-		`{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
-		{"processed","test","bl2","stream", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"incorrect beamline given",
+	{"processed","test","bl1","dataSource", prepareToken("test"),"127.0.0.1",http.StatusOK,"correct beamline given",
+		`{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
+		{"processed","test","bl2","dataSource", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"incorrect beamline given",
 		""},
-	{"processed","auto","p07", "stream",prepareToken("bl_p07"),"127.0.0.1",http.StatusOK,"beamtime found",
-		`{"beamtimeId":"11111111","beamline":"p07","stream":"stream","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"","source-type":"processed"}`},
-	{"processed","auto","p07", "stream",prepareToken("bl_p06"),"127.0.0.1",http.StatusUnauthorized,"wrong token",
+	{"processed","auto","p07", "dataSource",prepareToken("bl_p07"),"127.0.0.1",http.StatusOK,"beamtime found",
+		`{"beamtimeId":"11111111","beamline":"p07","dataSource":"dataSource","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"","source-type":"processed"}`},
+	{"processed","auto","p07", "dataSource",prepareToken("bl_p06"),"127.0.0.1",http.StatusUnauthorized,"wrong token",
 		""},
-	{"processed","auto","p08", "stream",prepareToken("bl_p08"),"127.0.0.1",http.StatusUnauthorized,"beamtime not found",
+	{"processed","auto","p08", "dataSource",prepareToken("bl_p08"),"127.0.0.1",http.StatusUnauthorized,"beamtime not found",
 		""},
-	{"raw","test_online","auto","stream", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"raw type",
-		`{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`},
-	{"raw","test_online","auto","stream", "","127.0.0.1",http.StatusOK,"raw type",
-		`{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`},
- 	{"raw","auto","p07","stream", "","127.0.0.1",http.StatusOK,"raw type, auto beamtime",
-		`{"beamtimeId":"11111111","beamline":"p07","stream":"stream","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"./p07/current","source-type":"raw"}`},
+	{"raw","test_online","auto","dataSource", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"raw type",
+		`{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`},
+	{"raw","test_online","auto","dataSource", "","127.0.0.1",http.StatusOK,"raw type",
+		`{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`},
+ 	{"raw","auto","p07","dataSource", "","127.0.0.1",http.StatusOK,"raw type, auto beamtime",
+		`{"beamtimeId":"11111111","beamline":"p07","dataSource":"dataSource","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"./p07/current","source-type":"raw"}`},
 	{"raw","auto","p07","noldap", "","127.0.0.1",http.StatusNotFound,"no conection to ldap",
 		""},
-	{"raw","test_online","auto","stream", "","127.0.0.2",http.StatusUnauthorized,"raw type, wrong origin host",
+	{"raw","test_online","auto","dataSource", "","127.0.0.2",http.StatusUnauthorized,"raw type, wrong origin host",
 		""},
-	{"raw","test","auto","stream", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"raw when not online",
+	{"raw","test","auto","dataSource", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"raw when not online",
 		""},
-	{"processed","test","auto","stream", "","127.0.0.1:1001",http.StatusOK,"processed without token",
-		`{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
-	{"processed","test","auto","stream", "","127.0.0.2",http.StatusUnauthorized,"processed without token, wrong host",
+	{"processed","test","auto","dataSource", "","127.0.0.1:1001",http.StatusOK,"processed without token",
+		`{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`},
+	{"processed","test","auto","dataSource", "","127.0.0.2",http.StatusUnauthorized,"processed without token, wrong host",
 		""},
 }
 
@@ -222,7 +222,7 @@ func TestAuthorize(t *testing.T) {
 				bl = "bl1"
 			}
 			expected_filter:="a3"+bl+"-hosts"
-			if test.stream == "noldap" {
+			if test.dataSource == "noldap" {
 				err := &common.ServerError{utils.StatusServiceUnavailable,""}
 				mockClient.On("GetAllowedIpsForBeamline", expected_uri, expected_base,expected_filter).Return([]string{}, err)
 			} else {
@@ -230,7 +230,7 @@ func TestAuthorize(t *testing.T) {
 			}
 		}
 
-		request :=  makeRequest(authorizationRequest{test.source_type+"%"+test.beamtime_id+"%"+test.beamline+"%"+test.stream+"%"+test.token,test.originHost})
+		request :=  makeRequest(authorizationRequest{test.source_type+"%"+test.beamtime_id+"%"+test.beamline+"%"+test.dataSource+"%"+test.token,test.originHost})
 		w := doPostRequest("/authorize",request)
 
 		body, _ := ioutil.ReadAll(w.Body)
diff --git a/authorizer/src/asapo_authorizer/server/server.go b/authorizer/src/asapo_authorizer/server/server.go
index e5f751873..7dc7aca8c 100644
--- a/authorizer/src/asapo_authorizer/server/server.go
+++ b/authorizer/src/asapo_authorizer/server/server.go
@@ -8,7 +8,7 @@ import (
 type  beamtimeMeta struct {
 	BeamtimeId string  `json:"beamtimeId"`
 	Beamline string     `json:"beamline"`
-	Stream string       `json:"stream"`
+	DataSource string       `json:"dataSource"`
 	OfflinePath string `json:"core-path"`
 	OnlinePath string `json:"beamline-path"`
 	Type string `json:"source-type"`
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index f971d7199..a59d65bca 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -8,49 +8,49 @@ var listRoutes = utils.Routes{
 	utils.Route{
 		"GetNext",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/{groupid}/next",
+		"/database/{dbname}/{datasource}/{substream}/{groupid}/next",
 		routeGetNext,
 	},
 	utils.Route{
 		"GetSize",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/size",
+		"/database/{dbname}/{datasource}/{substream}/size",
 		routeGetSize,
 	},
 	utils.Route{
 		"GetSubstreams",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/substreams",
+		"/database/{dbname}/{datasource}/{substream}/substreams",
 		routeGetSubstreams,
 	},
 	utils.Route{
 		"GetLast",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/0/last",
+		"/database/{dbname}/{datasource}/{substream}/0/last",
 		routeGetLast,
 	},
 	utils.Route{
 		"GetLastAck",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/{groupid}/lastack",
+		"/database/{dbname}/{datasource}/{substream}/{groupid}/lastack",
 		routeGetLastAck,
 	},
 	utils.Route{
 		"GetNacks",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/{groupid}/nacks",
+		"/database/{dbname}/{datasource}/{substream}/{groupid}/nacks",
 		routeGetNacks,
 	},
 	utils.Route{
 		"GetID",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/0/{id}",
+		"/database/{dbname}/{datasource}/{substream}/0/{id}",
 		routeGetByID,
 	},
 	utils.Route{
 		"GetMeta",
 		"Get",
-		"/database/{dbname}/{stream}/{substream}/0/meta/{id}",
+		"/database/{dbname}/{datasource}/{substream}/0/meta/{id}",
 		routeGetMeta,
 	},
 	utils.Route{
@@ -62,19 +62,19 @@ var listRoutes = utils.Routes{
 	utils.Route{
 		"QueryImages",
 		"Post",
-		"/database/{dbname}/{stream}/{substream}/0/queryimages",
+		"/database/{dbname}/{datasource}/{substream}/0/queryimages",
 		routeQueryImages,
 	},
 	utils.Route{
 		"ResetConter",
 		"Post",
-		"/database/{dbname}/{stream}/{substream}/{groupid}/resetcounter",
+		"/database/{dbname}/{datasource}/{substream}/{groupid}/resetcounter",
 		routeResetCounter,
 	},
 	utils.Route{
 		"ImageOp",
 		"Post",
-		"/database/{dbname}/{stream}/{substream}/{groupid}/{id}",
+		"/database/{dbname}/{datasource}/{substream}/{groupid}/{id}",
 		routeImageOp,
 	},
 	utils.Route{
diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go
index 4adf102b6..ab1c4d5f2 100644
--- a/broker/src/asapo_broker/server/process_request.go
+++ b/broker/src/asapo_broker/server/process_request.go
@@ -13,7 +13,7 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string
 	vars := mux.Vars(r)
 	db_name, ok1 := vars["dbname"]
 
-	stream, ok3 := vars["stream"]
+	datasource, ok3 := vars["datasource"]
 	substream, ok4 := vars["substream"]
 
 	ok2 := true
@@ -21,7 +21,7 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string
 	if needGroupID {
 		group_id, ok2 = vars["groupid"]
 	}
-	return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4
+	return db_name, datasource, substream, group_id, ok1 && ok2 && ok3 && ok4
 }
 
 func IsLetterOrNumbers(s string) bool {
@@ -52,7 +52,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n
 func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) {
 	r.Header.Set("Content-type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	db_name, stream, substream, group_id, ok := extractRequestParameters(r, needGroupID)
+	db_name, datasource, substream, group_id, ok := extractRequestParameters(r, needGroupID)
 	if !ok {
 		w.WriteHeader(http.StatusBadRequest)
 		return
@@ -68,7 +68,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
 	}
 
 	request := database.Request{}
-	request.DbName = db_name+"_"+stream
+	request.DbName = db_name+"_"+datasource
 	request.Op = op
 	request.ExtraParam = extra_param
 	request.DbCollectionName = substream
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index 5aa7b28fc..24605fdc0 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -20,7 +20,7 @@ var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtime
 
 const expectedGroupID = "bid2a5auidddp1vl71d0"
 const wrongGroupID = "_bid2a5auidddp1vl71"
-const expectedStream = "stream"
+const expectedStream = "datasource"
 const expectedSubstream = "substream"
 
 func prepareTestAuth() {
diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h
index c17921eb7..738b3d8b1 100644
--- a/common/cpp/include/asapo/common/data_structs.h
+++ b/common/cpp/include/asapo/common/data_structs.h
@@ -80,10 +80,10 @@ Error GetSourceTypeFromString(std::string stype,SourceType *type);
 std::string GetStringFromSourceType(SourceType type);
 
 struct SourceCredentials {
-    SourceCredentials(SourceType type, std::string beamtime, std::string beamline, std::string stream, std::string token):
+    SourceCredentials(SourceType type, std::string beamtime, std::string beamline, std::string data_source, std::string token):
         beamtime_id{std::move(beamtime)},
         beamline{std::move(beamline)},
-        stream{std::move(stream)},
+        data_source{std::move(data_source)},
         user_token{std::move(token)},
         type{type}{};
     SourceCredentials() {};
@@ -92,11 +92,11 @@ struct SourceCredentials {
     static const std::string kDefaultBeamtimeId;
     std::string beamtime_id;
     std::string beamline;
-    std::string stream;
+    std::string data_source;
     std::string user_token;
     SourceType type = SourceType::kProcessed;
     std::string GetString() {
-        return (type==SourceType::kRaw?std::string("raw"):std::string("processed")) + "%"+ beamtime_id + "%" + beamline + "%" + stream + "%" + user_token;
+        return (type==SourceType::kRaw?std::string("raw"):std::string("processed")) + "%"+ beamtime_id + "%" + beamline + "%" + data_source + "%" + user_token;
     };
 };
 
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index 590ffcc55..b5189be37 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -114,8 +114,8 @@ ServerDataBroker::ServerDataBroker(std::string server_uri,
 
     // net_client__ will be lazy initialized
 
-    if (source_credentials_.stream.empty()) {
-        source_credentials_.stream = SourceCredentials::kDefaultStream;
+    if (source_credentials_.data_source.empty()) {
+        source_credentials_.data_source = SourceCredentials::kDefaultStream;
     }
 
 }
@@ -246,7 +246,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
     interrupt_flag_= false;
     std::string request_suffix = OpToUriCmd(op);
     std::string request_group = OpToUriCmd(op);
-    std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream
+    std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source
         + "/" + std::move(substream);
     uint64_t elapsed_ms = 0;
     Error no_data_error;
@@ -529,7 +529,7 @@ Error ServerDataBroker::ResetLastReadMarker(std::string group_id, std::string su
 
 Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/"
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/"
         + std::move(substream) + "/" + std::move(group_id) + "/resetcounter";
     ri.extra_params = "&value=" + std::to_string(value);
     ri.post = true;
@@ -541,7 +541,7 @@ Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id,
 
 uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) + "/size";
     auto responce = BrokerRequestWithTimeout(ri, err);
     if (*err) {
@@ -575,7 +575,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon
                                                 std::string substream,
                                                 bool dataset, uint64_t min_size) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) +
         "/" + std::move(
         group_id) + "/" + std::to_string(id);
@@ -591,7 +591,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon
 
 std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/default/0/meta/0";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/default/0/meta/0";
 
     return BrokerRequestWithTimeout(ri, err);
 }
@@ -608,7 +608,7 @@ DataSet DecodeDatasetFromResponse(std::string response, Error* err) {
 
 FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream, Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         "/" + std::move(substream) + "/0/queryimages";
     ri.post = true;
     ri.body = std::move(query);
@@ -692,7 +692,7 @@ StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) {
 StreamInfos ServerDataBroker::GetSubstreamList(std::string from, Error* err) {
 
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/substreams";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/0/substreams";
     ri.post = false;
     if (!from.empty()) {
         ri.extra_params = "&from=" + from;
@@ -762,7 +762,7 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData*
 
 Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::string substream) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) +
         "/" + std::move(group_id) + "/" + std::to_string(id);
     ri.post = true;
@@ -779,7 +779,7 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id,
                                                    uint64_t to_id,
                                                    Error* error) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) +
         "/" + std::move(group_id) + "/nacks";
     ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id);
@@ -807,7 +807,7 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id,
 
 uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) +
         "/" + std::move(group_id) + "/lastack";
 
@@ -843,7 +843,7 @@ Error ServerDataBroker::NegativeAcknowledge(std::string group_id,
                                             uint64_t delay_sec,
                                             std::string substream) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source +
         +"/" + std::move(substream) +
         "/" + std::move(group_id) + "/" + std::to_string(id);
     ri.post = true;
diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp
index 54d3075ab..b1a3ce36d 100644
--- a/consumer/api/cpp/unittests/test_server_broker.cpp
+++ b/consumer/api/cpp/unittests/test_server_broker.cpp
@@ -72,7 +72,7 @@ class ServerDataBrokerTests : public Test {
   std::string expected_filename = "filename";
   std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
   std::string expected_group_id = "groupid";
-  std::string expected_stream = "stream";
+  std::string expected_data_source = "source";
   std::string expected_substream = "substream";
   std::string expected_metadata = "{\"meta\":1}";
   std::string expected_query_string = "bla";
@@ -93,14 +93,14 @@ class ServerDataBrokerTests : public Test {
                                expected_path,
                                true,
                                asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "",
-                                                        expected_stream, expected_token})
+                                                        expected_data_source, expected_token})
       };
       fts_data_broker = std::unique_ptr<ServerDataBroker>{
           new ServerDataBroker(expected_server_uri,
                                expected_path,
                                false,
                                asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "",
-                                                        expected_stream, expected_token})
+                                                        expected_data_source, expected_token})
       };
       data_broker->io__ = std::unique_ptr<IO>{&mock_io};
       data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient>{&mock_http_client};
@@ -215,7 +215,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) {
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/"
                                             + expected_group_id + "/next?token="
                                             + expected_token, _,
                                         _)).WillOnce(DoAll(
@@ -228,7 +228,7 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
                                             expected_substream + "/" + expected_group_id + "/next?token="
                                             + expected_token, _,
                                         _)).WillOnce(DoAll(
@@ -242,7 +242,7 @@ TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client,
-                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token="
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token="
                           + expected_token, _,
                       _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -411,7 +411,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
         Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) +
             ",\"id_max\":2,\"next_substream\":\"""\"}")));
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
         SetArgPointee<1>(HttpCode::NotFound),
@@ -595,7 +595,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) {
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client,
-                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" +
                     expected_group_id +
                     "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll(
         SetArgPointee<3>(HttpCode::OK),
@@ -610,7 +610,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client,
-                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" +
                     expected_group_id +
                     "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
         SetArgPointee<3>(HttpCode::OK),
@@ -624,7 +624,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/" +
         expected_group_id +
         "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
@@ -639,7 +639,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source +
         "/default/size?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -655,7 +655,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/size?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -671,7 +671,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeErrorOnWrongResponce) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source +
         "/default/size?token="
                                             + expected_token, _, _)).WillRepeatedly(DoAll(
         SetArgPointee<1>(HttpCode::Unauthorized),
@@ -687,7 +687,7 @@ TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source +
         "/default/size?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -705,7 +705,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(
                                                 expected_dataset_id) + "?token="
                                             + expected_token, _,
@@ -724,7 +724,7 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
@@ -740,7 +740,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
@@ -756,7 +756,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
@@ -772,7 +772,7 @@ TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source +
                                             "/default/0/meta/0?token="
                                             + expected_token, _,
                                         _)).WillOnce(DoAll(
@@ -878,7 +878,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
     auto responce_string = "[" + json1 + "," + json2 + "]";
 
     EXPECT_CALL(mock_http_client,
-                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0" +
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0" +
                     "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
         SetArgPointee<3>(HttpCode::OK),
         SetArgPointee<4>(nullptr),
@@ -899,7 +899,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) {
 
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/0" +
         "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
         SetArgPointee<3>(HttpCode::OK),
@@ -917,7 +917,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) {
 TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" +
                                             expected_group_id + "/next?token="
                                             + expected_token + "&dataset=true&minsize=0", _,
                                         _)).WillOnce(DoAll(
@@ -1046,7 +1046,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client,
-                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token="
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token="
                           + expected_token + "&dataset=true&minsize=2", _,
                       _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -1059,7 +1059,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
                                             expected_substream + "/0/last?token="
                                             + expected_token + "&dataset=true&minsize=1", _,
                                         _)).WillOnce(DoAll(
@@ -1073,7 +1073,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) {
 TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/"
                                             + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token + "&dataset=true" + "&minsize=0", _,
                                         _)).WillOnce(DoAll(
@@ -1089,7 +1089,7 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) {
     std::string return_substreams =
         R"({"substreams":[{"lastId":123,"name":"test","timestampCreated":1000000},{"name":"test1","timestampCreated":2000000}]})";
     EXPECT_CALL(mock_http_client,
-                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/substreams"
                           + "?token=" + expected_token + "&from=stream_from", _,
                       _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -1108,7 +1108,7 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUriWithoutFrom) {
     MockGetBrokerUri();
     EXPECT_CALL(mock_http_client,
-                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/substreams"
                           + "?token=" + expected_token, _,
                       _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -1250,7 +1250,7 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetTokenAgainIfTransferFailed) {
 TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/" +
         expected_group_id
                                              + "/" + std::to_string(expected_dataset_id) + "?token="
@@ -1268,7 +1268,7 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) {
     MockGetBrokerUri();
     auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}";
     EXPECT_CALL(mock_http_client,
-                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" +
                     expected_group_id
                            + "/" + std::to_string(expected_dataset_id) + "?token="
                            + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
@@ -1283,7 +1283,7 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) {
 
 void ServerDataBrokerTests::ExpectIdList(bool error) {
     MockGetBrokerUri();
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/" +
         expected_group_id + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -1301,7 +1301,7 @@ TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) {
 }
 
 void ServerDataBrokerTests::ExpectLastAckId(bool empty_response) {
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/" +
         expected_group_id + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
@@ -1339,7 +1339,7 @@ TEST_F(ServerDataBrokerTests, GetByIdErrorsForId0) {
 TEST_F(ServerDataBrokerTests, ResendNacks) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/"
                                             + expected_group_id + "/next?token="
                                             + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _,
                                         _)).WillOnce(DoAll(
@@ -1354,7 +1354,7 @@ TEST_F(ServerDataBrokerTests, ResendNacks) {
 TEST_F(ServerDataBrokerTests, NegativeAcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" +
         expected_substream + "/" +
         expected_group_id
                                              + "/" + std::to_string(expected_dataset_id) + "?token="
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index a1182ee75..9cb5824c0 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -43,7 +43,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
     FileInfos content
   struct  SourceCredentials:
     string beamtime_id
-    string stream
+    string data_source
     string user_token
   cppclass StreamInfo:
     string Json(bool add_last_id)
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index fc5ea16b1..69e32ee50 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -324,14 +324,14 @@ cdef class __PyDataBrokerFactory:
     def __cinit__(self):
         with nogil:
             self.c_factory = DataBrokerFactory()
-    def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout):
+    def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
         cdef string b_server_name = _bytes(server_name)
         cdef string b_source_path = _bytes(source_path)
         cdef bool b_has_filesystem = has_filesystem
         cdef SourceCredentials source
         source.beamtime_id = _bytes(beamtime_id)
         source.user_token = _bytes(token)
-        source.stream = _bytes(stream)
+        source.data_source = _bytes(data_source)
         cdef Error err
         broker = PyDataBroker()
         with nogil:
@@ -341,7 +341,7 @@ cdef class __PyDataBrokerFactory:
         broker.c_broker.get().SetTimeout(timeout)
         return broker
 
-def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms):
+def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
     """
       :param server_name: Server endpoint (hostname:port)
       :type server_name: string
@@ -353,7 +353,7 @@ def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stre
       :rtype: Tuple with broker object and error.
 	"""
     factory = __PyDataBrokerFactory()
-    return factory.create_server_broker(server_name,source_path,has_filesystem, beamtime_id,stream,token,timeout_ms)
+    return factory.create_server_broker(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms)
 
 
 __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@"
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 9399cd327..2fa855984 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -186,8 +186,8 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) {
         return ProducerErrorTemplates::kWrongInput.Generate("credentials already set");
     }
 
-    if (source_cred.stream.empty()) {
-        source_cred.stream = SourceCredentials::kDefaultStream;
+    if (source_cred.data_source.empty()) {
+        source_cred.data_source = SourceCredentials::kDefaultStream;
     }
 
     if (source_cred.beamline.empty()) {
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index f1b400c10..c267b701d 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -55,7 +55,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
   struct  SourceCredentials:
     string beamtime_id
     string beamline
-    string stream
+    string data_source
     string user_token
     SourceType type
 
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index 1040a6fb2..9e49f3187 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -320,7 +320,7 @@ cdef class PyProducer:
             if self.c_producer.get() is not NULL:
                 self.c_producer.get().StopThreads__()
     @staticmethod
-    def __create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,timeout_sec):
+    def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec):
         pyProd = PyProducer()
         cdef Error err
         cdef SourceType source_type
@@ -331,14 +331,14 @@ cdef class PyProducer:
         source.beamtime_id = beamtime_id
         source.beamline = beamline
         source.user_token = token
-        source.stream = stream
+        source.data_source = data_source
         source.type = source_type
         pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_sec,&err)
         if err:
             throw_exception(err)
         return pyProd
 
-def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,timeout_sec):
+def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec):
     """
          :param endpoint: server endpoint (url:port)
          :type endpoint: string
@@ -348,8 +348,8 @@ def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,tim
          :type beamtime_id: string
          :param beamline: beamline name, can be "auto" if beamtime_id is given
          :type beamline: string
-         :param stream: stream to producer data to
-         :type stream: string
+         :param data_source: name of the data source that produces data
+         :type data_source: string
          :param token: authorization token
          :type token: string
          :param nthreads: ingest mode flag
@@ -360,7 +360,7 @@ def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,tim
             AsapoWrongInputError: wrong input (number of threads, ,,,)
             AsapoProducerError: actually should not happen
     """
-    return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(stream),_bytes(token),nthreads,timeout_sec)
+    return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_sec)
 
 
 __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@"
diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp
index c07c249fb..808567d3d 100644
--- a/receiver/src/request.cpp
+++ b/receiver/src/request.cpp
@@ -129,11 +129,11 @@ const std::string& Request::GetMetaData() const {
 const CustomRequestData& Request::GetCustomData() const {
     return request_header_.custom_data;
 }
-const std::string& Request::GetStream() const {
-    return stream_;
+const std::string& Request::GetDataSource() const {
+    return data_source_;
 }
-void Request::SetStream(std::string stream) {
-    stream_ = std::move(stream);
+void Request::SetDataSource(std::string data_source) {
+    data_source_ = std::move(data_source);
 }
 
 void Request::UnlockDataBufferIfNeeded() {
diff --git a/receiver/src/request.h b/receiver/src/request.h
index 2bd6d5796..4811b01aa 100644
--- a/receiver/src/request.h
+++ b/receiver/src/request.h
@@ -57,8 +57,8 @@ class Request {
     VIRTUAL void SetSourceType(SourceType);
     VIRTUAL SourceType GetSourceType() const;
 
-    VIRTUAL const std::string& GetStream() const;
-    VIRTUAL void SetStream(std::string stream);
+    VIRTUAL const std::string& GetDataSource() const;
+    VIRTUAL void SetDataSource(std::string data_source);
     VIRTUAL void SetMetadata(std::string metadata);
 
     VIRTUAL void SetOnlinePath(std::string facility);
@@ -88,7 +88,7 @@ class Request {
     RequestHandlerList handlers_;
     std::string origin_uri_;
     std::string beamtime_id_;
-    std::string stream_;
+    std::string data_source_;
     std::string beamline_;
     std::string offline_path_;
     std::string online_path_;
diff --git a/receiver/src/request_handler/request_handler_authorize.cpp b/receiver/src/request_handler/request_handler_authorize.cpp
index fa321e304..f13fc1ae2 100644
--- a/receiver/src/request_handler/request_handler_authorize.cpp
+++ b/receiver/src/request_handler/request_handler_authorize.cpp
@@ -45,7 +45,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr
 
     JsonStringParser parser{response};
     (err = parser.GetString("beamtimeId", &beamtime_id_)) ||
-    (err = parser.GetString("stream", &stream_)) ||
+    (err = parser.GetString("dataSource", &data_source_)) ||
     (err = parser.GetString("core-path", &offline_path_)) ||
     (err = parser.GetString("beamline-path", &online_path_)) ||
     (err = parser.GetString("source-type", &stype)) ||
@@ -55,7 +55,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr
         return ErrorFromAuthorizationServerResponse(err, code);
     } else {
         log__->Debug(std::string("authorized connection from ") + request->GetOriginUri() +"source type: "+stype+ " beamline: " +
-                     beamline_ + ", beamtime id: " + beamtime_id_ + ", stream: " + stream_);
+                     beamline_ + ", beamtime id: " + beamtime_id_ + ", data soucre: " + data_source_);
     }
 
     last_updated_ = system_clock::now();
@@ -106,7 +106,7 @@ Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const {
     }
     request->SetBeamtimeId(beamtime_id_);
     request->SetBeamline(beamline_);
-    request->SetStream(stream_);
+    request->SetDataSource(data_source_);
     request->SetOfflinePath(offline_path_);
     request->SetOnlinePath(online_path_);
     request->SetSourceType(source_type_);
diff --git a/receiver/src/request_handler/request_handler_authorize.h b/receiver/src/request_handler/request_handler_authorize.h
index 481927cf0..1798ea8fb 100644
--- a/receiver/src/request_handler/request_handler_authorize.h
+++ b/receiver/src/request_handler/request_handler_authorize.h
@@ -21,7 +21,7 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler {
     std::unique_ptr<HttpClient>http_client__;
   private:
     mutable std::string beamtime_id_;
-    mutable std::string stream_;
+    mutable std::string data_source_;
     mutable std::string beamline_;
     mutable std::string offline_path_;
     mutable std::string online_path_;
diff --git a/receiver/src/request_handler/request_handler_db.cpp b/receiver/src/request_handler/request_handler_db.cpp
index c26ea6e28..821f0d770 100644
--- a/receiver/src/request_handler/request_handler_db.cpp
+++ b/receiver/src/request_handler/request_handler_db.cpp
@@ -8,8 +8,8 @@ namespace asapo {
 Error RequestHandlerDb::ProcessRequest(Request* request) const {
     if (db_name_.empty()) {
         db_name_ = request->GetBeamtimeId();
-        auto stream = request->GetStream();
-        db_name_ += "_" + stream;
+        auto data_source = request->GetDataSource();
+        db_name_ += "_" + data_source;
     }
 
 
diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h
index 7b72249a5..4845de577 100644
--- a/receiver/unittests/receiver_mocking.h
+++ b/receiver/unittests/receiver_mocking.h
@@ -71,7 +71,7 @@ class MockRequest: public Request {
     MOCK_CONST_METHOD0(GetSlotId, uint64_t());
     MOCK_CONST_METHOD0(GetData, void* ());
     MOCK_CONST_METHOD0(GetBeamtimeId, const std::string & ());
-    MOCK_CONST_METHOD0(GetStream, const std::string & ());
+    MOCK_CONST_METHOD0(GetDataSource, const std::string & ());
     MOCK_CONST_METHOD0(GetMetaData, const std::string & ());
     MOCK_CONST_METHOD0(GetBeamline, const std::string & ());
     MOCK_CONST_METHOD0(GetOpCode, asapo::Opcode ());
@@ -87,7 +87,7 @@ class MockRequest: public Request {
     MOCK_CONST_METHOD0(GetCustomData_t, const uint64_t* ());
     MOCK_CONST_METHOD0(GetMessage, const char* ());
     MOCK_METHOD1(SetBeamtimeId, void (std::string));
-    MOCK_METHOD1(SetStream, void (std::string));
+    MOCK_METHOD1(SetDataSource, void (std::string));
     MOCK_METHOD1(SetBeamline, void (std::string));
     MOCK_METHOD1(SetOnlinePath, void (std::string));
     MOCK_METHOD1(SetOfflinePath, void (std::string));
diff --git a/receiver/unittests/request_handler/test_request_handler_authorizer.cpp b/receiver/unittests/request_handler/test_request_handler_authorizer.cpp
index 304d0af27..c1e5a9741 100644
--- a/receiver/unittests/request_handler/test_request_handler_authorizer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_authorizer.cpp
@@ -64,7 +64,7 @@ class AuthorizerHandlerTests : public Test {
 
     NiceMock<asapo::MockLogger> mock_logger;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_stream = "stream";
+    std::string expected_data_source = "source";
     std::string expected_beamline = "beamline";
     std::string expected_beamline_path = "/beamline/p01/current";
     std::string expected_core_path = "/gpfs/blabla";
@@ -77,7 +77,7 @@ class AuthorizerHandlerTests : public Test {
     void MockRequestData();
     void SetUp() override {
         GenericRequestHeader request_header;
-        expected_source_credentials = "processed%"+expected_beamtime_id + "%stream%token";
+        expected_source_credentials = "processed%"+expected_beamtime_id + "%source%token";
         expect_request_string = std::string("{\"SourceCredentials\":\"") + expected_source_credentials +
                                 "\",\"OriginHost\":\"" +
                                 expected_producer_uri + "\"}";
@@ -111,7 +111,7 @@ class AuthorizerHandlerTests : public Test {
                 DoAll(SetArgPointee<4>(nullptr),
                       SetArgPointee<3>(code),
                       Return("{\"beamtimeId\":\"" + expected_beamtime_id +
-                             "\",\"stream\":" + "\"" + expected_stream +
+                             "\",\"dataSource\":" + "\"" + expected_data_source +
                              "\",\"beamline-path\":" + "\"" + expected_beamline_path +
                              "\",\"core-path\":" + "\"" + expected_core_path +
                              "\",\"source-type\":" + "\"" + expected_source_type_str +
@@ -123,7 +123,7 @@ class AuthorizerHandlerTests : public Test {
                                                      HasSubstr(std::to_string(int(code))),
                                                      HasSubstr(expected_source_type_str),
                                                      HasSubstr(expected_beamtime_id),
-                                                     HasSubstr(expected_stream),
+                                                     HasSubstr(expected_data_source),
                                                      HasSubstr(expected_producer_uri),
                                                      HasSubstr(expected_authorization_server))));
             } else {
@@ -131,7 +131,7 @@ class AuthorizerHandlerTests : public Test {
                                                      HasSubstr(expected_beamtime_id),
                                                      HasSubstr(expected_beamline),
                                                      HasSubstr(expected_source_type_str),
-                                                     HasSubstr(expected_stream),
+                                                     HasSubstr(expected_data_source),
                                                      HasSubstr(expected_producer_uri))));
             }
         }
@@ -156,7 +156,7 @@ class AuthorizerHandlerTests : public Test {
 
         if (!error && code == HttpCode::OK && set_request) {
             EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id));
-            EXPECT_CALL(*mock_request, SetStream(expected_stream));
+            EXPECT_CALL(*mock_request, SetDataSource(expected_data_source));
             EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path));
             EXPECT_CALL(*mock_request, SetOnlinePath(expected_beamline_path));
             EXPECT_CALL(*mock_request, SetBeamline(expected_beamline));
@@ -265,7 +265,7 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeUsesCachedValue) {
     EXPECT_CALL(mock_http_client, Post_t(_, _, _, _, _)).Times(0);
     EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id));
     EXPECT_CALL(*mock_request, SetBeamline(expected_beamline));
-    EXPECT_CALL(*mock_request, SetStream(expected_stream));
+    EXPECT_CALL(*mock_request, SetDataSource(expected_data_source));
     EXPECT_CALL(*mock_request, SetOnlinePath(expected_beamline_path));
     EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path));
     EXPECT_CALL(*mock_request, SetSourceType(expected_source_type));
diff --git a/receiver/unittests/request_handler/test_request_handler_db.cpp b/receiver/unittests/request_handler/test_request_handler_db.cpp
index a84aa60b1..0cda482d7 100644
--- a/receiver/unittests/request_handler/test_request_handler_db.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db.cpp
@@ -66,8 +66,8 @@ class DbHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_stream = "stream";
-    std::string expected_default_stream = "detector";
+    std::string expected_stream = "source";
+    std::string expected_default_source = "detector";
     std::string expected_discovery_server = "discovery";
     std::string expected_database_server = "127.0.0.1:27017";
 
@@ -81,7 +81,7 @@ class DbHandlerTests : public Test {
         handler.http_client__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
         mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr});
         ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id));
-        ON_CALL(*mock_request, GetStream()).WillByDefault(ReturnRef(expected_stream));
+        ON_CALL(*mock_request, GetDataSource()).WillByDefault(ReturnRef(expected_stream));
 
     }
     void TearDown() override {
@@ -151,7 +151,7 @@ TEST_F(DbHandlerTests, ProcessRequestDiscoversMongoDbAddress) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream())
+    EXPECT_CALL(*mock_request, GetDataSource())
     .WillOnce(ReturnRef(expected_stream))
     ;
 
@@ -188,7 +188,7 @@ TEST_F(DbHandlerTests, ProcessRequestCallsConnectDbWhenNotConnected) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream())
+    EXPECT_CALL(*mock_request, GetDataSource())
     .WillOnce(ReturnRef(expected_stream))
     ;
 
diff --git a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
index 5f02a4d1d..780e78bd6 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
@@ -73,8 +73,8 @@ class DbCheckRequestHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_default_stream = "detector";
-    std::string expected_stream = "stream";
+    std::string expected_default_source = "detector";
+    std::string expected_data_source = "source";
     std::string expected_host_uri = "127.0.0.1:1234";
     uint64_t expected_port = 1234;
     uint64_t expected_buf_id = 18446744073709551615ull;
@@ -148,7 +148,7 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons
         .WillOnce(ReturnRef(expected_beamtime_id))
         ;
 
-        EXPECT_CALL(*mock_request, GetStream())
+        EXPECT_CALL(*mock_request, GetDataSource())
         .WillOnce(ReturnRef(stream))
         ;
     }
@@ -198,7 +198,7 @@ FileInfo DbCheckRequestHandlerTests::PrepareFileInfo() {
 }
 
 void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool expect_compare ) {
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream, expect_compare);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source, expect_compare);
     EXPECT_CALL(mock_db, GetById_t(expected_collection_name, expected_id, _)).
     WillOnce(DoAll(
                  SetArgPointee<2>(expected_file_info),
@@ -207,7 +207,7 @@ void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool
 }
 
 void DbCheckRequestHandlerTests::MockGetSetByID(asapo::ErrorInterface* error, bool expect_compare ) {
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_stream, expect_compare);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source, expect_compare);
     EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_subset_id, expected_id, _)).
     WillOnce(DoAll(
                  SetArgPointee<3>(expected_file_info),
diff --git a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
index 120dfb7b2..35d6ae14b 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp
@@ -62,7 +62,7 @@ class DbMetaLastStreamTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_stream = "stream";
+    std::string expected_data_source = "stream";
     std::string info_str = R"({"lastId":10,"name":"substream","timestampCreated":1000000,"timestampLast":2000000})";
     asapo::StreamInfo expected_stream_info;
     void SetUp() override {
@@ -89,9 +89,9 @@ TEST_F(DbMetaLastStreamTests, CallsUpdate) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream()).WillOnce(ReturnRef(expected_stream));
+    EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source));
 
-    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)).
+    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)).
     WillOnce(testing::Return(nullptr));
 
 
diff --git a/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp
index d80d877bb..7cec10a2c 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp
@@ -62,7 +62,7 @@ class DbMetaWriterHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_stream = "stream";
+    std::string expected_data_source = "source";
     std::string meta_str =
         R"("info":"stat","int_data":0,"float_data":0.1,"bool":false)";
     const uint8_t* expected_meta = reinterpret_cast<const uint8_t*>(meta_str.c_str());
@@ -91,12 +91,12 @@ TEST_F(DbMetaWriterHandlerTests, CallsUpdate) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream())
-    .WillOnce(ReturnRef(expected_stream))
+    EXPECT_CALL(*mock_request, GetDataSource())
+    .WillOnce(ReturnRef(expected_data_source))
     ;
 
 
-    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)).
+    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)).
     WillOnce(testing::Return(nullptr));
 
 
diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
index 5bcd03d12..bd0aee056 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp
@@ -63,7 +63,7 @@ class DbMetaStreamInfoTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_stream = "stream";
+    std::string expected_data_source = "source";
     std::string info_str = R"({"lastId":10,"name":"substream","timestampCreated":1000000,"timestampLast":2000000})";
     asapo::StreamInfo expected_stream_info;
     void SetUp() override {
@@ -90,13 +90,13 @@ TEST_F(DbMetaStreamInfoTests, CallsUpdate) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream()).WillOnce(ReturnRef(expected_stream));
+    EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source));
 
     EXPECT_CALL(*mock_request, GetSubstream()).Times(2)
     .WillRepeatedly(Return(expected_substream))
     ;
 
-    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)).
+    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)).
     WillOnce(testing::Return(nullptr));
 
 
diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
index d291c1075..8ac7d7886 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
@@ -74,8 +74,8 @@ class DbWriterHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
-    std::string expected_default_stream = "detector";
-    std::string expected_stream = "stream";
+    std::string expected_default_source = "detector";
+    std::string expected_data_source = "source";
     std::string expected_host_ip = "127.0.0.1";
     uint64_t expected_port = 1234;
     uint64_t expected_buf_id = 18446744073709551615ull;
@@ -129,7 +129,7 @@ MATCHER_P(CompareFileInfo, file, "") {
 }
 
 
-void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std::string& stream) {
+void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std::string& data_source) {
 
     EXPECT_CALL(*mock_request, WasAlreadyProcessed())
     .WillOnce(Return(false))
@@ -139,8 +139,8 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(*mock_request, GetStream())
-    .WillOnce(ReturnRef(stream))
+    EXPECT_CALL(*mock_request, GetDataSource())
+    .WillOnce(ReturnRef(data_source))
     ;
 
 
@@ -149,7 +149,7 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     ;
 
     std::string db_name = expected_beamtime_id;
-    db_name += "_" + stream;
+    db_name += "_" + data_source;
 
     EXPECT_CALL(mock_db, Connect_t(config.database_uri, db_name)).
     WillOnce(testing::Return(nullptr));
@@ -203,7 +203,7 @@ void DbWriterHandlerTests::ExpectLogger() {
     EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"),
                                          HasSubstr(config.database_uri),
                                          HasSubstr(expected_beamtime_id),
-                                         HasSubstr(expected_stream),
+                                         HasSubstr(expected_data_source),
                                          HasSubstr(expected_collection_name)
                                         )
                                   )
@@ -213,7 +213,7 @@ void DbWriterHandlerTests::ExpectLogger() {
 
 TEST_F(DbWriterHandlerTests, CallsInsert) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source);
     auto file_info = PrepareFileInfo();
 
     EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareFileInfo(file_info), false)).
@@ -225,13 +225,13 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
 
 TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_default_stream);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source);
     auto file_info = PrepareFileInfo();
 
 
     EXPECT_CALL(mock_db, InsertAsSubset_t(expected_collection_name, CompareFileInfo(file_info), expected_subset_id,
                                           expected_subset_size, false)).
-    WillOnce(testing::Return(nullptr));
+    WillOnce(testing::Return(   nullptr));
     ExpectLogger();
 
     handler.ProcessRequest(mock_request.get());
@@ -239,7 +239,7 @@ TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
 
 
 void DbWriterHandlerTests::ExpectDuplicatedID() {
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source);
     auto file_info = PrepareFileInfo();
 
     EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareFileInfo(file_info), false)).
diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp
index 59ce937bd..402f00def 100644
--- a/receiver/unittests/test_request.cpp
+++ b/receiver/unittests/test_request.cpp
@@ -224,9 +224,9 @@ TEST_F(RequestTests, SetGetBeamtimeId) {
 
 
 TEST_F(RequestTests, SetGetStream) {
-    request->SetStream("stream");
+    request->SetDataSource("stream");
 
-    ASSERT_THAT(request->GetStream(), "stream");
+    ASSERT_THAT(request->GetDataSource(), "stream");
 }
 
 
-- 
GitLab