Commit 3bb9c78d authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

refactor

parent 94c8bac1
Pipeline #6325 passed with stage
in 53 seconds
......@@ -155,7 +155,7 @@ class Application:
if self.options["stream"] != '':
stream = self.options["stream"]
else:
stream = self.get_nonprocessed_stream(consumer.consumer)
stream = self.get_nonprocessed_stream(consumer)
if stream:
log.info("Setting stream=%s", stream)
......@@ -168,6 +168,7 @@ class Application:
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
worker._acknowledge = consumer.acknowledge
# The start position (provided by the user or calculated from the
# acknowledged id) was used to create the worker and must be reset.
......@@ -198,16 +199,19 @@ class Application:
def get_nonprocessed_stream(self, consumer):
log.info("Get non-processed receiver stream")
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
streams = consumer.get_stream_list()
for stream_info in streams:
msgs = consumer.get_unacknowledged_messages(group_id, stream=stream_info["name"])
if msgs:
log.info("Earliest non-processed stream=%s, number of messages=%s",
stream_info["name"], len(msgs))
return stream_info["name"]
return None
try:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
streams = consumer.get_stream_list()
for stream_info in streams:
msgs = consumer.get_unacknowledged_messages(group_id, stream=stream_info["name"])
if msgs:
log.info("Earliest non-processed stream=%s, number of messages=%s",
stream_info["name"], len(msgs))
return stream_info["name"]
except Exception as e:
log.error("Getting non-processed receiver stream fails messages=%s", e)
return 'default'
def run(self):
if not self.initialized:
......
......@@ -121,9 +121,16 @@ class SimpleAsapoReceiver:
def acknowledge(self, name, ids):
for ack_id in ids:
group_id = str(binascii.crc32(name.encode()))
log.info("Acknowledging stream=%s group_id=%s id=%s", self.stream, group_id, ack_id)
log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, group_id)
self.consumer.acknowledge(group_id, ack_id, stream=self.stream)
def get_unacknowledged_messages(self, group_id, stream):
try:
msgs = self.consumer.get_unacknowledged_messages(group_id, stream=stream)
return msgs
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get unacknowledged messages") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
......
......@@ -153,11 +153,10 @@ class AsapoSender:
def __attrs_post_init__(self):
log.info("Receiver created with ingest_mode=%s", self.ingest_mode)
def send_data(self, data, metadata, acknowledge=None, ack_dependencies=None):
def send_data(self, data, metadata, acknowledge=[], ack_dependencies=None):
log.info(
"Sending data with id=%s name=%s",
metadata["_id"], metadata["name"])
with self._lock:
self._n_queued += 1
......@@ -166,7 +165,6 @@ class AsapoSender:
for out_id in acknowledge:
if out_id not in self._ids_to_acknowledge:
self._ids_to_acknowledge[out_id] = ack_dependencies
try:
self.producer.send(
metadata['_id'], metadata['name'], data,
......@@ -215,15 +213,13 @@ class AsapoSender:
for in_id in self._ids_to_acknowledge:
if out_id in self._ids_to_acknowledge[in_id]:
if len(self._ids_to_acknowledge[in_id]) == 1:
del self._ids_to_acknowledge[in_id]
id_to_acknowledge.append(in_id)
else:
self._ids_to_acknowledge[in_id].remove(out_id)
for in_id in id_to_acknowledge:
del self._ids_to_acknowledge[in_id]
return id_to_acknowledge
def wait(self, timeout=10):
with self._lock:
n_queued = self._n_queued
......
......@@ -16,6 +16,8 @@ class Worker:
meta_only = Config(
"Get only metadata from receiver",
type=bool, default=False, init=False)
_acknowledge = Config(
"Callback to acknowledge id", init=False, default=None)
def handle_error(self):
pass
......@@ -71,7 +73,7 @@ class Worker:
"""
pass
def send(self, data, metadata, acknowledge=None, ack_dependencies=None):
def send(self, data, metadata, acknowledge=[], ack_dependencies=None):
if self.sender:
self.sender.send_data(
data, metadata, acknowledge=acknowledge, ack_dependencies=ack_dependencies)
......@@ -79,8 +81,8 @@ class Worker:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send_acknowledgement(self, acknowledge=None):
self.sender._acknowledge_received(self.sender.data_source, acknowledge)
def send_acknowledgement(self, acknowledge=[]):
self._acknowledge(self.sender.data_source, acknowledge)
@Configurable(kw_only=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment