Commit 726aef85 authored by Tim Schoof's avatar Tim Schoof
Browse files

Implement substream metadata on the input stream

The stream will only be processed if the substream metadata is available
(can be an empty dict) and pre_scan was successfully called with the
received metadata.
parent fc5619a4
......@@ -25,6 +25,13 @@ def create_broker(
return broker
def create_metadata_broker(
source, path, has_filesystem, beamtime, stream, token, timeout):
return create_broker(
source, path, has_filesystem, beamtime, stream + "_metadata", token,
timeout)
@Configurable
class SimpleAsapoReceiver:
"""A simple wrapper for an ASAP::O consumer"""
......@@ -51,6 +58,23 @@ class SimpleAsapoReceiver:
type=str)
substream = Config(
"The name of the substream.", type=str, default="default", init=False)
metadata_broker = Config(
"An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
builder=create_metadata_broker, flatten=True, default=None,
arguments=dict(
source=Config("ASAP::O endpoint", type=str),
path=Config("ASAP::O mount path", type=str),
beamtime=Config("Beamtime ID", type=str),
token=Config("Beamtime access token", type=str),
stream=Config(
"Name of input stream", type=str, default=""),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
))
@group_id.default
def _generate_group_id(self):
......@@ -99,6 +123,30 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
def get_metadata_substream_list(self):
try:
return self.metadata_broker.get_substream_list()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
def get_substream_metadata(self):
try:
data, metadata = self.metadata_broker.get_by_id(
1, self.group_id, substream=self.substream)
return metadata
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of metadata stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get substream metadata") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError(
"Failed to get substream metadata") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream metadata") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......
......@@ -13,8 +13,12 @@ def max_substream(substreams):
return max(int(s) for s in substreams if s.isdecimal())
def has_newer_substream(substream, substream_list):
return int(substream) < max_substream(substream_list)
def has_newer_substream(substream, substream_list, metadata_substream_list):
# a newer substream is ready, when it has data and metadata
max_substream_number = max_substream(substream_list)
return (
int(substream) < max_substream_number
and max_substream_number in metadata_substream_list)
class Streamer:
......@@ -47,6 +51,18 @@ class Streamer:
return True
def _get_substream_metadata(self):
try:
return self.receiver.get_substream_metadata()
except EndOfStreamError as err:
log.info(format_error(err))
# The substream might have been skipped
self._handle_end_of_stream()
return None
except TemporaryError as err:
log.warn(format_error(err))
return None
def _get_next(self):
try:
data, metadata = self.receiver.get_next(meta_only=self.meta_only)
......@@ -72,10 +88,12 @@ class Streamer:
def _handle_end_of_stream(self):
# When receiving an EndOfStreamError, there are two cases to consider:
# 1. The substream has data, i.e., occurs in the substream list
# 1. The substream has data and metadata, i.e., occurs in the substream
# list
# -> data receiving is slow or substream is finished
# -> start next stream
# 2. A newer substream has data, i.e., occurs in the substream list
# 2. A newer substream has data and metadata, i.e., occurs in the
# substream list
# -> substream is likely finished or was skipped
# -> start next substream + reduce polling rate
if self.likely_done:
......@@ -89,14 +107,24 @@ class Streamer:
log.warn("Failed to get substream list", exc_info=True)
return
if self.receiver.substream in substream_list:
try:
metadata_substream_list = (
self.receiver.get_metadata_substream_list())
except StreamError:
# the state is unknown, so nothing should be done
log.warn("Failed to get metadata substream list", exc_info=True)
return
if (self.receiver.substream in substream_list
and self.receiver.substream in metadata_substream_list):
has_data = True
else:
has_data = False
try:
newer_has_data = has_newer_substream(
self.receiver.substream, substream_list)
self.receiver.substream, substream_list,
metadata_substream_list)
except ValueError:
# substream is not an integer, so nothing should be done
return
......@@ -119,8 +147,24 @@ class Streamer:
self.worker.handle_receiver_critical_error()
def run(self):
log.info("Start stream processing.")
try:
log.info("Waiting for substream metadata")
while not self.stopped.is_set():
substream_metadata = self._get_substream_metadata()
if substream_metadata:
self._reset_delay_on_error()
break
if self.likely_done:
self.stopped.wait(self.delay_on_error)
self._increase_delay_on_error()
else:
# no break, i.e., stopped is set
return
log.info("Performing pre-scan setup")
self.worker.pre_scan(substream_metadata["meta"])
log.info("Start stream processing.")
while not self.stopped.is_set():
success = self._process_stream()
if self.likely_done and not success:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment