Commit 0f4590a9 authored by Tim Schoof's avatar Tim Schoof
Browse files

Slow down streamer on error

Use stop events for fast shutdown despite the increasingly long delays.
parent a468298c
......@@ -4,7 +4,7 @@ from collections import OrderedDict
import logging
import signal
import sys
from threading import Lock
from threading import Lock, Event
import time
from AsapoWorker.configuration import (
create_instance_from_configurable,
......@@ -35,7 +35,7 @@ class Application:
self.initialized = False
self.streamers = OrderedDict()
self.futures = OrderedDict()
self.stopped = False
self.stopped = Event()
def initialize(self):
self._parse_options()
......@@ -48,7 +48,7 @@ class Application:
self.streamers[streamer.receiver.substream] = streamer
def signalhandler(signum, frame):
self.stopped = True
self.stopped.set()
signal.signal(signal.SIGINT, signalhandler)
signal.signal(signal.SIGTERM, signalhandler)
......@@ -115,7 +115,7 @@ class Application:
assert self.sender_producer is not None
sender.broker = self.sender_broker
sender.producer = self.sender_producer
else:
else:
self.sender_broker = sender.broker
self.sender_producer = sender.producer
else:
......@@ -172,7 +172,7 @@ class Application:
def start_stream_thread(self, substream):
with self.lock:
if substream not in self.streamers and not self.stopped:
if substream not in self.streamers and not self.stopped.is_set():
log.info("Starting new substream=%s", substream)
streamer = self._create_streamer(substream=substream)
future = self.executor.submit(streamer.run)
......@@ -210,17 +210,20 @@ class Application:
# start initial streamer
with self.lock:
if not self.stopped:
for streamer in self.streamers.values():
self.futures[streamer.receiver.substream] = (
if not self.stopped.is_set():
for substream, streamer in self.streamers.items():
log.info(
"Starting new substream=%s",
streamer.receiver.substream)
self.futures[substream] = (
self.executor.submit(streamer.run))
while True:
time.sleep(0.5)
if self.stopped:
if self.stopped.wait(0.5):
self.stop_old_streamers(n_max=0)
self.stop_old_streamers()
time.sleep(0.5) # replace wait which now returns immediately
else:
self.stop_old_streamers()
try:
self.cleanup_stopped_streamers()
......
import logging
import time
from threading import Event
from asapo_producer import AsapoProducerError
from AsapoWorker.errors import (
TemporaryError, MissingDataError, ConfigurationError, EndOfStreamError,
......@@ -15,10 +16,11 @@ class Streamer:
end_of_stream_callback=None):
self.receiver = receiver
self.worker = worker
self.initial_delay_on_error = delay_on_error
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
self.started = False
self.stopped = False
self.stopped = Event()
def _process_stream(self):
data, metadata = self._get_next()
......@@ -65,8 +67,6 @@ class Streamer:
return data, metadata
# TODO: slow down to not keep polling the broker
# (but speed up if stream continues)
def _handle_end_of_stream(self):
if not self.started:
# if the stream is already at the end without ever processing data,
......@@ -96,19 +96,28 @@ class Streamer:
def run(self):
log.info("Start stream processing.")
self.stopped = False
while not self.stopped:
self.stopped.clear()
while not self.stopped.is_set():
success = self._process_stream()
if self.stopped:
break
if not success and self.delay_on_error:
time.sleep(self.delay_on_error)
if not success:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
self._reset_delay_on_error()
self._shutdown()
def _increase_delay_on_error(self):
self.delay_on_error = max(
10*(self.initial_delay_on_error),
self.delay_on_error + self.initial_delay_on_error)
def _reset_delay_on_error(self):
self.delay_on_error = self.initial_delay_on_error
def stop(self):
log.info("Stopping stream processing.")
self.stopped = True
self.stopped.set()
def _shutdown(self):
log.info("Cleaning up.")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment