Commit 75bb02c6 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'feat/expose_stream_to_receiver' into 'master'

Feat: Expose stream to receiver

See merge request tim.schoof/asapoworker!7
parents 831d0c31 757316dd
Pipeline #186 failed with stage
in 1 minute and 47 seconds
......@@ -58,6 +58,8 @@ class SimpleAsapoReceiver:
type=str)
substream = Config(
"The name of the substream.", type=str, default="default", init=False)
stream=Config(
"Name of input stream", type=str, default="")
@group_id.default
def _generate_group_id(self):
......@@ -91,7 +93,7 @@ class SimpleAsapoReceiver:
log.info("Received record with id=%i.", current_id)
metadata["substream"] = self.substream
metadata["stream"] = self.stream
return data, metadata
def get_current_size(self):
......@@ -193,7 +195,7 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
self.retries = 0
metadata["substream"] = self.substream
metadata["stream"] = self.stream
return data, metadata
def _set_marker(self):
......@@ -271,7 +273,7 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
self.need_set_marker = True
metadata["substream"] = self.substream
metadata["stream"] = self.stream
return data, metadata
......@@ -295,6 +297,7 @@ class SerialDatasetAsapoReceiver(SerialAsapoReceiver):
data.append(self.broker.retrieve_data(meta))
meta['dataset_id'] = current_id
meta['substream'] = self.substream
meta['stream'] = self.stream
log.info("Received record with id=%i.", current_id)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment