From 607d8585d50e980fb75ca0532aeae9b78ca9671f Mon Sep 17 00:00:00 2001 From: Lars Janssen <lars.janssen@hae-hamburg.de> Date: Wed, 1 Mar 2023 15:51:05 +0100 Subject: [PATCH] Reverted GetTopology and GetGroupDelay --- .../server/QueryServer.go | 480 +++++++++--------- 1 file changed, 229 insertions(+), 251 deletions(-) diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index c77738a10..6a7f6aedf 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -5,6 +5,7 @@ import ( log "asapo_common/logger" "context" "errors" + "fmt" "github.com/influxdata/influxdb-client-go/v2/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -500,13 +501,18 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( startTime := strconv.FormatUint(query.FromTimestamp, 10) endTime := strconv.FormatUint(query.ToTimestamp, 10) - queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring - " |> range(start: " + startTime + ", stop: " + endTime + ")" + - " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\")" + - " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" + - " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" + - " |> group()" - queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data + result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ + " |> range(start: "+startTime+", stop: "+endTime+")"+ + " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")"+ + " |> filter(fn: (r) => "+createFilterElement("beamtime", query.BeamtimeFilter)+")"+ + " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])"+ + " |> group()", + ) + + if err != nil { + fmt.Printf("Error: %v\n", err) + return nil, err + } type PipelineStep struct { // these maps here are replacements for HashSets producesSourceId map[string] /*produces sourceId*/ bool @@ -518,259 +524,244 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers map[string] /*receiver id*/ bool } - var responseFun pb.TopologyResponse - // Queryfunction that appends the results to a pb.TopologyResponse - queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.TopologyResponse) (*pb.TopologyResponse, error) { - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } + pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{} - pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{} - getOrCreateStep := func(stepId string) PipelineStep { - if _, containsKey := pipelineSteps[stepId]; !containsKey { - pipelineSteps[stepId] = PipelineStep{ - producesSourceId: map[string] /*produces sourceId*/ bool{}, - consumesSourceId: map[string] /*consumers sourceId*/ bool{}, - producerInstances: map[string] /*produces instanceId*/ bool{}, - consumerInstances: map[string] /*consumers instanceId*/ bool{}, - involvedReceivers: map[string] /*receiver id*/ bool{}, - } + getOrCreateStep := func(stepId string) PipelineStep { + if _, containsKey := pipelineSteps[stepId]; !containsKey { + pipelineSteps[stepId] = PipelineStep{ + producesSourceId: map[string] /*produces sourceId*/ bool{}, + consumesSourceId: map[string] /*consumers sourceId*/ bool{}, + producerInstances: map[string] /*produces instanceId*/ bool{}, + consumerInstances: map[string] /*consumers instanceId*/ bool{}, + involvedReceivers: map[string] /*receiver id*/ bool{}, } - return pipelineSteps[stepId] } + return pipelineSteps[stepId] + } - for result.Next() { - if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer - stepId, ok := result.Record().Values()["pipelineStepId"].(string) - if !ok { - stepId = "defaultStep" - } - source := result.Record().Values()["source"].(string) - producerInstanceId := result.Record().Values()["producerInstanceId"].(string) - var step = getOrCreateStep(stepId) - step.producesSourceId[source] = true - step.producerInstances[producerInstanceId] = true - var receiverName = result.Record().Values()["receiverName"].(string) - step.involvedReceivers[receiverName] = true - } else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer - stepId := result.Record().Values()["pipelineStepId"].(string) - source := result.Record().Values()["source"].(string) - consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string) - var step = getOrCreateStep(stepId) - step.consumesSourceId[source] = true - step.consumerInstances[consumerInstanceId] = true - } else { - log.Debug("Got an entry without receiverName or brokerName") + for result.Next() { + if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer + stepId, ok := result.Record().Values()["pipelineStepId"].(string) + if !ok { + stepId = "defaultStep" } + source := result.Record().Values()["source"].(string) + producerInstanceId := result.Record().Values()["producerInstanceId"].(string) + var step = getOrCreateStep(stepId) + step.producesSourceId[source] = true + step.producerInstances[producerInstanceId] = true + var receiverName = result.Record().Values()["receiverName"].(string) + step.involvedReceivers[receiverName] = true + } else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer + stepId := result.Record().Values()["pipelineStepId"].(string) + source := result.Record().Values()["source"].(string) + consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string) + var step = getOrCreateStep(stepId) + step.consumesSourceId[source] = true + step.consumerInstances[consumerInstanceId] = true + } else { + log.Debug("Got an entry without receiverName or brokerName") } + } - type PipelineLevelStepInfo struct { - stepId string - } - - type PipelineEdgeInfo struct { - fromStepId string - toStepId string - sourceId string - involvedReceivers map[string] /*receiver id*/ bool - } - - // Simple algorithm - // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available - // If all consuming sources are available - // add it to the current level - // add all connecting edges - // and producing sources to availableSourcesInNextLevel - // If not go to next step, which might produces this - // When an iteration of remainingStepIds is done: - // Add all new producing sources to availableSources - // Go to next level - remainingStepIds := map[string] /*pipelineStepId*/ bool{} - availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} - - for stepId := range pipelineSteps { - remainingStepIds[stepId] = true - } - - var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo - var edges []PipelineEdgeInfo - var currentLevel = -1 + type PipelineLevelStepInfo struct { + stepId string + } - for len(remainingStepIds) > 0 { - currentLevel++ - levels = append(levels, []PipelineLevelStepInfo{}) + type PipelineEdgeInfo struct { + fromStepId string + toStepId string + sourceId string + involvedReceivers map[string] /*receiver id*/ bool + } - var foundAtLeastOne = false - availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} - for stepId := range remainingStepIds { - var allConsumingSourcesAvailable = true - for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { - if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable - allConsumingSourcesAvailable = false - } else { - // Verify that no other producer can create this source. - // Maybe we have to wait one or two steps until all producers are available - for stepId2 := range remainingStepIds { - if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { - if stepId == stepId2 { - continue // The pipeline has self reference?! allow it - } - allConsumingSourcesAvailable = false + // Simple algorithm + // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available + // If all consuming sources are available + // add it to the current level + // add all connecting edges + // and producing sources to availableSourcesInNextLevel + // If not go to next step, which might produces this + // When an iteration of remainingStepIds is done: + // Add all new producing sources to availableSources + // Go to next level + remainingStepIds := map[string] /*pipelineStepId*/ bool{} + availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} + + for stepId, _ := range pipelineSteps { + remainingStepIds[stepId] = true + } + + var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo + var edges []PipelineEdgeInfo + var currentLevel = -1 + + for len(remainingStepIds) > 0 { + currentLevel++ + levels = append(levels, []PipelineLevelStepInfo{}) + + var foundAtLeastOne = false + availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} + for stepId := range remainingStepIds { + var allConsumingSourcesAvailable = true + for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { + if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable + allConsumingSourcesAvailable = false + } else { + // Verify that no other producer can create this source. + // Maybe we have to wait one or two steps until all producers are available + for stepId2 := range remainingStepIds { + if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { + if stepId == stepId2 { + continue // The pipeline has self reference?! allow it } + allConsumingSourcesAvailable = false } } - if !allConsumingSourcesAvailable { - break // We don't even need to try the other options - } } + if !allConsumingSourcesAvailable { + break // We don't even need to try the other options + } + } - if allConsumingSourcesAvailable { - foundAtLeastOne = true - stepInfo := pipelineSteps[stepId] - - // Add edge connection - for consumingSourceId := range stepInfo.consumesSourceId { - if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge - edges = append(edges, PipelineEdgeInfo{ - fromStepId: stepId, - toStepId: stepId, - sourceId: consumingSourceId, - involvedReceivers: stepInfo.involvedReceivers, - }) - } - for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges - var producingSource = pipelineSteps[sourceIdAvailableFromStepId] - edges = append(edges, PipelineEdgeInfo{ - fromStepId: sourceIdAvailableFromStepId, - toStepId: stepId, - sourceId: consumingSourceId, - involvedReceivers: producingSource.involvedReceivers, - }) - } - } - - // prepare sources for next level - for sourceId := range stepInfo.producesSourceId { - if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable { - availableSourcesInNextLevel[sourceId] = nil - } + if allConsumingSourcesAvailable { + foundAtLeastOne = true + stepInfo := pipelineSteps[stepId] - availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId) + // Add edge connection + for consumingSourceId := range stepInfo.consumesSourceId { + if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge + edges = append(edges, PipelineEdgeInfo{ + fromStepId: stepId, + toStepId: stepId, + sourceId: consumingSourceId, + involvedReceivers: stepInfo.involvedReceivers, + }) + } + for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges + var producingSource = pipelineSteps[sourceIdAvailableFromStepId] + edges = append(edges, PipelineEdgeInfo{ + fromStepId: sourceIdAvailableFromStepId, + toStepId: stepId, + sourceId: consumingSourceId, + involvedReceivers: producingSource.involvedReceivers, + }) } - - levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{ - stepId: stepId, - }) - - delete(remainingStepIds, stepId) } - } - if !foundAtLeastOne { - // probably only requests of files came, but no receiver registered a producer - // log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - // return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - - var unkStep = "Unknown" + // prepare sources for next level + for sourceId := range stepInfo.producesSourceId { + if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable { + availableSourcesInNextLevel[sourceId] = nil + } - pipelineSteps[unkStep] = PipelineStep{ - producesSourceId: nil, - consumesSourceId: nil, - producerInstances: nil, - consumerInstances: nil, - involvedReceivers: nil, + availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId) } - levels[0] = append(levels[0], PipelineLevelStepInfo{ - stepId: unkStep, + levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{ + stepId: stepId, }) - if len(levels) < 2 { - levels = append(levels, []PipelineLevelStepInfo{}) - } + delete(remainingStepIds, stepId) + } + } - for stepId := range remainingStepIds { - var step = pipelineSteps[stepId] - levels[1] = append(levels[1], PipelineLevelStepInfo{ - stepId: stepId, - }) + if !foundAtLeastOne { + // probably only requests of files came, but no receiver registered a producer + // log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + // return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - for sourceId := range step.consumesSourceId { - edges = append(edges, PipelineEdgeInfo{ - fromStepId: unkStep, - toStepId: stepId, - sourceId: sourceId, - involvedReceivers: nil, - }) - } - } + var unkStep = "Unknown" - break // Go to response building + pipelineSteps[unkStep] = PipelineStep{ + producesSourceId: nil, + consumesSourceId: nil, + producerInstances: nil, + consumerInstances: nil, + involvedReceivers: nil, } - for sourceId, element := range availableSourcesInNextLevel { - if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable { - availableSources[sourceId] = nil - } + levels[0] = append(levels[0], PipelineLevelStepInfo{ + stepId: unkStep, + }) - for _, newSource := range element { - availableSources[sourceId] = append(availableSources[sourceId], newSource) - } + if len(levels) < 2 { + levels = append(levels, []PipelineLevelStepInfo{}) } - } - - // Response building - for levelIndex, level := range levels { - for _, node := range level { - var producers []string = nil - var consumers []string = nil + for stepId := range remainingStepIds { + var step = pipelineSteps[stepId] + levels[1] = append(levels[1], PipelineLevelStepInfo{ + stepId: stepId, + }) - for producerInstanceId := range pipelineSteps[node.stepId].producerInstances { - producers = append(producers, producerInstanceId) - } - for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances { - consumers = append(consumers, consumerInstanceId) + for sourceId := range step.consumesSourceId { + edges = append(edges, PipelineEdgeInfo{ + fromStepId: unkStep, + toStepId: stepId, + sourceId: sourceId, + involvedReceivers: nil, + }) } - response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{ - NodeId: node.stepId, - Level: uint32(levelIndex), - ProducerInstances: producers, - ConsumerInstances: consumers, - }) } + + break // Go to response building } - for _, edge := range edges { - newEdge := pb.TopologyResponseEdge{ - FromNodeId: edge.fromStepId, - ToNodeId: edge.toStepId, - SourceName: edge.sourceId, - InvolvedReceivers: nil, + for sourceId, element := range availableSourcesInNextLevel { + if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable { + availableSources[sourceId] = nil } - for receiverId := range edge.involvedReceivers { - newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId) + for _, newSource := range element { + availableSources[sourceId] = append(availableSources[sourceId], newSource) } - - response.Edges = append(response.Edges, &newEdge) } - - return &response, nil } - firstQuery, err := queryfunction(s, ctx, queryString, responseFun) // Query newer data - if err != nil { - return nil, err + + // Response building + var response pb.TopologyResponse + + for levelIndex, level := range levels { + for _, node := range level { + var producers []string = nil + var consumers []string = nil + + for producerInstanceId := range pipelineSteps[node.stepId].producerInstances { + producers = append(producers, producerInstanceId) + } + for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances { + consumers = append(consumers, consumerInstanceId) + } + response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{ + NodeId: node.stepId, + Level: uint32(levelIndex), + ProducerInstances: producers, + ConsumerInstances: consumers, + }) + } } - secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data and append it to the first query - if err != nil { - return nil, err + + for _, edge := range edges { + newEdge := pb.TopologyResponseEdge{ + FromNodeId: edge.fromStepId, + ToNodeId: edge.toStepId, + SourceName: edge.sourceId, + InvolvedReceivers: nil, + } + + for receiverId := range edge.involvedReceivers { + newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId) + } + + response.Edges = append(response.Edges, &newEdge) } - return secondQuery, nil + + return &response, nil } func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQuery) (*pb.GroupDelayResponse, error) { + if query.BeamtimeFilter == "" { return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required") } @@ -782,46 +773,33 @@ func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQue startTime := strconv.FormatUint(query.FromTimestamp, 10) endTime := strconv.FormatUint(query.ToTimestamp, 10) - queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring - " |> range(start: " + startTime + ", stop: " + endTime + ")" + - " |> filter(fn: (r) => r._measurement == \"" + dbMeasurementBrokerFileRequests + "\"" + " and r.brokerCommand == \"next\")" + - " |> filter(fn: (r) => r.beamtime == \"" + query.BeamtimeFilter + "\" and r._field == \"delayMs\")" + - " |> group(columns: [\"groupId\"])" + - " |> last()" + - " |> keep(columns: [\"groupId\", \"_value\"])" + - " |> group()" - queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data - response := pb.GroupDelayResponse{} - // Queryfunction that appends the results to a pb.GroupDelayResponse - queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.GroupDelayResponse) (*pb.GroupDelayResponse, error) { - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } + result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+ + " |> range(start: "+startTime+", stop: "+endTime+")"+ + " |> filter(fn: (r) => r._measurement == \""+dbMeasurementBrokerFileRequests+"\""+" and r.brokerCommand == \"next\")"+ + " |> filter(fn: (r) => r.beamtime == \""+query.BeamtimeFilter+"\" and r._field == \"delayMs\")"+ + " |> group(columns: [\"groupId\"])"+ + " |> last()"+ + " |> keep(columns: [\"groupId\", \"_value\"])"+ + " |> group()", + ) + if err != nil { + return nil, err + } - for result.Next() { - if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil { - item := pb.GroupDelayResponseItem{ - GroupId: result.Record().Values()["groupId"].(string), - DelayMs: (uint32)(result.Record().Value().(int64)), - } - response.GroupIds = append(response.GroupIds, &item) + response := pb.GroupDelayResponse{} + for result.Next() { + if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil { + item := pb.GroupDelayResponseItem{ + GroupId: result.Record().Values()["groupId"].(string), + DelayMs: (uint32)(result.Record().Value().(int64)), } + response.GroupIds = append(response.GroupIds, &item) } + } - sort.SliceStable(response.GroupIds, func(i, j int) bool { - return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs - }) + sort.SliceStable(response.GroupIds, func(i, j int) bool { + return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs + }) - return &response, nil - } - firstQuery, err := queryfunction(s, ctx, queryString, response) // Query newer data - if err != nil { - return nil, err - } - secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data - if err != nil { - return nil, err - } - return secondQuery, err + return &response, nil } -- GitLab