From fbca258c075cb7a333377c2e8ca1788f18a3c64b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Thu, 28 Nov 2019 13:58:30 +0100
Subject: [PATCH] update consumer for streams

---
 broker/src/asapo_broker/server/get_id_test.go |   2 +-
 .../src/asapo_broker/server/get_last_test.go  |   2 +-
 .../src/asapo_broker/server/get_meta_test.go  |   2 +-
 .../src/asapo_broker/server/get_next_test.go  |   2 +-
 .../src/asapo_broker/server/get_size_test.go  |   2 +-
 broker/src/asapo_broker/server/listroutes.go  |  14 +-
 .../server/post_query_images_test.go          |   2 +-
 .../server/post_reset_counter_test.go         |   2 +-
 .../server/process_request_test.go            |  17 +--
 common/cpp/include/common/data_structs.h      |   1 +
 .../api/cpp/include/consumer/data_broker.h    |  13 +-
 consumer/api/cpp/src/server_data_broker.cpp   | 111 +++++++++++----
 consumer/api/cpp/src/server_data_broker.h     |  34 ++++-
 .../api/cpp/unittests/test_server_broker.cpp  | 129 ++++++++++++++----
 producer/api/cpp/src/producer_impl.cpp        |   1 -
 producer/api/cpp/src/producer_impl.h          |   1 -
 .../api/cpp/unittests/test_producer_impl.cpp  |   8 +-
 .../broker/check_monitoring/check_linux.sh    |   2 +-
 .../automatic/broker/get_last/check_linux.sh  |  16 +--
 .../broker/get_last/check_windows.bat         |  14 +-
 .../automatic/broker/get_meta/check_linux.sh  |   4 +-
 .../broker/get_meta/check_windows.bat         |   4 +-
 .../automatic/broker/get_next/check_linux.sh  |   8 +-
 .../broker/get_next/check_windows.bat         |   8 +-
 24 files changed, 286 insertions(+), 113 deletions(-)

diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go
index 97c9bf5fe..bb3c5b43a 100644
--- a/broker/src/asapo_broker/server/get_id_test.go
+++ b/broker/src/asapo_broker/server/get_id_test.go
@@ -42,7 +42,7 @@ func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "id", "1").Return([]byte("Hello"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetImage OK")
 	suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data")
 }
diff --git a/broker/src/asapo_broker/server/get_last_test.go b/broker/src/asapo_broker/server/get_last_test.go
index 3d9d58cd1..60d1ff4e1 100644
--- a/broker/src/asapo_broker/server/get_last_test.go
+++ b/broker/src/asapo_broker/server/get_last_test.go
@@ -36,7 +36,7 @@ func (suite *GetLastTestSuite) TestGetLastCallsCorrectRoutine() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "last", "0").Return([]byte("Hello"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request last")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/last" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/last" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetLast OK")
 	suite.Equal("Hello", string(w.Body.Bytes()), "GetLast sends data")
 }
diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go
index cc60001fc..aef6f7e2e 100644
--- a/broker/src/asapo_broker/server/get_meta_test.go
+++ b/broker/src/asapo_broker/server/get_meta_test.go
@@ -36,7 +36,7 @@ func (suite *GetMetaTestSuite) TestGetMetaOK() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, "", "meta", "0").Return([]byte("{\"test\":10}"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/0/meta/0" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/default/0/meta/0" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetSize OK")
 	suite.Equal("{\"test\":10}", string(w.Body.Bytes()), "GetMeta sends meta")
 }
diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go
index be9d664d8..2da6bb2fb 100644
--- a/broker/src/asapo_broker/server/get_next_test.go
+++ b/broker/src/asapo_broker/server/get_next_test.go
@@ -36,7 +36,7 @@ func (suite *GetNextTestSuite) TestGetNextCallsCorrectRoutine() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetNext OK")
 	suite.Equal("Hello", string(w.Body.Bytes()), "GetNext sends data")
 }
diff --git a/broker/src/asapo_broker/server/get_size_test.go b/broker/src/asapo_broker/server/get_size_test.go
index e5ce8fadc..a52808356 100644
--- a/broker/src/asapo_broker/server/get_size_test.go
+++ b/broker/src/asapo_broker/server/get_size_test.go
@@ -36,7 +36,7 @@ func (suite *GetSizeTestSuite) TestGetSizeOK() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, "", "size", "0").Return([]byte("{\"size\":10}"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request size")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/size" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/size" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetSize OK")
 	suite.Equal("{\"size\":10}", string(w.Body.Bytes()), "GetSize sends size")
 }
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index 76595e407..b14482705 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -8,31 +8,31 @@ var listRoutes = utils.Routes{
 	utils.Route{
 		"GetNext",
 		"Get",
-		"/database/{dbname}/{stream}/{groupid}/next",
+		"/database/{dbname}/{stream}/{substream}/{groupid}/next",
 		routeGetNext,
 	},
 	utils.Route{
 		"GetSize",
 		"Get",
-		"/database/{dbname}/{stream}/size",
+		"/database/{dbname}/{stream}/{substream}/size",
 		routeGetSize,
 	},
 	utils.Route{
 		"GetLast",
 		"Get",
-		"/database/{dbname}/{stream}/{groupid}/last",
+		"/database/{dbname}/{stream}/{substream}/{groupid}/last",
 		routeGetLast,
 	},
 	utils.Route{
 		"GetID",
 		"Get",
-		"/database/{dbname}/{stream}/{groupid}/{id}",
+		"/database/{dbname}/{stream}/{substream}/{groupid}/{id}",
 		routeGetByID,
 	},
 	utils.Route{
 		"GetMeta",
 		"Get",
-		"/database/{dbname}/{stream}/0/meta/{id}",
+		"/database/{dbname}/{stream}/default/0/meta/{id}",
 		routeGetMeta,
 	},
 	utils.Route{
@@ -44,13 +44,13 @@ var listRoutes = utils.Routes{
 	utils.Route{
 		"QueryImages",
 		"Post",
-		"/database/{dbname}/{stream}/0/queryimages",
+		"/database/{dbname}/{stream}/{substream}/0/queryimages",
 		routeQueryImages,
 	},
 	utils.Route{
 		"ResetConter",
 		"Post",
-		"/database/{dbname}/{stream}/{groupid}/resetcounter",
+		"/database/{dbname}/{stream}/{substream}/{groupid}/resetcounter",
 		routeResetCounter,
 	},
 	utils.Route{
diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go
index 335a024e3..227f0d8f6 100644
--- a/broker/src/asapo_broker/server/post_query_images_test.go
+++ b/broker/src/asapo_broker/server/post_query_images_test.go
@@ -37,6 +37,6 @@ func (suite *QueryTestSuite) TestQueryOK() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, "", "queryimages", query_str).Return([]byte("{}"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages")))
 
-	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/0/queryimages"+correctTokenSuffix, "POST", query_str)
+	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/0/queryimages"+correctTokenSuffix, "POST", query_str)
 	suite.Equal(http.StatusOK, w.Code, "Query OK")
 }
diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go
index e6a27a1b4..74cfec68f 100644
--- a/broker/src/asapo_broker/server/post_reset_counter_test.go
+++ b/broker/src/asapo_broker/server/post_reset_counter_test.go
@@ -36,6 +36,6 @@ func (suite *ResetCounterTestSuite) TestResetCounterOK() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter")))
 
-	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST")
+	w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST")
 	suite.Equal(http.StatusOK, w.Code, "ResetCounter OK")
 }
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index a21310e73..219e6a5da 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -21,6 +21,7 @@ var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtime
 const expectedGroupID = "bid2a5auidddp1vl71d0"
 const wrongGroupID = "bid2a5auidddp1vl71"
 const expectedStream = "stream"
+const expectedSubstream = "substream"
 
 func prepareTestAuth() {
 	expectedBeamtimeId = "beamtime_id"
@@ -106,7 +107,7 @@ func TestProcessRequestTestSuite(t *testing.T) {
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongToken() {
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + suffixWithWrongToken)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + suffixWithWrongToken)
 
 	suite.Equal(http.StatusUnauthorized, w.Code, "wrong token")
 }
@@ -114,7 +115,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongToken() {
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("cannot extract")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + wrongTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + wrongTokenSuffix)
 
 	suite.Equal(http.StatusUnauthorized, w.Code, "no token")
 }
@@ -125,7 +126,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 
 	suite.Equal(http.StatusBadRequest, w.Code, "wrong database name")
 }
@@ -138,7 +139,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 	ExpectReconnect(suite.mock_db)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
 
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 	time.Sleep(time.Second)
 	suite.Equal(http.StatusNotFound, w.Code, "wrong database name")
 }
@@ -149,7 +150,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
 
 	ExpectReconnect(suite.mock_db)
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 	time.Sleep(time.Second)
 
 	suite.Equal(http.StatusNotFound, w.Code, "internal error")
@@ -159,13 +160,13 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
 
-	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
+	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
 	suite.Equal(1, statistics.GetCounter(), "ProcessRequest increases counter")
 }
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() {
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong groupid")))
-	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + wrongGroupID + "/next" + correctTokenSuffix)
+	w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + wrongGroupID + "/next" + correctTokenSuffix)
 	suite.Equal(http.StatusBadRequest, w.Code, "wrong group id")
 }
 
@@ -173,5 +174,5 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
 	suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil)
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName)))
 
-	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
+	doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
 }
diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h
index 0361f1043..2fb0720d4 100644
--- a/common/cpp/include/common/data_structs.h
+++ b/common/cpp/include/common/data_structs.h
@@ -64,6 +64,7 @@ enum IngestModeFlags : uint64_t {
 
 const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem;
 
+const std::string kDefaultSubstream = "default";
 
 
 }
diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h
index 7c274eca6..08aa42119 100644
--- a/consumer/api/cpp/include/consumer/data_broker.h
+++ b/consumer/api/cpp/include/consumer/data_broker.h
@@ -17,7 +17,10 @@ class DataBroker {
       \return nullptr of command was successful, otherwise error.
     */
     virtual Error ResetLastReadMarker(std::string group_id) = 0;
+    virtual Error ResetLastReadMarker(std::string group_id, std::string substream) = 0;
+
     virtual Error SetLastReadMarker(uint64_t value, std::string group_id) = 0;
+    virtual Error SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) = 0;
 
     //! Set timeout for broker operations. Default - no timeout
     virtual void SetTimeout(uint64_t timeout_ms) = 0;
@@ -29,6 +32,7 @@ class DataBroker {
       \return number of datasets.
     */
     virtual uint64_t GetCurrentSize(Error* err) = 0;
+    virtual uint64_t GetCurrentSize(std::string substream, Error* err) = 0;
 
     //! Generate new GroupID.
     /*!
@@ -52,6 +56,7 @@ class DataBroker {
       \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
     */
     virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0;
+    virtual Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
 
     //! Retrieves image using fileinfo.
     /*!
@@ -69,6 +74,7 @@ class DataBroker {
       \return DataSet - information about the dataset
     */
     virtual DataSet GetNextDataset(std::string group_id, Error* err) = 0;
+    virtual DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) = 0;
 
     //! Receive last available completed dataset.
     /*!
@@ -77,6 +83,7 @@ class DataBroker {
       \return DataSet - information about the dataset
     */
     virtual DataSet GetLastDataset(std::string group_id, Error* err) = 0;
+    virtual DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) = 0;
 
 
     //! Receive dataset by id.
@@ -87,8 +94,7 @@ class DataBroker {
       \return DataSet - information about the dataset
     */
     virtual DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) = 0;
-
-
+    virtual DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) = 0;
 
     //! Receive single image by id.
     /*!
@@ -98,6 +104,7 @@ class DataBroker {
       \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
     */
     virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0;
+    virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
 
 
     //! Receive last available image.
@@ -108,6 +115,7 @@ class DataBroker {
       \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
     */
     virtual Error GetLast(FileInfo* info, std::string group_id, FileData* data) = 0;
+    virtual Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
 
     //! Get all images matching the query.
     /*!
@@ -116,6 +124,7 @@ class DataBroker {
       \return vector of image metadata matchiing to specified query. Empty if nothing found or error
     */
     virtual FileInfos QueryImages(std::string query, Error* err) = 0;
+    virtual FileInfos QueryImages(std::string query, std::string substream, Error* err) = 0;
 
     virtual ~DataBroker() = default; // needed for unique_ptr to delete itself
 };
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index c642c66e8..133c80785 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -149,11 +149,13 @@ RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool datas
 }
 
 
-Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,
+Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, std::string substream,
+                                            GetImageServerOperation op,
                                             bool dataset) {
     std::string request_suffix = OpToUriCmd(op);
-    std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" +
-                              std::move(group_id) + "/";
+    std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream
+                              + "/" + std::move(substream) +
+                              + "/" + std::move(group_id) + "/";
     uint64_t elapsed_ms = 0;
     Error no_data_error;
     while (true) {
@@ -186,11 +188,19 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
 }
 
 Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* data) {
-    return GetImageFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), info, data);
+    return GetNext(info, std::move(group_id), kDefaultSubstream, data);
+}
+
+Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) {
+    return GetImageFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream), info, data);
 }
 
 Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) {
-    return GetImageFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), info, data);
+    return GetLast(info, std::move(group_id), kDefaultSubstream, data);
+}
+
+Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) {
+    return GetImageFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream), info, data);
 }
 
 std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) {
@@ -205,6 +215,7 @@ std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) {
 }
 
 Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id,
+                                           std::string substream,
                                            FileInfo* info,
                                            FileData* data) {
     if (info == nullptr) {
@@ -214,9 +225,9 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t
     Error err;
     std::string response;
     if (op == GetImageServerOperation::GetID) {
-        err = GetRecordFromServerById(id, &response, std::move(group_id));
+        err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(substream));
     } else {
-        err = GetRecordFromServer(&response, std::move(group_id), op);
+        err = GetRecordFromServer(&response, std::move(group_id), std::move(substream), op);
     }
     if (err != nullptr) {
         return err;
@@ -299,24 +310,34 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
 }
 
 Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) {
+    return SetLastReadMarker(value, std::move(group_id), kDefaultSubstream);
+}
+
+Error ServerDataBroker::ResetLastReadMarker(std::string group_id) {
+    return ResetLastReadMarker(std::move(group_id), kDefaultSubstream);
+}
+
+Error ServerDataBroker::ResetLastReadMarker(std::string group_id, std::string substream) {
+    return SetLastReadMarker(0, group_id, substream);
+}
+
+Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
-                 group_id) + "/resetcounter";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/"
+             + std::move(substream) + "/" + std::move(group_id) + "/resetcounter";
     ri.extra_params = "&value=" + std::to_string(value);
     ri.post = true;
 
     Error err;
     BrokerRequestWithTimeout(ri, &err);
     return err;
+    asapo::Error();
 }
 
-Error ServerDataBroker::ResetLastReadMarker(std::string group_id) {
-    return SetLastReadMarker(0, group_id);
-}
-
-uint64_t ServerDataBroker::GetCurrentSize(Error* err) {
+uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/size";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+             + "/" + std::move(substream) + "/size";
     auto responce = BrokerRequestWithTimeout(ri, err);
     if (*err) {
         return 0;
@@ -328,16 +349,31 @@ uint64_t ServerDataBroker::GetCurrentSize(Error* err) {
         return 0;
     }
     return size;
+}
 
+uint64_t ServerDataBroker::GetCurrentSize(Error* err) {
+    return GetCurrentSize(kDefaultSubstream, err);
 }
 Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) {
-    return GetImageFromServer(GetImageServerOperation::GetID, id, group_id, info, data);
+    return GetById(id, info, std::move(group_id), kDefaultSubstream, data);
+}
+
+Error ServerDataBroker::GetById(uint64_t id,
+                                FileInfo* info,
+                                std::string group_id,
+                                std::string substream,
+                                FileData* data) {
+    return GetImageFromServer(GetImageServerOperation::GetID, id, group_id, substream, info, data);
 }
 
+
 Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id,
+                                                std::string substream,
                                                 bool dataset) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+             + "/" + std::move(substream) +
+             "/" + std::move(
                  group_id) + "/" + std::to_string(id);
     if (dataset) {
         ri.extra_params += "&dataset=true";
@@ -350,7 +386,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon
 
 std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/meta/0";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/default/0/meta/0";
 
     return BrokerRequestWithTimeout(ri, err);
 }
@@ -380,9 +416,10 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error*
     return {id, std::move(res)};
 }
 
-FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
+FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream, Error* err) {
     RequestInfo ri;
-    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/queryimages";
+    ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
+             "/" + std::move(substream) + "/0/queryimages";
     ri.post = true;
     ri.body = std::move(query);
 
@@ -395,32 +432,50 @@ FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
     return dataset.content;
 }
 
+
+FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
+    return QueryImages(std::move(query), kDefaultSubstream, err);
+}
+
 DataSet ServerDataBroker::GetNextDataset(std::string group_id, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), err);
+    return GetNextDataset(std::move(group_id), kDefaultSubstream, err);
+}
+
+DataSet ServerDataBroker::GetNextDataset(std::string group_id, std::string substream, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream), err);
+}
+
+DataSet ServerDataBroker::GetLastDataset(std::string group_id, std::string substream, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream), err);
+}
+
+DataSet ServerDataBroker::GetLastDataset(std::string group_id, Error* err) {
+    return GetLastDataset(std::move(group_id), kDefaultSubstream, err);
 }
 
 DataSet ServerDataBroker::GetDatasetFromServer(GetImageServerOperation op,
                                                uint64_t id,
-                                               std::string group_id,
+                                               std::string group_id, std::string substream,
                                                Error* err) {
     FileInfos infos;
     std::string response;
     if (op == GetImageServerOperation::GetID) {
-        *err = GetRecordFromServerById(id, &response, std::move(group_id), true);
+        *err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(substream), true);
     } else {
-        *err = GetRecordFromServer(&response, std::move(group_id), op, true);
+        *err = GetRecordFromServer(&response, std::move(group_id), std::move(substream), op, true);
     }
     if (*err != nullptr) {
         return {0, FileInfos{}};
     }
     return DecodeDatasetFromResponse(response, err);
 }
-DataSet ServerDataBroker::GetLastDataset(std::string group_id, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), err);
-}
 
 DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err);
+    return GetDatasetById(id, std::move(group_id), kDefaultSubstream, err);
+}
+
+DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), std::move(substream), err);
 }
 
 }
diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h
index 53a394a42..7ef4231eb 100644
--- a/consumer/api/cpp/src/server_data_broker.h
+++ b/consumer/api/cpp/src/server_data_broker.h
@@ -30,18 +30,40 @@ class ServerDataBroker final : public asapo::DataBroker {
   public:
     explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source);
     Error ResetLastReadMarker(std::string group_id) override;
+    Error ResetLastReadMarker(std::string group_id, std::string substream) override;
+
     Error SetLastReadMarker(uint64_t value, std::string group_id) override;
+    Error SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) override;
+
     Error GetNext(FileInfo* info, std::string group_id, FileData* data) override;
+    Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) override;
+
     Error GetLast(FileInfo* info, std::string group_id, FileData* data) override;
+    Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) override;
+
     std::string GenerateNewGroupId(Error* err) override;
     std::string GetBeamtimeMeta(Error* err) override;
+
     uint64_t GetCurrentSize(Error* err) override;
+    uint64_t GetCurrentSize(std::string substream, Error* err) override;
+
+
     Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override;
+    Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) override;
+
     void SetTimeout(uint64_t timeout_ms) override;
     FileInfos QueryImages(std::string query, Error* err) override;
+    FileInfos QueryImages(std::string query, std::string substream, Error* err) override;
+
     DataSet GetNextDataset(std::string group_id, Error* err) override;
+    DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) override;
+
     DataSet GetLastDataset(std::string group_id, Error* err) override;
+    DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) override;
+
     DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override;
+    DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) override;
+
     Error RetrieveData(FileInfo* info, FileData* data) override;
 
     std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
@@ -49,14 +71,18 @@ class ServerDataBroker final : public asapo::DataBroker {
     std::unique_ptr<NetClient> net_client__;
   private:
     std::string RequestWithToken(std::string uri);
-    Error GetRecordFromServer(std::string* info, std::string group_id, GetImageServerOperation op, bool dataset = false);
-    Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, bool dataset = false);
+    Error GetRecordFromServer(std::string* info, std::string group_id, std::string substream, GetImageServerOperation op,
+                              bool dataset = false);
+    Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string substream,
+                                  bool dataset = false);
     Error GetDataIfNeeded(FileInfo* info, FileData* data);
     Error GetBrokerUri();
     bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri);
     Error ProcessRequest(std::string* response, const RequestInfo& request);
-    Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data);
-    DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err);
+    Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream,
+                             FileInfo* info, FileData* data);
+    DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream,
+                                 Error* err);
     bool DataCanBeInBuffer(const FileInfo* info);
     Error TryGetDataFromBuffer(const FileInfo* info, FileData* data);
     std::string BrokerRequestWithTimeout(RequestInfo request, Error* err);
diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp
index be8c853c8..e628aa4f1 100644
--- a/consumer/api/cpp/unittests/test_server_broker.cpp
+++ b/consumer/api/cpp/unittests/test_server_broker.cpp
@@ -65,7 +65,7 @@ class ServerDataBrokerTests : public Test {
     std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
     std::string expected_group_id = "groupid";
     std::string expected_stream = "stream";
-
+    std::string expected_substream = "substream";
     std::string expected_metadata = "{\"meta\":1}";
     std::string expected_query_string = "bla";
 
@@ -144,7 +144,8 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) {
 
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/detector/" + expected_group_id +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/detector/default/" + expected_group_id
+                                        +
                                         "/next?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
@@ -160,8 +161,8 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) {
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_group_id + "/next?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"
+                                        + expected_group_id + "/next?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -170,10 +171,23 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     data_broker->GetNext(&info, expected_group_id, nullptr);
 }
 
-TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) {
+TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_substream + "/" + expected_group_id + "/next?token="
+                                        + expected_token, _,
+                                        _)).WillOnce(DoAll(
+                                                SetArgPointee<1>(HttpCode::OK),
+                                                SetArgPointee<2>(nullptr),
+                                                Return("")));
+    data_broker->GetNext(&info, expected_group_id, expected_substream, nullptr);
+}
+
+TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) {
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                         expected_group_id + "/last?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
@@ -319,7 +333,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
                 SetArgPointee<2>(nullptr),
                 Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2}")));
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
                                         expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::NotFound),
@@ -495,7 +509,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                          expected_group_id +
                                          "/resetcounter?token=" + expected_token + "&value=0", _, _, _)).WillOnce(DoAll(
                                                      SetArgPointee<2>(HttpCode::OK),
@@ -509,7 +523,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                          expected_group_id +
                                          "/resetcounter?token=" + expected_token + "&value=10", _, _, _)).WillOnce(DoAll(
                                                      SetArgPointee<2>(HttpCode::OK),
@@ -520,11 +534,27 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
 }
 
 
+TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUriWithSubstream) {
+    MockGetBrokerUri();
+    data_broker->SetTimeout(100);
+
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                         expected_substream + "/" +
+                                         expected_group_id +
+                                         "/resetcounter?token=" + expected_token + "&value=10", _, _, _)).WillOnce(DoAll(
+                                                     SetArgPointee<2>(HttpCode::OK),
+                                                     SetArgPointee<3>(nullptr),
+                                                     Return("")));
+    auto err = data_broker->SetLastReadMarker(10, expected_group_id, expected_substream);
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
 TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+                                        "/default/size?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::OK),
                                                     SetArgPointee<2>(nullptr),
@@ -535,12 +565,29 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUri) {
     ASSERT_THAT(size, Eq(10));
 }
 
+TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUriWithSubstream) {
+    MockGetBrokerUri();
+    data_broker->SetTimeout(100);
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_substream + "/size?token="
+                                        + expected_token, _, _)).WillOnce(DoAll(
+                                                    SetArgPointee<1>(HttpCode::OK),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return("{\"size\":10}")));
+    asapo::Error err;
+    auto size = data_broker->GetCurrentSize(expected_substream, &err);
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(size, Eq(10));
+}
+
 
 TEST_F(ServerDataBrokerTests, GetCurrentSizeErrorOnWrongResponce) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+                                        "/default/size?token="
                                         + expected_token, _, _)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::Unauthorized),
                                                     SetArgPointee<2>(nullptr),
@@ -556,7 +603,8 @@ TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/size?token="
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
+                                        "/default/size?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::OK),
                                                     SetArgPointee<2>(nullptr),
@@ -573,7 +621,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
                                         expected_group_id
                                         + "/" + std::to_string(
                                             expected_dataset_id) + "?token="
@@ -594,7 +642,7 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
                                         expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::Conflict),
@@ -610,7 +658,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
                                         expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::Conflict),
@@ -627,7 +675,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"  +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
                                         expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::Conflict),
@@ -647,7 +695,7 @@ TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
 
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
-                                        "/0/meta/0?token="
+                                        "/default/0/meta/0?token="
                                         + expected_token, _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<1>(HttpCode::OK),
@@ -756,10 +804,11 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
     auto responce_string = "[" + json1 + "," + json2 + "]";
 
 
-    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), expected_query_string, _, _)).WillOnce(DoAll(
-                SetArgPointee<2>(HttpCode::OK),
-                SetArgPointee<3>(nullptr),
-                Return(responce_string)));
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0" +
+                                         "/queryimages?token=" + expected_token, expected_query_string, _, _)).WillOnce(DoAll(
+                                                     SetArgPointee<2>(HttpCode::OK),
+                                                     SetArgPointee<3>(nullptr),
+                                                     Return(responce_string)));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -772,10 +821,29 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
     ASSERT_THAT(images[1], Eq(rec2));
 }
 
+TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) {
+
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                         expected_substream + "/0" +
+                                         "/queryimages?token=" + expected_token, expected_query_string, _, _)).WillOnce(DoAll(
+                                                     SetArgPointee<2>(HttpCode::OK),
+                                                     SetArgPointee<3>(nullptr),
+                                                     Return("[]")));
+
+    data_broker->SetTimeout(100);
+    asapo::Error err;
+    auto images = data_broker->QueryImages(expected_query_string, expected_substream, &err);
+
+    ASSERT_THAT(err, Eq(nullptr));
+
+}
+
 TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                         expected_group_id + "/next?token="
                                         + expected_token + "&dataset=true", _,
                                         _)).WillOnce(DoAll(
@@ -835,7 +903,7 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) {
 TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                         expected_group_id + "/last?token="
                                         + expected_token + "&dataset=true", _,
                                         _)).WillOnce(DoAll(
@@ -846,11 +914,26 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
     data_broker->GetLastDataset(expected_group_id, &err);
 }
 
+TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) {
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+                                        expected_substream + "/" +
+                                        expected_group_id + "/last?token="
+                                        + expected_token + "&dataset=true", _,
+                                        _)).WillOnce(DoAll(
+                                                SetArgPointee<1>(HttpCode::OK),
+                                                SetArgPointee<2>(nullptr),
+                                                Return("")));
+    asapo::Error err;
+    data_broker->GetLastDataset(expected_group_id, expected_substream, &err);
+}
+
 
 TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
                                         expected_group_id +
                                         "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token + "&dataset=true", _,
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 7c28af0dd..cf4e60f90 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -14,7 +14,6 @@
 namespace  asapo {
 
 const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s
-const std::string ProducerImpl::kDefaultSubstream = "default";
 
 
 ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type):
diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h
index 3ab1ebe79..57e3f8904 100644
--- a/producer/api/cpp/src/producer_impl.h
+++ b/producer/api/cpp/src/producer_impl.h
@@ -19,7 +19,6 @@ class ProducerImpl : public Producer {
     std::unique_ptr<RequestHandlerFactory> request_handler_factory_;
   public:
     static const size_t kDiscoveryServiceUpdateFrequencyMs;
-    static const std::string kDefaultSubstream;
 
     explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type);
     ProducerImpl(const ProducerImpl&) = delete;
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index d2d73fe6d..bac3a64cc 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -168,7 +168,7 @@ TEST_F(ProducerImplTests, UsesDefaultStream) {
                                         expected_id,
                                         expected_size,
                                         expected_name,
-                                        asapo::ProducerImpl::kDefaultSubstream.c_str(),
+                                        asapo::kDefaultSubstream.c_str(),
                                         expected_ingest_mode,
                                         0,
                                         0))).WillOnce(Return(nullptr));
@@ -188,7 +188,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
                                         expected_id,
                                         expected_size,
                                         expected_name,
-                                        asapo::ProducerImpl::kDefaultSubstream.c_str(),
+                                        asapo::kDefaultSubstream.c_str(),
                                         expected_ingest_mode,
                                         0,
                                         0
@@ -228,7 +228,7 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
     producer.SetCredentials(expected_credentials);
     EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData,
                                         expected_credentials_str, expected_metadata,
-                                        expected_id, expected_size, expected_name, asapo::ProducerImpl::kDefaultSubstream.c_str(),
+                                        expected_id, expected_size, expected_name, asapo::kDefaultSubstream.c_str(),
                                         expected_ingest_mode,
                                         expected_subset_id, expected_subset_size))).WillOnce(
                                             Return(
@@ -301,7 +301,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
                                         expected_id,
                                         0,
                                         expected_name,
-                                        asapo::ProducerImpl::kDefaultSubstream.c_str(),
+                                        asapo::kDefaultSubstream.c_str(),
                                         expected_ingest_mode,
                                         0,
                                         0))).WillOnce(Return(
diff --git a/tests/automatic/broker/check_monitoring/check_linux.sh b/tests/automatic/broker/check_monitoring/check_linux.sh
index 3908c14fc..30060cb0e 100644
--- a/tests/automatic/broker/check_monitoring/check_linux.sh
+++ b/tests/automatic/broker/check_monitoring/check_linux.sh
@@ -29,7 +29,7 @@ groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup`
 
 for i in `seq 1 50`;
 do
-    curl --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token >/dev/null 2>&1 &
+    curl --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token >/dev/null 2>&1 &
 done
 
 
diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh
index 4417903f6..9922681fa 100644
--- a/tests/automatic/broker/get_last/check_linux.sh
+++ b/tests/automatic/broker/get_last/check_linux.sh
@@ -25,21 +25,21 @@ brokerid=`echo $!`
 
 groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup`
 
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr -
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr -
 
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
 
 echo "db.data_default.insert({"_id":3})" | mongo ${database_name}
 
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":3'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr - | grep '"_id":3'
 
 echo "db.data_default.insert({"_id":4})" | mongo ${database_name}
 
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | grep '"_id":4'
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":4'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | grep '"_id":4'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr - | grep '"_id":4'
 
 #with a new group
 groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup`
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | grep '"_id":1'
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":4'
\ No newline at end of file
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | grep '"_id":1'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/last?token=$token --stderr - | grep '"_id":4'
\ No newline at end of file
diff --git a/tests/automatic/broker/get_last/check_windows.bat b/tests/automatic/broker/get_last/check_windows.bat
index c0746966f..824506290 100644
--- a/tests/automatic/broker/get_last/check_windows.bat
+++ b/tests/automatic/broker/get_last/check_windows.bat
@@ -17,22 +17,22 @@ C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid
 set /P groupid=< groupid
 
 
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
 
 echo db.data_default.insert({"_id":3}) | %mongo_exe% %database_name%  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":3  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":3  || goto :error
 
 echo db.data_default.insert({"_id":4}) | %mongo_exe% %database_name%  || goto :error
 
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
 
 
 C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid
 set /P groupid=< groupid
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
 
 
 goto :clean
diff --git a/tests/automatic/broker/get_meta/check_linux.sh b/tests/automatic/broker/get_meta/check_linux.sh
index 9c9b5c23f..f6fb2b23e 100644
--- a/tests/automatic/broker/get_meta/check_linux.sh
+++ b/tests/automatic/broker/get_meta/check_linux.sh
@@ -21,6 +21,6 @@ $1 -config settings.json &
 sleep 0.3
 brokerid=`echo $!`
 
-curl -v  --silent 127.0.0.1:5005/database/test/stream/0/meta/0?token=$token --stderr - | tee /dev/stderr | grep '"data":"test"'
-curl -v  --silent 127.0.0.1:5005/database/test/stream/0/meta/1?token=$token --stderr - | tee /dev/stderr | grep 'no documents'
+curl -v  --silent 127.0.0.1:5005/database/test/stream/default/0/meta/0?token=$token --stderr - | tee /dev/stderr | grep '"data":"test"'
+curl -v  --silent 127.0.0.1:5005/database/test/stream/default/0/meta/1?token=$token --stderr - | tee /dev/stderr | grep 'no documents'
 
diff --git a/tests/automatic/broker/get_meta/check_windows.bat b/tests/automatic/broker/get_meta/check_windows.bat
index d46480cf8..b6b9e1d4d 100644
--- a/tests/automatic/broker/get_meta/check_windows.bat
+++ b/tests/automatic/broker/get_meta/check_windows.bat
@@ -13,8 +13,8 @@ start /B "" "%full_name%" -config settings.json
 
 ping 1.0.0.0 -n 1 -w 100 > nul
 
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/0/meta/0?token=%token% --stderr - | findstr /c:\"_id\":0  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/0/meta/1?token=%token% --stderr - | findstr /c:"no documents"  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/default/0/meta/0?token=%token% --stderr - | findstr /c:\"_id\":0  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/default/0/meta/1?token=%token% --stderr - | findstr /c:"no documents"  || goto :error
 
 
 goto :clean
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index f65a59a48..e40f26095 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -23,10 +23,10 @@ sleep 0.3
 brokerid=`echo $!`
 
 groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup`
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":1'
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":2'
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"id_max":2'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":1'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"_id":2'
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | tee /dev/stderr  | grep '"id_max":2'
 
 # with a new group
 groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup`
-curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":1'
\ No newline at end of file
+curl -v  --silent 127.0.0.1:5005/database/data/stream/substream/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":1'
\ No newline at end of file
diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat
index e108767a8..2a7c1d877 100644
--- a/tests/automatic/broker/get_next/check_windows.bat
+++ b/tests/automatic/broker/get_next/check_windows.bat
@@ -16,13 +16,13 @@ ping 1.0.0.0 -n 1 -w 100 > nul
 
 C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid
 set /P groupid=< groupid
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr  /c:\"id_max\":2  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr  /c:\"id_max\":2  || goto :error
 
 C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid
 set /P groupid=< groupid
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/substream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
 
 goto :clean
 
-- 
GitLab