Commit 9a8844b8 authored by Tim Schoof's avatar Tim Schoof
Browse files

Merge branch 'asapo_sender_retry' into 'master'

Retry getting group id in case of errors

See merge request !13
parents 0e5bee8d cbc33309
Pipeline #1709 passed with stage
in 38 seconds
......@@ -107,6 +107,12 @@ class AsapoSender:
"send_data becomes blocking until at least one record is removed from "
"the queue (by successfully sending or by failure).",
type=int, default=2)
retries = Config(
"Number of retries in case of connection problems",
type=int, default=3)
retry_delay = Config(
"Seconds between retries in case of connection problems",
type=float, default=3)
_n_queued = Config(
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
......@@ -119,10 +125,18 @@ class AsapoSender:
@_group_id.default
def _generate_group_id(self):
log.info("Generating new group id.")
try:
group_id = self.broker.generate_group_id()
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Cannot generate group_id") from err
error = None
for i in range(self.retries):
try:
group_id = self.broker.generate_group_id()
break
except asapo_consumer.AsapoConsumerError as err:
error = err
if i < self.retries - 1:
log.warning("Cannot generate group id, retrying...")
time.sleep(self.retry_delay)
else:
raise StreamError("Cannot generate group_id") from error
log.info("New group_id=%s.", group_id)
return group_id
......
......@@ -188,4 +188,18 @@ def test_asapo_sender_queue_length_threshold(
time.sleep(0.4) # faster than sender.wait() which waits at least 1 s
assert sender._n_queued == 0
\ No newline at end of file
assert sender._n_queued == 0
def test_asapo_sender_group_id_error(mock_producer, mock_broker):
error = asapo_consumer.AsapoConsumerError()
mock_broker.generate_group_id.side_effect = [error, error, "1234"]
with pytest.raises(StreamError):
AsapoSender(mock_producer, mock_broker, retries=1, retry_delay=0.01)
def test_asapo_sender_group_id_error_recover(mock_producer, mock_broker):
error = asapo_consumer.AsapoConsumerError()
mock_broker.generate_group_id.side_effect = [error, error, "1234"]
sender = AsapoSender(mock_producer, mock_broker, retry_delay=0.01)
assert sender._group_id == "1234"
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment