Commit 5e392fa0 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Copy next stream flag

parent 12571164
Pipeline #4524 passed with stage
in 48 seconds
......@@ -173,10 +173,10 @@ class AsapoSender:
with self._lock:
n_queued = self._n_queued
def send_end_of_stream(self, last_id=None):
def send_end_of_stream(self, last_id=None, next_stream=None):
if last_id is None:
last_id = self.producer.stream_info(stream=self.stream)['lastId']
self.producer.send_stream_finished_flag(stream=self.stream, last_id=last_id)
self.producer.send_stream_finished_flag(stream=self.stream, last_id=last_id, next_stream=next_stream)
def _callback(self, header, err):
with self._lock:
......
......@@ -129,7 +129,8 @@ class SimpleWorker(Worker):
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender:
self.sender.send_end_of_stream(stream_info['lastId'])
self.sender.send_end_of_stream(stream_info['lastId'],
next_stream=stream_info['nextStream'])
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment