diff --git a/deploy/asapo_services/scripts/monitoring_server.json.tpl b/deploy/asapo_services/scripts/monitoring_server.json.tpl
index 9ac0424196908d85e08b5e55453c2829fe485f57..d0e440df69f0ecfbd35e3947b3e5671502d34554 100644
--- a/deploy/asapo_services/scripts/monitoring_server.json.tpl
+++ b/deploy/asapo_services/scripts/monitoring_server.json.tpl
@@ -3,5 +3,8 @@
   "ServerPort": {{ env "NOMAD_PORT_monitoring_server" }},
   "LogLevel": "{{ keyOrDefault "log_level" "debug" }}",
   "InfluxDbUrl":"http://localhost:8400/influxdb",
-  "InfluxDbDatabase": "asapo_monitoring"
+  "InfluxDbDatabase": "asapo_monitoring",
+  "RetentionPolicyTime": "12h",
+  "GroupingTime": "10m",
+  "MaxPoints": 500
 }
diff --git a/deploy/asapo_services_light/monitoring.json b/deploy/asapo_services_light/monitoring.json
index ed5241dff07ec5f4d0b3b2e50cc29f48b1eb4a2a..63cf43a4d62c9bfeba20413d2ee8e6825c4d5ee3 100644
--- a/deploy/asapo_services_light/monitoring.json
+++ b/deploy/asapo_services_light/monitoring.json
@@ -3,5 +3,8 @@
   "ServerPort": 8422,
   "LogLevel": "debug",
   "InfluxDbUrl":"http://localhost:8400/influxdb",
-  "InfluxDbDatabase": "asapo_monitoring"
+  "InfluxDbDatabase": "asapo_monitoring",
+  "RetentionPolicyTime": "12h",
+  "GroupingTime": "10m",
+  "MaxPoints": 500
 }
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
index 356a5fcfc9a9426aa30f2d0faa3b63e03fbaf1eb..3eded5fffaff985ca77d0757761a41e865dd2c8f 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
@@ -41,6 +41,18 @@ func loadConfig(configFileName string) (log.Level, server.Settings, error) {
 		return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbUrl' not set")
 	}
 
+	if settings.RetentionPolicyTime == "" {
+		return log.FatalLevel, server.Settings{}, errors.New("'RetentionPolicyTime' not set")
+	}
+
+	if settings.GroupingTime == "" {
+		return log.FatalLevel, server.Settings{}, errors.New("'GroupingTime' not set")
+	}
+
+	if settings.MaxPoints == 0 {
+		return log.FatalLevel, server.Settings{}, errors.New("'MaxPoints' not set")
+	}
+
 	if settings.InfluxDbDatabase == "" {
 		return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbDatabase' not set")
 	}
@@ -97,27 +109,28 @@ func main() {
 		log.Fatal(err.Error())
 		return
 	}
 
+	log.SetLevel(logLevel)
+
 	// InfluxDB 1 fix, create database
 	// Does multiple HTTP posts to DB
 	// It creates two databases.
 
 	normalDB := settings.InfluxDbDatabase
 	avgDB := normalDB + "_avg"
-	//rpName := "one_day"
+	rpName := "one_day"
 	cqName := "avg_values"
 	postStrings := []string{
 		"CREATE DATABASE " + normalDB,
 		"CREATE DATABASE " + avgDB,
-		//"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
-		//	" DURATION 24h REPLICATION 1 DEFAULT",
+		"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
+			" DURATION " + settings.RetentionPolicyTime + " REPLICATION 1 DEFAULT",
 		"CREATE CONTINUOUS QUERY " + cqName + " ON " + normalDB + " BEGIN" +
-			" SELECT mean(*) INTO " + avgDB +
-			"..:MEASUREMENT FROM /.*/ GROUP BY time(12h),* END",
+			" SELECT sum(*) INTO " + avgDB +
+			"..:MEASUREMENT FROM /.*/ GROUP BY time(" + settings.GroupingTime + "),* END",
 	}
 	for i := 0; i < len(postStrings); i++ {
 		postTODB(postStrings[i], settings)
+		log.Debug("Modify DB with command: " + postStrings[i])
 	}
-	log.SetLevel(logLevel)
-
 	server.Start(settings)
 }
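Review note: the startup sequence above provisions InfluxDB 1.x by POSTing plain InfluxQL statements, and `log.SetLevel` now runs before the setup, so the new `log.Debug` lines are actually emitted at startup. Two observations: the retention policy keeps the name `one_day` although its duration now comes from `RetentionPolicyTime`, and the continuous query aggregates with `sum(*)`, so the averaged database stores fields prefixed with `sum_` — which is exactly what the query side's new `inset` parameter matches. Below is a minimal sketch of the statements generated under the defaults shipped in `monitoring.json` (12h retention, 10m grouping, database `asapo_monitoring`); the literal values are assumptions taken from that config, not constants in the server:

```go
// Sketch (not part of the patch): the InfluxQL statements main() now POSTs,
// rendered with the default settings from monitoring.json.
package main

import "fmt"

func main() {
	normalDB := "asapo_monitoring" // settings.InfluxDbDatabase
	avgDB := normalDB + "_avg"
	retention := "12h" // settings.RetentionPolicyTime
	grouping := "10m"  // settings.GroupingTime

	statements := []string{
		"CREATE DATABASE " + normalDB,
		"CREATE DATABASE " + avgDB,
		// Full-resolution data survives only for the retention window...
		"CREATE RETENTION POLICY one_day ON " + normalDB +
			" DURATION " + retention + " REPLICATION 1 DEFAULT",
		// ...while the continuous query rolls it up into sum_*-prefixed
		// fields in the _avg database, one point per grouping interval.
		"CREATE CONTINUOUS QUERY avg_values ON " + normalDB + " BEGIN" +
			" SELECT sum(*) INTO " + avgDB +
			"..:MEASUREMENT FROM /.*/ GROUP BY time(" + grouping + "),* END",
	}
	for _, statement := range statements {
		fmt.Println(statement)
	}
}
```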
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
index 675e6b9d3bf3730c634f1b10e5f988b3e60959a7..e6f6ebed124dd7922308e03dfd7f08676e22500c 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
@@ -5,20 +5,22 @@ import (
 	log "asapo_common/logger"
 	"context"
 	"errors"
-	"fmt"
 	"github.com/influxdata/influxdb-client-go/v2/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 )
 
 type QueryServer struct {
 	pb.UnimplementedAsapoMonitoringQueryServiceServer
 
-	settings   Settings
-	dbQueryApi api.QueryAPI
+	settings        Settings
+	dbQueryApi      api.QueryAPI
+	switchPeriod    int64
+	groupedInterval int64
 }
 
 func (s *QueryServer) GetMetadata(ctx context.Context, _ *pb.Empty) (*pb.MetadataResponse, error) {
@@ -168,30 +170,39 @@ func doesStringContainsDangerousElements(input string) bool {
 		strings.Contains(input, "}")
 }
 
-// Modifies filter to query averaged data
-func generateFilterForAVGData(s *QueryServer, filter string) string {
-	// Map of all modifications that a querystring needs to access averaged data.
-	// Currently, there is only one modification needed so this is a bit redundant.
-	filterModifications := make(map[string]string)
-	filterModifications[s.settings.InfluxDbDatabase] = s.settings.InfluxDbDatabase + averageSuffix
-	filterModifications[dbMeasurementBrokerFileRequests] = "mean_" + dbMeasurementBrokerFileRequests
-	filterModifications[dbMeasurementFtsTransfers] = "mean_" + dbMeasurementFtsTransfers
-	filterModifications[dbMeasurementFileInput] = "mean_" + dbMeasurementFileInput
-	filterModifications[dbMeasurementRdsCacheMemoryUsage] = "mean_" + dbMeasurementRdsCacheMemoryUsage
-	filterModifications[dbMeasurementRdsFileRequests] = "mean_" + dbMeasurementRdsFileRequests
-	alteredFilter := filter
-	// Do all the filterModifications
-
-	for key, element := range filterModifications {
-		alteredFilter = strings.ReplaceAll(filter, key, element)
-	}
-	return alteredFilter
+func GetRequestPeriods(fromTimestamp uint64, toTimestamp uint64, groupedInterval int64,
+	switchPeriod int64, maxPoints uint64) (string, string, int) {
+	// Define the time ranges for the full and the reduced (averaged) DB, and the interval between data points
+
+	startTime := strconv.FormatUint(fromTimestamp, 10)
+	endTime := strconv.FormatUint(toTimestamp, 10)
+
+	intervalInSec := int((toTimestamp - fromTimestamp) / maxPoints)
+	if intervalInSec < 1 {
+		intervalInSec = 1
+	}
+
+	switchTime := uint64(time.Now().Unix() - switchPeriod)
+	timeRangeAve := ""
+	timeRange := "start: " + startTime + ", stop: " + endTime
+	if fromTimestamp < switchTime {
+		intervalInSec = int(groupedInterval)
+		if toTimestamp < switchTime {
+			timeRangeAve = "start: " + startTime + ", stop: " + endTime
+			timeRange = ""
+		} else {
+			timeRangeAve = "start: " + startTime + ", stop: " + strconv.FormatUint(switchTime, 10)
+			timeRange = "start: " + strconv.FormatUint(switchTime, 10) + ", stop: " + endTime
+		}
+	}
+	return timeRange, timeRangeAve, intervalInSec
 }
 
 func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQuery) (*pb.DataPointsResponse, error) {
-	startTime := strconv.FormatUint(query.FromTimestamp, 10)
-	endTime := strconv.FormatUint(query.ToTimestamp, 10)
-	intervalInSec := 5
+
+	timeRange, timeRangeAve, intervalInSec := GetRequestPeriods(query.FromTimestamp,
+		query.ToTimestamp, s.groupedInterval, s.switchPeriod, s.settings.MaxPoints)
+	nEntries := int((query.ToTimestamp - query.FromTimestamp) / uint64(intervalInSec))
 
 	if query.BeamtimeFilter == "" {
 		return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required")
 	}
@@ -210,28 +221,28 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
 	response := pb.DataPointsResponse{}
 
 	transferSpeedFilter := createFilter(query, transferSpeedFilterGenerator)
-	transferSpeed, err := queryTransferSpeed(s, ctx, startTime, endTime, intervalInSec, transferSpeedFilter)
+	transferSpeed, err := queryTransferSpeed(s, ctx, timeRange, timeRangeAve, intervalInSec, transferSpeedFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.TransferRates = transferSpeed
 
 	fileRateFilter := createFilter(query, transferRateFilterGenerator)
-	fileRates, err := queryFileRate(s, ctx, startTime, endTime, intervalInSec, fileRateFilter)
+	fileRates, err := queryFileRate(s, ctx, timeRange, timeRangeAve, intervalInSec, fileRateFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.FileRates = fileRates
 
 	taskTimeFilter := createFilter(query, taskTimeFilterGenerator)
-	taskTime, err := queryTaskTime(s, ctx, startTime, endTime, intervalInSec, taskTimeFilter)
+	taskTime, err := queryTaskTime(s, ctx, timeRange, timeRangeAve, intervalInSec, taskTimeFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.TaskTimes = taskTime
 
 	memoryUsageFilter := createFilter(query, memoryUsageFilterGenerator)
-	memoryUsage, err := queryMemoryUsage(s, ctx, startTime, endTime, intervalInSec, memoryUsageFilter)
+	memoryUsage, err := queryMemoryUsage(s, ctx, timeRange, timeRangeAve, intervalInSec, memoryUsageFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
@@ -243,7 +254,17 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
 }
 
 // Queries DB for transferspeed
-func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) {
+func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context,
+	timeRange string, bucket string, inset string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
+
+	query := "from(bucket: \"" + bucket + "\")" + // Build the Flux query for the given bucket and time range
+		" |> range(" + timeRange + ")" +
+		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
+		" |> filter(fn: (r) => r._field == \"" + inset + "totalInputFileSize\" or r._field == \"" + inset + "totalRequestedFileSize\" or r._field == \"" + inset + "totalRdsOutputFileSize\" or r._field == \"" + inset + "totalFtsTransferredFileSize\")" +
+		filter +
+		" |> group(columns: [\"_field\"])" +
+		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 
 	result, err := s.dbQueryApi.Query(ctx, query)
 	if err != nil {
@@ -257,17 +278,17 @@ func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInS
 		sendRdsBytes := int64(0)
 		sendFtsBytes := int64(0)
 
-		if result.Record().Values()["totalInputFileSize"] != nil {
-			recvBytes = result.Record().Values()["totalInputFileSize"].(int64)
+		if result.Record().Values()[inset+"totalInputFileSize"] != nil {
+			recvBytes = result.Record().Values()[inset+"totalInputFileSize"].(int64) * 1000
 		}
-		if result.Record().Values()["totalRequestedFileSize"] != nil {
-			sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64)
+		if result.Record().Values()[inset+"totalRequestedFileSize"] != nil {
+			sendBytes = result.Record().Values()[inset+"totalRequestedFileSize"].(int64)
 		}
-		if result.Record().Values()["totalRdsOutputFileSize"] != nil {
-			sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64)
+		if result.Record().Values()[inset+"totalRdsOutputFileSize"] != nil {
+			sendRdsBytes = result.Record().Values()[inset+"totalRdsOutputFileSize"].(int64)
 		}
-		if result.Record().Values()["totalFtsTransferredFileSize"] != nil {
-			sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64)
+		if result.Record().Values()[inset+"totalFtsTransferredFileSize"] != nil {
+			sendFtsBytes = result.Record().Values()[inset+"totalFtsTransferredFileSize"].(int64)
 		}
 
 		arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{
@@ -283,35 +304,43 @@ func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInS
 	}
 	return arrayResult, nil
 }
-func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
-	// Querystring
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
+ "\")" + // Normal querystring - " |> range(start: " + startTime + ", stop: " + endTime + ")" + - " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" + - " |> filter(fn: (r) => r._field == \"totalInputFileSize\" or r._field == \"totalRequestedFileSize\" or r._field == \"totalRdsOutputFileSize\" or r._field == \"totalFtsTransferredFileSize\")" + - filter + - " |> group(columns: [\"_field\"])" + - " |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" + - " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" - queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data +func queryTransferSpeed(s *QueryServer, ctx context.Context, timeRange string, + timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TotalTransferRateDataPoint, error) { - arrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, query) // Query newer data - if err != nil { - return nil, err - } - secondArrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, queryForOldData) // Query averaged data + arrayResult, err := queryfunctionTransferSpeed(s, ctx, timeRange, + s.settings.InfluxDbDatabase, "", intervalInSec, filter) if err != nil { return nil, err } - // Append the results of the second query to those of the first - for a := range secondArrayResult { - arrayResult = append(arrayResult, secondArrayResult[a]) + + // Querystring for averaged data + if timeRangeAve != "" { + secondArrayResult, err := queryfunctionTransferSpeed(s, ctx, timeRangeAve, + s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter) + if err != nil { + return nil, err + } + if len(secondArrayResult) == 0 { + secondArrayResult = make([]*pb.TotalTransferRateDataPoint, nEntr-len(arrayResult)) + } + arrayResult = append(secondArrayResult, arrayResult...) 
 	}
 	return arrayResult, nil
 }
 
 // Queries DB for Filerate
-func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) {
+func queryfunctionFileRate(s *QueryServer, ctx context.Context, timeRange string,
+	bucket string, inset string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) {
+
+	query := "from(bucket: \"" + bucket + "\")" + // Build the Flux query for the given bucket and time range
+		" |> range(" + timeRange + ")" +
+		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
+		" |> filter(fn: (r) => r._field == \"" + inset + "totalRdsHits\" or r._field == \"" + inset + "totalRdsMisses\" or r._field == \"" + inset + "requestedFileCount\" or r._field == \"" + inset + "ftsFileCount\")" +
+		filter +
+		" |> group(columns: [\"_field\"])" +
+		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
+
 	rdsResponses, err := s.dbQueryApi.Query(ctx, query)
 	if err != nil {
 		return nil, err
@@ -324,17 +353,17 @@ func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([
 		miss := uint32(0)
 		fromDisk := uint32(0)
 
-		if rdsResponses.Record().Values()["requestedFileCount"] != nil {
-			totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64))
+		if rdsResponses.Record().Values()[inset+"requestedFileCount"] != nil {
+			totalRequests = uint32(rdsResponses.Record().Values()[inset+"requestedFileCount"].(int64))
 		}
-		if rdsResponses.Record().Values()["totalRdsHits"] != nil {
-			hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64))
+		if rdsResponses.Record().Values()[inset+"totalRdsHits"] != nil {
+			hit = uint32(rdsResponses.Record().Values()[inset+"totalRdsHits"].(int64))
 		}
-		if rdsResponses.Record().Values()["totalRdsMisses"] != nil {
-			miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64))
+		if rdsResponses.Record().Values()[inset+"totalRdsMisses"] != nil {
+			miss = uint32(rdsResponses.Record().Values()[inset+"totalRdsMisses"].(int64))
 		}
-		if rdsResponses.Record().Values()["ftsFileCount"] != nil {
-			fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64))
+		if rdsResponses.Record().Values()[inset+"ftsFileCount"] != nil {
+			fromDisk = uint32(rdsResponses.Record().Values()[inset+"ftsFileCount"].(int64))
 		}
 
 		arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{
@@ -351,46 +380,43 @@ func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([
 	return arrayResult, nil
 }
 
-func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
-		" |> filter(fn: (r) => r._field == \"totalRdsHits\" or r._field == \"totalRdsMisses\" or r._field == \"requestedFileCount\" or r._field == \"ftsFileCount\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
-
-	arrayResult, err := queryfunctionFileRate(s, ctx, query) // Query newer data
-	if err != nil {
-		return nil, err
-	}
-	secondArrayResult, err := queryfunctionFileRate(s, ctx, queryForOldData) // Query averaged data
+func queryFileRate(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TotalFileRateDataPoint, error) {
+	arrayResult, err := queryfunctionFileRate(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query newer data
 	if err != nil {
 		return nil, err
 	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the averaged DB for the older part of the time range, if any
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunctionFileRate(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.TotalFileRateDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
 	return arrayResult, nil
 }
 
-func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" +
-		" |> filter(fn: (r) => r._field == \"avgTransferReceiveTimeUs\" or r._field == \"avgWriteIoTimeUs\" or r._field == \"avgDbTimeUs\" or r._field == \"avgRdsOutputTransferTimeUs\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
+func queryTaskTime(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TaskTimeDataPoint, error) {
+
+	queryfunction := func(s *QueryServer, ctx context.Context, timeRange string, bucket string,
+		inset string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) {
+		query := "from(bucket: \"" + bucket + "\")" + // Build the Flux query for the given bucket and time range
+			" |> range(" + timeRange + ")" +
+			" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" +
+			" |> filter(fn: (r) => r._field == \"" + inset + "avgTransferReceiveTimeUs\" or r._field == \"" + inset + "avgWriteIoTimeUs\" or r._field == \"" + inset + "avgDbTimeUs\" or r._field == \"" + inset + "avgRdsOutputTransferTimeUs\")" +
+			filter +
+			" |> group(columns: [\"_field\"])" +
+			" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" +
+			" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 
-	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) {
 		var arrayResult []*pb.TaskTimeDataPoint
 		result, err := s.dbQueryApi.Query(ctx, query)
 		if err != nil {
@@ -403,17 +429,17 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim
 			writeToDatabaseTimeUs := float64(0)
 			rdsOutputTransferTimeUs := float64(0)
 
-			if result.Record().Values()["avgTransferReceiveTimeUs"] != nil {
-				receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgTransferReceiveTimeUs"] != nil {
+				receiveIoTimeUs = result.Record().Values()[inset+"avgTransferReceiveTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgWriteIoTimeUs"] != nil {
-				writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgWriteIoTimeUs"] != nil {
+				writeToDiskTimeUs = result.Record().Values()[inset+"avgWriteIoTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgDbTimeUs"] != nil {
-				writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgDbTimeUs"] != nil {
+				writeToDatabaseTimeUs = result.Record().Values()[inset+"avgDbTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil {
-				rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgRdsOutputTransferTimeUs"] != nil {
+				rdsOutputTransferTimeUs = result.Record().Values()[inset+"avgRdsOutputTransferTimeUs"].(float64)
 			}
 
 			arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{
@@ -431,36 +457,42 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim
 		return arrayResult, nil
 	}
 
-	arrayResult, err := queryfunction(s, ctx, query) // Query newer data
-	if err != nil {
-		return nil, err
-	}
-	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data
+	arrayResult, err := queryfunction(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query newer data
 	if err != nil {
 		return nil, err
 	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the averaged DB for the older part of the time range, if any
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunction(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.TaskTimeDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
 	return arrayResult, nil
 }
 
-func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" +
-		" |> filter(fn: (r) => r._field == \"rdsCacheUsedBytes\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
+func queryMemoryUsage(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.RdsMemoryUsageDataPoint, error) {
 
-	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) {
+	queryfunction := func(s *QueryServer, ctx context.Context, timeRange string, bucket string,
+		inset string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) {
 		var arrayResult []*pb.RdsMemoryUsageDataPoint
-
+		// Build the Flux query for the given bucket and time range
+		query := "from(bucket: \"" + bucket + "\")" +
+			" |> range(" + timeRange + ")" +
+			" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" +
+			" |> filter(fn: (r) => r._field == \"" + inset + "rdsCacheUsedBytes\")" +
+			filter +
+			" |> group(columns: [\"_field\"])" +
+			" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+			" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 		result, err := s.dbQueryApi.Query(ctx, query)
 		if err != nil {
 			return nil, err
@@ -468,8 +500,8 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end
 		for result.Next() {
 			usedBytes := int64(0)
 
-			if result.Record().Values()["rdsCacheUsedBytes"] != nil {
-				usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64)
+			if result.Record().Values()[inset+"rdsCacheUsedBytes"] != nil {
+				usedBytes = result.Record().Values()[inset+"rdsCacheUsedBytes"].(int64)
 			}
 
 			arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{
@@ -483,18 +515,25 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end
 		return arrayResult, nil
 	}
 
-	arrayResult, err := queryfunction(s, ctx, query) // Query for newer data
+	arrayResult, err := queryfunction(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query for newer data
 	if err != nil {
 		return nil, err
 	}
-	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data
-	if err != nil {
-		return nil, err
-	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the averaged DB for the older part of the time range, if any
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunction(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.RdsMemoryUsageDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
+
 	return arrayResult, nil
 }
 
@@ -515,7 +554,6 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
 	)
 
 	if err != nil {
-		fmt.Printf("Error: %v\n", err)
 		return nil, err
 	}
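Review note: `GetRequestPeriods` above splits the requested window at `switchTime = now - switchPeriod`; anything older is assumed to live only in the averaged `_avg` database, anything newer in the full-resolution one, with the point interval either derived from `MaxPoints` or falling back to the grouping interval. One caveat worth checking: when the whole window is older than `switchTime`, `timeRange` comes back empty, yet each `query*` function still runs its first (full-DB) query with a bare `range()`, which Flux should reject; guarding that first call the same way the averaged call is guarded looks necessary. A usage sketch with invented timestamps (the import path is an assumption):

```go
// Usage sketch for GetRequestPeriods; timestamps are invented and the import
// path is assumed for illustration. With the 12h/10m defaults,
// switchPeriod = 42600s and groupedInterval = 600s.
package main

import (
	"fmt"
	"time"

	server "asapo_monitoring_server/server"
)

func main() {
	now := uint64(time.Now().Unix())

	// Last hour: entirely newer than switchTime, so only the full DB is hit
	// and the interval targets MaxPoints=500 points: 3600/500 = 7s buckets.
	full, avg, interval := server.GetRequestPeriods(now-3600, now, 600, 42600, 500)
	fmt.Println(avg == "", interval) // true 7

	// 48h back to 25h back: entirely older than switchTime, so only the
	// averaged DB is hit and the interval falls back to groupedInterval.
	full, avg, interval = server.GetRequestPeriods(now-172800, now-90000, 600, 42600, 500)
	fmt.Println(full == "", avg != "", interval) // true true 600
}
```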
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
index 669cd6b2e7758ddd0a781faf156dc68aa45a0765..778dcb7ce24f9e03bf92206d80c6af009e1b0e59 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
@@ -4,29 +4,57 @@ import (
 	pb "asapo_common/generated_proto"
 	log "asapo_common/logger"
 	"context"
+	"errors"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	"google.golang.org/grpc"
 	"net"
 	"strconv"
+	"time"
 )
 
+func GetSwitchPeriod(retentionTime string, groupingTime string) (int64, int64, error) {
+	// Returns the offset (in seconds) at which the query server switches from the full to the reduced DB
+	retentionTimeInt, err := time.ParseDuration(retentionTime)
+	if err != nil {
+		return 0, 0, err
+	}
+	groupingTimeInt, err := time.ParseDuration(groupingTime)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	switchPeriod := retentionTimeInt.Seconds() - groupingTimeInt.Seconds()
+	if switchPeriod < 0 {
+		return 0, 0, errors.New("retention time is less than grouping time")
+	}
+	return int64(switchPeriod), int64(groupingTimeInt.Seconds()), nil
+}
+
 func Start(settings Settings) {
 	lis, err := net.Listen("tcp", ":"+strconv.Itoa(int(settings.ServerPort)))
 	if err != nil {
 		log.Fatal("failed to listen: ", err)
 	}
 
-	influxClient := influxdb2.NewClient(settings.InfluxDbUrl, ""/*settings.InfluxDb2AuthToken*/)
+	influxClient := influxdb2.NewClient(settings.InfluxDbUrl, "" /*settings.InfluxDb2AuthToken*/)
 	// influxClient.BucketsAPI().CreateBucketWithName(context.Background(), domain.Organization{}, "bucketName", domain.RetentionRules{})
 
 	queryServer := QueryServer{}
 	ingestServer := IngestServer{}
 
-	dbQueryApi := influxClient.QueryAPI(""/*settings.InfluxDb2Org*/)
+	dbQueryApi := influxClient.QueryAPI("" /*settings.InfluxDb2Org*/)
 	queryServer.dbQueryApi = dbQueryApi
 	queryServer.settings = settings
 
-	dbWriterApi := influxClient.WriteAPI(""/*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
+	switchPeriod, groupedInterval, err := GetSwitchPeriod(settings.RetentionPolicyTime, settings.GroupingTime)
+	if err != nil {
+		log.Fatal("failed to get switch period: ", err)
+		return
+	}
+	queryServer.switchPeriod = switchPeriod
+	queryServer.groupedInterval = groupedInterval
+
+	dbWriterApi := influxClient.WriteAPI("" /*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
 	ingestServer.dbWriterApi = dbWriterApi
 	ingestServer.settings = settings
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
index 2f2b0b9fa763ae62ac590f7cd18b4fb487713920..c0d120b12f9b25aeba1a1fb65c9ea80b50498fd3 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
@@ -45,4 +45,9 @@ type Settings struct {
 
 	InfluxDbUrl      string // e.g. "http://127.0.0.1:8086"
 	InfluxDbDatabase string // e.g. "asapo-monitoring"
+
+	RetentionPolicyTime string // e.g. "1h"
+	GroupingTime        string // e.g. "5m"
+	MaxPoints           uint64 // e.g. 500
+
 }
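Review note: `GetSwitchPeriod` (name fixed from `GetSwitshPeriod`) just converts the two configured durations and returns retention minus grouping, in seconds, together with the grouping interval itself; subtracting one grouping window presumably leaves a safety margin before full-resolution points fall off the retention cliff. With the shipped defaults the arithmetic works out as follows (illustration only):

```go
// Quick check of the switch-period arithmetic with the defaults from
// monitoring.json; the numbers follow directly from time.ParseDuration.
package main

import (
	"fmt"
	"time"
)

func main() {
	retention, _ := time.ParseDuration("12h") // 43200 s
	grouping, _ := time.ParseDuration("10m")  // 600 s

	// GetSwitchPeriod("12h", "10m") therefore returns (42600, 600, nil).
	fmt.Println(int64(retention.Seconds()-grouping.Seconds()), int64(grouping.Seconds()))

	// GetSwitchPeriod("5m", "10m") fails with
	// "retention time is less than grouping time".
}
```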