Skip to content
Snippets Groups Projects
Commit 6d0162cd authored by Carsten Patzke's avatar Carsten Patzke
Browse files

Improved ToplogyDetection Code

parent a71a6c5f
No related branches found
No related tags found
No related merge requests found
......@@ -51,7 +51,6 @@ func (s *IngestServer) InsertReceiverDataPoints(ctx context.Context, data *pb.Re
timestamp,
)
println("Got: step: " + dataPoint.PipelineStepId + " with source: " + dataPoint.Source)
s.dbWriterApi.WritePoint(p)
}
......
......@@ -216,7 +216,6 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
}
func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
println("queryTransferSpeed filter: " + filter)
var arrayResult []*pb.TotalTransferRateDataPoint
result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+
......@@ -426,7 +425,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDb2Bucket+"\")"+
" |> range(start: "+startTime+", stop: "+endTime+")"+
" |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")" +
" |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")",
" |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" +
" |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" +
" |> group()",
)
if err != nil {
......@@ -491,6 +492,16 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
involvedReceivers map[string] /*receiver id*/ bool
}
// Simple algorithm
// Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available
// If all consuming sources are available
// add it to the current level
// add all connecting edges
// and producing sources to availableSourcesInNextLevel
// If not go to next step, which might produces this
// When an iteration of remainingStepIds is done:
// Add all new producing sources to availableSources
// Go to next level
remainingStepIds := map[string] /*pipelineStepId*/ bool{}
availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
......@@ -511,8 +522,11 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
for stepId := range remainingStepIds {
var allConsumingSourcesAvailable = true
for requiredSourceId := range pipelineSteps[stepId].consumesSourceId {
if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable {
// Verify that no other producer can create this stream
if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable
allConsumingSourcesAvailable = false
} else {
// Verify that no other producer can create this stream.
// Maybe we have to wait one or two steps until all producers are available
for stepId2 := range remainingStepIds {
if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer {
if stepId == stepId2 {
......@@ -521,10 +535,9 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
allConsumingSourcesAvailable = false
}
}
if !allConsumingSourcesAvailable {
break
}
}
if !allConsumingSourcesAvailable {
break // We don't even need to try the other options
}
}
......@@ -542,7 +555,7 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
involvedReceivers: stepInfo.involvedReceivers,
})
}
for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all other edges
for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges
edges = append(edges, PipelineEdgeInfo{
fromStepId: sourceIdAvailableFromStepId,
toStepId: stepId,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment