From 05895395c9ac3c9374cde466c0809b0b4b9c43b4 Mon Sep 17 00:00:00 2001
From: karnem <mikhail.karnevskiy@desy.de>
Date: Thu, 26 Oct 2023 15:57:18 +0200
Subject: [PATCH] Expose the retention policy parameters in the config file.
 Use the sum of the data points instead of the average. Fix the "inset" prefix
 in the field names of the InfluxDB response. Split the InfluxDB request into
 two periods: full-resolution data and grouped data. Combine the results of
 the two responses via concatenation of vectors.

---
 .../scripts/monitoring_server.json.tpl        |   5 +-
 deploy/asapo_services_light/monitoring.json   |   5 +-
 .../src/asapo_monitoring_server/main/main.go  |  27 +-
 .../server/QueryServer.go                     | 312 ++++++++++--------
 .../server/ServerEntrypoint.go                |  34 +-
 .../server/Settings.go                        |   5 +
 6 files changed, 239 insertions(+), 149 deletions(-)

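NOTE (reviewer annotation, not part of the applied diff): the patch keeps two
databases: the full-resolution one, now trimmed by a configurable retention
policy, and a "_avg" companion filled by a continuous query that sums points
over GroupingTime windows. The query server switches between them at
now - (RetentionPolicyTime - GroupingTime). A minimal sketch of that
arithmetic with the template defaults (mirrors GetSwitchPeriod in
ServerEntrypoint.go; the values are the shipped defaults, not requirements):

package main

import (
	"fmt"
	"time"
)

func main() {
	retention, _ := time.ParseDuration("12h") // RetentionPolicyTime
	grouping, _ := time.ParseDuration("10m")  // GroupingTime
	switchPeriod := int64(retention.Seconds() - grouping.Seconds())
	// 42600: data older than this many seconds is served from the
	// grouped "_avg" database instead of the full-resolution one.
	fmt.Println(switchPeriod)
}
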
diff --git a/deploy/asapo_services/scripts/monitoring_server.json.tpl b/deploy/asapo_services/scripts/monitoring_server.json.tpl
index 9ac042419..d0e440df6 100644
--- a/deploy/asapo_services/scripts/monitoring_server.json.tpl
+++ b/deploy/asapo_services/scripts/monitoring_server.json.tpl
@@ -3,5 +3,8 @@
     "ServerPort": {{ env "NOMAD_PORT_monitoring_server" }},
     "LogLevel": "{{ keyOrDefault "log_level" "debug" }}",
     "InfluxDbUrl":"http://localhost:8400/influxdb",
-    "InfluxDbDatabase": "asapo_monitoring"
+    "InfluxDbDatabase": "asapo_monitoring",
+    "RetentionPolicyTime": "12h",
+    "GroupingTime": "10m",
+    "MaxPoints": 500
 }
diff --git a/deploy/asapo_services_light/monitoring.json b/deploy/asapo_services_light/monitoring.json
index ed5241dff..63cf43a4d 100644
--- a/deploy/asapo_services_light/monitoring.json
+++ b/deploy/asapo_services_light/monitoring.json
@@ -3,5 +3,8 @@
     "ServerPort": 8422,
     "LogLevel": "debug",
     "InfluxDbUrl":"http://localhost:8400/influxdb",
-    "InfluxDbDatabase": "asapo_monitoring"
+    "InfluxDbDatabase": "asapo_monitoring",
+    "RetentionPolicyTime": "12h",
+    "GroupingTime": "10m",
+    "MaxPoints": 500
 }
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
index 356a5fcfc..3eded5fff 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go
@@ -41,6 +41,18 @@ func loadConfig(configFileName string) (log.Level, server.Settings, error) {
 		return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbUrl' not set")
 	}
 
+	if settings.RetentionPolicyTime == "" {
+		return log.FatalLevel, server.Settings{}, errors.New("'RetentionPolicyTime' not set")
+	}
+
+	if settings.GroupingTime == "" {
+		return log.FatalLevel, server.Settings{}, errors.New("'GroupingTime' not set")
+	}
+
+	if settings.MaxPoints == 0 {
+		return log.FatalLevel, server.Settings{}, errors.New("'MaxPoints' not set")
+	}
+
 	if settings.InfluxDbDatabase == "" {
 		return log.FatalLevel, server.Settings{}, errors.New("'InfluxDbDatabase' not set")
 	}
@@ -97,27 +109,28 @@ func main() {
 		log.Fatal(err.Error())
 		return
 	}
+	log.SetLevel(logLevel)
+
 	// InfluxDB 1 fix, create database
 	// Does multiple HTTP posts to DB
 	// It creates two databases.
 	normalDB := settings.InfluxDbDatabase
 	avgDB := normalDB + "_avg"
-	//rpName := "one_day"
+	rpName := "one_day"
 	cqName := "avg_values"
 	postStrings := []string{
 		"CREATE DATABASE " + normalDB,
 		"CREATE DATABASE " + avgDB,
-		//"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
-		//	" DURATION 24h REPLICATION 1 DEFAULT",
+		"CREATE RETENTION POLICY " + rpName + " ON " + normalDB +
+			" DURATION " + settings.RetentionPolicyTime + " REPLICATION 1 DEFAULT",
 		"CREATE CONTINUOUS QUERY " + cqName + " ON " + normalDB + " BEGIN" +
-			" SELECT mean(*) INTO " + avgDB +
-			"..:MEASUREMENT FROM /.*/ GROUP BY time(12h),* END",
+			" SELECT sum(*) INTO " + avgDB +
+			"..:MEASUREMENT FROM /.*/ GROUP BY time(" + settings.GroupingTime + "),* END",
 	}
 	for i := 0; i < len(postStrings); i++ {
 		postTODB(postStrings[i], settings)
+		log.Debug("Modifying DB with command: " + postStrings[i])
 	}
 
-	log.SetLevel(logLevel)
-
 	server.Start(settings)
 }
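
NOTE (reviewer annotation): with the template defaults above
(InfluxDbDatabase = "asapo_monitoring", RetentionPolicyTime = "12h",
GroupingTime = "10m"), the loop posts the following statements (reconstructed
from the code, not captured from a live run):

CREATE DATABASE asapo_monitoring
CREATE DATABASE asapo_monitoring_avg
CREATE RETENTION POLICY one_day ON asapo_monitoring DURATION 12h REPLICATION 1 DEFAULT
CREATE CONTINUOUS QUERY avg_values ON asapo_monitoring BEGIN SELECT sum(*) INTO asapo_monitoring_avg..:MEASUREMENT FROM /.*/ GROUP BY time(10m),* END

InfluxDB names the output fields of sum(*) as sum_<field>, which is why the
query server below reads fields with the "sum_" inset from the "_avg"
database.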
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
index 675e6b9d3..e6f6ebed1 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go
@@ -5,20 +5,22 @@ import (
 	log "asapo_common/logger"
 	"context"
 	"errors"
-	"fmt"
 	"github.com/influxdata/influxdb-client-go/v2/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 )
 
 type QueryServer struct {
 	pb.UnimplementedAsapoMonitoringQueryServiceServer
 
-	settings   Settings
-	dbQueryApi api.QueryAPI
+	settings        Settings
+	dbQueryApi      api.QueryAPI
+	switchPeriod    int64
+	groupedInterval int64
 }
 
 func (s *QueryServer) GetMetadata(ctx context.Context, _ *pb.Empty) (*pb.MetadataResponse, error) {
@@ -168,30 +170,39 @@ func doesStringContainsDangerousElements(input string) bool {
 		strings.Contains(input, "}")
 }
 
-// Modifies filter to query averaged data
-func generateFilterForAVGData(s *QueryServer, filter string) string {
-	// Map of all modifications that a querystring needs to access averaged data.
-	// Currently, there is only one modification needed so this is a bit redundant.
-	filterModifications := make(map[string]string)
-	filterModifications[s.settings.InfluxDbDatabase] = s.settings.InfluxDbDatabase + averageSuffix
-	filterModifications[dbMeasurementBrokerFileRequests] = "mean_" + dbMeasurementBrokerFileRequests
-	filterModifications[dbMeasurementFtsTransfers] = "mean_" + dbMeasurementFtsTransfers
-	filterModifications[dbMeasurementFileInput] = "mean_" + dbMeasurementFileInput
-	filterModifications[dbMeasurementRdsCacheMemoryUsage] = "mean_" + dbMeasurementRdsCacheMemoryUsage
-	filterModifications[dbMeasurementRdsFileRequests] = "mean_" + dbMeasurementRdsFileRequests
-	alteredFilter := filter
-	// Do all the filterModifications
-
-	for key, element := range filterModifications {
-		alteredFilter = strings.ReplaceAll(filter, key, element)
-	}
-	return alteredFilter
+func GetRequestPeriods(fromTimestamp uint64, toTimestamp uint64, groupedInterval int64,
+	switchPeriod int64, maxPoints uint64) (string, string, int) {
+	// Define the time ranges for the full and reduced DBs and the interval between data points
+
+	startTime := strconv.FormatUint(fromTimestamp, 10)
+	endTime := strconv.FormatUint(toTimestamp, 10)
+
+	intervalInSec := int((toTimestamp - fromTimestamp) / maxPoints)
+	if intervalInSec < 1 {
+		intervalInSec = 1
+	}
+
+	switchTime := uint64(time.Now().Unix() - switchPeriod)
+	timeRangeAve := ""
+	timeRange := "start: " + startTime + ", stop: " + endTime
+	if fromTimestamp < switchTime {
+		intervalInSec = int(groupedInterval)
+		if toTimestamp < switchTime {
+			timeRangeAve = "start: " + startTime + ", stop: " + endTime
+			timeRange = ""
+		} else {
+			timeRangeAve = "start: " + startTime + ", stop: " + strconv.FormatUint(switchTime, 10)
+			timeRange = "start: " + strconv.FormatUint(switchTime, 10) + ", stop: " + endTime
+		}
+	}
+	return timeRange, timeRangeAve, intervalInSec
 }
 
 func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQuery) (*pb.DataPointsResponse, error) {
-	startTime := strconv.FormatUint(query.FromTimestamp, 10)
-	endTime := strconv.FormatUint(query.ToTimestamp, 10)
-	intervalInSec := 5
+
+	timeRange, timeRangeAve, intervalInSec := GetRequestPeriods(query.FromTimestamp,
+		query.ToTimestamp, s.groupedInterval, s.switchPeriod, s.settings.MaxPoints)
+	nEntries := int((query.ToTimestamp - query.FromTimestamp) / uint64(intervalInSec))
 
 	if query.BeamtimeFilter == "" {
 		return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required")
@@ -210,28 +221,28 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
 	response := pb.DataPointsResponse{}
 
 	transferSpeedFilter := createFilter(query, transferSpeedFilterGenerator)
-	transferSpeed, err := queryTransferSpeed(s, ctx, startTime, endTime, intervalInSec, transferSpeedFilter)
+	transferSpeed, err := queryTransferSpeed(s, ctx, timeRange, timeRangeAve, intervalInSec, transferSpeedFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.TransferRates = transferSpeed
 
 	fileRateFilter := createFilter(query, transferRateFilterGenerator)
-	fileRates, err := queryFileRate(s, ctx, startTime, endTime, intervalInSec, fileRateFilter)
+	fileRates, err := queryFileRate(s, ctx, timeRange, timeRangeAve, intervalInSec, fileRateFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.FileRates = fileRates
 
 	taskTimeFilter := createFilter(query, taskTimeFilterGenerator)
-	taskTime, err := queryTaskTime(s, ctx, startTime, endTime, intervalInSec, taskTimeFilter)
+	taskTime, err := queryTaskTime(s, ctx, timeRange, timeRangeAve, intervalInSec, taskTimeFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
 	response.TaskTimes = taskTime
 
 	memoryUsageFilter := createFilter(query, memoryUsageFilterGenerator)
-	memoryUsage, err := queryMemoryUsage(s, ctx, startTime, endTime, intervalInSec, memoryUsageFilter)
+	memoryUsage, err := queryMemoryUsage(s, ctx, timeRange, timeRangeAve, intervalInSec, memoryUsageFilter, nEntries)
 	if err != nil {
 		return nil, err
 	}
@@ -243,7 +254,17 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
 }
 
 // Queries DB for transferspeed
-func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) {
+func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context,
+	timeRange string, bucket string, inset string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
+
+	query := "from(bucket: \"" + bucket + "\")" + // Querystring for the given bucket
+		" |> range(" + timeRange + ")" +
+		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
+		" |> filter(fn: (r) => r._field == \"" + inset + "totalInputFileSize\" or r._field == \"" + inset + "totalRequestedFileSize\" or r._field == \"" + inset + "totalRdsOutputFileSize\" or r._field == \"" + inset + "totalFtsTransferredFileSize\")" +
+		filter +
+		" |> group(columns: [\"_field\"])" +
+		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 	result, err := s.dbQueryApi.Query(ctx, query)
 
 	if err != nil {
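
NOTE (reviewer annotation): for the grouped branch (bucket
"asapo_monitoring_avg", inset "sum_", a 600 s interval), the string assembled
above expands to roughly the following Flux (reconstructed; <...> stands for
values of Go constants and parameters defined elsewhere):

from(bucket: "asapo_monitoring_avg")
 |> range(start: <fromTimestamp>, stop: <switchTime>)
 |> filter(fn: (r) => r._measurement == <dbMeasurementFileInput> or ... or r._measurement == <dbMeasurementFtsTransfers>)
 |> filter(fn: (r) => r._field == "sum_totalInputFileSize" or r._field == "sum_totalRequestedFileSize" or r._field == "sum_totalRdsOutputFileSize" or r._field == "sum_totalFtsTransferredFileSize")
 <filter>
 |> group(columns: ["_field"])
 |> aggregateWindow(every: 600s, fn: sum, createEmpty: true)
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")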
@@ -257,17 +278,17 @@ func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInS
 		sendRdsBytes := int64(0)
 		sendFtsBytes := int64(0)
 
-		if result.Record().Values()["totalInputFileSize"] != nil {
-			recvBytes = result.Record().Values()["totalInputFileSize"].(int64)
+		if result.Record().Values()[inset+"totalInputFileSize"] != nil {
+			recvBytes = result.Record().Values()[inset+"totalInputFileSize"].(int64)
 		}
-		if result.Record().Values()["totalRequestedFileSize"] != nil {
-			sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64)
+		if result.Record().Values()[inset+"totalRequestedFileSize"] != nil {
+			sendBytes = result.Record().Values()[inset+"totalRequestedFileSize"].(int64)
 		}
-		if result.Record().Values()["totalRdsOutputFileSize"] != nil {
-			sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64)
+		if result.Record().Values()[inset+"totalRdsOutputFileSize"] != nil {
+			sendRdsBytes = result.Record().Values()[inset+"totalRdsOutputFileSize"].(int64)
 		}
-		if result.Record().Values()["totalFtsTransferredFileSize"] != nil {
-			sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64)
+		if result.Record().Values()[inset+"totalFtsTransferredFileSize"] != nil {
+			sendFtsBytes = result.Record().Values()[inset+"totalFtsTransferredFileSize"].(int64)
 		}
 
 		arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{
@@ -283,35 +304,43 @@ func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInS
 	}
 	return arrayResult, nil
 }
-func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
-	// Querystring
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
-		" |> filter(fn: (r) => r._field == \"totalInputFileSize\" or r._field == \"totalRequestedFileSize\" or r._field == \"totalRdsOutputFileSize\" or r._field == \"totalFtsTransferredFileSize\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
+func queryTransferSpeed(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TotalTransferRateDataPoint, error) {
 
-	arrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, query) // Query newer data
-	if err != nil {
-		return nil, err
-	}
-	secondArrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, queryForOldData) // Query averaged data
+	arrayResult, err := queryfunctionTransferSpeed(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter)
 	if err != nil {
 		return nil, err
 	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the grouped (averaged) data if the requested window reaches into it
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunctionTransferSpeed(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.TotalTransferRateDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
 	return arrayResult, nil
 }
 
 // Queries DB for Filerate
-func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) {
+func queryfunctionFileRate(s *QueryServer, ctx context.Context, timeRange string,
+	bucket string, inset string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) {
+
+	query := "from(bucket: \"" + bucket + "\")" + // Querystring for the given bucket
+		" |> range(" + timeRange + ")" +
+		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
+		" |> filter(fn: (r) => r._field == \"" + inset + "totalRdsHits\" or r._field == \"" + inset + "totalRdsMisses\" or r._field == \"" + inset + "requestedFileCount\" or r._field == \"" + inset + "ftsFileCount\")" +
+		filter +
+		" |> group(columns: [\"_field\"])" +
+		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
+
 	rdsResponses, err := s.dbQueryApi.Query(ctx, query)
 	if err != nil {
 		return nil, err
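
NOTE (reviewer annotation): the merge above prepends the grouped (older)
results to the full-resolution (newer) ones, padding an empty grouped result
with nil placeholders so the series keeps its expected length. A condensed
sketch of that policy (hypothetical helper; assumes Go 1.18+ generics and
nEntries >= len(newer), otherwise make would panic on a negative length):

// mergeSeries prepends older, grouped points to newer, full-resolution ones.
func mergeSeries[T any](older, newer []*T, nEntries int) []*T {
	if len(older) == 0 {
		// Pad with nil placeholders so the plot keeps nEntries points.
		older = make([]*T, nEntries-len(newer))
	}
	return append(older, newer...)
}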
@@ -324,17 +353,17 @@ func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([
 		miss := uint32(0)
 		fromDisk := uint32(0)
 
-		if rdsResponses.Record().Values()["requestedFileCount"] != nil {
-			totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64))
+		if rdsResponses.Record().Values()[inset+"requestedFileCount"] != nil {
+			totalRequests = uint32(rdsResponses.Record().Values()[inset+"requestedFileCount"].(int64))
 		}
-		if rdsResponses.Record().Values()["totalRdsHits"] != nil {
-			hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64))
+		if rdsResponses.Record().Values()[inset+"totalRdsHits"] != nil {
+			hit = uint32(rdsResponses.Record().Values()[inset+"totalRdsHits"].(int64))
 		}
-		if rdsResponses.Record().Values()["totalRdsMisses"] != nil {
-			miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64))
+		if rdsResponses.Record().Values()[inset+"totalRdsMisses"] != nil {
+			miss = uint32(rdsResponses.Record().Values()[inset+"totalRdsMisses"].(int64))
 		}
-		if rdsResponses.Record().Values()["ftsFileCount"] != nil {
-			fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64))
+		if rdsResponses.Record().Values()[inset+"ftsFileCount"] != nil {
+			fromDisk = uint32(rdsResponses.Record().Values()[inset+"ftsFileCount"].(int64))
 		}
 
 		arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{
@@ -351,46 +380,43 @@ func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([
 
 	return arrayResult, nil
 }
-func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
-		" |> filter(fn: (r) => r._field == \"totalRdsHits\" or r._field == \"totalRdsMisses\" or r._field == \"requestedFileCount\" or r._field == \"ftsFileCount\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
-
-	arrayResult, err := queryfunctionFileRate(s, ctx, query) // Query newer data
-	if err != nil {
-		return nil, err
-	}
-	secondArrayResult, err := queryfunctionFileRate(s, ctx, queryForOldData) // Query averaged data
+func queryFileRate(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TotalFileRateDataPoint, error) {
+	arrayResult, err := queryfunctionFileRate(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query newer data
 	if err != nil {
 		return nil, err
 	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the grouped (averaged) data if the requested window reaches into it
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunctionFileRate(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.TotalFileRateDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
 	return arrayResult, nil
 }
 
-func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" +
-		" |> filter(fn: (r) => r._field == \"avgTransferReceiveTimeUs\" or r._field == \"avgWriteIoTimeUs\" or r._field == \"avgDbTimeUs\" or r._field == \"avgRdsOutputTransferTimeUs\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
+func queryTaskTime(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.TaskTimeDataPoint, error) {
+
+	queryfunction := func(s *QueryServer, ctx context.Context, timeRange string, bucket string,
+		inset string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) {
+		query := "from(bucket: \"" + bucket + "\")" + // Querystring for the given bucket
+			" |> range(" + timeRange + ")" +
+			" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" +
+			" |> filter(fn: (r) => r._field == \"" + inset + "avgTransferReceiveTimeUs\" or r._field == \"" + inset + "avgWriteIoTimeUs\" or r._field == \"" + inset + "avgDbTimeUs\" or r._field == \"" + inset + "avgRdsOutputTransferTimeUs\")" +
+			filter +
+			" |> group(columns: [\"_field\"])" +
+			" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" +
+			" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 
-	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) {
 		var arrayResult []*pb.TaskTimeDataPoint
 		result, err := s.dbQueryApi.Query(ctx, query)
 		if err != nil {
@@ -403,17 +429,17 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim
 			writeToDatabaseTimeUs := float64(0)
 			rdsOutputTransferTimeUs := float64(0)
 
-			if result.Record().Values()["avgTransferReceiveTimeUs"] != nil {
-				receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgTransferReceiveTimeUs"] != nil {
+				receiveIoTimeUs = result.Record().Values()[inset+"avgTransferReceiveTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgWriteIoTimeUs"] != nil {
-				writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgWriteIoTimeUs"] != nil {
+				writeToDiskTimeUs = result.Record().Values()[inset+"avgWriteIoTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgDbTimeUs"] != nil {
-				writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgDbTimeUs"] != nil {
+				writeToDatabaseTimeUs = result.Record().Values()[inset+"avgDbTimeUs"].(float64)
 			}
-			if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil {
-				rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64)
+			if result.Record().Values()[inset+"avgRdsOutputTransferTimeUs"] != nil {
+				rdsOutputTransferTimeUs = result.Record().Values()[inset+"avgRdsOutputTransferTimeUs"].(float64)
 			}
 
 			arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{
@@ -431,36 +457,42 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim
 		return arrayResult, nil
 	}
 
-	arrayResult, err := queryfunction(s, ctx, query) // Query newer data
-	if err != nil {
-		return nil, err
-	}
-	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data
+	arrayResult, err := queryfunction(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query newer data
 	if err != nil {
 		return nil, err
 	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the grouped (averaged) data if the requested window reaches into it
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunction(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.TaskTimeDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
 	return arrayResult, nil
 }
 
-func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) {
-	// Querystrings
-	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
-		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
-		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" +
-		" |> filter(fn: (r) => r._field == \"rdsCacheUsedBytes\")" +
-		filter +
-		" |> group(columns: [\"_field\"])" +
-		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
-		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
-	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data
+func queryMemoryUsage(s *QueryServer, ctx context.Context, timeRange string,
+	timeRangeAve string, intervalInSec int, filter string, nEntr int) ([]*pb.RdsMemoryUsageDataPoint, error) {
 
-	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) {
+	queryfunction := func(s *QueryServer, ctx context.Context, timeRange string, bucket string,
+		inset string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) {
 		var arrayResult []*pb.RdsMemoryUsageDataPoint
-
+		// Build the querystring for the given bucket
+		query := "from(bucket: \"" + bucket + "\")" +
+			" |> range(" + timeRange + ")" +
+			" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" +
+			" |> filter(fn: (r) => r._field == \"" + inset + "rdsCacheUsedBytes\")" +
+			filter +
+			" |> group(columns: [\"_field\"])" +
+			" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
+			" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
 		result, err := s.dbQueryApi.Query(ctx, query)
 		if err != nil {
 			return nil, err
@@ -468,8 +500,8 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end
 		for result.Next() {
 			usedBytes := int64(0)
 
-			if result.Record().Values()["rdsCacheUsedBytes"] != nil {
-				usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64)
+			if result.Record().Values()[inset+"rdsCacheUsedBytes"] != nil {
+				usedBytes = result.Record().Values()[inset+"rdsCacheUsedBytes"].(int64)
 			}
 
 			arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{
@@ -483,18 +515,25 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end
 		return arrayResult, nil
 	}
 
-	arrayResult, err := queryfunction(s, ctx, query) // Query for newer data
+	arrayResult, err := queryfunction(s, ctx, timeRange,
+		s.settings.InfluxDbDatabase, "", intervalInSec, filter) // Query for newer data
 	if err != nil {
 		return nil, err
 	}
-	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data
-	if err != nil {
-		return nil, err
-	}
-	// Append the results of the second query to those of the first
-	for a := range secondArrayResult {
-		arrayResult = append(arrayResult, secondArrayResult[a])
+
+	// Query the grouped (averaged) data if the requested window reaches into it
+	if timeRangeAve != "" {
+		secondArrayResult, err := queryfunction(s, ctx, timeRangeAve,
+			s.settings.InfluxDbDatabase+averageSuffix, "sum_", intervalInSec, filter)
+		if err != nil {
+			return nil, err
+		}
+		if len(secondArrayResult) == 0 {
+			secondArrayResult = make([]*pb.RdsMemoryUsageDataPoint, nEntr-len(arrayResult))
+		}
+		arrayResult = append(secondArrayResult, arrayResult...)
 	}
+
 	return arrayResult, nil
 }
 
@@ -515,7 +554,6 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
 	)
 
 	if err != nil {
-		fmt.Printf("Error: %v\n", err)
 		return nil, err
 	}
 
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
index 669cd6b2e..778dcb7ce 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/ServerEntrypoint.go
@@ -4,29 +4,57 @@ import (
 	pb "asapo_common/generated_proto"
 	log "asapo_common/logger"
 	"context"
+	"errors"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	"google.golang.org/grpc"
 	"net"
 	"strconv"
+	"time"
 )
 
+func GetSwitchPeriod(retentionTime string, groupingTime string) (int64, int64, error) {
+	// Return the switch period (full -> reduced DB) and the grouping interval, both in seconds
+	retentionTimeInt, err := time.ParseDuration(retentionTime)
+	if err != nil {
+		return 0, 0, err
+	}
+	groupingTimeInt, err := time.ParseDuration(groupingTime)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	switchPeriod := retentionTimeInt.Seconds() - groupingTimeInt.Seconds()
+	if switchPeriod < 0 {
+		return 0, 0, errors.New("retention time is less than grouping time")
+	}
+	return int64(switchPeriod), int64(groupingTimeInt.Seconds()), nil
+}
+
 func Start(settings Settings) {
 	lis, err := net.Listen("tcp", ":"+strconv.Itoa(int(settings.ServerPort)))
 	if err != nil {
 		log.Fatal("failed to listen: ", err)
 	}
 
-	influxClient := influxdb2.NewClient(settings.InfluxDbUrl, ""/*settings.InfluxDb2AuthToken*/)
+	influxClient := influxdb2.NewClient(settings.InfluxDbUrl, "" /*settings.InfluxDb2AuthToken*/)
 	// influxClient.BucketsAPI().CreateBucketWithName(context.Background(), domain.Organization{}, "bucketName", domain.RetentionRules{})
 
 	queryServer := QueryServer{}
 	ingestServer := IngestServer{}
 
-	dbQueryApi := influxClient.QueryAPI(""/*settings.InfluxDb2Org*/)
+	dbQueryApi := influxClient.QueryAPI("" /*settings.InfluxDb2Org*/)
 	queryServer.dbQueryApi = dbQueryApi
 	queryServer.settings = settings
 
-	dbWriterApi := influxClient.WriteAPI(""/*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
+	switchPeriod, groupedInterval, err := GetSwitchPeriod(settings.RetentionPolicyTime, settings.GroupingTime)
+	if err != nil {
+		log.Fatal("failed to get switch period: ", err)
+		return
+	}
+	queryServer.switchPeriod = switchPeriod
+	queryServer.groupedInterval = groupedInterval
+
+	dbWriterApi := influxClient.WriteAPI("" /*settings.InfluxDb2Org*/, settings.InfluxDbDatabase)
 	ingestServer.dbWriterApi = dbWriterApi
 	ingestServer.settings = settings
 
diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
index 2f2b0b9fa..c0d120b12 100644
--- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
+++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go
@@ -45,4 +45,9 @@ type Settings struct {
 
 	InfluxDbUrl      string // e.g. "http://127.0.0.1:8086"
 	InfluxDbDatabase string // e.g. "asapo-monitoring"
+
+	RetentionPolicyTime string // e.g. 1h
+	GroupingTime        string // e.g. 5m
+	MaxPoints           uint64 // e.g. 500
+
 }
-- 
GitLab