Commit 82461e10 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Acknowledgements with dependencies

parent 23b45aa4
Pipeline #6303 passed with stage
in 49 seconds
......@@ -153,18 +153,17 @@ class AsapoSender:
def __attrs_post_init__(self):
log.info("Receiver created with ingest_mode=%s", self.ingest_mode)
def send_data(self, data, metadata, acknowledge=None):
def send_data(self, data, metadata, acknowledge=None, ack_dependencies=None):
log.info(
"Sending data with id=%s name=%s",
metadata["_id"], metadata["name"])
out_id = metadata["_id"]
if acknowledge is None:
acknowledge = []
with self._lock:
self._n_queued += 1
self._ids_to_acknowledge[out_id] = acknowledge
if acknowledge is not None and acknowledge not in self._ids_to_acknowledge:
if ack_dependencies is None:
ack_dependencies = [metadata["_id"]]
self._ids_to_acknowledge[acknowledge] = ack_dependencies
try:
self.producer.send(
......@@ -199,7 +198,7 @@ class AsapoSender:
with self._lock:
assert self._n_queued > 0
self._n_queued -= 1
ids_to_acknowledge = self._ids_to_acknowledge[out_id]
ids_to_acknowledge = self.get_ids_to_acknowledge(out_id)
if err:
log.error(
"Sending data failed for header=%s with error='%s'",
......@@ -209,6 +208,20 @@ class AsapoSender:
if self._acknowledge_received:
self._acknowledge_received(self.data_source, ids_to_acknowledge)
def get_ids_to_acknowledge(self, out_id):
id_to_acknowledge = []
for in_id in self._ids_to_acknowledge:
if out_id in self._ids_to_acknowledge[in_id]:
if len(self._ids_to_acknowledge[in_id]) == 1:
del self._ids_to_acknowledge[in_id]
id_to_acknowledge = [in_id]
else:
self._ids_to_acknowledge[in_id].remove(out_id)
return id_to_acknowledge
def wait(self, timeout=10):
with self._lock:
n_queued = self._n_queued
......
......@@ -71,14 +71,17 @@ class Worker:
"""
pass
def send(self, data, metadata, acknowledge=None):
def send(self, data, metadata, acknowledge=None, ack_dependencies=None):
if self.sender:
self.sender.send_data(
data, metadata, acknowledge=acknowledge)
data, metadata, acknowledge=acknowledge, ack_dependencies=ack_dependencies)
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send_acknowledgement(self, acknowledge=None):
self.sender._acknowledge_received(self.sender.data_source, acknowledge)
@Configurable(kw_only=True)
class SimpleWorker(Worker):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment