From ba20871ec535487a27cfc85923bd6d8ae050189b Mon Sep 17 00:00:00 2001 From: karnem Date: Wed, 7 Jul 2021 13:36:37 +0200 Subject: [PATCH 1/3] Check number of unacknowledged messages before closing stream only if receiver will retry. Starting stream first, because it may take time. --- src/AsapoWorker/asapo_receiver.py | 2 ++ src/AsapoWorker/streamer.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/AsapoWorker/asapo_receiver.py b/src/AsapoWorker/asapo_receiver.py index b8e020f..f138b30 100644 --- a/src/AsapoWorker/asapo_receiver.py +++ b/src/AsapoWorker/asapo_receiver.py @@ -67,6 +67,8 @@ class SimpleAsapoReceiver: "The name of the stream.", type=str, default="default", init=False) data_source = Config( "Name of input data_source", type=str, default="") + n_resend_nacs = Config( + "Number if tries to resend unacknowledged messages", type=int, default=0) @group_id.default def _generate_group_id(self): diff --git a/src/AsapoWorker/streamer.py b/src/AsapoWorker/streamer.py index da6c00b..8769ef8 100644 --- a/src/AsapoWorker/streamer.py +++ b/src/AsapoWorker/streamer.py @@ -171,12 +171,12 @@ class Streamer: except StreamFinishedError as err: log.info("Stream is finished") self.stream_finished = True - left_msgs = len(self.receiver.get_unacknowledged_messages()) - log.info("Number of unacknowledged messages = %s", left_msgs) self._handle_end_of_stream() stream_info = self.receiver.get_stream_info() self.worker.stream_finished(stream_info) - if left_msgs == 0: + left_msgs = len(self.receiver.get_unacknowledged_messages()) + log.info("Number of unacknowledged messages = %s", left_msgs) + if left_msgs == 0 or self.receiver.n_resend_nacs == 0: # Stop this instance if it can not start new instance anymore if self.end_of_stream_callback is None: self.stop() -- GitLab From 110a35e51ae3cea4806d46024a457f19a899352a Mon Sep 17 00:00:00 2001 From: karnem Date: Wed, 7 Jul 2021 13:37:59 +0200 Subject: [PATCH 2/3] Try to set starting position only if worker have corresponding function. Set stream before trying to set starting position. --- src/AsapoWorker/application.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/AsapoWorker/application.py b/src/AsapoWorker/application.py index 3f50b66..6a4c0b9 100644 --- a/src/AsapoWorker/application.py +++ b/src/AsapoWorker/application.py @@ -159,12 +159,12 @@ class Application: if not stream: stream = self.get_starting_stream(consumer) - _set_start_position( - self.options, consumer, sender, self.worker_class) log.info("Setting stream=%s", stream) consumer.stream = stream if sender: sender.stream = stream + if hasattr(self.worker_class, 'calculate_start_ids'): + _set_start_position(self.options, consumer, sender, self.worker_class) worker = create_instance_from_configurable( self.worker_class, self.options["worker"]) -- GitLab From 0ebfec4e3d998bd5dfcfa42fb7d9864732775e22 Mon Sep 17 00:00:00 2001 From: karnem Date: Thu, 8 Jul 2021 22:35:06 +0200 Subject: [PATCH 3/3] Move check if it make sense to set start position to dedicated function. --- src/AsapoWorker/application.py | 38 +++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/AsapoWorker/application.py b/src/AsapoWorker/application.py index 6a4c0b9..a98b86e 100644 --- a/src/AsapoWorker/application.py +++ b/src/AsapoWorker/application.py @@ -163,8 +163,7 @@ class Application: consumer.stream = stream if sender: sender.stream = stream - if hasattr(self.worker_class, 'calculate_start_ids'): - _set_start_position(self.options, consumer, sender, self.worker_class) + _set_start_position(self.options, consumer, sender, self.worker_class) worker = create_instance_from_configurable( self.worker_class, self.options["worker"]) @@ -221,6 +220,12 @@ class Application: def _set_start_position(options, consumer, sender, worker_class): + if (not hasattr(worker_class, 'calculate_start_ids') + or not hasattr(worker_class, 'calculate_start_index')): + log.info( + "Worker does not support starting from the last processed " + "record. Starting from the beginning instead") + return if ("start_id" not in options["worker"] or "start_id" not in options["receiver"] or "start_index" not in options["worker"]): @@ -246,26 +251,21 @@ def _set_start_position(options, consumer, sender, worker_class): last_output_metadata = None if last_output_metadata: - try: - input_start_id, output_start_id = ( - worker_class.calculate_start_ids(last_output_metadata)) + input_start_id, output_start_id = ( + worker_class.calculate_start_ids(last_output_metadata)) - output_start_index = worker_class.calculate_start_index( - last_output_metadata) - except AttributeError: - log.warning( - "Worker does not support starting from the last processed " - "record. Starting from the beginning instead") - else: - consumer.set_start_id(input_start_id) + output_start_index = worker_class.calculate_start_index( + last_output_metadata) + + consumer.set_start_id(input_start_id) - log.info( - "Setting worker option start_id=%s", output_start_id) - options["worker"]["start_id"] = output_start_id + log.info( + "Setting worker option start_id=%s", output_start_id) + options["worker"]["start_id"] = output_start_id - log.info( - "Setting worker option start_index=%s", output_start_index) - options["worker"]["start_index"] = output_start_index + log.info( + "Setting worker option start_index=%s", output_start_index) + options["worker"]["start_index"] = output_start_index def _unset_start_position(options): -- GitLab