From b46bf564b25403550e3bb3269a1afb6deac10d36 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 25 Sep 2019 22:18:34 +0200 Subject: [PATCH] more error types to Python API for consumer --- consumer/api/python/asapo_consumer.pyx.in | 17 +++++- deploy/docker/cluster/asapo-start | 5 +- .../consumer_api_python/check_linux.sh | 52 +++++++++++++++---- .../consumer_api_python/consumer_api.py | 40 ++++++++++++++ 4 files changed, 99 insertions(+), 15 deletions(-) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index fd7d3047e..0b56182bc 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -34,6 +34,15 @@ class AsapoConsumerError(Exception): class AsapoWrongInputError(AsapoConsumerError): pass +class AsapoBrokerServerError(AsapoConsumerError): + pass + +class AsapoBrokerServersNotFound(AsapoConsumerError): + pass + +class AsapoIOError(AsapoConsumerError): + pass + class AsapoEndOfStreamError(AsapoConsumerError): def __init__(self,message,id_max=None): AsapoConsumerError.__init__(self,message) @@ -45,7 +54,6 @@ class AsapoNoDataError(AsapoConsumerError): self.id_max = id_max self.id = id - cdef throw_exception(Error& err): cdef ConsumerErrorData* data if err == kEndOfStream: @@ -62,10 +70,15 @@ cdef throw_exception(Error& err): raise AsapoNoDataError(err.get().Explain()) elif err == kWrongInput: raise AsapoWrongInputError(err.get().Explain()) + elif err == kIOError: + raise AsapoIOError(err.get().Explain()) + elif err == kBrokerServersNotFound: + raise AsapoBrokerServersNotFound(err.get().Explain()) + elif err == kBrokerServerError: + raise AsapoBrokerServerError(err.get().Explain()) else: raise AsapoConsumerError(err.get().Explain()) - cdef class PyDataBroker: cdef DataBroker* c_broker def _op(self, op, group_id, meta_only, uint64_t id): diff --git a/deploy/docker/cluster/asapo-start b/deploy/docker/cluster/asapo-start index daa182133..6b93af566 100755 --- a/deploy/docker/cluster/asapo-start +++ b/deploy/docker/cluster/asapo-start @@ -1,12 +1,9 @@ #!/usr/bin/env bash - -if [ ! -f /var/nomad/token_all ]; then +if [ ! -f /var/nomad/token ]; then nomad acl bootstrap > /var/nomad/bootstrap cat /var/nomad/bootstrap | grep Secret | awk '{print $4}' > /var/nomad/token cp /var/nomad/token $TF_VAR_service_dir/nomad_token fi -#export NOMAD_TOKEN=`cat /var/nomad/token ` - cd /var/run/asapo && terraform apply -auto-approve "$@" \ No newline at end of file diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index 2851220aa..a5007b554 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -7,19 +7,48 @@ database_name=${beamtime_id}_${stream} token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= set -e +trap Cleanup EXIT +function wait_mongo { +NEXT_WAIT_TIME=0 +until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do + echo "Wait for mongo" + NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ )) + sleep 1 +done +if (( NEXT_WAIT_TIME == 30 )); then + echo "Timeout" + exit -1 +fi +} + + +function kill_mongo { + kill -2 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'` +} + + +function start_mongo { + mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork +} -trap Cleanup EXIT Cleanup() { set +e - nomad stop nginx - nomad stop discovery - nomad stop broker - echo "db.dropDatabase()" | mongo ${database_name} + nomad stop nginx >/dev/null + nomad stop discovery >/dev/null + nomad stop broker >/dev/null + echo "db.dropDatabase()" | mongo --port 27016 ${database_name} >/dev/null rm 1 1_1 + kill_mongo } +sed -i 's/27017/27016/g' discovery.json.tpl + + +start_mongo +wait_mongo + nomad run nginx.nmd nomad run discovery.nmd nomad run broker.nmd @@ -27,21 +56,26 @@ nomad run broker.nmd echo hello1 > 1 echo hello1 > 1_1 - for i in `seq 1 5`; do - echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo --port 27016 ${database_name} >/dev/null done sleep 1 export PYTHONPATH=$1:${PYTHONPATH} +kill_mongo +#python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run broker_server_error +sleep 1 +start_mongo +wait_mongo python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single + #check datasets -echo "db.dropDatabase()" | mongo ${database_name} +echo "db.dropDatabase()" | mongo --port 27016 ${database_name} > /dev/null sleep 1 @@ -53,7 +87,7 @@ do images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" done images=${images#?} - echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo --port 27016 ${database_name} >/dev/null done diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index dcbf21bd3..a44fea88b 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -29,6 +29,16 @@ def assert_eq(val,expected,name): print ('val: ', val,' expected: ',expected) sys.exit(1) + +def check_broker_server_error(broker,group_id_new): + try: + broker.get_last(group_id_new, meta_only=True) + except asapo_consumer.AsapoBrokerServerError as err: + print(err) + pass + else: + exit_on_noerr("AsapoBrokerServerError") + def check_single(broker,group_id_new): _, meta = broker.get_next(group_id_new, meta_only=True) @@ -79,6 +89,22 @@ def check_single(broker,group_id_new): assert_metaname(meta,"5","get next6") assert_usermetadata(meta,"get next6") + try: + broker.get_next("bla", meta_only=True) + except asapo_consumer.AsapoWrongInputError as err: + print(err) + pass + else: + exit_on_noerr("wrong input") + + + try: + broker.get_last(group_id_new, meta_only=False) + except asapo_consumer.AsapoIOError as err: + print(err) + pass + else: + exit_on_noerr("io error") images = broker.query_images("meta.test = 10") assert_eq(len(images),5,"size of query answer 1") @@ -102,6 +128,17 @@ def check_single(broker,group_id_new): else: exit_on_noerr("wrong query") + broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000) + try: + broker.get_last(group_id_new, meta_only=True) + except asapo_consumer.AsapoBrokerServersNotFound as err: + print(err) + pass + else: + exit_on_noerr("AsapoBrokerServersNotFound") + + + def check_dataset(broker,group_id_new): id, metas = broker.get_next_dataset(group_id_new) assert_eq(id,1,"get_next_dataset1") @@ -146,6 +183,9 @@ broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,1000 group_id_new = broker.generate_group_id() +if mode == "broker_server_error": + check_broker_server_error(broker,group_id_new) + if mode == "single": check_single(broker,group_id_new) -- GitLab