Commit 3ce9b486 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add more error handling

parent b8e43e03
Pipeline #6392 passed with stage
in 53 seconds
......@@ -248,15 +248,18 @@ def _set_start_position(options, consumer, sender, worker_class):
group_id = str(binascii.crc32(sender.data_source.encode()))
last_acknowledged_message = consumer.get_last_acknowledged_message(group_id)
except StreamError:
last_acknowledged_message = None
input_start_id, output_start_id = (
worker_class.calculate_start_ids(last_output_metadata, last_acknowledged_message))
output_start_index = worker_class.calculate_start_index(
last_output_metadata, last_acknowledged_message)
except Exception as e:
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
"record. %s Starting from the beginning instead", e)
......@@ -161,7 +161,7 @@ class AsapoSender:
self._n_queued += 1
if ack_dependencies is None:
ack_dependencies = [metadata["_id"]]
ack_dependencies = set([metadata["_id"]])
for out_id in acknowledge:
if out_id not in self._ids_to_acknowledge:
self._ids_to_acknowledge[out_id] = ack_dependencies
......@@ -246,7 +246,7 @@ class AsapoSender:"Successfully finished sending all queued data")
def get_last(self, meta_only=True):"Requesting last record")"Requesting last record for stream=%s",
data, metadata = self.consumer.get_last(meta_only=meta_only,
except asapo_consumer.AsapoEndOfStreamError:
......@@ -264,9 +264,10 @@ class SerialWorker(Worker):
start_index : int
This index will be assigned to the first data to be sent
if last_output_metadata:
name = last_output_metadata["name"]
last_index = int(name.rpartition("-")[-1].split(".")[0])
return last_index + 1
except Exception as e:
log.warning("Start index can not be derived")
return 0
