From 5b86e524f7bba66e9da4fda1534907d986d525c5 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 26 Jun 2018 13:23:23 +0200
Subject: [PATCH] start adding tokens to worker

---
 CMakeModules/prepare_asapo.cmake              |  1 +
 asapo_tools/src/asapo_tools/cli/token.go      |  6 +-
 broker/src/asapo_broker/server/get_id.go      |  5 ++
 broker/src/asapo_broker/server/get_id_test.go | 19 +++---
 broker/src/asapo_broker/server/get_next.go    |  6 ++
 .../src/asapo_broker/server/get_next_test.go  | 60 +++++++++++++++----
 .../src/asapo_broker/server/request_common.go | 30 ++++++++++
 broker/src/asapo_broker/server/server.go      |  3 +
 .../asapo_broker/server/server_nottested.go   | 18 ++++++
 .../src/asapo_common/utils/authorization.go   |  5 ++
 common/go/src/asapo_common/utils/helpers.go   | 16 +++++
 config/nomad/broker.nmd.in                    |  6 ++
 examples/worker/getnext_broker/check_linux.sh |  5 +-
 .../worker/getnext_broker/check_windows.bat   |  3 +-
 .../worker/getnext_broker/getnext_broker.cpp  | 40 ++++++++-----
 .../broker/check_monitoring/CMakeLists.txt    |  4 +-
 .../broker/check_monitoring/check_linux.sh    |  7 ++-
 .../automatic/broker/get_next/CMakeLists.txt  |  4 +-
 .../automatic/broker/get_next/check_linux.sh  | 12 ++--
 .../broker/get_next/check_windows.bat         | 12 +++-
 .../broker/read_config/CMakeLists.txt         |  1 +
 .../curl_httpclient_command.cpp               |  2 +-
 .../full_chain/simple_chain/CMakeLists.txt    |  2 +-
 .../full_chain/simple_chain/check_linux.sh    |  4 +-
 .../full_chain/simple_chain/check_windows.bat |  5 +-
 tests/automatic/settings/broker_secret.key    |  1 +
 tests/automatic/settings/broker_settings.json |  3 +-
 .../settings/broker_settings.json.tpl         |  3 +-
 .../next_multithread_broker/check_linux.sh    |  3 +-
 .../next_multithread_broker/check_windows.bat |  3 +-
 .../next_multithread_broker.cpp               | 10 +++-
 tests/manual/performance_broker/settings.json |  3 +-
 tests/manual/performance_broker/test.sh       | 13 +++-
 worker/api/cpp/include/worker/data_broker.h   |  5 +-
 worker/api/cpp/src/data_broker.cpp            |  8 +--
 worker/api/cpp/src/server_data_broker.cpp     | 16 +++--
 worker/api/cpp/src/server_data_broker.h       |  4 +-
 .../api/cpp/unittests/test_server_broker.cpp  | 34 ++++++++---
 worker/api/cpp/unittests/test_worker_api.cpp  |  3 +-
 39 files changed, 292 insertions(+), 93 deletions(-)
 create mode 100644 broker/src/asapo_broker/server/request_common.go
 create mode 100644 tests/automatic/settings/broker_secret.key

diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake
index 2f14a215a..86fbf0e39 100644
--- a/CMakeModules/prepare_asapo.cmake
+++ b/CMakeModules/prepare_asapo.cmake
@@ -17,6 +17,7 @@ function(prepare_asapo)
     configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json.tpl discovery.json.tpl COPYONLY)
     configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl authorizer.json.tpl COPYONLY)
     configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json.tpl broker.json.tpl COPYONLY)
+    configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_secret.key broker_secret.key COPYONLY)
     configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/nginx.conf.tpl nginx.conf.tpl COPYONLY)
     configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx.nmd.in nginx.nmd @ONLY)
 
diff --git a/asapo_tools/src/asapo_tools/cli/token.go b/asapo_tools/src/asapo_tools/cli/token.go
index 6e9fae44a..7fdb749d2 100644
--- a/asapo_tools/src/asapo_tools/cli/token.go
+++ b/asapo_tools/src/asapo_tools/cli/token.go
@@ -37,14 +37,12 @@ func (cmd *command) CommandToken() error {
 		return err
 	}
 
-	strings, err := utils.ReadStringsFromFile(flags.SecretFile)
+	secret, err := utils.ReadFirstStringFromFile(flags.SecretFile)
 	if err !=nil  {
 		return err
 	}
 
-
-
-	fmt.Fprintf(outBuf, "%s\n", generateToken(flags.BeamtimeID,strings[0]))
+	fmt.Fprintf(outBuf, "%s\n", generateToken(flags.BeamtimeID,secret))
 
 	return nil
 }
diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go
index 4b8a48090..f66d61970 100644
--- a/broker/src/asapo_broker/server/get_id.go
+++ b/broker/src/asapo_broker/server/get_id.go
@@ -31,6 +31,11 @@ func routeGetByID(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	if err := testAuth(r, db_name); err != nil {
+		writeAuthAnswer(w, "get id", db_name, err.Error())
+		return
+	}
+
 	answer, code := getRecordByID(db_name, id)
 	w.WriteHeader(code)
 	w.Write(answer)
diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go
index 3f3370f9c..0e18c932a 100644
--- a/broker/src/asapo_broker/server/get_id_test.go
+++ b/broker/src/asapo_broker/server/get_id_test.go
@@ -28,6 +28,7 @@ type GetIDTestSuite struct {
 }
 
 func (suite *GetIDTestSuite) SetupTest() {
+	prepareTestAuth()
 	statistics.Reset()
 	suite.mock_db = new(database.MockedDatabase)
 	db = suite.mock_db
@@ -46,29 +47,29 @@ func TestGetIDTestSuite(t *testing.T) {
 }
 
 func (suite *GetIDTestSuite) TestGetIDWithWrongDatabaseName() {
-	suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""),
+	suite.mock_db.On("GetRecordByID", expectedBeamtimeId, 1).Return([]byte(""),
 		&database.DBError{utils.StatusWrongInput, ""})
 
-	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo")))
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in")))
 
-	w := doRequest("/database/foo/1")
+	w := doRequest("/database/" + expectedBeamtimeId + "/1" + correctTokenSuffix)
 
 	suite.Equal(http.StatusBadRequest, w.Code, "wrong database name")
 }
 
 func (suite *GetIDTestSuite) TestGetIDWithInternalDBError() {
-	suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""), errors.New(""))
-	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo")))
+	suite.mock_db.On("GetRecordByID", expectedBeamtimeId, 1).Return([]byte(""), errors.New(""))
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in")))
 
-	w := doRequest("/database/foo/1")
+	w := doRequest("/database/" + expectedBeamtimeId + "/1" + correctTokenSuffix)
 	suite.Equal(http.StatusInternalServerError, w.Code, "internal error")
 }
 
 func (suite *GetIDTestSuite) TestGetIDOK() {
-	suite.mock_db.On("GetRecordByID", "dbname", 1).Return([]byte("Hello"), nil)
-	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get id request in dbname")))
+	suite.mock_db.On("GetRecordByID", expectedBeamtimeId, 1).Return([]byte("Hello"), nil)
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get id request in")))
 
-	w := doRequest("/database/dbname/1")
+	w := doRequest("/database/" + expectedBeamtimeId + "/1" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetID OK")
 	suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data")
 }
diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go
index 77ff12977..5a42c3044 100644
--- a/broker/src/asapo_broker/server/get_next.go
+++ b/broker/src/asapo_broker/server/get_next.go
@@ -21,6 +21,12 @@ func routeGetNext(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
+
+	if err := testAuth(r, db_name); err != nil {
+		writeAuthAnswer(w, "get next", db_name, err.Error())
+		return
+	}
+
 	answer, code := getNextRecord(db_name)
 	w.WriteHeader(code)
 	w.Write(answer)
diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go
index dcc749e35..268390973 100644
--- a/broker/src/asapo_broker/server/get_next_test.go
+++ b/broker/src/asapo_broker/server/get_next_test.go
@@ -14,6 +14,20 @@ import (
 	"testing"
 )
 
+var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId string
+
+func prepareTestAuth() {
+	expectedBeamtimeId = "beamtime_id"
+	auth = utils.NewHMACAuth("secret")
+	token, err := auth.GenerateToken(&expectedBeamtimeId)
+	if err != nil {
+		panic(err)
+	}
+	correctTokenSuffix = "?token=" + token
+	wrongTokenSuffix = "?blablabla=aa"
+	suffixWithWrongToken = "?token=blabla"
+}
+
 type request struct {
 	path    string
 	cmd     string
@@ -52,8 +66,8 @@ func (suite *GetNextTestSuite) SetupTest() {
 	statistics.Reset()
 	suite.mock_db = new(database.MockedDatabase)
 	db = suite.mock_db
+	prepareTestAuth()
 	logger.SetMockLog()
-	ExpectCopyClose(suite.mock_db)
 }
 
 func (suite *GetNextTestSuite) TearDownTest() {
@@ -66,38 +80,58 @@ func TestGetNextTestSuite(t *testing.T) {
 	suite.Run(t, new(GetNextTestSuite))
 }
 
+func (suite *GetNextTestSuite) TestGetNextWithWrongToken() {
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token")))
+
+	w := doRequest("/database/" + expectedBeamtimeId + "/next" + suffixWithWrongToken)
+
+	suite.Equal(http.StatusUnauthorized, w.Code, "wrong token")
+}
+
+func (suite *GetNextTestSuite) TestGetNextWithNoToken() {
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("cannot extract")))
+
+	w := doRequest("/database/" + expectedBeamtimeId + "/next" + wrongTokenSuffix)
+
+	suite.Equal(http.StatusUnauthorized, w.Code, "no token")
+}
+
 func (suite *GetNextTestSuite) TestGetNextWithWrongDatabaseName() {
-	suite.mock_db.On("GetNextRecord", "foo").Return([]byte(""),
+	suite.mock_db.On("GetNextRecord", expectedBeamtimeId).Return([]byte(""),
 		&database.DBError{utils.StatusWrongInput, ""})
 
-	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request in foo")))
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request")))
+	ExpectCopyClose(suite.mock_db)
 
-	w := doRequest("/database/foo/next")
+	w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix)
 
 	suite.Equal(http.StatusBadRequest, w.Code, "wrong database name")
 }
 
 func (suite *GetNextTestSuite) TestGetNextWithInternalDBError() {
-	suite.mock_db.On("GetNextRecord", "foo").Return([]byte(""), errors.New(""))
-	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request in foo")))
+	suite.mock_db.On("GetNextRecord", expectedBeamtimeId).Return([]byte(""), errors.New(""))
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request")))
+	ExpectCopyClose(suite.mock_db)
 
-	w := doRequest("/database/foo/next")
+	w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix)
 	suite.Equal(http.StatusInternalServerError, w.Code, "internal error")
 }
 
 func (suite *GetNextTestSuite) TestGetNextWithGoodDatabaseName() {
-	suite.mock_db.On("GetNextRecord", "dbname").Return([]byte("Hello"), nil)
-	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request in dbname")))
+	suite.mock_db.On("GetNextRecord", expectedBeamtimeId).Return([]byte("Hello"), nil)
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request")))
+	ExpectCopyClose(suite.mock_db)
 
-	w := doRequest("/database/dbname/next")
+	w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix)
 	suite.Equal(http.StatusOK, w.Code, "GetNext OK")
 	suite.Equal("Hello", string(w.Body.Bytes()), "GetNext sends data")
 }
 
 func (suite *GetNextTestSuite) TestGetNextAddsCounter() {
-	suite.mock_db.On("GetNextRecord", "dbname").Return([]byte("Hello"), nil)
-	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request in dbname")))
+	suite.mock_db.On("GetNextRecord", expectedBeamtimeId).Return([]byte("Hello"), nil)
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request in "+expectedBeamtimeId)))
+	ExpectCopyClose(suite.mock_db)
 
-	doRequest("/database/dbname/next")
+	doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix)
 	suite.Equal(1, statistics.GetCounter(), "GetNext increases counter")
 }
diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go
new file mode 100644
index 000000000..775ddd9bd
--- /dev/null
+++ b/broker/src/asapo_broker/server/request_common.go
@@ -0,0 +1,30 @@
+package server
+
+import (
+	"asapo_common/logger"
+	"errors"
+	"net/http"
+)
+
+func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, err string) {
+	log_str := "processing " + requestName + " request in " + db_name + " at " + settings.BrokerDbAddress
+	logger.Error(log_str + " - " + err)
+	w.WriteHeader(http.StatusUnauthorized)
+	w.Write([]byte(err))
+}
+
+func testAuth(r *http.Request, beamtime_id string) error {
+	token_got := r.URL.Query().Get("token")
+
+	if len(token_got) == 0 {
+		return errors.New("cannot extract token from request")
+	}
+
+	token_expect, _ := auth.GenerateToken(&beamtime_id)
+
+	if token_got != token_expect {
+		return errors.New("wrong token")
+	}
+
+	return nil
+}
diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go
index d1073b632..39213b7b1 100644
--- a/broker/src/asapo_broker/server/server.go
+++ b/broker/src/asapo_broker/server/server.go
@@ -2,6 +2,7 @@ package server
 
 import (
 	"asapo_broker/database"
+	"asapo_common/utils"
 )
 
 var db database.Agent
@@ -10,12 +11,14 @@ type serverSettings struct {
 	BrokerDbAddress  string
 	MonitorDbAddress string
 	MonitorDbName    string
+	SecretFile       string
 	Port             int
 	LogLevel         string
 }
 
 var settings serverSettings
 var statistics serverStatistics
+var auth utils.Auth
 
 func InitDB(dbAgent database.Agent) error {
 	db = dbAgent
diff --git a/broker/src/asapo_broker/server/server_nottested.go b/broker/src/asapo_broker/server/server_nottested.go
index 60a316df9..c9febf74b 100644
--- a/broker/src/asapo_broker/server/server_nottested.go
+++ b/broker/src/asapo_broker/server/server_nottested.go
@@ -23,6 +23,14 @@ func Start() {
 	log.Fatal(http.ListenAndServe(":"+strconv.Itoa(settings.Port), http.HandlerFunc(mux.ServeHTTP)))
 }
 
+func createAuth() (utils.Auth, error) {
+	secret, err := utils.ReadFirstStringFromFile(settings.SecretFile)
+	if err != nil {
+		return nil, err
+	}
+	return utils.NewHMACAuth(secret), nil
+}
+
 func ReadConfig(fname string) (log.Level, error) {
 	if err := utils.ReadJsonFromFile(fname, &settings); err != nil {
 		return log.FatalLevel, err
@@ -44,6 +52,16 @@ func ReadConfig(fname string) (log.Level, error) {
 		return log.FatalLevel, errors.New("MonitorDbName not set")
 	}
 
+	if settings.SecretFile == "" {
+		return log.FatalLevel, errors.New("Secret file not set")
+	}
+
+	var err error
+	auth, err = createAuth()
+	if err != nil {
+		return log.FatalLevel, err
+	}
+
 	level, err := log.LevelFromString(settings.LogLevel)
 
 	return level, err
diff --git a/common/go/src/asapo_common/utils/authorization.go b/common/go/src/asapo_common/utils/authorization.go
index a79bb6808..48ba8e4b8 100644
--- a/common/go/src/asapo_common/utils/authorization.go
+++ b/common/go/src/asapo_common/utils/authorization.go
@@ -182,6 +182,11 @@ func NewHMACAuth(key string) *HMACAuth {
 	return &a
 }
 
+func (a *HMACAuth) Name() string {
+	return "Bearer"
+}
+
+
 func generateHMACToken(value string, key string) string {
 	mac := hmac.New(sha256.New, []byte(key))
 	mac.Write([]byte(value))
diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go
index 72b7afd83..94f0fdfa6 100644
--- a/common/go/src/asapo_common/utils/helpers.go
+++ b/common/go/src/asapo_common/utils/helpers.go
@@ -4,6 +4,7 @@ import (
 	json "encoding/json"
 	"io/ioutil"
 	"strings"
+	"errors"
 )
 
 func StringInSlice(a string, list []string) bool {
@@ -49,6 +50,21 @@ func ReadStringsFromFile(fname string) ([]string, error) {
 	return lines,nil
 }
 
+
+func ReadFirstStringFromFile(fname string) (string, error) {
+	lines,err  := ReadStringsFromFile(fname)
+	if err != nil {
+		return "",err
+	}
+
+	if len(lines)==0 {
+		return "",errors.New("empty file")
+	}
+
+	return lines[0],nil
+}
+
+
 func MapToStruct(m map[string]interface{}, val interface{}) error {
 	tmp, err := json.Marshal(m)
 	if err != nil {
diff --git a/config/nomad/broker.nmd.in b/config/nomad/broker.nmd.in
index 1d968aa42..211c71e21 100644
--- a/config/nomad/broker.nmd.in
+++ b/config/nomad/broker.nmd.in
@@ -44,6 +44,12 @@ job "broker" {
          change_signal = "SIGHUP"
       }
 
+      template {
+         source        = "@WORK_DIR@/broker_secret.key"
+         destination   = "broker_secret.key"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
     }
   }
 }
diff --git a/examples/worker/getnext_broker/check_linux.sh b/examples/worker/getnext_broker/check_linux.sh
index 18fe844f5..8455966d0 100644
--- a/examples/worker/getnext_broker/check_linux.sh
+++ b/examples/worker/getnext_broker/check_linux.sh
@@ -1,6 +1,7 @@
 #!/usr/bin/env bash
 
 database_name=test_run
+token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
 
 set -e
 
@@ -23,5 +24,7 @@ do
 	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
 done
 
-$@ 127.0.0.1:8400 $database_name 2 | grep "Processed 3 file(s)"
+$@ 127.0.0.1:8400 $database_name 2 $token_test_run | grep "Processed 3 file(s)"
+
+
 
diff --git a/examples/worker/getnext_broker/check_windows.bat b/examples/worker/getnext_broker/check_windows.bat
index 891e876ad..d96b652d8 100644
--- a/examples/worker/getnext_broker/check_windows.bat
+++ b/examples/worker/getnext_broker/check_windows.bat
@@ -1,5 +1,6 @@
 SET database_name=test_run
 SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
 
 c:\opt\consul\nomad run discovery.nmd
 c:\opt\consul\nomad run broker.nmd
@@ -10,7 +11,7 @@ ping 1.0.0.0 -n 10 -w 100 > nul
 for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name%  || goto :error
 
 
-"%1" 127.0.0.1:8400 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error
+"%1" 127.0.0.1:8400 %database_name% 1 %token_test_run% | findstr /c:"Processed 3 file" || goto :error
 goto :clean
 
 :error
diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp
index 8655fdbb4..46648a2db 100644
--- a/examples/worker/getnext_broker/getnext_broker.cpp
+++ b/examples/worker/getnext_broker/getnext_broker.cpp
@@ -12,6 +12,13 @@
 using std::chrono::high_resolution_clock;
 using asapo::Error;
 
+struct Params {
+    std::string server;
+    std::string beamtime_id;
+    std::string token;
+    int nthreads;
+};
+
 void WaitThreads(std::vector<std::thread>* threads) {
     for (auto& thread : *threads) {
         thread.join();
@@ -28,12 +35,11 @@ int ProcessError(const Error& err) {
     return 0;
 }
 
-std::vector<std::thread> StartThreads(const std::string& server, const std::string& run_name, int nthreads,
-                                      std::vector<int>* nfiles, std::vector<int>* errors) {
-    auto exec_next = [server, run_name, nfiles, errors](int i) {
+std::vector<std::thread> StartThreads(const Params& params, std::vector<int>* nfiles, std::vector<int>* errors) {
+    auto exec_next = [&params, nfiles, errors](int i) {
         asapo::FileInfo fi;
         Error err;
-        auto broker = asapo::DataBrokerFactory::CreateServerBroker(server, run_name, &err);
+        auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.beamtime_id, params.token, &err);
         broker->SetTimeout(10000);
         while ((err = broker->GetNext(&fi, nullptr)) == nullptr) {
             (*nfiles)[i] ++;
@@ -42,24 +48,25 @@ std::vector<std::thread> StartThreads(const std::string& server, const std::stri
     };
 
     std::vector<std::thread> threads;
-    for (int i = 0; i < nthreads; i++) {
+    for (int i = 0; i < params.nthreads; i++) {
         threads.emplace_back(std::thread(exec_next, i));
     }
     return threads;
 }
 
-int ReadAllData(const std::string& server, const std::string& run_name, int nthreads, uint64_t* duration_ms) {
+int ReadAllData(const Params& params, uint64_t* duration_ms) {
     asapo::FileInfo fi;
     high_resolution_clock::time_point t1 = high_resolution_clock::now();
 
-    std::vector<int>nfiles(nthreads, 0);
-    std::vector<int>errors(nthreads, 0);
+    std::vector<int>nfiles(params.nthreads, 0);
+    std::vector<int>errors(params.nthreads, 0);
 
-    auto threads = StartThreads(server, run_name, nthreads, &nfiles, &errors);
+    auto threads = StartThreads(params, &nfiles, &errors);
     WaitThreads(&threads);
 
     int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0);
     int errors_total = std::accumulate(errors.begin(), errors.end(), 0);
+
     if (errors_total) {
         exit(EXIT_FAILURE);
     }
@@ -71,17 +78,18 @@ int ReadAllData(const std::string& server, const std::string& run_name, int nthr
 }
 
 int main(int argc, char* argv[]) {
-    if (argc != 4) {
-        std::cout << "Usage: " + std::string{argv[0]} +" <server> <run_name> <nthreads>" << std::endl;
+    if (argc != 5) {
+        std::cout << "Usage: " + std::string{argv[0]} +" <server> <run_name> <nthreads> <token>" << std::endl;
         exit(EXIT_FAILURE);
     }
-    std::string server = std::string{argv[1]};
-    std::string run_name = std::string{argv[2]};
-    int nthreads = atoi(argv[3]);
-
+    Params params;
+    params.server = std::string{argv[1]};
+    params.beamtime_id = std::string{argv[2]};
+    params.nthreads = atoi(argv[3]);
+    params.token = std::string{argv[4]};
 
     uint64_t duration_ms;
-    auto nfiles = ReadAllData(server, run_name, nthreads, &duration_ms);
+    auto nfiles = ReadAllData(params, &duration_ms);
 
     std::cout << "Processed " << nfiles << " file(s)" << std::endl;
     std::cout << "Elapsed : " << duration_ms << "ms" << std::endl;
diff --git a/tests/automatic/broker/check_monitoring/CMakeLists.txt b/tests/automatic/broker/check_monitoring/CMakeLists.txt
index 03487791a..fc5151365 100644
--- a/tests/automatic/broker/check_monitoring/CMakeLists.txt
+++ b/tests/automatic/broker/check_monitoring/CMakeLists.txt
@@ -4,5 +4,7 @@ set(TARGET_NAME asapo-broker)
 # Testing
 ################################
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY)
-add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_PROPERTY:${TARGET_NAME},EXENAME>" nomem
+configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_secret.key broker_secret.key COPYONLY)
+
+add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_PROPERTY:${TARGET_NAME},EXENAME> $<TARGET_PROPERTY:asapo,EXENAME>" nomem
         )
diff --git a/tests/automatic/broker/check_monitoring/check_linux.sh b/tests/automatic/broker/check_monitoring/check_linux.sh
index b0a2de9ab..5b3e631b2 100644
--- a/tests/automatic/broker/check_monitoring/check_linux.sh
+++ b/tests/automatic/broker/check_monitoring/check_linux.sh
@@ -15,7 +15,10 @@ Cleanup() {
 
 influx -execute "create database ${database_name}"
 
-$@ -config settings.json &
+token=`$2 token -secret broker_secret.key data`
+
+
+$1 -config settings.json &
 
 sleep 0.3
 
@@ -23,7 +26,7 @@ brokerid=`echo $!`
 
 for i in `seq 1 50`;
 do
-    curl --silent 127.0.0.1:5005/database/data/next >/dev/null 2>&1 &
+    curl --silent 127.0.0.1:5005/database/data/next?token=$token >/dev/null 2>&1 &
 done
 
 
diff --git a/tests/automatic/broker/get_next/CMakeLists.txt b/tests/automatic/broker/get_next/CMakeLists.txt
index 2f661335f..d4f6222d0 100644
--- a/tests/automatic/broker/get_next/CMakeLists.txt
+++ b/tests/automatic/broker/get_next/CMakeLists.txt
@@ -4,5 +4,7 @@ set(TARGET_NAME asapo-broker)
 # Testing
 ################################
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY)
-add_script_test("${TARGET_NAME}-getnext" "$<TARGET_PROPERTY:${TARGET_NAME},EXENAME>" nomem
+configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_secret.key broker_secret.key COPYONLY)
+
+add_script_test("${TARGET_NAME}-getnext" "$<TARGET_PROPERTY:${TARGET_NAME},EXENAME> $<TARGET_PROPERTY:asapo,EXENAME>" nomem
         )
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index 3a3119b1a..f4bf78a6f 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -15,12 +15,16 @@ Cleanup() {
 echo "db.data.insert({"_id":2})" | mongo ${database_name}
 echo "db.data.insert({"_id":1})" | mongo ${database_name}
 
-$@ -config settings.json &
+token=`$2 token -secret broker_secret.key data`
+
+$1 -config settings.json &
 
 sleep 0.3
 brokerid=`echo $!`
 
-curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1'
-curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2'
 
-curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep "not found"
+
+curl -v  --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep '"_id":1'
+curl -v  --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep '"_id":2'
+
+curl -v  --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep "not found"
diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat
index 443c05422..dfa4ffa85 100644
--- a/tests/automatic/broker/get_next/check_windows.bat
+++ b/tests/automatic/broker/get_next/check_windows.bat
@@ -7,13 +7,18 @@ echo db.data.insert({"_id":2}) | %mongo_exe% %database_name%  || goto :error
 set full_name="%1"
 set short_name="%~nx1"
 
+"%2" token -secret broker_secret.key data > token
+set /P token=< token
+
+
+
 start /B "" "%full_name%" -config settings.json
 
 ping 1.0.0.0 -n 1 -w 100 > nul
 
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":1  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":2  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr  /c:"not found"  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:\"_id\":1  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr  /c:"not found"  || goto :error
 
 goto :clean
 
@@ -24,3 +29,4 @@ exit /b 1
 :clean
 Taskkill /IM "%short_name%" /F
 echo db.dropDatabase() | %mongo_exe% %database_name%
+del /f token
\ No newline at end of file
diff --git a/tests/automatic/broker/read_config/CMakeLists.txt b/tests/automatic/broker/read_config/CMakeLists.txt
index 75c1658dd..f05b79669 100644
--- a/tests/automatic/broker/read_config/CMakeLists.txt
+++ b/tests/automatic/broker/read_config/CMakeLists.txt
@@ -4,6 +4,7 @@ set(TARGET_NAME asapo-broker)
 # Testing
 ################################
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings_good.json COPYONLY)
+configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_secret.key broker_secret.key COPYONLY)
 configure_file(settings_bad.json settings_bad.json COPYONLY)
 add_script_test("${TARGET_NAME}-readconfig" "$<TARGET_PROPERTY:${TARGET_NAME},EXENAME>" nomem
         )
diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
index a7cc5bd38..89fccb0fd 100644
--- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
+++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp
@@ -32,7 +32,7 @@ int main(int argc, char* argv[]) {
     auto args = GetArgs(argc, argv);
 
     asapo::Error err;
-    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", &err);
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", "", &err);
     auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get());
 
     asapo::HttpCode code;
diff --git a/tests/automatic/full_chain/simple_chain/CMakeLists.txt b/tests/automatic/full_chain/simple_chain/CMakeLists.txt
index a63811d4f..1f7374efe 100644
--- a/tests/automatic/full_chain/simple_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/simple_chain/CMakeLists.txt
@@ -4,4 +4,4 @@ set(TARGET_NAME full_chain_simple_chain)
 # Testing
 ################################
 prepare_asapo()
-add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker>" nomem)
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem)
diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh
index 22b61d6e0..12b1d68ae 100644
--- a/tests/automatic/full_chain/simple_chain/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain/check_linux.sh
@@ -5,6 +5,8 @@ set -e
 trap Cleanup EXIT
 
 beamtime_id=asapo_test
+token=`$3 token -secret broker_secret.key $beamtime_id`
+
 monitor_database_name=db_test
 proxy_address=127.0.0.1:8400
 
@@ -42,4 +44,4 @@ $1 localhost:8400 ${beamtime_id} 100 1000 4 0 100 &
 #producerid=`echo $!`
 
 
-$2 ${proxy_address} ${beamtime_id} 2 | grep "Processed 1000 file(s)"
+$2 ${proxy_address} ${beamtime_id} 2 $token | grep "Processed 1000 file(s)"
diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat
index fecb43db9..f26def490 100644
--- a/tests/automatic/full_chain/simple_chain/check_windows.bat
+++ b/tests/automatic/full_chain/simple_chain/check_windows.bat
@@ -5,6 +5,8 @@ SET receiver_root_folder=c:\tmp\asapo\receiver\files
 SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%"
 
 
+"%3" token -secret broker_secret.key %beamtime_id% > token
+set /P token=< token
 
 set proxy_address="127.0.0.1:8400"
 
@@ -24,7 +26,7 @@ start /B "" "%1" %proxy_address% %beamtime_id% 100 1000 4 0 100
 ping 1.0.0.0 -n 1 -w 100 > nul
 
 REM worker
-"%2" %proxy_address% %beamtime_id% 2 | findstr /c:"Processed 1000 file(s)"  || goto :error
+"%2" %proxy_address% %beamtime_id% 2 %token% | findstr /c:"Processed 1000 file(s)"  || goto :error
 
 
 goto :clean
@@ -40,6 +42,7 @@ c:\opt\consul\nomad stop broker
 c:\opt\consul\nomad stop authorizer
 c:\opt\consul\nomad stop nginx
 rmdir /S /Q %receiver_root_folder%
+del /f token
 echo db.dropDatabase() | %mongo_exe% %beamtime_id%
 
 
diff --git a/tests/automatic/settings/broker_secret.key b/tests/automatic/settings/broker_secret.key
new file mode 100644
index 000000000..1d100e0ec
--- /dev/null
+++ b/tests/automatic/settings/broker_secret.key
@@ -0,0 +1 @@
+12ljzgneasfd
diff --git a/tests/automatic/settings/broker_settings.json b/tests/automatic/settings/broker_settings.json
index a5ccb0752..ab8984917 100644
--- a/tests/automatic/settings/broker_settings.json
+++ b/tests/automatic/settings/broker_settings.json
@@ -3,5 +3,6 @@
   "MonitorDbAddress": "localhost:8086",
   "MonitorDbName": "db_test",
   "port":5005,
-  "LogLevel":"info"
+  "LogLevel":"info",
+  "SecretFile":"broker_secret.key"
 }
\ No newline at end of file
diff --git a/tests/automatic/settings/broker_settings.json.tpl b/tests/automatic/settings/broker_settings.json.tpl
index af6d1dcb2..2716cc6e3 100644
--- a/tests/automatic/settings/broker_settings.json.tpl
+++ b/tests/automatic/settings/broker_settings.json.tpl
@@ -3,5 +3,6 @@
   "MonitorDbAddress": "localhost:8086",
   "MonitorDbName": "db_test",
   "port":{{ env "NOMAD_PORT_broker" }},
-  "LogLevel":"info"
+  "LogLevel":"info",
+  "SecretFile":"broker_secret.key"
 }
\ No newline at end of file
diff --git a/tests/automatic/worker/next_multithread_broker/check_linux.sh b/tests/automatic/worker/next_multithread_broker/check_linux.sh
index c5a52cbb3..308bd1ff6 100644
--- a/tests/automatic/worker/next_multithread_broker/check_linux.sh
+++ b/tests/automatic/worker/next_multithread_broker/check_linux.sh
@@ -1,6 +1,7 @@
 #!/usr/bin/env bash
 
 database_name=test_run
+token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
 
 set -e
 
@@ -26,6 +27,6 @@ do
 	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
 done
 
-$@ 127.0.0.1:8400 $database_name 4 10
+$@ 127.0.0.1:8400 $database_name 4 10 $token_test_run
 
 
diff --git a/tests/automatic/worker/next_multithread_broker/check_windows.bat b/tests/automatic/worker/next_multithread_broker/check_windows.bat
index b3762c8a8..1a5d84ae4 100644
--- a/tests/automatic/worker/next_multithread_broker/check_windows.bat
+++ b/tests/automatic/worker/next_multithread_broker/check_windows.bat
@@ -1,5 +1,6 @@
 SET database_name=test_run
 SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
 
 ::first argument  path to the executable
 
@@ -12,7 +13,7 @@ ping 1.0.0.0 -n 10 -w 100 > nul
 for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name%  || goto :error
 
 
-%1 127.0.0.1:8400 %database_name% 4 10 || goto :error
+%1 127.0.0.1:8400 %database_name% 4 10 %token_test_run% || goto :error
 
 goto :clean
 
diff --git a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
index 5f8321ec4..ffd3f90f5 100644
--- a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
+++ b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp
@@ -28,12 +28,13 @@ void Assert(std::vector<asapo::FileInfos> file_infos, int nthreads, int nfiles)
 struct Args {
     std::string server;
     std::string run_name;
+    std::string token;
     int nthreads;
     int nfiles;
 };
 
 Args GetArgs(int argc, char* argv[]) {
-    if (argc != 5) {
+    if (argc != 6) {
         std::cout << "Wrong number of arguments" << std::endl;
         exit(EXIT_FAILURE);
     }
@@ -41,12 +42,14 @@ Args GetArgs(int argc, char* argv[]) {
     std::string source_name{argv[2]};
     int nthreads = std::stoi(argv[3]);
     int nfiles = std::stoi(argv[4]);
-    return Args{server, source_name, nthreads, nfiles};
+    std::string token{argv[5]};
+
+    return Args{server, source_name, token, nthreads, nfiles};
 }
 
 void GetAllFromBroker(const Args& args) {
     asapo::Error err;
-    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.run_name, &err);
+    auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.run_name, args.token, &err);
 
     std::vector<asapo::FileInfos>file_infos(args.nthreads);
     auto exec_next = [&](int i) {
@@ -54,6 +57,7 @@ void GetAllFromBroker(const Args& args) {
         while ((err = broker->GetNext(&fi, nullptr)) == nullptr) {
             file_infos[i].emplace_back(fi);
         }
+        printf("%s\n", err->Explain().c_str());
     };
 
     std::vector<std::thread> threads;
diff --git a/tests/manual/performance_broker/settings.json b/tests/manual/performance_broker/settings.json
index a2c1a4a5a..a687e733f 100644
--- a/tests/manual/performance_broker/settings.json
+++ b/tests/manual/performance_broker/settings.json
@@ -3,5 +3,6 @@
   "MonitorDbAddress": "localhost:8086",
   "MonitorDbName": "db_test",
   "port":5005,
-  "LogLevel":"info"
+  "LogLevel":"info",
+  "SecretFile":"broker_secret.key"
 }
\ No newline at end of file
diff --git a/tests/manual/performance_broker/test.sh b/tests/manual/performance_broker/test.sh
index 9c36fce45..66e361c73 100755
--- a/tests/manual/performance_broker/test.sh
+++ b/tests/manual/performance_broker/test.sh
@@ -4,10 +4,12 @@
 # reads fileset into database
 # calls getnext_broker example from $worker_node
 
-nthreads=16
+nthreads=1
 # a directory with many files in it
 dir=/gpfs/petra3/scratch/yakubov/test
-run_name=test
+run_name=test_run
+token=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
+
 service_node=max-wgs
 
 monitor_node=zitpcx27016
@@ -42,6 +44,11 @@ ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-discovery -config
 
 
 scp settings_tmp.json ${service_node}:${service_dir}/settings.json
+
+scp ../../../tests/automatic/settings/broker_secret.key ${service_node}:${service_dir}/broker_secret.key
+
+
+
 rm settings_tmp.json
 scp ../../../cmake-build-release/broker/asapo-broker ${service_node}:${service_dir}
 ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-broker -config settings.json &> ${service_dir}/broker.log &'"
@@ -52,7 +59,7 @@ ssh ${worker_node} ${worker_dir}/folder2db -n ${nthreads} ${dir} ${run_name} ${s
 sleep 3
 
 scp ../../../cmake-build-release/examples/worker/getnext_broker/getnext_broker ${worker_node}:${worker_dir}
-ssh ${worker_node} ${worker_dir}/getnext_broker ${service_node}:8400 ${run_name} ${nthreads}
+ssh ${worker_node} ${worker_dir}/getnext_broker ${service_node}:8400 ${run_name} ${nthreads} $token
 
 
 
diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h
index d916e629d..bc03ec02d 100644
--- a/worker/api/cpp/include/worker/data_broker.h
+++ b/worker/api/cpp/include/worker/data_broker.h
@@ -21,6 +21,7 @@ auto const kNotFound = "Uri not found";
 auto const kPermissionDenied = "Permissionn Denied";
 auto const kNoData = "No Data";
 auto const kWrongInput = "Wrong Input";
+auto const kAuthorizationError = "authorization error";
 auto const kInternalError = "Internal Error";
 auto const kUnknownIOError = "Unknown IO Error";
 }
@@ -47,8 +48,8 @@ class DataBrokerFactory {
   public:
     static std::unique_ptr<DataBroker> CreateFolderBroker(const std::string& source_name,
             Error* error) noexcept;
-    static std::unique_ptr<DataBroker> CreateServerBroker(const std::string& server_name,
-            const std::string& source_name,
+    static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name,
+            std::string beamtime_id, std::string token,
             Error* error) noexcept;
 
 };
diff --git a/worker/api/cpp/src/data_broker.cpp b/worker/api/cpp/src/data_broker.cpp
index 624e60ed5..0e833b2ac 100644
--- a/worker/api/cpp/src/data_broker.cpp
+++ b/worker/api/cpp/src/data_broker.cpp
@@ -11,7 +11,6 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name,
                                    Args&& ... args) noexcept {
     if (source_name.empty()) {
         error->reset(new SimpleError("Empty Data Source"));
-        //*return_code = WorkerErrorMessage::kEmptyDatasource;
         return nullptr;
     }
 
@@ -21,7 +20,6 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name,
         error->reset(nullptr);
     } catch (...) {         // we do not test this part
         error->reset(new SimpleError("Memory error"));
-//       *return_code = WorkerErrorMessage::kMemoryError;
     }
 
     return p;
@@ -33,10 +31,10 @@ std::unique_ptr<DataBroker> DataBrokerFactory::CreateFolderBroker(const std::str
     return Create<FolderDataBroker>(source_name, error);
 };
 
-std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(const std::string& server_name,
-        const std::string& source_name,
+std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name,
+        std::string beamtime_id, std::string token,
         Error* error) noexcept {
-    return Create<ServerDataBroker>(server_name, error, source_name);
+    return Create<ServerDataBroker>(std::move(server_name), error, std::move(beamtime_id), std::move(token));
 }
 
 
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index 2859b63e2..f517a770f 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -20,6 +20,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
     case HttpCode::BadRequest:
         message = WorkerErrorMessage::kWrongInput;
         break;
+    case HttpCode::Unauthorized:
+        message = WorkerErrorMessage::kAuthorizationError;
+        break;
     case HttpCode::InternalServerError:
         message = WorkerErrorMessage::kErrorReadingSource;
         break;
@@ -36,10 +39,11 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
     return Error{new HttpError(message, code)};
 }
 
-ServerDataBroker::ServerDataBroker(const std::string& server_uri,
-                                   const std::string& source_name) :
+ServerDataBroker::ServerDataBroker(std::string server_uri,
+                                   std::string source_name,
+                                   std::string token) :
     io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()},
-    server_uri_{server_uri}, source_name_{source_name} {
+    server_uri_{std::move(server_uri)}, source_name_{std::move(source_name)}, token_{std::move(token)} {
 }
 
 Error ServerDataBroker::Connect() {
@@ -75,10 +79,14 @@ void ServerDataBroker::ProcessServerError(Error* err, const std::string& respons
     return;
 }
 
+std::string ServerDataBroker::RequestWithToken(std::string uri) {
+    return std::move(uri) + "?token=" + token_;
+}
+
 Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri) {
     Error err;
     HttpCode code;
-    *response = httpclient__->Get(request_uri, &code, &err);
+    *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err);
     if (err != nullptr) {
         return err;
     }
diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h
index 43ccfeb7b..5c666dafb 100644
--- a/worker/api/cpp/src/server_data_broker.h
+++ b/worker/api/cpp/src/server_data_broker.h
@@ -12,13 +12,14 @@ Error HttpCodeToWorkerError(const HttpCode& code);
 
 class ServerDataBroker final : public asapo::DataBroker {
   public:
-    explicit ServerDataBroker(const std::string& server_uri, const std::string& source_name);
+    explicit ServerDataBroker(std::string server_uri, std::string source_name, std::string token);
     Error Connect() override;
     Error GetNext(FileInfo* info, FileData* data) override;
     void SetTimeout(uint64_t timeout_ms) override;
     std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
     std::unique_ptr<HttpClient> httpclient__;
   private:
+    std::string RequestWithToken(std::string uri);
     Error GetFileInfoFromServer(FileInfo* info, const std::string& operation);
     Error GetBrokerUri();
     void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
@@ -26,6 +27,7 @@ class ServerDataBroker final : public asapo::DataBroker {
     std::string server_uri_;
     std::string current_broker_uri_;
     std::string source_name_;
+    std::string token_;
     uint64_t timeout_ms_ = 0;
 };
 
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index bf8c4748d..58e24e8eb 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -38,12 +38,12 @@ using testing::AllOf;
 namespace {
 
 TEST(FolderDataBroker, SetCorrectIo) {
-    auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "dbname")};
+    auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "beamtime_id", "token")};
     ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr));
 }
 
 TEST(FolderDataBroker, SetCorrectHttpClient) {
-    auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "dbname")};
+    auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "beamtime_id", "token")};
     ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr));
 }
 
@@ -56,9 +56,10 @@ class ServerDataBrokerTests : public Test {
     FileInfo info;
     std::string expected_server_uri = "test:8400";
     std::string expected_broker_uri = "broker:5005";
+    std::string expected_token = "token";
 
     void SetUp() override {
-        data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker(expected_server_uri, "dbname")};
+        data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker(expected_server_uri, "beamtime_id", expected_token)};
         data_broker->io__ = std::unique_ptr<IO> {&mock_io};
         data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
     }
@@ -97,10 +98,11 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsErrorOnWrongInput) {
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/next", _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::OK),
-                SetArgPointee<2>(nullptr),
-                Return("")));
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/next?token=" + expected_token, _,
+                                        _)).WillOnce(DoAll(
+                                                SetArgPointee<1>(HttpCode::OK),
+                                                SetArgPointee<2>(nullptr),
+                                                Return("")));
     data_broker->GetNext(&info, nullptr);
 }
 
@@ -108,7 +110,6 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
 TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) {
     MockGetBrokerUri();
 
-
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                 SetArgPointee<1>(HttpCode::Conflict),
                 SetArgPointee<2>(nullptr),
@@ -120,6 +121,21 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) {
     ASSERT_THAT(err->Explain(), HasSubstr("timeout"));
 }
 
+TEST_F(ServerDataBrokerTests, GetNextReturnsNotAuthorized) {
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
+                SetArgPointee<1>(HttpCode::Unauthorized),
+                SetArgPointee<2>(nullptr),
+                Return("")));
+
+    auto err = data_broker->GetNext(&info, nullptr);
+
+    ASSERT_THAT(err, Ne(nullptr));
+    ASSERT_THAT(err->Explain(), HasSubstr("authorization"));
+}
+
+
 TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) {
 
     MockGetBrokerUri();
@@ -186,7 +202,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) {
                 SetArgPointee<2>(nullptr),
                 Return("{\"id\":1}")));
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/1", _,
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/1?token=" + expected_token, _,
                                         _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                     SetArgPointee<1>(HttpCode::Conflict),
                                                     SetArgPointee<2>(nullptr),
diff --git a/worker/api/cpp/unittests/test_worker_api.cpp b/worker/api/cpp/unittests/test_worker_api.cpp
index 4b36ea394..6af41b953 100644
--- a/worker/api/cpp/unittests/test_worker_api.cpp
+++ b/worker/api/cpp/unittests/test_worker_api.cpp
@@ -39,14 +39,13 @@ TEST_F(DataBrokerFactoryTests, FailCreateDataSourceWithEmptySource) {
 
     auto data_broker = DataBrokerFactory::CreateFolderBroker("", &error);
 
-//    ASSERT_THAT(error->Explain(), Eq(WorkerErrorMessage::kEmptyDatasource));
     ASSERT_THAT(error->Explain(), Eq("Empty Data Source"));
     ASSERT_THAT(data_broker.get(), Eq(nullptr));
 }
 
 TEST_F(DataBrokerFactoryTests, CreateServerDataSource) {
 
-    auto data_broker = DataBrokerFactory::CreateServerBroker("server", "database", &error);
+    auto data_broker = DataBrokerFactory::CreateServerBroker("server", "beamtime_id", "token", &error);
 
     ASSERT_THAT(error, Eq(nullptr));
     ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr));
-- 
GitLab