Commit 1d6d6754 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Feat: Add data to metadata stream.

parent 16544352
...@@ -325,10 +325,17 @@ class AsapoMetadataReceiver: ...@@ -325,10 +325,17 @@ class AsapoMetadataReceiver:
raise StreamError("Failed to get substream list") from err raise StreamError("Failed to get substream list") from err
def get_substream_metadata(self, substream): def get_substream_metadata(self, substream):
"""
Return the last available entry from given substream
of the metadata stream
Parameters
----------
substream : str
Name of substream
"""
try: try:
data, metadata = self.broker.get_by_id( return self.broker.get_last(self.group_id, substream=substream)
1, self.group_id, substream=substream)
return metadata
except asapo_consumer.AsapoEndOfStreamError as err: except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of metadata stream") from err raise EndOfStreamError("End of metadata stream") from err
except (asapo_consumer.AsapoUnavailableServiceError, except (asapo_consumer.AsapoUnavailableServiceError,
......
...@@ -65,10 +65,10 @@ class Streamer: ...@@ -65,10 +65,10 @@ class Streamer:
log.info(format_error(err)) log.info(format_error(err))
# The substream might have been skipped # The substream might have been skipped
self._handle_end_of_stream() self._handle_end_of_stream()
return None return None, None
except TemporaryError as err: except TemporaryError as err:
log.warn(format_error(err)) log.warn(format_error(err))
return None return None, None
def _get_next(self): def _get_next(self):
try: try:
...@@ -164,7 +164,7 @@ class Streamer: ...@@ -164,7 +164,7 @@ class Streamer:
if self.metadata_receiver: if self.metadata_receiver:
log.info("Waiting for substream metadata") log.info("Waiting for substream metadata")
while not self.stopped.is_set(): while not self.stopped.is_set():
substream_metadata = self._get_substream_metadata() data, substream_metadata = self._get_substream_metadata()
if substream_metadata: if substream_metadata:
self._reset_delay_on_error() self._reset_delay_on_error()
break break
...@@ -176,7 +176,7 @@ class Streamer: ...@@ -176,7 +176,7 @@ class Streamer:
return return
log.info("Performing pre-scan setup") log.info("Performing pre-scan setup")
self.worker.pre_scan(substream_metadata["meta"]) self.worker.pre_scan(data, substream_metadata)
log.info("Start stream processing.") log.info("Start stream processing.")
while not self.stopped.is_set(): while not self.stopped.is_set():
......
...@@ -26,6 +26,20 @@ class Worker: ...@@ -26,6 +26,20 @@ class Worker:
def handle_receiver_critical_error(self): def handle_receiver_critical_error(self):
self.handle_receiver_error() self.handle_receiver_error()
def pre_scan(self, data, metadata):
"""
If metadata stream is given this function is called at the beginning of
each stream before get_next is called.
Parameters
----------
data:
data of the metadata stream
metadata: dict
metadata of the metadata steam
"""
pass
def send(self, data, metadata): def send(self, data, metadata):
if self.sender: if self.sender:
self.sender.send_data(data, metadata) self.sender.send_data(data, metadata)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment