Skip to content
Snippets Groups Projects
Forked from asapo / asapo
986 commits behind the upstream repository.
monitoring_nottested.go 2.37 KiB
package server

import (
	pb "asapo_common/generated_proto"
	log "asapo_common/logger"
	"context"
	"google.golang.org/grpc"
	"os"
	"strconv"
	"time"
)

type BrokerMonitoringDataSender interface {
	Init(serverUrl string) error
	Send(container *pb.BrokerDataPointContainer) error
	IsInitialized() bool
}

type gRPCBrokerMonitoringDataSender struct {
	client pb.AsapoMonitoringIngestServiceClient
}

func (g *gRPCBrokerMonitoringDataSender) IsInitialized() bool {
	return g.client != nil
}

func (g *gRPCBrokerMonitoringDataSender) Init(serverUrl string) error {
	var opts []grpc.DialOption
	opts = append(opts, grpc.WithInsecure())
	conn, err := grpc.Dial(serverUrl, opts...)
	if err != nil {
		return err
	}

	g.client = pb.NewAsapoMonitoringIngestServiceClient(conn)

	return nil
}

func (g *gRPCBrokerMonitoringDataSender) Send(container *pb.BrokerDataPointContainer) error {
	_, err := g.client.InsertBrokerDataPoints(context.TODO(), container)

	if err != nil {
		return err
	}

	return nil
}

func (m *brokerMonitoring) reinitializeSender() error {
	if settings.MonitoringServerUrl == "auto" {
		fetchedMonitoringServerUrl, err := discoveryService.GetMonitoringServerUrl()
		if err != nil {
			return err
		}
		m.discoveredMonitoringServerUrl = fetchedMonitoringServerUrl
	} else {
		m.discoveredMonitoringServerUrl = settings.MonitoringServerUrl
	}

	err := m.Sender.Init(m.discoveredMonitoringServerUrl)
	if err != nil {
		return err
	}

	return nil
}

func (m *brokerMonitoring) Init() error {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "hostnameerror"
	}
	m.BrokerName = "broker_" + hostname + "_" + strconv.Itoa(os.Getpid())

	return nil
}

func (m *brokerMonitoring) RunThread() {
	time.Sleep(5000 * time.Millisecond)
	for {
		start := time.Now()
		if !m.Sender.IsInitialized() {
			err := m.reinitializeSender()
			if err != nil {
				log.Warning("monitoring.reinitializeSender failed " , err.Error())
				elapsed := time.Since(start)
				time.Sleep((5000 * time.Millisecond) - elapsed)
				continue
			}
		}

		err := m.sendNow()

		if err != nil {
			logString := "sending monitoring data to '" + m.discoveredMonitoringServerUrl + "'"
			log.Error(logString + " - " + err.Error())

			err = m.reinitializeSender()
			if err != nil {
				log.Error("reinitializeSender also failed - " + err.Error())
			}
		}
		elapsed := time.Since(start)
		time.Sleep((5000 * time.Millisecond) - elapsed)
	}
}