Commit 939e39ad authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'feat/next_stream_update' into 'master'

Feat/next stream update

See merge request !22
parents b5353d86 74ea9f05
......@@ -17,18 +17,19 @@ def max_stream(streams):
return max(int(s) for s in streams if s.isdecimal())
def get_new_stream(stream, stream_list, metadata_stream_list,
sender_stream_list=None, naming_scheme='date'):
def get_new_stream(stream, receiver_stream_list, metadata_stream_list,
sender_stream_list=None, naming_scheme='date'):
"""
Get name of next stream using one of naming scheme
Scheme 'date': streams name encoded timestamp and therefore
can be ordered from earliest to latest. Next stream is one, which is
Scheme 'date': streams are ordered from earliest to latest.
Next stream is one, which is
next in the stream_list compared to current stream
Scheme 'basename': list of streamer and receiver streams are compared.
Next stream is one, which is present in the receiver stream list and
not present in the sender stream list.
not present in the sender stream list. Using option `_nonfinished`
only non finished sender string will be considered
Scheme 'numeric': Assume the name of stream is an integer, which increases
for each next stream a newer stream is ready, when it has data and metadata
......@@ -37,41 +38,46 @@ def get_new_stream(stream, stream_list, metadata_stream_list,
----------
stream: str
Name of current stream
stream_list: list of str
receiver_stream_list: list of dict
List of receiver streams
metadata_stream_list: list of str
metadata_stream_list: list of dict
List of metadata-stream streams
sender_stream_list: list of str
sender_stream_list: list of dict
List of sender streams
naming_scheme: str
Naming scheme to chose next stream
"""
if naming_scheme == 'basename':
receiver_stream_list = sorted(receiver_stream_list, key=lambda x: x['timestampCreated'])
receiver_stream_list = [x['name'] for x in receiver_stream_list]
metadata_stream_list = [x['name'] for x in metadata_stream_list]
if 'basename' in naming_scheme:
if sender_stream_list is None:
log.warning(
"Cannot calculate next stream. sender stream list is None"
"stream=%s", stream)
return None
new_streams = list(set(stream_list)
- set(sender_stream_list) - set(stream))
if len(new_streams) > 0:
return new_streams[0]
if 'nonfinished' in naming_scheme:
sender_stream_list = [x for x in sender_stream_list if x['finished']]
sender_stream_list = [x['name'] for x in sender_stream_list]
for receiver_stream in receiver_stream_list:
if (receiver_stream != stream and
receiver_stream not in sender_stream_list and
receiver_stream in metadata_stream_list):
return receiver_stream
elif naming_scheme == 'date':
if stream not in stream_list:
if stream not in receiver_stream_list:
pos = 0
else:
pos = sorted(stream_list).index(stream) + 1
if pos < len(stream_list):
return sorted(stream_list)[pos]
pos = receiver_stream_list.index(stream) + 1
if pos < len(receiver_stream_list) and receiver_stream_list[pos] in metadata_stream_list:
return receiver_stream_list[pos]
elif naming_scheme == 'numeric':
max_stream_number = max_stream(stream_list)
max_stream_number = max_stream(receiver_stream_list)
try:
if (int(stream) < max_stream_number
and max_stream_number in metadata_stream_list):
and str(int(stream) + 1) in metadata_stream_list):
return str(int(stream) + 1)
except ValueError:
log.warning(
......@@ -195,7 +201,7 @@ class Streamer:
else:
sender_stream_list = None
if self.metadata_receiver:
if self.metadata_receiver and not self.fix_metadata_stream:
try:
metadata_stream_list = (
self.metadata_receiver.get_stream_list())
......@@ -228,7 +234,7 @@ class Streamer:
if (has_data or new_stream is not None) and self.end_of_stream_callback:
# call the callback only once
print("Start next: ", new_stream)
self.end_of_stream_callback(new_stream['name'])
self.end_of_stream_callback(new_stream)
self.end_of_stream_callback = None
if new_stream is not None:
......
import pytest
from AsapoWorker.streamer import get_new_stream
@pytest.fixture
def receiver_streams():
return [
{'lastId': 1, 'name': 'default', 'timestampCreated': 10, 'timestampLast': 10,
'finished': False, 'nextStream': ''},
{'lastId': 4, 'name': '1', 'timestampCreated': 30, 'timestampLast': 11,
'finished': True, 'nextStream': ''},
{'lastId': 4, 'name': '2', 'timestampCreated': 20, 'timestampLast': 12,
'finished': True, 'nextStream': ''},
{'lastId': 4, 'name': '3', 'timestampCreated': 40, 'timestampLast': 13,
'finished': True, 'nextStream': ''},
{'lastId': 1, 'name': '4', 'timestampCreated': 50, 'timestampLast': 14,
'finished': False, 'nextStream': ''},
{'lastId': 1, 'name': '5', 'timestampCreated': 60, 'timestampLast': 15,
'finished': False, 'nextStream': ''}]
@pytest.fixture
def sender_streams():
return [
{'lastId': 1, 'name': 'default', 'timestampCreated': 10, 'timestampLast': 10,
'finished': False, 'nextStream': ''},
{'lastId': 4, 'name': '1', 'timestampCreated': 30, 'timestampLast': 11,
'finished': True, 'nextStream': ''},
{'lastId': 4, 'name': '2', 'timestampCreated': 20, 'timestampLast': 12,
'finished': True, 'nextStream': ''},
{'lastId': 4, 'name': '3', 'timestampCreated': 40, 'timestampLast': 13,
'finished': False, 'nextStream': ''}]
def test_get_next_by_date(receiver_streams, sender_streams):
stream = receiver_streams[0]['name']
expected = ['2', '1', '3', '4', '5', None]
for expected_stream in expected:
stream = get_new_stream(stream, receiver_streams, receiver_streams,
sender_stream_list=sender_streams, naming_scheme='date')
assert stream == expected_stream
def test_get_next_by_number(receiver_streams, sender_streams):
stream = receiver_streams[1]['name']
expected = ['2', '3', '4', '5', None]
for expected_stream in expected:
stream = get_new_stream(stream, receiver_streams, receiver_streams,
sender_stream_list=sender_streams, naming_scheme='numeric')
assert stream == expected_stream
def test_get_next_by_name(receiver_streams, sender_streams):
stream = receiver_streams[0]['name']
expected = ['4', '5', None, None, None]
for expected_stream in expected:
stream = get_new_stream(stream, receiver_streams, receiver_streams,
sender_stream_list=sender_streams, naming_scheme='basename')
sender_streams += [x for x in receiver_streams if x['name'] == stream]
assert stream == expected_stream
if stream is None:
break
def test_get_next_by_name_nonfinished(receiver_streams, sender_streams):
stream = receiver_streams[0]['name']
expected = ['3', '4', '5', None, None]
for expected_stream in expected:
new_stream = get_new_stream(stream, receiver_streams, receiver_streams,
sender_stream_list=sender_streams, naming_scheme='basename_nonfinished')
finished_stream = [x for x in receiver_streams if x['name'] == stream]
finished_stream[0]['finished'] = True
sender_streams += finished_stream
stream = new_stream
assert stream == expected_stream
if stream is None:
break
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment