import logging
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
    StreamError, ConfigurationError, TemporaryError, MissingDataError,
    EndOfStreamError)

log = logging.getLogger(__name__)


def create_broker(
        source, path, has_filesystem, beamtime, stream, token, timeout):
    log.info(
        "Create new broker (source=%s, path=%s, has_filesystem=%s, "
        "beamtime=%s, stream=%s, token=%s, timeout=%i).",
        source, path, has_filesystem, beamtime, stream, token, timeout)
    try:
        broker = asapo_consumer.create_server_broker(
            source, path, has_filesystem, beamtime, stream, token, timeout)
    except asapo_consumer.AsapoWrongInputError as err:
        raise ConfigurationError("Cannot create broker") from err
    except asapo_consumer.AsapoConsumerError as err:
        raise StreamError("Cannot create broker") from err

    return broker


def create_metadata_broker(
        source, path, has_filesystem, beamtime, stream, token, timeout):
    return create_broker(
        source, path, has_filesystem, beamtime, stream + "_metadata", token,
        timeout)


@Configurable
class SimpleAsapoReceiver:
    """A simple wrapper for an ASAP::O consumer"""
    broker = Config(
        "An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
        builder=create_broker, flatten=True, arguments=dict(
            source=Config("ASAP::O endpoint", type=str),
            path=Config("ASAP::O mount path", type=str),
            beamtime=Config("Beamtime ID", type=str),
            token=Config("Beamtime access token", type=str),
            stream=Config(
                "Name of input stream", type=str, default=""),
            has_filesystem=Config(
                "Read files directly from filesystem",
                type=bool, default=False),
            timeout=Config(
                "Allowed time in milliseconds for ASAP::O data access before "
                "an exception is thrown", type=float, default=3000)
        ))
    group_id = Config(
        "The stream data is divided between all workers with the same "
        "group_id. If not given, a unique group id will be generated "
        "and the worker will receive the complete stream.",
        type=str)
    substream = Config(
        "The name of the substream.", type=str, default="default", init=False)

    @group_id.default
    def _generate_group_id(self):
        log.info("Generating new group id.")
        try:
            group_id = self.broker.generate_group_id()
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Cannot generate group_id") from err

        log.info("New group_id=%s.", group_id)
        return group_id

    def get_next(self, meta_only=True):
        log.info("Requesting next record for group_id=%s.", self.group_id)
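        # ASAP::O client errors are mapped to worker errors below: end of
        # stream, transient failures, configuration mistakes, and any other
        # consumer error, in that order.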
        try:
            data, metadata = self.broker.get_next(
                self.group_id, meta_only=meta_only, substream=self.substream)
        except asapo_consumer.AsapoEndOfStreamError as err:
            raise EndOfStreamError("End of stream") from err
        except (asapo_consumer.AsapoUnavailableServiceError,
                asapo_consumer.AsapoInterruptedTransactionError,
                asapo_consumer.AsapoNoDataError,
                asapo_consumer.AsapoLocalIOError) as err:
            raise TemporaryError("Failed to get next") from err
        except asapo_consumer.AsapoWrongInputError as err:
            raise ConfigurationError("Failed to get next") from err
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get next") from err

        current_id = metadata["_id"]
        log.info("Received record with id=%i.", current_id)

        metadata["substream"] = self.substream

        return data, metadata

    def get_current_size(self):
        try:
            return self.broker.get_current_size(substream=self.substream)
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get current size") from err

    def get_substream_list(self):
        try:
            return self.broker.get_substream_list()
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get substream list") from err
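

# A minimal, illustrative usage sketch (not part of the library): driving a
# SimpleAsapoReceiver from a worker loop. It assumes that @Configurable
# generates an attrs-style __init__ accepting the flattened Config fields as
# keyword arguments; endpoint, path, beamtime, token, and stream values are
# placeholders.
def _example_simple_receiver():
    receiver = SimpleAsapoReceiver(
        source="localhost:8400",  # ASAP::O endpoint (placeholder)
        path="/asapo/shared",     # mount path (placeholder)
        beamtime="my-beamtime",   # beamtime ID (placeholder)
        token="my-token",         # access token (placeholder)
        stream="detector")        # input stream name (placeholder)
    while True:
        try:
            data, metadata = receiver.get_next(meta_only=False)
        except EndOfStreamError:
            break  # no further records available in the substream
        except TemporaryError:
            continue  # transient failure, simply request the next record again
        log.info("Processing record id=%s", metadata["_id"])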


# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
class SerialAsapoReceiver(SimpleAsapoReceiver):
    """
    A wrapper for an ASAP::O consumer for serial processing

    This wrapper guarantees that the data returned by get_next is ordered by
    id. If a record cannot be retrieved, get_next raises a MissingDataError
    and continues with the next id on the next call. A MissingDataError is
    raised by get_next exactly once per SerialAsapoReceiver instance for each
    skipped record.
    """
    start_id = Config(
        "The id of the first data to be received. "
        "All earlier data is skipped. "
        "Defaults to start_id=1, i.e., the beginning of the stream.",
        type=int, default=1)
    max_retries = Config(
        "In case of ASAP::O errors, retry this many times before skipping "
        "data.", type=int, default=2)

    def __attrs_post_init__(self):
        self.expected_id = self.start_id
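        # Starting anywhere but at the beginning of the stream requires the
        # last read marker to be set before the first get_next call (handled
        # below and in set_start_id via the need_set_marker flag).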

        if self.start_id != 1:
            self.need_set_marker = True
        else:
            self.need_set_marker = False

        self.retries = 0

    def set_start_id(self, value):
        log.info("Setting new start_id=%s", value)
        self.start_id = value
        self.expected_id = value
        self.need_set_marker = True

    def get_next(self, meta_only=True):
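        # Re-position the stream pointer first if a previous call failed,
        # skipped data, or the start id was changed via set_start_id.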
        if self.need_set_marker:
            self._set_marker()
        return self._get_next(meta_only=meta_only)

    def _get_next(self, meta_only):
        log.info("Requesting next record for group_id=%s.", self.group_id)
        try:
            data, metadata = self.broker.get_next(
                self.group_id, meta_only=meta_only, substream=self.substream)
        except asapo_consumer.AsapoEndOfStreamError as err:
            raise EndOfStreamError(
                "End of stream at expected_id="
                + str(self.expected_id)) from err
        except asapo_consumer.AsapoUnavailableServiceError as err:
            raise TemporaryError(
                "Failed to get next at expected_id="
                + str(self.expected_id)) from err
        except (asapo_consumer.AsapoInterruptedTransactionError,
                asapo_consumer.AsapoNoDataError,
                asapo_consumer.AsapoLocalIOError) as err:
            self.need_set_marker = True
            self.retries += 1
            raise TemporaryError(
                "Failed to get next at expected_id="
                + str(self.expected_id)) from err
        except asapo_consumer.AsapoWrongInputError as err:
            raise ConfigurationError(
                "Failed to get next at expected_id="
                + str(self.expected_id)) from err
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError(
                "Failed to get next at expected_id="
                + str(self.expected_id)) from err

        current_id = metadata["_id"]
        log.info("Received record with id=%i.", current_id)

        if current_id != self.expected_id:
            self.need_set_marker = True
            self.retries += 1
            raise TemporaryError(
                "Unexpected id, received id={}, expected_id={}".format(
                    current_id, self.expected_id))

        self.expected_id = current_id + 1
        self.retries = 0

        metadata["substream"] = self.substream

        return data, metadata

    def _set_marker(self):
        log.info(
            "Setting last read marker to id=%i for group_id=%s.",
            self.expected_id - 1, self.group_id)

        try:
            self.broker.set_lastread_marker(
                self.expected_id - 1, self.group_id, substream=self.substream)
        except (asapo_consumer.AsapoEndOfStreamError,
                asapo_consumer.AsapoUnavailableServiceError) as err:
            # Do not increase retry counter because get_next will likely
            # error as well
            raise TemporaryError(
                "Failed to set last read marker at expected_id="
                + str(self.expected_id)) from err
        except (asapo_consumer.AsapoInterruptedTransactionError,
                asapo_consumer.AsapoNoDataError,
                asapo_consumer.AsapoLocalIOError) as err:
            if self.retries >= self.max_retries:
                self._handle_too_many_retries(err)
            self.retries += 1
            raise TemporaryError(
                "Failed to set last read marker at expected_id="
                + str(self.expected_id)) from err
        except asapo_consumer.AsapoWrongInputError as err:
            # Do not increase retry counter because get_next will likely
            # error as well
            raise ConfigurationError(
                "Failed to set last read marker at expected_id="
                + str(self.expected_id)) from err
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError(
                "Failed to set last read marker at expected_id="
                + str(self.expected_id)) from err

        if self.retries >= self.max_retries:
            self._handle_too_many_retries()

        self.need_set_marker = False

    def _handle_too_many_retries(self, err=None):
        # There might be a "hole" in the stream, fall back to get_next
        msg = "Cannot get id={} after retries={}, possible data loss!".format(
            self.expected_id, self.retries)
        self.retries = 0
        self.expected_id += 1
        if err:
            raise MissingDataError(msg) from err
        else:
            raise MissingDataError(msg)

    def get_last(self, meta_only=True):
        log.info("Requesting last record for group_id=%s.", self.group_id)
        try:
            data, metadata = self.broker.get_last(
                self.group_id, meta_only=meta_only, substream=self.substream)
        except (asapo_consumer.AsapoEndOfStreamError,
                asapo_consumer.AsapoUnavailableServiceError) as err:
            raise TemporaryError("Failed to get last") from err
        except (asapo_consumer.AsapoInterruptedTransactionError,
                asapo_consumer.AsapoNoDataError,
                asapo_consumer.AsapoLocalIOError) as err:
            raise MissingDataError("Failed to get last") from err
        except asapo_consumer.AsapoWrongInputError as err:
            raise ConfigurationError("Failed to get last") from err
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get last") from err

        current_id = metadata["_id"]
        log.info("Received last record with id=%i.", current_id)

        # ASAP::O sets the last read marker when calling get_last
        self.need_set_marker = True

        metadata["substream"] = self.substream

        return data, metadata
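

# A minimal, illustrative usage sketch (not part of the library): serial
# consumption with the skip-on-loss semantics described in the class
# docstring. The constructor arguments are assumed to be the flattened Config
# fields; all values are placeholders.
def _example_serial_receiver():
    receiver = SerialAsapoReceiver(
        source="localhost:8400", path="/asapo/shared", beamtime="my-beamtime",
        token="my-token", stream="detector", start_id=1)
    while True:
        try:
            data, metadata = receiver.get_next(meta_only=False)
        except EndOfStreamError:
            break  # reached the current end of the substream
        except MissingDataError as err:
            log.warning("Skipping record: %s", err)
            continue  # the receiver continues with the next id
        except TemporaryError:
            continue  # transient failure, the same id is requested again
        log.info("Processing record id=%s", metadata["_id"])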


@Configurable
class AsapoMetadataReceiver:
    """A wrapper for receiving substream metadata of an ASAP::O stream"""
    broker = Config(
        "An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
        builder=create_broker, flatten=True, arguments=dict(
            source=Config("ASAP::O endpoint", type=str),
            beamtime=Config("Beamtime ID", type=str),
            token=Config("Beamtime access token", type=str),
            stream=Config(
                "Name of metadata stream", type=str, default=""),
            timeout=Config(
                "Allowed time in milliseconds for ASAP::O data access before "
                "an exception is thrown", type=float, default=3000),
            path=Config(
                "ASAP::O mount path", type=str, default="", init=False),
            has_filesystem=Config(
                "Read files directly from filesystem",
                type=bool, default=False, init=False)
        ))
    group_id = Config(
        "The stream data is divided between all workers with the same "
        "group_id. If not given, a unique group id will be generated "
        "and the worker will receive the complete stream.",
        type=str)

    @group_id.default
    def _generate_group_id(self):
        log.info("Generating new group id.")
        try:
            group_id = self.broker.generate_group_id()
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Cannot generate group_id") from err

        log.info("New group_id=%s.", group_id)
        return group_id

    def get_current_size(self, substream):
        try:
            return self.broker.get_current_size(substream=substream)
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get current size") from err

    def get_substream_list(self):
        try:
            return self.broker.get_substream_list()
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get substream list") from err

    def get_substream_metadata(self, substream):
        """
        Return the last available entry from given substream
        of the metadata stream

        Parameters
        ----------
        substream : str
            Name of substream
        """
        try:
            return self.broker.get_last(self.group_id, substream=substream)
        except asapo_consumer.AsapoEndOfStreamError as err:
            raise EndOfStreamError("End of metadata stream") from err
        except (asapo_consumer.AsapoUnavailableServiceError,
                asapo_consumer.AsapoInterruptedTransactionError,
                asapo_consumer.AsapoNoDataError,
                asapo_consumer.AsapoLocalIOError) as err:
            raise TemporaryError("Failed to get substream metadata") from err
        except asapo_consumer.AsapoWrongInputError as err:
            raise ConfigurationError(
                "Failed to get substream metadata") from err
        except asapo_consumer.AsapoConsumerError as err:
            raise StreamError("Failed to get substream metadata") from err
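

# A minimal, illustrative usage sketch (not part of the library): reading the
# latest metadata entry of every substream. The constructor arguments are
# assumed to be the flattened Config fields; all values are placeholders, and
# the entries returned by get_substream_list are assumed to be substream names.
def _example_metadata_receiver():
    receiver = AsapoMetadataReceiver(
        source="localhost:8400", beamtime="my-beamtime", token="my-token",
        stream="detector_metadata")
    for substream in receiver.get_substream_list():
        try:
            data, metadata = receiver.get_substream_metadata(substream)
        except (EndOfStreamError, TemporaryError):
            continue  # nothing published yet or a transient failure
        log.info("Substream %s: last metadata id=%s", substream, metadata["_id"])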