Skip to content
Snippets Groups Projects
Commit 8d0a6b48 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

service discovery for broker

parent b85b480f
No related branches found
No related tags found
No related merge requests found
Showing
with 322 additions and 133 deletions
......@@ -10,5 +10,5 @@ const (
//error codes
StatusError = http.StatusInternalServerError
StatusWrongInput = http.StatusBadRequest
StatusNoData = http.StatusNotFound
StatusNoData = http.StatusConflict
)
package request_handler
import "asapo_discovery/utils"
type Agent interface {
GetReceivers() ([]byte, error)
Init(int,[]string) error
GetBroker() ([]byte, error)
Init(settings utils.Settings) error
}
......@@ -5,6 +5,8 @@ import (
"github.com/hashicorp/consul/api"
"strconv"
"errors"
"sort"
"sync"
)
type ConsulRequestHandler struct {
......@@ -17,20 +19,63 @@ type Responce struct {
Uris []string
}
type SafeCounter struct {
counter int
mux sync.Mutex
}
func (c *SafeCounter) Next(size int) int {
c.mux.Lock()
defer c.mux.Unlock()
val := c.counter % size
c.counter++
return val
}
var counter SafeCounter
func (rh *ConsulRequestHandler) GetServices(name string) ([]string, error) {
var result = make([]string, 0)
services, _, err := rh.client.Health().Service(name, "", true, nil)
if err != nil {
return nil, err
}
for _, service := range (services) {
result = append(result, service.Node.Address+":"+strconv.Itoa(service.Service.Port))
}
sort.Strings(result)
return result, nil
}
func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
if (rh.client == nil){
return nil,errors.New("consul client not connected")
if (rh.client == nil) {
return nil, errors.New("consul client not connected")
}
var response Responce
var err error
response.Uris, err = rh.GetServices("receiver")
if err != nil {
return nil, err
}
var responce Responce
services,_,err := rh.client.Health().Service("receiver","",true,nil)
if err!=nil {
return nil,err
response.MaxConnections = rh.MaxConnections
return utils.MapToJson(&response)
}
func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) {
if (rh.client == nil) {
return nil, errors.New("consul client not connected")
}
response, err := rh.GetServices("broker")
if err != nil {
return nil, err
}
for _,service := range (services) {
responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port))
size := len(response)
if size ==0 {
return []byte(""),nil
}else {
return []byte(response[counter.Next(size)]),nil
}
responce.MaxConnections = rh.MaxConnections
return utils.MapToJson(&responce)
return nil, nil
}
func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) {
......@@ -48,13 +93,13 @@ func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, e
return
}
func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) {
rh.MaxConnections = maxCons
if len(uris) == 0 {
func (rh *ConsulRequestHandler) Init(settings utils.Settings) (err error) {
rh.MaxConnections = settings.Receiver.MaxConnections
if len(settings.ConsulEndpoints) == 0 {
rh.client, err = rh.connectClient("")
return err
}
for _, uri := range (uris) {
for _, uri := range (settings.ConsulEndpoints) {
rh.client, err = rh.connectClient(uri)
if err == nil {
return nil
......
......@@ -3,13 +3,14 @@ package request_handler
import (
"github.com/stretchr/testify/suite"
"testing"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api"
"strconv"
"asapo_discovery/utils"
)
type ConsulHandlerTestSuite struct {
suite.Suite
client *api.Client
client *api.Client
handler ConsulRequestHandler
}
......@@ -17,68 +18,122 @@ func TestConsulHandlerTestSuite(t *testing.T) {
suite.Run(t, new(ConsulHandlerTestSuite))
}
var consul_settings utils.Settings
func (suite *ConsulHandlerTestSuite) registerAgents(name string) {
for i := 1234; i < 1236; i++ {
reg := &api.AgentServiceRegistration{
ID: name + strconv.Itoa(i),
Name: name,
Port: i,
Check: &api.AgentServiceCheck{
Interval: "10m",
Status: "passing",
HTTP: "http://localhost:5000/health",
},
}
err := suite.client.Agent().ServiceRegister(reg)
if err != nil {
panic(err)
}
}
}
func (suite *ConsulHandlerTestSuite) SetupTest() {
var err error
consul_settings = utils.Settings{Receiver: utils.ReceiverInfo{MaxConnections: 10, ForceEndpoints: []string{}}}
suite.client, err = api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
for i:=1234;i<1236;i++ {
reg := &api.AgentServiceRegistration{
ID: "receiver"+strconv.Itoa(i),
Name: "receiver",
Port: i,
Check: &api.AgentServiceCheck{
Interval:"10m",
Status:"passing",
HTTP: "http://localhost:5000/health",
},
}
err = suite.client.Agent().ServiceRegister(reg)
if err != nil {
panic(err)
}
}
suite.registerAgents("receiver")
suite.registerAgents("broker")
}
func (suite *ConsulHandlerTestSuite) TearDownTest() {
suite.client.Agent().ServiceDeregister("receiver1234")
suite.client.Agent().ServiceDeregister("receiver1235")
suite.client.Agent().ServiceDeregister("broker1234")
suite.client.Agent().ServiceDeregister("broker1235")
}
func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() {
err := suite.handler.Init(10,[]string{})
suite.NoError(err, "empty list")
err := suite.handler.Init(consul_settings)
suite.NoError(err, "empty list")
}
func (suite *ConsulHandlerTestSuite) TestInitWrongUri() {
err := suite.handler.Init(10,[]string{"blabla"})
suite.Error(err, "wrong consul uri")
suite.Nil(suite.handler.client, "client nli after error")
consul_settings.ConsulEndpoints = []string{"blabla"}
err := suite.handler.Init(consul_settings)
suite.Error(err, "wrong consul uri")
suite.Nil(suite.handler.client, "client nli after error")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() {
err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"})
suite.NoError(err, "")
consul_settings.ConsulEndpoints = []string{"http://127.0.0.1:8500"}
err := suite.handler.Init(consul_settings)
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() {
err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"})
suite.NoError(err, "")
}
consul_settings.ConsulEndpoints = []string{"blabla", "http://127.0.0.1:8500"}
err := suite.handler.Init(consul_settings)
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestGetReceivers() {
suite.handler.Init(10,[]string{})
res,err := suite.handler.GetReceivers()
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris")
suite.handler.Init(consul_settings)
res, err := suite.handler.GetReceivers()
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}", string(res), "uris")
}
func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() {
suite.handler.Init(10,[]string{"blabla"})
_,err := suite.handler.GetReceivers()
suite.Error(err, "")
consul_settings.ConsulEndpoints = []string{"blabla"}
suite.handler.Init(consul_settings)
_, err := suite.handler.GetReceivers()
suite.Error(err, "")
}
func (suite *ConsulHandlerTestSuite) TestGetBrokerWhenNotConnected() {
consul_settings.ConsulEndpoints = []string{"blabla"}
suite.handler.Init(consul_settings)
_, err := suite.handler.GetBroker()
suite.Error(err, "")
}
func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() {
suite.handler.Init(consul_settings)
res, err := suite.handler.GetBroker()
suite.NoError(err, "")
suite.Equal("127.0.0.1:1234", string(res), "uris")
res, err = suite.handler.GetBroker()
suite.NoError(err, "")
suite.Equal("127.0.0.1:1235", string(res), "uris")
res, err = suite.handler.GetBroker()
suite.NoError(err, "")
suite.Equal("127.0.0.1:1234", string(res), "uris")
}
func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() {
suite.client.Agent().ServiceDeregister("broker1234")
suite.client.Agent().ServiceDeregister("broker1235")
suite.handler.Init(consul_settings)
res, err := suite.handler.GetBroker()
suite.NoError(err, "")
suite.Equal("", string(res), "uris")
}
......@@ -5,15 +5,23 @@ import (
)
type StaticRequestHandler struct {
Responce
}
receiverResponce Responce
broker string
}
func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) {
return utils.MapToJson(&rh)
return utils.MapToJson(&rh.receiverResponce)
}
func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error {
rh.MaxConnections = maxCons
rh.Uris = uris
func (rh *StaticRequestHandler) GetBroker() ([]byte, error) {
return []byte(rh.broker),nil
}
func (rh *StaticRequestHandler) Init(settings utils.Settings) error {
rh.receiverResponce.MaxConnections = settings.Receiver.MaxConnections
rh.receiverResponce.Uris = settings.Receiver.ForceEndpoints
rh.broker = settings.Broker.ForceEndpoint
return nil
}
......@@ -3,23 +3,35 @@ package request_handler
import (
"github.com/stretchr/testify/assert"
"testing"
"asapo_discovery/utils"
)
var uris = []string{"ip1","ip2"}
const max_conn = 1
var static_settings utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:max_conn,ForceEndpoints:uris},Broker:utils.BrokerInfo{
ForceEndpoint:"ip_broker"}}
var rh StaticRequestHandler;
func TestStaticHandlerInitOK(t *testing.T) {
err := rh.Init(max_conn,uris)
err := rh.Init(static_settings)
assert.Nil(t, err)
}
func TestStaticHandlerGetOK(t *testing.T) {
rh.Init(max_conn,uris)
func TestStaticHandlerGetReceviersOK(t *testing.T) {
rh.Init(static_settings)
res,err := rh.GetReceivers()
assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}")
assert.Nil(t, err)
}
func TestStaticHandlerGetBrokerOK(t *testing.T) {
rh.Init(static_settings)
res,err := rh.GetBroker()
assert.Equal(t,string(res), "ip_broker")
assert.Nil(t, err)
}
......@@ -3,11 +3,23 @@ package server
import (
"net/http"
"asapo_discovery/logger"
"errors"
)
func getReceivers() (answer []byte, code int) {
answer, err := requestHandler.GetReceivers()
log_str := "processing get receivers "
func getService(service string) (answer []byte, code int) {
var err error
switch service {
case "receivers":
answer, err = requestHandler.GetReceivers()
break
case "broker":
answer, err = requestHandler.GetBroker()
break
default:
err = errors.New("wrong request: "+service)
}
log_str := "processing get "+service
if err != nil {
logger.Error(log_str + " - " + err.Error())
return []byte(err.Error()),http.StatusInternalServerError
......@@ -19,8 +31,14 @@ func getReceivers() (answer []byte, code int) {
func routeGetReceivers(w http.ResponseWriter, r *http.Request) {
r.Header.Set("Content-type", "application/json")
answer,code := getReceivers()
answer,code := getService("receivers")
w.WriteHeader(code)
w.Write(answer)
}
func routeGetBroker(w http.ResponseWriter, r *http.Request) {
r.Header.Set("Content-type", "application/json")
answer,code := getService("broker")
w.WriteHeader(code)
w.Write(answer)
}
......@@ -11,4 +11,11 @@ var listRoutes = utils.Routes{
"/receivers",
routeGetReceivers,
},
utils.Route{
"GetBroker",
"Get",
"/broker",
routeGetBroker,
},
}
......@@ -30,7 +30,10 @@ type GetReceiversTestSuite struct {
func (suite *GetReceiversTestSuite) SetupTest() {
requestHandler = new(request_handler.StaticRequestHandler)
requestHandler.Init(10,[]string{"ip1","ip2"})
var s utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:10,ForceEndpoints:[]string{"ip1","ip2"}},
Broker:utils.BrokerInfo{ForceEndpoint:"ip_broker"}}
requestHandler.Init(s)
logger.SetMockLog()
}
......@@ -64,3 +67,14 @@ func (suite *GetReceiversTestSuite) TestGetReceivers() {
}
func (suite *GetReceiversTestSuite) TestGetBroker() {
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get broker")))
w := doRequest("/broker")
suite.Equal(http.StatusOK, w.Code, "code ok")
suite.Equal(w.Body.String(), "ip_broker", "result")
assertExpectations(suite.T())
}
......@@ -2,50 +2,20 @@ package server
import (
"asapo_discovery/request_handler"
"errors"
"asapo_discovery/utils"
)
var requestHandler request_handler.Agent
type serverSettings struct {
Endpoints []string
Mode string
Port int
MaxConnections int
LogLevel string
}
var settings serverSettings
var settings utils.Settings
func SetHandler(rh request_handler.Agent) error {
requestHandler = rh
err := requestHandler.Init(settings.MaxConnections,settings.Endpoints)
err := requestHandler.Init(settings)
return err
}
func (settings *serverSettings) Validate() error {
if len(settings.Endpoints) == 0 && settings.Mode != "consul"{
return errors.New("Endpoints not set")
}
if settings.MaxConnections == 0 {
return errors.New("Max connections not set")
}
if settings.Port == 0 {
return errors.New("Server port not set")
}
if settings.Mode == "" {
return errors.New("Mode not set")
}
if settings.Mode != "static" && settings.Mode != "consul" {
return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)")
}
return nil
}
func GetHandlerMode()string {
return settings.Mode
......
......@@ -3,15 +3,18 @@ package server
import (
"github.com/stretchr/testify/assert"
"testing"
"asapo_discovery/utils"
)
func fillSettings(mode string)serverSettings {
var settings serverSettings
func fillSettings(mode string) utils.Settings {
var settings utils.Settings
settings.Port = 1
settings.Mode = mode
settings.MaxConnections = 10
settings.Receiver.MaxConnections = 10
settings.LogLevel = "info"
settings.Endpoints=[]string{"ip1","ip2"}
settings.Receiver.ForceEndpoints=[]string{"ip1","ip2"}
settings.Broker.ForceEndpoint="ip_b"
settings.ConsulEndpoints=[]string{"ipc1","ipc2"}
return settings
}
......@@ -27,16 +30,25 @@ func TestSettingsWrongMode(t *testing.T) {
assert.NotNil(t, err)
}
func TestSettingsStaticModeNoEndpoints(t *testing.T) {
func TestSettingsStaticModeNoReceiverEndpoints(t *testing.T) {
settings := fillSettings("static")
settings.Endpoints=[]string{}
settings.Receiver.ForceEndpoints=[]string{}
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsStaticModeNoBrokerEndpoints(t *testing.T) {
settings := fillSettings("static")
settings.Broker.ForceEndpoint=""
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsConsulModeNoEndpoints(t *testing.T) {
settings := fillSettings("consul")
settings.Endpoints=[]string{}
settings.ConsulEndpoints=[]string{}
err := settings.Validate()
assert.Nil(t, err)
}
......
package utils
import "errors"
type ReceiverInfo struct {
ForceEndpoints []string
MaxConnections int
}
type BrokerInfo struct {
ForceEndpoint string
}
type Settings struct {
Receiver ReceiverInfo
Broker BrokerInfo
ConsulEndpoints []string
Mode string
Port int
LogLevel string
}
func (settings *Settings) Validate() error {
if settings.Mode != "consul"{
if len(settings.Receiver.ForceEndpoints) == 0 || len(settings.Broker.ForceEndpoint) == 0 {
return errors.New("Endpoints not set")
}
}
if settings.Receiver.MaxConnections == 0 {
return errors.New("Max connections not set")
}
if settings.Port == 0 {
return errors.New("Server port not set")
}
if settings.Mode == "" {
return errors.New("Mode not set")
}
if settings.Mode != "static" && settings.Mode != "consul" {
return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)")
}
return nil
}
......@@ -12,10 +12,10 @@ set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY
get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY)
add_dependencies(${TARGET_NAME} asapo-broker)
prepare_asapo()
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY)
add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME} $<TARGET_PROPERTY:asapo-broker,EXENAME>")
add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}")
set (dir examples/worker/${TARGET_NAME})
install(TARGETS ${TARGET_NAME} DESTINATION "${dir}")
......
......@@ -7,22 +7,21 @@ set -e
trap Cleanup EXIT
Cleanup() {
:
kill -9 $brokerid
set +e
nomad stop nginx
nomad stop discovery
nomad stop broker
echo "db.dropDatabase()" | mongo ${database_name}
}
args=${@:1:$(($# - 1))}
broker=${@:$#}
$broker -config settings.json &
brokerid=`echo $!`
sleep 0.3
nomad run nginx.nmd
nomad run discovery.nmd
nomad run broker.nmd
for i in `seq 1 3`;
do
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
done
$args 127.0.0.1:5005 $database_name 2 | grep "Processed 3 file(s)"
$@ 127.0.0.1:8400 $database_name 2 | grep "Processed 3 file(s)"
SET database_name=test_run
SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
::first argument path to the executable
:: second argument path to the broker
c:\opt\consul\nomad run discovery.nmd
c:\opt\consul\nomad run broker.nmd
c:\opt\consul\nomad run nginx.nmd
set full_name="%2"
set short_name="%~nx2"
start /B "" "%full_name%" -config settings.json
ping 1.0.0.0 -n 1 -w 100 > nul
ping 1.0.0.0 -n 10 -w 100 > nul
for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error
"%1" 127.0.0.1:5005 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error
"%1" 127.0.0.1:8400 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error
goto :clean
:error
......@@ -22,5 +18,7 @@ call :clean
exit /b 1
:clean
Taskkill /IM "%short_name%" /F
c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop broker
c:\opt\consul\nomad stop nginx
echo db.dropDatabase() | %mongo_exe% %database_name%
......@@ -9,7 +9,7 @@
namespace asapo {
const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/receivers";
const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/discovery/receivers";
ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()},
log__{GetDefaultProducerLogger()},
......
......@@ -31,7 +31,7 @@ namespace {
TEST(StatisticTestsConstructor, Constructor) {
Statistics statistics;
ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderInfluxDb*>(statistics.statistics_sender_list__[0].get()), Ne(nullptr));
ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr));
// ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr));
}
......
......@@ -23,4 +23,4 @@ brokerid=`echo $!`
curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1'
curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2'
curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found"
curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "not found"
......@@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul
C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":1 || goto :error
C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":2 || goto :error
C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:"Not Found" || goto :error
C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:"not found" || goto :error
goto :clean
......
......@@ -6,7 +6,7 @@ trap Cleanup EXIT
broker_database_name=test_run
monitor_database_name=db_test
broker_address=127.0.0.1:5005
proxy_address=127.0.0.1:8400
receiver_folder=/tmp/asapo/receiver/files
......@@ -38,4 +38,4 @@ $1 localhost:8400 100 1000 4 0 &
#producerid=`echo $!`
$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)"
$2 ${proxy_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment