Commit 00f78aa5 authored by Tim Schoof's avatar Tim Schoof
Browse files

Don't create new broker and producer instances

Before, new broker and producer instances were created for each
substream and immediately replaced with the instances stored in the
application object.
The large amount of new instances is unnecessary and triggered a
resource leak in the Python broker wrapper.
parent 0e5bee8d
Pipeline #1761 passed with stage
in 36 seconds
......@@ -110,24 +110,33 @@ class Application:
logging.basicConfig(level=log_level, format=format)
logging.info("Log level set to %s", log_level)
# TODO: resuse the same broker and sender objects for all threads
# TODO: resusing the same broker and sender objects for all threads relies
# on an implementation detail of create_instance_from_configurable
def _create_streamer(self, substream=None):
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"])
if self.receiver_broker:
consumer.broker = self.receiver_broker
kwargs = {"broker": self.receiver_broker}
else:
kwargs = {}
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"], kwargs=kwargs)
if not self.receiver_broker:
self.receiver_broker = consumer.broker
if self.producer_class:
sender = create_instance_from_configurable(
self.producer_class, self.options["sender"])
if self.sender_broker:
assert self.sender_producer is not None
sender.broker = self.sender_broker
sender.producer = self.sender_producer
kwargs = {
"broker": self.sender_broker,
"producer": self.sender_producer}
else:
kwargs = {}
sender = create_instance_from_configurable(
self.producer_class, self.options["sender"], kwargs=kwargs)
if not self.sender_broker:
self.sender_broker = sender.broker
self.sender_producer = sender.producer
else:
......
......@@ -165,7 +165,8 @@ def create_instance_from_configurable(
def create_instance_from_attribute(attribute, options, kwargs):
assert(attribute.name not in kwargs)
if attribute.name in kwargs:
return
config_entry = attribute.metadata[CONFIGURABLE_CONFIG_ENTRY]
if not config_entry.flatten:
options = options.get(attribute.name, {})
......
......@@ -418,7 +418,7 @@ def test_merge_flattened_config_from_builder_in_parent_cli(capsys):
"""
Flattened config options from builder function are merged with identical
config options of a parent class.
This test fails with attrs version 20.1.0 due to a regression in attrs.
The regression is fixed in attrs version 20.2.0.
"""
......@@ -454,3 +454,22 @@ def test_merge_flattened_config_from_builder_in_parent_cli(capsys):
captured_flat = capsys.readouterr()
assert captured_flattened == captured_flat
def test_create_instance_overwrite_options_via_kwargs():
options = {
"simple_attribute": 10,
"configurable_attribute": {
"untyped": "whatever"}}
simple_attribute = 11
configurable_attribute = ConfigurableClass("whatever else")
extern_attribute = ExternClass(2, 2.34)
kwargs = {
"simple_attribute": simple_attribute,
"configurable_attribute": configurable_attribute,
"extern_attribute": extern_attribute}
instance = create_instance_from_configurable(
TopLevel, options, kwargs=kwargs)
assert instance.simple_attribute is simple_attribute
assert instance.configurable_attribute is configurable_attribute
assert instance.extern_attribute is extern_attribute
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment