Commit bef43bbb authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Move check if it make sense to set start position to dedicated function.

parent 110a35e5
Pipeline #8010 passed with stage
in 51 seconds
...@@ -163,8 +163,7 @@ class Application: ...@@ -163,8 +163,7 @@ class Application:
consumer.stream = stream consumer.stream = stream
if sender: if sender:
sender.stream = stream sender.stream = stream
if hasattr(self.worker_class, 'calculate_start_ids'): _set_start_position(self.options, consumer, sender, self.worker_class)
_set_start_position(self.options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable( worker = create_instance_from_configurable(
self.worker_class, self.options["worker"]) self.worker_class, self.options["worker"])
...@@ -221,6 +220,8 @@ class Application: ...@@ -221,6 +220,8 @@ class Application:
def _set_start_position(options, consumer, sender, worker_class): def _set_start_position(options, consumer, sender, worker_class):
if not hasattr(worker_class, 'calculate_start_ids'):
return
if ("start_id" not in options["worker"] if ("start_id" not in options["worker"]
or "start_id" not in options["receiver"] or "start_id" not in options["receiver"]
or "start_index" not in options["worker"]): or "start_index" not in options["worker"]):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment