From 0ccc4be9c26cec9dc61dc4280739034d2197c61e Mon Sep 17 00:00:00 2001 From: Lars Janssen <lars.janssen@hae-hamburg.de> Date: Tue, 28 Feb 2023 12:53:04 +0100 Subject: [PATCH] Modified QueryServer.go to use average data from a different bucket and from newer bucket. From "asapo_monitoring" to "asapo_monitoring_avg". Added the averageSuffix "_avg" to Settings.go. --- .../server/QueryServer.go | 881 ++++++++++-------- .../server/Settings.go | 7 +- 2 files changed, 519 insertions(+), 369 deletions(-) diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index 7c5639efa..79e4944ac 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -168,7 +168,26 @@ func doesStringContainsDangerousElements(input string) bool { strings.Contains(input, "}") } +// Modifies filter to query averaged data +func generateFilterForAVGData(s *QueryServer, filter string) string { + // Map of all modifications that a querystring needs to access averaged data. + // Currently, there is only one modification needed so this is a bit redundant. + filterModifications := make(map[string]string) + filterModifications[s.settings.InfluxDbDatabase] = s.settings.InfluxDbDatabase + averageSuffix + alteredFilter := filter + // Do all the filterModifications + + for key, element := range filterModifications { + alteredFilter = strings.ReplaceAll(filter, key, element) + } + fmt.Printf("\n") + log.Debug("generateFilterForAVGData: CHANGED: ", filter, " TO: ", alteredFilter, " WITH: ", filterModifications) + fmt.Printf("\n") + return alteredFilter +} + func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQuery) (*pb.DataPointsResponse, error) { + log.Debug("Getting Datapoints", query) startTime := strconv.FormatUint(query.FromTimestamp, 10) endTime := strconv.FormatUint(query.ToTimestamp, 10) intervalInSec := 5 @@ -219,197 +238,293 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue response.StartTimestampInSec = query.FromTimestamp // TODO: Use first timestamp from query result response.TimeIntervalInSec = uint32(intervalInSec) - + fmt.Printf("\n") return &response, nil } func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) { - var arrayResult []*pb.TotalTransferRateDataPoint - - result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\" or r._measurement == \""+dbMeasurementRdsFileRequests+"\" or r._measurement == \""+dbMeasurementFtsTransfers+"\")"+ - " |> filter(fn: (r) => r._field == \"totalInputFileSize\" or r._field == \"totalRequestedFileSize\" or r._field == \"totalRdsOutputFileSize\" or r._field == \"totalFtsTransferredFileSize\")"+ - filter+ - " |> group(columns: [\"_field\"])"+ - " |> aggregateWindow(every: "+strconv.Itoa(intervalInSec)+"s, fn: sum, createEmpty: true)"+ - " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", - ) + log.Debug("queryTransferSpeed") + // Querystring + query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" + + " |> filter(fn: (r) => r._field == \"totalInputFileSize\" or r._field == \"totalRequestedFileSize\" or r._field == \"totalRdsOutputFileSize\" or r._field == \"totalFtsTransferredFileSize\")" + + filter + + " |> group(columns: [\"_field\"])" + + " |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" + + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data + + queryfunction := func(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) { + result, err := s.dbQueryApi.Query(ctx, query) + + if err != nil { + log.Error("queryTransferSpeed: queryfunction: RESULT: ", result, " ERROR: ", err) + return nil, err + } - if err != nil { - return nil, err - } + var arrayResult []*pb.TotalTransferRateDataPoint + for result.Next() { + recvBytes := int64(0) + sendBytes := int64(0) + sendRdsBytes := int64(0) + sendFtsBytes := int64(0) - for result.Next() { - recvBytes := int64(0) - sendBytes := int64(0) - sendRdsBytes := int64(0) - sendFtsBytes := int64(0) + if result.Record().Values()["totalInputFileSize"] != nil { + recvBytes = result.Record().Values()["totalInputFileSize"].(int64) + } + if result.Record().Values()["totalRequestedFileSize"] != nil { + sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64) + } + if result.Record().Values()["totalRdsOutputFileSize"] != nil { + sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64) + } + if result.Record().Values()["totalFtsTransferredFileSize"] != nil { + sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64) + } - if result.Record().Values()["totalInputFileSize"] != nil { - recvBytes = result.Record().Values()["totalInputFileSize"].(int64) - } - if result.Record().Values()["totalRequestedFileSize"] != nil { - sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64) - } - if result.Record().Values()["totalRdsOutputFileSize"] != nil { - sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64) - } - if result.Record().Values()["totalFtsTransferredFileSize"] != nil { - sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64) + arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{ + TotalBytesPerSecRecv: uint64(recvBytes) / uint64(intervalInSec), + TotalBytesPerSecSend: uint64(sendBytes) / uint64(intervalInSec), + TotalBytesPerSecRdsSend: uint64(sendRdsBytes) / uint64(intervalInSec), + TotalBytesPerSecFtsSend: uint64(sendFtsBytes) / uint64(intervalInSec), + }) } - arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{ - TotalBytesPerSecRecv: uint64(recvBytes) / uint64(intervalInSec), - TotalBytesPerSecSend: uint64(sendBytes) / uint64(intervalInSec), - TotalBytesPerSecRdsSend: uint64(sendRdsBytes) / uint64(intervalInSec), - TotalBytesPerSecFtsSend: uint64(sendFtsBytes) / uint64(intervalInSec), - }) + if result.Err() != nil { + log.Error("queryTransferSpeed: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err()) + return nil, result.Err() + } + return arrayResult, nil } - if result.Err() != nil { - return nil, result.Err() + arrayResult, err := queryfunction(s, ctx, intervalInSec, query) // Query newer data + if err != nil { + log.Error("queryTransferSpeed: arrayResult: RESULT: ", arrayResult, " ERROR: ", err) + return nil, err + } + secondArrayResult, err := queryfunction(s, ctx, intervalInSec, queryForOldData) // Query averaged data + if err != nil { + log.Error("queryTransferSpeed: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err) + return nil, err + } + // Append the results of the second query to those of the first + for a := range secondArrayResult { + arrayResult = append(arrayResult, secondArrayResult[a]) } + fmt.Printf("\n") return arrayResult, nil } func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) { - var arrayResult []*pb.TotalFileRateDataPoint - - rdsResponses, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementRdsFileRequests+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\" or r._measurement == \""+dbMeasurementFtsTransfers+"\")"+ - " |> filter(fn: (r) => r._field == \"totalRdsHits\" or r._field == \"totalRdsMisses\" or r._field == \"requestedFileCount\" or r._field == \"ftsFileCount\")"+ - filter+ - " |> group(columns: [\"_field\"])"+ - " |> aggregateWindow(every: "+strconv.Itoa(intervalInSec)+"s, fn: sum, createEmpty: true)"+ - " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", - ) - if err != nil { - return nil, err - } + log.Debug("queryFileRate") + // Querystrings + query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" + + " |> filter(fn: (r) => r._field == \"totalRdsHits\" or r._field == \"totalRdsMisses\" or r._field == \"requestedFileCount\" or r._field == \"ftsFileCount\")" + + filter + + " |> group(columns: [\"_field\"])" + + " |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" + + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data + + queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) { + rdsResponses, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + log.Error("queryFileRate: queryfunction: RESULT: ", rdsResponses, " ERROR: ", err) + return nil, err + } - for rdsResponses.Next() { - totalRequests := uint32(0) - hit := uint32(0) - miss := uint32(0) - fromDisk := uint32(0) + var arrayResult []*pb.TotalFileRateDataPoint + for rdsResponses.Next() { + totalRequests := uint32(0) + hit := uint32(0) + miss := uint32(0) + fromDisk := uint32(0) - if rdsResponses.Record().Values()["requestedFileCount"] != nil { - totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64)) - } - if rdsResponses.Record().Values()["totalRdsHits"] != nil { - hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64)) - } - if rdsResponses.Record().Values()["totalRdsMisses"] != nil { - miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64)) + if rdsResponses.Record().Values()["requestedFileCount"] != nil { + totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64)) + } + if rdsResponses.Record().Values()["totalRdsHits"] != nil { + hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64)) + } + if rdsResponses.Record().Values()["totalRdsMisses"] != nil { + miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64)) + } + if rdsResponses.Record().Values()["ftsFileCount"] != nil { + fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64)) + } + + arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{ + TotalRequests: totalRequests, + CacheMisses: miss, + FromCache: hit, + FromDisk: fromDisk, + }) } - if rdsResponses.Record().Values()["ftsFileCount"] != nil { - fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64)) + + if rdsResponses.Err() != nil { + log.Debug("queryFileRate: queryfunction: result has error. RESULT: ", rdsResponses, " ERROR: ", rdsResponses.Err()) + return nil, rdsResponses.Err() } - arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{ - TotalRequests: totalRequests, - CacheMisses: miss, - FromCache: hit, - FromDisk: fromDisk, - }) + return arrayResult, nil } - if rdsResponses.Err() != nil { - return nil, rdsResponses.Err() + arrayResult, err := queryfunction(s, ctx, query) // Query newer data + if err != nil { + log.Error("queryFileRate: arrayResult: RESULT: ", arrayResult, " ERROR: ", err) + return nil, err } - + secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data + if err != nil { + log.Error("queryFileRate: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err) + return nil, err + } + // Append the results of the second query to those of the first + for a := range secondArrayResult { + arrayResult = append(arrayResult, secondArrayResult[a]) + } + fmt.Printf("\n") return arrayResult, nil } func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) { - var arrayResult []*pb.TaskTimeDataPoint - - result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementRdsFileRequests+"\")"+ - " |> filter(fn: (r) => r._field == \"avgTransferReceiveTimeUs\" or r._field == \"avgWriteIoTimeUs\" or r._field == \"avgDbTimeUs\" or r._field == \"avgRdsOutputTransferTimeUs\")"+ - filter+ - " |> group(columns: [\"_field\"])"+ - " |> aggregateWindow(every: "+strconv.Itoa(intervalInSec)+"s, fn: mean, createEmpty: true)"+ - " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", - ) - if err != nil { - return nil, err - } + log.Debug("queryTaskTime") + // Querystrings + query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" + + " |> filter(fn: (r) => r._field == \"avgTransferReceiveTimeUs\" or r._field == \"avgWriteIoTimeUs\" or r._field == \"avgDbTimeUs\" or r._field == \"avgRdsOutputTransferTimeUs\")" + + filter + + " |> group(columns: [\"_field\"])" + + " |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" + + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data + + queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) { + var arrayResult []*pb.TaskTimeDataPoint + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + log.Error("queryTaskTime: queryfunction: RESULT: ", result, " ERROR: ", err) + return nil, err + } - for result.Next() { - receiveIoTimeUs := float64(0) - writeToDiskTimeUs := float64(0) - writeToDatabaseTimeUs := float64(0) - rdsOutputTransferTimeUs := float64(0) + for result.Next() { + receiveIoTimeUs := float64(0) + writeToDiskTimeUs := float64(0) + writeToDatabaseTimeUs := float64(0) + rdsOutputTransferTimeUs := float64(0) - if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { - receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) - } - if result.Record().Values()["avgWriteIoTimeUs"] != nil { - writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) - } - if result.Record().Values()["avgDbTimeUs"] != nil { - writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) + if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { + receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) + } + if result.Record().Values()["avgWriteIoTimeUs"] != nil { + writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) + } + if result.Record().Values()["avgDbTimeUs"] != nil { + writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) + } + if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { + rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) + } + + arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ + ReceiveIoTimeUs: uint32(receiveIoTimeUs), + WriteToDiskTimeUs: uint32(writeToDiskTimeUs), + WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), + RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), + }) } - if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { - rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) + + if result.Err() != nil { + log.Error("queryTaskTime: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err()) + return nil, result.Err() } - arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ - ReceiveIoTimeUs: uint32(receiveIoTimeUs), - WriteToDiskTimeUs: uint32(writeToDiskTimeUs), - WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), - RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), - }) + return arrayResult, nil } - if result.Err() != nil { - return nil, result.Err() + arrayResult, err := queryfunction(s, ctx, query) // Query newer data + if err != nil { + log.Error("queryTaskTIme: arrayResult: RESULT: ", arrayResult, " ERROR: ", err) + return nil, err } - + secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data + if err != nil { + log.Error("queryTaskTime: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err) + return nil, err + } + // Append the results of the second query to those of the first + for a := range secondArrayResult { + arrayResult = append(arrayResult, secondArrayResult[a]) + } + fmt.Printf("\n") return arrayResult, nil } func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) { - var arrayResult []*pb.RdsMemoryUsageDataPoint - - result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementRdsCacheMemoryUsage+"\")"+ - " |> filter(fn: (r) => r._field == \"rdsCacheUsedBytes\")"+ - filter+ - " |> group(columns: [\"_field\"])"+ - " |> aggregateWindow(every: "+strconv.Itoa(intervalInSec)+"s, fn: sum, createEmpty: true)"+ - " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", - ) - if err != nil { - return nil, err - } + log.Debug("queryMemoryUsage") + // Querystrings + query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" + + " |> filter(fn: (r) => r._field == \"rdsCacheUsedBytes\")" + + filter + + " |> group(columns: [\"_field\"])" + + " |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" + + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data + + queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) { + var arrayResult []*pb.RdsMemoryUsageDataPoint + + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + log.Error("queryMemoryUsage: queryfunction: RESULT: ", result, " ERROR: ", err) + return nil, err + } + for result.Next() { + usedBytes := int64(0) - for result.Next() { - usedBytes := int64(0) + if result.Record().Values()["rdsCacheUsedBytes"] != nil { + usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) + } - if result.Record().Values()["rdsCacheUsedBytes"] != nil { - usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) + arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ + TotalUsedMemory: uint64(usedBytes), + }) + } + if result.Err() != nil { + log.Error("queryMemoryUsage: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err()) + return nil, result.Err() } - arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ - TotalUsedMemory: uint64(usedBytes), - }) + return arrayResult, nil } - if result.Err() != nil { - return nil, result.Err() + arrayResult, err := queryfunction(s, ctx, query) // Query for newer data + if err != nil { + log.Error("queryMemoryUsage: arrayResult: RESULT: ", arrayResult, " ERROR: ", err) + return nil, err } - + secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data + if err != nil { + log.Error("queryMemoryUsage: SecondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err) + return nil, err + } + // Append the results of the second query to those of the first + for a := range secondArrayResult { + arrayResult = append(arrayResult, secondArrayResult[a]) + } + fmt.Printf("\n") return arrayResult, nil } func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (*pb.TopologyResponse, error) { + log.Debug("GetTopology") if doesStringContainsDangerousElements(query.BeamtimeFilter) { return nil, errors.New("some input characters are not allowed") } @@ -417,18 +532,13 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( startTime := strconv.FormatUint(query.FromTimestamp, 10) endTime := strconv.FormatUint(query.ToTimestamp, 10) - result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")"+ - " |> filter(fn: (r) => "+createFilterElement("beamtime", query.BeamtimeFilter)+")"+ - " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])"+ - " |> group()", - ) - - if err != nil { - fmt.Printf("Error: %v\n", err) - return nil, err - } + queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\")" + + " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" + + " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" + + " |> group()" + queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data type PipelineStep struct { // these maps here are replacements for HashSets producesSourceId map[string] /*produces sourceId*/ bool @@ -440,244 +550,265 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers map[string] /*receiver id*/ bool } - pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{} + var responseFun pb.TopologyResponse + // Queryfunction that appends the results to a pb.TopologyResponse + queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.TopologyResponse) (*pb.TopologyResponse, error) { + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + log.Error("GetTopology: queryfunction: RESULT: ", result, " ERROR: ", err) + fmt.Printf("Error: %v\n", err) + return nil, err + } - getOrCreateStep := func(stepId string) PipelineStep { - if _, containsKey := pipelineSteps[stepId]; !containsKey { - pipelineSteps[stepId] = PipelineStep{ - producesSourceId: map[string] /*produces sourceId*/ bool{}, - consumesSourceId: map[string] /*consumers sourceId*/ bool{}, - producerInstances: map[string] /*produces instanceId*/ bool{}, - consumerInstances: map[string] /*consumers instanceId*/ bool{}, - involvedReceivers: map[string] /*receiver id*/ bool{}, + pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{} + getOrCreateStep := func(stepId string) PipelineStep { + if _, containsKey := pipelineSteps[stepId]; !containsKey { + pipelineSteps[stepId] = PipelineStep{ + producesSourceId: map[string] /*produces sourceId*/ bool{}, + consumesSourceId: map[string] /*consumers sourceId*/ bool{}, + producerInstances: map[string] /*produces instanceId*/ bool{}, + consumerInstances: map[string] /*consumers instanceId*/ bool{}, + involvedReceivers: map[string] /*receiver id*/ bool{}, + } } + return pipelineSteps[stepId] } - return pipelineSteps[stepId] - } - for result.Next() { - if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer - stepId, ok := result.Record().Values()["pipelineStepId"].(string) - if !ok { - stepId = "defaultStep" + for result.Next() { + if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer + stepId, ok := result.Record().Values()["pipelineStepId"].(string) + if !ok { + stepId = "defaultStep" + } + source := result.Record().Values()["source"].(string) + producerInstanceId := result.Record().Values()["producerInstanceId"].(string) + var step = getOrCreateStep(stepId) + step.producesSourceId[source] = true + step.producerInstances[producerInstanceId] = true + var receiverName = result.Record().Values()["receiverName"].(string) + step.involvedReceivers[receiverName] = true + } else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer + stepId := result.Record().Values()["pipelineStepId"].(string) + source := result.Record().Values()["source"].(string) + consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string) + var step = getOrCreateStep(stepId) + step.consumesSourceId[source] = true + step.consumerInstances[consumerInstanceId] = true + } else { + log.Debug("Got an entry without receiverName or brokerName") } - source := result.Record().Values()["source"].(string) - producerInstanceId := result.Record().Values()["producerInstanceId"].(string) - var step = getOrCreateStep(stepId) - step.producesSourceId[source] = true - step.producerInstances[producerInstanceId] = true - var receiverName = result.Record().Values()["receiverName"].(string) - step.involvedReceivers[receiverName] = true - } else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer - stepId := result.Record().Values()["pipelineStepId"].(string) - source := result.Record().Values()["source"].(string) - consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string) - var step = getOrCreateStep(stepId) - step.consumesSourceId[source] = true - step.consumerInstances[consumerInstanceId] = true - } else { - log.Debug("Got an entry without receiverName or brokerName") } - } - type PipelineLevelStepInfo struct { - stepId string - } + type PipelineLevelStepInfo struct { + stepId string + } - type PipelineEdgeInfo struct { - fromStepId string - toStepId string - sourceId string - involvedReceivers map[string] /*receiver id*/ bool - } + type PipelineEdgeInfo struct { + fromStepId string + toStepId string + sourceId string + involvedReceivers map[string] /*receiver id*/ bool + } - // Simple algorithm - // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available - // If all consuming sources are available - // add it to the current level - // add all connecting edges - // and producing sources to availableSourcesInNextLevel - // If not go to next step, which might produces this - // When an iteration of remainingStepIds is done: - // Add all new producing sources to availableSources - // Go to next level - remainingStepIds := map[string] /*pipelineStepId*/ bool{} - availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} - - for stepId, _ := range pipelineSteps { - remainingStepIds[stepId] = true - } - - var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo - var edges []PipelineEdgeInfo - var currentLevel = -1 - - for len(remainingStepIds) > 0 { - currentLevel++ - levels = append(levels, []PipelineLevelStepInfo{}) - - var foundAtLeastOne = false - availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} - for stepId := range remainingStepIds { - var allConsumingSourcesAvailable = true - for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { - if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable - allConsumingSourcesAvailable = false - } else { - // Verify that no other producer can create this source. - // Maybe we have to wait one or two steps until all producers are available - for stepId2 := range remainingStepIds { - if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { - if stepId == stepId2 { - continue // The pipeline has self reference?! allow it - } - allConsumingSourcesAvailable = false - } - } - } - if !allConsumingSourcesAvailable { - break // We don't even need to try the other options - } - } + // Simple algorithm + // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available + // If all consuming sources are available + // add it to the current level + // add all connecting edges + // and producing sources to availableSourcesInNextLevel + // If not go to next step, which might produces this + // When an iteration of remainingStepIds is done: + // Add all new producing sources to availableSources + // Go to next level + remainingStepIds := map[string] /*pipelineStepId*/ bool{} + availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} + + for stepId := range pipelineSteps { + remainingStepIds[stepId] = true + } - if allConsumingSourcesAvailable { - foundAtLeastOne = true - stepInfo := pipelineSteps[stepId] + var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo + var edges []PipelineEdgeInfo + var currentLevel = -1 - // Add edge connection - for consumingSourceId := range stepInfo.consumesSourceId { - if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge - edges = append(edges, PipelineEdgeInfo{ - fromStepId: stepId, - toStepId: stepId, - sourceId: consumingSourceId, - involvedReceivers: stepInfo.involvedReceivers, - }) + for len(remainingStepIds) > 0 { + currentLevel++ + levels = append(levels, []PipelineLevelStepInfo{}) + + var foundAtLeastOne = false + availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} + for stepId := range remainingStepIds { + var allConsumingSourcesAvailable = true + for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { + if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable + allConsumingSourcesAvailable = false + } else { + // Verify that no other producer can create this source. + // Maybe we have to wait one or two steps until all producers are available + for stepId2 := range remainingStepIds { + if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { + if stepId == stepId2 { + continue // The pipeline has self reference?! allow it + } + allConsumingSourcesAvailable = false + } + } } - for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges - var producingSource = pipelineSteps[sourceIdAvailableFromStepId] - edges = append(edges, PipelineEdgeInfo{ - fromStepId: sourceIdAvailableFromStepId, - toStepId: stepId, - sourceId: consumingSourceId, - involvedReceivers: producingSource.involvedReceivers, - }) + if !allConsumingSourcesAvailable { + break // We don't even need to try the other options } } - // prepare sources for next level - for sourceId := range stepInfo.producesSourceId { - if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable { - availableSourcesInNextLevel[sourceId] = nil + if allConsumingSourcesAvailable { + foundAtLeastOne = true + stepInfo := pipelineSteps[stepId] + + // Add edge connection + for consumingSourceId := range stepInfo.consumesSourceId { + if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge + edges = append(edges, PipelineEdgeInfo{ + fromStepId: stepId, + toStepId: stepId, + sourceId: consumingSourceId, + involvedReceivers: stepInfo.involvedReceivers, + }) + } + for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges + var producingSource = pipelineSteps[sourceIdAvailableFromStepId] + edges = append(edges, PipelineEdgeInfo{ + fromStepId: sourceIdAvailableFromStepId, + toStepId: stepId, + sourceId: consumingSourceId, + involvedReceivers: producingSource.involvedReceivers, + }) + } } - availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId) - } - - levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{ - stepId: stepId, - }) - - delete(remainingStepIds, stepId) - } - } + // prepare sources for next level + for sourceId := range stepInfo.producesSourceId { + if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable { + availableSourcesInNextLevel[sourceId] = nil + } - if !foundAtLeastOne { - // probably only requests of files came, but no receiver registered a producer - // log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - // return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId) + } - var unkStep = "Unknown" + levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{ + stepId: stepId, + }) - pipelineSteps[unkStep] = PipelineStep{ - producesSourceId: nil, - consumesSourceId: nil, - producerInstances: nil, - consumerInstances: nil, - involvedReceivers: nil, + delete(remainingStepIds, stepId) + } } - levels[0] = append(levels[0], PipelineLevelStepInfo{ - stepId: unkStep, - }) + if !foundAtLeastOne { + // probably only requests of files came, but no receiver registered a producer + // log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + // return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - if len(levels) < 2 { - levels = append(levels, []PipelineLevelStepInfo{}) - } + var unkStep = "Unknown" - for stepId := range remainingStepIds { - var step = pipelineSteps[stepId] - levels[1] = append(levels[1], PipelineLevelStepInfo{ - stepId: stepId, + pipelineSteps[unkStep] = PipelineStep{ + producesSourceId: nil, + consumesSourceId: nil, + producerInstances: nil, + consumerInstances: nil, + involvedReceivers: nil, + } + + levels[0] = append(levels[0], PipelineLevelStepInfo{ + stepId: unkStep, }) - for sourceId := range step.consumesSourceId { - edges = append(edges, PipelineEdgeInfo{ - fromStepId: unkStep, - toStepId: stepId, - sourceId: sourceId, - involvedReceivers: nil, - }) + if len(levels) < 2 { + levels = append(levels, []PipelineLevelStepInfo{}) } - } - break // Go to response building - } + for stepId := range remainingStepIds { + var step = pipelineSteps[stepId] + levels[1] = append(levels[1], PipelineLevelStepInfo{ + stepId: stepId, + }) - for sourceId, element := range availableSourcesInNextLevel { - if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable { - availableSources[sourceId] = nil + for sourceId := range step.consumesSourceId { + edges = append(edges, PipelineEdgeInfo{ + fromStepId: unkStep, + toStepId: stepId, + sourceId: sourceId, + involvedReceivers: nil, + }) + } + } + + break // Go to response building } - for _, newSource := range element { - availableSources[sourceId] = append(availableSources[sourceId], newSource) + for sourceId, element := range availableSourcesInNextLevel { + if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable { + availableSources[sourceId] = nil + } + + for _, newSource := range element { + availableSources[sourceId] = append(availableSources[sourceId], newSource) + } } } - } - // Response building - var response pb.TopologyResponse + // Response building - for levelIndex, level := range levels { - for _, node := range level { - var producers []string = nil - var consumers []string = nil + for levelIndex, level := range levels { + for _, node := range level { + var producers []string = nil + var consumers []string = nil - for producerInstanceId := range pipelineSteps[node.stepId].producerInstances { - producers = append(producers, producerInstanceId) - } - for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances { - consumers = append(consumers, consumerInstanceId) + for producerInstanceId := range pipelineSteps[node.stepId].producerInstances { + producers = append(producers, producerInstanceId) + } + for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances { + consumers = append(consumers, consumerInstanceId) + } + response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{ + NodeId: node.stepId, + Level: uint32(levelIndex), + ProducerInstances: producers, + ConsumerInstances: consumers, + }) } - response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{ - NodeId: node.stepId, - Level: uint32(levelIndex), - ProducerInstances: producers, - ConsumerInstances: consumers, - }) } - } - for _, edge := range edges { - newEdge := pb.TopologyResponseEdge{ - FromNodeId: edge.fromStepId, - ToNodeId: edge.toStepId, - SourceName: edge.sourceId, - InvolvedReceivers: nil, - } + for _, edge := range edges { + newEdge := pb.TopologyResponseEdge{ + FromNodeId: edge.fromStepId, + ToNodeId: edge.toStepId, + SourceName: edge.sourceId, + InvolvedReceivers: nil, + } - for receiverId := range edge.involvedReceivers { - newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId) + for receiverId := range edge.involvedReceivers { + newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId) + } + + response.Edges = append(response.Edges, &newEdge) } - response.Edges = append(response.Edges, &newEdge) + return &response, nil } - - return &response, nil + firstQuery, err := queryfunction(s, ctx, queryString, responseFun) // Query newer data + if err != nil { + log.Error("GetTopology: firstQuery: RESULT: ", firstQuery, " ERROR: ", err) + return nil, err + } + secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data and append it to the first query + if err != nil { + log.Error("GetTopology: secondQuery: RESULT: ", secondQuery, " ERROR: ", err) + return nil, err + } + fmt.Printf("\n") + return secondQuery, nil } func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQuery) (*pb.GroupDelayResponse, error) { - + log.Debug("GetGroupDelay") if query.BeamtimeFilter == "" { return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required") } @@ -689,33 +820,49 @@ func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQue startTime := strconv.FormatUint(query.FromTimestamp, 10) endTime := strconv.FormatUint(query.ToTimestamp, 10) - result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ - " |> range(start: "+startTime+", stop: "+endTime+")"+ - " |> filter(fn: (r) => r._measurement == \""+dbMeasurementBrokerFileRequests+"\""+" and r.brokerCommand == \"next\")"+ - " |> filter(fn: (r) => r.beamtime == \""+query.BeamtimeFilter+"\" and r._field == \"delayMs\")"+ - " |> group(columns: [\"groupId\"])"+ - " |> last()"+ - " |> keep(columns: [\"groupId\", \"_value\"])"+ - " |> group()", - ) - if err != nil { - return nil, err - } - + queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring + " |> range(start: " + startTime + ", stop: " + endTime + ")" + + " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementBrokerFileRequests + "\"" + " and r.brokerCommand == \"next\")" + + " |> filter(fn: (r) => r.beamtime == \"" + query.BeamtimeFilter + "\" and r._field == \"delayMs\")" + + " |> group(columns: [\"groupId\"])" + + " |> last()" + + " |> keep(columns: [\"groupId\", \"_value\"])" + + " |> group()" + queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data response := pb.GroupDelayResponse{} - for result.Next() { - if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil { - item := pb.GroupDelayResponseItem{ - GroupId: result.Record().Values()["groupId"].(string), - DelayMs: (uint32)(result.Record().Value().(int64)), + // Queryfunction that appends the results to a pb.GroupDelayResponse + queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.GroupDelayResponse) (*pb.GroupDelayResponse, error) { + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + log.Error("GetGroupDelay: queryfunction: RESULT: ", result, " ERROR: ", err) + return nil, err + } + + for result.Next() { + if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil { + item := pb.GroupDelayResponseItem{ + GroupId: result.Record().Values()["groupId"].(string), + DelayMs: (uint32)(result.Record().Value().(int64)), + } + response.GroupIds = append(response.GroupIds, &item) } - response.GroupIds = append(response.GroupIds, &item) } - } - sort.SliceStable(response.GroupIds, func(i, j int) bool { - return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs - }) + sort.SliceStable(response.GroupIds, func(i, j int) bool { + return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs + }) - return &response, nil + return &response, nil + } + firstQuery, err := queryfunction(s, ctx, queryString, response) // Query newer data + if err != nil { + log.Error("GetGroupDelay: firstQuery: RESULT: ", firstQuery, " ERROR: ", err) + return nil, err + } + secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data + if err != nil { + log.Error("GetGroupDelay: secondQuery: RESULT: ", secondQuery, " ERROR: ", err) + } + fmt.Printf("\n") + return secondQuery, err } diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go index 137b8f091..cdabee52a 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/Settings.go @@ -31,6 +31,9 @@ const ( // [sourceMeta], ftsName, pipelineStepId, consumerInstanceId // > ftsFileCount, totalFtsTransferredFileSize, avgTransferSendTimeUs dbMeasurementFtsTransfers = "ftsTransfers" + + // Used to get a corresponding bucket with averaged data + averageSuffix = "_avg" ) type Settings struct { @@ -39,6 +42,6 @@ type Settings struct { ServerPort uint16 // e.g. "50051" LogLevel string // e.g. "info" - InfluxDbUrl string // e.g. "http://127.0.0.1:8086" - InfluxDbDatabase string // e.g. "asapo-monitoring" + InfluxDbUrl string // e.g. "http://127.0.0.1:8086" + InfluxDbDatabase string // e.g. "asapo-monitoring" } -- GitLab