streamer.py 12.3 KB
Newer Older
Tim Schoof's avatar
Tim Schoof committed
1
import logging
Mikhail Karnevskiy's avatar
Mikhail Karnevskiy committed
2
import traceback
Tim Schoof's avatar
Tim Schoof committed
3
from threading import Event
4
import threading
5
from asapo_producer import AsapoProducerError
6
7
from AsapoWorker.configuration import create_instance_from_configurable
from AsapoWorker.configurable import check_type
8
from AsapoWorker.errors import (
9
    TemporaryError, MissingDataError, ConfigurationError, EndOfStreamError,
10
    StreamError, StreamFinishedError)
11
from AsapoWorker.utils import format_error
Tim Schoof's avatar
Tim Schoof committed
12

Tim Schoof's avatar
Tim Schoof committed
13
14
# Logger named after this module; used by all functions and classes below.
log = logging.getLogger(name=__name__)


def max_stream(streams):
    """
    Return the highest integer among the stream names that are decimal.

    Names for which ``isdecimal()`` is False are ignored. Raises ValueError
    if none of the names is decimal (callers rely on this).
    """
    numeric_ids = [int(name) for name in streams if name.isdecimal()]
    return max(numeric_ids)


def get_new_stream(stream, receiver_stream_list, metadata_stream_list,
                   sender_stream_list=None, naming_scheme='date'):
    """
    Get the name of the next stream according to a naming scheme.

    In every scheme a candidate is only returned if it is also present in
    the metadata stream list.

    Scheme 'date': receiver streams are ordered from earliest to latest
        creation time. The next stream is the one following the current
        stream in that order (or the first one, if the current stream is not
        in the receiver stream list).

    Scheme 'basename' (any scheme name containing 'basename'): receiver and
        sender stream lists are compared. The next stream is the first
        receiver stream (by creation time) that is not the current stream
        and not present in the sender stream list. If the scheme name also
        contains 'nonfinished', only *finished* sender streams are excluded,
        so receiver streams whose sender counterpart is not finished yet
        remain candidates.

    Scheme 'numeric': stream names are integers that increase by one for
        each new stream. The next stream is ``str(int(stream) + 1)``,
        provided a larger numeric receiver stream exists.

    Parameters
    ----------
    stream: str
        Name of current stream
    receiver_stream_list: list of dict
        Receiver streams; entries need the keys 'name' and
        'timestampCreated'.
    metadata_stream_list: list of dict or container
        Metadata streams (entries need the key 'name'). Any non-list
        container supporting ``in`` is used as-is.
    sender_stream_list: list of dict, optional
        Sender streams (entries need 'name', and 'finished' for the
        'nonfinished' variant). Required by the 'basename' scheme.
    naming_scheme: str
        Naming scheme used to choose the next stream

    Returns
    -------
    str or None
        Name of the next stream, or None if no next stream is available.

    Raises
    ------
    ValueError
        For the 'numeric' scheme, if no receiver stream name is decimal
        (raised by ``max_stream``).
    """
    receiver_names = [
        entry['name']
        for entry in sorted(receiver_stream_list,
                            key=lambda entry: entry['timestampCreated'])]
    if isinstance(metadata_stream_list, list):
        # Names are looked up repeatedly below, so use a set
        metadata_stream_list = {entry['name'] for entry in metadata_stream_list}

    if 'basename' in naming_scheme:
        if sender_stream_list is None:
            log.warning(
                "Cannot calculate next stream. sender stream list is None, "
                "stream=%s", stream)
            return None
        if 'nonfinished' in naming_scheme:
            # Only finished sender streams block a receiver stream
            sender_stream_list = [
                entry for entry in sender_stream_list if entry['finished']]
        sender_names = {entry['name'] for entry in sender_stream_list}
        for receiver_stream in receiver_names:
            if (receiver_stream != stream and
                    receiver_stream not in sender_names and
                    receiver_stream in metadata_stream_list):
                return receiver_stream
        return None

    if naming_scheme == 'date':
        if stream in receiver_names:
            pos = receiver_names.index(stream) + 1
        else:
            pos = 0
        if (pos < len(receiver_names)
                and receiver_names[pos] in metadata_stream_list):
            return receiver_names[pos]
        return None

    if naming_scheme == 'numeric':
        # Deliberately outside the try: a ValueError here (no numeric
        # receiver stream) propagates to the caller.
        max_stream_number = max_stream(receiver_names)
        try:
            current = int(stream)
        except ValueError:
            log.warning(
                "Cannot calculate next stream from non-integer value "
                "stream=%s", stream)
            return None
        candidate = str(current + 1)
        if current < max_stream_number and candidate in metadata_stream_list:
            return candidate
        return None

    log.warning("Unknown stream naming scheme %s", naming_scheme)
    return None


class ContainsAll:
    """
    Stand-in container for which every membership test succeeds.

    Used where a stream list is expected but every stream should be treated
    as present.
    """

    def __contains__(self, item):
        # Membership is unconditional; the candidate is never inspected.
        return True


class Streamer:
    """
    Feed the messages of one stream from ``receiver`` into ``worker``.

    If a ``metadata_receiver`` is given, the streamer first waits for the
    stream metadata and calls ``worker.pre_scan``. It then processes messages
    one by one with ``worker.process`` until stopped. When the end of the
    stream is reached, ``end_of_stream_callback`` is called at most once with
    the name of the next stream, as determined by ``get_new_stream``.

    Parameters
    ----------
    receiver:
        Data source; used via ``stream``, ``get_next``, ``get_stream_list``,
        ``get_stream_info``, ``get_unacknowledged_messages`` and
        ``n_resend_nacs``.
    worker:
        Processing object; used via ``meta_only``, ``sender``, ``process``,
        ``pre_scan``, ``stream_finished``, ``shutdown``, an optional
        ``Parameters`` attribute and the ``handle_receiver_*_error`` hooks.
    metadata_receiver: optional
        Source of stream metadata. If None, no metadata is awaited and all
        streams are considered to have metadata.
    delay_on_error: float
        Initial waiting time in seconds between polls once the stream is
        likely done or finished. It grows linearly by this amount per failed
        poll, capped at ``_MAX_DELAY_FACTOR`` times this value.
    end_of_stream_callback: callable, optional
        Called once with the name of the next stream (which may be None).
    fix_metadata_stream: bool
        If True, stream metadata is always read from the 'default' stream.
    stream_naming_scheme: str
        Naming scheme passed to ``get_new_stream``.
    """

    # Upper bound of the back-off delay, as a multiple of the initial delay
    _MAX_DELAY_FACTOR = 10

    def __init__(
            self, receiver, worker, metadata_receiver=None, delay_on_error=3,
            end_of_stream_callback=None, fix_metadata_stream=False,
            stream_naming_scheme='numeric'):
        self.receiver = receiver
        self.worker = worker
        self.metadata_receiver = metadata_receiver
        self.initial_delay_on_error = delay_on_error
        self.delay_on_error = delay_on_error
        self.end_of_stream_callback = end_of_stream_callback
        # Set by _handle_end_of_stream once a newer stream was found
        self.likely_done = False
        # Set by _get_next once the receiver raised StreamFinishedError
        self.stream_finished = False
        self.fix_metadata_stream = fix_metadata_stream
        self.stopped = Event()
        self.stream_naming_scheme = stream_naming_scheme

    def _process_stream(self):
        """
        Fetch the next message and hand it to the worker.

        Returns
        -------
        bool
            True if a message was processed, False if none was available.

        Raises
        ------
        Exception
            Any error raised by ``worker.process`` is logged and re-raised.
        """
        data, metadata = self._get_next()

        if metadata is None:
            return False

        try:
            self.worker.process(data, metadata)
        except (AsapoProducerError, ConfigurationError) as err:
            log.critical("Sending failed: " + str(err))
            raise
        except Exception:
            log.exception("Worker could not process data.")
            raise

        return True

    def _get_stream_metadata(self):
        """
        Request the stream metadata from the metadata receiver.

        Returns whatever ``metadata_receiver.get_stream_metadata`` returns
        (unpacked by ``run`` as ``(data, stream_metadata)``), or
        ``(None, None)`` if the metadata is not available yet.
        """
        try:
            if self.fix_metadata_stream:
                stream = 'default'
            else:
                stream = self.receiver.stream
            return self.metadata_receiver.get_stream_metadata(stream)
        except EndOfStreamError as err:
            log.info(format_error(err))
            # The stream might have been skipped
            self._handle_end_of_stream()
            return None, None
        except TemporaryError as err:
            log.warning(format_error(err))
            return None, None

    def _get_next(self):
        """
        Fetch the next message from the receiver and handle receiver errors.

        End-of-stream and finished-stream errors trigger the end-of-stream
        logic and may stop this streamer when no callback is left. Temporary
        and missing-data errors are forwarded to the worker hooks.

        Returns
        -------
        tuple
            ``(data, metadata)`` of the next message, or ``(None, None)`` if
            no message could be fetched.

        Raises
        ------
        Exception
            Any unexpected receiver error, after notifying the worker.
        """
        try:
            data, metadata = self.receiver.get_next(
                meta_only=self.worker.meta_only)
        except EndOfStreamError as err:
            log.info(format_error(err))
            self._handle_receiver_temporary_error()
            self._handle_end_of_stream()
            if self.stream_finished and self.end_of_stream_callback is None:
                self.stop()
            return None, None
        except TemporaryError as err:
            log.warning(format_error(err))
            self._handle_receiver_temporary_error()
            return None, None
        except MissingDataError:
            log.error("Missing data error", exc_info=True)
            self._handle_receiver_missing_data_error()
            return None, None
        except StreamFinishedError:
            log.info("Stream is finished")
            self._handle_end_of_stream()
            if not self.stream_finished:
                # Notify the worker only once per stream
                stream_info = self.receiver.get_stream_info()
                self.worker.stream_finished(stream_info)
                self.stream_finished = True

            left_msgs = len(self.receiver.get_unacknowledged_messages())
            log.info("Number of unacknowledged messages = %s", left_msgs)
            if left_msgs == 0 or self.receiver.n_resend_nacs == 0:
                # Stop this instance if it can not start new instance anymore
                if self.end_of_stream_callback is None:
                    self.stop()
            return None, None
        except Exception:
            log.critical("Unhandled exception", exc_info=True)
            self._handle_receiver_critical_error()
            raise

        return data, metadata

    def _handle_end_of_stream(self):
        """
        Decide whether to start the next stream after reaching the end.

        Calls ``end_of_stream_callback`` (at most once) and sets
        ``likely_done`` when a newer stream is available. Returns silently
        if any stream list cannot be retrieved.
        """
        # When receiving an EndOfStreamError, there are two cases to consider:
        # 1. The stream has data and metadata, i.e., occurs in the stream
        #    list
        #    -> data receiving is slow or stream is finished
        #    -> start next stream
        # 2. A newer stream has data and metadata, i.e., occurs in the
        #    stream list
        #    -> stream is likely finished or was skipped
        #    -> start next stream + reduce polling rate
        if self.likely_done:
            # nothing is left to be done
            return

        try:
            stream_list = self.receiver.get_stream_list()
        except StreamError:
            # the state is unknown, so nothing should be done
            log.warning("Failed to get stream list", exc_info=True)
            return

        if self.worker.sender is not None:
            try:
                sender_stream_list = self.worker.sender.get_stream_list()
            except StreamError:
                log.warning(
                    "Failed to get sender stream list", exc_info=True)
                return
        else:
            sender_stream_list = None

        if self.metadata_receiver and not self.fix_metadata_stream:
            try:
                metadata_stream_list = (
                    self.metadata_receiver.get_stream_list())
            except StreamError:
                # the state is unknown, so nothing should be done
                log.warning(
                    "Failed to get metadata stream list", exc_info=True)
                return
        else:
            # Data_source does not use stream metadata, therefore consider all
            # metadata as available
            metadata_stream_list = ContainsAll()

        # NOTE(review): get_new_stream treats stream list entries as dicts
        # with a 'name' key. If that holds, these tests compare a str with
        # dicts and has_data is always False. Confirm before relying on
        # case 1 above; fixing it changes when the callback fires.
        has_data = (self.receiver.stream in stream_list
                    and self.receiver.stream in metadata_stream_list)

        try:
            new_stream = get_new_stream(
                self.receiver.stream, stream_list,
                metadata_stream_list,
                sender_stream_list,
                self.stream_naming_scheme)
        except ValueError:
            # stream is not an integer, so nothing should be done
            return

        if (has_data or new_stream is not None) and self.end_of_stream_callback:
            # call the callback only once
            log.debug("Start next stream=%s", new_stream)
            self.end_of_stream_callback(new_stream)
            self.end_of_stream_callback = None

        if new_stream is not None:
            self.likely_done = True

    def _handle_receiver_temporary_error(self):
        self.worker.handle_receiver_temporary_error()

    def _handle_receiver_missing_data_error(self):
        self.worker.handle_receiver_missing_data_error()

    def _handle_receiver_critical_error(self):
        self.worker.handle_receiver_critical_error()

    def run(self):
        """
        Process the stream until ``stop`` is called.

        Waits for stream metadata first (if a metadata receiver is set), then
        processes messages. Once the stream is likely done or finished,
        unsuccessful polls are spaced out with an increasing delay. The
        worker is always shut down on exit, and errors are logged and
        re-raised.
        """
        try:
            threading.current_thread().name = "stream_{}".format(
                self.receiver.stream)
            if self.metadata_receiver:
                log.info("Waiting for stream metadata")
                while not self.stopped.is_set():
                    data, stream_metadata = self._get_stream_metadata()
                    if stream_metadata:
                        self._reset_delay_on_error()
                        break
                    if self.likely_done:
                        self.stopped.wait(self.delay_on_error)
                        self._increase_delay_on_error()
                else:
                    # no break, i.e., stopped is set
                    return

                log.info("Performing pre-scan setup")
                parameters = self._meta_to_parameters(stream_metadata['meta'])
                self.worker.pre_scan(data, stream_metadata, parameters)

            log.info("Start stream processing.")
            while not self.stopped.is_set():
                success = self._process_stream()
                if (self.likely_done or self.stream_finished) and not success:
                    self.stopped.wait(self.delay_on_error)
                    self._increase_delay_on_error()
                else:
                    self._reset_delay_on_error()
        except Exception as e:
            log.error("Streamer fails with error: %s", e, exc_info=True)
            raise
        finally:
            self._shutdown()

    def _meta_to_parameters(self, metadata):
        """
        Build the worker's ``Parameters`` instance from stream metadata.

        Returns None if the worker defines no ``Parameters`` attribute.
        """
        parameters = None
        if hasattr(self.worker, "Parameters"):
            parameters = create_instance_from_configurable(
                self.worker.Parameters, metadata)
            check_type(parameters)
        return parameters

    def _increase_delay_on_error(self):
        """
        Grow the polling delay by the initial delay.

        The delay is capped at ``_MAX_DELAY_FACTOR`` times the initial delay.
        """
        self.delay_on_error = min(
            self._MAX_DELAY_FACTOR * self.initial_delay_on_error,
            self.delay_on_error + self.initial_delay_on_error)

    def _reset_delay_on_error(self):
        self.delay_on_error = self.initial_delay_on_error

    def stop(self):
        """Request the processing loop in ``run`` to end."""
        log.info("Stopping stream processing.")
        self.stopped.set()

    def _shutdown(self):
        """
        Shut down the worker and wait for its sender.

        Calls ``sender.wait(timeout=10)`` on the worker's sender, if any.
        Failures are logged as possible data loss rather than raised.
        """
        log.info("Cleaning up.")
        self.worker.shutdown()
        if self.worker.sender:
            try:
                self.worker.sender.wait(timeout=10)
            except Exception as err:
                log.error("Possible data loss:" + str(err))