diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 54762061293d263de17b6124774c9fbbefb81134..c11ce3e367c10f49e428d38c4e611bd8f5adf3ee 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -121,8 +121,6 @@ test-services-linux-debug: - cd $CI_PROJECT_DIR/build - ctest --no-compress-output -T Test -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml - pip3 install pytest - - pytest -vv -o log_cli=true --log-cli-level=DEBUG ../tests/automatic/pytests/ - tags: - kubernetes-executor rules: @@ -133,6 +131,24 @@ test-services-linux-debug: reports: junit: build/testResult.xml +test_python_wheels: + services: + - name: gitlab.desy.de:5555/asapo/asapo/asapo-standalone-dev:latest + image: + name: $CI_REGISTRY_IMAGE/asapo-packages-manylinux-build-env:latest + stage: deploy + script: + - python_bin=/opt/python/cp37-cp37m/bin/python + - pip_bin=/opt/python/cp37-cp37m/bin/pip + - cd $CI_PROJECT_DIR/tests/automatic/pytests + - $pip_bin install pytest + - $pip_bin install --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_consumer==100.0.dev2 + - $pip_bin install --trusted-host nims.desy.de --find-links=http://nims.desy.de/extra/asapo/linux_wheels asapo_producer==100.0.dev2 + - $python_bin -m pytest test_pipeline.py --token-path standalone_token.txt -vv -o log_cli=true --log-cli-level=DEBUG + dependencies: + - upload-python-packages + - build-services-docker-release + build-services-linux-release: extends: .go-cache image: diff --git a/tests/automatic/pytests/conftest.py b/tests/automatic/pytests/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..cee0b1706ccdd54b648e2aa56b7bd5c12a2757f1 --- /dev/null +++ b/tests/automatic/pytests/conftest.py @@ -0,0 +1,25 @@ +# conftest.py +import pytest +import logging + +log = logging.getLogger(__name__) + +def pytest_addoption(parser): + parser.addoption("--token-path", action="store", default="", + help="Input string for tests") + +def pytest_configure(config): + token_path = config.getoption("--token-path") + + with open(token_path, "r") as f: + config.token = f.readline().split("\n")[0] + + config.beamtime="asapo_test" + config.data_source="python" + config.host = "localhost:8400" + config.data_path = "/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/asapo_test/processed" + + +@pytest.fixture +def asapo_config(request): + return request.config diff --git a/tests/automatic/pytests/standalone_token.txt b/tests/automatic/pytests/standalone_token.txt new file mode 100644 index 0000000000000000000000000000000000000000..f4abfa25302c16df52c2bf8863dcd4fc69f49a4c --- /dev/null +++ b/tests/automatic/pytests/standalone_token.txt @@ -0,0 +1 @@ +eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6Ind0ZmlzdGhpcyIsInN1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY2Nlc3NUeXBlcyI6WyJ3cml0ZSIsIndyaXRlcmF3IiwicmVhZCJdfX0.cz6R_kVf4yh7IJD6bJjDdgTaxPN3txudZx9DE6WaTtk \ No newline at end of file diff --git a/tests/automatic/pytests/test_pipeline.py b/tests/automatic/pytests/test_pipeline.py index cae5ab61ed20c26f9d019e717f435612e6f14909..f54fe74cc27697261f2229fc92f2b77ff7ef1148 100644 --- a/tests/automatic/pytests/test_pipeline.py +++ b/tests/automatic/pytests/test_pipeline.py @@ -3,27 +3,19 @@ import os import logging import json import sys -from time import sleep +from time import sleep +from datetime import datetime -sys.path.insert(0,'producer/api/python/') -sys.path.insert(0,'consumer/api/python/') +sys.path.insert(0, 'producer/api/python/') +sys.path.insert(0, 'consumer/api/python/') import asapo_producer import asapo_consumer log = logging.getLogger(__name__) -beamtime="asapo_test" -data_source="python" -token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiO" -"jk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3V" -"iIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5c" -"GVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I") -host = "localhost:8400" -data_path = "/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/asapo_test/processed" - -def callback(payload,err): +def callback(payload, err): if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): # the data was not sent. Something is terribly wrong. log.debug(f"could not send: {payload} {err}") @@ -34,27 +26,35 @@ def callback(payload,err): # all fine log.debug(f"successfuly sent: {payload}") + @pytest.fixture(autouse=True) -def cleanup_db(): - os.makedirs(data_path, exist_ok=True) +def cleanup_db(asapo_config): + log.info(f"Run test for consumer {asapo_consumer.__version__} and producer {asapo_producer.__version__}") + os.makedirs(asapo_config.data_path, exist_ok=True) yield None - os.system(f'echo "db.dropDatabase()" | mongo {beamtime}_{data_source} >/dev/null') - os.system(f"rm -fr {data_path}") + os.system(f'echo "db.dropDatabase()" | mongo {asapo_config.beamtime}_{asapo_config.data_source} >/dev/null') + os.system(f"rm -fr {asapo_config.data_path}") + -def test_delete_stream(): +@pytest.mark.compatible +def test_delete_stream(asapo_config): stream_name = "test001" - producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, - token, 1, 6000) - - consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000) + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + data_source = f"source_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + data_source, + asapo_config.token, 1, 6000) + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + data_source, asapo_config.token, 1000) g = consumer.generate_group_id() def inject_data(): for i in range(5): - producer.send(i+1, # message number. Should be unique and ordered. - f"processed/file_part{i}.cbf", # - b"The data", # binary data - callback = callback, + producer.send(i + 1, # message number. Should be unique and ordered. + f"processed/{stream_name}_file_part{i}.cbf", # + b"The data", # binary data + callback=callback, stream=stream_name) producer.wait_requests_finished(3000) @@ -64,8 +64,8 @@ def test_delete_stream(): # check stream info log.debug(consumer.get_stream_list()) stream = consumer.get_stream_list()[0] - assert(stream['name'] == stream_name) - assert(stream['lastId'] == 5) + assert (stream['name'] == stream_name) + assert (stream['lastId'] == 5) log.debug(stream) # Delete stream @@ -73,64 +73,72 @@ def test_delete_stream(): # There is no wait or call-back function. Wait manually. sleep(1) stream_list = consumer.get_stream_list() - assert(len(stream_list) == 0) + assert (len(stream_list) == 0) # Create stream again inject_data() stream = consumer.get_stream_list()[0] - assert(stream['name'] == stream_name) - assert(stream['lastId'] == 5) + assert (stream['name'] == stream_name) + assert (stream['lastId'] == 5) log.debug(stream) # Delete stream with consumer consumer.delete_stream(stream=stream_name) sleep(1) stream_list = consumer.get_stream_list() - assert(len(stream_list) == 0) - -def test_get_next_ordered(): - stream_name = "test001" - producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, - token, 1, 6000) + assert (len(stream_list) == 0) + - consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000) +@pytest.mark.compatible +def test_get_next_ordered(asapo_config): + stream_name = "test001" + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + data_source = f"source_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + data_source, + asapo_config.token, 1, 6000) + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + data_source, asapo_config.token, 1000) producer.send(2, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) with pytest.raises(Exception): consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name) - _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name, ordered=False) - assert(mdata['_id'] == 2) - producer.send(1, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name) - assert(mdata['_id'] == 1) + assert (mdata['_id'] == 1) -def test_get_next_ordered_resend_no_delay(): - stream_name = "test001" - producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, - token, 1, 6000) - consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500) +@pytest.mark.compatible +def test_get_next_ordered_resend_no_delay(asapo_config): + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + stream_name = f"test001_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + asapo_config.data_source, + asapo_config.token, 1, 6000) + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + asapo_config.data_source, asapo_config.token, 500) g = consumer.generate_group_id() consumer.set_resend_nacs(True, 0, 2) producer.send(2, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) with pytest.raises(asapo_consumer.AsapoNoDataError): @@ -140,37 +148,95 @@ def test_get_next_ordered_resend_no_delay(): consumer.get_next(g, meta_only=True, stream=stream_name) producer.send(1, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) _, mdata = consumer.get_next(g, meta_only=True, stream=stream_name) - assert(mdata['_id'] == 1) + assert (mdata['_id'] == 1) -def test_get_next_available_resend_no_delay(): - stream_name = "test001" - producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, - token, 1, 6000) - consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500) +def test_get_next_available_resend_no_delay(asapo_config): + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + stream_name = f"test001_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + asapo_config.data_source, + asapo_config.token, 1, 6000) + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + asapo_config.data_source, asapo_config.token, 500) g = consumer.generate_group_id() consumer.set_resend_nacs(True, 0, 2) producer.send(2, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) producer.send(1, - f"processed/file_part1.cbf", # - b"The data", # binary data - callback = callback, - stream=stream_name) + f"processed/{stream_name}_file_part1.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) producer.wait_requests_finished(3000) _, mdata = consumer.get_next(g, meta_only=True, stream=stream_name, ordered=False) - assert(mdata['_id'] == 2) \ No newline at end of file + assert (mdata['_id'] == 2) + + +@pytest.mark.compatible +def test_get_by_id_and_last(asapo_config): + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + stream_name = f"test001_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + asapo_config.data_source, + asapo_config.token, 1, 6000) + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + asapo_config.data_source, asapo_config.token, 500) + + for i in [3, 1, 2, 5, 4]: + producer.send(i, + f"processed/{stream_name}_file_part{i}.cbf", # + b"The data", # binary data + callback=callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + _, mdata = consumer.get_last(meta_only=True, stream=stream_name) + assert mdata["_id"] == 5 + + for i in range(1, 6): + _, mdata = consumer.get_by_id(i, meta_only=True, stream=stream_name) + assert mdata["_id"] == i + + +def test_get_source_list(asapo_config): + def create_source(source_name): + ts = datetime.now().strftime('%Y%m%dT_%H%M%S') + stream_name = f"test001_{ts}" + producer = asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto", + source_name, asapo_config.token, 1, 6000) + + producer.send(3, + f"processed/{stream_name}_file_part3.cbf", # + b"The data", # binary data + callback=callback, + stream="stream_1") + producer.wait_requests_finished(3000) + + for i in range(5): + create_source(f"source_{i}") + + consumer = asapo_consumer.create_consumer(asapo_config.host, "", False, asapo_config.beamtime, + asapo_config.data_source, asapo_config.token, 500) + + sources = consumer.get_source_list() + log.debug(f"All sources {sources}") + assert len(sources) >= 5 # In case of cleanup is not working + for i in range(5): + assert f"source_{i}" in sources