Commit cc46d3e4 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

refactoring and cleanup

parent 49186dae
......@@ -123,11 +123,6 @@ class Application:
consumer = create_instance_from_configurable(
self.consumer_class, self.options["receiver"], kwargs=kwargs)
if consumer.n_resend_nacs > 0:
log.info("Resend unacknowledged messages n=%s times", consumer.n_resend_nacs)
consumer.consumer.set_resend_nacs(True, consumer.timeout,
consumer.n_resend_nacs)
if not self.receiver_consumer:
self.receiver_consumer = consumer.consumer
......
......@@ -9,7 +9,7 @@ log = logging.getLogger(__name__)
def create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout):
source, path, has_filesystem, beamtime, data_source, token, timeout, n_resend_nacs=0):
log.info(
"Create new consumer (source=%s, path=%s, has_filesystem=%s, "
"beamtime=%s, data_source=%s, token=%s, timeout=%i).",
......@@ -17,6 +17,11 @@ def create_consumer(
try:
consumer = asapo_consumer.create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout)
if n_resend_nacs > 0:
log.info("Resend unacknowledged messages n=%s times", n_resend_nacs)
consumer.set_resend_nacs(True, timeout, n_resend_nacs)
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Cannot create consumer") from err
except asapo_consumer.AsapoConsumerError as err:
......@@ -49,7 +54,9 @@ class SimpleAsapoReceiver:
type=bool, default=False),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
"exception is thrown", type=float, default=3000),
n_resend_nacs=Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
))
group_id = Config(
"The data_source data is divided between all workers with the same "
......@@ -58,13 +65,8 @@ class SimpleAsapoReceiver:
type=str)
stream = Config(
"The name of the stream.", type=str, default="default", init=False)
n_resend_nacs = Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
data_source = Config(
"Name of input data_source", type=str, default="")
timeout = Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
@group_id.default
def _generate_group_id(self):
......@@ -122,7 +124,7 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream info") from err
def acknowledge(self, name, ids):
def acknowledge(self, ids):
for ack_id in ids:
try:
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, self.group_id)
......
......@@ -208,7 +208,7 @@ class AsapoSender:
else:
log.info("Successfully sent data for header=%s", header)
if self._acknowledge_received:
self._acknowledge_received(self.data_source, ids_to_acknowledge)
self._acknowledge_received(ids_to_acknowledge)
def get_ids_to_acknowledge(self, out_id):
id_to_acknowledge = []
......
......@@ -175,10 +175,10 @@ class Streamer:
self.stream_finished = True
left_msgs = len(self.receiver.get_unacknowledged_messages())
log.info("Number of unacknowledged messages = %s", left_msgs)
self._handle_end_of_stream()
if left_msgs == 0:
stream_info = self.receiver.get_stream_info()
self.worker.stream_finished(stream_info)
self._handle_end_of_stream()
# Stop this instance if it can not start new instance anymore
if self.end_of_stream_callback is None:
self.stop()
......
......@@ -89,7 +89,7 @@ class Worker:
"Worker wants to send data, but no sender configured!")
def send_acknowledgement(self, acknowledge=None):
self._acknowledge(self.sender.data_source, acknowledge)
self._acknowledge(acknowledge)
@Configurable(kw_only=True)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment