Skip to content
Snippets Groups Projects
Commit 3b9b9105 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

discovery services can find ib address, parameter for mongo_dir

parent 68c262a0
No related branches found
No related tags found
No related merge requests found
Showing
with 53 additions and 26 deletions
......@@ -5,6 +5,7 @@ import "errors"
type ReceiverInfo struct {
StaticEndpoints []string
MaxConnections int
UseIBAddress bool
}
type BrokerInfo struct {
......
......@@ -3,8 +3,8 @@
#folders
NOMAD_ALLOC_HOST_SHARED=/tmp/asapo/container_host_shared/nomad_alloc
SERVICE_DATA_CLUSTER_SHARED=/home/yakubov/asapo/asapo_cluster_shared/service_data
DATA_GLOBAL_SHARED=/home/yakubov/asapo/global_shared/data
DATA_GLOBAL_SHARED=/gpfs/petra3/scratch/yakubov/asapo_shared
MONGO_DIR=/scratch/mongodb # due to performance reasons mongodb can benefit from writing to local filesystem (HA to be worked on)
#service distribution
MAX_NOMAD_SERVERS=3 # rest are clients
N_ASAPO_LIGHTWEIGHT_SERVICE_NODES=1 # where to put influx, elk, ... . Rest are receivers, brokers, mongodb
......@@ -38,8 +38,8 @@ SERVER_ADRESSES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_SERVERS
ASAPO_LIGHTWEIGHT_SERVICE_NODES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_ASAPO_LIGHTWEIGHT_SERVICE_NODES | 'BEGIN{printf "["} {printf "%s\"%s\"",sep,$0; sep=","} END{print "]"}'`
# make folders if not exist
mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED
chmod 777 $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED
mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $MONGO_DIR
chmod 777 $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED $MONGO_DIR
cd $SERVICE_DATA_CLUSTER_SHARED
mkdir esdatadir fluentd grafana influxdb mongodb
chmod 777 *
......@@ -72,6 +72,7 @@ dockerrun --rm \
-e NOMAD_ALLOC_DIR=$NOMAD_ALLOC_HOST_SHARED \
-e TF_VAR_service_dir=$SERVICE_DATA_CLUSTER_SHARED \
-e TF_VAR_data_dir=$DATA_GLOBAL_SHARED \
-e TF_VAR_mongo_dir=$MONGO_DIR \
-e ADVERTISE_IP=$ADVERTISE_IP \
-e RECURSORS=$RECURSORS \
-e TF_VAR_asapo_user=$ASAPO_USER \
......
......@@ -32,7 +32,7 @@ job "asapo-mongo" {
security_opt = ["no-new-privileges"]
userns_mode = "host"
image = "mongo:${mongo_version}"
volumes = ["/${service_dir}/mongodb:/data/db"]
volumes = ["${mongo_dir}:/data/db"]
}
resources {
......
{
"Mode": "consul",
"Receiver": {
"MaxConnections": 32
"MaxConnections": 32,
"UseIBAddress": {{ keyOrDefault "use_ib_for_receiver" "false" }}
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel": "{{ keyOrDefault "log_level" "info" }}"
......
......@@ -75,6 +75,7 @@ data "template_file" "asapo_mongo" {
template = "${file("${var.job_scripts_dir}/asapo-mongo.nmd.tpl")}"
vars = {
service_dir = "${var.service_dir}"
mongo_dir = "${var.mongo_dir}"
mongo_version = "${var.mongo_version}"
mongo_total_memory_size = "${var.mongo_total_memory_size}"
mongo_port = "${var.mongo_port}"
......
......@@ -26,6 +26,8 @@ variable "service_dir" {}
variable "data_dir" {}
variable "mongo_dir" {}
variable "receiver_total_memory_size" {}
variable "receiver_dataserver_cache_size" {}
......
{
"Mode": "consul",
"Receiver": {
"MaxConnections": 32
"MaxConnections": 32,
"UseIBAddress": {{ keyOrDefault "use_ib_for_receiver" "false" }}
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel": "{{ keyOrDefault "log_level" "info" }}"
......
......@@ -3,7 +3,7 @@ package request_handler
import "asapo_common/utils"
type Agent interface {
GetReceivers() ([]byte, error)
GetReceivers(bool) ([]byte, error)
GetBroker() ([]byte, error)
GetMongo() ([]byte, error)
Init(settings utils.Settings) error
......
......@@ -35,25 +35,34 @@ func (c *SafeCounter) Next(size int) int {
var counter SafeCounter
func (rh *ConsulRequestHandler) GetServices(name string) ([]string, error) {
func (rh *ConsulRequestHandler) GetServices(name string,use_ib bool) ([]string, error) {
var result = make([]string, 0)
services, _, err := rh.client.Health().Service(name, "", true, nil)
if err != nil {
return nil, err
}
for _, service := range (services) {
result = append(result, service.Node.Address+":"+strconv.Itoa(service.Service.Port))
var address string
if use_ib {
var ok bool
address,ok = service.Node.Meta["ib_address"];
if !ok || address=="none" {
address = service.Node.Address
}
} else {
address = service.Node.Address
}
result = append(result, address+":"+strconv.Itoa(service.Service.Port))
}
sort.Strings(result)
return result, nil
}
func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
func (rh *ConsulRequestHandler) GetReceivers(use_ib bool) ([]byte, error) {
if len(rh.staticHandler.receiverResponce.Uris)>0 {
return rh.staticHandler.GetReceivers()
return rh.staticHandler.GetReceivers(false)
}
var response Responce
response.MaxConnections = rh.MaxConnections
......@@ -61,7 +70,7 @@ func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) {
return nil, errors.New("consul client not connected")
}
var err error
response.Uris, err = rh.GetServices("asapo-receiver")
response.Uris, err = rh.GetServices("asapo-receiver",use_ib)
if err != nil {
return nil, err
}
......@@ -76,7 +85,7 @@ func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) {
if (rh.client == nil) {
return nil, errors.New("consul client not connected")
}
response, err := rh.GetServices("asapo-broker")
response, err := rh.GetServices("asapo-broker",false)
if err != nil {
return nil, err
}
......@@ -97,7 +106,7 @@ func (rh *ConsulRequestHandler) GetMongo() ([]byte, error) {
if (rh.client == nil) {
return nil, errors.New("consul client not connected")
}
response, err := rh.GetServices("mongo")
response, err := rh.GetServices("mongo",false)
if err != nil {
return nil, err
}
......
......@@ -94,15 +94,24 @@ func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() {
func (suite *ConsulHandlerTestSuite) TestGetReceivers() {
suite.handler.Init(consul_settings)
res, err := suite.handler.GetReceivers()
res, err := suite.handler.GetReceivers(false)
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}", string(res), "uris")
}
func (suite *ConsulHandlerTestSuite) TestGetReceiversWithIB() {
consul_settings.Receiver.UseIBAddress = true
suite.handler.Init(consul_settings)
res, err := suite.handler.GetReceivers(true)
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"10.10.0.1:1234\",\"10.10.0.1:1235\"]}", string(res), "uris")
}
func (suite *ConsulHandlerTestSuite) TestGetReceiversStatic() {
consul_settings.Receiver.StaticEndpoints= []string{"127.0.0.1:0000"}
suite.handler.Init(consul_settings)
res, err := suite.handler.GetReceivers()
res, err := suite.handler.GetReceivers(false)
suite.NoError(err, "")
suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:0000\"]}", string(res), "uris")
}
......@@ -110,7 +119,7 @@ func (suite *ConsulHandlerTestSuite) TestGetReceiversStatic() {
func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() {
consul_settings.ConsulEndpoints = []string{"blabla"}
suite.handler.Init(consul_settings)
_, err := suite.handler.GetReceivers()
_, err := suite.handler.GetReceivers(false)
suite.Error(err, "")
}
......
......@@ -11,7 +11,7 @@ type StaticRequestHandler struct {
}
func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) {
func (rh *StaticRequestHandler) GetReceivers(bool) ([]byte, error) {
return utils.MapToJson(&rh.receiverResponce)
}
......
......@@ -24,7 +24,7 @@ func TestStaticHandlerInitOK(t *testing.T) {
func TestStaticHandlerGetReceviersOK(t *testing.T) {
rh.Init(static_settings)
res,err := rh.GetReceivers()
res,err := rh.GetReceivers(false)
assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}")
assert.Nil(t, err)
}
......
......@@ -10,7 +10,7 @@ func getService(service string) (answer []byte, code int) {
var err error
switch service {
case "receivers":
answer, err = requestHandler.GetReceivers()
answer, err = requestHandler.GetReceivers(settings.Receiver.UseIBAddress)
break
case "broker":
answer, err = requestHandler.GetBroker()
......
......@@ -34,5 +34,5 @@ $1 localhost:8400 ${beamtime_id} 100 112 4 0 100
sleep 2
# should be 118 requests (112 data transfers + 5 authorizations (4 + 1 after reconnection due to wrong meta))
influx -execute "select sum(n_requests) from statistics" -database=${database_name} -format=json | tee /dev/stderr | jq .results[0].series[0].values[0][1] | tee /dev/stderr | grep 117
# should be 116 requests (112 data transfers + 4 authorizations)
influx -execute "select sum(n_requests) from statistics" -database=${database_name} -format=json | tee /dev/stderr | jq .results[0].series[0].values[0][1] | tee /dev/stderr | grep 116
{
"Mode": "consul",
"Receiver": {
"MaxConnections": 32
"MaxConnections": 32,
"UseIBAddress": false
},
"Mongo": {
"StaticEndpoint": "127.0.0.1:27017"
......
{
"Mode": "consul",
"Receiver": {
"MaxConnections": 32
"MaxConnections": 32,
"UseIBAddress": false
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel":"debug"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment