Commit 0fe7c647 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add flag to use meta_only

parent f0d91220
......@@ -69,6 +69,10 @@ class Application:
"--receive_dataset", default=False, action='store_true',
help="Expect dataset in receiving stream")
parser.add_argument(
"--meta_only", default=False, action='store_true',
help="Get only metadata from receiver")
parser.add_argument(
"--substream", type=str, default="default", metavar="STR",
help="The substream where streaming should start")
......@@ -156,11 +160,11 @@ class Application:
streamer_options = {}
if "delay_on_error" in self.options:
streamer_options = {
"delay_on_error": self.options["delay_on_error"]}
streamer_options["delay_on_error"] = self.options["delay_on_error"]
if "receive_dataset" in self.options:
streamer_options = {
"receive_dataset": self.options["receive_dataset"]}
streamer_options["receive_dataset"] = self.options["receive_dataset"]
if "meta_only" in self.options:
streamer_options["meta_only"] = self.options["meta_only"]
streamer = Streamer(
consumer, worker, metadata_receiver=self.metadata_receiver,
......
......@@ -70,39 +70,26 @@ class SimpleAsapoReceiver:
log.info("New group_id=%s.", group_id)
return group_id
def get_next(self, meta_only=True):
def get_next(self, meta_only=True, receive_dataset=False):
log.info("Requesting next record for group_id=%s.", self.group_id)
try:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only, substream=self.substream)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get next") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Failed to get next") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get next") from err
if receive_dataset:
dataset_id, metadata = self.broker.get_next_dataset(
self.group_id, substream=self.substream)
data = []
if not meta_only:
for meta in metadata:
data.append(self.broker.retrieve_data(meta))
metadata.append({'dataset_id': dataset_id,
'substream': self.substream})
else:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only, substream=self.substream)
metadata["substream"] = self.substream
log.info("Received record with id=%i.", metadata["_id"])
current_id = metadata["_id"]
log.info("Received record with id=%i.", current_id)
metadata["substream"] = self.substream
return data, metadata
def get_next_dataset(self, meta_only=True):
log.info("Requesting next datset record for group_id=%s.", self.group_id)
try:
current_id, metadata_list = self.broker.get_next_dataset(
self.group_id, substream=self.substream)
data_list = []
if not meta_only:
for meta in metadata_list:
data_list.append(self.broker.retrieve_data(meta))
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError("End of stream") from err
except (asapo_consumer.AsapoUnavailableServiceError,
......@@ -115,8 +102,7 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get next") from err
log.info("Received record with id=%i.", current_id)
return data_list, metadata_list
return data, metadata
def get_current_size(self):
try:
......@@ -168,16 +154,34 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
self.expected_id = value
self.need_set_marker = True
def get_next(self, meta_only=True):
def get_next(self, meta_only=True, receive_dataset=False):
if self.need_set_marker:
self._set_marker()
return self._get_next(meta_only=meta_only)
return self._get_next(meta_only=meta_only, receive_dataset=receive_dataset)
def _get_next(self, meta_only):
def _get_next(self, meta_only, receive_dataset):
log.info("Requesting next record for group_id=%s.", self.group_id)
print("receive_dataset: ", receive_dataset)
try:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only, substream=self.substream)
if receive_dataset:
print("Bla")
current_id, metadata = self.broker.get_next_dataset(
self.group_id, substream=self.substream)
data = []
if not meta_only:
for meta in metadata:
data.append(self.broker.retrieve_data(meta))
metadata.append({'dataset_id': current_id,
'substream': self.substream})
else:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only, substream=self.substream)
current_id = metadata["_id"]
metadata["substream"] = self.substream
log.info("Received record with id=%i.", current_id)
except asapo_consumer.AsapoEndOfStreamError as err:
raise EndOfStreamError(
"End of stream at expected_id"
......@@ -203,9 +207,6 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
"Failed to get next at expected_id="
+ str(self.expected_id)) from err
current_id = metadata["_id"]
log.info("Received record with id=%i.", current_id)
if current_id != self.expected_id:
self.need_set_marker = True
self.retries += 1
......@@ -216,8 +217,6 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
self.expected_id = current_id + 1
self.retries = 0
metadata["substream"] = self.substream
return data, metadata
def _set_marker(self):
......
......@@ -73,10 +73,8 @@ class Streamer:
def _get_next(self):
try:
if self.receive_dataset:
return self.receiver.get_next_dataset(meta_only=self.meta_only)
else:
return self.receiver.get_next(meta_only=self.meta_only)
return self.receiver.get_next(meta_only=self.meta_only,
receive_dataset=self.receive_dataset)
except EndOfStreamError as err:
log.info(format_error(err))
self._handle_receiver_temporary_error()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment