diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e2d5e78b2e215d523668617556ecda76701bb020..1d5660c1757e47387e2347be6fa2bd661db8eb27 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -120,6 +120,9 @@ test-services-linux-debug: - bash $CI_PROJECT_DIR/deploy/build_env/services-linux/run_asapo.sh - cd $CI_PROJECT_DIR/build - ctest --no-compress-output -T Test -L all -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml + - pip3 install pytest + - pytest -vv -o log_cli=true --log-cli-level=DEBUG ../tests/automatic/pytests/ + tags: - kubernetes-executor rules: diff --git a/tests/automatic/pytests/test_pipeline.py b/tests/automatic/pytests/test_pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..cae5ab61ed20c26f9d019e717f435612e6f14909 --- /dev/null +++ b/tests/automatic/pytests/test_pipeline.py @@ -0,0 +1,176 @@ +import pytest +import os +import logging +import json +import sys +from time import sleep + +sys.path.insert(0,'producer/api/python/') +sys.path.insert(0,'consumer/api/python/') + +import asapo_producer +import asapo_consumer + +log = logging.getLogger(__name__) + +beamtime="asapo_test" +data_source="python" +token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiO" +"jk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3V" +"iIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5c" +"GVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I") + +host = "localhost:8400" +data_path = "/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/asapo_test/processed" + +def callback(payload,err): + if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning): + # the data was not sent. Something is terribly wrong. + log.debug(f"could not send: {payload} {err}") + elif err is not None: + # The data was sent, but there was some unexpected problem, e.g. the file was overwritten. + log.debug(f"sent with warning: {payload} {err}") + else: + # all fine + log.debug(f"successfuly sent: {payload}") + +@pytest.fixture(autouse=True) +def cleanup_db(): + os.makedirs(data_path, exist_ok=True) + yield None + os.system(f'echo "db.dropDatabase()" | mongo {beamtime}_{data_source} >/dev/null') + os.system(f"rm -fr {data_path}") + +def test_delete_stream(): + stream_name = "test001" + producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, + token, 1, 6000) + + consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000) + g = consumer.generate_group_id() + + def inject_data(): + for i in range(5): + producer.send(i+1, # message number. Should be unique and ordered. + f"processed/file_part{i}.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + # Create stream by injecting data + inject_data() + # asapo_consumer.AsapoEndOfStreamError + # check stream info + log.debug(consumer.get_stream_list()) + stream = consumer.get_stream_list()[0] + assert(stream['name'] == stream_name) + assert(stream['lastId'] == 5) + log.debug(stream) + + # Delete stream + producer.delete_stream(stream=stream_name) + # There is no wait or call-back function. Wait manually. + sleep(1) + stream_list = consumer.get_stream_list() + assert(len(stream_list) == 0) + + # Create stream again + inject_data() + stream = consumer.get_stream_list()[0] + assert(stream['name'] == stream_name) + assert(stream['lastId'] == 5) + log.debug(stream) + + # Delete stream with consumer + consumer.delete_stream(stream=stream_name) + sleep(1) + stream_list = consumer.get_stream_list() + assert(len(stream_list) == 0) + +def test_get_next_ordered(): + stream_name = "test001" + producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, + token, 1, 6000) + + consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000) + producer.send(2, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + with pytest.raises(Exception): + consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name) + + _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name, ordered=False) + assert(mdata['_id'] == 2) + + producer.send(1, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name) + assert(mdata['_id'] == 1) + +def test_get_next_ordered_resend_no_delay(): + stream_name = "test001" + producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, + token, 1, 6000) + + consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500) + g = consumer.generate_group_id() + consumer.set_resend_nacs(True, 0, 2) + + producer.send(2, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + with pytest.raises(asapo_consumer.AsapoNoDataError): + consumer.get_next(g, meta_only=True, stream=stream_name) + + with pytest.raises(asapo_consumer.AsapoNoDataError): + consumer.get_next(g, meta_only=True, stream=stream_name) + + producer.send(1, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + _, mdata = consumer.get_next(g, meta_only=True, stream=stream_name) + assert(mdata['_id'] == 1) + +def test_get_next_available_resend_no_delay(): + stream_name = "test001" + producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source, + token, 1, 6000) + + consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500) + g = consumer.generate_group_id() + consumer.set_resend_nacs(True, 0, 2) + + producer.send(2, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + producer.send(1, + f"processed/file_part1.cbf", # + b"The data", # binary data + callback = callback, + stream=stream_name) + producer.wait_requests_finished(3000) + + _, mdata = consumer.get_next(g, meta_only=True, stream=stream_name, ordered=False) + assert(mdata['_id'] == 2) \ No newline at end of file