Commit b8e43e03 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

add parameters for serial worker

parent 59f8cbad
Pipeline #6334 passed with stage
in 48 seconds
......@@ -245,27 +245,28 @@ def _set_start_position(options, consumer, sender, worker_class):
else:
last_output_metadata = None
if last_output_metadata:
try:
input_start_id, output_start_id = (
worker_class.calculate_start_ids(last_output_metadata))
try:
group_id = str(binascii.crc32(sender.data_source.encode()))
last_acknowledged_message = consumer.get_last_acknowledged_message(group_id)
input_start_id, output_start_id = (
worker_class.calculate_start_ids(last_output_metadata, last_acknowledged_message))
output_start_index = worker_class.calculate_start_index(
last_output_metadata)
except AttributeError:
log.warning(
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
else:
consumer.set_start_id(input_start_id)
output_start_index = worker_class.calculate_start_index(
last_output_metadata, last_acknowledged_message)
except Exception as e:
log.warning(
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
else:
consumer.set_start_id(input_start_id)
log.info(
"Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id
log.info(
"Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id
log.info(
"Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index
log.info(
"Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index
def _unset_start_position(options):
......
......@@ -131,6 +131,12 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get unacknowledged messages") from err
def get_last_acknowledged_message(self, group_id):
try:
return self.consumer.get_last_acknowledged_message(group_id, stream=self.stream)
except Exception as err:
raise StreamError("Failed to get unacknowledged messages") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......
......@@ -195,7 +195,7 @@ class SerialWorker(Worker):
return self.start_index - 1
@classmethod
def calculate_start_ids(cls, last_output_metadata):
def calculate_start_ids(cls, last_output_metadata, last_acknowledged_message):
"""
Calculate the start ids for sender and receiver, respectively, from the
metadata of the last record in the output stream.
......@@ -210,6 +210,8 @@ class SerialWorker(Worker):
----------
last_output_metadata : dict
Metadata of the last record in the output stream
last_acknowledged_message: int
Last acknowledged message in the input stream
Returns
-------
......@@ -237,7 +239,7 @@ class SerialWorker(Worker):
return 1, 1
@classmethod
def calculate_start_index(cls, last_output_metadata):
def calculate_start_index(cls, last_output_metadata, last_acknowledged_message):
"""
Calculate the start index for the sender from the
metadata of the last record in the output stream.
......@@ -254,6 +256,8 @@ class SerialWorker(Worker):
----------
last_output_metadata : dict
Metadata of the last record in the output stream
last_acknowledged_message: int
Last acknowledged message in the input stream
Returns
-------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment