From 8d0a6b488bb3e67180f910f8971a878a726f4c20 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 6 Jun 2018 16:21:39 +0200
Subject: [PATCH] service discovery for broker

---
 broker/src/asapo_broker/utils/status_codes.go |   2 +-
 .../request_handler/request_handler.go        |   5 +-
 .../request_handler/request_handler_consul.go |  73 ++++++++--
 .../request_handler_consul_test.go            | 127 +++++++++++++-----
 .../request_handler/request_handler_static.go |  20 ++-
 .../request_handler_static_test.go            |  18 ++-
 .../asapo_discovery/server/get_receivers.go   |  26 +++-
 .../src/asapo_discovery/server/listroutes.go  |   7 +
 .../{get_receivers_test.go => routes_test.go} |  16 ++-
 .../src/asapo_discovery/server/server.go      |  36 +----
 .../asapo_discovery/server/settings_test.go   |  26 +++-
 discovery/src/asapo_discovery/utils/stucts.go |  48 +++++++
 examples/worker/getnext_broker/CMakeLists.txt |   4 +-
 examples/worker/getnext_broker/check_linux.sh |  17 ++-
 .../worker/getnext_broker/check_windows.bat   |  18 ++-
 .../api/src/receiver_discovery_service.cpp    |   2 +-
 receiver/unittests/test_statistics.cpp        |   2 +-
 .../automatic/broker/get_next/check_linux.sh  |   2 +-
 .../broker/get_next/check_windows.bat         |   2 +-
 .../full_chain/simple_chain/check_linux.sh    |   4 +-
 .../full_chain/simple_chain/check_windows.bat |   6 +-
 .../settings/discovery_settings.json.tpl      |   4 +-
 .../next_multithread_broker/CMakeLists.txt    |   4 +-
 .../next_multithread_broker/check_linux.sh    |  18 +--
 .../next_multithread_broker/check_windows.bat |  16 +--
 .../manual/performance_broker/discovery.json  |  10 ++
 .../discovery.json                            |  17 ++-
 .../fluentd.conf                              |   1 -
 .../performance_full_chain_simple/test.sh     |   2 +-
 worker/api/cpp/include/worker/data_broker.h   |   1 +
 worker/api/cpp/src/server_data_broker.cpp     |  53 +++++---
 worker/api/cpp/src/server_data_broker.h       |   2 +
 .../api/cpp/unittests/test_server_broker.cpp  |  91 +++++++++++--
 33 files changed, 491 insertions(+), 189 deletions(-)
 rename discovery/src/asapo_discovery/server/{get_receivers_test.go => routes_test.go} (75%)
 create mode 100644 discovery/src/asapo_discovery/utils/stucts.go
 create mode 100644 tests/manual/performance_broker/discovery.json

diff --git a/broker/src/asapo_broker/utils/status_codes.go b/broker/src/asapo_broker/utils/status_codes.go
index 70a219300..58fef4da3 100644
--- a/broker/src/asapo_broker/utils/status_codes.go
+++ b/broker/src/asapo_broker/utils/status_codes.go
@@ -10,5 +10,5 @@ const (
 	//error codes
 	StatusError      = http.StatusInternalServerError
 	StatusWrongInput = http.StatusBadRequest
-	StatusNoData     = http.StatusNotFound
+	StatusNoData     = http.StatusConflict
 )
diff --git a/discovery/src/asapo_discovery/request_handler/request_handler.go b/discovery/src/asapo_discovery/request_handler/request_handler.go
index e2cac458f..b2ce9b560 100644
--- a/discovery/src/asapo_discovery/request_handler/request_handler.go
+++ b/discovery/src/asapo_discovery/request_handler/request_handler.go
@@ -1,7 +1,10 @@
 package request_handler
 
+import "asapo_discovery/utils"
+
 type Agent interface {
 	GetReceivers() ([]byte, error)
-	Init(int,[]string) error
+	GetBroker() ([]byte, error)
+	Init(settings utils.Settings) error
 }
 
diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go
index c414158e5..c4a948a4b 100644
--- a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go
+++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go
@@ -5,6 +5,8 @@ import (
 	"github.com/hashicorp/consul/api"
 	"strconv"
 	"errors"
+	"sort"
+	"sync"
 )
 
 type ConsulRequestHandler struct {
@@ -17,20 +19,63 @@ type Responce struct {
 	Uris           []string
 }
 
+type SafeCounter struct {
+	counter   int
+	mux sync.Mutex
+}
+
+func (c *SafeCounter) Next(size int) int {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	val  := c.counter % size
+	c.counter++
+	return val
+}
+
+var counter SafeCounter
+
+func (rh *ConsulRequestHandler) GetServices(name string) ([]string, error) {
+	var result = make([]string, 0)
+	services, _, err := rh.client.Health().Service(name, "", true, nil)
+	if err != nil {
+		return nil, err
+	}
+	for _, service := range (services) {
+		result = append(result, service.Node.Address+":"+strconv.Itoa(service.Service.Port))
+	}
+	sort.Strings(result)
+	return result, nil
+}
+
 func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
-	if (rh.client == nil){
-		return nil,errors.New("consul client not connected")
+	if (rh.client == nil) {
+		return nil, errors.New("consul client not connected")
+	}
+	var response Responce
+	var err error
+	response.Uris, err = rh.GetServices("receiver")
+	if err != nil {
+		return nil, err
 	}
-	var responce Responce
-	services,_,err := rh.client.Health().Service("receiver","",true,nil)
-	if err!=nil {
-		return nil,err
+	response.MaxConnections = rh.MaxConnections
+	return utils.MapToJson(&response)
+}
+
+func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) {
+	if (rh.client == nil) {
+		return nil, errors.New("consul client not connected")
+	}
+	response, err := rh.GetServices("broker")
+	if err != nil {
+		return nil, err
 	}
-	for _,service := range (services) {
-		responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port))
+	size := len(response)
+	if size ==0 {
+		return []byte(""),nil
+	}else {
+		return []byte(response[counter.Next(size)]),nil
 	}
-	responce.MaxConnections = rh.MaxConnections
-	return utils.MapToJson(&responce)
+	return nil, nil
 }
 
 func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) {
@@ -48,13 +93,13 @@ func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, e
 	return
 }
 
-func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) {
-	rh.MaxConnections = maxCons
-	if len(uris) == 0 {
+func (rh *ConsulRequestHandler) Init(settings utils.Settings) (err error) {
+	rh.MaxConnections = settings.Receiver.MaxConnections
+	if len(settings.ConsulEndpoints) == 0 {
 		rh.client, err = rh.connectClient("")
 		return err
 	}
-	for _, uri := range (uris) {
+	for _, uri := range (settings.ConsulEndpoints) {
 		rh.client, err = rh.connectClient(uri)
 		if err == nil {
 			return nil
diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go
index 5e9d18f00..79f349ee8 100644
--- a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go
+++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go
@@ -3,13 +3,14 @@ package request_handler
 import (
 	"github.com/stretchr/testify/suite"
 	"testing"
-     "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api"
 	"strconv"
+	"asapo_discovery/utils"
 )
 
 type ConsulHandlerTestSuite struct {
 	suite.Suite
-	client *api.Client
+	client  *api.Client
 	handler ConsulRequestHandler
 }
 
@@ -17,68 +18,122 @@ func TestConsulHandlerTestSuite(t *testing.T) {
 	suite.Run(t, new(ConsulHandlerTestSuite))
 }
 
+var consul_settings utils.Settings
+
+func (suite *ConsulHandlerTestSuite) registerAgents(name string) {
+	for i := 1234; i < 1236; i++ {
+		reg := &api.AgentServiceRegistration{
+			ID:   name + strconv.Itoa(i),
+			Name: name,
+			Port: i,
+			Check: &api.AgentServiceCheck{
+				Interval: "10m",
+				Status:   "passing",
+				HTTP:     "http://localhost:5000/health",
+			},
+		}
+		err := suite.client.Agent().ServiceRegister(reg)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+}
+
 func (suite *ConsulHandlerTestSuite) SetupTest() {
 	var err error
+	consul_settings = utils.Settings{Receiver: utils.ReceiverInfo{MaxConnections: 10, ForceEndpoints: []string{}}}
+
 	suite.client, err = api.NewClient(api.DefaultConfig())
 	if err != nil {
 		panic(err)
 	}
-	for i:=1234;i<1236;i++ {
-	reg := &api.AgentServiceRegistration{
-		ID:   "receiver"+strconv.Itoa(i),
-		Name: "receiver",
-		Port: i,
-		Check: &api.AgentServiceCheck{
-			Interval:"10m",
-			Status:"passing",
-			HTTP: "http://localhost:5000/health",
-		},
-	}
-	err = suite.client.Agent().ServiceRegister(reg)
-	if err != nil {
-		panic(err)
-	}
-	}
+
+	suite.registerAgents("receiver")
+	suite.registerAgents("broker")
+
 }
 
 func (suite *ConsulHandlerTestSuite) TearDownTest() {
 	suite.client.Agent().ServiceDeregister("receiver1234")
 	suite.client.Agent().ServiceDeregister("receiver1235")
+	suite.client.Agent().ServiceDeregister("broker1234")
+	suite.client.Agent().ServiceDeregister("broker1235")
 }
 
-
 func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() {
-	err := suite.handler.Init(10,[]string{})
-	suite.NoError(err,  "empty list")
+	err := suite.handler.Init(consul_settings)
+	suite.NoError(err, "empty list")
 }
 
 func (suite *ConsulHandlerTestSuite) TestInitWrongUri() {
-	err := suite.handler.Init(10,[]string{"blabla"})
-	suite.Error(err,  "wrong consul uri")
-	suite.Nil(suite.handler.client,  "client nli after error")
+	consul_settings.ConsulEndpoints = []string{"blabla"}
+	err := suite.handler.Init(consul_settings)
+	suite.Error(err, "wrong consul uri")
+	suite.Nil(suite.handler.client, "client nli after error")
 
 }
 
 func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() {
-	err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"})
-	suite.NoError(err,  "")
+	consul_settings.ConsulEndpoints = []string{"http://127.0.0.1:8500"}
+
+	err := suite.handler.Init(consul_settings)
+	suite.NoError(err, "")
 }
 
 func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() {
-	err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"})
-	suite.NoError(err,  "")
-}
+	consul_settings.ConsulEndpoints = []string{"blabla", "http://127.0.0.1:8500"}
 
+	err := suite.handler.Init(consul_settings)
+	suite.NoError(err, "")
+}
 
 func (suite *ConsulHandlerTestSuite) TestGetReceivers() {
-	suite.handler.Init(10,[]string{})
-	res,err := suite.handler.GetReceivers()
-	suite.NoError(err,  "")
-	suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris")
+	suite.handler.Init(consul_settings)
+	res, err := suite.handler.GetReceivers()
+	suite.NoError(err, "")
+	suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}", string(res), "uris")
 }
 
 func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() {
-	suite.handler.Init(10,[]string{"blabla"})
-	_,err := suite.handler.GetReceivers()
-	suite.Error(err,  "")
+	consul_settings.ConsulEndpoints = []string{"blabla"}
+	suite.handler.Init(consul_settings)
+	_, err := suite.handler.GetReceivers()
+	suite.Error(err, "")
+}
+
+func (suite *ConsulHandlerTestSuite) TestGetBrokerWhenNotConnected() {
+	consul_settings.ConsulEndpoints = []string{"blabla"}
+	suite.handler.Init(consul_settings)
+	_, err := suite.handler.GetBroker()
+	suite.Error(err, "")
 }
+
+func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() {
+	suite.handler.Init(consul_settings)
+	res, err := suite.handler.GetBroker()
+	suite.NoError(err, "")
+	suite.Equal("127.0.0.1:1234", string(res), "uris")
+
+	res, err = suite.handler.GetBroker()
+	suite.NoError(err, "")
+	suite.Equal("127.0.0.1:1235", string(res), "uris")
+
+	res, err = suite.handler.GetBroker()
+	suite.NoError(err, "")
+	suite.Equal("127.0.0.1:1234", string(res), "uris")
+
+}
+
+
+func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() {
+	suite.client.Agent().ServiceDeregister("broker1234")
+	suite.client.Agent().ServiceDeregister("broker1235")
+
+	suite.handler.Init(consul_settings)
+	res, err := suite.handler.GetBroker()
+	suite.NoError(err, "")
+	suite.Equal("", string(res), "uris")
+}
+
+
diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static.go b/discovery/src/asapo_discovery/request_handler/request_handler_static.go
index d52cb205d..90522d830 100644
--- a/discovery/src/asapo_discovery/request_handler/request_handler_static.go
+++ b/discovery/src/asapo_discovery/request_handler/request_handler_static.go
@@ -5,15 +5,23 @@ import (
 )
 
 type StaticRequestHandler struct {
-	Responce
-	}
+	receiverResponce Responce
+	broker string
+}
+
 
 func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) {
-	return utils.MapToJson(&rh)
+	return utils.MapToJson(&rh.receiverResponce)
 }
 
-func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error {
-	rh.MaxConnections = maxCons
-	rh.Uris = uris
+func (rh *StaticRequestHandler) GetBroker() ([]byte, error) {
+	return []byte(rh.broker),nil
+}
+
+
+func (rh *StaticRequestHandler) Init(settings utils.Settings) error {
+	rh.receiverResponce.MaxConnections = settings.Receiver.MaxConnections
+	rh.receiverResponce.Uris = settings.Receiver.ForceEndpoints
+	rh.broker = settings.Broker.ForceEndpoint
 	return nil
 }
diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go
index 01e769e43..35b08f2f5 100644
--- a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go
+++ b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go
@@ -3,23 +3,35 @@ package request_handler
 import (
 	"github.com/stretchr/testify/assert"
 	"testing"
+    "asapo_discovery/utils"
 )
 
 
 var uris = []string{"ip1","ip2"}
 const max_conn = 1
 
+var static_settings utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:max_conn,ForceEndpoints:uris},Broker:utils.BrokerInfo{
+	ForceEndpoint:"ip_broker"}}
+
+
+
 var rh StaticRequestHandler;
 
 func TestStaticHandlerInitOK(t *testing.T) {
-	err := rh.Init(max_conn,uris)
+	err := rh.Init(static_settings)
 	assert.Nil(t, err)
 }
 
-func TestStaticHandlerGetOK(t *testing.T) {
-	rh.Init(max_conn,uris)
+func TestStaticHandlerGetReceviersOK(t *testing.T) {
+	rh.Init(static_settings)
 	res,err := rh.GetReceivers()
 	assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}")
 	assert.Nil(t, err)
+}
 
+func TestStaticHandlerGetBrokerOK(t *testing.T) {
+	rh.Init(static_settings)
+	res,err := rh.GetBroker()
+	assert.Equal(t,string(res), "ip_broker")
+	assert.Nil(t, err)
 }
diff --git a/discovery/src/asapo_discovery/server/get_receivers.go b/discovery/src/asapo_discovery/server/get_receivers.go
index 41e0cfa68..70cb2249c 100644
--- a/discovery/src/asapo_discovery/server/get_receivers.go
+++ b/discovery/src/asapo_discovery/server/get_receivers.go
@@ -3,11 +3,23 @@ package server
 import (
 	"net/http"
 	"asapo_discovery/logger"
+	"errors"
 )
 
-func getReceivers() (answer []byte, code int) {
-	answer, err := requestHandler.GetReceivers()
-	log_str := "processing get receivers "
+func getService(service string) (answer []byte, code int) {
+	var err error
+	switch service {
+	case "receivers":
+		answer, err = requestHandler.GetReceivers()
+		break
+	case "broker":
+		answer, err = requestHandler.GetBroker()
+		break
+	default:
+		err = errors.New("wrong request: "+service)
+	}
+
+	log_str := "processing get "+service
 	if err != nil {
 		logger.Error(log_str + " - " + err.Error())
 		return []byte(err.Error()),http.StatusInternalServerError
@@ -19,8 +31,14 @@ func getReceivers() (answer []byte, code int) {
 
 func routeGetReceivers(w http.ResponseWriter, r *http.Request) {
 	r.Header.Set("Content-type", "application/json")
-	answer,code := getReceivers()
+	answer,code := getService("receivers")
 	w.WriteHeader(code)
 	w.Write(answer)
 }
 
+func routeGetBroker(w http.ResponseWriter, r *http.Request) {
+	r.Header.Set("Content-type", "application/json")
+	answer,code := getService("broker")
+	w.WriteHeader(code)
+	w.Write(answer)
+}
diff --git a/discovery/src/asapo_discovery/server/listroutes.go b/discovery/src/asapo_discovery/server/listroutes.go
index 5e87f88da..ed068c343 100644
--- a/discovery/src/asapo_discovery/server/listroutes.go
+++ b/discovery/src/asapo_discovery/server/listroutes.go
@@ -11,4 +11,11 @@ var listRoutes = utils.Routes{
 		"/receivers",
 		routeGetReceivers,
 	},
+	utils.Route{
+		"GetBroker",
+		"Get",
+		"/broker",
+		routeGetBroker,
+	},
+
 }
diff --git a/discovery/src/asapo_discovery/server/get_receivers_test.go b/discovery/src/asapo_discovery/server/routes_test.go
similarity index 75%
rename from discovery/src/asapo_discovery/server/get_receivers_test.go
rename to discovery/src/asapo_discovery/server/routes_test.go
index 59c8ce226..a5bfe3e2f 100644
--- a/discovery/src/asapo_discovery/server/get_receivers_test.go
+++ b/discovery/src/asapo_discovery/server/routes_test.go
@@ -30,7 +30,10 @@ type GetReceiversTestSuite struct {
 
 func (suite *GetReceiversTestSuite) SetupTest() {
 	requestHandler = new(request_handler.StaticRequestHandler)
-	requestHandler.Init(10,[]string{"ip1","ip2"})
+	var s utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:10,ForceEndpoints:[]string{"ip1","ip2"}},
+	Broker:utils.BrokerInfo{ForceEndpoint:"ip_broker"}}
+
+	requestHandler.Init(s)
 	logger.SetMockLog()
 }
 
@@ -64,3 +67,14 @@ func (suite *GetReceiversTestSuite) TestGetReceivers() {
 }
 
 
+func (suite *GetReceiversTestSuite) TestGetBroker() {
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get broker")))
+
+	w := doRequest("/broker")
+
+	suite.Equal(http.StatusOK, w.Code, "code ok")
+	suite.Equal(w.Body.String(), "ip_broker", "result")
+	assertExpectations(suite.T())
+}
+
+
diff --git a/discovery/src/asapo_discovery/server/server.go b/discovery/src/asapo_discovery/server/server.go
index 36979843d..fb1bb5d5c 100644
--- a/discovery/src/asapo_discovery/server/server.go
+++ b/discovery/src/asapo_discovery/server/server.go
@@ -2,50 +2,20 @@ package server
 
 import (
 	"asapo_discovery/request_handler"
-	"errors"
+	"asapo_discovery/utils"
 )
 
 var requestHandler request_handler.Agent
 
-type serverSettings struct {
-	Endpoints		 []string
-	Mode			 string
-	Port             int
-	MaxConnections   int
-	LogLevel         string
-}
 
-var settings serverSettings
+var settings utils.Settings
 
 func SetHandler(rh request_handler.Agent) error {
 	requestHandler = rh
-	err := requestHandler.Init(settings.MaxConnections,settings.Endpoints)
+	err := requestHandler.Init(settings)
 	return err
 }
 
-func (settings *serverSettings) Validate() error {
-	if len(settings.Endpoints) == 0 &&  settings.Mode != "consul"{
-		return errors.New("Endpoints not set")
-	}
-
-	if settings.MaxConnections == 0 {
-		return errors.New("Max connections not set")
-	}
-
-	if settings.Port == 0 {
-		return errors.New("Server port not set")
-	}
-
-	if settings.Mode == "" {
-		return errors.New("Mode not set")
-	}
-
-	if settings.Mode != "static" && settings.Mode != "consul" {
-		return errors.New("wrong mode: "  + settings.Mode+ ", (allowed static|consul)")
-	}
-
-	return nil
-}
 
 func GetHandlerMode()string {
 	return settings.Mode
diff --git a/discovery/src/asapo_discovery/server/settings_test.go b/discovery/src/asapo_discovery/server/settings_test.go
index 00e12fdd7..d03b1b0af 100644
--- a/discovery/src/asapo_discovery/server/settings_test.go
+++ b/discovery/src/asapo_discovery/server/settings_test.go
@@ -3,15 +3,18 @@ package server
 import (
 	"github.com/stretchr/testify/assert"
 	"testing"
+	"asapo_discovery/utils"
 )
 
-func fillSettings(mode string)serverSettings {
-	var settings serverSettings
+func fillSettings(mode string) utils.Settings {
+	var settings utils.Settings
 	settings.Port = 1
 	settings.Mode = mode
-	settings.MaxConnections = 10
+	settings.Receiver.MaxConnections = 10
 	settings.LogLevel = "info"
-	settings.Endpoints=[]string{"ip1","ip2"}
+	settings.Receiver.ForceEndpoints=[]string{"ip1","ip2"}
+	settings.Broker.ForceEndpoint="ip_b"
+	settings.ConsulEndpoints=[]string{"ipc1","ipc2"}
 	return settings
 }
 
@@ -27,16 +30,25 @@ func TestSettingsWrongMode(t *testing.T) {
 	assert.NotNil(t, err)
 }
 
-func TestSettingsStaticModeNoEndpoints(t *testing.T) {
+func TestSettingsStaticModeNoReceiverEndpoints(t *testing.T) {
 	settings := fillSettings("static")
-	settings.Endpoints=[]string{}
+	settings.Receiver.ForceEndpoints=[]string{}
 	err := settings.Validate()
 	assert.NotNil(t, err)
 }
 
+func TestSettingsStaticModeNoBrokerEndpoints(t *testing.T) {
+	settings := fillSettings("static")
+	settings.Broker.ForceEndpoint=""
+	err := settings.Validate()
+	assert.NotNil(t, err)
+}
+
+
+
 func TestSettingsConsulModeNoEndpoints(t *testing.T) {
 	settings := fillSettings("consul")
-	settings.Endpoints=[]string{}
+	settings.ConsulEndpoints=[]string{}
 	err := settings.Validate()
 	assert.Nil(t, err)
 }
diff --git a/discovery/src/asapo_discovery/utils/stucts.go b/discovery/src/asapo_discovery/utils/stucts.go
new file mode 100644
index 000000000..23c4a655d
--- /dev/null
+++ b/discovery/src/asapo_discovery/utils/stucts.go
@@ -0,0 +1,48 @@
+package utils
+
+import "errors"
+
+type ReceiverInfo struct {
+	ForceEndpoints	 []string
+	MaxConnections   int
+}
+
+type BrokerInfo struct {
+	ForceEndpoint		 string
+}
+
+
+type Settings struct {
+	Receiver 		ReceiverInfo
+	Broker 		    BrokerInfo
+	ConsulEndpoints []string
+	Mode			string
+	Port            int
+	LogLevel        string
+}
+
+func (settings *Settings) Validate() error {
+	if settings.Mode != "consul"{
+		if len(settings.Receiver.ForceEndpoints) == 0 || len(settings.Broker.ForceEndpoint) == 0 {
+		return errors.New("Endpoints not set")
+		}
+	}
+
+	if settings.Receiver.MaxConnections == 0 {
+		return errors.New("Max connections not set")
+	}
+
+	if settings.Port == 0 {
+		return errors.New("Server port not set")
+	}
+
+	if settings.Mode == "" {
+		return errors.New("Mode not set")
+	}
+
+	if settings.Mode != "static" && settings.Mode != "consul" {
+		return errors.New("wrong mode: "  + settings.Mode+ ", (allowed static|consul)")
+	}
+
+	return nil
+}
diff --git a/examples/worker/getnext_broker/CMakeLists.txt b/examples/worker/getnext_broker/CMakeLists.txt
index b8be1ad68..19796f976 100644
--- a/examples/worker/getnext_broker/CMakeLists.txt
+++ b/examples/worker/getnext_broker/CMakeLists.txt
@@ -12,10 +12,10 @@ set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY
 get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY)
 add_dependencies(${TARGET_NAME} asapo-broker)
 
-
+prepare_asapo()
 
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY)
-add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME} $<TARGET_PROPERTY:asapo-broker,EXENAME>")
+add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}")
 
 set (dir examples/worker/${TARGET_NAME})
 install(TARGETS ${TARGET_NAME} DESTINATION "${dir}")
diff --git a/examples/worker/getnext_broker/check_linux.sh b/examples/worker/getnext_broker/check_linux.sh
index e2e13639b..18fe844f5 100644
--- a/examples/worker/getnext_broker/check_linux.sh
+++ b/examples/worker/getnext_broker/check_linux.sh
@@ -7,22 +7,21 @@ set -e
 trap Cleanup EXIT
 
 Cleanup() {
-:
-	kill -9 $brokerid
+    set +e
+    nomad stop nginx
+    nomad stop discovery
+    nomad stop broker
 	echo "db.dropDatabase()" | mongo ${database_name}
 }
 
-args=${@:1:$(($# - 1))}
-broker=${@:$#}
-
-$broker -config settings.json &
-brokerid=`echo $!`
-sleep 0.3
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
 
 for i in `seq 1 3`;
 do
 	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
 done
 
-$args 127.0.0.1:5005 $database_name 2 | grep "Processed 3 file(s)"
+$@ 127.0.0.1:8400 $database_name 2 | grep "Processed 3 file(s)"
 
diff --git a/examples/worker/getnext_broker/check_windows.bat b/examples/worker/getnext_broker/check_windows.bat
index f176ca87a..891e876ad 100644
--- a/examples/worker/getnext_broker/check_windows.bat
+++ b/examples/worker/getnext_broker/check_windows.bat
@@ -1,20 +1,16 @@
 SET database_name=test_run
 SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
 
-::first argument  path to the executable
-:: second argument path to the broker
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
 
-set full_name="%2"
-set short_name="%~nx2"
-
-start /B "" "%full_name%" -config settings.json
-
-ping 1.0.0.0 -n 1 -w 100 > nul
+ping 1.0.0.0 -n 10 -w 100 > nul
 
 for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name%  || goto :error
 
 
-"%1" 127.0.0.1:5005 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error
+"%1" 127.0.0.1:8400 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error
 goto :clean
 
 :error
@@ -22,5 +18,7 @@ call :clean
 exit /b 1
 
 :clean
-Taskkill /IM "%short_name%" /F
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
 echo db.dropDatabase() | %mongo_exe% %database_name%
diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp
index 7f10179df..ebf87761f 100644
--- a/producer/api/src/receiver_discovery_service.cpp
+++ b/producer/api/src/receiver_discovery_service.cpp
@@ -9,7 +9,7 @@
 
 namespace  asapo {
 
-const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/receivers";
+const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/discovery/receivers";
 
 ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()},
     log__{GetDefaultProducerLogger()},
diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp
index ca3ba0986..1d910ace5 100644
--- a/receiver/unittests/test_statistics.cpp
+++ b/receiver/unittests/test_statistics.cpp
@@ -31,7 +31,7 @@ namespace {
 TEST(StatisticTestsConstructor, Constructor) {
     Statistics statistics;
     ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderInfluxDb*>(statistics.statistics_sender_list__[0].get()), Ne(nullptr));
-    ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr));
+//    ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr));
 }
 
 
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index 8c2da0557..3a3119b1a 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -23,4 +23,4 @@ brokerid=`echo $!`
 curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1'
 curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2'
 
-curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found"
+curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep "not found"
diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat
index ac3299335..443c05422 100644
--- a/tests/automatic/broker/get_next/check_windows.bat
+++ b/tests/automatic/broker/get_next/check_windows.bat
@@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul
 
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":1  || goto :error
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":2  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr  /c:"Not Found"  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr  /c:"not found"  || goto :error
 
 goto :clean
 
diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh
index f4724e1c0..11c022e52 100644
--- a/tests/automatic/full_chain/simple_chain/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain/check_linux.sh
@@ -6,7 +6,7 @@ trap Cleanup EXIT
 
 broker_database_name=test_run
 monitor_database_name=db_test
-broker_address=127.0.0.1:5005
+proxy_address=127.0.0.1:8400
 
 receiver_folder=/tmp/asapo/receiver/files
 
@@ -38,4 +38,4 @@ $1 localhost:8400 100 1000 4 0 &
 #producerid=`echo $!`
 
 
-$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)"
+$2 ${proxy_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)"
diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat
index f2b98f2ce..2c6f52915 100644
--- a/tests/automatic/full_chain/simple_chain/check_windows.bat
+++ b/tests/automatic/full_chain/simple_chain/check_windows.bat
@@ -1,6 +1,7 @@
 SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
 set broker_database_name=test_run
 SET receiver_folder="c:\tmp\asapo\receiver\files"
+set proxy_address="127.0.0.1:8400"
 
 echo db.%broker_database_name%.insert({dummy:1}) | %mongo_exe% %broker_database_name%
 
@@ -13,12 +14,11 @@ ping 1.0.0.0 -n 10 -w 100 > nul
 
 REM producer
 mkdir %receiver_folder%
-start /B "" "%1" localhost:8400 100 1000 4 0
+start /B "" "%1" %proxy_address% 100 1000 4 0
 ping 1.0.0.0 -n 1 -w 100 > nul
 
 REM worker
-set broker_address="127.0.0.1:5005"
-"%2" %broker_address% %broker_database_name% 2 | findstr /c:"Processed 1000 file(s)"  || goto :error
+"%2" %proxy_address% %broker_database_name% 2 | findstr /c:"Processed 1000 file(s)"  || goto :error
 
 
 goto :clean
diff --git a/tests/automatic/settings/discovery_settings.json.tpl b/tests/automatic/settings/discovery_settings.json.tpl
index 62cb9864b..25cba6782 100644
--- a/tests/automatic/settings/discovery_settings.json.tpl
+++ b/tests/automatic/settings/discovery_settings.json.tpl
@@ -1,6 +1,8 @@
 {
-  "MaxConnections": 32,
   "Mode": "consul",
+  "Receiver": {
+    "MaxConnections": 32
+  },
   "Port": {{ env "NOMAD_PORT_discovery" }},
   "LogLevel":"debug"
 }
diff --git a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt
index 28525645e..69750c8f4 100644
--- a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt
+++ b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt
@@ -11,7 +11,7 @@ target_link_libraries(${TARGET_NAME} test_common asapo-worker)
 ################################
 # Testing
 ################################
-configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY)
-add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}> $<TARGET_PROPERTY:asapo-broker,EXENAME>"
+prepare_asapo()
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>"
         )
 
diff --git a/tests/automatic/worker/next_multithread_broker/check_linux.sh b/tests/automatic/worker/next_multithread_broker/check_linux.sh
index 086f98a26..c5a52cbb3 100644
--- a/tests/automatic/worker/next_multithread_broker/check_linux.sh
+++ b/tests/automatic/worker/next_multithread_broker/check_linux.sh
@@ -7,23 +7,25 @@ set -e
 trap Cleanup EXIT
 
 Cleanup() {
-:
-	kill -9 $brokerid
+    set +e
+    nomad stop nginx
+    nomad stop discovery
+    nomad stop broker
 	echo "db.dropDatabase()" | mongo ${database_name}
 }
 
-args=${@:1:$(($# - 1))}
-broker=${@:$#}
 
-$broker -config settings.json &
-brokerid=`echo $!`
-sleep 0.3
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+
+sleep 1
 
 for i in `seq 1 10`;
 do
 	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
 done
 
-$args 127.0.0.1:5005 $database_name 4 10
+$@ 127.0.0.1:8400 $database_name 4 10
 
 
diff --git a/tests/automatic/worker/next_multithread_broker/check_windows.bat b/tests/automatic/worker/next_multithread_broker/check_windows.bat
index 321f9c5c9..b3762c8a8 100644
--- a/tests/automatic/worker/next_multithread_broker/check_windows.bat
+++ b/tests/automatic/worker/next_multithread_broker/check_windows.bat
@@ -2,19 +2,17 @@ SET database_name=test_run
 SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
 
 ::first argument  path to the executable
-:: second argument path to the broker
 
-set full_name="%2"
-set short_name="%~nx2"
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
 
-start /B "" "%full_name%" -config settings.json
-
-ping 1.0.0.0 -n 1 -w 100 > nul
+ping 1.0.0.0 -n 10 -w 100 > nul
 
 for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name%  || goto :error
 
 
-%1 127.0.0.1:5005 %database_name% 4 10 || goto :error
+%1 127.0.0.1:8400 %database_name% 4 10 || goto :error
 
 goto :clean
 
@@ -23,5 +21,7 @@ call :clean
 exit /b 1
 
 :clean
-Taskkill /IM "%short_name%" /F
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
 echo db.dropDatabase() | %mongo_exe% %database_name%
diff --git a/tests/manual/performance_broker/discovery.json b/tests/manual/performance_broker/discovery.json
new file mode 100644
index 000000000..bb3394784
--- /dev/null
+++ b/tests/manual/performance_broker/discovery.json
@@ -0,0 +1,10 @@
+{
+  "Mode": "static",
+  "Broker": {
+    "Endpoint": [
+      "localhost:5005"
+    ]
+  },
+  "Port": 5006,
+  "LogLevel": "info"
+}
\ No newline at end of file
diff --git a/tests/manual/performance_full_chain_simple/discovery.json b/tests/manual/performance_full_chain_simple/discovery.json
index 476f732bb..a0b12327c 100644
--- a/tests/manual/performance_full_chain_simple/discovery.json
+++ b/tests/manual/performance_full_chain_simple/discovery.json
@@ -1,7 +1,16 @@
 {
-  "MaxConnections": 32,
   "Mode": "static",
-  "Endpoints":["localhost:4200"],
-  "Port":5006,
-  "LogLevel":"info"
+  "Recevier": {
+    "Endpoints": [
+      "localhost:4200"
+    ],
+    "MaxConnections": 32
+  },
+  "Broker": {
+    "Endpoint": [
+      "localhost:5005"
+    ]
+  },
+  "Port": 5006,
+  "LogLevel": "info"
 }
\ No newline at end of file
diff --git a/tests/manual/performance_full_chain_simple/fluentd.conf b/tests/manual/performance_full_chain_simple/fluentd.conf
index 8757f5e62..145dbce4b 100644
--- a/tests/manual/performance_full_chain_simple/fluentd.conf
+++ b/tests/manual/performance_full_chain_simple/fluentd.conf
@@ -6,7 +6,6 @@
  add_remote_addr true
 </source>
 
-
 <source>
   @type tail
   path /logs/*.broker
diff --git a/tests/manual/performance_full_chain_simple/test.sh b/tests/manual/performance_full_chain_simple/test.sh
index 424451e26..e4f8fed91 100755
--- a/tests/manual/performance_full_chain_simple/test.sh
+++ b/tests/manual/performance_full_chain_simple/test.sh
@@ -25,7 +25,7 @@ log_dir=~/fullchain_tests/logs
 # starts receiver on $receiver_node
 # runs producer with various file sizes from $producer_node and measures performance
 
-file_size=1000
+file_size=100
 file_num=$((100000000 / $file_size))
 echo filesize: ${file_size}K, filenum: $file_num
 
diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h
index ed9e356ca..d916e629d 100644
--- a/worker/api/cpp/include/worker/data_broker.h
+++ b/worker/api/cpp/include/worker/data_broker.h
@@ -17,6 +17,7 @@ auto const kSourceNotFound = "Source Not Found";
 auto const kSourceNotConnected = "Source Not Connacted";
 auto const kSourceAlreadyConnected = "Source Already Connected";
 auto const kErrorReadingSource = "Error Reading Source";
+auto const kNotFound = "Uri not found";
 auto const kPermissionDenied = "Permissionn Denied";
 auto const kNoData = "No Data";
 auto const kWrongInput = "Wrong Input";
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index da6940895..2859b63e2 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -24,6 +24,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
         message = WorkerErrorMessage::kErrorReadingSource;
         break;
     case HttpCode::NotFound:
+        message = WorkerErrorMessage::kErrorReadingSource;
+        break;
+    case HttpCode::Conflict:
         message = WorkerErrorMessage::kNoData;
         return TextErrorWithType(message, ErrorType::kEndOfFile);
     default:
@@ -56,20 +59,19 @@ std::string GetIDFromJson(const std::string& json_string, Error* err) {
     return std::to_string(id);
 }
 
-void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri) {
-    if ((*err)->GetErrorType() != asapo::ErrorType::kEndOfFile) {
-        (*err)->Append(response);
-        return;
-    } else {
+void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) {
+    (*err)->Append(response);
+    if ((*err)->GetErrorType() == asapo::ErrorType::kEndOfFile) {
         if (response.find("id") != std::string::npos) {
-            auto id = GetIDFromJson(response, err);
-            if (*err) {
+            Error parse_error;
+            auto id = GetIDFromJson(response, &parse_error);
+            if (parse_error) {
+                (*err)->Append(parse_error->Explain());
                 return;
             }
-            *redirect_uri = server_uri_ + "/database/" + source_name_ + "/" + id;
+            *op = id;
         }
     }
-    *err = nullptr;
     return;
 }
 
@@ -83,23 +85,40 @@ Error ServerDataBroker::ProcessRequest(std::string* response, std::string reques
     return HttpCodeToWorkerError(code);
 }
 
+Error ServerDataBroker::GetBrokerUri() {
+    if (!current_broker_uri_.empty()) {
+        return nullptr;
+    }
+
+    std::string request_uri = server_uri_ + "/discovery/broker";
+    Error err;
+    err = ProcessRequest(&current_broker_uri_, request_uri);
+    if (err != nullptr || current_broker_uri_.empty()) {
+        current_broker_uri_ = "";
+        return TextError("cannot get broker uri from " + server_uri_);
+    }
+    return nullptr;
+}
+
+
 Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) {
-    std::string request_uri = server_uri_ + "/database/" + source_name_ + "/" + operation;
+    std::string request_suffix = operation;
     uint64_t elapsed_ms = 0;
     std::string response;
     while (true) {
-        auto err = ProcessRequest(&response, request_uri);
+        auto err = GetBrokerUri();
         if (err == nullptr) {
-            break;
+            std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/";
+            err = ProcessRequest(&response, request_api + request_suffix);
+            if (err == nullptr) {
+                break;
+            }
         }
 
-        ProcessServerError(&err, response, &request_uri);
-        if (err != nullptr) {
-            return err;
-        }
+        ProcessServerError(&err, response, &request_suffix);
 
         if (elapsed_ms >= timeout_ms_) {
-            err = TextErrorWithType("no more data found, exit on timeout", asapo::ErrorType::kTimeOut);
+            err = TextErrorWithType("exit on timeout, last error: " + err->Explain(), asapo::ErrorType::kTimeOut);
             return err;
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h
index 03f18b8a3..43ccfeb7b 100644
--- a/worker/api/cpp/src/server_data_broker.h
+++ b/worker/api/cpp/src/server_data_broker.h
@@ -20,9 +20,11 @@ class ServerDataBroker final : public asapo::DataBroker {
     std::unique_ptr<HttpClient> httpclient__;
   private:
     Error GetFileInfoFromServer(FileInfo* info, const std::string& operation);
+    Error GetBrokerUri();
     void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
     Error ProcessRequest(std::string* response, std::string request_uri);
     std::string server_uri_;
+    std::string current_broker_uri_;
     std::string source_name_;
     uint64_t timeout_ms_ = 0;
 };
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index 28e2caef4..bf8c4748d 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -33,6 +33,7 @@ using ::testing::NiceMock;
 using ::testing::Return;
 using ::testing::SetArgPointee;
 using ::testing::SetArgReferee;
+using testing::AllOf;
 
 namespace {
 
@@ -53,9 +54,11 @@ class ServerDataBrokerTests : public Test {
     NiceMock<MockIO> mock_io;
     NiceMock<MockHttpClient> mock_http_client;
     FileInfo info;
+    std::string expected_server_uri = "test:8400";
+    std::string expected_broker_uri = "broker:5005";
 
     void SetUp() override {
-        data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "dbname")};
+        data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker(expected_server_uri, "dbname")};
         data_broker->io__ = std::unique_ptr<IO> {&mock_io};
         data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
     }
@@ -64,13 +67,20 @@ class ServerDataBrokerTests : public Test {
         data_broker->httpclient__.release();
     }
     void MockGet(const std::string& response) {
-        EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll(
+        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll(
                     SetArgPointee<1>(HttpCode::OK),
                     SetArgPointee<2>(nullptr),
                     Return(response)
                 ));
     }
 
+    void MockGetBrokerUri() {
+        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).WillOnce(DoAll(
+                    SetArgPointee<1>(HttpCode::OK),
+                    SetArgPointee<2>(nullptr),
+                    Return(expected_broker_uri)));
+    }
+
 };
 
 TEST_F(ServerDataBrokerTests, CanConnect) {
@@ -85,7 +95,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsErrorOnWrongInput) {
 
 
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
-    EXPECT_CALL(mock_http_client, Get_t("test/database/dbname/next", _, _)).WillOnce(DoAll(
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/next", _, _)).WillOnce(DoAll(
                 SetArgPointee<1>(HttpCode::OK),
                 SetArgPointee<2>(nullptr),
                 Return("")));
@@ -94,19 +106,26 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
 
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) {
+    MockGetBrokerUri();
+
+
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::NotFound),
+                SetArgPointee<1>(HttpCode::Conflict),
                 SetArgPointee<2>(nullptr),
                 Return("{\"id\":1}")));
 
     auto err = data_broker->GetNext(&info, nullptr);
 
+    ASSERT_THAT(err, Ne(nullptr));
     ASSERT_THAT(err->Explain(), HasSubstr("timeout"));
 }
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) {
+
+    MockGetBrokerUri();
+
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::NotFound),
+                SetArgPointee<1>(HttpCode::Conflict),
                 SetArgPointee<2>(nullptr),
                 Return("id")));
 
@@ -115,17 +134,63 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) {
     ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse"));
 }
 
+TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerAddressNotFound) {
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _,
+                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
+                                                    SetArgPointee<1>(HttpCode::NotFound),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return("")));
+
+    data_broker->SetTimeout(100);
+    auto err = data_broker->GetNext(&info, nullptr);
+
+    ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot")));
+}
+
+TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerUriEmpty) {
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _,
+                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
+                                                    SetArgPointee<1>(HttpCode::OK),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return("")));
+
+    data_broker->SetTimeout(100);
+    auto err = data_broker->GetNext(&info, nullptr);
+
+    ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot")));
+}
+
+
+
+TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) {
+    MockGetBrokerUri();
+    MockGet("error_response");
+
+    data_broker->SetTimeout(100);
+    data_broker->GetNext(&info, nullptr);
+    Mock::VerifyAndClearExpectations(&mock_http_client);
+
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).Times(0);
+    MockGet("error_response");
+    data_broker->GetNext(&info, nullptr);
+}
+
+
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) {
+    MockGetBrokerUri();
+
+
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::NotFound),
+                SetArgPointee<1>(HttpCode::Conflict),
                 SetArgPointee<2>(nullptr),
                 Return("{\"id\":1}")));
 
-    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("1"), _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
-                SetArgPointee<1>(HttpCode::NotFound),
-                SetArgPointee<2>(nullptr),
-                Return("{\"id\":1}")));
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/1", _,
+                                        _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
+                                                    SetArgPointee<1>(HttpCode::Conflict),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return("{\"id\":1}")));
 
 
     data_broker->SetTimeout(100);
@@ -146,6 +211,8 @@ FileInfo CreateFI() {
 }
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) {
+    MockGetBrokerUri();
+
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
@@ -163,6 +230,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) {
 
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) {
+    MockGetBrokerUri();
     MockGet("error_response");
     auto err = data_broker->GetNext(&info, nullptr);
 
@@ -171,6 +239,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) {
 
 
 TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) {
+    MockGetBrokerUri();
     MockGet("error_response");
     EXPECT_CALL( mock_io, GetDataFromFile_t(_, _, _)).Times(0);
 
@@ -178,9 +247,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) {
 }
 
 TEST_F(ServerDataBrokerTests, GetNextCallsReadFromFile) {
+    MockGetBrokerUri();
     auto to_send = CreateFI();
     auto json = to_send.Json();
-
     MockGet(json);
 
     EXPECT_CALL(mock_io, GetDataFromFile_t("name", 100, _)).
-- 
GitLab