Commit c966fea5 authored by Tim Schoof's avatar Tim Schoof
Browse files

Add SimpleAsapoReceiver and rename AsapoReceiver to SerialAsapoReceiver

parent 21e8732b
......@@ -24,18 +24,9 @@ def create_broker(
return broker
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
class AsapoReceiver:
"""
A wrapper for an ASAP::O consumer for serial processing
This wrapper guarantees that the data returned by get_next is in ordered by
id. If a record cannot be retrieved, get_next raises a MissingDataError and
continues with the next id on the next call. A MissingDataError is raised
by get_next exactly once per AsapoReceiver instance for each skipped
record.
"""
class SimpleAsapoReceiver:
"""A simple wrapper for an ASAP::O consumer"""
broker = Config(
"An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
builder=create_broker, flatten=True, arguments=dict(
......@@ -57,14 +48,6 @@ class AsapoReceiver:
"group_id. If not given, a unique group id will be generated "
"and the worker will receive the complete stream.",
type=str)
start_id = Config(
"The id of the first data to be received. "
"All earlier data is skipped. "
"Defaults to start_id=1, i.e., the beginning of the stream.",
type=int, default=1)
max_retries = Config(
"In case of ASAP::O errors, retry this many times before skipping "
"data.", type=int, default=2)
@group_id.default
def _generate_group_id(self):
......@@ -77,6 +60,49 @@ class AsapoReceiver:
log.info("New group_id=%s.", group_id)
return group_id
def get_next(self, meta_only=True):
log.info("Requesting next record for group_id=%s.", self.group_id)
try:
data, metadata = self.broker.get_next(
self.group_id, meta_only=meta_only)
except (asapo_consumer.AsapoEndOfStreamError,
asapo_consumer.AsapoUnavailableServiceError,
asapo_consumer.AsapoInterruptedTransactionError,
asapo_consumer.AsapoNoDataError,
asapo_consumer.AsapoLocalIOError) as err:
raise TemporaryError("Failed to get next") from err
except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Failed to get next") from err
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get next") from err
current_id = metadata["_id"]
log.info("Received record with id=%i.", current_id)
return data, metadata
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
class SerialAsapoReceiver(SimpleAsapoReceiver):
"""
A wrapper for an ASAP::O consumer for serial processing
This wrapper guarantees that the data returned by get_next is in ordered by
id. If a record cannot be retrieved, get_next raises a MissingDataError and
continues with the next id on the next call. A MissingDataError is raised
by get_next exactly once per AsapoReceiver instance for each skipped
record.
"""
start_id = Config(
"The id of the first data to be received. "
"All earlier data is skipped. "
"Defaults to start_id=1, i.e., the beginning of the stream.",
type=int, default=1)
max_retries = Config(
"In case of ASAP::O errors, retry this many times before skipping "
"data.", type=int, default=2)
def __attrs_post_init__(self):
self.expected_id = self.start_id
......
......@@ -7,7 +7,7 @@ except ImportError:
import pytest
import asapo_consumer
from AsapoWorker.asapo_receiver import AsapoReceiver
from AsapoWorker.asapo_receiver import SerialAsapoReceiver
from AsapoWorker.errors import StreamError, MissingDataError, TemporaryError
from AsapoWorker.testing import DummyStream, call_iterator, choices
......@@ -56,7 +56,7 @@ def test_receiver(mock_broker, dummy_stream):
[dummy_stream.get_next]*len(dummy_stream.data)
)
receiver = AsapoReceiver(mock_broker)
receiver = SerialAsapoReceiver(mock_broker)
for i in range(len(dummy_stream.data)):
data, metadata = receiver.get_next(meta_only=False)
......@@ -69,12 +69,12 @@ def test_receiver_generate_error(mock_broker):
asapo_consumer.AsapoWrongInputError())
with pytest.raises(StreamError):
AsapoReceiver(mock_broker)
SerialAsapoReceiver(mock_broker)
@pytest.mark.parametrize("i", range(10))
def test_receiver_random(mock_failing_broker, dummy_stream, i):
receiver = AsapoReceiver(mock_failing_broker, max_retries=99999)
receiver = SerialAsapoReceiver(mock_failing_broker, max_retries=99999)
id = 1
......@@ -94,7 +94,7 @@ def test_receiver_random(mock_failing_broker, dummy_stream, i):
def test_receiver_start_id(mock_failing_broker, dummy_stream, i):
start_id = random.randint(1, len(dummy_stream.data))
receiver = AsapoReceiver(
receiver = SerialAsapoReceiver(
mock_failing_broker, start_id=start_id, max_retries=99999)
id = start_id
......@@ -123,7 +123,7 @@ def test_receiver_max_retries(mock_broker, dummy_stream):
[dummy_stream.set_lastread_marker_ok]*(max_retries + 2)
)
receiver = AsapoReceiver(mock_broker, max_retries=max_retries)
receiver = SerialAsapoReceiver(mock_broker, max_retries=max_retries)
id = 2
......@@ -140,7 +140,7 @@ def test_receiver_max_retries(mock_broker, dummy_stream):
@pytest.mark.parametrize("i", range(100))
def test_receiver_missing_data_error(mock_failing_broker, dummy_stream, i):
receiver = AsapoReceiver(mock_failing_broker, max_retries=2)
receiver = SerialAsapoReceiver(mock_failing_broker, max_retries=2)
id = 1
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment