From 3112ae687ea1285bb16cfef757124ebeee67972b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 7 Aug 2019 16:00:44 +0200
Subject: [PATCH] start adding streams to producer/workers

---
 .../src/asapo_authorizer/server/authorize.go  | 47 +++++++++++---
 .../asapo_authorizer/server/authorize_test.go | 53 +++++++++++----
 .../src/asapo_authorizer/server/server.go     |  1 +
 common/cpp/include/common/data_structs.h      | 11 ++++
 common/cpp/src/data_structs/data_structs.cpp  |  2 +
 .../dummy_data_producer.cpp                   |  2 +-
 .../worker/getnext_broker/getnext_broker.cpp  |  5 +-
 producer/api/CMakeLists.txt                   |  2 +-
 producer/api/include/producer/producer.h      |  4 +-
 .../api/include/producer/producer_error.h     |  4 +-
 producer/api/src/producer.cpp                 |  4 +-
 producer/api/src/producer_impl.cpp            | 25 ++++---
 producer/api/src/producer_impl.h              |  6 +-
 producer/api/src/producer_request.cpp         |  4 +-
 producer/api/src/producer_request.h           |  4 +-
 producer/api/src/request_handler_tcp.cpp      |  2 +-
 producer/api/unittests/test_producer.cpp      | 13 ++--
 producer/api/unittests/test_producer_impl.cpp | 60 ++++++++++++-----
 .../api/unittests/test_producer_request.cpp   |  6 +-
 .../src/main_eventmon.cpp                     |  2 +-
 receiver/src/request.cpp                      |  7 +-
 receiver/src/request.h                        |  6 ++
 receiver/src/request_handler_authorize.cpp    | 25 +++----
 receiver/src/request_handler_authorize.h      |  6 +-
 receiver/src/request_handler_db.cpp           |  5 ++
 receiver/unittests/receiver_mocking.h         |  2 +
 receiver/unittests/test_request.cpp           |  7 ++
 .../test_request_handler_authorizer.cpp       | 14 +++-
 .../unittests/test_request_handler_db.cpp     | 34 +++++++++-
 .../test_request_handler_db_meta_writer.cpp   |  8 ++-
 .../test_request_handler_db_writer.cpp        | 27 ++++++--
 .../curl_httpclient_command.cpp               |  2 +-
 .../beamtime_metadata/beamtime_metadata.cpp   |  2 +-
 .../next_multithread_broker.cpp               |  2 +-
 .../worker/worker_api/worker_api.cpp          |  3 +-
 .../getlast_broker.cpp                        |  4 +-
 worker/api/cpp/include/worker/data_broker.h   |  2 +-
 worker/api/cpp/src/data_broker.cpp            |  5 +-
 worker/api/cpp/src/server_data_broker.cpp     | 27 +++++---
 worker/api/cpp/src/server_data_broker.h       |  5 +-
 .../api/cpp/unittests/test_server_broker.cpp  | 65 +++++++++++++++----
 worker/api/cpp/unittests/test_worker_api.cpp  |  2 +-
 42 files changed, 378 insertions(+), 139 deletions(-)

diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go
index 132649607..736e22206 100644
--- a/authorizer/src/asapo_authorizer/server/authorize.go
+++ b/authorizer/src/asapo_authorizer/server/authorize.go
@@ -1,20 +1,39 @@
 package server
 
 import (
-	"net/http"
-	"encoding/json"
+	log "asapo_common/logger"
 	"asapo_common/utils"
+	"encoding/json"
+	"errors"
+	"net/http"
 	"path/filepath"
 	"strings"
-	log "asapo_common/logger"
-	"errors"
 )
 
-type authorizationRequest struct {
+type SourceCredentials struct {
 	BeamtimeId string
+	Stream string
+	Token string
+}
+
+type authorizationRequest struct {
+	SourceCredentials string
 	OriginHost string
 }
 
+func getSourceCredentials(request authorizationRequest ) (SourceCredentials,error){
+	vals := strings.Split(request.SourceCredentials,"%")
+
+	if len(vals)!=3 {
+		return SourceCredentials{}, errors.New("cannot get source credentials from "+request.SourceCredentials)
+	}
+	creds := SourceCredentials{vals[0],vals[1],vals[2]}
+	if creds.Stream=="" {
+		creds.Stream="detector"
+	}
+	return creds,nil
+}
+
 func extractRequest(r *http.Request) (request authorizationRequest, err error) {
 	decoder := json.NewDecoder(r.Body)
 	err = decoder.Decode(&request)
@@ -63,9 +82,10 @@ func beamtimeExists(info beamtimeInfo) bool {
 	return checkBeamtimeExistsInStrings(info, lines)
 }
 
-func authorize(request authorizationRequest) (bool, beamtimeInfo) {
+func authorize(request authorizationRequest,creds SourceCredentials) (bool, beamtimeInfo) {
 	for _, pair := range settings.AlwaysAllowedBeamtimes {
-		if pair.BeamtimeId == request.BeamtimeId {
+		if pair.BeamtimeId == creds.BeamtimeId {
+			pair.Stream = creds.Stream
 			return true, pair
 		}
 	}
@@ -78,7 +98,8 @@ func authorize(request authorizationRequest) (bool, beamtimeInfo) {
 	}
 
 	answer.Beamline = beamline
-	answer.BeamtimeId = request.BeamtimeId
+	answer.BeamtimeId = creds.BeamtimeId
+	answer.Stream = creds.Stream
 	if (!beamtimeExists(answer)) {
 		log.Error("cannot authorize beamtime " + answer.BeamtimeId + " for " + request.OriginHost + " in " + answer.Beamline)
 		return false, beamtimeInfo{}
@@ -95,7 +116,15 @@ func routeAuthorize(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
-	ok, beamtimeInfo := authorize(request)
+
+
+	creds,err := getSourceCredentials(request)
+	if err!=nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	ok, beamtimeInfo := authorize(request,creds)
 	if (!ok) {
 		w.WriteHeader(http.StatusUnauthorized)
 		return
diff --git a/authorizer/src/asapo_authorizer/server/authorize_test.go b/authorizer/src/asapo_authorizer/server/authorize_test.go
index 1c1b39b34..94b05b58e 100644
--- a/authorizer/src/asapo_authorizer/server/authorize_test.go
+++ b/authorizer/src/asapo_authorizer/server/authorize_test.go
@@ -3,12 +3,12 @@ package server
 import (
 	"asapo_common/utils"
 	"github.com/stretchr/testify/assert"
+	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
+	"os"
 	"strings"
 	"testing"
-	"io/ioutil"
-	"os"
 )
 
 type request struct {
@@ -40,20 +40,44 @@ func doAuthorizeRequest(path string,buf string) *httptest.ResponseRecorder {
 	return w
 }
 
-func TestAuthorizeOK(t *testing.T) {
-	allowBeamlines([]beamtimeInfo{{"asapo_test","beamline"}})
-	request :=  makeRequest(authorizationRequest{"asapo_test","host"})
+
+var credTests = [] struct {
+	request string
+	cred SourceCredentials
+	message string
+} {
+	{"asapo_test%%", SourceCredentials{"asapo_test","detector",""},"default stream and no token"},
+	{"asapo_test%%token", SourceCredentials{"asapo_test","detector","token"},"default stream"},
+	{"asapo_test%stream%", SourceCredentials{"asapo_test","stream",""},"no token"},
+	{"asapo_test%stream%token", SourceCredentials{"asapo_test","stream","token"},"all set"},
+}
+
+func TestSplitCreds(t *testing.T) {
+
+	for _, test := range credTests {
+		request :=  authorizationRequest{test.request,"host"}
+		creds,err := getSourceCredentials(request)
+		assert.Nil(t,err)
+		assert.Equal(t,creds,test.cred,test.message)
+	}
+}
+
+func TestAuthorizeDefaultOK(t *testing.T) {
+	allowBeamlines([]beamtimeInfo{{"asapo_test","beamline",""}})
+	request :=  makeRequest(authorizationRequest{"asapo_test%%","host"})
 	w := doAuthorizeRequest("/authorize",request)
 
 	body, _ := ioutil.ReadAll(w.Body)
 
 	assert.Contains(t, string(body), "asapo_test", "")
 	assert.Contains(t, string(body), "beamline", "")
+	assert.Contains(t, string(body), "detector", "")
+
 	assert.Equal(t, http.StatusOK, w.Code, "")
 }
 
 func TestNotAuthorized(t *testing.T) {
-	request :=  makeRequest(authorizationRequest{"any_id","host"})
+	request :=  makeRequest(authorizationRequest{"any_id%%","host"})
 	w := doAuthorizeRequest("/authorize",request)
 	assert.Equal(t, http.StatusUnauthorized, w.Code, "")
 }
@@ -70,10 +94,11 @@ func TestAuthorizeWrongPath(t *testing.T) {
 	assert.Equal(t, http.StatusNotFound, w.Code, "")
 }
 
-func TestAlwaysAuthorizeAllowed(t *testing.T) {
-	allowBeamlines([]beamtimeInfo{{"test","beamline"}})
-	request :=  authorizationRequest{"asapo_test","host"}
-	ok,_ := authorize(request)
+func TestDoNotAuthorizeIfNotInAllowed(t *testing.T) {
+	allowBeamlines([]beamtimeInfo{{"test","beamline",""}})
+	request :=  authorizationRequest{"asapo_test%%","host"}
+	creds,_ := getSourceCredentials(request)
+	ok,_ := authorize(request,creds)
 	assert.Equal(t,false, ok, "")
 }
 
@@ -95,7 +120,7 @@ func TestGetBeamlineFromIP(t *testing.T) {
 
 }
 func TestCheckBeamtimeExistsInStringsFalse(t *testing.T) {
-	beamInfo := beamtimeInfo{"123","bl"}
+	beamInfo := beamtimeInfo{"123","bl",""}
 	lines:=[]string{"111","flash	pg2	11003932		beamtime	start: 2018-06-11","petra3	p01	c20180508-000-COM20181	commissioning"}
 	ok := checkBeamtimeExistsInStrings(beamInfo,lines)
 	assert.False(t,ok, "")
@@ -103,7 +128,7 @@ func TestCheckBeamtimeExistsInStringsFalse(t *testing.T) {
 
 
 func TestCheckBeamtimeExistsInStringsOk(t *testing.T) {
-	beamInfo := beamtimeInfo{"11003932","pg2"}
+	beamInfo := beamtimeInfo{"11003932","pg2",""}
 	lines:=[]string{"111","flash	pg2	11003932		beamtime	start: 2018-06-11","petra3	p01	c20180508-000-COM20181	commissioning"}
 	ok := checkBeamtimeExistsInStrings(beamInfo,lines)
 	assert.True(t,ok, "")
@@ -142,11 +167,11 @@ petra3	p02.1	11004341		beamtime	start: 2018-06-18
 	ioutil.WriteFile("127.0.0.1", []byte("bl1"), 0644)
 
 
-	request := authorizationRequest{"11003924","127.0.0.1"}
+	request := authorizationRequest{"11003924%%","127.0.0.1"}
 	w := doAuthorizeRequest("/authorize",makeRequest(request))
 
 	body, _ := ioutil.ReadAll(w.Body)
-	assert.Contains(t, string(body), request.BeamtimeId, "")
+	assert.Contains(t, string(body), "11003924", "")
 	assert.Contains(t, string(body), "bl1", "")
 	assert.Equal(t, http.StatusOK, w.Code, "")
 
diff --git a/authorizer/src/asapo_authorizer/server/server.go b/authorizer/src/asapo_authorizer/server/server.go
index 4215eecec..166bf12fb 100644
--- a/authorizer/src/asapo_authorizer/server/server.go
+++ b/authorizer/src/asapo_authorizer/server/server.go
@@ -4,6 +4,7 @@ package server
 type  beamtimeInfo struct {
 	BeamtimeId string
 	Beamline string
+	Stream string
 }
 
 type serverSettings struct {
diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h
index 2ce84c713..56020baac 100644
--- a/common/cpp/include/common/data_structs.h
+++ b/common/cpp/include/common/data_structs.h
@@ -45,5 +45,16 @@ struct DataSet {
 
 using SubDirList = std::vector<std::string>;
 
+
+struct SourceCredentials {
+    static const std::string kDefaultStream;
+    std::string beamtime_id;
+    std::string stream;
+    std::string user_token;
+    std::string GetString() {
+        return beamtime_id + "%" + stream + "%" + user_token;
+    };
+};
+
 }
 #endif //ASAPO_FILE_INFO_H
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 9f12d4547..0c87a7f28 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -19,6 +19,8 @@ using std::chrono::system_clock;
 
 namespace asapo {
 
+const std::string SourceCredentials::kDefaultStream = "detector";
+
 
 std::string FileInfo::Json() const {
     auto nanoseconds_from_epoch = std::chrono::time_point_cast<std::chrono::nanoseconds>(modify_date).
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 9a270e058..83adf23cc 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -141,7 +141,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     asapo::Error err;
     auto producer = asapo::Producer::Create(args.receiver_address, args.nthreads,
                                             args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
-                                            args.beamtime_id, &err);
+                                            asapo::SourceCredentials{args.beamtime_id, "", ""}, &err);
     if(err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
         exit(EXIT_FAILURE);
diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp
index 09b8b8a99..40f8e1faf 100644
--- a/examples/worker/getnext_broker/getnext_broker.cpp
+++ b/examples/worker/getnext_broker/getnext_broker.cpp
@@ -47,8 +47,9 @@ std::vector<std::thread> StartThreads(const Params& params,
     auto exec_next = [&params, nfiles, errors, nbuf, nfiles_total](int i) {
         asapo::FileInfo fi;
         Error err;
-        auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id,
-                      params.token, &err);
+        auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path,
+                      asapo::SourceCredentials{params.beamtime_id, "", params.token}, &err);
+
         broker->SetTimeout((uint64_t) params.timeout_ms);
         asapo::FileData data;
 
diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt
index d521461b6..86d882adc 100644
--- a/producer/api/CMakeLists.txt
+++ b/producer/api/CMakeLists.txt
@@ -14,7 +14,7 @@ set(SOURCE_FILES
 # Library
 ################################
 add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser>
-        $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool>)
+        $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs>)
 target_include_directories(${TARGET_NAME} PUBLIC include ${ASAPO_CXX_COMMON_INCLUDE_DIR})
 target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 
diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h
index 6943c173a..4ed90b2dd 100644
--- a/producer/api/include/producer/producer.h
+++ b/producer/api/include/producer/producer.h
@@ -19,7 +19,7 @@ class Producer {
      * @return A unique_ptr to a new producer instance
      */
     static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads,
-                                            asapo::RequestHandlerType type, std::string beamtime_id,
+                                            asapo::RequestHandlerType type, SourceCredentials source_cred,
                                             Error* err);
 
     virtual ~Producer() = default;
@@ -56,7 +56,7 @@ class Producer {
     //! Enables/Disables sending logs to the central server
     virtual void EnableRemoteLog(bool enable) = 0;
     //! Set beamtime id which producer will use to send data
-    virtual Error SetBeamtimeId(std::string beamtime_id) = 0;
+    virtual Error SetCredentials(SourceCredentials source_cred) = 0;
 };
 }
 
diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h
index ee6caa8fd..8d0e4c4d7 100644
--- a/producer/api/include/producer/producer_error.h
+++ b/producer/api/include/producer/producer_error.h
@@ -44,12 +44,12 @@ auto const kFileNameTooLong = ProducerErrorTemplate {
     "filename too long", ProducerErrorType::kFileNameTooLong
 };
 
-auto const kBeamtimeIdTooLong = ProducerErrorTemplate {
+auto const kCredentialsTooLong = ProducerErrorTemplate {
     "beamtime id too long", ProducerErrorType::kBeamtimeIdTooLong
 };
 
 
-auto const kBeamtimeAlreadySet = ProducerErrorTemplate {
+auto const kCredentialsAlreadySet = ProducerErrorTemplate {
     "beamtime id already set", ProducerErrorType::kBeamtimeAlreadySet
 };
 
diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp
index ac8187646..bc308be85 100644
--- a/producer/api/src/producer.cpp
+++ b/producer/api/src/producer.cpp
@@ -2,7 +2,7 @@
 #include "producer_impl.h"
 
 std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads,
-        asapo::RequestHandlerType type, std::string beamtime_id, Error* err) {
+        asapo::RequestHandlerType type, SourceCredentials source_cred, Error* err) {
     if (n_processing_threads > kMaxProcessingThreads) {
         *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads));
         return nullptr;
@@ -19,7 +19,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp
         return nullptr;
     }
 
-    *err = producer->SetBeamtimeId(beamtime_id);
+    *err = producer->SetCredentials(std::move(source_cred));
     if (*err) {
         return nullptr;
     }
diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp
index 9b4b55729..af08b782f 100644
--- a/producer/api/src/producer_impl.cpp
+++ b/producer/api/src/producer_impl.cpp
@@ -69,7 +69,7 @@ Error ProducerImpl::Send(const EventHeader& event_header,
 
     auto request_header = GenerateNextSendRequest(event_header, metadata.size());
 
-    return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header),
+    return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header),
                 std::move(data), std::move(metadata), std::move(full_path), callback}
     });
 
@@ -99,19 +99,24 @@ void ProducerImpl::EnableRemoteLog(bool enable) {
     log__->EnableRemoteLog(enable);
 }
 
-Error ProducerImpl::SetBeamtimeId(std::string beamtime_id) {
+Error ProducerImpl::SetCredentials(SourceCredentials source_cred) {
 
-    if (!beamtime_id_.empty()) {
-        log__->Error("beamtime_id already set");
-        return ProducerErrorTemplates::kBeamtimeAlreadySet.Generate();
+    if (!source_cred_string_.empty()) {
+        log__->Error("credentials already set");
+        return ProducerErrorTemplates::kCredentialsAlreadySet.Generate();
     }
 
-    if (beamtime_id.size() > kMaxMessageSize) {
-        log__->Error("beamtime_id is too long - " + beamtime_id);
-        return ProducerErrorTemplates::kBeamtimeIdTooLong.Generate();
+    if (source_cred.stream.empty()) {
+        source_cred.stream = SourceCredentials::kDefaultStream;
+    }
+
+    source_cred_string_ = source_cred.GetString();
+    if (source_cred_string_.size()  + source_cred.user_token.size() > kMaxMessageSize) {
+        log__->Error("credentials string is too long - " + source_cred_string_);
+        source_cred_string_ = "";
+        return ProducerErrorTemplates::kCredentialsTooLong.Generate();
     }
 
-    beamtime_id_ = std::move(beamtime_id);
     return nullptr;
 }
 
@@ -119,7 +124,7 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca
     GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0, "beamtime_global.meta"};
     FileData data{new uint8_t[metadata.size()]};
     strncpy((char*)data.get(), metadata.c_str(), metadata.size());
-    return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header),
+    return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header),
                 std::move(data), "", "", callback}
     });
 }
diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h
index cb887750c..fc3d10fc8 100644
--- a/producer/api/src/producer_impl.h
+++ b/producer/api/src/producer_impl.h
@@ -32,14 +32,16 @@ class ProducerImpl : public Producer {
     Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) override;
     AbstractLogger* log__;
     std::unique_ptr<RequestPool> request_pool__;
-    Error SetBeamtimeId(std::string beamtime_id) override;
+
+    Error SetCredentials(SourceCredentials source_cred) override;
+
     Error SendMetaData(const std::string& metadata, RequestCallback callback) override;
 
   private:
     Error Send(const EventHeader& event_header, FileData data, std::string metadata, std::string full_path,
                RequestCallback callback);
     GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t meta_size);
-    std::string beamtime_id_;
+    std::string source_cred_string_;
 };
 
 }
diff --git a/producer/api/src/producer_request.cpp b/producer/api/src/producer_request.cpp
index cc5248204..734e3fbc6 100644
--- a/producer/api/src/producer_request.cpp
+++ b/producer/api/src/producer_request.cpp
@@ -11,13 +11,13 @@ Error ProducerRequest::ReadDataFromFileIfNeeded(const IO* io) {
     return err;
 }
 
-ProducerRequest::ProducerRequest(std::string beamtime_id,
+ProducerRequest::ProducerRequest(std::string source_credentials,
                                  GenericRequestHeader h,
                                  FileData data,
                                  std::string metadata,
                                  std::string original_filepath,
                                  RequestCallback callback) : GenericRequest(std::move(h)),
-    beamtime_id{std::move(beamtime_id)},
+    source_credentials{std::move(source_credentials)},
     metadata{std::move(metadata)},
     data{std::move(data)},
     original_filepath{std::move(original_filepath)},
diff --git a/producer/api/src/producer_request.h b/producer/api/src/producer_request.h
index 3f90d52fa..28beafab7 100644
--- a/producer/api/src/producer_request.h
+++ b/producer/api/src/producer_request.h
@@ -11,11 +11,11 @@ namespace asapo {
 
 class ProducerRequest : public GenericRequest {
   public:
-    ProducerRequest(std::string beamtime_id, GenericRequestHeader header, FileData data,
+    ProducerRequest(std::string source_credentials, GenericRequestHeader header, FileData data,
                     std::string metadata,
                     std::string original_filepath,
                     RequestCallback callback);
-    std::string beamtime_id;
+    std::string source_credentials;
     std::string metadata;
     FileData data;
     std::string original_filepath;
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index 4692d0212..ada8ed2ea 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -172,7 +172,7 @@ bool RequestHandlerTcp::ServerError(const Error& err) {
 Error RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) {
     for (auto receiver_uri : receivers_list_) {
         if (Disconnected()) {
-            auto err = ConnectToReceiver(request->beamtime_id, receiver_uri);
+            auto err = ConnectToReceiver(request->source_credentials, receiver_uri);
             if (err != nullptr ) continue;
         }
 
diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp
index 25065d74a..bdafe317b 100644
--- a/producer/api/unittests/test_producer.cpp
+++ b/producer/api/unittests/test_producer.cpp
@@ -8,12 +8,14 @@
 using ::testing::Ne;
 using ::testing::Eq;
 
+using asapo::SourceCredentials;
+
 namespace {
 
 TEST(CreateProducer, TcpProducer) {
     asapo::Error err;
     std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp,
-                                                "bt", &err);
+                                                SourceCredentials{"bt", "", ""}, &err);
     ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr));
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -22,9 +24,9 @@ TEST(CreateProducer, ErrorBeamtime) {
     asapo::Error err;
     std::string expected_beamtimeid(asapo::kMaxMessageSize * 10, 'a');
     std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp,
-                                                expected_beamtimeid, &err);
+                                                SourceCredentials{expected_beamtimeid, "", ""}, &err);
     ASSERT_THAT(producer, Eq(nullptr));
-    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeIdTooLong));
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsTooLong));
 }
 
 
@@ -42,14 +44,15 @@ TEST(CreateProducer, FileSystemProducer) {
 TEST(CreateProducer, TooManyThreads) {
     asapo::Error err;
     std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1,
-                                                asapo::RequestHandlerType::kTcp, "bt", &err);
+                                                asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
     ASSERT_THAT(producer, Eq(nullptr));
     ASSERT_THAT(err, Ne(nullptr));
 }
 
 TEST(Producer, SimpleWorkflowWihoutConnection) {
     asapo::Error err;
-    std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, "bt",
+    std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp,
+                                                SourceCredentials{"bt", "", ""},
                                                 &err);
 
     asapo::EventHeader event_header{1, 1, ""};
diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp
index 3e3532baf..832761096 100644
--- a/producer/api/unittests/test_producer_impl.cpp
+++ b/producer/api/unittests/test_producer_impl.cpp
@@ -29,13 +29,14 @@ using asapo::RequestPool;
 using asapo::ProducerRequest;
 
 
-MATCHER_P8(M_CheckSendDataRequest, op_code, beamtime_id, metadata, file_id, file_size, message, subset_id, subset_size,
+MATCHER_P8(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_id, file_size, message, subset_id,
+           subset_size,
            "Checks if a valid GenericRequestHeader was Send") {
     auto request = static_cast<ProducerRequest*>(arg);
     return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code
            && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id
            && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint64_t(file_size)
-           && request->beamtime_id == beamtime_id
+           && request->source_credentials == source_credentials
            && request->metadata == metadata
            && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader)(arg->header)).custom_data[0] ==
                uint64_t(subset_id) : true)
@@ -64,7 +65,17 @@ class ProducerImplTests : public testing::Test {
     uint64_t expected_subset_size = 4;
 
     char expected_name[asapo::kMaxMessageSize] = "test_name";
-    std::string expected_beamtimeid = "beamtime_id";
+    asapo::SourceCredentials expected_credentials {
+        "beamtime_id", "subname", "token"
+    };
+    asapo::SourceCredentials expected_default_credentials {
+        "beamtime_id", "", "token"
+    };
+
+
+    std::string expected_credentials_str = "beamtime_id%subname%token";
+    std::string expected_default_credentials_str = "beamtime_id%detector%token";
+
     std::string expected_metadata = "meta";
     std::string expected_fullpath = "filename";
     void SetUp() override {
@@ -107,12 +118,26 @@ TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) {
 }
 
 
+TEST_F(ProducerImplTests, UsesDefaultStream) {
+    producer.SetCredentials(expected_default_credentials);
+
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
+                                        expected_default_credentials_str, expected_metadata,
+                                        expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return(
+                                                    nullptr));
+
+    asapo::EventHeader event_header{expected_id, expected_size, expected_name};
+    auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr);
+
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
 
 TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
-    producer.SetBeamtimeId(expected_beamtimeid);
+    producer.SetCredentials(expected_credentials);
 
     EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
-                                        expected_beamtimeid, expected_metadata,
+                                        expected_credentials_str, expected_metadata,
                                         expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return(
                                                     nullptr));
 
@@ -123,9 +148,9 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
 }
 
 TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
-    producer.SetBeamtimeId(expected_beamtimeid);
+    producer.SetCredentials(expected_credentials);
     EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData,
-                                        expected_beamtimeid, expected_metadata,
+                                        expected_credentials_str, expected_metadata,
                                         expected_id, expected_size, expected_name,
                                         expected_subset_id, expected_subset_size))).WillOnce(Return(
                                                     nullptr));
@@ -143,9 +168,9 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) {
     expected_metadata = "{\"meta\":10}";
     expected_size = expected_metadata.size();
 
-    producer.SetBeamtimeId(expected_beamtimeid);
+    producer.SetCredentials(expected_credentials);
     EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData,
-                                        expected_beamtimeid, "", expected_id,
+                                        expected_credentials_str, "", expected_id,
                                         expected_size, "beamtime_global.meta", 10, 10))).WillOnce(Return(
                                                     nullptr));
 
@@ -155,10 +180,10 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) {
 }
 
 TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
-    producer.SetBeamtimeId(expected_beamtimeid);
+    producer.SetCredentials(expected_credentials);
 
     EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
-                                        expected_beamtimeid, "", expected_id, 0, expected_name, 0, 0))).WillOnce(Return(
+                                        expected_credentials_str, "", expected_id, 0, expected_name, 0, 0))).WillOnce(Return(
                                                     nullptr));
 
     asapo::EventHeader event_header{expected_id, 0, expected_name};
@@ -169,21 +194,22 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
 
 
 TEST_F(ProducerImplTests, ErrorSettingBeamtime) {
-    std::string expected_beamtimeid(asapo::kMaxMessageSize * 10, 'a');
+    std::string long_str(asapo::kMaxMessageSize * 10, 'a');
+    expected_credentials = asapo::SourceCredentials{long_str, "", ""};
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("too long")));
 
-    auto err = producer.SetBeamtimeId(expected_beamtimeid);
+    auto err = producer.SetCredentials(expected_credentials);
 
-    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeIdTooLong));
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsTooLong));
 }
 
 TEST_F(ProducerImplTests, ErrorSettingSecondTime) {
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("already")));
 
-    producer.SetBeamtimeId("1");
-    auto err = producer.SetBeamtimeId("2");
+    producer.SetCredentials(asapo::SourceCredentials{"1", "2", "3"});
+    auto err = producer.SetCredentials(asapo::SourceCredentials{"4", "5", "6"});
 
-    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeAlreadySet));
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsAlreadySet));
 }
 
 
diff --git a/producer/api/unittests/test_producer_request.cpp b/producer/api/unittests/test_producer_request.cpp
index 9e2b95479..3ca0a1fe1 100644
--- a/producer/api/unittests/test_producer_request.cpp
+++ b/producer/api/unittests/test_producer_request.cpp
@@ -35,7 +35,7 @@ using ::testing::Sequence;
 
 TEST(ProducerRequest, Constructor) {
     char  expected_file_name[asapo::kMaxMessageSize] = "test_name";
-    char  expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id";
+    char  expected_source_credentials[asapo::kMaxMessageSize] = "test_beamtime_id%test_streamid%test_token";
     uint64_t expected_file_id = 42;
     uint64_t expected_file_size = 1337;
     uint64_t expected_meta_size = 137;
@@ -44,9 +44,9 @@ TEST(ProducerRequest, Constructor) {
 
     asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,
                                        expected_meta_size, expected_file_name};
-    asapo::ProducerRequest request{expected_beamtime_id, std::move(header), nullptr, expected_meta, "", nullptr};
+    asapo::ProducerRequest request{expected_source_credentials, std::move(header), nullptr, expected_meta, "", nullptr};
 
-    ASSERT_THAT(request.beamtime_id, Eq(expected_beamtime_id));
+    ASSERT_THAT(request.source_credentials, Eq(expected_source_credentials));
     ASSERT_THAT(request.metadata, Eq(expected_meta));
     ASSERT_THAT(request.header.message, testing::StrEq(expected_file_name));
     ASSERT_THAT(request.header.data_size, Eq(expected_file_size));
diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp
index d4344370c..db22a15f7 100644
--- a/producer/event_monitor_producer/src/main_eventmon.cpp
+++ b/producer/event_monitor_producer/src/main_eventmon.cpp
@@ -39,7 +39,7 @@ std::unique_ptr<Producer> CreateProducer() {
 
     Error err;
     auto producer = Producer::Create(config->asapo_endpoint, (uint8_t) config->nthreads,
-                                     config->mode, config->beamtime_id, &err);
+                                     config->mode, asapo::SourceCredentials{config->beamtime_id, "", ""}, &err);
     if(err) {
         std::cerr << "cannot create producer: " << err << std::endl;
         exit(EXIT_FAILURE);
diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp
index 286cff585..e8e764689 100644
--- a/receiver/src/request.cpp
+++ b/receiver/src/request.cpp
@@ -179,6 +179,11 @@ const std::string& Request::GetMetaData() const {
 const CustomRequestData& Request::GetCustomData() const {
     return request_header_.custom_data;
 }
-
+const std::string& Request::GetStream() const {
+    return stream_;
+}
+void Request::SetStream(std::string stream) {
+    stream_ = std::move(stream);
+}
 
 }
\ No newline at end of file
diff --git a/receiver/src/request.h b/receiver/src/request.h
index c97cdd357..b11f825bb 100644
--- a/receiver/src/request.h
+++ b/receiver/src/request.h
@@ -41,6 +41,11 @@ class Request {
     VIRTUAL const std::string& GetBeamtimeId() const;
     VIRTUAL void SetBeamtimeId(std::string beamtime_id);
     VIRTUAL void SetBeamline(std::string beamline);
+
+    VIRTUAL const std::string& GetStream() const;
+    VIRTUAL void SetStream(std::string stream);
+
+
     VIRTUAL const std::string& GetBeamline() const;
     VIRTUAL const CustomRequestData& GetCustomData() const;
 
@@ -60,6 +65,7 @@ class Request {
     RequestHandlerList handlers_;
     std::string origin_uri_;
     std::string beamtime_id_;
+    std::string stream_;
     std::string beamline_;
     std::string metadata_;
     CacheMeta* slot_meta_ = nullptr;
diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp
index 823b74546..8cc5a5e50 100644
--- a/receiver/src/request_handler_authorize.cpp
+++ b/receiver/src/request_handler_authorize.cpp
@@ -9,9 +9,9 @@ using std::chrono::system_clock;
 
 namespace asapo {
 
-std::string RequestHandlerAuthorize::GetRequestString(const Request* request, const char* beamtime_id) const {
-    std::string request_string = std::string("{\"BeamtimeId\":\"") +
-                                 beamtime_id + "\",\"OriginHost\":\"" + request->GetOriginUri() + "\"}";
+std::string RequestHandlerAuthorize::GetRequestString(const Request* request, const char* source_credentials) const {
+    std::string request_string = std::string("{\"SourceCredentials\":\"") +
+        source_credentials + "\",\"OriginHost\":\"" + request->GetOriginUri() + "\"}";
     return request_string;
 }
 
@@ -26,10 +26,10 @@ Error RequestHandlerAuthorize::ErrorFromServerResponse(const Error& err, HttpCod
     }
 }
 
-Error RequestHandlerAuthorize::Authorize(Request* request, const char* beamtime_id) const {
+Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_credentials) const {
     HttpCode code;
     Error err;
-    std::string request_string = GetRequestString(request, beamtime_id);
+    std::string request_string = GetRequestString(request, source_credentials);
 
     auto response = http_client__->Post(GetReceiverConfig()->authorization_server + "/authorize", request_string, &code,
                                         &err);
@@ -41,23 +41,25 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* beamtime_
         return auth_error;
     }
 
-    last_updated_ = system_clock::now();
-
     JsonStringParser parser{response};
     (err = parser.GetString("BeamtimeId", &beamtime_id_)) ||
+    (err = parser.GetString("Stream", &stream_)) ||
     (err = parser.GetString("Beamline", &beamline_));
     if (err) {
         return ErrorFromServerResponse(err, code);
     } else {
         log__->Debug(std::string("authorized connection from ") + request->GetOriginUri() + " beamline: " +
-                     beamline_ + ", beamtime id: " + beamtime_id_);
+                     beamline_ + ", beamtime id: " + beamtime_id_+", stream: "+stream_);
     }
 
+    last_updated_ = system_clock::now();
+    cached_source_credentials_ = source_credentials;
+
     return nullptr;
 }
 
 Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) const {
-    if (!beamtime_id_.empty()) {
+    if (!cached_source_credentials_.empty()) {
         Error auth_error = asapo::ReceiverErrorTemplates::kAuthorizationFailure.Generate();
         auth_error->Append("already authorized");
         log__->Error("failure authorizing at " + GetReceiverConfig()->authorization_server + " - " +
@@ -69,20 +71,21 @@ Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) con
 }
 
 Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const {
-    if (beamtime_id_.empty()) {
+    if (cached_source_credentials_.empty()) {
         return ReceiverErrorTemplates::kAuthorizationFailure.Generate();
     }
 
     uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds>
                           (system_clock::now() - last_updated_).count();
     if (elapsed_ms >= GetReceiverConfig()->authorization_interval_ms) {
-        auto err = Authorize(request, beamtime_id_.c_str());
+        auto err = Authorize(request, cached_source_credentials_.c_str());
         if (err) {
             return err;
         }
     }
     request->SetBeamtimeId(beamtime_id_);
     request->SetBeamline(beamline_);
+    request->SetStream(stream_);
     return nullptr;
 }
 
diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h
index f439bd51c..c0f4e63a7 100644
--- a/receiver/src/request_handler_authorize.h
+++ b/receiver/src/request_handler_authorize.h
@@ -21,14 +21,16 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler {
     std::unique_ptr<HttpClient>http_client__;
   private:
     mutable std::string beamtime_id_;
+    mutable std::string stream_;
     mutable std::string beamline_;
+    mutable std::string cached_source_credentials_;
     mutable std::chrono::system_clock::time_point last_updated_;
     Error ProcessAuthorizationRequest(Request* request) const;
     Error ProcessOtherRequest(Request* request) const;
-    Error Authorize(Request* request, const char* beamtime_id) const;
+    Error Authorize(Request* request, const char* source_credentials) const;
     Error ErrorFromServerResponse(const Error& err, HttpCode code) const;
 
-    std::string GetRequestString(const Request* request, const char* beamtime_id) const;
+    std::string GetRequestString(const Request* request, const char* source_credentials) const;
 };
 
 }
diff --git a/receiver/src/request_handler_db.cpp b/receiver/src/request_handler_db.cpp
index b60a2dc6a..874c5846e 100644
--- a/receiver/src/request_handler_db.cpp
+++ b/receiver/src/request_handler_db.cpp
@@ -8,8 +8,13 @@ namespace asapo {
 Error RequestHandlerDb::ProcessRequest(Request* request) const {
     if (db_name_.empty()) {
         db_name_ = request->GetBeamtimeId();
+        auto stream = request->GetStream();
+        if (stream != "detector") {
+            db_name_+="_"+stream;
+        }
     }
 
+
     return ConnectToDbIfNeeded();
 }
 
diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h
index 4b3d3771a..77a5e8cb3 100644
--- a/receiver/unittests/receiver_mocking.h
+++ b/receiver/unittests/receiver_mocking.h
@@ -51,6 +51,7 @@ class MockRequest: public Request {
     MOCK_CONST_METHOD0(GetSlotId, uint64_t());
     MOCK_CONST_METHOD0(GetData, void* ());
     MOCK_CONST_METHOD0(GetBeamtimeId, const std::string & ());
+    MOCK_CONST_METHOD0(GetStream, const std::string & ());
     MOCK_CONST_METHOD0(GetMetaData, const std::string & ());
     MOCK_CONST_METHOD0(GetBeamline, const std::string & ());
     MOCK_CONST_METHOD0(GetOpCode,
@@ -62,6 +63,7 @@ class MockRequest: public Request {
     MOCK_CONST_METHOD0(GetCustomData_t, const uint64_t* ());
     MOCK_CONST_METHOD0(GetMessage, const char* ());
     MOCK_METHOD1(SetBeamtimeId, void (std::string));
+    MOCK_METHOD1(SetStream, void (std::string));
     MOCK_METHOD1(SetBeamline, void (std::string));
 };
 
diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp
index e50d65888..73d917796 100644
--- a/receiver/unittests/test_request.cpp
+++ b/receiver/unittests/test_request.cpp
@@ -324,6 +324,13 @@ TEST_F(RequestTests, SetGetBeamtimeId) {
 }
 
 
+TEST_F(RequestTests, SetGetStream) {
+    request->SetStream("stream");
+
+    ASSERT_THAT(request->GetStream(), "stream");
+}
+
+
 TEST_F(RequestTests, SetGetBeamline) {
     request->SetBeamline("beamline");
 
diff --git a/receiver/unittests/test_request_handler_authorizer.cpp b/receiver/unittests/test_request_handler_authorizer.cpp
index 8e179fa71..cd3f45503 100644
--- a/receiver/unittests/test_request_handler_authorizer.cpp
+++ b/receiver/unittests/test_request_handler_authorizer.cpp
@@ -63,11 +63,13 @@ class AuthorizerHandlerTests : public Test {
     ReceiverConfig config;
 
     NiceMock<asapo::MockLogger> mock_logger;
+    std::string expected_source_credentials = "beamtime_id%stream%token";
     std::string expected_beamtime_id = "beamtime_id";
+    std::string expected_stream = "stream";
     std::string expected_beamline = "beamline";
     std::string expected_producer_uri = "producer_uri";
     std::string expected_authorization_server = "authorizer_host";
-    std::string expect_request_string = std::string("{\"BeamtimeId\":\"") + expected_beamtime_id + "\",\"OriginHost\":\"" +
+    std::string expect_request_string = std::string("{\"SourceCredentials\":\"") + expected_source_credentials + "\",\"OriginHost\":\"" +
                                         expected_producer_uri + "\"}";
 
     void MockRequestData();
@@ -101,19 +103,23 @@ class AuthorizerHandlerTests : public Test {
             WillOnce(
                 DoAll(SetArgPointee<3>(nullptr),
                       SetArgPointee<2>(code),
-                      Return("{\"BeamtimeId\":\"" + expected_beamtime_id + "\",\"Beamline\":" + "\"" + expected_beamline + "\"}")
+                      Return("{\"BeamtimeId\":\"" + expected_beamtime_id +
+                          "\",\"Stream\":" + "\"" + expected_stream +
+                      "\",\"Beamline\":" + "\"" + expected_beamline + "\"}")
                      ));
             if (code != HttpCode::OK) {
                 EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("failure authorizing"),
                                                      HasSubstr("return code"),
                                                      HasSubstr(std::to_string(int(code))),
                                                      HasSubstr(expected_beamtime_id),
+                                                     HasSubstr(expected_stream),
                                                      HasSubstr(expected_producer_uri),
                                                      HasSubstr(expected_authorization_server))));
             } else {
                 EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("authorized"),
                                                      HasSubstr(expected_beamtime_id),
                                                      HasSubstr(expected_beamline),
+                                                     HasSubstr(expected_stream),
                                                      HasSubstr(expected_producer_uri))));
             }
         }
@@ -125,7 +131,7 @@ class AuthorizerHandlerTests : public Test {
         .WillOnce(Return(asapo::kOpcodeAuthorize))
         ;
         EXPECT_CALL(*mock_request, GetMessage())
-        .WillOnce(Return(expected_beamtime_id.c_str()))
+        .WillOnce(Return(expected_source_credentials.c_str()))
         ;
 
         MockAuthRequest(error, code);
@@ -137,6 +143,7 @@ class AuthorizerHandlerTests : public Test {
         ;
         if (!error && code == HttpCode::OK) {
             EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id));
+            EXPECT_CALL(*mock_request, SetStream(expected_stream));
             EXPECT_CALL(*mock_request, SetBeamline(expected_beamline));
         }
 
@@ -231,6 +238,7 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeUsesCachedValue) {
     EXPECT_CALL(mock_http_client, Post_t(_, _, _, _)).Times(0);
     EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id));
     EXPECT_CALL(*mock_request, SetBeamline(expected_beamline));
+    EXPECT_CALL(*mock_request, SetStream(expected_stream));
 
     auto err =  handler.ProcessRequest(mock_request.get());
 
diff --git a/receiver/unittests/test_request_handler_db.cpp b/receiver/unittests/test_request_handler_db.cpp
index 4f9053197..b058c57dd 100644
--- a/receiver/unittests/test_request_handler_db.cpp
+++ b/receiver/unittests/test_request_handler_db.cpp
@@ -62,6 +62,9 @@ class DbHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
+    std::string expected_stream = "stream";
+    std::string expected_default_stream = "detector";
+
     void SetUp() override {
         GenericRequestHeader request_header;
         request_header.data_id = 2;
@@ -69,6 +72,8 @@ class DbHandlerTests : public Test {
         handler.log__ = &mock_logger;
         mock_request.reset(new NiceMock<MockRequest> {request_header, 1, ""});
         ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id));
+        ON_CALL(*mock_request, GetStream()).WillByDefault(ReturnRef(expected_stream));
+
     }
     void TearDown() override {
         handler.db_client__.release();
@@ -99,14 +104,41 @@ TEST_F(DbHandlerTests, ProcessRequestCallsConnectDbWhenNotConnected) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
+    EXPECT_CALL(*mock_request, GetStream())
+        .WillOnce(ReturnRef(expected_stream))
+        ;
 
-    EXPECT_CALL(mock_db, Connect_t("127.0.0.1:27017", expected_beamtime_id, expected_collection_name)).
+
+
+    EXPECT_CALL(mock_db, Connect_t("127.0.0.1:27017", expected_beamtime_id+"_"+expected_stream, expected_collection_name)).
     WillOnce(testing::Return(nullptr));
 
     auto err = handler.ProcessRequest(mock_request.get());
     ASSERT_THAT(err, Eq(nullptr));
 }
 
+TEST_F(DbHandlerTests, ProcessRequestUsesCorrectDbNameForDetector) {
+    config.broker_db_uri = "127.0.0.1:27017";
+    SetReceiverConfig(config, "none");
+
+
+    EXPECT_CALL(*mock_request, GetBeamtimeId())
+        .WillOnce(ReturnRef(expected_beamtime_id))
+        ;
+
+    EXPECT_CALL(*mock_request, GetStream())
+        .WillOnce(ReturnRef(expected_default_stream))
+        ;
+
+
+    EXPECT_CALL(mock_db, Connect_t("127.0.0.1:27017", expected_beamtime_id, expected_collection_name)).
+        WillOnce(testing::Return(nullptr));
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+
 TEST_F(DbHandlerTests, ProcessRequestReturnsErrorWhenCannotConnect) {
 
     EXPECT_CALL(mock_db, Connect_t(_, _, expected_collection_name)).
diff --git a/receiver/unittests/test_request_handler_db_meta_writer.cpp b/receiver/unittests/test_request_handler_db_meta_writer.cpp
index 61f8c9b62..8e851db8c 100644
--- a/receiver/unittests/test_request_handler_db_meta_writer.cpp
+++ b/receiver/unittests/test_request_handler_db_meta_writer.cpp
@@ -62,6 +62,7 @@ class DbMetaWriterHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
+    std::string expected_stream = "stream";
     std::string meta_str =
         R"("info":"stat","int_data":0,"float_data":0.1,"bool":false)";
     const uint8_t* expected_meta = reinterpret_cast<const uint8_t*>(meta_str.c_str());
@@ -90,7 +91,12 @@ TEST_F(DbMetaWriterHandlerTests, CallsUpdate) {
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
-    EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, expected_beamtime_id, expected_collection_name)).
+    EXPECT_CALL(*mock_request, GetStream())
+        .WillOnce(ReturnRef(expected_stream))
+        ;
+
+
+    EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, expected_beamtime_id+"_"+expected_stream, expected_collection_name)).
     WillOnce(testing::Return(nullptr));
 
 
diff --git a/receiver/unittests/test_request_handler_db_writer.cpp b/receiver/unittests/test_request_handler_db_writer.cpp
index d4fa6213a..2ba6e289f 100644
--- a/receiver/unittests/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/test_request_handler_db_writer.cpp
@@ -62,6 +62,8 @@ class DbWriterHandlerTests : public Test {
     NiceMock<asapo::MockLogger> mock_logger;
     ReceiverConfig config;
     std::string expected_beamtime_id = "beamtime_id";
+    std::string expected_default_stream = "detector";
+    std::string expected_stream = "stream";
     std::string expected_hostname = "host";
     uint64_t expected_port = 1234;
     uint64_t expected_buf_id = 18446744073709551615ull;
@@ -86,8 +88,9 @@ class DbWriterHandlerTests : public Test {
 
         ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id));
     }
-    void ExpectRequestParams(asapo::Opcode op_code);
-    FileInfo PrepareFileInfo();
+    void ExpectRequestParams(asapo::Opcode op_code,const std::string& stream);
+
+        FileInfo PrepareFileInfo();
     void TearDown() override {
         handler.db_client__.release();
     }
@@ -107,16 +110,25 @@ MATCHER_P(CompareFileInfo, file, "") {
 }
 
 
-void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code) {
+void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code,const std::string& stream) {
     EXPECT_CALL(*mock_request, GetBeamtimeId())
     .WillOnce(ReturnRef(expected_beamtime_id))
     ;
 
+    EXPECT_CALL(*mock_request, GetStream())
+        .WillOnce(ReturnRef(stream))
+        ;
+
+
     EXPECT_CALL(*mock_request, GetSlotId())
     .WillOnce(Return(expected_buf_id))
     ;
 
-    EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, expected_beamtime_id, expected_collection_name)).
+    std::string db_name = expected_beamtime_id;
+    if (stream != "detector") {
+        db_name +="_"+stream;
+    }
+    EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, db_name, expected_collection_name)).
     WillOnce(testing::Return(nullptr));
 
     EXPECT_CALL(*mock_request, GetDataSize())
@@ -147,6 +159,8 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code) {
 }
 
 
+
+
 FileInfo DbWriterHandlerTests::PrepareFileInfo() {
     FileInfo file_info;
     file_info.size = expected_file_size;
@@ -160,7 +174,7 @@ FileInfo DbWriterHandlerTests::PrepareFileInfo() {
 
 TEST_F(DbWriterHandlerTests, CallsInsert) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferData,expected_stream);
     auto file_info = PrepareFileInfo();
 
     EXPECT_CALL(mock_db, Insert_t(CompareFileInfo(file_info), _)).
@@ -169,6 +183,7 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
     EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"),
                                          HasSubstr(config.broker_db_uri),
                                          HasSubstr(expected_beamtime_id),
+                                         HasSubstr(expected_stream),
                                          HasSubstr(expected_collection_name)
                                         )
                                   )
@@ -179,7 +194,7 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
 
 TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData);
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData,expected_default_stream);
     auto file_info = PrepareFileInfo();
 
 
diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
index a75f4dfeb..e77a9024d 100644
--- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
+++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
@@ -32,7 +32,7 @@ int main(int argc, char* argv[]) {
     auto args = GetArgs(argc, argv);
 
     asapo::Error err;
-    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", "", "", &err);
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", asapo::SourceCredentials{"", "", ""}, &err);
     auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get());
 
     asapo::HttpCode code;
diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp
index afde3cc1f..044ca7c93 100644
--- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp
+++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp
@@ -69,7 +69,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     auto producer = asapo::Producer::Create(args.receiver_address, 1,
                                             args.mode == 0 ? asapo::RequestHandlerType::kTcp
                                             : asapo::RequestHandlerType::kFilesystem,
-                                            args.beamtime_id, &err);
+                                            asapo::SourceCredentials{args.beamtime_id, "", ""}, &err);
     if (err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
         exit(EXIT_FAILURE);
diff --git a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
index b4d087649..54291d7fc 100644
--- a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
+++ b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
@@ -49,7 +49,7 @@ Args GetArgs(int argc, char* argv[]) {
 
 void TestAll(const Args& args) {
     asapo::Error err;
-    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err);
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", asapo::SourceCredentials{args.run_name, "", args.token}, &err);
     auto group_id = broker->GenerateNewGroupId(&err);
     std::vector<asapo::FileInfos>file_infos(args.nthreads);
     auto exec_next = [&](int i) {
diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp
index e2f56e101..82683d07b 100644
--- a/tests/automatic/worker/worker_api/worker_api.cpp
+++ b/tests/automatic/worker/worker_api/worker_api.cpp
@@ -146,7 +146,8 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
 
 void TestAll(const Args& args) {
     asapo::Error err;
-    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", args.run_name, args.token, &err);
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".",
+                  asapo::SourceCredentials{args.run_name, "", args.token}, &err);
     broker->SetTimeout(100);
     auto group_id = broker->GenerateNewGroupId(&err);
 
diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp
index e230daceb..af57a6e86 100644
--- a/tests/manual/performance_broker_receiver/getlast_broker.cpp
+++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp
@@ -47,8 +47,8 @@ std::vector<std::thread> StartThreads(const Params& params,
     auto exec_next = [&params, nfiles, errors, nbuf, nfiles_total](int i) {
         asapo::FileInfo fi;
         Error err;
-        auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id,
-                      params.token, &err);
+        auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path,
+                      asapo::SourceCredentials{params.beamtime_id, "", params.token}, &err);
         broker->SetTimeout((uint64_t) params.timeout_ms);
         asapo::FileData data;
 
diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h
index 4d6fbac02..9629b3389 100644
--- a/worker/api/cpp/include/worker/data_broker.h
+++ b/worker/api/cpp/include/worker/data_broker.h
@@ -128,7 +128,7 @@ class DataBrokerFactory {
     static std::unique_ptr<DataBroker> CreateFolderBroker(const std::string& source_name,
             Error* error) noexcept;
     static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path,
-            std::string beamtime_id, std::string token,
+            SourceCredentials source,
             Error* error) noexcept;
 
 };
diff --git a/worker/api/cpp/src/data_broker.cpp b/worker/api/cpp/src/data_broker.cpp
index f6eac739d..e18bbe133 100644
--- a/worker/api/cpp/src/data_broker.cpp
+++ b/worker/api/cpp/src/data_broker.cpp
@@ -32,10 +32,9 @@ std::unique_ptr<DataBroker> DataBrokerFactory::CreateFolderBroker(const std::str
 };
 
 std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name, std::string source_path,
-        std::string beamtime_id, std::string token,
+        SourceCredentials source,
         Error* error) noexcept {
-    return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), std::move(beamtime_id),
-                                    std::move(token));
+    return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), std::move(source));
 }
 
 
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index 8afacb830..09daf5952 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -35,11 +35,15 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
 
 ServerDataBroker::ServerDataBroker(std::string server_uri,
                                    std::string source_path,
-                                   std::string source_name,
-                                   std::string token) :
+                                   SourceCredentials source) :
     io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()},
     net_client__{new TcpClient()},
-server_uri_{std::move(server_uri)}, source_path_{std::move(source_path)}, source_name_{std::move(source_name)}, token_{std::move(token)} {
+server_uri_{std::move(server_uri)}, source_path_{std::move(source_path)}, source_credentials_{std::move(source)} {
+
+    if (source_credentials_.stream.empty()) {
+        source_credentials_.stream = SourceCredentials::kDefaultStream;
+    }
+
 }
 
 Error ServerDataBroker::Connect() {
@@ -76,7 +80,7 @@ void ServerDataBroker::ProcessServerError(Error* err, const std::string& respons
 }
 
 std::string ServerDataBroker::RequestWithToken(std::string uri) {
-    return std::move(uri) + "?token=" + token_;
+    return std::move(uri) + "?token=" + source_credentials_.user_token;
 }
 
 Error ServerDataBroker::ProcessRequest(std::string* response, const RequestInfo& request) {
@@ -117,7 +121,8 @@ Error ServerDataBroker::GetBrokerUri() {
 Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,
                                             bool dataset) {
     std::string request_suffix = OpToUriCmd(op);
-    std::string request_api = "/database/" + source_name_ + "/" + std::move(group_id) + "/";
+    std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" +
+                              std::move(group_id) + "/";
     uint64_t elapsed_ms = 0;
     while (true) {
         auto err = GetBrokerUri();
@@ -263,7 +268,8 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
 
 Error ServerDataBroker::ResetCounter(std::string group_id) {
     RequestInfo ri;
-    ri.api = "/database/" + source_name_ + "/" + std::move(group_id) + "/resetcounter";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
+                 group_id) + "/resetcounter";
     ri.post = true;
 
     Error err;
@@ -273,7 +279,7 @@ Error ServerDataBroker::ResetCounter(std::string group_id) {
 
 uint64_t ServerDataBroker::GetNDataSets(Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_name_ + "/size";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/size";
     auto responce = BrokerRequestWithTimeout(ri, err);
     if (*err) {
         return 0;
@@ -295,7 +301,8 @@ Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_i
 Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id,
                                                 bool dataset) {
     RequestInfo ri;
-    ri.api = "/database/" + source_name_ + "/" + std::move(group_id) + "/" + std::to_string(id);
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
+                 group_id) + "/" + std::to_string(id);
     ri.extra_params = "&reset=true";
     if (dataset) {
         ri.extra_params += "&dataset=true";
@@ -308,7 +315,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon
 
 std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_name_ + "/0/meta/0";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/meta/0";
 
     return BrokerRequestWithTimeout(ri, err);
 }
@@ -342,7 +349,7 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error*
 
 FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_name_ + "/0/queryimages";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/queryimages";
     ri.post = true;
     ri.body = std::move(query);
 
diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h
index 3676878ba..e87bb9968 100644
--- a/worker/api/cpp/src/server_data_broker.h
+++ b/worker/api/cpp/src/server_data_broker.h
@@ -27,7 +27,7 @@ struct RequestInfo {
 
 class ServerDataBroker final : public asapo::DataBroker {
   public:
-    explicit ServerDataBroker(std::string server_uri, std::string source_path, std::string source_name, std::string token);
+    explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source);
     Error Connect() override;
     Error ResetCounter(std::string group_id) override;
     Error GetNext(FileInfo* info, std::string group_id, FileData* data) override;
@@ -66,8 +66,7 @@ class ServerDataBroker final : public asapo::DataBroker {
     std::string server_uri_;
     std::string current_broker_uri_;
     std::string source_path_;
-    std::string source_name_;
-    std::string token_;
+    SourceCredentials source_credentials_;
     uint64_t timeout_ms_ = 0;
 };
 
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index df027969c..092aa793b 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -43,7 +43,7 @@ namespace {
 
 TEST(FolderDataBroker, Constructor) {
     auto data_broker =
-    std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", "beamtime_id", "token")};
+    std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", asapo::SourceCredentials{"beamtime_id", "", "token"})};
     ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::TcpClient*>(data_broker->net_client__.get()), Ne(nullptr));
@@ -63,6 +63,8 @@ class ServerDataBrokerTests : public Test {
     std::string expected_filename = "filename";
     std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
     std::string expected_group_id = "groupid";
+    std::string expected_stream = "stream";
+
     std::string expected_metadata = "{\"meta\":1}";
     std::string expected_query_string = "bla";
 
@@ -70,7 +72,7 @@ class ServerDataBrokerTests : public Test {
     static const uint64_t expected_buf_id = 123;
     void SetUp() override {
         data_broker = std::unique_ptr<ServerDataBroker> {
-            new ServerDataBroker(expected_server_uri, expected_path, "beamtime_id", expected_token)
+            new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", expected_stream, expected_token})
         };
         data_broker->io__ = std::unique_ptr<IO> {&mock_io};
         data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
@@ -133,10 +135,37 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsErrorOnWrongInput) {
     ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kWrongInput));
 }
 
+TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) {
+    data_broker->io__.release();
+    data_broker->httpclient__.release();
+    data_broker->net_client__.release();
+    data_broker = std::unique_ptr<ServerDataBroker> {
+        new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", expected_token})
+    };
+    data_broker->io__ = std::unique_ptr<IO> {&mock_io};
+    data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
+    data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
+
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/detector/" + expected_group_id +
+                                        "/next?token="
+                                        + expected_token, _,
+                                        _)).WillOnce(DoAll(
+                                                SetArgPointee<1>(HttpCode::OK),
+                                                SetArgPointee<2>(nullptr),
+                                                Return("")));
+
+    data_broker->GetNext(&info, expected_group_id, nullptr);
+}
+
+
+
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id + "/next?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -148,7 +177,8 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/last?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id + "/last?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -259,7 +289,8 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClientUntilTimeout) {
                 SetArgPointee<2>(nullptr),
                 Return("{\"id\":1}")));
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id
                                         + "/1?token=" + expected_token, _,
                                         _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::Conflict),
@@ -394,7 +425,8 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                         expected_group_id +
                                          "/resetcounter?token="
                                          + expected_token, _, _, _)).WillOnce(DoAll(
                                                      SetArgPointee<2>(HttpCode::OK),
@@ -409,7 +441,7 @@ TEST_F(ServerDataBrokerTests, GetNDataSetsUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::OK),
                                                     SetArgPointee<2>(nullptr),
@@ -425,7 +457,7 @@ TEST_F(ServerDataBrokerTests, GetNDataSetsErrorOnWrongResponce) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
                                         + expected_token, _, _)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::Unauthorized),
                                                     SetArgPointee<2>(nullptr),
@@ -441,7 +473,7 @@ TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::OK),
                                                     SetArgPointee<2>(nullptr),
@@ -458,7 +490,8 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/"  + expected_group_id
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+                                        expected_group_id
                                         + "/" + std::to_string(
                                             expected_dataset_id) + "?token="
                                         + expected_token + "&reset=true", _,
@@ -479,7 +512,8 @@ TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
     data_broker->SetTimeout(100);
 
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/0/meta/0?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+                                        "/0/meta/0?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -607,7 +641,8 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
 TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id + "/next?token="
                                         + expected_token + "&dataset=true", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -666,7 +701,8 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) {
 TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/last?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id + "/last?token="
                                         + expected_token + "&dataset=true", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -680,7 +716,8 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_group_id +
                                         "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token + "&reset=true&dataset=true", _,
                                         _)).WillOnce(DoAll(
diff --git a/worker/api/cpp/unittests/test_worker_api.cpp b/worker/api/cpp/unittests/test_worker_api.cpp
index c193314aa..7c6fa7952 100644
--- a/worker/api/cpp/unittests/test_worker_api.cpp
+++ b/worker/api/cpp/unittests/test_worker_api.cpp
@@ -45,7 +45,7 @@ TEST_F(DataBrokerFactoryTests, FailCreateDataSourceWithEmptySource) {
 
 TEST_F(DataBrokerFactoryTests, CreateServerDataSource) {
 
-    auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", "beamtime_id", "token", &error);
+    auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", asapo::SourceCredentials{"beamtime_id", "", "token"}, &error);
 
     ASSERT_THAT(error, Eq(nullptr));
     ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr));
-- 
GitLab