Commit bb036bef authored by Tim Schoof's avatar Tim Schoof
Browse files

Throw an exception when a stream stops due to errors

parent b57af207
......@@ -32,12 +32,10 @@ class Streamer:
self.worker.process(data, metadata)
except (AsapoProducerError, ConfigurationError) as err:
log.critical("Sending failed: " + str(err))
self.stop()
return False
raise err
except Exception as err:
log.exception("Worker could not process data.")
self.stop()
return False
raise err
return True
......@@ -60,8 +58,7 @@ class Streamer:
except Exception as err:
log.critical("Unhandled exception", exc_info=True)
self._handle_receiver_critical_error()
self.stop()
return None, None
raise err
self.started = True
......@@ -97,15 +94,16 @@ class Streamer:
def run(self):
log.info("Start stream processing.")
self.stopped.clear()
while not self.stopped.is_set():
success = self._process_stream()
if not success:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
self._reset_delay_on_error()
self._shutdown()
try:
while not self.stopped.is_set():
success = self._process_stream()
if not success:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
self._reset_delay_on_error()
finally:
self._shutdown()
def _increase_delay_on_error(self):
self.delay_on_error = max(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment