Commit c20bd3ee authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Propagate end-of-streams flag

parent 03ffe9d2
Pipeline #6473 failed with stage
......@@ -157,8 +157,6 @@ class Streamer:
self._handle_receiver_temporary_error()
self._handle_end_of_stream()
if self.stream_finished:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
if self.end_of_stream_callback is None:
self.stop()
return None, None
......@@ -176,9 +174,9 @@ class Streamer:
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
self._handle_end_of_stream()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
if left_msgs == 0:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment