Skip to content
Snippets Groups Projects
Commit 32a793ef authored by Carsten Patzke's avatar Carsten Patzke
Browse files

Monitoring: Improved time sleep code

parent 71d86046
Branches
Tags
No related merge requests found
......@@ -77,15 +77,26 @@ func (m *brokerMonitoring) Init() error {
}
func (m *brokerMonitoring) RunThread() {
time.Sleep(5000 * time.Millisecond)
globalStart := time.Now()
sendingInterval := 5000 * time.Millisecond
time.Sleep(sendingInterval)
waitForNextIteration := func(iterationStartTime time.Time) {
timeTook := time.Since(iterationStartTime)
if timeTook < sendingInterval {
sleepTime := sendingInterval - (time.Since(globalStart) % sendingInterval)
time.Sleep(sleepTime)
}
}
for {
start := time.Now()
iterationStart := time.Now()
if !m.Sender.IsInitialized() {
err := m.reinitializeSender()
if err != nil {
log.Warning("monitoring.reinitializeSender failed " , err.Error())
elapsed := time.Since(start)
time.Sleep((5000 * time.Millisecond) - elapsed)
waitForNextIteration(iterationStart)
continue
}
}
......@@ -101,7 +112,7 @@ func (m *brokerMonitoring) RunThread() {
log.Error("reinitializeSender also failed - " + err.Error())
}
}
elapsed := time.Since(start)
time.Sleep((5000 * time.Millisecond) - elapsed)
waitForNextIteration(iterationStart)
}
}
......@@ -78,14 +78,26 @@ func (m *brokerMonitoring) Init() error {
func (m *brokerMonitoring) RunThread() {
time.Sleep(5000 * time.Millisecond)
globalStart := time.Now()
sendingInterval := 5000 * time.Millisecond
time.Sleep(sendingInterval)
waitForNextIteration := func(iterationStartTime time.Time) {
timeTook := time.Since(iterationStartTime)
if timeTook < sendingInterval {
sleepTime := sendingInterval - (time.Since(globalStart) % sendingInterval)
time.Sleep(sleepTime)
}
}
for {
start := time.Now()
iterationStart := time.Now()
if !m.Sender.IsInitialized() {
err := m.reinitializeSender()
if err != nil {
log.Warning("monitoring.reinitializeSender failed " , err.Error())
elapsed := time.Since(start)
time.Sleep((5000 * time.Millisecond) - elapsed)
waitForNextIteration(iterationStart)
continue
}
}
......@@ -101,7 +113,7 @@ func (m *brokerMonitoring) RunThread() {
log.Error("reinitializeSender also failed - " + err.Error())
}
}
elapsed := time.Since(start)
time.Sleep((5000 * time.Millisecond) - elapsed)
waitForNextIteration(iterationStart)
}
}
......@@ -19,8 +19,6 @@ SharedReceiverMonitoringClient asapo::GenerateDefaultReceiverMonitoringClient(co
return SharedReceiverMonitoringClient{new ReceiverMonitoringClientNoop};
}
std::chrono::high_resolution_clock::time_point asapo::ReceiverMonitoringClient::HelperTimeNow() {
return std::chrono::high_resolution_clock::now();
}
......
......@@ -9,7 +9,7 @@
using namespace asapo;
static const int universalSendingIntervalMs = 5000;
static const int kUniversalSendingIntervalMs = 5000;
uint64_t NowUnixTimestampMs() {
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
......@@ -56,10 +56,11 @@ void asapo::ReceiverMonitoringClientImpl::StopSendingThread() {
}
void asapo::ReceiverMonitoringClientImpl::SendingThreadFunction() {
auto globalStartTime = ReceiverMonitoringClient::HelperTimeNow();
std::unique_ptr<ToBeSendData> localToBeSend{new ToBeSendData};
while(sendingThreadRunning__) {
auto start = HelperTimeNow();
auto iterationStart = ReceiverMonitoringClient::HelperTimeNow();
FillMemoryStats();
// Clear and swap data
......@@ -81,19 +82,22 @@ void asapo::ReceiverMonitoringClientImpl::SendingThreadFunction() {
}
size_t size = localToBeSend->container.ByteSizeLong();
auto tookTimeUs = HelperTimeDiffInMicroseconds(start);
int sleepDurationInMs = universalSendingIntervalMs - (int)(tookTimeUs/1000);
if (!err) {
log__->Debug("Sending of all monitoring data(" + std::to_string(size) + " byte) took " + std::to_string(tookTimeUs/1000) + "ms (sleeping for " + std::to_string(sleepDurationInMs) + "ms)");
} else {
log__->Info("Will try again in " + std::to_string(sleepDurationInMs) + "ms");
}
uint64_t sleepDurationInMs = WaitTimeMsUntilNextInterval(globalStartTime);
auto tookTimeMs = ReceiverMonitoringClient::HelperTimeDiffInMicroseconds(iterationStart) / 1000;
if (sleepDurationInMs < 0) {
if (tookTimeMs > kUniversalSendingIntervalMs) {
sleepDurationInMs = 0;
}
auto tookInMsStr = std::to_string(tookTimeMs);
if (!err) {
log__->Debug("Sending of all monitoring data(" + std::to_string(size) + " byte) took " + tookInMsStr+ "ms (sleeping for " + std::to_string(sleepDurationInMs) + "ms)");
} else {
log__->Info("Error took " + tookInMsStr + "ms, will try again in " + std::to_string(sleepDurationInMs) + "ms");
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleepDurationInMs));
}
}
......@@ -327,3 +331,12 @@ RdsMemoryDataPoint* asapo::ReceiverMonitoringClientImpl::ToBeSendData::GetMemory
return newItem;
}
uint64_t asapo::ReceiverMonitoringClientImpl::WaitTimeMsUntilNextInterval(std::chrono::high_resolution_clock::time_point startTime) {
auto now = std::chrono::high_resolution_clock::now();
auto delta = now - startTime;
auto interval = std::chrono::milliseconds(kUniversalSendingIntervalMs);
auto result = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds>(interval - (delta % interval)).count();
return result;
}
......@@ -74,6 +74,8 @@ private:
Error ReinitializeClient();
Error GetMonitoringServerUrl(std::string* url) const;
static uint64_t WaitTimeMsUntilNextInterval(std::chrono::high_resolution_clock::time_point startTime);
public:
// Internal struct
class ToBeSendData {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment