Commit db85b614 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'feat/end_of_stream' into 'master'

Add end-of-stream error handling

See merge request fs-sc/asapoworker!23
parents 939e39ad 5e392fa0
......@@ -3,7 +3,7 @@ import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
StreamError, ConfigurationError, TemporaryError, MissingDataError,
EndOfStreamError)
EndOfStreamError, StreamFinishedError)
log = logging.getLogger(__name__)
......@@ -79,6 +79,8 @@ class SimpleAsapoReceiver:
self.group_id, meta_only=meta_only, stream=self.stream)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of data_source") from err
except asapo_consumer.AsapoStreamFinishedError as err:
raise StreamFinishedError("Finish of data_source") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
......@@ -108,6 +110,12 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream list") from err
def get_stream_info(self):
try:
return self.consumer.get_stream_list(from_stream=self.stream)[0]
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream info") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......@@ -160,6 +168,10 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
raise EndOfStreamError(
"End of data_source at expected_id"
+ str(self.expected_id)) from err
except asapo_consumer.AsapoStreamFinishedError as err:
raise StreamFinishedError(
"Finish of data_source at expected_id"
+ str(self.expected_id)) from err
except asapo_consumer.AsapoUnavailableServiceError as err:
raise TemporaryError(
"Failed to get next at expected_id="
......@@ -305,6 +317,10 @@ class SerialDatasetAsapoReceiver(SerialAsapoReceiver):
raise EndOfStreamError(
"End of data_source at expected_id"
+ str(self.expected_id)) from err
except asapo_consumer.AsapoStreamFinishedError as err:
raise StreamFinishedError(
"Finish of data_source at expected_id"
+ str(self.expected_id)) from err
except asapo_consumer.AsapoUnavailableServiceError as err:
raise TemporaryError(
"Failed to get next at expected_id="
......
......@@ -173,6 +173,10 @@ class AsapoSender:
with self._lock:
n_queued = self._n_queued
def send_end_of_stream(self, last_id=None, next_stream=None):
if last_id is None:
last_id = self.producer.stream_info(stream=self.stream)['lastId']
self.producer.send_stream_finished_flag(stream=self.stream, last_id=last_id, next_stream=next_stream)
def _callback(self, header, err):
with self._lock:
......
......@@ -15,4 +15,9 @@ class MissingDataError(StreamError):
class EndOfStreamError(TemporaryError):
pass
\ No newline at end of file
pass
class StreamFinishedError(TemporaryError):
pass
......@@ -7,7 +7,7 @@ from AsapoWorker.configuration import create_instance_from_configurable
from AsapoWorker.configurable import check_type
from AsapoWorker.errors import (
TemporaryError, MissingDataError, ConfigurationError, EndOfStreamError,
StreamError)
StreamError, StreamFinishedError)
from AsapoWorker.utils import format_error
log = logging.getLogger(__name__)
......@@ -163,6 +163,15 @@ class Streamer:
log.error("Missing data error", exc_info=True)
self._handle_receiver_missing_data_error()
return None, None
except StreamFinishedError as err:
log.info("Stream is finished")
self._handle_end_of_stream()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
return None, None
except Exception as err:
log.critical("Unhandled exception", exc_info=True)
self._handle_receiver_critical_error()
......
......@@ -62,6 +62,12 @@ class Worker:
"""
pass
def stream_finished(self, stream_info):
"""
Function is called at the end of processing to send end-of-stream flag
"""
pass
def send(self, data, metadata):
if self.sender:
self.sender.send_data(data, metadata)
......@@ -118,6 +124,17 @@ class SimpleWorker(Worker):
return new_metadata
def stream_finished(self, stream_info):
"""
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender:
self.sender.send_end_of_stream(stream_info['lastId'],
next_stream=stream_info['nextStream'])
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send(self, data, metadata, extra_meta=None):
"""Send the data to the configured output stream
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment