streamer.py 10.3 KB
Newer Older
Tim Schoof's avatar
Tim Schoof committed
1
import logging
Tim Schoof's avatar
Tim Schoof committed
2
from threading import Event
3
import threading
4
5
from asapo_producer import AsapoProducerError
from AsapoWorker.errors import (
6
7
8
    TemporaryError, MissingDataError, ConfigurationError, EndOfStreamError,
    StreamError)
from AsapoWorker.utils import format_error
Tim Schoof's avatar
Tim Schoof committed
9

Tim Schoof's avatar
Tim Schoof committed
10
11
log = logging.getLogger(__name__)

Tim Schoof's avatar
Tim Schoof committed
12

Tim Schoof's avatar
Tim Schoof committed
13
14
15
16
def max_substream(substreams):
    """
    Return the largest integer value among the decimal substream names.

    Names for which ``str.isdecimal()`` is false are ignored.

    Raises
    ------
    ValueError
        If no name in ``substreams`` is decimal (``max`` of an empty
        sequence).
    """
    numeric_values = [int(name) for name in substreams if name.isdecimal()]
    return max(numeric_values)


17
def get_new_substream(substream, substream_list, metadata_substream_list,
                      sender_substream_list=None, naming_scheme='date'):
    """
    Get the name of the next substream using one of the naming schemes.

    Scheme 'date': substream names encode a timestamp and can therefore be
        ordered from earliest to latest. The next substream is the one that
        follows the current substream in the sorted substream_list. If the
        current substream is not in the list, the first sorted entry is
        returned.

    Scheme 'basename': the receiver and sender substream lists are compared.
        The next substream is one that is present in the receiver substream
        list, not present in the sender substream list, and different from
        the current substream. If several candidates exist, the smallest
        name is returned so that the result is deterministic.

    Scheme 'numeric': the name of a substream is assumed to be an integer
        that increases by one for each next substream. The next substream
        is returned when the highest-numbered receiver substream is newer
        than the current one and is also present in the metadata substream
        list.

    Parameters
    ----------
    substream: str
        Name of current substream
    substream_list: list of str
        List of receiver substreams
    metadata_substream_list: list of str
        List of metadata-stream substreams (any object supporting ``in``)
    sender_substream_list: list of str
        List of sender substreams (required for scheme 'basename')
    naming_scheme: str
        Naming scheme used to choose the next substream

    Returns
    -------
    str or None
        Name of the next substream, or None if no next substream can be
        determined.

    Raises
    ------
    ValueError
        For scheme 'numeric', if substream_list contains no decimal name
        (raised by max_substream).
    """
    if naming_scheme == 'basename':
        if sender_substream_list is None:
            log.warning(
                "Cannot calculate next substream. sender substream list is "
                "None. substream=%s", substream)
            return None

        # Exclude the current substream as a whole name; set(substream)
        # would instead remove every single character of the name.
        candidates = (set(substream_list) - set(sender_substream_list)
                      - {substream})
        if candidates:
            return min(candidates)
        return None

    if naming_scheme == 'date':
        ordered = sorted(substream_list)
        try:
            pos = ordered.index(substream) + 1
        except ValueError:
            # Current substream is unknown: start with the earliest one
            pos = 0
        if pos < len(ordered):
            return ordered[pos]
        return None

    if naming_scheme == 'numeric':
        # Kept outside the try block on purpose: a ValueError raised here
        # (no decimal substream name) propagates to the caller.
        newest = max_substream(substream_list)
        try:
            current = int(substream)
        except ValueError:
            log.warning(
                "Cannot calculate next substream from non-integer value "
                "substream=%s", substream)
            return None

        # Substream names are strings, so the string form has to be looked
        # up; the integer lookup is kept for backward compatibility.
        if current < newest and (
                str(newest) in metadata_substream_list
                or newest in metadata_substream_list):
            return str(current + 1)
        return None

    log.warning("Unknown substream naming scheme %s", naming_scheme)
    return None
Tim Schoof's avatar
Tim Schoof committed
83
84


85
86
87
88
89
class ContainsAll:
    """Stand-in container for which every ``in`` test succeeds."""

    def __contains__(self, item):
        # Membership is reported for any item whatsoever
        return True


Tim Schoof's avatar
Tim Schoof committed
90
class Streamer:
    """
    Feed a worker with data pulled from a receiver until stopped.

    ``run`` optionally waits for substream metadata from the metadata
    receiver and passes it to ``worker.pre_scan``. It then repeatedly
    fetches the next item from the receiver and hands it to
    ``worker.process``. When the end of the stream is reached, the next
    substream is determined and reported once via
    ``end_of_stream_callback``.
    """

    # Upper bound of the polling delay as a multiple of the initial delay
    _MAX_DELAY_FACTOR = 10

    def __init__(
            self, receiver, worker, metadata_receiver=None, delay_on_error=3,
            end_of_stream_callback=None, fix_metadata_substream=False,
            substream_naming_scheme='numeric'):
        """
        Parameters
        ----------
        receiver:
            Source of data; must provide ``get_next``,
            ``get_substream_list`` and a ``substream`` attribute
        worker:
            Consumer of data; must provide ``process``, ``pre_scan``,
            ``shutdown``, ``meta_only``, ``sender`` and the
            ``handle_receiver_*_error`` methods
        metadata_receiver:
            Optional source of substream metadata; must provide
            ``get_substream_metadata`` and ``get_substream_list``
        delay_on_error: number
            Initial timeout in seconds used when waiting between polls
            after the substream is likely done
        end_of_stream_callback: callable
            Called at most once with the name of the next substream
            (or None) when the end of the stream is detected
        fix_metadata_substream: bool
            If true, substream metadata is requested for the substream
            'default' instead of the receiver's substream
        substream_naming_scheme: str
            Naming scheme passed to get_new_substream
        """
        self.receiver = receiver
        self.worker = worker
        self.metadata_receiver = metadata_receiver
        self.initial_delay_on_error = delay_on_error
        self.delay_on_error = delay_on_error
        self.end_of_stream_callback = end_of_stream_callback
        self.likely_done = False
        self.fix_metadata_substream = fix_metadata_substream
        self.stopped = Event()
        self.substream_naming_scheme = substream_naming_scheme

    def _process_stream(self):
        """
        Fetch the next item and let the worker process it.

        Returns False if no item could be fetched, True otherwise.
        Exceptions raised by the worker are logged and re-raised.
        """
        data, metadata = self._get_next()

        if metadata is None:
            return False

        try:
            self.worker.process(data, metadata)
        except (AsapoProducerError, ConfigurationError) as err:
            log.critical("Sending failed: " + str(err))
            raise
        except Exception:
            log.exception("Worker could not process data.")
            raise

        return True

    def _get_substream_metadata(self):
        """
        Request the substream metadata from the metadata receiver.

        Returns the result of ``metadata_receiver.get_substream_metadata``
        or ``(None, None)`` if the metadata is not (yet) available.
        """
        try:
            if self.fix_metadata_substream:
                substream = 'default'
            else:
                substream = self.receiver.substream
            return self.metadata_receiver.get_substream_metadata(
                substream)
        except EndOfStreamError as err:
            log.info(format_error(err))
            # The substream might have been skipped
            self._handle_end_of_stream()
            return None, None
        except TemporaryError as err:
            log.warning(format_error(err))
            return None, None

    def _get_next(self):
        """
        Fetch the next ``(data, metadata)`` pair from the receiver.

        Returns ``(None, None)`` on end of stream, temporary errors and
        missing data; the worker is notified through the corresponding
        handler. Any other exception is logged, reported to the worker
        and re-raised.
        """
        try:
            data, metadata = self.receiver.get_next(
                meta_only=self.worker.meta_only)
        except EndOfStreamError as err:
            log.info(format_error(err))
            self._handle_receiver_temporary_error()
            self._handle_end_of_stream()
            return None, None
        except TemporaryError as err:
            log.warning(format_error(err))
            self._handle_receiver_temporary_error()
            return None, None
        except MissingDataError:
            log.error("Missing data error", exc_info=True)
            self._handle_receiver_missing_data_error()
            return None, None
        except Exception:
            log.critical("Unhandled exception", exc_info=True)
            self._handle_receiver_critical_error()
            raise

        return data, metadata

    def _handle_end_of_stream(self):
        """
        Determine the next substream after an EndOfStreamError.

        Calls ``end_of_stream_callback`` (once) and sets ``likely_done``
        when a next substream is found. Does nothing if any of the
        substream lists cannot be retrieved.
        """
        # When receiving an EndOfStreamError, there are two cases to consider:
        # 1. The substream has data and metadata, i.e., occurs in the substream
        #    list
        #    -> data receiving is slow or substream is finished
        #    -> start next stream
        # 2. A newer substream has data and metadata, i.e., occurs in the
        #    substream list
        #    -> substream is likely finished or was skipped
        #    -> start next substream + reduce polling rate
        if self.likely_done:
            # nothing is left to be done
            return

        try:
            substream_list = self.receiver.get_substream_list()
        except StreamError:
            # the state is unknown, so nothing should be done
            log.warning("Failed to get substream list", exc_info=True)
            return

        if self.worker.sender is not None:
            try:
                sender_substream_list = self.worker.sender.get_substream_list()
            except StreamError:
                log.warning(
                    "Failed to get sender substream list", exc_info=True)
                return
        else:
            sender_substream_list = None

        if self.metadata_receiver:
            try:
                metadata_substream_list = (
                    self.metadata_receiver.get_substream_list())
            except StreamError:
                # the state is unknown, so nothing should be done
                log.warning(
                    "Failed to get metadata substream list", exc_info=True)
                return
        else:
            # Stream does not use substream metadata, therefore consider all
            # metadata as available
            metadata_substream_list = ContainsAll()

        has_data = (
            self.receiver.substream in substream_list
            and self.receiver.substream in metadata_substream_list)

        try:
            new_substream = get_new_substream(
                self.receiver.substream, substream_list,
                metadata_substream_list,
                sender_substream_list,
                self.substream_naming_scheme)
        except ValueError:
            # substream is not an integer, so nothing should be done
            return

        if ((has_data or new_substream is not None)
                and self.end_of_stream_callback):
            # call the callback only once
            self.end_of_stream_callback(new_substream)
            self.end_of_stream_callback = None

        if new_substream is not None:
            self.likely_done = True

    def _handle_receiver_temporary_error(self):
        """Forward a temporary receiver error to the worker."""
        self.worker.handle_receiver_temporary_error()

    def _handle_receiver_missing_data_error(self):
        """Forward a missing-data receiver error to the worker."""
        self.worker.handle_receiver_missing_data_error()

    def _handle_receiver_critical_error(self):
        """Forward a critical receiver error to the worker."""
        self.worker.handle_receiver_critical_error()

    def run(self):
        """
        Process the stream until ``stop`` is called or an error is raised.

        If a metadata receiver is configured, waits for the substream
        metadata first and runs ``worker.pre_scan`` with it. The worker is
        always shut down on exit.
        """
        try:
            threading.current_thread().name = (
                f"substream_{self.receiver.substream}")
            if self.metadata_receiver:
                log.info("Waiting for substream metadata")
                while not self.stopped.is_set():
                    data, substream_metadata = self._get_substream_metadata()
                    if substream_metadata:
                        self._reset_delay_on_error()
                        break
                    if self.likely_done:
                        # poll less often once the substream is likely done
                        self.stopped.wait(self.delay_on_error)
                        self._increase_delay_on_error()
                else:
                    # no break, i.e., stopped is set
                    return

                log.info("Performing pre-scan setup")
                self.worker.pre_scan(data, substream_metadata)

            log.info("Start stream processing.")
            while not self.stopped.is_set():
                success = self._process_stream()
                if self.likely_done and not success:
                    # poll less often once the substream is likely done
                    self.stopped.wait(self.delay_on_error)
                    self._increase_delay_on_error()
                else:
                    self._reset_delay_on_error()
        finally:
            self._shutdown()

    def _increase_delay_on_error(self):
        """
        Increase the polling delay by the initial delay.

        The delay is capped at ``_MAX_DELAY_FACTOR`` times the initial
        delay.
        """
        self.delay_on_error = min(
            self._MAX_DELAY_FACTOR * self.initial_delay_on_error,
            self.delay_on_error + self.initial_delay_on_error)

    def _reset_delay_on_error(self):
        """Reset the polling delay to its initial value."""
        self.delay_on_error = self.initial_delay_on_error

    def stop(self):
        """Request ``run`` to terminate by setting the stopped event."""
        log.info("Stopping stream processing.")
        self.stopped.set()

    def _shutdown(self):
        """Shut down the worker and wait for its sender, if any."""
        log.info("Cleaning up.")
        self.worker.shutdown()
        if self.worker.sender:
            try:
                self.worker.sender.wait(timeout=10)
            except Exception as err:
                # best effort: report but do not raise during cleanup
                log.error("Possible data loss:" + str(err))