diff --git a/broker/src/asapo_broker/utils/status_codes.go b/broker/src/asapo_broker/utils/status_codes.go index 70a2193004b0e34bd07f5f0804b50375fd8ba910..58fef4da3eba0fb3d6962487a88b0c2fc8a4d393 100644 --- a/broker/src/asapo_broker/utils/status_codes.go +++ b/broker/src/asapo_broker/utils/status_codes.go @@ -10,5 +10,5 @@ const ( //error codes StatusError = http.StatusInternalServerError StatusWrongInput = http.StatusBadRequest - StatusNoData = http.StatusNotFound + StatusNoData = http.StatusConflict ) diff --git a/discovery/src/asapo_discovery/request_handler/request_handler.go b/discovery/src/asapo_discovery/request_handler/request_handler.go index e2cac458fd6e68d543dae36d56354ddef531b244..b2ce9b5601af3c2df091c926e6ef54e6acc5ab11 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler.go @@ -1,7 +1,10 @@ package request_handler +import "asapo_discovery/utils" + type Agent interface { GetReceivers() ([]byte, error) - Init(int,[]string) error + GetBroker() ([]byte, error) + Init(settings utils.Settings) error } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go index c414158e5865d9d85c9100311c732ab16de0e51d..c4a948a4b26aaea7608e7b29188966c376b9550e 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go @@ -5,6 +5,8 @@ import ( "github.com/hashicorp/consul/api" "strconv" "errors" + "sort" + "sync" ) type ConsulRequestHandler struct { @@ -17,20 +19,63 @@ type Responce struct { Uris []string } +type SafeCounter struct { + counter int + mux sync.Mutex +} + +func (c *SafeCounter) Next(size int) int { + c.mux.Lock() + defer c.mux.Unlock() + val := c.counter % size + c.counter++ + return val +} + +var counter SafeCounter + +func (rh *ConsulRequestHandler) GetServices(name string) ([]string, error) { + var result = make([]string, 0) + services, _, err := rh.client.Health().Service(name, "", true, nil) + if err != nil { + return nil, err + } + for _, service := range (services) { + result = append(result, service.Node.Address+":"+strconv.Itoa(service.Service.Port)) + } + sort.Strings(result) + return result, nil +} + func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) { - if (rh.client == nil){ - return nil,errors.New("consul client not connected") + if (rh.client == nil) { + return nil, errors.New("consul client not connected") + } + var response Responce + var err error + response.Uris, err = rh.GetServices("receiver") + if err != nil { + return nil, err } - var responce Responce - services,_,err := rh.client.Health().Service("receiver","",true,nil) - if err!=nil { - return nil,err + response.MaxConnections = rh.MaxConnections + return utils.MapToJson(&response) +} + +func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) { + if (rh.client == nil) { + return nil, errors.New("consul client not connected") + } + response, err := rh.GetServices("broker") + if err != nil { + return nil, err } - for _,service := range (services) { - responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port)) + size := len(response) + if size ==0 { + return []byte(""),nil + }else { + return []byte(response[counter.Next(size)]),nil } - responce.MaxConnections = rh.MaxConnections - return utils.MapToJson(&responce) + return nil, nil } func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) { @@ -48,13 +93,13 @@ func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, e return } -func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) { - rh.MaxConnections = maxCons - if len(uris) == 0 { +func (rh *ConsulRequestHandler) Init(settings utils.Settings) (err error) { + rh.MaxConnections = settings.Receiver.MaxConnections + if len(settings.ConsulEndpoints) == 0 { rh.client, err = rh.connectClient("") return err } - for _, uri := range (uris) { + for _, uri := range (settings.ConsulEndpoints) { rh.client, err = rh.connectClient(uri) if err == nil { return nil diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go index 5e9d18f00fc376b4326d4341e880043128ce98bf..79f349ee897f85cd0fa1af188c251b470c17b304 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go @@ -3,13 +3,14 @@ package request_handler import ( "github.com/stretchr/testify/suite" "testing" - "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api" "strconv" + "asapo_discovery/utils" ) type ConsulHandlerTestSuite struct { suite.Suite - client *api.Client + client *api.Client handler ConsulRequestHandler } @@ -17,68 +18,122 @@ func TestConsulHandlerTestSuite(t *testing.T) { suite.Run(t, new(ConsulHandlerTestSuite)) } +var consul_settings utils.Settings + +func (suite *ConsulHandlerTestSuite) registerAgents(name string) { + for i := 1234; i < 1236; i++ { + reg := &api.AgentServiceRegistration{ + ID: name + strconv.Itoa(i), + Name: name, + Port: i, + Check: &api.AgentServiceCheck{ + Interval: "10m", + Status: "passing", + HTTP: "http://localhost:5000/health", + }, + } + err := suite.client.Agent().ServiceRegister(reg) + if err != nil { + panic(err) + } + } + +} + func (suite *ConsulHandlerTestSuite) SetupTest() { var err error + consul_settings = utils.Settings{Receiver: utils.ReceiverInfo{MaxConnections: 10, ForceEndpoints: []string{}}} + suite.client, err = api.NewClient(api.DefaultConfig()) if err != nil { panic(err) } - for i:=1234;i<1236;i++ { - reg := &api.AgentServiceRegistration{ - ID: "receiver"+strconv.Itoa(i), - Name: "receiver", - Port: i, - Check: &api.AgentServiceCheck{ - Interval:"10m", - Status:"passing", - HTTP: "http://localhost:5000/health", - }, - } - err = suite.client.Agent().ServiceRegister(reg) - if err != nil { - panic(err) - } - } + + suite.registerAgents("receiver") + suite.registerAgents("broker") + } func (suite *ConsulHandlerTestSuite) TearDownTest() { suite.client.Agent().ServiceDeregister("receiver1234") suite.client.Agent().ServiceDeregister("receiver1235") + suite.client.Agent().ServiceDeregister("broker1234") + suite.client.Agent().ServiceDeregister("broker1235") } - func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() { - err := suite.handler.Init(10,[]string{}) - suite.NoError(err, "empty list") + err := suite.handler.Init(consul_settings) + suite.NoError(err, "empty list") } func (suite *ConsulHandlerTestSuite) TestInitWrongUri() { - err := suite.handler.Init(10,[]string{"blabla"}) - suite.Error(err, "wrong consul uri") - suite.Nil(suite.handler.client, "client nli after error") + consul_settings.ConsulEndpoints = []string{"blabla"} + err := suite.handler.Init(consul_settings) + suite.Error(err, "wrong consul uri") + suite.Nil(suite.handler.client, "client nli after error") } func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() { - err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"}) - suite.NoError(err, "") + consul_settings.ConsulEndpoints = []string{"http://127.0.0.1:8500"} + + err := suite.handler.Init(consul_settings) + suite.NoError(err, "") } func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() { - err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"}) - suite.NoError(err, "") -} + consul_settings.ConsulEndpoints = []string{"blabla", "http://127.0.0.1:8500"} + err := suite.handler.Init(consul_settings) + suite.NoError(err, "") +} func (suite *ConsulHandlerTestSuite) TestGetReceivers() { - suite.handler.Init(10,[]string{}) - res,err := suite.handler.GetReceivers() - suite.NoError(err, "") - suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris") + suite.handler.Init(consul_settings) + res, err := suite.handler.GetReceivers() + suite.NoError(err, "") + suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}", string(res), "uris") } func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() { - suite.handler.Init(10,[]string{"blabla"}) - _,err := suite.handler.GetReceivers() - suite.Error(err, "") + consul_settings.ConsulEndpoints = []string{"blabla"} + suite.handler.Init(consul_settings) + _, err := suite.handler.GetReceivers() + suite.Error(err, "") +} + +func (suite *ConsulHandlerTestSuite) TestGetBrokerWhenNotConnected() { + consul_settings.ConsulEndpoints = []string{"blabla"} + suite.handler.Init(consul_settings) + _, err := suite.handler.GetBroker() + suite.Error(err, "") } + +func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() { + suite.handler.Init(consul_settings) + res, err := suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1234", string(res), "uris") + + res, err = suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1235", string(res), "uris") + + res, err = suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1234", string(res), "uris") + +} + + +func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() { + suite.client.Agent().ServiceDeregister("broker1234") + suite.client.Agent().ServiceDeregister("broker1235") + + suite.handler.Init(consul_settings) + res, err := suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("", string(res), "uris") +} + + diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static.go b/discovery/src/asapo_discovery/request_handler/request_handler_static.go index d52cb205d5b0c00315c871055e8faf96634e269c..90522d8307b6352a0abd34521a0a7a361ce65645 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static.go @@ -5,15 +5,23 @@ import ( ) type StaticRequestHandler struct { - Responce - } + receiverResponce Responce + broker string +} + func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) { - return utils.MapToJson(&rh) + return utils.MapToJson(&rh.receiverResponce) } -func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error { - rh.MaxConnections = maxCons - rh.Uris = uris +func (rh *StaticRequestHandler) GetBroker() ([]byte, error) { + return []byte(rh.broker),nil +} + + +func (rh *StaticRequestHandler) Init(settings utils.Settings) error { + rh.receiverResponce.MaxConnections = settings.Receiver.MaxConnections + rh.receiverResponce.Uris = settings.Receiver.ForceEndpoints + rh.broker = settings.Broker.ForceEndpoint return nil } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go index 01e769e4359ac2b05cd2b162907c6d9b2c0a7e5b..35b08f2f5cac5ccd76623117a05b26548893d4d7 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go @@ -3,23 +3,35 @@ package request_handler import ( "github.com/stretchr/testify/assert" "testing" + "asapo_discovery/utils" ) var uris = []string{"ip1","ip2"} const max_conn = 1 +var static_settings utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:max_conn,ForceEndpoints:uris},Broker:utils.BrokerInfo{ + ForceEndpoint:"ip_broker"}} + + + var rh StaticRequestHandler; func TestStaticHandlerInitOK(t *testing.T) { - err := rh.Init(max_conn,uris) + err := rh.Init(static_settings) assert.Nil(t, err) } -func TestStaticHandlerGetOK(t *testing.T) { - rh.Init(max_conn,uris) +func TestStaticHandlerGetReceviersOK(t *testing.T) { + rh.Init(static_settings) res,err := rh.GetReceivers() assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}") assert.Nil(t, err) +} +func TestStaticHandlerGetBrokerOK(t *testing.T) { + rh.Init(static_settings) + res,err := rh.GetBroker() + assert.Equal(t,string(res), "ip_broker") + assert.Nil(t, err) } diff --git a/discovery/src/asapo_discovery/server/get_receivers.go b/discovery/src/asapo_discovery/server/get_receivers.go index 41e0cfa68399036a38f31c68bf490f225e9aa025..70cb2249cb4a4c0270f78ba59bb9a48c96d91c3c 100644 --- a/discovery/src/asapo_discovery/server/get_receivers.go +++ b/discovery/src/asapo_discovery/server/get_receivers.go @@ -3,11 +3,23 @@ package server import ( "net/http" "asapo_discovery/logger" + "errors" ) -func getReceivers() (answer []byte, code int) { - answer, err := requestHandler.GetReceivers() - log_str := "processing get receivers " +func getService(service string) (answer []byte, code int) { + var err error + switch service { + case "receivers": + answer, err = requestHandler.GetReceivers() + break + case "broker": + answer, err = requestHandler.GetBroker() + break + default: + err = errors.New("wrong request: "+service) + } + + log_str := "processing get "+service if err != nil { logger.Error(log_str + " - " + err.Error()) return []byte(err.Error()),http.StatusInternalServerError @@ -19,8 +31,14 @@ func getReceivers() (answer []byte, code int) { func routeGetReceivers(w http.ResponseWriter, r *http.Request) { r.Header.Set("Content-type", "application/json") - answer,code := getReceivers() + answer,code := getService("receivers") w.WriteHeader(code) w.Write(answer) } +func routeGetBroker(w http.ResponseWriter, r *http.Request) { + r.Header.Set("Content-type", "application/json") + answer,code := getService("broker") + w.WriteHeader(code) + w.Write(answer) +} diff --git a/discovery/src/asapo_discovery/server/listroutes.go b/discovery/src/asapo_discovery/server/listroutes.go index 5e87f88da6acbb38b2d4ff5b487dfe7dfc3425f9..ed068c3430ed7c61249515db7e2e05f26a785ac8 100644 --- a/discovery/src/asapo_discovery/server/listroutes.go +++ b/discovery/src/asapo_discovery/server/listroutes.go @@ -11,4 +11,11 @@ var listRoutes = utils.Routes{ "/receivers", routeGetReceivers, }, + utils.Route{ + "GetBroker", + "Get", + "/broker", + routeGetBroker, + }, + } diff --git a/discovery/src/asapo_discovery/server/get_receivers_test.go b/discovery/src/asapo_discovery/server/routes_test.go similarity index 75% rename from discovery/src/asapo_discovery/server/get_receivers_test.go rename to discovery/src/asapo_discovery/server/routes_test.go index 59c8ce226fe4be3af87f34f900db129fbce6e175..a5bfe3e2fa90e0a87acf9b1a3b861486cca0f694 100644 --- a/discovery/src/asapo_discovery/server/get_receivers_test.go +++ b/discovery/src/asapo_discovery/server/routes_test.go @@ -30,7 +30,10 @@ type GetReceiversTestSuite struct { func (suite *GetReceiversTestSuite) SetupTest() { requestHandler = new(request_handler.StaticRequestHandler) - requestHandler.Init(10,[]string{"ip1","ip2"}) + var s utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:10,ForceEndpoints:[]string{"ip1","ip2"}}, + Broker:utils.BrokerInfo{ForceEndpoint:"ip_broker"}} + + requestHandler.Init(s) logger.SetMockLog() } @@ -64,3 +67,14 @@ func (suite *GetReceiversTestSuite) TestGetReceivers() { } +func (suite *GetReceiversTestSuite) TestGetBroker() { + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get broker"))) + + w := doRequest("/broker") + + suite.Equal(http.StatusOK, w.Code, "code ok") + suite.Equal(w.Body.String(), "ip_broker", "result") + assertExpectations(suite.T()) +} + + diff --git a/discovery/src/asapo_discovery/server/server.go b/discovery/src/asapo_discovery/server/server.go index 36979843ddf4ab258ed47f7761cc93c3e453074a..fb1bb5d5c629a18c517c3193bd56730ac18e9c8a 100644 --- a/discovery/src/asapo_discovery/server/server.go +++ b/discovery/src/asapo_discovery/server/server.go @@ -2,50 +2,20 @@ package server import ( "asapo_discovery/request_handler" - "errors" + "asapo_discovery/utils" ) var requestHandler request_handler.Agent -type serverSettings struct { - Endpoints []string - Mode string - Port int - MaxConnections int - LogLevel string -} -var settings serverSettings +var settings utils.Settings func SetHandler(rh request_handler.Agent) error { requestHandler = rh - err := requestHandler.Init(settings.MaxConnections,settings.Endpoints) + err := requestHandler.Init(settings) return err } -func (settings *serverSettings) Validate() error { - if len(settings.Endpoints) == 0 && settings.Mode != "consul"{ - return errors.New("Endpoints not set") - } - - if settings.MaxConnections == 0 { - return errors.New("Max connections not set") - } - - if settings.Port == 0 { - return errors.New("Server port not set") - } - - if settings.Mode == "" { - return errors.New("Mode not set") - } - - if settings.Mode != "static" && settings.Mode != "consul" { - return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)") - } - - return nil -} func GetHandlerMode()string { return settings.Mode diff --git a/discovery/src/asapo_discovery/server/settings_test.go b/discovery/src/asapo_discovery/server/settings_test.go index 00e12fdd7367e33a65cca193d06831c43d178147..d03b1b0af4b24049cddb17ff42ebc282d932af25 100644 --- a/discovery/src/asapo_discovery/server/settings_test.go +++ b/discovery/src/asapo_discovery/server/settings_test.go @@ -3,15 +3,18 @@ package server import ( "github.com/stretchr/testify/assert" "testing" + "asapo_discovery/utils" ) -func fillSettings(mode string)serverSettings { - var settings serverSettings +func fillSettings(mode string) utils.Settings { + var settings utils.Settings settings.Port = 1 settings.Mode = mode - settings.MaxConnections = 10 + settings.Receiver.MaxConnections = 10 settings.LogLevel = "info" - settings.Endpoints=[]string{"ip1","ip2"} + settings.Receiver.ForceEndpoints=[]string{"ip1","ip2"} + settings.Broker.ForceEndpoint="ip_b" + settings.ConsulEndpoints=[]string{"ipc1","ipc2"} return settings } @@ -27,16 +30,25 @@ func TestSettingsWrongMode(t *testing.T) { assert.NotNil(t, err) } -func TestSettingsStaticModeNoEndpoints(t *testing.T) { +func TestSettingsStaticModeNoReceiverEndpoints(t *testing.T) { settings := fillSettings("static") - settings.Endpoints=[]string{} + settings.Receiver.ForceEndpoints=[]string{} err := settings.Validate() assert.NotNil(t, err) } +func TestSettingsStaticModeNoBrokerEndpoints(t *testing.T) { + settings := fillSettings("static") + settings.Broker.ForceEndpoint="" + err := settings.Validate() + assert.NotNil(t, err) +} + + + func TestSettingsConsulModeNoEndpoints(t *testing.T) { settings := fillSettings("consul") - settings.Endpoints=[]string{} + settings.ConsulEndpoints=[]string{} err := settings.Validate() assert.Nil(t, err) } diff --git a/discovery/src/asapo_discovery/utils/stucts.go b/discovery/src/asapo_discovery/utils/stucts.go new file mode 100644 index 0000000000000000000000000000000000000000..23c4a655d0611c7fb623dc03d49f9816135985e4 --- /dev/null +++ b/discovery/src/asapo_discovery/utils/stucts.go @@ -0,0 +1,48 @@ +package utils + +import "errors" + +type ReceiverInfo struct { + ForceEndpoints []string + MaxConnections int +} + +type BrokerInfo struct { + ForceEndpoint string +} + + +type Settings struct { + Receiver ReceiverInfo + Broker BrokerInfo + ConsulEndpoints []string + Mode string + Port int + LogLevel string +} + +func (settings *Settings) Validate() error { + if settings.Mode != "consul"{ + if len(settings.Receiver.ForceEndpoints) == 0 || len(settings.Broker.ForceEndpoint) == 0 { + return errors.New("Endpoints not set") + } + } + + if settings.Receiver.MaxConnections == 0 { + return errors.New("Max connections not set") + } + + if settings.Port == 0 { + return errors.New("Server port not set") + } + + if settings.Mode == "" { + return errors.New("Mode not set") + } + + if settings.Mode != "static" && settings.Mode != "consul" { + return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)") + } + + return nil +} diff --git a/examples/worker/getnext_broker/CMakeLists.txt b/examples/worker/getnext_broker/CMakeLists.txt index b8be1ad68eb16bd8e6527f84f749ba781364d9b0..19796f976678b2127e7bdcbeb15b2e9c900739ec 100644 --- a/examples/worker/getnext_broker/CMakeLists.txt +++ b/examples/worker/getnext_broker/CMakeLists.txt @@ -12,10 +12,10 @@ set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY) add_dependencies(${TARGET_NAME} asapo-broker) - +prepare_asapo() configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY) -add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME} $<TARGET_PROPERTY:asapo-broker,EXENAME>") +add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}") set (dir examples/worker/${TARGET_NAME}) install(TARGETS ${TARGET_NAME} DESTINATION "${dir}") diff --git a/examples/worker/getnext_broker/check_linux.sh b/examples/worker/getnext_broker/check_linux.sh index e2e13639bd5f8b56199c909d9a583a7c91a798dc..18fe844f51ae3e6f40943fe0678f0cb0e7252d0e 100644 --- a/examples/worker/getnext_broker/check_linux.sh +++ b/examples/worker/getnext_broker/check_linux.sh @@ -7,22 +7,21 @@ set -e trap Cleanup EXIT Cleanup() { -: - kill -9 $brokerid + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} } -args=${@:1:$(($# - 1))} -broker=${@:$#} - -$broker -config settings.json & -brokerid=`echo $!` -sleep 0.3 +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd for i in `seq 1 3`; do echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name} done -$args 127.0.0.1:5005 $database_name 2 | grep "Processed 3 file(s)" +$@ 127.0.0.1:8400 $database_name 2 | grep "Processed 3 file(s)" diff --git a/examples/worker/getnext_broker/check_windows.bat b/examples/worker/getnext_broker/check_windows.bat index f176ca87ad817b9990cefa5e39b26a40235f1ffc..891e876adee4f71d609df9bb626e90b339aa1d2f 100644 --- a/examples/worker/getnext_broker/check_windows.bat +++ b/examples/worker/getnext_broker/check_windows.bat @@ -1,20 +1,16 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" -::first argument path to the executable -:: second argument path to the broker +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd -set full_name="%2" -set short_name="%~nx2" - -start /B "" "%full_name%" -config settings.json - -ping 1.0.0.0 -n 1 -w 100 > nul +ping 1.0.0.0 -n 10 -w 100 > nul for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error -"%1" 127.0.0.1:5005 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error +"%1" 127.0.0.1:8400 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error goto :clean :error @@ -22,5 +18,7 @@ call :clean exit /b 1 :clean -Taskkill /IM "%short_name%" /F +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp index 7f10179df570486094be9cb1918fad7d39e5005b..ebf87761f06eaf6a75fc0e319f3305cd33826cb1 100644 --- a/producer/api/src/receiver_discovery_service.cpp +++ b/producer/api/src/receiver_discovery_service.cpp @@ -9,7 +9,7 @@ namespace asapo { -const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/receivers"; +const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/discovery/receivers"; ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()}, log__{GetDefaultProducerLogger()}, diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp index ca3ba0986977ccf1b4da30727189c30b7494a776..1d910ace5c71a7beabeb47154611dd57ddd45492 100644 --- a/receiver/unittests/test_statistics.cpp +++ b/receiver/unittests/test_statistics.cpp @@ -31,7 +31,7 @@ namespace { TEST(StatisticTestsConstructor, Constructor) { Statistics statistics; ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderInfluxDb*>(statistics.statistics_sender_list__[0].get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr)); +// ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr)); } diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index 8c2da0557ee06fc479e37bb9d5c4498a5212640a..3a3119b1a0df98ed9cbb29a7f6859819e27531ae 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -23,4 +23,4 @@ brokerid=`echo $!` curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1' curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2' -curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found" +curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "not found" diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat index ac3299335d3232d8b0e2ac08244c452bf510ad56..443c05422d74a9c346e72ca65cfd5c5535a1f964 100644 --- a/tests/automatic/broker/get_next/check_windows.bat +++ b/tests/automatic/broker/get_next/check_windows.bat @@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":1 || goto :error C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":2 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:"Not Found" || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:"not found" || goto :error goto :clean diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh index f4724e1c08480bccac04a304984cd6532d11bea9..11c022e52ccfcf5a5eb5d996b34a1ebdb6363e07 100644 --- a/tests/automatic/full_chain/simple_chain/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -6,7 +6,7 @@ trap Cleanup EXIT broker_database_name=test_run monitor_database_name=db_test -broker_address=127.0.0.1:5005 +proxy_address=127.0.0.1:8400 receiver_folder=/tmp/asapo/receiver/files @@ -38,4 +38,4 @@ $1 localhost:8400 100 1000 4 0 & #producerid=`echo $!` -$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)" +$2 ${proxy_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)" diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat index f2b98f2cefa70b526cbceffad8adb04d4f9d245c..2c6f5291593fc91718a68b81a4415f191bdad4ea 100644 --- a/tests/automatic/full_chain/simple_chain/check_windows.bat +++ b/tests/automatic/full_chain/simple_chain/check_windows.bat @@ -1,6 +1,7 @@ SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" set broker_database_name=test_run SET receiver_folder="c:\tmp\asapo\receiver\files" +set proxy_address="127.0.0.1:8400" echo db.%broker_database_name%.insert({dummy:1}) | %mongo_exe% %broker_database_name% @@ -13,12 +14,11 @@ ping 1.0.0.0 -n 10 -w 100 > nul REM producer mkdir %receiver_folder% -start /B "" "%1" localhost:8400 100 1000 4 0 +start /B "" "%1" %proxy_address% 100 1000 4 0 ping 1.0.0.0 -n 1 -w 100 > nul REM worker -set broker_address="127.0.0.1:5005" -"%2" %broker_address% %broker_database_name% 2 | findstr /c:"Processed 1000 file(s)" || goto :error +"%2" %proxy_address% %broker_database_name% 2 | findstr /c:"Processed 1000 file(s)" || goto :error goto :clean diff --git a/tests/automatic/settings/discovery_settings.json.tpl b/tests/automatic/settings/discovery_settings.json.tpl index 62cb9864b6b7cf0c4a738ca8f1a2b254ff4400fc..25cba67824339d7c81a9d39b4a89d8356d4ea9bf 100644 --- a/tests/automatic/settings/discovery_settings.json.tpl +++ b/tests/automatic/settings/discovery_settings.json.tpl @@ -1,6 +1,8 @@ { - "MaxConnections": 32, "Mode": "consul", + "Receiver": { + "MaxConnections": 32 + }, "Port": {{ env "NOMAD_PORT_discovery" }}, "LogLevel":"debug" } diff --git a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt index 28525645e8c1e08f24251df35bbff7c2562b9f91..69750c8f47ce37a2e8e7b4a5adccd022a6373328 100644 --- a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt +++ b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt @@ -11,7 +11,7 @@ target_link_libraries(${TARGET_NAME} test_common asapo-worker) ################################ # Testing ################################ -configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY) -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}> $<TARGET_PROPERTY:asapo-broker,EXENAME>" +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" ) diff --git a/tests/automatic/worker/next_multithread_broker/check_linux.sh b/tests/automatic/worker/next_multithread_broker/check_linux.sh index 086f98a26573c2815fc36b227b876ac9be313add..c5a52cbb37b40d257309f329049f3b3da5b72470 100644 --- a/tests/automatic/worker/next_multithread_broker/check_linux.sh +++ b/tests/automatic/worker/next_multithread_broker/check_linux.sh @@ -7,23 +7,25 @@ set -e trap Cleanup EXIT Cleanup() { -: - kill -9 $brokerid + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} } -args=${@:1:$(($# - 1))} -broker=${@:$#} -$broker -config settings.json & -brokerid=`echo $!` -sleep 0.3 +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 for i in `seq 1 10`; do echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name} done -$args 127.0.0.1:5005 $database_name 4 10 +$@ 127.0.0.1:8400 $database_name 4 10 diff --git a/tests/automatic/worker/next_multithread_broker/check_windows.bat b/tests/automatic/worker/next_multithread_broker/check_windows.bat index 321f9c5c947274cfffa449f5ada1b8412e17a100..b3762c8a856d534d4f92590f68b5820e09265ab1 100644 --- a/tests/automatic/worker/next_multithread_broker/check_windows.bat +++ b/tests/automatic/worker/next_multithread_broker/check_windows.bat @@ -2,19 +2,17 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" ::first argument path to the executable -:: second argument path to the broker -set full_name="%2" -set short_name="%~nx2" +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd -start /B "" "%full_name%" -config settings.json - -ping 1.0.0.0 -n 1 -w 100 > nul +ping 1.0.0.0 -n 10 -w 100 > nul for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error -%1 127.0.0.1:5005 %database_name% 4 10 || goto :error +%1 127.0.0.1:8400 %database_name% 4 10 || goto :error goto :clean @@ -23,5 +21,7 @@ call :clean exit /b 1 :clean -Taskkill /IM "%short_name%" /F +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/manual/performance_broker/discovery.json b/tests/manual/performance_broker/discovery.json new file mode 100644 index 0000000000000000000000000000000000000000..bb339478444bd27419c0c0a93acdc40972917c4f --- /dev/null +++ b/tests/manual/performance_broker/discovery.json @@ -0,0 +1,10 @@ +{ + "Mode": "static", + "Broker": { + "Endpoint": [ + "localhost:5005" + ] + }, + "Port": 5006, + "LogLevel": "info" +} \ No newline at end of file diff --git a/tests/manual/performance_full_chain_simple/discovery.json b/tests/manual/performance_full_chain_simple/discovery.json index 476f732bbedad31adc0e4ce4fbbee1ca081cc025..a0b12327cb2f764e823b73263f85c3e766db6b89 100644 --- a/tests/manual/performance_full_chain_simple/discovery.json +++ b/tests/manual/performance_full_chain_simple/discovery.json @@ -1,7 +1,16 @@ { - "MaxConnections": 32, "Mode": "static", - "Endpoints":["localhost:4200"], - "Port":5006, - "LogLevel":"info" + "Recevier": { + "Endpoints": [ + "localhost:4200" + ], + "MaxConnections": 32 + }, + "Broker": { + "Endpoint": [ + "localhost:5005" + ] + }, + "Port": 5006, + "LogLevel": "info" } \ No newline at end of file diff --git a/tests/manual/performance_full_chain_simple/fluentd.conf b/tests/manual/performance_full_chain_simple/fluentd.conf index 8757f5e62529c3aa35f31edeb244224675bb5f6a..145dbce4bd148936c75da0c94b13786359908a25 100644 --- a/tests/manual/performance_full_chain_simple/fluentd.conf +++ b/tests/manual/performance_full_chain_simple/fluentd.conf @@ -6,7 +6,6 @@ add_remote_addr true </source> - <source> @type tail path /logs/*.broker diff --git a/tests/manual/performance_full_chain_simple/test.sh b/tests/manual/performance_full_chain_simple/test.sh index 424451e26fa35147f9c7eb30abc98fcf6f55afa7..e4f8fed91d354606c8c58066bb515b1defb561d0 100755 --- a/tests/manual/performance_full_chain_simple/test.sh +++ b/tests/manual/performance_full_chain_simple/test.sh @@ -25,7 +25,7 @@ log_dir=~/fullchain_tests/logs # starts receiver on $receiver_node # runs producer with various file sizes from $producer_node and measures performance -file_size=1000 +file_size=100 file_num=$((100000000 / $file_size)) echo filesize: ${file_size}K, filenum: $file_num diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index ed9e356cad549ed78c8df753d404e91c2a3c5aa7..d916e629dd16daa6056f45763abd7f76cfc8623a 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -17,6 +17,7 @@ auto const kSourceNotFound = "Source Not Found"; auto const kSourceNotConnected = "Source Not Connacted"; auto const kSourceAlreadyConnected = "Source Already Connected"; auto const kErrorReadingSource = "Error Reading Source"; +auto const kNotFound = "Uri not found"; auto const kPermissionDenied = "Permissionn Denied"; auto const kNoData = "No Data"; auto const kWrongInput = "Wrong Input"; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index da6940895b746e88d52dd23b83e8e401a89c32b7..2859b63e297cce2fefe8704121dabc99b01d631f 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -24,6 +24,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) { message = WorkerErrorMessage::kErrorReadingSource; break; case HttpCode::NotFound: + message = WorkerErrorMessage::kErrorReadingSource; + break; + case HttpCode::Conflict: message = WorkerErrorMessage::kNoData; return TextErrorWithType(message, ErrorType::kEndOfFile); default: @@ -56,20 +59,19 @@ std::string GetIDFromJson(const std::string& json_string, Error* err) { return std::to_string(id); } -void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri) { - if ((*err)->GetErrorType() != asapo::ErrorType::kEndOfFile) { - (*err)->Append(response); - return; - } else { +void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) { + (*err)->Append(response); + if ((*err)->GetErrorType() == asapo::ErrorType::kEndOfFile) { if (response.find("id") != std::string::npos) { - auto id = GetIDFromJson(response, err); - if (*err) { + Error parse_error; + auto id = GetIDFromJson(response, &parse_error); + if (parse_error) { + (*err)->Append(parse_error->Explain()); return; } - *redirect_uri = server_uri_ + "/database/" + source_name_ + "/" + id; + *op = id; } } - *err = nullptr; return; } @@ -83,23 +85,40 @@ Error ServerDataBroker::ProcessRequest(std::string* response, std::string reques return HttpCodeToWorkerError(code); } +Error ServerDataBroker::GetBrokerUri() { + if (!current_broker_uri_.empty()) { + return nullptr; + } + + std::string request_uri = server_uri_ + "/discovery/broker"; + Error err; + err = ProcessRequest(¤t_broker_uri_, request_uri); + if (err != nullptr || current_broker_uri_.empty()) { + current_broker_uri_ = ""; + return TextError("cannot get broker uri from " + server_uri_); + } + return nullptr; +} + + Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) { - std::string request_uri = server_uri_ + "/database/" + source_name_ + "/" + operation; + std::string request_suffix = operation; uint64_t elapsed_ms = 0; std::string response; while (true) { - auto err = ProcessRequest(&response, request_uri); + auto err = GetBrokerUri(); if (err == nullptr) { - break; + std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/"; + err = ProcessRequest(&response, request_api + request_suffix); + if (err == nullptr) { + break; + } } - ProcessServerError(&err, response, &request_uri); - if (err != nullptr) { - return err; - } + ProcessServerError(&err, response, &request_suffix); if (elapsed_ms >= timeout_ms_) { - err = TextErrorWithType("no more data found, exit on timeout", asapo::ErrorType::kTimeOut); + err = TextErrorWithType("exit on timeout, last error: " + err->Explain(), asapo::ErrorType::kTimeOut); return err; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 03f18b8a36406f7f3e607e30f30b9fc9ca8f2918..43ccfeb7be55df5a95c8748eada7604a79986f5b 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -20,9 +20,11 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<HttpClient> httpclient__; private: Error GetFileInfoFromServer(FileInfo* info, const std::string& operation); + Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); Error ProcessRequest(std::string* response, std::string request_uri); std::string server_uri_; + std::string current_broker_uri_; std::string source_name_; uint64_t timeout_ms_ = 0; }; diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 28e2caef439d0a720983785b64dba9575e5db300..bf8c4748d341220be3a9a48e5d60d421228d856a 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -33,6 +33,7 @@ using ::testing::NiceMock; using ::testing::Return; using ::testing::SetArgPointee; using ::testing::SetArgReferee; +using testing::AllOf; namespace { @@ -53,9 +54,11 @@ class ServerDataBrokerTests : public Test { NiceMock<MockIO> mock_io; NiceMock<MockHttpClient> mock_http_client; FileInfo info; + std::string expected_server_uri = "test:8400"; + std::string expected_broker_uri = "broker:5005"; void SetUp() override { - data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "dbname")}; + data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker(expected_server_uri, "dbname")}; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; } @@ -64,13 +67,20 @@ class ServerDataBrokerTests : public Test { data_broker->httpclient__.release(); } void MockGet(const std::string& response) { - EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(response) )); } + void MockGetBrokerUri() { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(expected_broker_uri))); + } + }; TEST_F(ServerDataBrokerTests, CanConnect) { @@ -85,7 +95,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsErrorOnWrongInput) { TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { - EXPECT_CALL(mock_http_client, Get_t("test/database/dbname/next", _, _)).WillOnce(DoAll( + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/next", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); @@ -94,19 +106,26 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); auto err = data_broker->GetNext(&info, nullptr); + ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); } TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) { + + MockGetBrokerUri(); + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("id"))); @@ -115,17 +134,63 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) { ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse")); } +TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerAddressNotFound) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, + _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + auto err = data_broker->GetNext(&info, nullptr); + + ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); +} + +TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerUriEmpty) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, + _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + auto err = data_broker->GetNext(&info, nullptr); + + ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); +} + + + +TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) { + MockGetBrokerUri(); + MockGet("error_response"); + + data_broker->SetTimeout(100); + data_broker->GetNext(&info, nullptr); + Mock::VerifyAndClearExpectations(&mock_http_client); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).Times(0); + MockGet("error_response"); + data_broker->GetNext(&info, nullptr); +} + + TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); - EXPECT_CALL(mock_http_client, Get_t(HasSubstr("1"), _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( - SetArgPointee<1>(HttpCode::NotFound), - SetArgPointee<2>(nullptr), - Return("{\"id\":1}"))); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/1", _, + _)).Times(AtLeast(1)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"id\":1}"))); data_broker->SetTimeout(100); @@ -146,6 +211,8 @@ FileInfo CreateFI() { } TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) { + MockGetBrokerUri(); + auto to_send = CreateFI(); auto json = to_send.Json(); @@ -163,6 +230,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) { TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) { + MockGetBrokerUri(); MockGet("error_response"); auto err = data_broker->GetNext(&info, nullptr); @@ -171,6 +239,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) { TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) { + MockGetBrokerUri(); MockGet("error_response"); EXPECT_CALL( mock_io, GetDataFromFile_t(_, _, _)).Times(0); @@ -178,9 +247,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) { } TEST_F(ServerDataBrokerTests, GetNextCallsReadFromFile) { + MockGetBrokerUri(); auto to_send = CreateFI(); auto json = to_send.Json(); - MockGet(json); EXPECT_CALL(mock_io, GetDataFromFile_t("name", 100, _)).