Skip to content
Snippets Groups Projects
Commit 3b2c0400 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

start work at discovery

parent b1c65781
No related branches found
No related tags found
No related merge requests found
Showing
with 792 additions and 0 deletions
......@@ -122,3 +122,4 @@ doxygen
#GO
broker/pkg
discovery/pkg
\ No newline at end of file
......@@ -57,6 +57,9 @@ add_subdirectory(worker)
add_subdirectory(receiver)
add_subdirectory(discovery)
if(BUILD_INTEGRATION_TESTS)
add_subdirectory(tests)
endif()
......
set (TARGET_NAME asapo-discovery)
if (NOT "$ENV{GOPATH}" STREQUAL "")
set(GOPATH $ENV{GOPATH})
endif()
if (NOT GOPATH)
message (FATAL_ERROR "GOPATH not set")
endif()
message(STATUS "global gopath ${GOPATH}")
IF(WIN32)
set (gopath "${GOPATH}\;${CMAKE_CURRENT_SOURCE_DIR}")
set (exe_name "${TARGET_NAME}.exe")
ELSE()
set (gopath ${GOPATH}:${CMAKE_CURRENT_SOURCE_DIR})
set (exe_name "${TARGET_NAME}")
ENDIF()
include(testing_go)
add_custom_target(${TARGET_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath}
go build ${GO_OPTS} -o ${exe_name} asapo_discovery/main
VERBATIM)
define_property(TARGET PROPERTY EXENAME
BRIEF_DOCS <executable name>
FULL_DOCS <full-doc>)
set_target_properties(${TARGET_NAME} PROPERTIES EXENAME ${CMAKE_CURRENT_BINARY_DIR}/${exe_name})
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${exe_name} DESTINATION bin)
gotest(${TARGET_NAME} "./...")
package logger
import (
"errors"
"strings"
)
type Level uint32
//log levels
const (
InfoLevel = iota
DebugLevel
ErrorLevel
WarnLevel
FatalLevel
)
type Logger interface {
Info(args ...interface{})
Debug(args ...interface{})
Fatal(args ...interface{})
Warning(args ...interface{})
Error(args ...interface{})
SetLevel(level Level)
}
var my_logger Logger = &logRusLogger{}
func Info(args ...interface{}) {
my_logger.Info(args...)
}
func Debug(args ...interface{}) {
my_logger.Debug(args...)
}
func Warning(args ...interface{}) {
my_logger.Warning(args...)
}
func Error(args ...interface{}) {
my_logger.Error(args...)
}
func Fatal(args ...interface{}) {
my_logger.Fatal(args...)
}
func SetLevel(level Level) {
my_logger.SetLevel(level)
}
func LevelFromString(str string) (Level, error) {
switch strings.ToLower(str) {
case "debug":
return DebugLevel, nil
case "info":
return InfoLevel, nil
case "warning":
return WarnLevel, nil
case "error":
return ErrorLevel, nil
case "fatal", "none":
return FatalLevel, nil
}
return FatalLevel, errors.New("wrong log level")
}
package logger
import (
log "github.com/sirupsen/logrus"
)
type logRusLogger struct {
logger_entry *log.Entry
}
func (l *logRusLogger) entry() *log.Entry {
if l.logger_entry != nil {
return l.logger_entry
}
formatter := &log.JSONFormatter{
FieldMap: log.FieldMap{
log.FieldKeyMsg: "message",
},
TimestampFormat: "2006-01-02 15:04:05.000",
}
log.SetFormatter(formatter)
l.logger_entry = log.WithFields(log.Fields{
"source": "discovery",
})
return l.logger_entry
}
func (l *logRusLogger) Info(args ...interface{}) {
l.entry().Info(args...)
return
}
func (l *logRusLogger) Debug(args ...interface{}) {
l.entry().Debug(args...)
return
}
func (l *logRusLogger) Error(args ...interface{}) {
l.entry().Error(args...)
return
}
func (l *logRusLogger) Warning(args ...interface{}) {
l.entry().Warning(args...)
return
}
func (l *logRusLogger) Fatal(args ...interface{}) {
l.entry().Fatal(args...)
return
}
func (l *logRusLogger) SetLevel(level Level) {
logrusLevel := log.InfoLevel
switch level {
case DebugLevel:
logrusLevel = log.DebugLevel
case InfoLevel:
logrusLevel = log.InfoLevel
case WarnLevel:
logrusLevel = log.WarnLevel
case ErrorLevel:
logrusLevel = log.ErrorLevel
case FatalLevel:
logrusLevel = log.FatalLevel
}
log.SetLevel(logrusLevel)
return
}
//+build !release
package logger
import (
"github.com/stretchr/testify/mock"
)
type MockLogger struct {
mock.Mock
}
var MockLog MockLogger
func SetMockLog() {
my_logger = &MockLog
}
func UnsetMockLog() {
my_logger = &logRusLogger{}
}
func (l *MockLogger) Info(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Debug(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Error(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Warning(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Fatal(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) SetLevel(level Level) {
l.Called(level)
return
}
//+build !test
package main
import (
"flag"
log "asapo_discovery/logger"
"asapo_discovery/server"
"os"
"asapo_discovery/request_handler"
)
func NewDefaultHandler() request_handler.Agent {
switch server.GetHandlerMode() {
case "static":
return new(request_handler.StaticRequestHandler)
case "consul":
return new(request_handler.ConsulRequestHandler)
default:
log.Fatal("wrong handler")
return nil
}
}
func PrintUsage() {
log.Fatal("Usage: " + os.Args[0] + " -config <config file>")
}
func main() {
var fname = flag.String("config", "", "config file path")
flag.Parse()
if *fname == "" {
PrintUsage()
}
logLevel, err := server.ReadConfig(*fname)
if err != nil {
log.Fatal(err.Error())
}
log.SetLevel(logLevel)
err = server.SetHandler(NewDefaultHandler())
if err != nil {
log.Fatal(err.Error())
}
server.Start()
}
package request_handler
type Agent interface {
GetReceivers() ([]byte, error)
Init(int,[]string) error
}
package request_handler
import (
"asapo_discovery/utils"
"github.com/hashicorp/consul/api"
"strconv"
"errors"
)
type ConsulRequestHandler struct {
MaxConnections int
client *api.Client
}
type Responce struct {
MaxConnections int
Uris []string
}
func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
if (rh.client == nil){
return nil,errors.New("consul client not connected")
}
var responce Responce
services,_,err := rh.client.Health().Service("receiver","",true,nil)
if err!=nil {
return nil,err
}
for _,service := range (services) {
responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port))
}
responce.MaxConnections = rh.MaxConnections
return utils.MapToJson(&responce)
}
func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) {
config := api.DefaultConfig()
if len(uri) > 0 {
config.Address = uri
}
client, err = api.NewClient(config)
if err == nil {
_, err = client.Agent().Self()
}
if err != nil {
client = nil
}
return
}
func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) {
rh.MaxConnections = maxCons
if len(uris) == 0 {
rh.client, err = rh.connectClient("")
return err
}
for _, uri := range (uris) {
rh.client, err = rh.connectClient(uri)
if err == nil {
return nil
}
}
return err
}
package request_handler
import (
"github.com/stretchr/testify/suite"
"testing"
"github.com/hashicorp/consul/api"
"strconv"
)
type ConsulHandlerTestSuite struct {
suite.Suite
client *api.Client
handler ConsulRequestHandler
}
func TestConsulHandlerTestSuite(t *testing.T) {
suite.Run(t, new(ConsulHandlerTestSuite))
}
func (suite *ConsulHandlerTestSuite) SetupTest() {
var err error
suite.client, err = api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
for i:=1234;i<1236;i++ {
reg := &api.AgentServiceRegistration{
ID: "receiver"+strconv.Itoa(i),
Name: "receiver",
Port: i,
Check: &api.AgentServiceCheck{
Interval:"10m",
Status:"passing",
HTTP: "http://localhost:5000/health",
},
}
err = suite.client.Agent().ServiceRegister(reg)
if err != nil {
panic(err)
}
}
}
func (suite *ConsulHandlerTestSuite) TearDownTest() {
suite.client.Agent().ServiceDeregister("receiver1234")
suite.client.Agent().ServiceDeregister("receiver1235")
}
func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() {
err := suite.handler.Init(10,[]string{})
suite.NoError(err, "empty list")
}
func (suite *ConsulHandlerTestSuite) TestInitWrongUri() {
err := suite.handler.Init(10,[]string{"blabla"})
suite.Error(err, "wrong consul uri")
suite.Nil(suite.handler.client, "client nli after error")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() {
err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"})
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() {
err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"})
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestGetReceivers() {
suite.handler.Init(10,[]string{})
res,err := suite.handler.GetReceivers()
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris")
}
func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() {
suite.handler.Init(10,[]string{"blabla"})
_,err := suite.handler.GetReceivers()
suite.Error(err, "")
}
package request_handler
import (
"asapo_discovery/utils"
)
type StaticRequestHandler struct {
Responce
}
func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) {
return utils.MapToJson(&rh)
}
func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error {
rh.MaxConnections = maxCons
rh.Uris = uris
return nil
}
package request_handler
import (
"github.com/stretchr/testify/assert"
"testing"
)
var uris = []string{"ip1","ip2"}
const max_conn = 1
var rh StaticRequestHandler;
func TestStaticHandlerInitOK(t *testing.T) {
err := rh.Init(max_conn,uris)
assert.Nil(t, err)
}
func TestStaticHandlerGetOK(t *testing.T) {
rh.Init(max_conn,uris)
res,err := rh.GetReceivers()
assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}")
assert.Nil(t, err)
}
package server
import (
"net/http"
"asapo_discovery/logger"
)
func getReceivers() (answer []byte, code int) {
answer, err := requestHandler.GetReceivers()
log_str := "processing get receivers "
if err != nil {
logger.Error(log_str + " - " + err.Error())
return []byte(err.Error()),http.StatusInternalServerError
}
logger.Debug(log_str + " - got " + string(answer))
return answer, http.StatusOK
}
func routeGetReceivers(w http.ResponseWriter, r *http.Request) {
r.Header.Set("Content-type", "application/json")
answer,code := getReceivers()
w.WriteHeader(code)
w.Write(answer)
}
package server
import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"asapo_discovery/logger"
"asapo_discovery/utils"
"net/http"
"net/http/httptest"
"strings"
"testing"
"asapo_discovery/request_handler"
)
func containsMatcher(substr string) func(str string) bool {
return func(str string) bool { return strings.Contains(str, substr) }
}
func doRequest(path string) *httptest.ResponseRecorder {
mux := utils.NewRouter(listRoutes)
req, _ := http.NewRequest("GET", path, nil)
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
return w
}
type GetReceiversTestSuite struct {
suite.Suite
}
func (suite *GetReceiversTestSuite) SetupTest() {
requestHandler = new(request_handler.StaticRequestHandler)
requestHandler.Init(10,[]string{"ip1","ip2"})
logger.SetMockLog()
}
func (suite *GetReceiversTestSuite) TearDownTest() {
logger.UnsetMockLog()
requestHandler = nil
}
func TestGetReceiversTestSuite(t *testing.T) {
suite.Run(t, new(GetReceiversTestSuite))
}
func assertExpectations(t *testing.T) {
logger.MockLog.AssertExpectations(t)
logger.MockLog.ExpectedCalls = nil
}
func (suite *GetReceiversTestSuite) TestWrongPath() {
w := doRequest("/blabla")
suite.Equal(http.StatusNotFound, w.Code, "wrong path")
}
func (suite *GetReceiversTestSuite) TestGetReceivers() {
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get receivers")))
w := doRequest("/receivers")
suite.Equal(http.StatusOK, w.Code, "code ok")
suite.Equal(w.Body.String(), "{\"MaxConnections\":10,\"Uris\":[\"ip1\",\"ip2\"]}", "result")
assertExpectations(suite.T())
}
package server
import (
"asapo_discovery/utils"
)
var listRoutes = utils.Routes{
utils.Route{
"GetReceivers",
"Get",
"/receivers",
routeGetReceivers,
},
}
package server
import (
"asapo_discovery/request_handler"
"errors"
)
var requestHandler request_handler.Agent
type serverSettings struct {
Endpoints []string
Mode string
Port int
MaxConnections int
LogLevel string
}
var settings serverSettings
func SetHandler(rh request_handler.Agent) error {
requestHandler = rh
err := requestHandler.Init(settings.MaxConnections,settings.Endpoints)
return err
}
func (settings *serverSettings) Validate() error {
if len(settings.Endpoints) == 0 && settings.Mode != "consul"{
return errors.New("Endpoints not set")
}
if settings.MaxConnections == 0 {
return errors.New("Max connections not set")
}
if settings.Port == 0 {
return errors.New("Server port not set")
}
if settings.Mode == "" {
return errors.New("Mode not set")
}
if settings.Mode != "static" && settings.Mode != "consul" {
return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)")
}
return nil
}
func GetHandlerMode()string {
return settings.Mode
}
//+build !test
package server
import (
log "asapo_discovery/logger"
"asapo_discovery/utils"
"net/http"
"strconv"
)
func Start() {
mux := utils.NewRouter(listRoutes)
log.Info("Listening on port: " + strconv.Itoa(settings.Port))
log.Fatal(http.ListenAndServe(":"+strconv.Itoa(settings.Port), http.HandlerFunc(mux.ServeHTTP)))
}
func ReadConfig(fname string) (log.Level, error) {
if err := utils.ReadJsonFromFile(fname, &settings); err != nil {
return log.FatalLevel, err
}
if err := settings.Validate(); err != nil {
return log.FatalLevel,err
}
return log.LevelFromString(settings.LogLevel)
}
package server
import (
"github.com/stretchr/testify/assert"
"testing"
)
func fillSettings(mode string)serverSettings {
var settings serverSettings
settings.Port = 1
settings.Mode = mode
settings.MaxConnections = 10
settings.LogLevel = "info"
settings.Endpoints=[]string{"ip1","ip2"}
return settings
}
func TestSettingsOK(t *testing.T) {
settings := fillSettings("static")
err := settings.Validate()
assert.Nil(t, err)
}
func TestSettingsWrongMode(t *testing.T) {
settings := fillSettings("blalba")
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsStaticModeNoEndpoints(t *testing.T) {
settings := fillSettings("static")
settings.Endpoints=[]string{}
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsConsulModeNoEndpoints(t *testing.T) {
settings := fillSettings("consul")
settings.Endpoints=[]string{}
err := settings.Validate()
assert.Nil(t, err)
}
func TestGetHandlerMode(t *testing.T) {
mode := "consul"
settings = fillSettings(mode)
assert.Equal(t,mode,GetHandlerMode())
}
package utils
import (
json "encoding/json"
"io/ioutil"
)
func StringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func MapToJson(res interface{}) ([]byte, error) {
answer, err := json.Marshal(res)
if err == nil {
return answer, nil
} else {
return nil, err
}
}
func ReadJsonFromFile(fname string, config interface{}) error {
content, err := ioutil.ReadFile(fname)
if err != nil {
return err
}
err = json.Unmarshal(content, config)
if err != nil {
return err
}
return nil
}
package utils
import (
"net/http"
"github.com/gorilla/mux"
"strings"
)
type Routes []Route
type Route struct {
Name string
Method string
Pattern string
HandlerFunc http.HandlerFunc
}
func NewRouter(listRoutes Routes) *mux.Router {
router := mux.NewRouter()
for _, route := range listRoutes {
router.
Methods(route.Method).
Path(route.Pattern).
Name(route.Name).
Handler(route.HandlerFunc)
// allow routes without trailing slash
if strings.HasSuffix(route.Pattern, "/") {
router.
Methods(route.Method).
Path(strings.TrimSuffix(route.Pattern, "/")).
Name(route.Name + "_noslash").
Handler(route.HandlerFunc)
}
}
return router
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment