Commit 12571164 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Implement FinishStream for simple worker

parent 9f096471
......@@ -110,6 +110,12 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream list") from err
def get_stream_info(self):
try:
return self.consumer.get_stream_list(from_stream=self.stream)[0]
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream info") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......
......@@ -160,7 +160,8 @@ class Streamer:
except StreamFinishedError as err:
log.info("Stream is finished")
self._handle_end_of_stream()
self.worker.stream_finished()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
......
......@@ -62,15 +62,11 @@ class Worker:
"""
pass
def stream_finished(self):
def stream_finished(self, stream_info):
"""
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender:
self.sender.send_end_of_stream()
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
pass
def send(self, data, metadata):
if self.sender:
......@@ -128,6 +124,16 @@ class SimpleWorker(Worker):
return new_metadata
def stream_finished(self, stream_info):
"""
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender:
self.sender.send_end_of_stream(stream_info['lastId'])
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send(self, data, metadata, extra_meta=None):
"""Send the data to the configured output stream
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment