Commit e2c57211 authored by Tim Schoof's avatar Tim Schoof
Browse files

Fix bugs with calculated start ids

* The default substream was used for the first worker
* The start_id options must be reset for each worker, not just the first
parent 462ca804
......@@ -39,11 +39,7 @@ class Application:
self._setup_logging()
streamer = self._create_streamer(set_start_position=True)
# The user provided start position is only valid for the initial
# substream so remove it from the options
_unset_start_position(self.options)
streamer = self._create_streamer()
with self.lock:
self.streamers[streamer.receiver.substream] = streamer
......@@ -100,7 +96,7 @@ class Application:
logging.info("Log level set to %s", log_level)
# TODO: resuse the same broker and sender objects for all threads
def _create_streamer(self, substream=None, set_start_position=False):
def _create_streamer(self, substream=None):
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"])
......@@ -110,13 +106,29 @@ class Application:
else:
sender = None
if set_start_position:
_set_start_position(
self.options, consumer, sender, self.worker_class)
if not substream:
if "substream" in self.options:
substream = self.options["substream"]
if substream:
log.info("Setting substream=%s", substream)
consumer.substream = substream
if sender:
sender.substream = substream
_set_start_position(
self.options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
# The start position (provided by the user or calculated from the
# acknowledged id) was used to create the worker and must be reset.
# This means that for all but the first worker the user provided start
# id will be ignored and the start id will be calculated from the
# acknowledged ids
_unset_start_position(self.options)
if sender:
worker.sender = sender
......@@ -126,15 +138,6 @@ class Application:
else:
streamer_options = {}
if not substream:
if "substream" in self.options:
substream = self.options["substream"]
if substream:
consumer.substream = substream
if sender:
sender.substream = substream
streamer = Streamer(
consumer, worker,
end_of_stream_callback=self.start_next_callback,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment