Commit 49186dae authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Resend unacknowledged messages

parent c07fb73e
Pipeline #6457 passed with stage
in 53 seconds
......@@ -123,6 +123,11 @@ class Application:
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"], kwargs=kwargs)
if consumer.n_resend_nacs > 0:
log.info("Resend unacknowledged messages n=%s times", consumer.n_resend_nacs)
consumer.consumer.set_resend_nacs(True, consumer.timeout,
consumer.n_resend_nacs)
if not self.receiver_consumer:
self.receiver_consumer = consumer.consumer
......@@ -161,6 +166,12 @@ class Application:
if sender:
sender.stream = stream
if sender and "group_id" not in self.options["receiver"]:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
consumer.group_id = group_id
log.info(f"Set consumer group ID=%s, group_id")
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
......@@ -197,11 +208,9 @@ class Application:
if self.options["stream"] != '':
return self.options["stream"]
try:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
streams = consumer.get_stream_list()
for stream_info in streams:
msgs = consumer.get_unacknowledged_messages(group_id, stream=stream_info["name"])
msgs = consumer.get_unacknowledged_messages(stream=stream_info["name"])
if msgs:
log.info("Earliest non-processed stream=%s, number of messages=%s",
stream_info["name"], len(msgs))
......
import logging
import binascii
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
......@@ -59,8 +58,13 @@ class SimpleAsapoReceiver:
type=str)
stream = Config(
"The name of the stream.", type=str, default="default", init=False)
data_source=Config(
n_resend_nacs = Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
data_source = Config(
"Name of input data_source", type=str, default="")
timeout = Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
@group_id.default
def _generate_group_id(self):
......@@ -121,16 +125,17 @@ class SimpleAsapoReceiver:
def acknowledge(self, name, ids):
for ack_id in ids:
try:
group_id = str(binascii.crc32(name.encode()))
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, group_id)
self.consumer.acknowledge(group_id, ack_id, stream=self.stream)
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, self.group_id)
self.consumer.acknowledge(self.group_id, ack_id, stream=self.stream)
except Exception as e:
log.warning("Acknowledging of id=%s stream=%s group_id=%s fails. Reason=%s",
ack_id, self.stream, group_id, e)
ack_id, self.stream, self.group_id, e)
def get_unacknowledged_messages(self, group_id, stream):
def get_unacknowledged_messages(self, stream=None):
if stream is None:
stream = self.stream
try:
msgs = self.consumer.get_unacknowledged_messages(group_id, stream=stream)
msgs = self.consumer.get_unacknowledged_messages(self.group_id, stream=stream)
return msgs
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get unacknowledged messages") from err
......
......@@ -110,6 +110,7 @@ class Streamer:
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
self.likely_done = False
self.stream_finished = False
self.fix_metadata_stream = fix_metadata_stream
self.stopped = Event()
self.stream_naming_scheme = stream_naming_scheme
......@@ -155,6 +156,11 @@ class Streamer:
log.info(format_error(err))
self._handle_receiver_temporary_error()
self._handle_end_of_stream()
if self.stream_finished:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
if self.end_of_stream_callback is None:
self.stop()
return None, None
except TemporaryError as err:
log.warn(format_error(err))
......@@ -166,12 +172,16 @@ class Streamer:
return None, None
except StreamFinishedError as err:
log.info("Stream is finished")
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
self._handle_end_of_stream()
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
self.stream_finished = True
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
if left_msgs == 0:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
self._handle_end_of_stream()
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
return None, None
except Exception as err:
log.critical("Unhandled exception", exc_info=True)
......@@ -243,7 +253,7 @@ class Streamer:
if (has_data or new_stream is not None) and self.end_of_stream_callback:
# call the callback only once
print("Start next: ", new_stream)
log.debug("Start next stream=%s", new_stream)
self.end_of_stream_callback(new_stream)
self.end_of_stream_callback = None
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment