Commit 03c1b6f9 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'feat/resend_nack' into 'master'

Resend unacknowledged messages

See merge request !31
parents c07fb73e c20bd3ee
Pipeline #6480 passed with stage
......@@ -104,6 +104,11 @@ class Application:
if key not in self.options:
self.options[key] = {}
if "data_source" in self.options["sender"] and "group_id" not in self.options["receiver"]:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
self.options["receiver"]["group_id"] = group_id
def _setup_logging(self):
log_level = self.options["log_level"]
format = (
......@@ -197,11 +202,9 @@ class Application:
if self.options["stream"] != '':
return self.options["stream"]
try:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
streams = consumer.get_stream_list()
for stream_info in streams:
msgs = consumer.get_unacknowledged_messages(group_id, stream=stream_info["name"])
msgs = consumer.get_unacknowledged_messages(stream=stream_info["name"])
if msgs:
log.info("Earliest non-processed stream=%s, number of messages=%s",
stream_info["name"], len(msgs))
......
import logging
import binascii
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
......@@ -10,7 +9,7 @@ log = logging.getLogger(__name__)
def create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout):
source, path, has_filesystem, beamtime, data_source, token, timeout, n_resend_nacs=0):
log.info(
"Create new consumer (source=%s, path=%s, has_filesystem=%s, "
"beamtime=%s, data_source=%s, token=%s, timeout=%i).",
......@@ -18,6 +17,11 @@ def create_consumer(
try:
consumer = asapo_consumer.create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout)
if n_resend_nacs > 0:
log.info("Resend unacknowledged messages n=%s times", n_resend_nacs)
consumer.set_resend_nacs(True, timeout, n_resend_nacs)
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Cannot create consumer") from err
except asapo_consumer.AsapoConsumerError as err:
......@@ -50,7 +54,9 @@ class SimpleAsapoReceiver:
type=bool, default=False),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
"exception is thrown", type=float, default=3000),
n_resend_nacs=Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
))
group_id = Config(
"The data_source data is divided between all workers with the same "
......@@ -59,7 +65,7 @@ class SimpleAsapoReceiver:
type=str)
stream = Config(
"The name of the stream.", type=str, default="default", init=False)
data_source=Config(
data_source = Config(
"Name of input data_source", type=str, default="")
@group_id.default
......@@ -118,19 +124,20 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream info") from err
def acknowledge(self, name, ids):
def acknowledge(self, ids):
for ack_id in ids:
try:
group_id = str(binascii.crc32(name.encode()))
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, group_id)
self.consumer.acknowledge(group_id, ack_id, stream=self.stream)
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, self.group_id)
self.consumer.acknowledge(self.group_id, ack_id, stream=self.stream)
except Exception as e:
log.warning("Acknowledging of id=%s stream=%s group_id=%s fails. Reason=%s",
ack_id, self.stream, group_id, e)
ack_id, self.stream, self.group_id, e)
def get_unacknowledged_messages(self, group_id, stream):
def get_unacknowledged_messages(self, stream=None):
if stream is None:
stream = self.stream
try:
msgs = self.consumer.get_unacknowledged_messages(group_id, stream=stream)
msgs = self.consumer.get_unacknowledged_messages(self.group_id, stream=stream)
return msgs
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get unacknowledged messages") from err
......
......@@ -208,7 +208,7 @@ class AsapoSender:
else:
log.info("Successfully sent data for header=%s", header)
if self._acknowledge_received:
self._acknowledge_received(self.data_source, ids_to_acknowledge)
self._acknowledge_received(ids_to_acknowledge)
def get_ids_to_acknowledge(self, out_id):
id_to_acknowledge = []
......
......@@ -110,6 +110,7 @@ class Streamer:
self.delay_on_error = delay_on_error
self.end_of_stream_callback = end_of_stream_callback
self.likely_done = False
self.stream_finished = False
self.fix_metadata_stream = fix_metadata_stream
self.stopped = Event()
self.stream_naming_scheme = stream_naming_scheme
......@@ -155,6 +156,9 @@ class Streamer:
log.info(format_error(err))
self._handle_receiver_temporary_error()
self._handle_end_of_stream()
if self.stream_finished:
if self.end_of_stream_callback is None:
self.stop()
return None, None
except TemporaryError as err:
log.warn(format_error(err))
......@@ -166,12 +170,16 @@ class Streamer:
return None, None
except StreamFinishedError as err:
log.info("Stream is finished")
self.stream_finished = True
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
self._handle_end_of_stream()
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
self._handle_end_of_stream()
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
if left_msgs == 0:
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
return None, None
except Exception as err:
log.critical("Unhandled exception", exc_info=True)
......@@ -243,7 +251,7 @@ class Streamer:
if (has_data or new_stream is not None) and self.end_of_stream_callback:
# call the callback only once
print("Start next: ", new_stream)
log.debug("Start next stream=%s", new_stream)
self.end_of_stream_callback(new_stream)
self.end_of_stream_callback = None
......
......@@ -89,7 +89,7 @@ class Worker:
"Worker wants to send data, but no sender configured!")
def send_acknowledgement(self, acknowledge=None):
self._acknowledge(self.sender.data_source, acknowledge)
self._acknowledge(acknowledge)
@Configurable(kw_only=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment