Skip to content
Snippets Groups Projects
Commit 606bb19b authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Add pytests to test deleting stream. This is integration test that requires running asapo service.

parent a86a3156
No related branches found
No related tags found
No related merge requests found
...@@ -120,6 +120,9 @@ test-services-linux-debug:
    - bash $CI_PROJECT_DIR/deploy/build_env/services-linux/run_asapo.sh
    - cd $CI_PROJECT_DIR/build
    - ctest --no-compress-output -T Test -L all -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml
    - pip3 install pytest
    - pytest -vv -o log_cli=true --log-cli-level=DEBUG ../tests/automatic/pytests/
  tags:
    - kubernetes-executor
  rules:
......
import pytest
import os
import logging
import json
import sys
from time import sleep
# Make the in-tree producer/consumer Python bindings importable when the
# packages are not installed; paths are relative to the build directory
# from which pytest is invoked (see the CI job above).
sys.path.insert(0,'producer/api/python/')
sys.path.insert(0,'consumer/api/python/')
import asapo_producer
import asapo_consumer
log = logging.getLogger(__name__)
# Identifiers of the test beamtime/data source; must match the running asapo service.
beamtime="asapo_test"
data_source="python"
# JWT access token for the test beamtime (grants read and write access).
token = str("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiO"
"jk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3V"
"iIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5c"
"GVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I")
host = "localhost:8400"
# Directory where the receiver stores files for this beamtime; created and
# removed around every test by the cleanup_db fixture below.
data_path = "/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/asapo_test/processed"
def callback(payload, err):
    """Producer send-callback: log the outcome of an asynchronous send.

    Args:
        payload: the request payload passed back by the producer.
        err: None on success, an ``asapo_producer.AsapoServerWarning`` when
            the data was stored with a warning, or another error object when
            the send failed.
    """
    if err is not None and not isinstance(err, asapo_producer.AsapoServerWarning):
        # the data was not sent. Something is terribly wrong.
        log.debug(f"could not send: {payload} {err}")
    elif err is not None:
        # The data was sent, but there was some unexpected problem, e.g. the file was overwritten.
        log.debug(f"sent with warning: {payload} {err}")
    else:
        # all fine (typo "successfuly" fixed in the log message)
        log.debug(f"successfully sent: {payload}")
@pytest.fixture(autouse=True)
def cleanup_db():
    """Per-test setup/teardown: create the receiver data directory, and after
    the test drop the stream database and remove all written files."""
    os.makedirs(data_path, exist_ok=True)
    yield
    # Drop the per-data-source MongoDB database used by the broker, then
    # delete whatever the receiver wrote to disk.
    drop_cmd = f'echo "db.dropDatabase()" | mongo {beamtime}_{data_source} >/dev/null'
    rm_cmd = f"rm -fr {data_path}"
    os.system(drop_cmd)
    os.system(rm_cmd)
def test_delete_stream():
    """Create a stream, delete it via the producer, recreate it, then delete
    it via the consumer — verifying the stream list after each step.

    Requires a running asapo service (integration test).
    """
    stream_name = "test001"
    producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source,
                  token, 1, 6000)
    consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000)

    def inject_data():
        # Sending messages implicitly creates the stream on the server side.
        for i in range(5):
            producer.send(i + 1,  # message number. Should be unique and ordered.
                          f"processed/file_part{i}.cbf",
                          b"The data",  # binary data
                          callback=callback,
                          stream=stream_name)
        producer.wait_requests_finished(3000)

    # Create stream by injecting data, then check its metadata.
    inject_data()
    log.debug(consumer.get_stream_list())
    stream = consumer.get_stream_list()[0]
    assert stream['name'] == stream_name
    assert stream['lastId'] == 5
    log.debug(stream)

    # Delete stream via the producer. delete_stream has no wait/callback
    # mechanism, so give the server a moment before re-reading the list.
    producer.delete_stream(stream=stream_name)
    sleep(1)
    assert len(consumer.get_stream_list()) == 0

    # Create the stream again and verify it is back with fresh ids.
    inject_data()
    stream = consumer.get_stream_list()[0]
    assert stream['name'] == stream_name
    assert stream['lastId'] == 5
    log.debug(stream)

    # Delete the stream via the consumer this time.
    consumer.delete_stream(stream=stream_name)
    sleep(1)
    assert len(consumer.get_stream_list()) == 0
def test_get_next_ordered():
    """get_next with default ordered reads must not skip a missing message id,
    while ordered=False returns whatever message is already available.

    Requires a running asapo service (integration test).
    """
    stream_name = "test001"
    producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source,
                  token, 1, 6000)
    consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 1000)

    # Send message id 2 first, so id 1 is missing from the stream.
    producer.send(2,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)

    # Ordered read fails while id 1 is absent.
    # NOTE(review): Exception is broad — the exact consumer error type for a
    # gap should be pinned down here once confirmed against the API.
    with pytest.raises(Exception):
        consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name)
    # Unordered read returns the available message, id 2.
    _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name, ordered=False)
    assert mdata['_id'] == 2

    # Once id 1 arrives, an ordered read returns it.
    producer.send(1,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)
    _, mdata = consumer.get_next(consumer.generate_group_id(), meta_only=True, stream=stream_name)
    assert mdata['_id'] == 1
def test_get_next_ordered_resend_no_delay():
    """With resend-nacs enabled (no delay, 2 attempts), an ordered get_next
    keeps raising AsapoNoDataError while the next-expected id is missing, and
    succeeds once that id arrives.

    Requires a running asapo service (integration test).
    """
    stream_name = "test001"
    producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source,
                  token, 1, 6000)
    consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500)
    group_id = consumer.generate_group_id()
    # Resend not-acknowledged messages immediately (delay 0), at most 2 times.
    consumer.set_resend_nacs(True, 0, 2)

    # Send message id 2 first, so id 1 is missing from the stream.
    producer.send(2,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)

    # Both resend attempts for the missing id 1 raise AsapoNoDataError.
    with pytest.raises(asapo_consumer.AsapoNoDataError):
        consumer.get_next(group_id, meta_only=True, stream=stream_name)
    with pytest.raises(asapo_consumer.AsapoNoDataError):
        consumer.get_next(group_id, meta_only=True, stream=stream_name)

    # Once id 1 arrives, the ordered read returns it.
    producer.send(1,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)
    _, mdata = consumer.get_next(group_id, meta_only=True, stream=stream_name)
    assert mdata['_id'] == 1
def test_get_next_available_resend_no_delay():
    """With resend-nacs enabled, an unordered get_next (ordered=False) returns
    an available message even though ids arrived out of order.

    Requires a running asapo service (integration test).
    """
    stream_name = "test001"
    producer = asapo_producer.create_producer(host, 'processed', beamtime, "auto", data_source,
                  token, 1, 6000)
    consumer = asapo_consumer.create_consumer(host, "", False, beamtime, data_source, token, 500)
    group_id = consumer.generate_group_id()
    # Resend not-acknowledged messages immediately (delay 0), at most 2 times.
    consumer.set_resend_nacs(True, 0, 2)

    # Send id 2 before id 1 so the stream contains both, out of order.
    producer.send(2,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)
    producer.send(1,
                  "processed/file_part1.cbf",
                  b"The data",  # binary data
                  callback=callback,
                  stream=stream_name)
    producer.wait_requests_finished(3000)

    # Unordered read delivers id 2 — presumably arrival order; confirm against
    # the consumer API before relying on this elsewhere.
    _, mdata = consumer.get_next(group_id, meta_only=True, stream=stream_name, ordered=False)
    assert mdata['_id'] == 2
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment