Commit b57af207 authored by Tim Schoof's avatar Tim Schoof
Browse files

Stop the application if a stream raised an exception

parent 029b7a1b
......@@ -187,19 +187,22 @@ class Application:
def cleanup_stopped_streamers(self):
with self.lock:
# A for loop would not work as we delete elements from the iterator
while True:
# get the first element
substream, future = next(iter(self.futures.items()))
finished_substreams = []
for substream, future in self.futures.items():
if future.done():
# Deleting items of the iterator here could break the loop
finished_substreams.append(substream)
err = future.exception()
if err:
log.error(
"Stream stopped with error: " + format_error(err))
"Substream {} stopped with error: ".format(
substream) + format_error(err))
# Stop application
self.stopped.set()
for substream in finished_substreams:
del self.futures[substream]
del self.streamers[substream]
else:
break
def _shutdown(self):
self.executor.shutdown()
......@@ -225,9 +228,9 @@ class Application:
else:
self.stop_old_streamers()
try:
self.cleanup_stopped_streamers()
except StopIteration:
self.cleanup_stopped_streamers()
if not self.futures:
break
self._shutdown()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment