Commit cdc97ad2 authored by Tim Schoof's avatar Tim Schoof
Browse files

Introduce post_scan method for workers

A worker can implement a post_scan method to receive stream info and
stream meta at the end of a stream.
As the end of a stream cannot be determined reliably, the post_scan
method can be called multiple times even before the stream actually
ends. It is also not guaranteed that the post_scan method is called when
the stream info or stream meta changes.
parent 35eb72dc
Pipeline #26944 passed with stage
in 51 seconds
......@@ -114,6 +114,8 @@ class Streamer:
self.fix_metadata_stream = fix_metadata_stream
self.stopped = Event()
self.stream_naming_scheme = stream_naming_scheme
self.stream_info = None
self.stream_meta = None
def _process_stream(self):
data, metadata = self._get_next()
......@@ -199,6 +201,9 @@ class Streamer:
# stream list
# -> stream is likely finished or was skipped
# -> start next stream + reduce polling rate
# Always try to call the post_scan method
if self.likely_done:
# nothing is left to be done
......@@ -268,6 +273,33 @@ class Streamer:
def _handle_receiver_critical_error(self):
def _call_post_scan(self):
# TODO: This is a very WIP implementation which overlaps with the
# stream_finished functionality.
# The idea is to call worker.post_scan after each possibly last message
# but not on every EndOfStreamError or StreamFinishedError.
if hasattr(self.worker, "post_scan") and (self.stream_info is None or self.stream_meta is None):
# Initially, or if there have been messages since the last call then
# self.stream_info or self.stream_meta are unset
# set them and call the post_scan method once"Getting stream info and meta for stream '%s'",
self.stream_info = self.receiver.get_stream_info()
except StreamError:
log.warning("Getting stream info for stream '%s' failed",
self.stream_meta = self.receiver.get_stream_meta()
except StreamError:
# TODO: Distinguish no stream meta from connection error or similar
log.warning("Getting stream meta for stream '%s' failed",
self.stream_meta = {}
if self.stream_info:
# Otherwise, the stream doesn't have data
self.worker.post_scan(self.stream_info, self.stream_meta)
except Exception:
log.error("Processing post scan for stream '%s' failed",, exc_info=True)
def run(self):
threading.current_thread().name = "stream_{}".format(
......@@ -298,6 +330,11 @@ class Streamer:
if success:
# When a new message arrived, unset self.stream_info and self.stream_meta
# so that the post_scan method can be called again for the next likely last message
self.stream_info = None
self.stream_meta = None
except Exception as e:
log.error("Streamer fails with error: %s", e, exc_info=True)
......@@ -325,6 +362,7 @@ class Streamer:
def _shutdown(self):"Cleaning up.")
if self.worker.sender:
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment