Commit 64cfecbb authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Send stream-finish flag once on calling side.

parent e2010613
Pipeline #12869 passed with stage
in 54 seconds
......@@ -170,10 +170,11 @@ class Streamer:
return None, None
except StreamFinishedError as err:
log.info("Stream is finished")
self.stream_finished = True
self._handle_end_of_stream()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
if not self.stream_finished:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
self.stream_finished = True
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
if left_msgs == 0 or self.receiver.n_resend_nacs == 0:
......
......@@ -98,9 +98,6 @@ class SimpleWorker(Worker):
output_name_format = Config(
"Format for deriving output name from input name",
type=str, default="{basename}_processed-{index}")
is_finished = Config(
"Flag shows that processed stream is finished",
type=bool, default=False)
def get_output_name(self, metadata):
"Get the output name from output_name_format and the input metadata"
......@@ -147,10 +144,9 @@ class SimpleWorker(Worker):
"""
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender and not self.is_finished:
if self.sender:
self.sender.send_end_of_stream(stream_info['lastId'],
next_stream=stream_info['nextStream'])
self.is_finished = True
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment