Commit 2ade74b0 authored by Tim Schoof

Implement substreams using threads
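
Each substream now gets its own Streamer running on a ThreadPoolExecutor.
When a streamer reaches the end of its substream, an end_of_stream_callback
schedules the next numeric substream (current + 1) in a new thread. Older
substreams are stopped and finished threads are cleaned up periodically;
SIGINT/SIGTERM request an orderly shutdown of all substreams.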

parent 4c310fef
import argparse
from concurrent.futures import ThreadPoolExecutor
from collections import OrderedDict
import logging
import signal
import sys
from threading import Lock
import time
from AsapoWorker.configuration import (
create_instance_from_configurable,
create_cli_parser_from_configurable_class,
parsed_args_to_dict)
from AsapoWorker.streamer import Streamer
from AsapoWorker.errors import StreamError
from AsapoWorker.utils import format_error, not_last_n
log = logging.getLogger(__name__)
class Application:
@@ -18,11 +25,38 @@ class Application:
self.worker_class = worker_class
self.consumer_class = consumer_class
self.producer_class = producer_class
self.executor = ThreadPoolExecutor(max_workers=16)
self.lock = Lock()
self.verbose = verbose
self.log_level = log_level
self.initialized = False
self.streamers = OrderedDict()
self.futures = OrderedDict()
self.stopped = False
def initialize(self):
self._parse_options()
self._setup_logging()
streamer = self._create_streamer(set_start_position=True)
# The user-provided start position is only valid for the initial
# substream, so remove it from the options
_unset_start_position(self.options)
with self.lock:
self.streamers[streamer.receiver.substream] = streamer
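# The signal handlers only set a flag; run() polls it and performs an
# orderly shutdown of all substream threads.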
def signalhandler(signum, frame):
self.stopped = True
signal.signal(signal.SIGINT, signalhandler)
signal.signal(signal.SIGTERM, signalhandler)
self.initialized = True
def _parse_options(self):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -40,6 +74,10 @@ class Application:
"--delay_on_error", type=float, default=1, metavar="FLOAT",
help="When an error occurs, wait so many seconds before retrying")
parser.add_argument(
"--substream", type=str, default="default", metavar="STR",
help="The substream where streaming should start")
create_cli_parser_from_configurable_class(
self.consumer_class, parser, prefix="receiver.")
create_cli_parser_from_configurable_class(
@@ -51,54 +89,128 @@ class Application:
parsed_args = parser.parse_args(sys.argv[1:])
options = parsed_args_to_dict(parsed_args)
self.options = parsed_args_to_dict(parsed_args)
log_level = options["log_level"]
def _setup_logging(self):
log_level = self.options["log_level"]
format = (
"%(asctime)s AsapoWorker %(filename)s:%(lineno)d "
"%(levelname)-8s %(message)s")
logging.basicConfig(level=log_level, format=format)
logging.info("Log level set to %s", log_level)
# TODO: reuse the same broker and sender objects for all threads
def _create_streamer(self, substream=None, set_start_position=False):
consumer = create_instance_from_configurable(
self.consumer_class, options["receiver"])
self.consumer_class, self.options["receiver"])
if self.producer_class:
sender = create_instance_from_configurable(
self.producer_class, options["sender"])
self.producer_class, self.options["sender"])
else:
sender = None
_set_start_position(
options, consumer, sender, self.worker_class)
if set_start_position:
_set_start_position(
self.options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable(
self.worker_class, options["worker"])
self.worker_class, self.options["worker"])
if sender:
worker.sender = sender
if "delay_on_error" in options:
streamer_options = {"delay_on_error": options["delay_on_error"]}
if "delay_on_error" in self.options:
streamer_options = {
"delay_on_error": self.options["delay_on_error"]}
else:
streamer_options = {}
self.streamer = Streamer(
consumer, worker, **streamer_options)
if not substream:
if "substream" in self.options:
substream = self.options["substream"]
def signalhandler(signum, frame):
self.streamer.stop()
if substream:
consumer.substream = substream
if sender:
sender.substream = substream
signal.signal(signal.SIGINT, signalhandler)
signal.signal(signal.SIGTERM, signalhandler)
streamer = Streamer(
consumer, worker,
end_of_stream_callback=self.start_next_callback,
**streamer_options)
self.initialized = True
return streamer
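# Invoked by a Streamer (as its end_of_stream_callback) with the name of
# the substream that ended; starts the numerically next substream.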
def start_next_callback(self, substream):
try:
next_substream = str(int(substream) + 1)
except ValueError:
log.warning(
"Cannot calculate next substream from non-integer value "
"substream=%s", substream)
return
self.start_stream_thread(next_substream)
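# The lock ensures each substream is started at most once and that no
# new streamer is created after a stop was requested.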
def start_stream_thread(self, substream):
with self.lock:
if substream not in self.streamers and not self.stopped:
log.info("Starting new substream=%s", substream)
streamer = self._create_streamer(substream=substream)
future = self.executor.submit(streamer.run)
self.streamers[substream] = streamer
self.futures[substream] = future
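# Ask all but the newest n_max streamers to stop; run() uses n_max=0
# during shutdown to stop every remaining streamer.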
def stop_old_streamers(self, n_max=10):
with self.lock:
for streamer in not_last_n(n_max, self.streamers.values()):
log.info("Stopping substream=%s", streamer.receiver.substream)
streamer.stop()
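# Remove finished streamers from the front of the insertion-ordered
# dicts; raises StopIteration once no futures remain, which run()
# treats as the signal that shutdown is complete.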
def cleanup_stopped_streamers(self):
with self.lock:
# A for loop would not work because elements are deleted while iterating
while True:
# get the first element
substream, future = next(iter(self.futures.items()))
if future.done():
err = future.exception()
if err:
log.error(
"Stream stopped with error: " + format_error(err))
del self.futures[substream]
del self.streamers[substream]
else:
break
def _shutdown(self):
self.executor.shutdown()
def run(self):
if not self.initialized:
self.initialize()
self.streamer.run()
# start initial streamer
with self.lock:
if not self.stopped:
for streamer in self.streamers.values():
self.futures[streamer.receiver.substream] = (
self.executor.submit(streamer.run))
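# Main loop: poll every 0.5 s, prune old substreams, and reap finished
# threads until cleanup signals (via StopIteration) that nothing is left.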
while True:
time.sleep(0.5)
if self.stopped:
self.stop_old_streamers(n_max=0)
self.stop_old_streamers()
try:
self.cleanup_stopped_streamers()
except StopIteration:
break
self._shutdown()
def _set_start_position(options, consumer, sender, worker_class):
@@ -120,7 +232,7 @@ def _set_start_position(options, consumer, sender, worker_class):
try:
_, last_output_metadata = sender.get_last()
except StreamError as err:
logging.warning(
log.warning(
"Retrieving last output data failed with err=%s", err)
last_output_metadata = None
else:
@@ -134,16 +246,25 @@ def _set_start_position(options, consumer, sender, worker_class):
output_start_index = worker_class.calculate_start_index(
last_output_metadata)
except AttributeError:
logging.warning(
log.warning(
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
else:
consumer.set_start_id(input_start_id)
logging.info(
log.info(
"Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id
logging.info(
log.info(
"Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index
def _unset_start_position(options):
for key in ["start_id", "start_index"]:
for section in ["receiver", "worker"]:
try:
del options[section][key]
except KeyError:
pass
\ No newline at end of file
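Editor's note: the following is a minimal usage sketch, not part of the commit. The import paths and the MyWorker class are assumptions for illustration; the Application behavior (CLI parsing, initial substream, thread pool, signal handling) is taken from the diff above.

# hypothetical wiring of the threaded Application (paths assumed)
from AsapoWorker.application import Application  # import path assumed
from AsapoWorker.asapo_receiver import SerialAsapoReceiver  # assumed
from AsapoWorker.asapo_sender import AsapoSender  # assumed
from my_package.worker import MyWorker  # placeholder worker class

app = Application(
    worker_class=MyWorker,
    consumer_class=SerialAsapoReceiver,
    producer_class=AsapoSender)
# run() calls initialize() on first use: it parses the CLI (including
# the new --substream option), creates the first streamer, installs
# SIGINT/SIGTERM handlers, then submits each streamer to the thread
# pool and polls until shutdown.
app.run()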
@@ -2,7 +2,8 @@ import logging
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
StreamError, ConfigurationError, TemporaryError, MissingDataError)
StreamError, ConfigurationError, TemporaryError, MissingDataError,
EndOfStreamError)
log = logging.getLogger(__name__)
@@ -48,6 +49,8 @@ class SimpleAsapoReceiver:
"group_id. If not given, a unique group id will be generated "
"and the worker will receive the complete stream.",
type=str)
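# Note: init=False, so the substream is not a CLI/constructor option
# here; the Application assigns it per streamer (consumer.substream).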
substream = Config(
"The name of the substream.", type=str, default="default", init=False)
@group_id.default
def _generate_group_id(self):
@@ -64,9 +67,10 @@ class SimpleAsapoReceiver:
log.info("Requesting next record for group_id=%s.", self.group_id)
try:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only)
except (asapo_consumer.AsapoEndOfStreamError,
asapo_consumer.AsapoUnavailableServiceError,
self.group_id, meta_only=meta_only, substream=self.substream)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
@@ -81,6 +85,12 @@ class SimpleAsapoReceiver:
return data, metadata
def get_current_size(self):
try:
return self.broker.get_current_size(substream=self.substream)
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get current size") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
@@ -128,9 +138,12 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
log.info("Requesting next record for group_id=%s.", self.group_id)
try:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only)
except (asapo_consumer.AsapoEndOfStreamError,
asapo_consumer.AsapoUnavailableServiceError) as err:
self.group_id, meta_only=meta_only, substream=self.substream)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError(
"End of stream at expected_id="
+ str(self.expected_id)) from err
except asapo_consumer.AsapoUnavailableServiceError as err:
raise TemporaryError(
"Failed to get next at expected_id="
+ str(self.expected_id)) from err
@@ -172,7 +185,7 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
try:
self.broker.set_lastread_marker(
self.expected_id - 1, self.group_id)
self.expected_id - 1, self.group_id, substream=self.substream)
except (asapo_consumer.AsapoEndOfStreamError,
asapo_consumer.AsapoUnavailableServiceError) as err:
# Do not increase retry counter because get_next will likely
@@ -220,7 +233,7 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
log.info("Requesting last record for group_id=%s.", self.group_id)
try:
data, metadata = self.broker.get_last(
self.group_id, meta_only=meta_only)
self.group_id, meta_only=meta_only, substream=self.substream)
except (asapo_consumer.AsapoEndOfStreamError,
asapo_consumer.AsapoUnavailableServiceError) as err:
raise TemporaryError("Failed to get last") from err
...
@@ -92,6 +92,8 @@ class AsapoSender:
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
))
substream = Config(
"The name of the substream.", type=str, default="default", init=False)
ingest_mode = Config(
"The ingest mode determines how the data is handled by ASAP::O. "
"Also accepts the name of the ingest mode as a string",
@@ -101,7 +103,7 @@ class AsapoSender:
"If the length of the sending queue exceeds this value, the call to "
"send_data becomes blocking until at least one record is removed from "
"the queue (by successfully sending or by failure).",
type=int, default=10)
type=int, default=2)
_n_queued = Config(
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
@@ -135,7 +137,7 @@ class AsapoSender:
self.producer.send_data(
metadata['_id'], metadata['name'], data,
user_meta=metadata["meta"], ingest_mode=self.ingest_mode,
callback=self._callback)
substream=self.substream, callback=self._callback)
except:
with self._lock:
self._n_queued -= 1
@@ -195,7 +197,7 @@ class AsapoSender:
log.info("Requesting last record for group_id=%s.", self._group_id)
try:
data, metadata = self.broker.get_last(
self._group_id, meta_only=meta_only)
self._group_id, meta_only=meta_only, substream=self.substream)
except asapo_consumer.AsapoEndOfStreamError:
return None, None
except asapo_consumer.AsapoConsumerError as err:
...
@@ -12,3 +12,7 @@ class TemporaryError(StreamError):
class MissingDataError(StreamError):
pass
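# Deriving from TemporaryError means an end of stream is retried like
# any temporary condition (the substream may still grow), while the
# Streamer additionally fires its end_of_stream_callback once.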
class EndOfStreamError(TemporaryError):
pass
\ No newline at end of file
@@ -2,23 +2,22 @@ import logging
import time
from asapo_producer import AsapoProducerError
from AsapoWorker.errors import (
TemporaryError, MissingDataError, ConfigurationError)
TemporaryError, MissingDataError, ConfigurationError, EndOfStreamError,
StreamError)
from AsapoWorker.utils import format_error
log = logging.getLogger(__name__)
def format_error(err):
msg = err.__class__.__name__ + "(" + str(err) + ")"
if err.__cause__:
msg += " caused by " + format_error(err.__cause__)
return msg
class Streamer:
def __init__(self, receiver, worker, delay_on_error=1):
def __init__(
self, receiver, worker, delay_on_error=1,
end_of_stream_callback=None):
self.receiver = receiver
self.worker = worker
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
self.started = False
self.stopped = False
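# The optional end_of_stream_callback is invoked at most once, with the
# substream name, when the end of the stream is reached after data has
# been seen (see _handle_end_of_stream below).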
def _process_stream(self):
@@ -43,6 +42,11 @@ class Streamer:
def _get_next(self):
try:
data, metadata = self.receiver.get_next(meta_only=False)
except EndOfStreamError as err:
log.info(format_error(err))
self._handle_receiver_temporary_error()
self._handle_end_of_stream()
return None, None
except TemporaryError as err:
log.warning(format_error(err))
self._handle_receiver_temporary_error()
@@ -57,8 +61,30 @@ class Streamer:
self.stop()
return None, None
self.started = True
return data, metadata
# TODO: slow down at end of stream to avoid constantly polling the
# broker (but speed up again if the stream continues)
def _handle_end_of_stream(self):
if not self.started:
# If the stream was already at its end before any data was processed,
# the stream size has to be checked to determine whether the producer
# has started yet
try:
current_size = self.receiver.get_current_size()
except StreamError:
# the state is unknown, so nothing should be done
log.warning("Failed to get current size", exc_info=True)
return
if current_size > 0:
self.started = True
if self.started and self.end_of_stream_callback:
# call the callback only once
self.end_of_stream_callback(self.receiver.substream)
self.end_of_stream_callback = None
def _handle_receiver_temporary_error(self):
self.worker.handle_receiver_temporary_error()
...
@@ -16,7 +16,9 @@ class DummyStream:
self.metadata = metadata
self.indices = {}
def get_next(self, group_id, meta_only=True):
def get_next(self, group_id, substream="default", meta_only=True):
if substream != "default":
raise asapo_consumer.AsapoEndOfStreamError("End of stream")
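# only the "default" substream holds data in this dummy; any other
# substream immediately reports end of stream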
index = self.indices.setdefault(group_id, 1)
if index > len(self.data):
raise asapo_consumer.AsapoEndOfStreamError("End of stream")
@@ -29,56 +31,62 @@ class DummyStream:
self.indices[group_id] += 1
return data, metadata
def get_next_ok(self, group_id, meta_only=True):
def get_next_ok(self, group_id, substream="default", meta_only=True):
log.info("get_next_ok")
return self.get_next(group_id, meta_only)
return self.get_next(group_id, substream=substream, meta_only=meta_only)
def get_next_end_of_stream(self, group_id, meta_only=True):
def get_next_end_of_stream(
self, group_id, substream="default", meta_only=True):
log.info("get_next_end_of_stream")
raise asapo_consumer.AsapoEndOfStreamError("End of stream")
def get_next_skip(self, group_id, meta_only=True):
def get_next_skip(self, group_id, substream="default", meta_only=True):
log.info("get_next_skip")
self.indices[group_id] = self.indices.get(group_id, 0) + 1
raise asapo_consumer.AsapoInterruptedTransactionError("Interrupted")
def get_next_no_skip(self, group_id, meta_only=True):
def get_next_no_skip(self, group_id, substream="default", meta_only=True):
log.info("get_next_no_skip")
raise asapo_consumer.AsapoInterruptedTransactionError("Interrupted")
def get_next_no_data(self, group_id, meta_only=True):
def get_next_no_data(self, group_id, substream="default", meta_only=True):
log.info("get_next_no_data")
raise asapo_consumer.AsapoNoDataError("No data")
def get_next_unavailable(self, group_id, meta_only=True):
def get_next_unavailable(
self, group_id, substream="default", meta_only=True):
log.info("get_next_unavailable")
raise asapo_consumer.AsapoUnavailableServiceError("Unavailable")
def get_next_io_error(self, group_id, meta_only=True):
def get_next_io_error(self, group_id, substream="default", meta_only=True):
log.info("get_next_io_error")
raise asapo_consumer.AsapoUnavailableServiceError("IO error")
def get_next_consumer_error(self, group_id, meta_only=True):
def get_next_consumer_error(
self, group_id, substream="default", meta_only=True):
log.info("get_next_consumer_error")
raise asapo_consumer.AsapoConsumerError()
def set_lastread_marker(self, id, group_id):
def set_lastread_marker(self, id, group_id, substream="default"):
if substream != "default":
return
self.indices[group_id] = id + 1
def set_lastread_marker_ok(self, id, group_id):
def set_lastread_marker_ok(self, id, group_id, substream="default"):
log.info("set_lastread_marker_ok")
return self.set_lastread_marker(id, group_id)
return self.set_lastread_marker(id, group_id, substream=substream)
def set_lastread_marker_not_set(self, id, group_id):
def set_lastread_marker_not_set(self, id, group_id, substream="default"):
log.info("set_lastread_marker_not_set")
raise asapo_consumer.AsapoInterruptedTransactionError("Interrupted")
def set_lastread_marker_set(self, id, group_id):
def set_lastread_marker_set(self, id, group_id, substream="default"):
log.info("set_lastread_marker_set")
self.set_lastread_marker(id, group_id)
raise asapo_consumer.AsapoInterruptedTransactionError("Interrupted")
def set_lastread_marker_unavailable(self, id, group_id):
def set_lastread_marker_unavailable(
self, id, group_id, substream="default"):
log.info("set_lastread_marker_unavailable")
raise asapo_consumer.AsapoUnavailableServiceError("Unavailable")
...
def not_last_n(n, iterator):
# yield all but the last n elements of a sized iterable
for i, element in enumerate(iterator):
if len(iterator) - i > n:
yield element
else:
# PEP 479: raising StopIteration inside a generator becomes a
# RuntimeError in Python 3.7+, so end the generator with return
return
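# Example: not_last_n(2, [1, 2, 3, 4, 5]) yields 1, 2, 3.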
def format_error(err):
msg = err.__class__.__name__ + "(" + str(err) + ")"
if err.__cause__:
msg += " caused by " + format_error(err.__cause__)
return msg
@@ -51,7 +51,8 @@ def callback_futures():
def callback_executor(mock_producer, callback_futures):
executor = ThreadPoolExecutor(max_workers=5)
def mock_send_data(id, name, data, user_meta, ingest_mode, callback):
def mock_send_data(
id, name, data, user_meta, ingest_mode, substream, callback):
header = {"id": id, "buffer": name}
err = user_meta.get("err", None)
delay = user_meta.get("delay", 0)
@@ -95,7 +96,7 @@ def test_asapo_sender(sender):
sender.producer.send_data.assert_called_once_with(
1, "foobar", "data", user_meta={},
ingest_mode=asapo_producer.DEFAULT_INGEST_MODE,
callback=sender._callback)
substream="default", callback=sender._callback)
assert sender._n_queued == 0
@@ -115,7 +116,7 @@ def test_asapo_sender_random(sender, callback_executor, callback_futures):
call(
i, name, "data", user_meta=meta,
ingest_mode=asapo_producer.DEFAULT_INGEST_MODE,
callback=sender._callback))
substream="default", callback=sender._callback))
for metadata in metadata_list:
sender.send_data("data", metadata)
...