Commit 0ebfec4e authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Move check if it make sense to set start position to dedicated function.

parent 110a35e5
Pipeline #8013 passed with stage
in 56 seconds
...@@ -163,8 +163,7 @@ class Application: ...@@ -163,8 +163,7 @@ class Application:
consumer.stream = stream consumer.stream = stream
if sender: if sender:
sender.stream = stream sender.stream = stream
if hasattr(self.worker_class, 'calculate_start_ids'): _set_start_position(self.options, consumer, sender, self.worker_class)
_set_start_position(self.options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable( worker = create_instance_from_configurable(
self.worker_class, self.options["worker"]) self.worker_class, self.options["worker"])
...@@ -221,6 +220,12 @@ class Application: ...@@ -221,6 +220,12 @@ class Application:
def _set_start_position(options, consumer, sender, worker_class): def _set_start_position(options, consumer, sender, worker_class):
if (not hasattr(worker_class, 'calculate_start_ids')
or not hasattr(worker_class, 'calculate_start_index')):
log.info(
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
return
if ("start_id" not in options["worker"] if ("start_id" not in options["worker"]
or "start_id" not in options["receiver"] or "start_id" not in options["receiver"]
or "start_index" not in options["worker"]): or "start_index" not in options["worker"]):
...@@ -246,26 +251,21 @@ def _set_start_position(options, consumer, sender, worker_class): ...@@ -246,26 +251,21 @@ def _set_start_position(options, consumer, sender, worker_class):
last_output_metadata = None last_output_metadata = None
if last_output_metadata: if last_output_metadata:
try: input_start_id, output_start_id = (
input_start_id, output_start_id = ( worker_class.calculate_start_ids(last_output_metadata))
worker_class.calculate_start_ids(last_output_metadata))
output_start_index = worker_class.calculate_start_index( output_start_index = worker_class.calculate_start_index(
last_output_metadata) last_output_metadata)
except AttributeError:
log.warning( consumer.set_start_id(input_start_id)
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
else:
consumer.set_start_id(input_start_id)
log.info( log.info(
"Setting worker option start_id=%s", output_start_id) "Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id options["worker"]["start_id"] = output_start_id
log.info( log.info(
"Setting worker option start_index=%s", output_start_index) "Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index options["worker"]["start_index"] = output_start_index
def _unset_start_position(options): def _unset_start_position(options):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment