diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go index 720b69c2403c584ec60a9ee9d6d7429fdc3aa5a7..68707f2a1b5581e4988f35190b2ae9043c220ec1 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/main/main.go @@ -53,6 +53,28 @@ func loadConfig(configFileName string) (log.Level, server.Settings, error) { return logLevel, settings, nil } +func postTODB(queryPostString string, settings server.Settings) { + data := url.Values{} + data.Set("q", queryPostString) + + res, err := http.Post( + settings.InfluxDbUrl+"/query", "application/x-www-form-urlencoded", + strings.NewReader(data.Encode())) + + if err != nil { + log.Fatal(err.Error()) + return + } + if res.StatusCode != 200 { + rawResponse, _ := ioutil.ReadAll(res.Body) + strResponse := string(rawResponse) + + log.Fatal("failed to do the post, post was: " + queryPostString + + " and status code was " + strconv.Itoa(res.StatusCode) + ": " + strResponse) + return + } +} + func main() { var configFileName = flag.String("config", "", "config file path") @@ -77,41 +99,22 @@ func main() { } // InfluxDB 1 fix, create database // Does multiple HTTP posts to DB - // It creates two databases, one retention policy and one continuous query on settings.InfluxdbDatabase - postTODB := func(queryPostString string) { - data := url.Values{} - data.Set("q", queryPostString) - - res, err := http.Post( - settings.InfluxDbUrl+"/query", "application/x-www-form-urlencoded", - strings.NewReader(data.Encode())) - - if err != nil { - log.Fatal(err.Error()) - return - } - if res.StatusCode != 200 { - rawResponse, _ := ioutil.ReadAll(res.Body) - strResponse := string(rawResponse) - - log.Fatal("failed to do the post, post was: " + queryPostString + - " and status code was " + strconv.Itoa(res.StatusCode) + ": " + strResponse) - return - } - } + // It creates two databases. normalDB := settings.InfluxDbDatabase - avgDB := settings.InfluxDbDatabase + "_avg" + avgDB := normalDB + "_avg" + rpName := "one_day" + cqName := "avg_values" postStrings := []string{ "CREATE DATABASE " + normalDB, "CREATE DATABASE " + avgDB, - "CREATE RETENTION POLICY one_day ON " + normalDB + + "CREATE RETENTION POLICY " + rpName + " ON " + normalDB + " DURATION 24h REPLICATION 1 DEFAULT", - "CREATE CONTINUOUS QUERY ON " + normalDB + // TODO avoid probable error if rp and qc are already excising - "BEGIN SELECT mean(*) INTO " + avgDB + - ".one_day.:MEASUREMENT FROM /.*/ GROUP BY time(12h) END", + "CREATE CONTINUOUS QUERY " + cqName + " ON " + normalDB + " BEGIN" + + " SELECT mean(*) INTO " + avgDB + + "..:MEASUREMENT FROM /.*/ GROUP BY time(12h) END", } for i := 0; i < len(postStrings); i++ { - postTODB(postStrings[i]) + postTODB(postStrings[i], settings) } log.SetLevel(logLevel) diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index 6a7f6aedffe22a7b97e066726c2753c718cc0cc2..158347cfe1bc7140fe7c2bfd32f6d46717fc2fda 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -237,6 +237,49 @@ func (s *QueryServer) GetDataPoints(ctx context.Context, query *pb.DataPointsQue return &response, nil } +// Queries DB for transferspeed +func queryfunctionTransferSpeed(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) { + result, err := s.dbQueryApi.Query(ctx, query) + + if err != nil { + return nil, err + } + + var arrayResult []*pb.TotalTransferRateDataPoint + for result.Next() { + recvBytes := int64(0) + sendBytes := int64(0) + sendRdsBytes := int64(0) + sendFtsBytes := int64(0) + + if result.Record().Values()["totalInputFileSize"] != nil { + recvBytes = result.Record().Values()["totalInputFileSize"].(int64) + } + if result.Record().Values()["totalRequestedFileSize"] != nil { + sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64) + } + if result.Record().Values()["totalRdsOutputFileSize"] != nil { + sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64) + } + if result.Record().Values()["totalFtsTransferredFileSize"] != nil { + sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64) + } + + arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{ + TotalBytesPerSecRecv: uint64(recvBytes) / uint64(intervalInSec), + TotalBytesPerSecSend: uint64(sendBytes) / uint64(intervalInSec), + TotalBytesPerSecRdsSend: uint64(sendRdsBytes) / uint64(intervalInSec), + TotalBytesPerSecFtsSend: uint64(sendFtsBytes) / uint64(intervalInSec), + }) + } + + if result.Err() != nil { + return nil, result.Err() + } + return arrayResult, nil +} + +// Queries DB for transferspeed, using both the normal and the average bucket func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalTransferRateDataPoint, error) { // Querystring query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -249,52 +292,11 @@ func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, e " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - queryfunction := func(s *QueryServer, ctx context.Context, intervalInSec int, query string) ([]*pb.TotalTransferRateDataPoint, error) { - result, err := s.dbQueryApi.Query(ctx, query) - - if err != nil { - return nil, err - } - - var arrayResult []*pb.TotalTransferRateDataPoint - for result.Next() { - recvBytes := int64(0) - sendBytes := int64(0) - sendRdsBytes := int64(0) - sendFtsBytes := int64(0) - - if result.Record().Values()["totalInputFileSize"] != nil { - recvBytes = result.Record().Values()["totalInputFileSize"].(int64) - } - if result.Record().Values()["totalRequestedFileSize"] != nil { - sendBytes = result.Record().Values()["totalRequestedFileSize"].(int64) - } - if result.Record().Values()["totalRdsOutputFileSize"] != nil { - sendRdsBytes = result.Record().Values()["totalRdsOutputFileSize"].(int64) - } - if result.Record().Values()["totalFtsTransferredFileSize"] != nil { - sendFtsBytes = result.Record().Values()["totalFtsTransferredFileSize"].(int64) - } - - arrayResult = append(arrayResult, &pb.TotalTransferRateDataPoint{ - TotalBytesPerSecRecv: uint64(recvBytes) / uint64(intervalInSec), - TotalBytesPerSecSend: uint64(sendBytes) / uint64(intervalInSec), - TotalBytesPerSecRdsSend: uint64(sendRdsBytes) / uint64(intervalInSec), - TotalBytesPerSecFtsSend: uint64(sendFtsBytes) / uint64(intervalInSec), - }) - } - - if result.Err() != nil { - return nil, result.Err() - } - return arrayResult, nil - } - - arrayResult, err := queryfunction(s, ctx, intervalInSec, query) // Query newer data + arrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, query) // Query newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunction(s, ctx, intervalInSec, queryForOldData) // Query averaged data + secondArrayResult, err := queryfunctionTransferSpeed(s, ctx, intervalInSec, queryForOldData) // Query averaged data if err != nil { return nil, err } @@ -305,6 +307,49 @@ func queryTransferSpeed(s *QueryServer, ctx context.Context, startTime string, e return arrayResult, nil } +// Queries DB for Filerate +func queryfunctionFileRate(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) { + rdsResponses, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + return nil, err + } + + var arrayResult []*pb.TotalFileRateDataPoint + for rdsResponses.Next() { + totalRequests := uint32(0) + hit := uint32(0) + miss := uint32(0) + fromDisk := uint32(0) + + if rdsResponses.Record().Values()["requestedFileCount"] != nil { + totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64)) + } + if rdsResponses.Record().Values()["totalRdsHits"] != nil { + hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64)) + } + if rdsResponses.Record().Values()["totalRdsMisses"] != nil { + miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64)) + } + if rdsResponses.Record().Values()["ftsFileCount"] != nil { + fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64)) + } + + arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{ + TotalRequests: totalRequests, + CacheMisses: miss, + FromCache: hit, + FromDisk: fromDisk, + }) + } + + if rdsResponses.Err() != nil { + return nil, rdsResponses.Err() + } + + return arrayResult, nil +} + +// Queries DB for Filerate, using both the normal and the average bucket func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TotalFileRateDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -317,52 +362,11 @@ func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTim " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TotalFileRateDataPoint, error) { - rdsResponses, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } - - var arrayResult []*pb.TotalFileRateDataPoint - for rdsResponses.Next() { - totalRequests := uint32(0) - hit := uint32(0) - miss := uint32(0) - fromDisk := uint32(0) - - if rdsResponses.Record().Values()["requestedFileCount"] != nil { - totalRequests = uint32(rdsResponses.Record().Values()["requestedFileCount"].(int64)) - } - if rdsResponses.Record().Values()["totalRdsHits"] != nil { - hit = uint32(rdsResponses.Record().Values()["totalRdsHits"].(int64)) - } - if rdsResponses.Record().Values()["totalRdsMisses"] != nil { - miss = uint32(rdsResponses.Record().Values()["totalRdsMisses"].(int64)) - } - if rdsResponses.Record().Values()["ftsFileCount"] != nil { - fromDisk = uint32(rdsResponses.Record().Values()["ftsFileCount"].(int64)) - } - - arrayResult = append(arrayResult, &pb.TotalFileRateDataPoint{ - TotalRequests: totalRequests, - CacheMisses: miss, - FromCache: hit, - FromDisk: fromDisk, - }) - } - - if rdsResponses.Err() != nil { - return nil, rdsResponses.Err() - } - - return arrayResult, nil - } - - arrayResult, err := queryfunction(s, ctx, query) // Query newer data + arrayResult, err := queryfunctionFileRate(s, ctx, query) // Query newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data + secondArrayResult, err := queryfunctionFileRate(s, ctx, queryForOldData) // Query averaged data if err != nil { return nil, err } @@ -373,6 +377,49 @@ func queryFileRate(s *QueryServer, ctx context.Context, startTime string, endTim return arrayResult, nil } +// Queries DB for Tasktime +func queryfunctionTaskTime(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) { + var arrayResult []*pb.TaskTimeDataPoint + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + return nil, err + } + + for result.Next() { + receiveIoTimeUs := float64(0) + writeToDiskTimeUs := float64(0) + writeToDatabaseTimeUs := float64(0) + rdsOutputTransferTimeUs := float64(0) + + if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { + receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) + } + if result.Record().Values()["avgWriteIoTimeUs"] != nil { + writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) + } + if result.Record().Values()["avgDbTimeUs"] != nil { + writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) + } + if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { + rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) + } + + arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ + ReceiveIoTimeUs: uint32(receiveIoTimeUs), + WriteToDiskTimeUs: uint32(writeToDiskTimeUs), + WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), + RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), + }) + } + + if result.Err() != nil { + return nil, result.Err() + } + + return arrayResult, nil +} + +// Queries DB for Tasktime, using both the normal and the average bucket func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.TaskTimeDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -385,52 +432,11 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.TaskTimeDataPoint, error) { - var arrayResult []*pb.TaskTimeDataPoint - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } - - for result.Next() { - receiveIoTimeUs := float64(0) - writeToDiskTimeUs := float64(0) - writeToDatabaseTimeUs := float64(0) - rdsOutputTransferTimeUs := float64(0) - - if result.Record().Values()["avgTransferReceiveTimeUs"] != nil { - receiveIoTimeUs = result.Record().Values()["avgTransferReceiveTimeUs"].(float64) - } - if result.Record().Values()["avgWriteIoTimeUs"] != nil { - writeToDiskTimeUs = result.Record().Values()["avgWriteIoTimeUs"].(float64) - } - if result.Record().Values()["avgDbTimeUs"] != nil { - writeToDatabaseTimeUs = result.Record().Values()["avgDbTimeUs"].(float64) - } - if result.Record().Values()["avgRdsOutputTransferTimeUs"] != nil { - rdsOutputTransferTimeUs = result.Record().Values()["avgRdsOutputTransferTimeUs"].(float64) - } - - arrayResult = append(arrayResult, &pb.TaskTimeDataPoint{ - ReceiveIoTimeUs: uint32(receiveIoTimeUs), - WriteToDiskTimeUs: uint32(writeToDiskTimeUs), - WriteToDatabaseTimeUs: uint32(writeToDatabaseTimeUs), - RdsSendToConsumerTimeUs: uint32(rdsOutputTransferTimeUs), - }) - } - - if result.Err() != nil { - return nil, result.Err() - } - - return arrayResult, nil - } - - arrayResult, err := queryfunction(s, ctx, query) // Query newer data + arrayResult, err := queryfunctionTaskTime(s, ctx, query) // Query newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query averaged data + secondArrayResult, err := queryfunctionTaskTime(s, ctx, queryForOldData) // Query averaged data if err != nil { return nil, err } @@ -441,6 +447,33 @@ func queryTaskTime(s *QueryServer, ctx context.Context, startTime string, endTim return arrayResult, nil } +// Queries DB for Memory usage +func queryfunctionMemoryUsage(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) { + var arrayResult []*pb.RdsMemoryUsageDataPoint + + result, err := s.dbQueryApi.Query(ctx, query) + if err != nil { + return nil, err + } + for result.Next() { + usedBytes := int64(0) + + if result.Record().Values()["rdsCacheUsedBytes"] != nil { + usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) + } + + arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ + TotalUsedMemory: uint64(usedBytes), + }) + } + if result.Err() != nil { + return nil, result.Err() + } + + return arrayResult, nil +} + +// Queries DB for Memory usage, using both the normal and the average bucket func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, endTime string, intervalInSec int, filter string) ([]*pb.RdsMemoryUsageDataPoint, error) { // Querystrings query := "from(bucket: \"" + s.settings.InfluxDbDatabase + "\")" + // Normal querystring @@ -453,36 +486,11 @@ func queryMemoryUsage(s *QueryServer, ctx context.Context, startTime string, end " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" queryForOldData := generateFilterForAVGData(s, query) // Querystring for averaged data - queryfunction := func(s *QueryServer, ctx context.Context, query string) ([]*pb.RdsMemoryUsageDataPoint, error) { - var arrayResult []*pb.RdsMemoryUsageDataPoint - - result, err := s.dbQueryApi.Query(ctx, query) - if err != nil { - return nil, err - } - for result.Next() { - usedBytes := int64(0) - - if result.Record().Values()["rdsCacheUsedBytes"] != nil { - usedBytes = result.Record().Values()["rdsCacheUsedBytes"].(int64) - } - - arrayResult = append(arrayResult, &pb.RdsMemoryUsageDataPoint{ - TotalUsedMemory: uint64(usedBytes), - }) - } - if result.Err() != nil { - return nil, result.Err() - } - - return arrayResult, nil - } - - arrayResult, err := queryfunction(s, ctx, query) // Query for newer data + arrayResult, err := queryfunctionMemoryUsage(s, ctx, query) // Query for newer data if err != nil { return nil, err } - secondArrayResult, err := queryfunction(s, ctx, queryForOldData) // Query for averaged data + secondArrayResult, err := queryfunctionMemoryUsage(s, ctx, queryForOldData) // Query for averaged data if err != nil { return nil, err }