Commit 107e0ed6 authored by Tim Schoof's avatar Tim Schoof
Browse files

Handle skipped substreams

Simplify and extend logic for starting the next subtream to handle
skipped substreams as well.
parent 5433e8d8
......@@ -91,6 +91,12 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get current size") from err
def get_substream_list(self):
try:
return self.broker.get_substream_list()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get substream list") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......
import logging
import time
from threading import Event
from asapo_producer import AsapoProducerError
from AsapoWorker.errors import (
......@@ -10,6 +9,14 @@ from AsapoWorker.utils import format_error
log = logging.getLogger(__name__)
def max_substream(substreams):
return max(int(s) for s in substreams if s.isdecimal())
def has_newer_substream(substream, substream_list):
return int(substream) < max_substream(substream_list)
class Streamer:
def __init__(
self, receiver, worker, delay_on_error=1,
......@@ -19,7 +26,7 @@ class Streamer:
self.initial_delay_on_error = delay_on_error
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
self.started = False
self.likely_done = False
self.stopped = Event()
def _process_stream(self):
......@@ -60,28 +67,47 @@ class Streamer:
self._handle_receiver_critical_error()
raise err
self.started = True
return data, metadata
def _handle_end_of_stream(self):
if not self.started:
# if the stream is already at the end without ever processing data,
# the size has to be checked to determine if the producer
# has already started
try:
current_size = self.receiver.get_current_size()
except StreamError as err:
# the state is unknown, so nothing should be done
log.warn("Failed to get current size", exc_info=True)
return
if current_size > 0:
self.started = True
if self.started and self.end_of_stream_callback:
# When receiving an EndOfStreamError, there are two cases to consider:
# 1. The substream has data, i.e., occurs in the substream list
# -> data receiving is slow or substream is finished
# -> start next stream
# 2. A newer substream has data, i.e., occurs in the substream list
# -> substream is likely finished or was skipped
# -> start next substream + reduce polling rate
if self.likely_done:
# nothing is left to be done
return
try:
substream_list = self.receiver.get_substream_list()
except StreamError:
# the state is unknown, so nothing should be done
log.warn("Failed to get substream list", exc_info=True)
return
if self.receiver.substream in substream_list:
has_data = True
else:
has_data = False
try:
newer_has_data = has_newer_substream(
self.receiver.substream, substream_list)
except ValueError:
# substream is not an integer, so nothing should be done
return
if (has_data or newer_has_data) and self.end_of_stream_callback:
# call the callback only once
self.end_of_stream_callback(self.receiver.substream)
self.end_of_stream_callback = None
if newer_has_data:
self.likely_done = True
def _handle_receiver_temporary_error(self):
self.worker.handle_receiver_temporary_error()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment