Skip to content
Snippets Groups Projects
Commit ed7c1f2a authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

discover mongodb via discovery service, not dns

parent fd907cf6
No related branches found
No related tags found
No related merge requests found
Showing
with 179 additions and 128 deletions
......@@ -39,6 +39,8 @@ func main() {
log.SetLevel(logLevel)
server.CreateDiscoveryService()
err = server.InitDB(NewDefaultDatabase())
if err != nil {
log.Fatal(err.Error())
......
......@@ -30,7 +30,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n
}
if _, err := xid.FromString(group_id); err != nil {
err_str := "wrong groupid " + group_id
log_str := "processing get " + op + " request in " + db_name + " at " + settings.BrokerDbAddress + ": " + err_str
log_str := "processing get " + op + " request in " + db_name + " at " + settings.DatabaseServer + ": " + err_str
logger.Error(log_str)
fmt.Println(log_str)
w.WriteHeader(http.StatusBadRequest)
......@@ -90,7 +90,7 @@ func processRequestInDb(db_name string, group_id string, op string, extra_param
defer db_new.Close()
statistics.IncreaseCounter()
answer, err := db_new.ProcessRequest(db_name, group_id, op, extra_param)
log_str := "processing request " + op + " in " + db_name + " at " + settings.BrokerDbAddress
log_str := "processing request " + op + " in " + db_name + " at " + settings.DatabaseServer
if err != nil {
return returnError(err, log_str)
}
......
......@@ -7,7 +7,7 @@ import (
)
func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, err string) {
log_str := "processing " + requestName + " request in " + db_name + " at " + settings.BrokerDbAddress
log_str := "processing " + requestName + " request in " + db_name + " at " + settings.DatabaseServer
logger.Error(log_str + " - " + err)
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(err))
......
......@@ -2,15 +2,20 @@ package server
import (
"asapo_broker/database"
log "asapo_common/logger"
"asapo_common/utils"
"errors"
"io/ioutil"
"net/http"
)
var db database.Agent
type serverSettings struct {
BrokerDbAddress string
MonitorDbAddress string
MonitorDbName string
DiscoveryServer string
DatabaseServer string
PerformanceDbServer string
PerformanceDbName string
SecretFile string
Port int
LogLevel string
......@@ -20,12 +25,44 @@ var settings serverSettings
var statistics serverStatistics
var auth utils.Auth
func InitDB(dbAgent database.Agent) error {
type discoveryAPI struct {
Client *http.Client
baseURL string
}
var discoveryService discoveryAPI
func (api *discoveryAPI) GetMongoDbAddress() (string, error) {
resp, err := api.Client.Get(api.baseURL + "/mongo")
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
// InitDB stores the database agent and connects it to the configured
// database server. When settings.DatabaseServer is "auto", the address is
// first resolved through the discovery service (CreateDiscoveryService
// must have been called before).
// NOTE(review): the diff residue here showed a stray second connect to the
// old settings.BrokerDbAddress; only the DatabaseServer connect is kept.
func InitDB(dbAgent database.Agent) (err error) {
	db = dbAgent
	if settings.DatabaseServer == "auto" {
		settings.DatabaseServer, err = discoveryService.GetMongoDbAddress()
		if err != nil {
			return err
		}
		// An empty body means the discovery service knows no mongodb yet.
		if settings.DatabaseServer == "" {
			return errors.New("no database servers found")
		}
		log.Debug("Got mongodb server: " + settings.DatabaseServer)
	}
	return db.Connect(settings.DatabaseServer)
}
func CreateDiscoveryService() {
discoveryService = discoveryAPI{&http.Client{}, "http://" + settings.DiscoveryServer}
}
func CleanupDB() {
if db != nil {
db.Close()
......
......@@ -38,20 +38,24 @@ func ReadConfig(fname string) (log.Level, error) {
return log.FatalLevel, err
}
if settings.BrokerDbAddress == "" {
return log.FatalLevel, errors.New("BrokerDbAddress not set")
if settings.DatabaseServer == "" {
return log.FatalLevel, errors.New("DatabaseServer not set")
}
if settings.MonitorDbAddress == "" {
return log.FatalLevel, errors.New("MonitorDbAddress not set")
if settings.PerformanceDbServer == "" {
return log.FatalLevel, errors.New("PerformanceDbServer not set")
}
if settings.DatabaseServer == "auto" && settings.DiscoveryServer == "" {
return log.FatalLevel, errors.New("DiscoveryServer not set for auto DatabaseServer")
}
if settings.Port == 0 {
return log.FatalLevel, errors.New("Server port not set")
}
if settings.MonitorDbName == "" {
return log.FatalLevel, errors.New("MonitorDbName not set")
if settings.PerformanceDbName == "" {
return log.FatalLevel, errors.New("PerformanceDbName not set")
}
if settings.SecretFile == "" {
......
......@@ -6,6 +6,8 @@ import (
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"net/http"
"net/http/httptest"
"testing"
)
......@@ -37,7 +39,7 @@ func TestInitDBWithWrongAddress(t *testing.T) {
mock_db.ExpectedCalls = nil
settings.BrokerDbAddress = "0.0.0.0:0000"
settings.DatabaseServer = "0.0.0.0:0000"
for _, test := range initDBTests {
mock_db.On("Connect", "0.0.0.0:0000").Return(test.answer)
......@@ -50,6 +52,28 @@ func TestInitDBWithWrongAddress(t *testing.T) {
db = nil
}
// TestInitDBWithAutoAddress checks that InitDB resolves the database
// address through the discovery service when DatabaseServer is "auto",
// then connects to the returned address.
func TestInitDBWithAutoAddress(t *testing.T) {
	mock_db := setup()
	mock_db.ExpectedCalls = nil
	settings.DatabaseServer = "auto"
	mock_server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		// testify convention: expected value first, actual second
		// (was swapped in the original, giving misleading failure output).
		assert.Equal(t, "/mongo", req.URL.String(), "request string")
		rw.Write([]byte("0.0.0.0:0000"))
	}))
	defer mock_server.Close()
	discoveryService = discoveryAPI{mock_server.Client(), mock_server.URL}
	mock_db.On("Connect", "0.0.0.0:0000").Return(nil)

	err := InitDB(mock_db)
	assert.Equal(t, nil, err, "auto connect ok")
	assertExpectations(t, mock_db)
	db = nil
}
func TestCleanupDBWithoutInit(t *testing.T) {
mock_db := setup()
......
......@@ -47,7 +47,7 @@ func (st *serverStatistics) WriteStatistic() (err error) {
func (st *serverStatistics) Monitor() {
for {
time.Sleep(1000 * time.Millisecond)
logstr := "sending statistics to " + settings.MonitorDbAddress + ", dbname: " + settings.MonitorDbName
logstr := "sending statistics to " + settings.PerformanceDbServer + ", dbname: " + settings.PerformanceDbName
if err := st.WriteStatistic(); err != nil {
log.Error(logstr + " - " + err.Error())
} else {
......
......@@ -21,7 +21,7 @@ type StatisticInfluxDbWriter struct {
func (writer *StatisticInfluxDbWriter) Write(statistics *serverStatistics) error {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://"+ settings.MonitorDbAddress,
Addr: "http://"+ settings.PerformanceDbServer,
})
if err != nil {
return err
......@@ -29,7 +29,7 @@ func (writer *StatisticInfluxDbWriter) Write(statistics *serverStatistics) error
defer c.Close()
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: settings.MonitorDbName,
Database: settings.PerformanceDbName,
})
tags := map[string]string{"Group ID": "0"}
......
......@@ -11,10 +11,14 @@ type BrokerInfo struct {
StaticEndpoint string
}
// MongoInfo holds a statically configured MongoDB endpoint. It is
// validated (must be non-empty) when Mode != "consul" — presumably the
// consul mode discovers the endpoint instead; confirm against callers.
type MongoInfo struct {
StaticEndpoint string
}
type Settings struct {
Receiver ReceiverInfo
Broker BrokerInfo
Mongo MongoInfo
ConsulEndpoints []string
Mode string
Port int
......@@ -23,8 +27,8 @@ type Settings struct {
func (settings *Settings) Validate() error {
if settings.Mode != "consul"{
if len(settings.Receiver.StaticEndpoints) == 0 || len(settings.Broker.StaticEndpoint) == 0 {
return errors.New("receiver or broker endpoints not set")
if len(settings.Receiver.StaticEndpoints) == 0 || len(settings.Broker.StaticEndpoint) == 0 || len(settings.Mongo.StaticEndpoint) == 0{
return errors.New("static endpoints not set")
}
}
......
......@@ -2,12 +2,6 @@ FROM ubuntu:18.04
MAINTAINER DESY IT
ENV CONSUL_VERSION=1.6.0
ENV NOMAD_VERSION=0.9.5
ENV TERRAFORM_VERSION=0.12.7
ENV HASHICORP_RELEASES=https://releases.hashicorp.com
RUN apt-get update && apt-get install -y supervisor apt-transport-https \
ca-certificates \
curl \
......@@ -24,6 +18,11 @@ RUN add-apt-repository \
RUN apt-get update && apt-get install -y docker-ce-cli wget unzip
ENV CONSUL_VERSION=1.6.0
ENV NOMAD_VERSION=0.9.5
ENV TERRAFORM_VERSION=0.12.8
ENV HASHICORP_RELEASES=https://releases.hashicorp.com
RUN set -eux && \
mkdir -p /tmp/build && \
cd /tmp/build && \
......@@ -37,10 +36,14 @@ RUN set -eux && \
rm -rf /tmp/build && \
# tiny smoke test to ensure the binary we downloaded runs
consul version && \
nomad version
nomad version && \
terraform version
ADD supervisord.conf /etc/
RUN mkdir -p /var/log/supervisord/
COPY scripts/ /var/run/asapo/
ENTRYPOINT ["supervisord", "--configuration", "/etc/supervisord.conf"]
#!/usr/bin/env bash
# Create the ASAPO performance databases in InfluxDB.
# The InfluxDB host is resolved via Consul DNS (port 8600).

influx=$(dig +short @127.0.0.1 -p 8600 influxdb.service.asapo | head -1)

# Fail loudly instead of POSTing to "http://:8086" when DNS returns nothing.
if [ -z "$influx" ]; then
    echo "could not resolve influxdb.service.asapo via consul dns" >&2
    exit 1
fi

databases="asapo_receivers asapo_brokers"
for database in $databases
do
    curl -i -XPOST "http://${influx}:8086/query" --data-urlencode "q=CREATE DATABASE $database"
done
# Terraform configuration that renders the ASAPO Nomad job templates and
# submits them to a local Nomad agent.

# Nomad provider: talks to the agent on this host.
provider "nomad" {
address = "http://localhost:4646"
}

# Whether containers ship their logs to fluentd (used by the job templates).
variable "fluentd_logs" {
default = true
}

variable "nginx_version" {
default = "latest"
}

# Suffix appended to the asapo image names (e.g. "-dev"); empty for release.
variable "asapo_imagename_suffix" {
default = ""
}

variable "asapo_image_tag" {
default = "latest"
}

# Host directory shared into the containers.
variable "shared_dir" {
default = "/tmp"
}

# Render the nginx job spec from its template.
data "template_file" "nginx" {
template = "${file("./asapo-nginx.nmd.tpl")}"
vars = {
nginx_version = "${var.nginx_version}"
}
}

# Render the asapo services job spec from its template.
data "template_file" "asapo_services" {
template = "${file("./asapo-services.nmd.tpl")}"
vars = {
# image_suffix combines name suffix and tag, e.g. "-dev:latest".
image_suffix = "${var.asapo_imagename_suffix}:${var.asapo_image_tag}"
shared_dir = "${var.shared_dir}"
fluentd_logs = "${var.fluentd_logs}"
}
}

# Submit the rendered specs as Nomad jobs.
resource "nomad_job" "asapo-nginx" {
jobspec = "${data.template_file.nginx.rendered}"
}

resource "nomad_job" "asapo-services" {
jobspec = "${data.template_file.asapo_services.rendered}"
}
{
  "DatabaseServer":"localhost:8400/mongo",
  "PerformanceDbServer":"localhost:8400/influxdb",
  "PerformanceDbName": "asapo_brokers",
  "port":{{ env "NOMAD_PORT_broker" }},
  "LogLevel":"info",
  "SecretFile":"/secrets/secret.key"
}
docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock -v `pwd`/jobs:/usr/local/nomad_jobs --name asapo --net=host -v /tmp/nomad:/tmp/nomad -v /var/lib/docker:/var/lib/docker -d yakser/asapo-cluster
#!/usr/bin/env bash
# Start the asapo-cluster container with the host directories it needs.

# Host paths shared with the cluster container:
#   NOMAD_ALLOC_HOST_SHARED   - nomad allocation dirs (must be identical path inside/outside)
#   SERVICE_DATA_CLUSTER_SHARED - persistent service data (mongodb, influxdb, grafana)
#   DATA_GLOBAL_SHARED        - detector/receiver data
NOMAD_ALLOC_HOST_SHARED=/tmp/asapo/container_host_shared/nomad_alloc
SERVICE_DATA_CLUSTER_SHARED=/tmp/asapo/asapo_cluster_shared/service_data
DATA_GLOBAL_SHARED=/tmp/asapo/global_shared/data

# World-writable so containers running as arbitrary uids can use them.
mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED
chmod 777 $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED

# --privileged + docker.sock + /var/lib/docker: the container launches sibling
# docker containers via nomad. TF_VAR_* feed the terraform variables inside.
docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock \
-v /var/lib/docker:/var/lib/docker \
-v $NOMAD_ALLOC_HOST_SHARED:$NOMAD_ALLOC_HOST_SHARED \
-v $SERVICE_DATA_CLUSTER_SHARED:$SERVICE_DATA_CLUSTER_SHARED \
-v $DATA_GLOBAL_SHARED:$DATA_GLOBAL_SHARED \
-e NOMAD_ALLOC=$NOMAD_ALLOC_HOST_SHARED \
-e TF_VAR_service_dir=$SERVICE_DATA_CLUSTER_SHARED \
-e TF_VAR_data_dir=$DATA_GLOBAL_SHARED \
--name asapo --net=host -d yakser/asapo-cluster
......@@ -22,18 +22,19 @@ job "asapo-brokers" {
driver = "docker"
config {
network_mode = "host"
dns_servers = ["127.0.0.1"]
image = "yakser/asapo-broker-dev:feature_virtualized-deployment.latest"
image = "yakser/asapo-broker${image_suffix}"
force_pull = true
volumes = ["local/config.json:/var/lib/broker/config.json"]
logging {
type = "fluentd"
config {
fluentd-address = "localhost:9881"
fluentd-async-connect = true
tag = "asapo.docker"
}
%{ if fluentd_logs }
logging {
type = "fluentd"
config {
fluentd-address = "localhost:9881"
fluentd-async-connect = true
tag = "asapo.docker"
}
}
%{endif}
}
resources {
......@@ -60,14 +61,14 @@ job "asapo-brokers" {
}
template {
source = "/usr/local/nomad_jobs/broker.json.tpl"
source = "${scripts_dir}/broker.json.tpl"
destination = "local/config.json"
change_mode = "restart"
}
template {
source = "/usr/local/nomad_jobs/auth_secret.key"
destination = "secrets/secret.key"
source = "${scripts_dir}/auth_secret.key"
destination = "local/secret.key"
change_mode = "restart"
}
} #task brokers
......
......@@ -23,16 +23,15 @@ job "asapo-mongo" {
config {
network_mode = "host"
image = "mongo:4.0.0"
volumes = ["/${meta.shared_storage}/mongodb:/data/db"]
image = "mongo:${mongo_version}"
volumes = ["/${service_dir}/mongodb:/data/db"]
}
resources {
cpu = 1500
memory = 12560
memory = "${mongo_total_memory_size}"
network {
port "mongo" {
static = 27017
static = "${mongo_port}"
}
}
}
......
......@@ -59,7 +59,7 @@ job "asapo-nginx" {
}
template {
source = "/usr/local/nomad_jobs/nginx.conf.tpl"
source = "${scripts_dir}/nginx.conf.tpl"
destination = "local/nginx.conf"
change_mode = "restart"
}
......
......@@ -21,19 +21,20 @@ job "asapo-perfmetrics" {
driver = "docker"
config {
dns_servers = ["127.0.0.1"]
network_mode = "host"
image = "influxdb"
volumes = ["/${meta.shared_storage}/influxdb:/var/lib/influxdb"]
image = "influxdb:${influxdb_version}"
volumes = ["/${service_dir}/influxdb:/var/lib/influxdb"]
}
env {
PRE_CREATE_DB="asapo_receivers;asapo_brokers"
}
resources {
cpu = 1500
memory = 32000
memory = "${influxdb_total_memory_size}"
network {
mbits = 10
port "influxdb" {
static = 8086
static = "${influxdb_port}"
}
}
}
......@@ -62,24 +63,21 @@ job "asapo-perfmetrics" {
driver = "docker"
env {
GF_SERVER_DOMAIN = "${attr.unique.hostname}"
GF_SERVER_DOMAIN = "$${attr.unique.hostname}"
GF_SERVER_ROOT_URL = "%(protocol)s://%(domain)s/performance/"
}
config {
dns_servers = ["127.0.0.1"]
network_mode = "host"
image = "grafana/grafana"
volumes = ["/${meta.shared_storage}/grafana:/var/lib/grafana"]
image = "grafana/grafana:${grafana_version}"
volumes = ["/${service_dir}/grafana:/var/lib/grafana"]
}
resources {
cpu = 1500
memory = 2560
memory = "${grafana_total_memory_size}"
network {
mbits = 10
port "grafana" {
static = 3000
static = "${grafana_port}"
}
}
}
......
......@@ -24,17 +24,20 @@ job "asapo-receivers" {
config {
network_mode = "host"
dns_servers = ["127.0.0.1"]
image = "yakser/asapo-receiver-dev:feature_virtualized-deployment.latest"
image = "yakser/asapo-receiver${image_suffix}"
force_pull = true
volumes = ["local/config.json:/var/lib/receiver/config.json",
"/bldocuments/support/asapo/data:/var/lib/receiver/data"]
"${data_dir}:/var/lib/receiver/data"]
%{ if fluentd_logs }
logging {
type = "fluentd"
config {
fluentd-address = "localhost:9881"
tag = "asapo.docker"
}
type = "fluentd"
config {
fluentd-address = "localhost:9881"
fluentd-async-connect = true
tag = "asapo.docker"
}
}
%{endif}
}
resources {
......@@ -42,9 +45,11 @@ job "asapo-receivers" {
port "recv" {}
port "recv_ds" {}
}
memory = 40000
memory = "${receiver_total_memory_size}"
}
service {
name = "asapo-receiver"
port = "recv"
......@@ -63,8 +68,13 @@ job "asapo-receivers" {
}
}
meta {
receiver_dataserver_cache_size = "${receiver_dataserver_cache_size}"
}
template {
source = "/usr/local/nomad_jobs/receiver.json.tpl"
source = "${scripts_dir}/receiver.json.tpl"
destination = "local/config.json"
change_mode = "restart"
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment