Commit e2010613 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Send stream finish flag only once

parent a604da6e
Pipeline #12496 failed with stage
......@@ -98,6 +98,9 @@ class SimpleWorker(Worker):
output_name_format = Config(
"Format for deriving output name from input name",
type=str, default="{basename}_processed-{index}")
is_finished = Config(
"Flag shows that processed stream is finished",
type=bool, default=False)
def get_output_name(self, metadata):
"Get the output name from output_name_format and the input metadata"
......@@ -144,9 +147,10 @@ class SimpleWorker(Worker):
"""
Function is called at the end of processing to send end-of-stream flag
"""
if self.sender:
if self.sender and not self.is_finished:
self.sender.send_end_of_stream(stream_info['lastId'],
next_stream=stream_info['nextStream'])
self.is_finished = True
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment