Skip to content
Snippets Groups Projects
Commit 943a2cbc authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge branch 'develop' into feature/parallel-data-transfer

parents d5e78984 e1fb5545
No related branches found
No related tags found
No related merge requests found
Showing
with 792 additions and 0 deletions
...@@ -122,3 +122,4 @@ doxygen ...@@ -122,3 +122,4 @@ doxygen
#GO #GO
broker/pkg broker/pkg
discovery/pkg
\ No newline at end of file
...@@ -62,6 +62,9 @@ add_subdirectory(worker) ...@@ -62,6 +62,9 @@ add_subdirectory(worker)
add_subdirectory(receiver) add_subdirectory(receiver)
add_subdirectory(discovery)
if(BUILD_INTEGRATION_TESTS) if(BUILD_INTEGRATION_TESTS)
add_subdirectory(tests) add_subdirectory(tests)
endif() endif()
......
set (TARGET_NAME asapo-discovery)
if (NOT "$ENV{GOPATH}" STREQUAL "")
set(GOPATH $ENV{GOPATH})
endif()
if (NOT GOPATH)
message (FATAL_ERROR "GOPATH not set")
endif()
message(STATUS "global gopath ${GOPATH}")
IF(WIN32)
set (gopath "${GOPATH}\;${CMAKE_CURRENT_SOURCE_DIR}")
set (exe_name "${TARGET_NAME}.exe")
ELSE()
set (gopath ${GOPATH}:${CMAKE_CURRENT_SOURCE_DIR})
set (exe_name "${TARGET_NAME}")
ENDIF()
include(testing_go)
add_custom_target(${TARGET_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath}
go build ${GO_OPTS} -o ${exe_name} asapo_discovery/main
VERBATIM)
define_property(TARGET PROPERTY EXENAME
BRIEF_DOCS <executable name>
FULL_DOCS <full-doc>)
set_target_properties(${TARGET_NAME} PROPERTIES EXENAME ${CMAKE_CURRENT_BINARY_DIR}/${exe_name})
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${exe_name} DESTINATION bin)
gotest(${TARGET_NAME} "./...")
package logger
import (
"errors"
"strings"
)
type Level uint32
//log levels
const (
InfoLevel = iota
DebugLevel
ErrorLevel
WarnLevel
FatalLevel
)
type Logger interface {
Info(args ...interface{})
Debug(args ...interface{})
Fatal(args ...interface{})
Warning(args ...interface{})
Error(args ...interface{})
SetLevel(level Level)
}
var my_logger Logger = &logRusLogger{}
func Info(args ...interface{}) {
my_logger.Info(args...)
}
func Debug(args ...interface{}) {
my_logger.Debug(args...)
}
func Warning(args ...interface{}) {
my_logger.Warning(args...)
}
func Error(args ...interface{}) {
my_logger.Error(args...)
}
func Fatal(args ...interface{}) {
my_logger.Fatal(args...)
}
func SetLevel(level Level) {
my_logger.SetLevel(level)
}
func LevelFromString(str string) (Level, error) {
switch strings.ToLower(str) {
case "debug":
return DebugLevel, nil
case "info":
return InfoLevel, nil
case "warning":
return WarnLevel, nil
case "error":
return ErrorLevel, nil
case "fatal", "none":
return FatalLevel, nil
}
return FatalLevel, errors.New("wrong log level")
}
package logger
import (
log "github.com/sirupsen/logrus"
)
type logRusLogger struct {
logger_entry *log.Entry
}
func (l *logRusLogger) entry() *log.Entry {
if l.logger_entry != nil {
return l.logger_entry
}
formatter := &log.JSONFormatter{
FieldMap: log.FieldMap{
log.FieldKeyMsg: "message",
},
TimestampFormat: "2006-01-02 15:04:05.000",
}
log.SetFormatter(formatter)
l.logger_entry = log.WithFields(log.Fields{
"source": "discovery",
})
return l.logger_entry
}
func (l *logRusLogger) Info(args ...interface{}) {
l.entry().Info(args...)
return
}
func (l *logRusLogger) Debug(args ...interface{}) {
l.entry().Debug(args...)
return
}
func (l *logRusLogger) Error(args ...interface{}) {
l.entry().Error(args...)
return
}
func (l *logRusLogger) Warning(args ...interface{}) {
l.entry().Warning(args...)
return
}
func (l *logRusLogger) Fatal(args ...interface{}) {
l.entry().Fatal(args...)
return
}
func (l *logRusLogger) SetLevel(level Level) {
logrusLevel := log.InfoLevel
switch level {
case DebugLevel:
logrusLevel = log.DebugLevel
case InfoLevel:
logrusLevel = log.InfoLevel
case WarnLevel:
logrusLevel = log.WarnLevel
case ErrorLevel:
logrusLevel = log.ErrorLevel
case FatalLevel:
logrusLevel = log.FatalLevel
}
log.SetLevel(logrusLevel)
return
}
//+build !release
package logger
import (
"github.com/stretchr/testify/mock"
)
type MockLogger struct {
mock.Mock
}
var MockLog MockLogger
func SetMockLog() {
my_logger = &MockLog
}
func UnsetMockLog() {
my_logger = &logRusLogger{}
}
func (l *MockLogger) Info(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Debug(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Error(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Warning(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) Fatal(args ...interface{}) {
l.Called(args...)
return
}
func (l *MockLogger) SetLevel(level Level) {
l.Called(level)
return
}
//+build !test
package main
import (
"flag"
log "asapo_discovery/logger"
"asapo_discovery/server"
"os"
"asapo_discovery/request_handler"
)
func NewDefaultHandler() request_handler.Agent {
switch server.GetHandlerMode() {
case "static":
return new(request_handler.StaticRequestHandler)
case "consul":
return new(request_handler.ConsulRequestHandler)
default:
log.Fatal("wrong handler")
return nil
}
}
func PrintUsage() {
log.Fatal("Usage: " + os.Args[0] + " -config <config file>")
}
func main() {
var fname = flag.String("config", "", "config file path")
flag.Parse()
if *fname == "" {
PrintUsage()
}
logLevel, err := server.ReadConfig(*fname)
if err != nil {
log.Fatal(err.Error())
}
log.SetLevel(logLevel)
err = server.SetHandler(NewDefaultHandler())
if err != nil {
log.Fatal(err.Error())
}
server.Start()
}
package request_handler
type Agent interface {
GetReceivers() ([]byte, error)
Init(int,[]string) error
}
package request_handler
import (
"asapo_discovery/utils"
"github.com/hashicorp/consul/api"
"strconv"
"errors"
)
type ConsulRequestHandler struct {
MaxConnections int
client *api.Client
}
type Responce struct {
MaxConnections int
Uris []string
}
func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
if (rh.client == nil){
return nil,errors.New("consul client not connected")
}
var responce Responce
services,_,err := rh.client.Health().Service("receiver","",true,nil)
if err!=nil {
return nil,err
}
for _,service := range (services) {
responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port))
}
responce.MaxConnections = rh.MaxConnections
return utils.MapToJson(&responce)
}
func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) {
config := api.DefaultConfig()
if len(uri) > 0 {
config.Address = uri
}
client, err = api.NewClient(config)
if err == nil {
_, err = client.Agent().Self()
}
if err != nil {
client = nil
}
return
}
func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) {
rh.MaxConnections = maxCons
if len(uris) == 0 {
rh.client, err = rh.connectClient("")
return err
}
for _, uri := range (uris) {
rh.client, err = rh.connectClient(uri)
if err == nil {
return nil
}
}
return err
}
package request_handler
import (
"github.com/stretchr/testify/suite"
"testing"
"github.com/hashicorp/consul/api"
"strconv"
)
type ConsulHandlerTestSuite struct {
suite.Suite
client *api.Client
handler ConsulRequestHandler
}
func TestConsulHandlerTestSuite(t *testing.T) {
suite.Run(t, new(ConsulHandlerTestSuite))
}
func (suite *ConsulHandlerTestSuite) SetupTest() {
var err error
suite.client, err = api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
for i:=1234;i<1236;i++ {
reg := &api.AgentServiceRegistration{
ID: "receiver"+strconv.Itoa(i),
Name: "receiver",
Port: i,
Check: &api.AgentServiceCheck{
Interval:"10m",
Status:"passing",
HTTP: "http://localhost:5000/health",
},
}
err = suite.client.Agent().ServiceRegister(reg)
if err != nil {
panic(err)
}
}
}
func (suite *ConsulHandlerTestSuite) TearDownTest() {
suite.client.Agent().ServiceDeregister("receiver1234")
suite.client.Agent().ServiceDeregister("receiver1235")
}
func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() {
err := suite.handler.Init(10,[]string{})
suite.NoError(err, "empty list")
}
func (suite *ConsulHandlerTestSuite) TestInitWrongUri() {
err := suite.handler.Init(10,[]string{"blabla"})
suite.Error(err, "wrong consul uri")
suite.Nil(suite.handler.client, "client nli after error")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() {
err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"})
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() {
err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"})
suite.NoError(err, "")
}
func (suite *ConsulHandlerTestSuite) TestGetReceivers() {
suite.handler.Init(10,[]string{})
res,err := suite.handler.GetReceivers()
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris")
}
func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() {
suite.handler.Init(10,[]string{"blabla"})
_,err := suite.handler.GetReceivers()
suite.Error(err, "")
}
package request_handler
import (
"asapo_discovery/utils"
)
type StaticRequestHandler struct {
Responce
}
func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) {
return utils.MapToJson(&rh)
}
func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error {
rh.MaxConnections = maxCons
rh.Uris = uris
return nil
}
package request_handler
import (
"github.com/stretchr/testify/assert"
"testing"
)
var uris = []string{"ip1","ip2"}
const max_conn = 1
var rh StaticRequestHandler;
func TestStaticHandlerInitOK(t *testing.T) {
err := rh.Init(max_conn,uris)
assert.Nil(t, err)
}
func TestStaticHandlerGetOK(t *testing.T) {
rh.Init(max_conn,uris)
res,err := rh.GetReceivers()
assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}")
assert.Nil(t, err)
}
package server
import (
"net/http"
"asapo_discovery/logger"
)
func getReceivers() (answer []byte, code int) {
answer, err := requestHandler.GetReceivers()
log_str := "processing get receivers "
if err != nil {
logger.Error(log_str + " - " + err.Error())
return []byte(err.Error()),http.StatusInternalServerError
}
logger.Debug(log_str + " - got " + string(answer))
return answer, http.StatusOK
}
func routeGetReceivers(w http.ResponseWriter, r *http.Request) {
r.Header.Set("Content-type", "application/json")
answer,code := getReceivers()
w.WriteHeader(code)
w.Write(answer)
}
package server
import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"asapo_discovery/logger"
"asapo_discovery/utils"
"net/http"
"net/http/httptest"
"strings"
"testing"
"asapo_discovery/request_handler"
)
func containsMatcher(substr string) func(str string) bool {
return func(str string) bool { return strings.Contains(str, substr) }
}
func doRequest(path string) *httptest.ResponseRecorder {
mux := utils.NewRouter(listRoutes)
req, _ := http.NewRequest("GET", path, nil)
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)
return w
}
type GetReceiversTestSuite struct {
suite.Suite
}
func (suite *GetReceiversTestSuite) SetupTest() {
requestHandler = new(request_handler.StaticRequestHandler)
requestHandler.Init(10,[]string{"ip1","ip2"})
logger.SetMockLog()
}
func (suite *GetReceiversTestSuite) TearDownTest() {
logger.UnsetMockLog()
requestHandler = nil
}
func TestGetReceiversTestSuite(t *testing.T) {
suite.Run(t, new(GetReceiversTestSuite))
}
func assertExpectations(t *testing.T) {
logger.MockLog.AssertExpectations(t)
logger.MockLog.ExpectedCalls = nil
}
func (suite *GetReceiversTestSuite) TestWrongPath() {
w := doRequest("/blabla")
suite.Equal(http.StatusNotFound, w.Code, "wrong path")
}
func (suite *GetReceiversTestSuite) TestGetReceivers() {
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get receivers")))
w := doRequest("/receivers")
suite.Equal(http.StatusOK, w.Code, "code ok")
suite.Equal(w.Body.String(), "{\"MaxConnections\":10,\"Uris\":[\"ip1\",\"ip2\"]}", "result")
assertExpectations(suite.T())
}
package server
import (
"asapo_discovery/utils"
)
var listRoutes = utils.Routes{
utils.Route{
"GetReceivers",
"Get",
"/receivers",
routeGetReceivers,
},
}
package server
import (
"asapo_discovery/request_handler"
"errors"
)
var requestHandler request_handler.Agent
type serverSettings struct {
Endpoints []string
Mode string
Port int
MaxConnections int
LogLevel string
}
var settings serverSettings
func SetHandler(rh request_handler.Agent) error {
requestHandler = rh
err := requestHandler.Init(settings.MaxConnections,settings.Endpoints)
return err
}
func (settings *serverSettings) Validate() error {
if len(settings.Endpoints) == 0 && settings.Mode != "consul"{
return errors.New("Endpoints not set")
}
if settings.MaxConnections == 0 {
return errors.New("Max connections not set")
}
if settings.Port == 0 {
return errors.New("Server port not set")
}
if settings.Mode == "" {
return errors.New("Mode not set")
}
if settings.Mode != "static" && settings.Mode != "consul" {
return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)")
}
return nil
}
func GetHandlerMode()string {
return settings.Mode
}
//+build !test
package server
import (
log "asapo_discovery/logger"
"asapo_discovery/utils"
"net/http"
"strconv"
)
func Start() {
mux := utils.NewRouter(listRoutes)
log.Info("Listening on port: " + strconv.Itoa(settings.Port))
log.Fatal(http.ListenAndServe(":"+strconv.Itoa(settings.Port), http.HandlerFunc(mux.ServeHTTP)))
}
func ReadConfig(fname string) (log.Level, error) {
if err := utils.ReadJsonFromFile(fname, &settings); err != nil {
return log.FatalLevel, err
}
if err := settings.Validate(); err != nil {
return log.FatalLevel,err
}
return log.LevelFromString(settings.LogLevel)
}
package server
import (
"github.com/stretchr/testify/assert"
"testing"
)
func fillSettings(mode string)serverSettings {
var settings serverSettings
settings.Port = 1
settings.Mode = mode
settings.MaxConnections = 10
settings.LogLevel = "info"
settings.Endpoints=[]string{"ip1","ip2"}
return settings
}
func TestSettingsOK(t *testing.T) {
settings := fillSettings("static")
err := settings.Validate()
assert.Nil(t, err)
}
func TestSettingsWrongMode(t *testing.T) {
settings := fillSettings("blalba")
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsStaticModeNoEndpoints(t *testing.T) {
settings := fillSettings("static")
settings.Endpoints=[]string{}
err := settings.Validate()
assert.NotNil(t, err)
}
func TestSettingsConsulModeNoEndpoints(t *testing.T) {
settings := fillSettings("consul")
settings.Endpoints=[]string{}
err := settings.Validate()
assert.Nil(t, err)
}
func TestGetHandlerMode(t *testing.T) {
mode := "consul"
settings = fillSettings(mode)
assert.Equal(t,mode,GetHandlerMode())
}
package utils
import (
json "encoding/json"
"io/ioutil"
)
func StringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func MapToJson(res interface{}) ([]byte, error) {
answer, err := json.Marshal(res)
if err == nil {
return answer, nil
} else {
return nil, err
}
}
func ReadJsonFromFile(fname string, config interface{}) error {
content, err := ioutil.ReadFile(fname)
if err != nil {
return err
}
err = json.Unmarshal(content, config)
if err != nil {
return err
}
return nil
}
package utils
import (
"net/http"
"github.com/gorilla/mux"
"strings"
)
type Routes []Route
type Route struct {
Name string
Method string
Pattern string
HandlerFunc http.HandlerFunc
}
func NewRouter(listRoutes Routes) *mux.Router {
router := mux.NewRouter()
for _, route := range listRoutes {
router.
Methods(route.Method).
Path(route.Pattern).
Name(route.Name).
Handler(route.HandlerFunc)
// allow routes without trailing slash
if strings.HasSuffix(route.Pattern, "/") {
router.
Methods(route.Method).
Path(strings.TrimSuffix(route.Pattern, "/")).
Name(route.Name + "_noslash").
Handler(route.HandlerFunc)
}
}
return router
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment