Commit 7282977c authored by Tim Schoof's avatar Tim Schoof
Browse files

Extract code from Application into StreamerManager

This makes it easier to use the theaded handling of substreams in other
applications.
parent 9a86b3ea
import argparse
from concurrent.futures import ThreadPoolExecutor
from collections import OrderedDict
import logging
import signal
import sys
from threading import Lock, Event
import time
from AsapoWorker.configuration import (
create_instance_from_configurable,
create_cli_parser_from_configurable_class,
parsed_args_to_dict)
from AsapoWorker.streamer import Streamer
from AsapoWorker.streamer_manager import StreamerManager
from AsapoWorker.errors import StreamError
from AsapoWorker.utils import format_error, not_last_n
log = logging.getLogger(__name__)
......@@ -28,27 +24,21 @@ class Application:
self.receiver_broker = None
self.sender_broker = None
self.sender_producer = None
self.executor = ThreadPoolExecutor(max_workers=16)
self.lock = Lock()
self.options = None
self.streamer_manager = StreamerManager(self._create_streamer)
self.verbose = verbose
self.log_level = log_level
self.initialized = False
self.streamers = OrderedDict()
self.futures = OrderedDict()
self.stopped = Event()
def initialize(self):
self._parse_options()
self._setup_logging()
streamer = self._create_streamer()
with self.lock:
self.streamers[streamer.receiver.substream] = streamer
self.streamer_manager.initialize()
def signalhandler(signum, frame):
self.stopped.set()
self.streamer_manager.stop()
signal.signal(signal.SIGINT, signalhandler)
signal.signal(signal.SIGTERM, signalhandler)
......@@ -154,87 +144,17 @@ class Application:
streamer_options = {}
streamer = Streamer(
consumer, worker,
end_of_stream_callback=self.start_next_callback,
**streamer_options)
consumer, worker,
end_of_stream_callback=self.streamer_manager.start_next_callback,
**streamer_options)
return streamer
def start_next_callback(self, substream):
try:
next_substream = str(int(substream) + 1)
except ValueError:
log.warning(
"Cannot calculate next substream from non-integer value "
"substream=%s", substream)
return
self.start_stream_thread(next_substream)
def start_stream_thread(self, substream):
with self.lock:
if substream not in self.streamers and not self.stopped.is_set():
log.info("Starting new substream=%s", substream)
streamer = self._create_streamer(substream=substream)
future = self.executor.submit(streamer.run)
self.streamers[substream] = streamer
self.futures[substream] = future
def stop_old_streamers(self, n_max=10):
with self.lock:
for streamer in not_last_n(n_max, self.streamers.values()):
log.info("Stopping substream=%s", streamer.receiver.substream)
streamer.stop()
def cleanup_stopped_streamers(self):
with self.lock:
finished_substreams = []
for substream, future in self.futures.items():
if future.done():
# Deleting items of the iterator here could break the loop
finished_substreams.append(substream)
err = future.exception()
if err:
log.error(
"Substream {} stopped with error: ".format(
substream) + format_error(err))
# Stop application
self.stopped.set()
for substream in finished_substreams:
del self.futures[substream]
del self.streamers[substream]
def _shutdown(self):
self.executor.shutdown()
def run(self):
if not self.initialized:
self.initialize()
# start initial streamer
with self.lock:
if not self.stopped.is_set():
for substream, streamer in self.streamers.items():
log.info(
"Starting new substream=%s",
streamer.receiver.substream)
self.futures[substream] = (
self.executor.submit(streamer.run))
while True:
if self.stopped.wait(0.5):
self.stop_old_streamers(n_max=0)
time.sleep(0.5) # replace wait which now returns immediately
else:
self.stop_old_streamers()
self.cleanup_stopped_streamers()
if not self.futures:
break
self._shutdown()
self.streamer_manager.run()
def _set_start_position(options, consumer, sender, worker_class):
......@@ -291,4 +211,4 @@ def _unset_start_position(options):
try:
del options[section][key]
except KeyError:
pass
\ No newline at end of file
pass
from concurrent.futures import ThreadPoolExecutor
from collections import OrderedDict
import logging
import time
from threading import Lock, Event
from AsapoWorker.utils import format_error, not_last_n
log = logging.getLogger(__name__)
class StreamerManager:
def __init__(self, create_streamer):
self._create_streamer = create_streamer
self.executor = ThreadPoolExecutor(max_workers=16)
self.streamers = OrderedDict()
self.futures = OrderedDict()
self.stopped = Event()
self.lock = Lock()
self.initialized = False
def initialize(self):
# Create first streamer to trigger errors with create_streamer early
streamer = self._create_streamer()
with self.lock:
self.streamers[streamer.receiver.substream] = streamer
self.initialized = True
def start_next_callback(self, substream):
try:
next_substream = str(int(substream) + 1)
except ValueError:
log.warning(
"Cannot calculate next substream from non-integer value "
"substream=%s", substream)
return
self.start_substream_thread(next_substream)
def start_substream_thread(self, substream):
with self.lock:
if substream not in self.streamers and not self.stopped.is_set():
log.info("Starting new substream=%s", substream)
streamer = self._create_streamer(substream=substream)
future = self.executor.submit(streamer.run)
self.streamers[substream] = streamer
self.futures[substream] = future
def stop_old_streamers(self, n_max=10):
with self.lock:
for streamer in not_last_n(n_max, self.streamers.values()):
log.info("Stopping substream=%s", streamer.receiver.substream)
streamer.stop()
def cleanup_stopped_streamers(self):
with self.lock:
finished_substreams = []
for substream, future in self.futures.items():
if future.done():
# Deleting items of the iterator here could break the loop
finished_substreams.append(substream)
err = future.exception()
if err:
log.error(
"Substream {} stopped with error: ".format(
substream) + format_error(err))
# Stop application
self.stopped.set()
for substream in finished_substreams:
del self.futures[substream]
del self.streamers[substream]
def stop(self):
self.stopped.set()
def _shutdown(self):
self.executor.shutdown()
def run(self):
if not self.initialized:
self.initialize()
# start initial streamer
with self.lock:
if not self.stopped.is_set():
for substream, streamer in self.streamers.items():
log.info(
"Starting new substream=%s",
streamer.receiver.substream)
self.futures[substream] = (
self.executor.submit(streamer.run))
while True:
if self.stopped.wait(0.5):
self.stop_old_streamers(n_max=0)
time.sleep(0.5) # replace wait which now returns immediately
else:
self.stop_old_streamers()
self.cleanup_stopped_streamers()
if not self.futures:
break
self._shutdown()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment