Commit 0ccc4be9 authored by Lars Janssen

Modified QueryServer.go to query averaged data from a separate bucket in addition to the newer data from the existing bucket: averaged results are read from "asapo_monitoring_avg" alongside "asapo_monitoring".

Added the averageSuffix "_avg" to Settings.go.
parent 29e0585b
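The gist of the change: every Flux query the server builds is now issued twice, once against the live bucket and once against a second bucket whose name differs only by a suffix. A minimal, self-contained sketch of that rewrite (the bucket name and query are hypothetical placeholders; the real implementation is generateFilterForAVGData in QueryServer.go below):

package main

import (
	"fmt"
	"strings"
)

const averageSuffix = "_avg" // mirrors the constant added to Settings.go

func main() {
	bucket := "asapo_monitoring" // hypothetical settings.InfluxDbDatabase value
	query := `from(bucket: "` + bucket + `") |> range(start: 0, stop: 10)`

	// Derive the query for the averaged bucket by renaming the bucket in place.
	avgQuery := strings.ReplaceAll(query, bucket, bucket+averageSuffix)

	fmt.Println(query)    // reads from "asapo_monitoring"
	fmt.Println(avgQuery) // reads from "asapo_monitoring_avg"
}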
@@ -168,7 +168,26 @@ func doesStringContainsDangerousElements(input string) bool {
		strings.Contains(input, "}")
}
// Modifies a query string so that it reads from the bucket with averaged data
func generateFilterForAVGData(s *QueryServer, filter string) string {
	// Map of all modifications that a query string needs to access averaged data.
	// Currently only one modification is needed, so the map is a bit redundant.
	filterModifications := make(map[string]string)
	filterModifications[s.settings.InfluxDbDatabase] = s.settings.InfluxDbDatabase + averageSuffix

	alteredFilter := filter
	// Apply all filterModifications. Replace on alteredFilter (not the original
	// filter) so that every modification is applied, not just the last one.
	for key, element := range filterModifications {
		alteredFilter = strings.ReplaceAll(alteredFilter, key, element)
	}

	fmt.Printf("\n")
	log.Debug("generateFilterForAVGData: CHANGED: ", filter, " TO: ", alteredFilter, " WITH: ", filterModifications)
	fmt.Printf("\n")
	return alteredFilter
}
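A sketch of how the rewrite behaves, written as a hypothetical same-package test (it assumes a _test.go file importing "testing", and that a QueryServer can be constructed with only its settings field populated):

func TestGenerateFilterForAVGData(t *testing.T) {
	// Hypothetical construction; only the field used by the function is set.
	s := &QueryServer{settings: Settings{InfluxDbDatabase: "asapo_monitoring"}}
	in := `from(bucket: "asapo_monitoring") |> range(start: 0, stop: 10)`
	want := `from(bucket: "asapo_monitoring_avg") |> range(start: 0, stop: 10)`
	if got := generateFilterForAVGData(s, in); got != want {
		t.Errorf("generateFilterForAVGData() = %q, want %q", got, want)
	}
}

Note that strings.ReplaceAll rewrites every occurrence of the bucket name, so a query that happens to contain the bucket name somewhere else (for example in a tag value) would be rewritten there as well.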
func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQuery) (*pb.DataPointsResponse, error) {
	log.Debug("Getting Datapoints", query)

	startTime := strconv.FormatUint(query.FromTimestamp, 10)
	endTime := strconv.FormatUint(query.ToTimestamp, 10)

	intervalInSec := 5
@@ -219,197 +238,293 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue
	response.StartTimestampInSec = query.FromTimestamp // TODO: Use first timestamp from query result
	response.TimeIntervalInSec = uint32(intervalInSec)

	fmt.Printf("\n")
	return &response, nil
}
func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) {
	log.Debug("queryTransferSpeed")

	// Querystring
	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
		" |> filter(fn: (r) => r._field == \"totalInputFileSize\" or r._field == \"totalRequestedFileSize\" or r._field == \"totalRdsOutputFileSize\" or r._field == \"totalFtsTransferredFileSize\")" +
		filter +
		" |> group(columns: [\"_field\"])" +
		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data

	queryfunction := func(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) {
		result, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("queryTransferSpeed: queryfunction: RESULT: ", result, " ERROR: ", err)
			return nil, err
		}

		var arrayResult []*pb.TotalTransferRateDataPoint
		for result.Next() {
			recvBytes := int64(0)
			sendBytes := int64(0)
			sendRdsBytes := int64(0)
			sendFtsBytes := int64(0)

			if result.Record().Values()["totalInputFileSize"] != nil {
				recvBytes = result.Record().Values()["totalInputFileSize"].(int64)
			}
			if result.Record().Values()["totalRequestedFileSize"] != nil {
				sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64)
			}
			if result.Record().Values()["totalRdsOutputFileSize"] != nil {
				sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64)
			}
			if result.Record().Values()["totalFtsTransferredFileSize"] != nil {
				sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64)
			}

			arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{
				TotalBytesPerSecRecv:    uint64(recvBytes) / uint64(intervalInSec),
				TotalBytesPerSecSend:    uint64(sendBytes) / uint64(intervalInSec),
				TotalBytesPerSecRdsSend: uint64(sendRdsBytes) / uint64(intervalInSec),
				TotalBytesPerSecFtsSend: uint64(sendFtsBytes) / uint64(intervalInSec),
			})
		}

		if result.Err() != nil {
			log.Error("queryTransferSpeed: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err())
			return nil, result.Err()
		}
		return arrayResult, nil
	}

	arrayResult, err := queryfunction(s, ctx, intervalInSec, query) // Query newer data
	if err != nil {
		log.Error("queryTransferSpeed: arrayResult: RESULT: ", arrayResult, " ERROR: ", err)
		return nil, err
	}
	secondArrayResult, err := queryfunction(s, ctx, intervalInSec, queryForOldData) // Query averaged data
	if err != nil {
		log.Error("queryTransferSpeed: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err)
		return nil, err
	}

	// Append the results of the second query to those of the first
	arrayResult = append(arrayResult, secondArrayResult...)

	fmt.Printf("\n")
	return arrayResult, nil
}
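queryFileRate, queryTaskTime, and queryMemoryUsage below follow the same two-pass shape. Stripped of the Influx specifics, the pattern reduces to the following sketch (all names are hypothetical; generics require Go 1.18+):

// queryBoth runs one parse-and-collect function against the live query and the
// rewritten "_avg" query, then concatenates the results, newer data first.
func queryBoth[T any](run func(query string) ([]T, error), query, avgQuery string) ([]T, error) {
	newer, err := run(query) // live bucket
	if err != nil {
		return nil, err
	}
	averaged, err := run(avgQuery) // averaged bucket
	if err != nil {
		return nil, err
	}
	return append(newer, averaged...), nil
}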
func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) {
	log.Debug("queryFileRate")

	// Querystrings
	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsFileRequests + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\" or r._measurement == \"" + dbMeasurementFtsTransfers + "\")" +
		" |> filter(fn: (r) => r._field == \"totalRdsHits\" or r._field == \"totalRdsMisses\" or r._field == \"requestedFileCount\" or r._field == \"ftsFileCount\")" +
		filter +
		" |> group(columns: [\"_field\"])" +
		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data

	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) {
		rdsResponses, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("queryFileRate: queryfunction: RESULT: ", rdsResponses, " ERROR: ", err)
			return nil, err
		}

		var arrayResult []*pb.TotalFileRateDataPoint
		for rdsResponses.Next() {
			totalRequests := uint32(0)
			hit := uint32(0)
			miss := uint32(0)
			fromDisk := uint32(0)

			if rdsResponses.Record().Values()["requestedFileCount"] != nil {
				totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64))
			}
			if rdsResponses.Record().Values()["totalRdsHits"] != nil {
				hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64))
			}
			if rdsResponses.Record().Values()["totalRdsMisses"] != nil {
				miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64))
			}
			if rdsResponses.Record().Values()["ftsFileCount"] != nil {
				fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64))
			}

			arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{
				TotalRequests: totalRequests,
				CacheMisses:   miss,
				FromCache:     hit,
				FromDisk:      fromDisk,
			})
		}

		if rdsResponses.Err() != nil {
			log.Error("queryFileRate: queryfunction: result has error: RESULT: ", rdsResponses, " ERROR: ", rdsResponses.Err())
			return nil, rdsResponses.Err()
		}
		return arrayResult, nil
	}

	arrayResult, err := queryfunction(s, ctx, query) // Query newer data
	if err != nil {
		log.Error("queryFileRate: arrayResult: RESULT: ", arrayResult, " ERROR: ", err)
		return nil, err
	}
	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data
	if err != nil {
		log.Error("queryFileRate: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err)
		return nil, err
	}

	// Append the results of the second query to those of the first
	arrayResult = append(arrayResult, secondArrayResult...)

	fmt.Printf("\n")
	return arrayResult, nil
}
func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) {
	log.Debug("queryTaskTime")

	// Querystrings
	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementRdsFileRequests + "\")" +
		" |> filter(fn: (r) => r._field == \"avgTransferReceiveTimeUs\" or r._field == \"avgWriteIoTimeUs\" or r._field == \"avgDbTimeUs\" or r._field == \"avgRdsOutputTransferTimeUs\")" +
		filter +
		" |> group(columns: [\"_field\"])" +
		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: mean, createEmpty: true)" +
		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data

	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) {
		result, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("queryTaskTime: queryfunction: RESULT: ", result, " ERROR: ", err)
			return nil, err
		}

		var arrayResult []*pb.TaskTimeDataPoint
		for result.Next() {
			receiveIoTimeUs := float64(0)
			writeToDiskTimeUs := float64(0)
			writeToDatabaseTimeUs := float64(0)
			rdsOutputTransferTimeUs := float64(0)

			if result.Record().Values()["avgTransferReceiveTimeUs"] != nil {
				receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64)
			}
			if result.Record().Values()["avgWriteIoTimeUs"] != nil {
				writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64)
			}
			if result.Record().Values()["avgDbTimeUs"] != nil {
				writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64)
			}
			if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil {
				rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64)
			}

			arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{
				ReceiveIoTimeUs:         uint32(receiveIoTimeUs),
				WriteToDiskTimeUs:       uint32(writeToDiskTimeUs),
				WriteToDatabaseTimeUs:   uint32(writeToDatabaseTimeUs),
				RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs),
			})
		}

		if result.Err() != nil {
			log.Error("queryTaskTime: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err())
			return nil, result.Err()
		}
		return arrayResult, nil
	}

	arrayResult, err := queryfunction(s, ctx, query) // Query newer data
	if err != nil {
		log.Error("queryTaskTime: arrayResult: RESULT: ", arrayResult, " ERROR: ", err)
		return nil, err
	}
	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data
	if err != nil {
		log.Error("queryTaskTime: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err)
		return nil, err
	}

	// Append the results of the second query to those of the first
	arrayResult = append(arrayResult, secondArrayResult...)

	fmt.Printf("\n")
	return arrayResult, nil
}
func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) {
	log.Debug("queryMemoryUsage")

	// Querystrings
	query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementRdsCacheMemoryUsage + "\")" +
		" |> filter(fn: (r) => r._field == \"rdsCacheUsedBytes\")" +
		filter +
		" |> group(columns: [\"_field\"])" +
		" |> aggregateWindow(every: " + strconv.Itoa(intervalInSec) + "s, fn: sum, createEmpty: true)" +
		" |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
	queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data

	queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) {
		result, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("queryMemoryUsage: queryfunction: RESULT: ", result, " ERROR: ", err)
			return nil, err
		}

		var arrayResult []*pb.RdsMemoryUsageDataPoint
		for result.Next() {
			usedBytes := int64(0)
			if result.Record().Values()["rdsCacheUsedBytes"] != nil {
				usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64)
			}
			arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{
				TotalUsedMemory: uint64(usedBytes),
			})
		}

		if result.Err() != nil {
			log.Error("queryMemoryUsage: queryfunction: result has error: RESULT: ", result, " ERROR: ", result.Err())
			return nil, result.Err()
		}
		return arrayResult, nil
	}

	arrayResult, err := queryfunction(s, ctx, query) // Query for newer data
	if err != nil {
		log.Error("queryMemoryUsage: arrayResult: RESULT: ", arrayResult, " ERROR: ", err)
		return nil, err
	}
	secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data
	if err != nil {
		log.Error("queryMemoryUsage: secondArrayResult: RESULT: ", secondArrayResult, " ERROR: ", err)
		return nil, err
	}

	// Append the results of the second query to those of the first
	arrayResult = append(arrayResult, secondArrayResult...)

	fmt.Printf("\n")
	return arrayResult, nil
}
func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (*pb.TopologyResponse, error) {
	log.Debug("GetTopology")

	if doesStringContainsDangerousElements(query.BeamtimeFilter) {
		return nil, errors.New("some input characters are not allowed")
	}
@@ -417,18 +532,13 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
	startTime := strconv.FormatUint(query.FromTimestamp, 10)
	endTime := strconv.FormatUint(query.ToTimestamp, 10)

	queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementFileInput + "\" or r._measurement == \"" + dbMeasurementBrokerFileRequests + "\")" +
		" |> filter(fn: (r) => " + createFilterElement("beamtime", query.BeamtimeFilter) + ")" +
		" |> keep(columns: [\"receiverName\", \"brokerName\", \"pipelineStepId\", \"source\", \"producerInstanceId\", \"consumerInstanceId\"])" +
		" |> group()"
	queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data

	type PipelineStep struct { // these maps here are replacements for HashSets
		producesSourceId map[string] /*produces sourceId*/ bool
@@ -440,244 +550,265 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) (
		involvedReceivers map[string] /*receiver id*/ bool
	}

	var responseFun pb.TopologyResponse
	// Queryfunction that appends the results to a pb.TopologyResponse
	queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.TopologyResponse) (*pb.TopologyResponse, error) {
		result, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("GetTopology: queryfunction: RESULT: ", result, " ERROR: ", err)
			fmt.Printf("Error: %v\n", err)
			return nil, err
		}

		pipelineSteps := map[string] /*pipelineStepId*/ PipelineStep{}
		getOrCreateStep := func(stepId string) PipelineStep {
			if _, containsKey := pipelineSteps[stepId]; !containsKey {
				pipelineSteps[stepId] = PipelineStep{
					producesSourceId:  map[string] /*produces sourceId*/ bool{},
					consumesSourceId:  map[string] /*consumes sourceId*/ bool{},
					producerInstances: map[string] /*producer instanceId*/ bool{},
					consumerInstances: map[string] /*consumer instanceId*/ bool{},
					involvedReceivers: map[string] /*receiver id*/ bool{},
				}
			}
			return pipelineSteps[stepId]
		}

		for result.Next() {
			if result.Record().Values()["receiverName"] != nil { // data is coming from a receiver => it must be a producer
				stepId, ok := result.Record().Values()["pipelineStepId"].(string)
				if !ok {
					stepId = "defaultStep"
				}
				source := result.Record().Values()["source"].(string)
				producerInstanceId := result.Record().Values()["producerInstanceId"].(string)
				var step = getOrCreateStep(stepId)
				step.producesSourceId[source] = true
				step.producerInstances[producerInstanceId] = true
				var receiverName = result.Record().Values()["receiverName"].(string)
				step.involvedReceivers[receiverName] = true
			} else if result.Record().Values()["brokerName"] != nil { // data is coming from a broker => it must be a consumer
				stepId := result.Record().Values()["pipelineStepId"].(string)
				source := result.Record().Values()["source"].(string)
				consumerInstanceId := result.Record().Values()["consumerInstanceId"].(string)
				var step = getOrCreateStep(stepId)
				step.consumesSourceId[source] = true
				step.consumerInstances[consumerInstanceId] = true
			} else {
				log.Debug("Got an entry without receiverName or brokerName")
			}
		}

		type PipelineLevelStepInfo struct {
			stepId string
		}

		type PipelineEdgeInfo struct {
			fromStepId        string
			toStepId          string
			sourceId          string
			involvedReceivers map[string] /*receiver id*/ bool
		}

		// Simple algorithm:
		// Go through remainingStepIds and check whether all consumesSourceIds of a step are available.
		// If all consuming sources are available,
		//   add the step to the current level,
		//   add all connecting edges,
		//   and add its producing sources to availableSourcesInNextLevel.
		// If not, go to the next step, which might produce them.
		// When an iteration of remainingStepIds is done:
		//   Add all new producing sources to availableSources.
		//   Go to the next level.
		remainingStepIds := map[string] /*pipelineStepId*/ bool{}
		availableSources := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
		for stepId := range pipelineSteps {
			remainingStepIds[stepId] = true
		}

		var levels [] /*levelDepth*/ [] /*verticalIdx*/ PipelineLevelStepInfo
		var edges []PipelineEdgeInfo
		var currentLevel = -1

		for len(remainingStepIds) > 0 {
			currentLevel++
			levels = append(levels, []PipelineLevelStepInfo{})
			var foundAtLeastOne = false
			availableSourcesInNextLevel := map[string] /*sourceId*/ [] /* available from pipelineStepId */ string{}
			for stepId := range remainingStepIds {
				var allConsumingSourcesAvailable = true
				for requiredSourceId := range pipelineSteps[stepId].consumesSourceId {
					if _, sourceIsAvailable := availableSources[requiredSourceId]; !sourceIsAvailable { // Oh, source is unavailable
						allConsumingSourcesAvailable = false
					} else {
						// Verify that no other producer can create this source.
						// Maybe we have to wait one or two steps until all producers are available
						for stepId2 := range remainingStepIds {
							if _, thereIsAnotherProducer := pipelineSteps[stepId2].producesSourceId[requiredSourceId]; thereIsAnotherProducer {
								if stepId == stepId2 {
									continue // The pipeline has a self reference?! Allow it
								}
								allConsumingSourcesAvailable = false
							}
						}
					}
					if !allConsumingSourcesAvailable {
						break // We don't even need to try the other options
					}
				}

				if allConsumingSourcesAvailable {
					foundAtLeastOne = true
					stepInfo := pipelineSteps[stepId]

					// Add edge connections
					for consumingSourceId := range stepInfo.consumesSourceId {
						if stepInfo.producesSourceId[consumingSourceId] { // Add self reference edge
							edges = append(edges, PipelineEdgeInfo{
								fromStepId:        stepId,
								toStepId:          stepId,
								sourceId:          consumingSourceId,
								involvedReceivers: stepInfo.involvedReceivers,
							})
						}
						for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the other edges
							var producingSource = pipelineSteps[sourceIdAvailableFromStepId]
							edges = append(edges, PipelineEdgeInfo{
								fromStepId:        sourceIdAvailableFromStepId,
								toStepId:          stepId,
								sourceId:          consumingSourceId,
								involvedReceivers: producingSource.involvedReceivers,
							})
						}
					}

					// prepare sources for next level
					for sourceId := range stepInfo.producesSourceId {
						if _, sourceIsAvailable := availableSourcesInNextLevel[sourceId]; !sourceIsAvailable {
							availableSourcesInNextLevel[sourceId] = nil
						}
						availableSourcesInNextLevel[sourceId] = append(availableSourcesInNextLevel[sourceId], stepId)
					}

					levels[currentLevel] = append(levels[currentLevel], PipelineLevelStepInfo{
						stepId: stepId,
					})
					delete(remainingStepIds, stepId)
				}
			}

			if !foundAtLeastOne {
				// probably only file requests came in, but no receiver registered a producer
				// log.Error("infinite loop while building topology tree; still has pipeline steps but found no way to connect them")
				// return nil, errors.New("infinite loop while building topology tree; still has pipeline steps but found no way to connect them")

				var unkStep = "Unknown"
				pipelineSteps[unkStep] = PipelineStep{
					producesSourceId:  nil,
					consumesSourceId:  nil,
					producerInstances: nil,
					consumerInstances: nil,
					involvedReceivers: nil,
				}
				levels[0] = append(levels[0], PipelineLevelStepInfo{
					stepId: unkStep,
				})
				if len(levels) < 2 {
					levels = append(levels, []PipelineLevelStepInfo{})
				}
				for stepId := range remainingStepIds {
					var step = pipelineSteps[stepId]
					levels[1] = append(levels[1], PipelineLevelStepInfo{
						stepId: stepId,
					})
					for sourceId := range step.consumesSourceId {
						edges = append(edges, PipelineEdgeInfo{
							fromStepId:        unkStep,
							toStepId:          stepId,
							sourceId:          sourceId,
							involvedReceivers: nil,
						})
					}
				}
				break // Go to response building
			}

			for sourceId, element := range availableSourcesInNextLevel {
				if _, sourceIsAvailable := availableSources[sourceId]; !sourceIsAvailable {
					availableSources[sourceId] = nil
				}
				for _, newSource := range element {
					availableSources[sourceId] = append(availableSources[sourceId], newSource)
				}
			}
		}

		// Response building
		for levelIndex, level := range levels {
			for _, node := range level {
				var producers []string = nil
				var consumers []string = nil

				for producerInstanceId := range pipelineSteps[node.stepId].producerInstances {
					producers = append(producers, producerInstanceId)
				}
				for consumerInstanceId := range pipelineSteps[node.stepId].consumerInstances {
					consumers = append(consumers, consumerInstanceId)
				}

				response.Nodes = append(response.Nodes, &pb.TopologyResponseNode{
					NodeId:            node.stepId,
					Level:             uint32(levelIndex),
					ProducerInstances: producers,
					ConsumerInstances: consumers,
				})
			}
		}

		for _, edge := range edges {
			newEdge := pb.TopologyResponseEdge{
				FromNodeId:        edge.fromStepId,
				ToNodeId:          edge.toStepId,
				SourceName:        edge.sourceId,
				InvolvedReceivers: nil,
			}
			for receiverId := range edge.involvedReceivers {
				newEdge.InvolvedReceivers = append(newEdge.InvolvedReceivers, receiverId)
			}
			response.Edges = append(response.Edges, &newEdge)
		}

		return &response, nil
	}

	firstQuery, err := queryfunction(s, ctx, queryString, responseFun) // Query newer data
	if err != nil {
		log.Error("GetTopology: firstQuery: RESULT: ", firstQuery, " ERROR: ", err)
		return nil, err
	}
	secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data and append it to the first query
	if err != nil {
		log.Error("GetTopology: secondQuery: RESULT: ", secondQuery, " ERROR: ", err)
		return nil, err
	}

	fmt.Printf("\n")
	return secondQuery, nil
}
func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQuery) (*pb.GroupDelayResponse, error) {
	log.Debug("GetGroupDelay")

	if query.BeamtimeFilter == "" {
		return nil, status.Errorf(codes.InvalidArgument, "Beamtime is required")
	}
@@ -689,33 +820,49 @@ func (s *QueryServer) GetGroupDelay(ctx context.Context, query *pb.GroupDelayQue
	startTime := strconv.FormatUint(query.FromTimestamp, 10)
	endTime := strconv.FormatUint(query.ToTimestamp, 10)

	queryString := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring
		" |> range(start: " + startTime + ", stop: " + endTime + ")" +
		" |> filter(fn: (r) => r._measurement == \"" + dbMeasurementBrokerFileRequests + "\"" + " and r.brokerCommand == \"next\")" +
		" |> filter(fn: (r) => r.beamtime == \"" + query.BeamtimeFilter + "\" and r._field == \"delayMs\")" +
		" |> group(columns: [\"groupId\"])" +
		" |> last()" +
		" |> keep(columns: [\"groupId\", \"_value\"])" +
		" |> group()"
	queryForOldData := generateFilterForAVGData(s, queryString) // Querystring for averaged data

	response := pb.GroupDelayResponse{}
	// Queryfunction that appends the results to a pb.GroupDelayResponse
	queryfunction := func(s *QueryServer, ctx context.Context, query string, response pb.GroupDelayResponse) (*pb.GroupDelayResponse, error) {
		result, err := s.dbQueryApi.Query(ctx, query)
		if err != nil {
			log.Error("GetGroupDelay: queryfunction: RESULT: ", result, " ERROR: ", err)
			return nil, err
		}
		for result.Next() {
			if result.Record().Values()["groupId"] != nil && result.Record().Value() != nil {
				item := pb.GroupDelayResponseItem{
					GroupId: result.Record().Values()["groupId"].(string),
					DelayMs: (uint32)(result.Record().Value().(int64)),
				}
				response.GroupIds = append(response.GroupIds, &item)
			}
		}
		sort.SliceStable(response.GroupIds, func(i, j int) bool {
			return response.GroupIds[i].DelayMs > response.GroupIds[j].DelayMs
		})
		return &response, nil
	}

	firstQuery, err := queryfunction(s, ctx, queryString, response) // Query newer data
	if err != nil {
		log.Error("GetGroupDelay: firstQuery: RESULT: ", firstQuery, " ERROR: ", err)
		return nil, err
	}
	secondQuery, err := queryfunction(s, ctx, queryForOldData, *firstQuery) // Query averaged data
	if err != nil {
		log.Error("GetGroupDelay: secondQuery: RESULT: ", secondQuery, " ERROR: ", err)
		return nil, err // return the error instead of a possibly nil response
	}

	fmt.Printf("\n")
	return secondQuery, nil
}
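GetTopology and GetGroupDelay use a variant of the same two-pass pattern: instead of concatenating two slices, the first call's response is handed to the second call, which appends the averaged results to it. Distilled, with a hypothetical stand-in type and names:

type resultSet struct{ items []string } // stand-in for pb.TopologyResponse / pb.GroupDelayResponse

// accumulate runs the live-bucket pass first, then feeds its response into the
// averaged-bucket pass so the second set of items is appended to the first.
func accumulate(run func(query string, acc resultSet) (*resultSet, error), query, avgQuery string) (*resultSet, error) {
	first, err := run(query, resultSet{}) // newer data
	if err != nil {
		return nil, err
	}
	return run(avgQuery, *first) // averaged data, appended to the first result
}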
@@ -31,6 +31,9 @@ const (
	// [sourceMeta], ftsName, pipelineStepId, consumerInstanceId
	// > ftsFileCount, totalFtsTransferredFileSize, avgTransferSendTimeUs
	dbMeasurementFtsTransfers = "ftsTransfers"

	// Used to get a corresponding bucket with averaged data
	averageSuffix = "_avg"
)
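The constant is only ever composed with the configured bucket name; as a small hypothetical helper, the derivation looks like this (the real code inlines the concatenation inside generateFilterForAVGData in QueryServer.go):

// averagedBucketName shows how the suffix combines with the configured bucket
// name, e.g. "asapo_monitoring" -> "asapo_monitoring_avg". Hypothetical helper.
func averagedBucketName(s Settings) string {
	return s.InfluxDbDatabase + averageSuffix
}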
type Settings struct {
@@ -39,6 +42,6 @@ type Settings struct {
	ServerPort uint16 // e.g. "50051"
	LogLevel   string // e.g. "info"

	InfluxDbUrl      string // e.g. "http://127.0.0.1:8086"
	InfluxDbDatabase string // e.g. "asapo-monitoring"
}