Commit 24d22b6d authored by Tim Schoof's avatar Tim Schoof
Browse files

Add option for max number of streams

parent 525081c8
......@@ -9,9 +9,13 @@ log = logging.getLogger(__name__)
class StreamerManager:
def __init__(self, create_streamer):
def __init__(self, create_streamer, max_streams=16):
self._create_streamer = create_streamer
self.executor = ThreadPoolExecutor(max_workers=16)
self.max_streams = max_streams
# Temporarily, there can be more than max_streams threads,
# therefore the thread pool should be a bit larger
# (number of additional threads chosen by fair dice roll)
self.executor = ThreadPoolExecutor(max_workers=max_streams + 4)
self.streamers = OrderedDict()
self.futures = OrderedDict()
self.stopped = Event()
......@@ -46,7 +50,7 @@ class StreamerManager:
self.streamers[substream] = streamer
self.futures[substream] = future
def stop_old_streamers(self, n_max=10):
def stop_old_streamers(self, n_max):
with self.lock:
for streamer in not_last_n(n_max, self.streamers.values()):
log.info("Stopping substream=%s", streamer.receiver.substream)
......@@ -96,7 +100,7 @@ class StreamerManager:
self.stop_old_streamers(n_max=0)
time.sleep(0.5) # replace wait which now returns immediately
else:
self.stop_old_streamers()
self.stop_old_streamers(n_max=self.max_streams)
self.cleanup_stopped_streamers()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment