Commit 714814dd authored by Tim Schoof's avatar Tim Schoof
Browse files

Handle AsapoDataNotInCacheError in AsapoReceiver

In case the data is not in the cache any more, the data is lost, and
thus a MissingDataError is raised for an AsapoDataNotInCacheError.

SimpleAsapoReceiver can now also throw a MissingDataError, but this
should be ok as a worker either expects this error or it falls back to
the defaul no-op handler.
parent 0dd347fc
Pipeline #29747 passed with stage
in 53 seconds
......@@ -103,6 +103,9 @@ class SimpleAsapoReceiver:
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get next") from err
except asapo_consumer.AsapoDataNotInCacheError as err:
raise MissingDataError(
"Failed to get next, possible data loss") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Failed to get next") from err
except asapo_consumer.AsapoConsumerError as err:
......@@ -240,6 +243,13 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
raise TemporaryError(
"Failed to get next at expected_id="
+ str(self.expected_id)) from err
except asapo_consumer.AsapoDataNotInCacheError as err:
# Data cannot be retrieved any longer
msg = (
"Failed to get next at expected_id={}, skipping id due to "
"possible data loss").format(self.expected_id)
self.expected_id += 1
raise MissingDataError(msg) from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError(
"Failed to get next at expected_id="
......@@ -326,7 +336,8 @@ class SerialAsapoReceiver(SimpleAsapoReceiver):
raise TemporaryError("Failed to get last") from err
except (asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
asapo_consumer.AsapoLocalIOError,
asapo_consumer.AsapoDataNotInCacheError) as err:
raise MissingDataError("Failed to get last") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Failed to get last") from err
......@@ -389,6 +400,14 @@ class SerialDatasetAsapoReceiver(SerialAsapoReceiver):
raise TemporaryError(
"Failed to get next at expected_id="
+ str(self.expected_id)) from err
except asapo_consumer.AsapoDataNotInCacheError as err:
# Data cannot be retrieved any longer and the lastread marker is
# advanced
msg = (
"Failed to get next at expected_id={}, skipping id due to "
"possible data loss").format(self.expected_id)
self.expected_id += 1
raise MissingDataError(msg) from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError(
"Failed to get next at expected_id="
......
......@@ -54,6 +54,12 @@ class DummyStream:
self.indices[group_id] = self.indices.get(group_id, 1) + 1
raise asapo_consumer.AsapoNoDataError("No data")
def get_next_data_not_in_cache(
self, group_id, stream="default", meta_only=True):
log.info("get_next_no_data_in_cache")
self.indices[group_id] = self.indices.get(group_id, 1) + 1
raise asapo_consumer.AsapoDataNotInCacheError("Data not in cache")
def get_next_unavailable(
self, group_id, stream="default", meta_only=True):
log.info("get_next_unavailable")
......
......@@ -159,3 +159,31 @@ def test_receiver_missing_data_error(mock_failing_broker, dummy_stream, i):
assert metadata["_id"] == id
assert data is not None
id += 1
@pytest.mark.parametrize("i", range(100))
def test_receiver_data_not_in_cache_error(mock_failing_broker, dummy_stream, i):
call_list = choices(
["end_of_stream", "no_data", "data_not_in_cache"] + ["ok"]*4, 100)
mock_failing_broker.get_next.side_effect = call_iterator(
call_list, dummy_stream, "get_next"
)
receiver = SerialAsapoReceiver(mock_failing_broker, max_retries=9999)
id = 1
while id < len(dummy_stream.data):
try:
data, metadata = receiver.get_next(meta_only=False)
except TemporaryError as err:
assert "Unexpected id" not in str(err)
continue
except MissingDataError:
id += 1
continue
except StreamError:
continue
assert metadata["_id"] == id
assert data is not None
id += 1
......@@ -28,7 +28,8 @@ def mock_consumer():
def test_dummy_stream(mock_consumer, dummy_stream):
mock_consumer.get_next.side_effect = call_iterator(
["ok", "no_data", "skip"] + ["ok"]*(len(dummy_stream.data) + 5),
["ok", "no_data", "data_not_in_cache", "skip"]
+ ["ok"]*(len(dummy_stream.data) + 5),
dummy_stream, "get_next"
)
......@@ -44,11 +45,14 @@ def test_dummy_stream(mock_consumer, dummy_stream):
with pytest.raises(asapo_consumer.AsapoNoDataError):
data, metadata = mock_consumer.get_next("group_id", meta_only=False)
with pytest.raises(asapo_consumer.AsapoDataNotInCacheError):
data, metadata = mock_consumer.get_next("group_id", meta_only=False)
with pytest.raises(asapo_consumer.AsapoInterruptedTransactionError):
data, metadata = mock_consumer.get_next("group_id", meta_only=False)
data, metadata = mock_consumer.get_next("group_id", meta_only=False)
assert metadata["_id"] == 4
assert metadata["_id"] == 5
assert data is not None
mock_consumer.set_lastread_marker("group_id", 1)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment