Commit 5405b151 authored by Tim Schoof's avatar Tim Schoof
Browse files

Move substream metadata to extra stream

An application that does not configure the metadata receiver will
continue to work as in previous versions without a metadata stream. In
particular, the worker's pre_scan method will not be called in that
case.
parent 726aef85
Pipeline #26 failed with stages
in 12 minutes and 23 seconds
......@@ -16,14 +16,16 @@ log = logging.getLogger(__name__)
class Application:
def __init__(
self, worker_class, consumer_class,
producer_class=None,
producer_class=None, metadata_receiver_class=None,
verbose=False, log_level=logging.WARN):
self.worker_class = worker_class
self.consumer_class = consumer_class
self.producer_class = producer_class
self.metadata_receiver_class = metadata_receiver_class
self.receiver_broker = None
self.sender_broker = None
self.sender_producer = None
self.metadata_receiver = None
self.options = None
self.streamer_manager = StreamerManager(self._create_streamer)
self.verbose = verbose
......@@ -76,6 +78,11 @@ class Application:
create_cli_parser_from_configurable_class(
self.producer_class, parser, prefix="sender.")
if self.metadata_receiver_class:
create_cli_parser_from_configurable_class(
self.metadata_receiver_class, parser,
prefix="metadata_receiver.")
parsed_args = parser.parse_args(sys.argv[1:])
self.options = parsed_args_to_dict(parsed_args)
......@@ -111,6 +118,12 @@ class Application:
else:
sender = None
if self.metadata_receiver_class:
if not self.metadata_receiver:
self.metadata_receiver = create_instance_from_configurable(
self.metadata_receiver_class,
self.options["metadata_receiver"])
if not substream:
if "substream" in self.options:
substream = self.options["substream"]
......@@ -144,7 +157,7 @@ class Application:
streamer_options = {}
streamer = Streamer(
consumer, worker,
consumer, worker, metadata_receiver=self.metadata_receiver,
end_of_stream_callback=self.streamer_manager.start_next_callback,
**streamer_options)
......
......@@ -58,23 +58,6 @@ class SimpleAsapoReceiver:
type=str)
substream = Config(
"The name of the substream.", type=str, default="default", init=False)
metadata_broker = Config(
"An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
builder=create_metadata_broker, flatten=True, default=None,
arguments=dict(
source=Config("ASAP::O endpoint", type=str),
path=Config("ASAP::O mount path", type=str),
beamtime=Config("Beamtime ID", type=str),
token=Config("Beamtime access token", type=str),
stream=Config(
"Name of input stream", type=str, default=""),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
))
@group_id.default
def _generate_group_id(self):
......@@ -123,30 +106,6 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
def get_metadata_substream_list(self):
try:
return self.metadata_broker.get_substream_list()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
def get_substream_metadata(self):
try:
data, metadata = self.metadata_broker.get_by_id(
1, self.group_id, substream=self.substream)
return metadata
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of metadata stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get substream metadata") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError(
"Failed to get substream metadata") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream metadata") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......@@ -314,3 +273,71 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
metadata["substream"] = self.substream
return data, metadata
@Configurable
class AsapoMetadataReceiver:
"""A wrapper for receiving substream metadata of an ASAP::O stream"""
broker = Config(
"An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
builder=create_broker, flatten=True, arguments=dict(
source=Config("ASAP::O endpoint", type=str),
beamtime=Config("Beamtime ID", type=str),
token=Config("Beamtime access token", type=str),
stream=Config(
"Name of metadata stream", type=str, default=""),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000),
path=Config(
"ASAP::O mount path", type=str, default="", init=False),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False, init=False)
))
group_id = Config(
"The stream data is divided between all workers with the same "
"group_id. If not given, a unique group id will be generated "
"and the worker will receive the complete stream.",
type=str)
@group_id.default
def _generate_group_id(self):
log.info("Generating new group id.")
try:
group_id = self.broker.generate_group_id()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Cannot generate group_id") from err
log.info("New group_id=%s.", group_id)
return group_id
def get_current_size(self, substream):
try:
return self.broker.get_current_size(substream=substream)
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get current size") from err
def get_substream_list(self):
try:
return self.broker.get_substream_list()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
def get_substream_metadata(self, substream):
try:
data, metadata = self.broker.get_by_id(
1, self.group_id, substream=substream)
return metadata
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of metadata stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get substream metadata") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError(
"Failed to get substream metadata") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream metadata") from err
......@@ -21,12 +21,18 @@ def has_newer_substream(substream, substream_list, metadata_substream_list):
and max_substream_number in metadata_substream_list)
class ContainsAll:
def __contains__(self, item):
return True
class Streamer:
def __init__(
self, receiver, worker, delay_on_error=3,
self, receiver, worker, metadata_receiver=None, delay_on_error=3,
end_of_stream_callback=None, meta_only=False):
self.receiver = receiver
self.worker = worker
self.metadata_receiver = metadata_receiver
self.initial_delay_on_error = delay_on_error
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
......@@ -53,7 +59,8 @@ class Streamer:
def _get_substream_metadata(self):
try:
return self.receiver.get_substream_metadata()
return self.metadata_receiver.get_substream_metadata(
self.receiver.substream)
except EndOfStreamError as err:
log.info(format_error(err))
# The substream might have been skipped
......@@ -107,13 +114,19 @@ class Streamer:
log.warn("Failed to get substream list", exc_info=True)
return
try:
metadata_substream_list = (
self.receiver.get_metadata_substream_list())
except StreamError:
# the state is unknown, so nothing should be done
log.warn("Failed to get metadata substream list", exc_info=True)
return
if self.metadata_receiver:
try:
metadata_substream_list = (
self.metadata_receiver.get_substream_list())
except StreamError:
# the state is unknown, so nothing should be done
log.warn(
"Failed to get metadata substream list", exc_info=True)
return
else:
# Stream does not use substream metadata, therefore consider all
# metadata as available
metadata_substream_list = ContainsAll()
if (self.receiver.substream in substream_list
and self.receiver.substream in metadata_substream_list):
......@@ -148,21 +161,22 @@ class Streamer:
def run(self):
try:
log.info("Waiting for substream metadata")
while not self.stopped.is_set():
substream_metadata = self._get_substream_metadata()
if substream_metadata:
self._reset_delay_on_error()
break
if self.likely_done:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
# no break, i.e., stopped is set
return
if self.metadata_receiver:
log.info("Waiting for substream metadata")
while not self.stopped.is_set():
substream_metadata = self._get_substream_metadata()
if substream_metadata:
self._reset_delay_on_error()
break
if self.likely_done:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
# no break, i.e., stopped is set
return
log.info("Performing pre-scan setup")
self.worker.pre_scan(substream_metadata["meta"])
log.info("Performing pre-scan setup")
self.worker.pre_scan(substream_metadata["meta"])
log.info("Start stream processing.")
while not self.stopped.is_set():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment