Commit b01c0b6c authored by Tim Schoof's avatar Tim Schoof
Browse files

Calculate start id and index from the last record of the output stream

This commit introduces a SerialWorker base class for workers which keep
track of the last sent id and index. Additionally an interface is added
which allows the application to calculate the start id and index
from the metadata of the last sent output record at startup.
parent 91d5606d
......@@ -8,6 +8,7 @@ from AsapoWorker.configuration import (
parsed_args_to_dict)
from AsapoWorker.asapo_receiver import AsapoReceiver
from AsapoWorker.streamer import Streamer
from AsapoWorker.errors import StreamError
class Application:
......@@ -58,15 +59,23 @@ class Application:
logging.basicConfig(level=log_level, format=format)
logging.info("Log level set to %s", log_level)
self.consumer = create_instance_from_configurable(
consumer = create_instance_from_configurable(
self.consumer_class, options["receiver"])
self.worker = create_instance_from_configurable(
self.worker_class, options["worker"])
if self.producer_class:
self.worker.sender = create_instance_from_configurable(
sender = create_instance_from_configurable(
self.producer_class, options["sender"])
else:
sender = None
_set_start_position(
options, consumer, sender, self.worker_class)
worker = create_instance_from_configurable(
self.worker_class, options["worker"])
if sender:
worker.sender = sender
if "delay_on_error" in options:
streamer_options = {"delay_on_error": options["delay_on_error"]}
......@@ -74,7 +83,7 @@ class Application:
streamer_options = {}
self.streamer = Streamer(
self.consumer, self.worker, **streamer_options)
consumer, worker, **streamer_options)
def signalhandler(signum, frame):
self.streamer.stop()
......@@ -89,3 +98,51 @@ class Application:
self.initialize()
self.streamer.run()
def _set_start_position(options, consumer, sender, worker_class):
if ("start_id" not in options["worker"]
or "start_id" not in options["receiver"]
or "start_index" not in options["worker"]):
if ("start_id" in options["worker"]
or "start_id" in options["receiver"]):
raise ValueError(
"The start id must be given for either both or none of "
"the receiver and worker")
if ("start_id" in options["worker"]
or "start_index" in options["worker"]):
raise ValueError(
"If one of start id or start index for the worker is given, "
"the other must be given as well")
if sender:
try:
_, last_output_metadata = sender.get_last()
except StreamError as err:
logging.warning(
"Retrieving last output data failed with err=%s", err)
last_output_metadata = None
else:
last_output_metadata = None
if last_output_metadata:
try:
input_start_id, output_start_id = (
worker_class.calculate_start_ids(last_output_metadata))
output_start_index = worker_class.calculate_start_index(
last_output_metadata)
except AttributeError:
logging.warning(
"Worker does not support starting from the last processed "
"record. Starting from the beginning instead")
else:
consumer.set_start_id(input_start_id)
logging.info(
"Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id
logging.info(
"Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index
......@@ -30,3 +30,93 @@ class Worker:
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
@Configurable(kw_only=True)
class SerialWorker(Worker):
start_id = Config(
"The id of the first record that will be sent",
type=int, default=1)
start_index = Config(
"The index within a scan of the first record that will be sent",
type=int, default=0)
last_id = Config("Id of the last sent record", type=int, init=False)
last_index = Config(
"Index within a scan of the last sent record",
type=int, init=False)
@last_id.default
def _last_id_default(self):
return self.start_id - 1
@last_index.default
def _last_index_default(self):
return self.start_index - 1
@classmethod
def calculate_start_ids(cls, last_output_metadata):
"""
Calculate the start ids for sender and receiver, respectively, from the
metadata of the last record in the output stream.
This method is called by the application before data processing begins,
if neither of the start ids are configured.
The worker should use any additional information to determing the start
ids by overwriting this method.
Parameters
----------
last_output_metadata : dict
Metadata of the last record in the output stream
Returns
-------
input_start_id : int
Processing will start with the data at this id
output_start_id : int
This id will be assigned to the first data to be sent
"""
if last_output_metadata:
last_output_id = last_output_metadata["_id"]
try:
last_acknowledged_id = max(
last_output_metadata["meta"]["acknowledged_ids"])
except KeyError:
last_acknowledged_id = 0
input_start_id = last_acknowledged_id + 1
output_start_id = last_output_id + 1
return input_start_id, output_start_id
else:
return 1, 1
@classmethod
def calculate_start_index(cls, last_output_metadata):
"""
Calculate the start index for the sender from the
metadata of the last record in the output stream.
This method is called by the application before data processing begins,
if the start index is not configured.
The arguments might be None when called.
The worker should use any additional information to determing the start
index by overwriting this method.
Parameters
----------
last_output_metadata : dict
Metadata of the last record in the output stream
Returns
-------
start_index : int
This index will be assigned to the first data to be sent
"""
if last_output_metadata:
name = last_output_metadata["name"]
last_index = int(name.rpartition("-")[-1].split(".")[0])
return last_index + 1
else:
return 0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment