Skip to content
Snippets Groups Projects
Commit 8820ac8e authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'fix_monitoring' into 'develop'

Reduce memory footprint from asapo monitoring

See merge request asapo/asapo!205
parents c467e43e 05895395
No related branches found
No related tags found
No related merge requests found
......@@ -3,5 +3,8 @@
"ServerPort": {{ env "NOMAD_PORT_monitoring_server" }},
"LogLevel": "{{ keyOrDefault "log_level" "debug" }}",
"InfluxDbUrl":"http://localhost:8400/influxdb",
"InfluxDbDatabase": "asapo_monitoring"
"InfluxDbDatabase": "asapo_monitoring",
"RetentionPolicyTime": "12h",
"GroupingTime": "10m",
"MaxPoints": 500
}
......@@ -3,5 +3,8 @@
"ServerPort": 8422,
"LogLevel": "debug",
"InfluxDbUrl":"http://localhost:8400/influxdb",
"InfluxDbDatabase": "asapo_monitoring"
"InfluxDbDatabase": "asapo_monitoring",
"RetentionPolicyTime": "12h",
"GroupingTime": "10m",
"MaxPoints": 500
}
......@@ -41,6 +41,18 @@ func loadConfig(configFileName string) (log.Level, server.Settings, error) {
return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbUrl' not set")
}
if settings.RetentionPolicyTime == "" {
return log.FatalLevel, server.Settings{}, errors.New("'RetentionPolicyTime' not set")
}
if settings.GroupingTime == "" {
return log.FatalLevel, server.Settings{}, errors.New("'GroupingTime' not set")
}
if settings.MaxPoints == 0 {
return log.FatalLevel, server.Settings{}, errors.New("'MaxPoints' not set")
}
if settings.InfluxDbDatabase == "" {
return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbDatabase' not set")
}
......@@ -97,27 +109,28 @@ func main() {
log.Fatal(err.Error())
return
}
log.SetLevel(logLevel)
// InfluxDB 1 fix, create database
// Does multiple HTTP posts to DB
// It creates two databases.
normalDB := settings.InfluxDbDatabase
avgDB := normalDB + "_avg"
//rpName := "one_day"
rpName := "one_day"
cqName := "avg_values"
postStrings := []string{
"CREATE DATABASE " + normalDB,
"CREATE DATABASE " + avgDB,
//"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
// " DURATION 24h REPLICATION 1 DEFAULT",
"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
" DURATION " + settings.RetentionPolicyTime + " REPLICATION 1 DEFAULT",
"CREATE CONTINUOUS QUERY " + cqName + " ON " + normalDB + " BEGIN" +
" SELECT mean(*) INTO " + avgDB +
"..:MEASUREMENT FROM /.*/ GROUP BY time(12h),* END",
" SELECT sum(*) INTO " + avgDB +
"..:MEASUREMENT FROM /.*/ GROUP BY time(" + settings.GroupingTime + "),* END",
}
for i := 0; i < len(postStrings); i++ {
postTODB(postStrings[i], settings)
log.Debug("Modify DB with command: " + postStrings[i])
}
log.SetLevel(logLevel)
server.Start(settings)
}
......@@ -4,29 +4,57 @@ import (
pb "asapo_common/generated_proto"
log "asapo_common/logger"
"context"
"errors"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"google.golang.org/grpc"
"net"
"strconv"
"time"
)
func GetSwitshPeriod(retentionTime string, groupingTime string) (int64, int64, error) {
// Return time, at which Querry server switch from full to reduce DB
retentionTimeInt, err := time.ParseDuration(retentionTime)
if err != nil {
return 0, 0, err
}
groupingTimeInt, err := time.ParseDuration(groupingTime)
if err != nil {
return 0, 0, err
}
switchPeriod := retentionTimeInt.Seconds() - groupingTimeInt.Seconds()
if switchPeriod < 0 {
return 0, 0, errors.New("retention time is less then grouping time")
}
return int64(switchPeriod), int64(groupingTimeInt.Seconds()), nil
}
func Start(settings Settings) {
lis, err := net.Listen("tcp", ":"+strconv.Itoa(int(settings.ServerPort)))
if err != nil {
log.Fatal("failed to listen: ", err)
}
influxClient := influxdb2.NewClient(settings.InfluxDbUrl, ""/*settings.InfluxDb2AuthToken*/)
influxClient := influxdb2.NewClient(settings.InfluxDbUrl, "" /*settings.InfluxDb2AuthToken*/)
// influxClient.BucketsAPI().CreateBucketWithName(context.Background(), domain.Organization{}, "bucketName", domain.RetentionRules{})
queryServer := QueryServer{}
ingestServer := IngestServer{}
dbQueryApi := influxClient.QueryAPI(""/*settings.InfluxDb2Org*/)
dbQueryApi := influxClient.QueryAPI("" /*settings.InfluxDb2Org*/)
queryServer.dbQueryApi = dbQueryApi
queryServer.settings = settings
dbWriterApi := influxClient.WriteAPI(""/*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
switchPeriod, groupedInterval, err := GetSwitshPeriod(settings.RetentionPolicyTime, settings.GroupingTime)
if err != nil {
log.Fatal("failed to get switch period", err)
return
}
queryServer.switchPeriod = switchPeriod
queryServer.groupedInterval = groupedInterval
dbWriterApi := influxClient.WriteAPI("" /*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
ingestServer.dbWriterApi = dbWriterApi
ingestServer.settings = settings
......
......@@ -45,4 +45,9 @@ type Settings struct {
InfluxDbUrl string // e.g. "http://127.0.0.1:8086"
InfluxDbDatabase string // e.g. "asapo-monitoring"
RetentionPolicyTime string // e.g. 1h
GroupingTime string // e.g. 5m
MaxPoints uint64 // e.g. 500
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment