Commit f0d91220 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add option to receive dataset

parent 17944783
......@@ -65,6 +65,10 @@ class Application:
"--delay_on_error", type=float, default=1, metavar="FLOAT",
help="When an error occurs, wait so many seconds before retrying")
parser.add_argument(
"--receive_dataset", default=False, action='store_true',
help="Expect dataset in receiving stream")
parser.add_argument(
"--substream", type=str, default="default", metavar="STR",
help="The substream where streaming should start")
......@@ -150,11 +154,13 @@ class Application:
if sender:
worker.sender = sender
streamer_options = {}
if "delay_on_error" in self.options:
streamer_options = {
"delay_on_error": self.options["delay_on_error"]}
else:
streamer_options = {}
if "receive_dataset" in self.options:
streamer_options = {
"receive_dataset": self.options["receive_dataset"]}
streamer = Streamer(
consumer, worker, metadata_receiver=self.metadata_receiver,
......
......@@ -94,6 +94,30 @@ class SimpleAsapoReceiver:
return data, metadata
def get_next_dataset(self, meta_only=True):
log.info("Requesting next datset record for group_id=%s.", self.group_id)
try:
current_id, metadata_list = self.broker.get_next_dataset(
self.group_id, substream=self.substream)
data_list = []
if not meta_only:
for meta in metadata_list:
data_list.append(self.broker.retrieve_data(meta))
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get next") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Failed to get next") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get next") from err
log.info("Received record with id=%i.", current_id)
return data_list, metadata_list
def get_current_size(self):
try:
return self.broker.get_current_size(substream=self.substream)
......
......@@ -29,8 +29,9 @@ class ContainsAll:
class Streamer:
def __init__(
self, receiver, worker, metadata_receiver=None, delay_on_error=3,
end_of_stream_callback=None, meta_only=False):
end_of_stream_callback=None, meta_only=False, receive_dataset=False):
self.receiver = receiver
self.receive_dataset = receive_dataset
self.worker = worker
self.metadata_receiver = metadata_receiver
self.initial_delay_on_error = delay_on_error
......@@ -72,7 +73,10 @@ class Streamer:
def _get_next(self):
try:
data, metadata = self.receiver.get_next(meta_only=self.meta_only)
if self.receive_dataset:
return self.receiver.get_next_dataset(meta_only=self.meta_only)
else:
return self.receiver.get_next(meta_only=self.meta_only)
except EndOfStreamError as err:
log.info(format_error(err))
self._handle_receiver_temporary_error()
......@@ -91,8 +95,6 @@ class Streamer:
self._handle_receiver_critical_error()
raise err
return data, metadata
def _handle_end_of_stream(self):
# When receiving an EndOfStreamError, there are two cases to consider:
# 1. The substream has data and metadata, i.e., occurs in the substream
......
......@@ -47,6 +47,12 @@ class Worker:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def shutdown(self):
"""
Function is called at the end of processing
"""
pass
@Configurable(kw_only=True)
class SimpleWorker(Worker):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment