Commit ba20871e authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Check number of unacknowledged messages before closing stream only if receiver...

Check number of unacknowledged messages before closing stream only if receiver will retry. Starting stream first, because it may take time.
parent 03c1b6f9
......@@ -67,6 +67,8 @@ class SimpleAsapoReceiver:
"The name of the stream.", type=str, default="default", init=False)
data_source = Config(
"Name of input data_source", type=str, default="")
n_resend_nacs = Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
@group_id.default
def _generate_group_id(self):
......
......@@ -171,12 +171,12 @@ class Streamer:
except StreamFinishedError as err:
log.info("Stream is finished")
self.stream_finished = True
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
self._handle_end_of_stream()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
if left_msgs == 0:
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
if left_msgs == 0 or self.receiver.n_resend_nacs == 0:
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment