diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go index e2760e997727eeb350099b64592def12e87e4260..b500741ed90a24d73194d0c847b1dbf0f7a10cd4 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/IngestServer.go @@ -51,7 +51,6 @@ func (s *IngestServer) InsertReceiverDataPoints(ctx context.Context, data *pb.Re timestamp, ) - println("Got: step: " + dataPoint.PipelineStepId + " with source: " + dataPoint.Source) s.dbWriterApi.WritePoint(p) } diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index eb50cd25c0bf8dd9077c7443efd964b1c59fcf28..055c0db2a2a9944c5b90a0f33d8c5c53f2237c4e 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -216,7 +216,6 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue } func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) { - println("queryTransferSpeed filter: " + filter) var arrayResult []*pb.TotalTransferRateDataPoint result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+ @@ -426,7 +425,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+ " |> range(start: "+startTime+", stop: "+endTime+")"+ " |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")" + - " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")", + " |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" + + " |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" + + " |> group()", ) if err != nil { @@ -491,6 +492,16 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers map[string] /*receiver id*/ bool } + // Simple algorithm + // Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available + // If all consuming sources are available + // add it to the current level + // add all connecting edges + // and producing sources to availableSourcesInNextLevel + // If not go to next step, which might produces this + // When an iteration of remainingStepIds is done: + // Add all new producing sources to availableSources + // Go to next level remainingStepIds := map[string] /*pipelineStepId*/ bool{} availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{} @@ -511,8 +522,11 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( for stepId := range remainingStepIds { var allConsumingSourcesAvailable = true for requiredSourceId := range pipelineSteps[stepId].consumesSourceId { - if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { - // Verify that no other producer can create this stream + if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable + allConsumingSourcesAvailable = false + } else { + // Verify that no other producer can create this stream. + // Maybe we have to wait one or two steps until all producers are available for stepId2 := range remainingStepIds { if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer { if stepId == stepId2 { @@ -521,10 +535,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( allConsumingSourcesAvailable = false } } - - if !allConsumingSourcesAvailable { - break - } + } + if !allConsumingSourcesAvailable { + break // We don't even need to try the other options } } @@ -542,7 +555,7 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( involvedReceivers: stepInfo.involvedReceivers, }) } - for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all other edges + for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges edges = append(edges, PipelineEdgeInfo{ fromStepId: sourceIdAvailableFromStepId, toStepId: stepId,