Commit c1863fff authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy

Merge branch 'fix/speedup' into 'master'

Fix and speedup

See merge request fs-sc/asapoworker!33
parents 6e9f0a6a 0ebfec4e
Pipeline #8055 passed with stage in 55 seconds
@@ -159,12 +159,11 @@ class Application:
         if not stream:
             stream = self.get_starting_stream(consumer)
-        _set_start_position(
-            self.options, consumer, sender, self.worker_class)
         log.info("Setting stream=%s", stream)
         consumer.stream = stream
         if sender:
             sender.stream = stream
+        _set_start_position(self.options, consumer, sender, self.worker_class)
 
         worker = create_instance_from_configurable(
             self.worker_class, self.options["worker"])
@@ -221,6 +220,12 @@ class Application:
 def _set_start_position(options, consumer, sender, worker_class):
+    if (not hasattr(worker_class, 'calculate_start_ids')
+            or not hasattr(worker_class, 'calculate_start_index')):
+        log.info(
+            "Worker does not support starting from the last processed "
+            "record. Starting from the beginning instead")
+        return
     if ("start_id" not in options["worker"]
             or "start_id" not in options["receiver"]
             or "start_index" not in options["worker"]):
@@ -246,26 +251,21 @@ def _set_start_position(options, consumer, sender, worker_class):
         last_output_metadata = None
     if last_output_metadata:
-        try:
-            input_start_id, output_start_id = (
-                worker_class.calculate_start_ids(last_output_metadata))
-            output_start_index = worker_class.calculate_start_index(
-                last_output_metadata)
-        except AttributeError:
-            log.warning(
-                "Worker does not support starting from the last processed "
-                "record. Starting from the beginning instead")
-        else:
-            consumer.set_start_id(input_start_id)
-            log.info(
-                "Setting worker option start_id=%s", output_start_id)
-            options["worker"]["start_id"] = output_start_id
-            log.info(
-                "Setting worker option start_index=%s", output_start_index)
-            options["worker"]["start_index"] = output_start_index
+        input_start_id, output_start_id = (
+            worker_class.calculate_start_ids(last_output_metadata))
+        output_start_index = worker_class.calculate_start_index(
+            last_output_metadata)
+
+        consumer.set_start_id(input_start_id)
+        log.info(
+            "Setting worker option start_id=%s", output_start_id)
+        options["worker"]["start_id"] = output_start_id
+        log.info(
+            "Setting worker option start_index=%s", output_start_index)
+        options["worker"]["start_index"] = output_start_index
 
 
 def _unset_start_position(options):
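The two hunks above replace the old try/except AttributeError around the worker hooks with an explicit hasattr() check at the top of _set_start_position. Purely as an illustration (the classmethod form and the "_id" metadata field are assumptions, not part of this merge request), a worker that supports resuming from the last processed record would expose the two hooks roughly like this:

class ResumableWorker:
    @classmethod
    def calculate_start_ids(cls, last_output_metadata):
        # Assumed metadata layout: derive the next input and output ids
        # from the id of the last record that was already produced.
        last_id = last_output_metadata["_id"]  # "_id" is an assumed field name
        return last_id + 1, last_id + 1  # (input_start_id, output_start_id)

    @classmethod
    def calculate_start_index(cls, last_output_metadata):
        # Assumed: the start index advances together with the output id.
        return last_output_metadata["_id"] + 1

A worker class without both hooks now hits the early return before any metadata is queried, instead of failing later with an AttributeError inside the metadata handling.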
@@ -67,6 +67,8 @@ class SimpleAsapoReceiver:
         "The name of the stream.", type=str, default="default", init=False)
     data_source = Config(
         "Name of input data_source", type=str, default="")
+    n_resend_nacs = Config(
+        "Number if tries to resend unacknowledged messages", type=int, default=0)
 
     @group_id.default
     def _generate_group_id(self):
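The new n_resend_nacs option controls how many times unacknowledged messages are resent. As an illustration only (the surrounding keys and values are assumptions, not taken from this repository), it would be supplied through the receiver section of the options dict that the code above already reads as options["receiver"]:

options = {
    "receiver": {
        "data_source": "detector",  # assumed example value
        "n_resend_nacs": 2,         # resend unacknowledged messages up to twice
    },
    "worker": {},
}

With the default of 0, resending is disabled, which per the Streamer change below means the instance stops at end of stream even if unacknowledged messages remain.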
@@ -171,12 +171,12 @@ class Streamer:
         except StreamFinishedError as err:
             log.info("Stream is finished")
             self.stream_finished = True
-            left_msgs = len(self.receiver.get_unacknowledged_messages())
-            log.info("Number of unacknowledged messages = %s", left_msgs)
             self._handle_end_of_stream()
             stream_info = self.receiver.get_stream_info()
             self.worker.stream_finished(stream_info)
-            if left_msgs == 0:
+            left_msgs = len(self.receiver.get_unacknowledged_messages())
+            log.info("Number of unacknowledged messages = %s", left_msgs)
+            if left_msgs == 0 or self.receiver.n_resend_nacs == 0:
                 # Stop this instance if it can not start new instance anymore
                 if self.end_of_stream_callback is None:
                     self.stop()
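Read together with the new receiver option, the updated condition only keeps the instance alive after a finished stream when unacknowledged messages remain and resending them is actually enabled. A minimal sketch of that decision, using the same names as in the diff:

def should_stop_after_stream(left_msgs, n_resend_nacs):
    # Stop when nothing is left to acknowledge, or when resending of
    # unacknowledged messages is disabled (n_resend_nacs == 0).
    return left_msgs == 0 or n_resend_nacs == 0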