Skip to content
Snippets Groups Projects
Commit 607d8585 authored by Lars Janssen's avatar Lars Janssen
Browse files

Reverted GetTopology and GetGroupDelay

parent 531318af
Branches
Tags
No related merge requests found
......@@ -5,6 +5,7 @@ import (
log "asapo_common/logger"
"context"
"errors"
"fmt"
"github.com/influxdata/influxdb-client-go/v2/api"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
......@@ -500,13 +501,18 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
startTime := strconv.FormatUint(query.FromTimestamp, 10)
endTime := strconv.FormatUint(query.ToTimestamp, 10)
queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
" |> range(start: " + startTime + ", stop: " + endTime + ")" +
" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\")" +
" |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" +
" |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" +
" |> group()"
queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data
result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+
" |> range(start: "+startTime+", stop: "+endTime+")"+
" |> filter(fn: (r) => r._measurement == \""+dbMeasurementFileInput+"\" or r._measurement == \""+dbMeasurementBrokerFileRequests+"\")"+
" |> filter(fn: (r) => "+createFilterElement("beamtime", query.BeamtimeFilter)+")"+
" |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])"+
" |> group()",
)
if err != nil {
fmt.Printf("Error: %v\n", err)
return nil, err
}
type PipelineStep struct { // these maps here are replacements for HashSets
producesSourceId map[string] /*produces sourceId*/ bool
......@@ -518,259 +524,244 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
involvedReceivers map[string] /*receiver id*/ bool
}
var responseFun pb.TopologyResponse
// Queryfunction that appends the results to a pb.TopologyResponse
queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.TopologyResponse) (*pb.TopologyResponse, error) {
result, err := s.dbQueryApi.Query(ctx, query)
if err != nil {
return nil, err
}
pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{}
pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{}
getOrCreateStep := func(stepId string) PipelineStep {
if _, containsKey := pipelineSteps[stepId]; !containsKey {
pipelineSteps[stepId] = PipelineStep{
producesSourceId: map[string] /*produces sourceId*/ bool{},
consumesSourceId: map[string] /*consumers sourceId*/ bool{},
producerInstances: map[string] /*produces instanceId*/ bool{},
consumerInstances: map[string] /*consumers instanceId*/ bool{},
involvedReceivers: map[string] /*receiver id*/ bool{},
}
getOrCreateStep := func(stepId string) PipelineStep {
if _, containsKey := pipelineSteps[stepId]; !containsKey {
pipelineSteps[stepId] = PipelineStep{
producesSourceId: map[string] /*produces sourceId*/ bool{},
consumesSourceId: map[string] /*consumers sourceId*/ bool{},
producerInstances: map[string] /*produces instanceId*/ bool{},
consumerInstances: map[string] /*consumers instanceId*/ bool{},
involvedReceivers: map[string] /*receiver id*/ bool{},
}
return pipelineSteps[stepId]
}
return pipelineSteps[stepId]
}
for result.Next() {
if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer
stepId, ok := result.Record().Values()["pipelineStepId"].(string)
if !ok {
stepId = "defaultStep"
}
source := result.Record().Values()["source"].(string)
producerInstanceId := result.Record().Values()["producerInstanceId"].(string)
var step = getOrCreateStep(stepId)
step.producesSourceId[source] = true
step.producerInstances[producerInstanceId] = true
var receiverName = result.Record().Values()["receiverName"].(string)
step.involvedReceivers[receiverName] = true
} else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer
stepId := result.Record().Values()["pipelineStepId"].(string)
source := result.Record().Values()["source"].(string)
consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string)
var step = getOrCreateStep(stepId)
step.consumesSourceId[source] = true
step.consumerInstances[consumerInstanceId] = true
} else {
log.Debug("Got an entry without receiverName or brokerName")
for result.Next() {
if result.Record().Values()["receiverName"] != nil { // data is coming from receiver => means it must be a producer
stepId, ok := result.Record().Values()["pipelineStepId"].(string)
if !ok {
stepId = "defaultStep"
}
source := result.Record().Values()["source"].(string)
producerInstanceId := result.Record().Values()["producerInstanceId"].(string)
var step = getOrCreateStep(stepId)
step.producesSourceId[source] = true
step.producerInstances[producerInstanceId] = true
var receiverName = result.Record().Values()["receiverName"].(string)
step.involvedReceivers[receiverName] = true
} else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer
stepId := result.Record().Values()["pipelineStepId"].(string)
source := result.Record().Values()["source"].(string)
consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string)
var step = getOrCreateStep(stepId)
step.consumesSourceId[source] = true
step.consumerInstances[consumerInstanceId] = true
} else {
log.Debug("Got an entry without receiverName or brokerName")
}
}
type PipelineLevelStepInfo struct {
stepId string
}
type PipelineEdgeInfo struct {
fromStepId string
toStepId string
sourceId string
involvedReceivers map[string] /*receiver id*/ bool
}
// Simple algorithm
// Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available
// If all consuming sources are available
// add it to the current level
// add all connecting edges
// and producing sources to availableSourcesInNextLevel
// If not go to next step, which might produces this
// When an iteration of remainingStepIds is done:
// Add all new producing sources to availableSources
// Go to next level
remainingStepIds := map[string] /*pipelineStepId*/ bool{}
availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
for stepId := range pipelineSteps {
remainingStepIds[stepId] = true
}
var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo
var edges []PipelineEdgeInfo
var currentLevel = -1
type PipelineLevelStepInfo struct {
stepId string
}
for len(remainingStepIds) > 0 {
currentLevel++
levels = append(levels, []PipelineLevelStepInfo{})
type PipelineEdgeInfo struct {
fromStepId string
toStepId string
sourceId string
involvedReceivers map[string] /*receiver id*/ bool
}
var foundAtLeastOne = false
availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
for stepId := range remainingStepIds {
var allConsumingSourcesAvailable = true
for requiredSourceId := range pipelineSteps[stepId].consumesSourceId {
if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable
allConsumingSourcesAvailable = false
} else {
// Verify that no other producer can create this source.
// Maybe we have to wait one or two steps until all producers are available
for stepId2 := range remainingStepIds {
if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer {
if stepId == stepId2 {
continue // The pipeline has self reference?! allow it
}
allConsumingSourcesAvailable = false
// Simple algorithm
// Goes through each remainingStepIds and checks if all consumesSourceIds of this step is available
// If all consuming sources are available
// add it to the current level
// add all connecting edges
// and producing sources to availableSourcesInNextLevel
// If not go to next step, which might produces this
// When an iteration of remainingStepIds is done:
// Add all new producing sources to availableSources
// Go to next level
remainingStepIds := map[string] /*pipelineStepId*/ bool{}
availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
for stepId, _ := range pipelineSteps {
remainingStepIds[stepId] = true
}
var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo
var edges []PipelineEdgeInfo
var currentLevel = -1
for len(remainingStepIds) > 0 {
currentLevel++
levels = append(levels, []PipelineLevelStepInfo{})
var foundAtLeastOne = false
availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
for stepId := range remainingStepIds {
var allConsumingSourcesAvailable = true
for requiredSourceId := range pipelineSteps[stepId].consumesSourceId {
if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh source is unavailable
allConsumingSourcesAvailable = false
} else {
// Verify that no other producer can create this source.
// Maybe we have to wait one or two steps until all producers are available
for stepId2 := range remainingStepIds {
if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer {
if stepId == stepId2 {
continue // The pipeline has self reference?! allow it
}
allConsumingSourcesAvailable = false
}
}
if !allConsumingSourcesAvailable {
break // We don't even need to try the other options
}
}
if !allConsumingSourcesAvailable {
break // We don't even need to try the other options
}
}
if allConsumingSourcesAvailable {
foundAtLeastOne = true
stepInfo := pipelineSteps[stepId]
// Add edge connection
for consumingSourceId := range stepInfo.consumesSourceId {
if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge
edges = append(edges, PipelineEdgeInfo{
fromStepId: stepId,
toStepId: stepId,
sourceId: consumingSourceId,
involvedReceivers: stepInfo.involvedReceivers,
})
}
for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges
var producingSource = pipelineSteps[sourceIdAvailableFromStepId]
edges = append(edges, PipelineEdgeInfo{
fromStepId: sourceIdAvailableFromStepId,
toStepId: stepId,
sourceId: consumingSourceId,
involvedReceivers: producingSource.involvedReceivers,
})
}
}
// prepare sources for next level
for sourceId := range stepInfo.producesSourceId {
if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable {
availableSourcesInNextLevel[sourceId] = nil
}
if allConsumingSourcesAvailable {
foundAtLeastOne = true
stepInfo := pipelineSteps[stepId]
availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId)
// Add edge connection
for consumingSourceId := range stepInfo.consumesSourceId {
if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge
edges = append(edges, PipelineEdgeInfo{
fromStepId: stepId,
toStepId: stepId,
sourceId: consumingSourceId,
involvedReceivers: stepInfo.involvedReceivers,
})
}
for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges
var producingSource = pipelineSteps[sourceIdAvailableFromStepId]
edges = append(edges, PipelineEdgeInfo{
fromStepId: sourceIdAvailableFromStepId,
toStepId: stepId,
sourceId: consumingSourceId,
involvedReceivers: producingSource.involvedReceivers,
})
}
levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{
stepId: stepId,
})
delete(remainingStepIds, stepId)
}
}
if !foundAtLeastOne {
// probably only requests of files came, but no receiver registered a producer
// log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them")
// return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them")
var unkStep = "Unknown"
// prepare sources for next level
for sourceId := range stepInfo.producesSourceId {
if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable {
availableSourcesInNextLevel[sourceId] = nil
}
pipelineSteps[unkStep] = PipelineStep{
producesSourceId: nil,
consumesSourceId: nil,
producerInstances: nil,
consumerInstances: nil,
involvedReceivers: nil,
availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId)
}
levels[0] = append(levels[0], PipelineLevelStepInfo{
stepId: unkStep,
levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{
stepId: stepId,
})
if len(levels) < 2 {
levels = append(levels, []PipelineLevelStepInfo{})
}
delete(remainingStepIds, stepId)
}
}
for stepId := range remainingStepIds {
var step = pipelineSteps[stepId]
levels[1] = append(levels[1], PipelineLevelStepInfo{
stepId: stepId,
})
if !foundAtLeastOne {
// probably only requests of files came, but no receiver registered a producer
// log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them")
// return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them")
for sourceId := range step.consumesSourceId {
edges = append(edges, PipelineEdgeInfo{
fromStepId: unkStep,
toStepId: stepId,
sourceId: sourceId,
involvedReceivers: nil,
})
}
}
var unkStep = "Unknown"
break // Go to response building
pipelineSteps[unkStep] = PipelineStep{
producesSourceId: nil,
consumesSourceId: nil,
producerInstances: nil,
consumerInstances: nil,
involvedReceivers: nil,
}
for sourceId, element := range availableSourcesInNextLevel {
if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable {
availableSources[sourceId] = nil
}
levels[0] = append(levels[0], PipelineLevelStepInfo{
stepId: unkStep,
})
for _, newSource := range element {
availableSources[sourceId] = append(availableSources[sourceId], newSource)
}
if len(levels) < 2 {
levels = append(levels, []PipelineLevelStepInfo{})
}
}
// Response building
for levelIndex, level := range levels {
for _, node := range level {
var producers []string = nil
var consumers []string = nil
for stepId := range remainingStepIds {
var step = pipelineSteps[stepId]
levels[1] = append(levels[1], PipelineLevelStepInfo{
stepId: stepId,
})
for producerInstanceId := range pipelineSteps[node.stepId].producerInstances {
producers = append(producers, producerInstanceId)
}
for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances {
consumers = append(consumers, consumerInstanceId)
for sourceId := range step.consumesSourceId {
edges = append(edges, PipelineEdgeInfo{
fromStepId: unkStep,
toStepId: stepId,
sourceId: sourceId,
involvedReceivers: nil,
})
}
response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{
NodeId: node.stepId,
Level: uint32(levelIndex),
ProducerInstances: producers,
ConsumerInstances: consumers,
})
}
break // Go to response building
}
for _, edge := range edges {
newEdge := pb.TopologyResponseEdge{
FromNodeId: edge.fromStepId,
ToNodeId: edge.toStepId,
SourceName: edge.sourceId,
InvolvedReceivers: nil,
for sourceId, element := range availableSourcesInNextLevel {
if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable {
availableSources[sourceId] = nil
}
for receiverId := range edge.involvedReceivers {
newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId)
for _, newSource := range element {
availableSources[sourceId] = append(availableSources[sourceId], newSource)
}
response.Edges = append(response.Edges, &newEdge)
}
return &response, nil
}
firstQuery, err := queryfunction(s, ctx, queryString, responseFun) // Query newer data
if err != nil {
return nil, err
// Response building
var response pb.TopologyResponse
for levelIndex, level := range levels {
for _, node := range level {
var producers []string = nil
var consumers []string = nil
for producerInstanceId := range pipelineSteps[node.stepId].producerInstances {
producers = append(producers, producerInstanceId)
}
for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances {
consumers = append(consumers, consumerInstanceId)
}
response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{
NodeId: node.stepId,
Level: uint32(levelIndex),
ProducerInstances: producers,
ConsumerInstances: consumers,
})
}
}
secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data and append it to the first query
if err != nil {
return nil, err
for _, edge := range edges {
newEdge := pb.TopologyResponseEdge{
FromNodeId: edge.fromStepId,
ToNodeId: edge.toStepId,
SourceName: edge.sourceId,
InvolvedReceivers: nil,
}
for receiverId := range edge.involvedReceivers {
newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId)
}
response.Edges = append(response.Edges, &newEdge)
}
return secondQuery, nil
return &response, nil
}
func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQuery) (*pb.GroupDelayResponse, error) {
if query.BeamtimeFilter == "" {
return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required")
}
......@@ -782,46 +773,33 @@ func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQue
startTime := strconv.FormatUint(query.FromTimestamp, 10)
endTime := strconv.FormatUint(query.ToTimestamp, 10)
queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
" |> range(start: " + startTime + ", stop: " + endTime + ")" +
" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementBrokerFileRequests + "\"" + " and r.brokerCommand == \"next\")" +
" |> filter(fn: (r) => r.beamtime == \"" + query.BeamtimeFilter + "\" and r._field == \"delayMs\")" +
" |> group(columns: [\"groupId\"])" +
" |> last()" +
" |> keep(columns: [\"groupId\", \"_value\"])" +
" |> group()"
queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data
response := pb.GroupDelayResponse{}
// Queryfunction that appends the results to a pb.GroupDelayResponse
queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.GroupDelayResponse) (*pb.GroupDelayResponse, error) {
result, err := s.dbQueryApi.Query(ctx, query)
if err != nil {
return nil, err
}
result, err := s.dbQueryApi.Query(ctx, "from(bucket: \""+s.settings.InfluxDbDatabase+"\")"+
" |> range(start: "+startTime+", stop: "+endTime+")"+
" |> filter(fn: (r) => r._measurement == \""+dbMeasurementBrokerFileRequests+"\""+" and r.brokerCommand == \"next\")"+
" |> filter(fn: (r) => r.beamtime == \""+query.BeamtimeFilter+"\" and r._field == \"delayMs\")"+
" |> group(columns: [\"groupId\"])"+
" |> last()"+
" |> keep(columns: [\"groupId\", \"_value\"])"+
" |> group()",
)
if err != nil {
return nil, err
}
for result.Next() {
if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil {
item := pb.GroupDelayResponseItem{
GroupId: result.Record().Values()["groupId"].(string),
DelayMs: (uint32)(result.Record().Value().(int64)),
}
response.GroupIds = append(response.GroupIds, &item)
response := pb.GroupDelayResponse{}
for result.Next() {
if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil {
item := pb.GroupDelayResponseItem{
GroupId: result.Record().Values()["groupId"].(string),
DelayMs: (uint32)(result.Record().Value().(int64)),
}
response.GroupIds = append(response.GroupIds, &item)
}
}
sort.SliceStable(response.GroupIds, func(i, j int) bool {
return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs
})
sort.SliceStable(response.GroupIds, func(i, j int) bool {
return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs
})
return &response, nil
}
firstQuery, err := queryfunction(s, ctx, queryString, response) // Query newer data
if err != nil {
return nil, err
}
secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data
if err != nil {
return nil, err
}
return secondQuery, err
return &response, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment