From 6d0162cd150248529fbda2b5fe024838d6646cf1 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Mon, 16 Aug 2021 16:05:58 +0200 Subject: [PATCH] Improved ToplogyDetection Code --- .../server/IngestServer.go | 1 - .../server/QueryServer.go | 31 +++++++++++++------ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go index e2760e997..b500741ed 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go @@ -51,7 +51,6 @@ func (s *IngestServer) InsertReceiverDataPoints(ctx context.Context, data *pb.Re timestamp, ) - println("Got: step: " + dataPoint.PipelineStepId + " with source: " + dataPoint.Source) s.dbWriterApi.WritePoint(p) } diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index eb50cd25c..055c0db2a 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -216,7 +216,6 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue } func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) { - println("queryTransferSpeed filter: " + filter) var arrayResult []*pb.TotalTransferRateDataPoint result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+ @@ -426,7 +425,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+ " |> range(start: "+startTime+", stop: "+endTime+")"+ " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")" + - " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")", + " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" + + " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" + + " |> group()", ) if err != nil { @@ -491,6 +492,16 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers map[string] /*receiver id*/ bool } + // Simple algorithm + // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available + // If all consuming sources are available + // add it to the current level + // add all connecting edges + // and producing sources to availableSourcesInNextLevel + // If not go to next step, which might produces this + // When an iteration of remainingStepIds is done: + // Add all new producing sources to availableSources + // Go to next level remainingStepIds := map[string] /*pipelineStepId*/ bool{} availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} @@ -511,8 +522,11 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( for stepId := range remainingStepIds { var allConsumingSourcesAvailable = true for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { - if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { - // Verify that no other producer can create this stream + if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable + allConsumingSourcesAvailable = false + } else { + // Verify that no other producer can create this stream. + // Maybe we have to wait one or two steps until all producers are available for stepId2 := range remainingStepIds { if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { if stepId == stepId2 { @@ -521,10 +535,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( allConsumingSourcesAvailable = false } } - - if !allConsumingSourcesAvailable { - break - } + } + if !allConsumingSourcesAvailable { + break // We don't even need to try the other options } } @@ -542,7 +555,7 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers: stepInfo.involvedReceivers, }) } - for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all other edges + for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges edges = append(edges, PipelineEdgeInfo{ fromStepId: sourceIdAvailableFromStepId, toStepId: stepId, -- GitLab