Commit 2c3a4f67 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Remove serial worker

parent 3ce9b486
Pipeline #6413 passed with stage
in 49 seconds
......@@ -152,31 +152,17 @@ class Application:
self.options["metadata_receiver"])
if not stream:
if self.options["stream"] != '':
stream = self.options["stream"]
else:
stream = self.get_nonprocessed_stream(consumer)
if stream:
log.info("Setting stream=%s", stream)
consumer.stream = stream
if sender:
sender.stream = stream
stream = self.get_starting_stream(consumer)
_set_start_position(
self.options, consumer, sender, self.worker_class)
log.info("Setting stream=%s", stream)
consumer.stream = stream
if sender:
sender.stream = stream
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
worker._acknowledge = consumer.acknowledge
# The start position (provided by the user or calculated from the
# acknowledged id) was used to create the worker and must be reset.
# This means that for all but the first worker the user provided start
# id will be ignored and the start id will be calculated from the
# acknowledged ids
_unset_start_position(self.options)
if sender:
worker.sender = sender
......@@ -197,8 +183,10 @@ class Application:
return streamer
def get_nonprocessed_stream(self, consumer):
log.info("Get non-processed receiver stream")
def get_starting_stream(self, consumer):
log.info("Get starting receiver stream")
if self.options["stream"] != '':
return self.options["stream"]
try:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
......@@ -218,64 +206,3 @@ class Application:
self.initialize()
self.streamer_manager.run()
def _set_start_position(options, consumer, sender, worker_class):
if ("start_id" not in options["worker"]
or "start_id" not in options["receiver"]
or "start_index" not in options["worker"]):
if ("start_id" in options["worker"]
or "start_id" in options["receiver"]):
raise ValueError(
"The start id must be given for either both or none of "
"the receiver and worker")
if ("start_id" in options["worker"]
or "start_index" in options["worker"]):
raise ValueError(
"If one of start id or start index for the worker is given, "
"the other must be given as well")
if sender:
try:
_, last_output_metadata = sender.get_last()
except StreamError as err:
log.warning(
"Retrieving last output data failed with err=%s", err)
last_output_metadata = None
else:
last_output_metadata = None
try:
group_id = str(binascii.crc32(sender.data_source.encode()))
last_acknowledged_message = consumer.get_last_acknowledged_message(group_id)
except StreamError:
last_acknowledged_message = None
try:
input_start_id, output_start_id = (
worker_class.calculate_start_ids(last_output_metadata, last_acknowledged_message))
output_start_index = worker_class.calculate_start_index(
last_output_metadata, last_acknowledged_message)
except Exception as e:
log.warning(
"Worker does not support starting from the last processed "
"record. %s Starting from the beginning instead", e)
else:
consumer.set_start_id(input_start_id)
log.info(
"Setting worker option start_id=%s", output_start_id)
options["worker"]["start_id"] = output_start_id
log.info(
"Setting worker option start_index=%s", output_start_index)
options["worker"]["start_index"] = output_start_index
def _unset_start_position(options):
for key in ["start_id", "start_index"]:
for section in ["receiver", "worker"]:
try:
del options[section][key]
except KeyError:
pass
......@@ -171,103 +171,3 @@ class SimpleWorker(Worker):
new_metadata = self.get_output_metadata(metadata, extra_meta)
super().send(data, new_metadata, acknowledge=acknowledge)
@Configurable(kw_only=True)
class SerialWorker(Worker):
start_id = Config(
"The id of the first record that will be sent",
type=int, default=1)
start_index = Config(
"The index within a scan of the first record that will be sent",
type=int, default=0)
last_id = Config("Id of the last sent record", type=int, init=False)
last_index = Config(
"Index within a scan of the last sent record",
type=int, init=False)
@last_id.default
def _last_id_default(self):
return self.start_id - 1
@last_index.default
def _last_index_default(self):
return self.start_index - 1
@classmethod
def calculate_start_ids(cls, last_output_metadata, last_acknowledged_message):
"""
Calculate the start ids for sender and receiver, respectively, from the
metadata of the last record in the output stream.
This method is called by the application before data processing begins,
if neither of the start ids are configured.
The worker should use any additional information to determing the start
ids by overwriting this method.
Parameters
----------
last_output_metadata : dict
Metadata of the last record in the output stream
last_acknowledged_message: int
Last acknowledged message in the input stream
Returns
-------
input_start_id : int
Processing will start with the data at this id
output_start_id : int
This id will be assigned to the first data to be sent
"""
if last_output_metadata:
last_output_id = last_output_metadata["_id"]
try:
last_acknowledged_id = max(
last_output_metadata["meta"]["acknowledged_ids"])
except KeyError:
last_acknowledged_id = 0
if last_acknowledged_id < 0:
raise ValueError(
"No valid acknowledged id in last record of output stream "
"last_acknowledged_id=%s",
last_acknowledged_id)
input_start_id = last_acknowledged_id + 1
output_start_id = last_output_id + 1
return input_start_id, output_start_id
else:
return 1, 1
@classmethod
def calculate_start_index(cls, last_output_metadata, last_acknowledged_message):
"""
Calculate the start index for the sender from the
metadata of the last record in the output stream.
This method is called by the application before data processing begins,
if the start index is not configured.
The arguments might be None when called.
The worker should use any additional information to determing the start
index by overwriting this method.
Parameters
----------
last_output_metadata : dict
Metadata of the last record in the output stream
last_acknowledged_message: int
Last acknowledged message in the input stream
Returns
-------
start_index : int
This index will be assigned to the first data to be sent
"""
try:
name = last_output_metadata["name"]
last_index = int(name.rpartition("-")[-1].split(".")[0])
return last_index + 1
except Exception as e:
log.warning("Start index can not be derived")
return 0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment