Commit 110a35e5 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Try to set starting position only if worker have corresponding function. Set...

Try to set starting position only if worker have corresponding function. Set stream before trying to set starting position.
parent ba20871e
Pipeline #7944 passed with stage
in 59 seconds
......@@ -159,12 +159,12 @@ class Application:
if not stream:
stream = self.get_starting_stream(consumer)
_set_start_position(
self.options, consumer, sender, self.worker_class)
log.info("Setting stream=%s", stream)
consumer.stream = stream
if sender:
sender.stream = stream
if hasattr(self.worker_class, 'calculate_start_ids'):
_set_start_position(self.options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment