Commit 01278fdc authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add substream work

parent f19756ca
......@@ -210,3 +210,9 @@ class AsapoSender:
log.info("Received last record with id=%i.", current_id)
return data, metadata
def get_substream_list(self):
try:
return self.broker.get_substream_list()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
......@@ -13,12 +13,27 @@ def max_substream(substreams):
return max(int(s) for s in substreams if s.isdecimal())
def has_newer_substream(substream, substream_list, metadata_substream_list):
# a newer substream is ready, when it has data and metadata
max_substream_number = max_substream(substream_list)
return (
int(substream) < max_substream_number
and max_substream_number in metadata_substream_list)
def get_new_substream(substream_list, sender_substream_list):
if sender_substream_list is None:
return None
new_seubstreams = list(set(substream_list)
- set(sender_substream_list) - set(substream))
if len(new_seubstreams) > 0:
return new_seubstreams[0]
else:
return None
def has_newer_substream(substream, substream_list, metadata_substream_list,
sender_substream_list=None):
if sender_substream_list:
return get_new_substream(substream_list, sender_substream_list) is not None
else:
# a newer substream is ready, when it has data and metadata
max_substream_number = max_substream(substream_list)
return (
int(substream) < max_substream_number
and max_substream_number in metadata_substream_list)
class ContainsAll:
......@@ -114,6 +129,11 @@ class Streamer:
log.warn("Failed to get substream list", exc_info=True)
return
try:
sender_substream_list = self.receiver.get_substream_list()
except StreamError:
sender_substream_list = None
if self.metadata_receiver:
try:
metadata_substream_list = (
......@@ -137,14 +157,16 @@ class Streamer:
try:
newer_has_data = has_newer_substream(
self.receiver.substream, substream_list,
metadata_substream_list)
metadata_substream_list,
sender_substream_list)
except ValueError:
# substream is not an integer, so nothing should be done
return
if (has_data or newer_has_data) and self.end_of_stream_callback:
# call the callback only once
self.end_of_stream_callback(self.receiver.substream)
new_substream = get_new_substream(substream_list, sender_substream_list)
self.end_of_stream_callback(self.receiver.substream, new_substream)
self.end_of_stream_callback = None
if newer_has_data:
......
......@@ -31,14 +31,15 @@ class StreamerManager:
self.initialized = True
def start_next_callback(self, substream):
try:
next_substream = str(int(substream) + 1)
except ValueError:
log.warning(
"Cannot calculate next substream from non-integer value "
"substream=%s", substream)
return
def start_next_callback(self, substream, next_substream=None):
if not next_substream:
try:
next_substream = str(int(substream) + 1)
except ValueError:
log.warning(
"Cannot calculate next substream from non-integer value "
"substream=%s", substream)
return
self.start_substream_thread(next_substream)
def start_substream_thread(self, substream):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment