From b091554c891944c040219e40a3229d89c69c88ac Mon Sep 17 00:00:00 2001 From: Lars Janssen <lars.janssen@desy.de> Date: Thu, 9 Mar 2023 15:31:58 +0100 Subject: [PATCH] removed RP --- .../src/asapo_monitoring_server/main/main.go | 6 +- .../server/QueryServer.go | 148 +++++++++--------- 2 files changed, 73 insertions(+), 81 deletions(-) diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go index da0e764b3..356a5fcfc 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go @@ -102,13 +102,13 @@ func main() { // It creates two databases. normalDB := settings.InfluxDbDatabase avgDB := normalDB + "_avg" - rpName := "one_day" + //rpName := "one_day" cqName := "avg_values" postStrings := []string{ "CREATE DATABASE " + normalDB, "CREATE DATABASE " + avgDB, - "CREATE RETENTION POLICY " + rpName + " ON " + normalDB + - " DURATION 24h REPLICATION 1 DEFAULT", + //"CREATE RETENTION POLICY " + rpName + " ON " + normalDB + + // " DURATION 24h REPLICATION 1 DEFAULT", "CREATE CONTINUOUS QUERY " + cqName + " ON " + normalDB + " BEGIN" + " SELECT mean(*) INTO " + avgDB + "..:MEASUREMENT FROM /.*/ GROUP BY time(12h),* END", diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index 8dcd8844f..675e6b9d3 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -283,8 +283,6 @@ func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInS } return arrayResult, nil } - -// Queries DB for transferspeed, using both the normal and the average bucket func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) { // Querystring query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -353,8 +351,6 @@ func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([ return arrayResult, nil } - -// Queries DB for Filerate, using both the normal and the average bucket func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -382,49 +378,6 @@ func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTim return arrayResult, nil } -// Queries DB for Tasktime -func queryfunctionTaskTime(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) { - var arrayResult []*pb.TaskTimeDataPoint - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } - - for result.Next() { - receiveIoTimeUs := float64(0) - writeToDiskTimeUs := float64(0) - writeToDatabaseTimeUs := float64(0) - rdsOutputTransferTimeUs := float64(0) - - if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { - receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) - } - if result.Record().Values()["avgWriteIoTimeUs"] != nil { - writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) - } - if result.Record().Values()["avgDbTimeUs"] != nil { - writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) - } - if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { - rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) - } - - arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ - ReceiveIoTimeUs: uint32(receiveIoTimeUs), - WriteToDiskTimeUs: uint32(writeToDiskTimeUs), - WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), - RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), - }) - } - - if result.Err() != nil { - return nil, result.Err() - } - - return arrayResult, nil -} - -// Queries DB for Tasktime, using both the normal and the average bucket func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -437,11 +390,52 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - arrayResult, err := queryfunctionTaskTime(s, ctx, query) // Query newer data + queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) { + var arrayResult []*pb.TaskTimeDataPoint + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + return nil, err + } + + for result.Next() { + receiveIoTimeUs := float64(0) + writeToDiskTimeUs := float64(0) + writeToDatabaseTimeUs := float64(0) + rdsOutputTransferTimeUs := float64(0) + + if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { + receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) + } + if result.Record().Values()["avgWriteIoTimeUs"] != nil { + writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) + } + if result.Record().Values()["avgDbTimeUs"] != nil { + writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) + } + if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { + rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) + } + + arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ + ReceiveIoTimeUs: uint32(receiveIoTimeUs), + WriteToDiskTimeUs: uint32(writeToDiskTimeUs), + WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), + RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), + }) + } + + if result.Err() != nil { + return nil, result.Err() + } + + return arrayResult, nil + } + + arrayResult, err := queryfunction(s, ctx, query) // Query newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunctionTaskTime(s, ctx, queryForOldData) // Query averaged data + secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data if err != nil { return nil, err } @@ -452,33 +446,6 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim return arrayResult, nil } -// Queries DB for Memory usage -func queryfunctionMemoryUsage(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) { - var arrayResult []*pb.RdsMemoryUsageDataPoint - - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } - for result.Next() { - usedBytes := int64(0) - - if result.Record().Values()["rdsCacheUsedBytes"] != nil { - usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) - } - - arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ - TotalUsedMemory: uint64(usedBytes), - }) - } - if result.Err() != nil { - return nil, result.Err() - } - - return arrayResult, nil -} - -// Queries DB for Memory usage, using both the normal and the average bucket func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -491,11 +458,36 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - arrayResult, err := queryfunctionMemoryUsage(s, ctx, query) // Query for newer data + queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) { + var arrayResult []*pb.RdsMemoryUsageDataPoint + + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + return nil, err + } + for result.Next() { + usedBytes := int64(0) + + if result.Record().Values()["rdsCacheUsedBytes"] != nil { + usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) + } + + arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ + TotalUsedMemory: uint64(usedBytes), + }) + } + if result.Err() != nil { + return nil, result.Err() + } + + return arrayResult, nil + } + + arrayResult, err := queryfunction(s, ctx, query) // Query for newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunctionMemoryUsage(s, ctx, queryForOldData) // Query for averaged data + secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data if err != nil { return nil, err } -- GitLab