Commit a468298c authored by Tim Schoof's avatar Tim Schoof
Browse files

Share broker/producer instances among all streams

The current quick and dirty implementation still creates new instances
but replaces them immediately with the shared ones. This should be
improved.
parent e2c57211
......@@ -25,6 +25,9 @@ class Application:
self.worker_class = worker_class
self.consumer_class = consumer_class
self.producer_class = producer_class
self.receiver_broker = None
self.sender_broker = None
self.sender_producer = None
self.executor = ThreadPoolExecutor(max_workers=16)
self.lock = Lock()
self.verbose = verbose
......@@ -100,9 +103,21 @@ class Application:
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"])
if self.receiver_broker:
consumer.broker = self.receiver_broker
else:
self.receiver_broker = consumer.broker
if self.producer_class:
sender = create_instance_from_configurable(
self.producer_class, self.options["sender"])
if self.sender_broker:
assert self.sender_producer is not None
sender.broker = self.sender_broker
sender.producer = self.sender_producer
else:
self.sender_broker = sender.broker
self.sender_producer = sender.producer
else:
sender = None
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment