Commit 4c310fef authored by Tim Schoof's avatar Tim Schoof
Browse files

Block in send_data if sending queue is too long

parent 440b790b
......@@ -97,6 +97,11 @@ class AsapoSender:
"Also accepts the name of the ingest mode as a string",
type=int, converter=convert_ingest_mode,
default="DEFAULT_INGEST_MODE")
queue_length_threshold = Config(
"If the length of the sending queue exceeds this value, the call to "
"send_data becomes blocking until at least one record is removed from "
"the queue (by successfully sending or by failure).",
type=int, default=10)
_n_queued = Config(
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
......@@ -136,6 +141,19 @@ class AsapoSender:
self._n_queued -= 1
raise
with self._lock:
n_queued = self._n_queued
while n_queued > self.queue_length_threshold:
log.info(
"Queue length threshold exceeded. Waiting for queued data to "
"be sent: n_queued=%s queue_length_threshold=%s",
n_queued, self.queue_length_threshold)
time.sleep(1)
with self._lock:
n_queued = self._n_queued
def _callback(self, header, err):
with self._lock:
assert self._n_queued > 0
......
......@@ -175,3 +175,16 @@ def test_asapo_sender_get_last_other_error(sender):
"Error")
with pytest.raises(StreamError):
sender.get_last()
def test_asapo_sender_queue_length_threshold(
sender, callback_executor, callback_futures):
sender.queue_length_threshold = 1
for i in range(3):
metadata = {"_id": i, "name": "foobar", "meta": {"delay": 0.3}}
sender.send_data("data", metadata)
assert sender._n_queued < 2
time.sleep(0.4) # faster than sender.wait() which waits at least 1 s
assert sender._n_queued == 0
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment