Commit 908ca2c3 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Implement end-of-stream

parent c3448672
......@@ -173,6 +173,10 @@ class AsapoSender:
with self._lock:
n_queued = self._n_queued
def send_end_of_stream(self, last_id=None):
if last_id is None:
last_id = self.producer.stream_info(stream=self.stream)['lastId']
self.producer.send_stream_finished_flag(stream=self.stream, last_id=last_id)
def _callback(self, header, err):
with self._lock:
......
......@@ -62,11 +62,20 @@ class Worker:
"""
pass
def stream_finished(self):
def stream_finished(self, last_id=None):
"""
Function is called what enf-of-stream flag is received
Function is called at the end of processing and send end-of-stream flag
Parameters
----------
last_id: id of last message in stream. If not given last existing is
considered as last message.
"""
pass
if self.sender:
self.sender.send_end_of_stream(last_id)
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send(self, data, metadata):
if self.sender:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment