Skip to content
Snippets Groups Projects
Commit b46bf564 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

more error types to Python API for consumer

parent 752f7c6e
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,15 @@ class AsapoConsumerError(Exception):
class AsapoWrongInputError(AsapoConsumerError):
pass
class AsapoBrokerServerError(AsapoConsumerError):
pass
class AsapoBrokerServersNotFound(AsapoConsumerError):
pass
class AsapoIOError(AsapoConsumerError):
pass
class AsapoEndOfStreamError(AsapoConsumerError):
def __init__(self,message,id_max=None):
AsapoConsumerError.__init__(self,message)
......@@ -45,7 +54,6 @@ class AsapoNoDataError(AsapoConsumerError):
self.id_max = id_max
self.id = id
cdef throw_exception(Error& err):
cdef ConsumerErrorData* data
if err == kEndOfStream:
......@@ -62,10 +70,15 @@ cdef throw_exception(Error& err):
raise AsapoNoDataError(err.get().Explain())
elif err == kWrongInput:
raise AsapoWrongInputError(err.get().Explain())
elif err == kIOError:
raise AsapoIOError(err.get().Explain())
elif err == kBrokerServersNotFound:
raise AsapoBrokerServersNotFound(err.get().Explain())
elif err == kBrokerServerError:
raise AsapoBrokerServerError(err.get().Explain())
else:
raise AsapoConsumerError(err.get().Explain())
cdef class PyDataBroker:
cdef DataBroker* c_broker
def _op(self, op, group_id, meta_only, uint64_t id):
......
#!/usr/bin/env bash
if [ ! -f /var/nomad/token_all ]; then
if [ ! -f /var/nomad/token ]; then
nomad acl bootstrap > /var/nomad/bootstrap
cat /var/nomad/bootstrap | grep Secret | awk '{print $4}' > /var/nomad/token
cp /var/nomad/token $TF_VAR_service_dir/nomad_token
fi
#export NOMAD_TOKEN=`cat /var/nomad/token `
cd /var/run/asapo && terraform apply -auto-approve "$@"
\ No newline at end of file
......@@ -7,19 +7,48 @@ database_name=${beamtime_id}_${stream}
token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo=
set -e
trap Cleanup EXIT
function wait_mongo {
NEXT_WAIT_TIME=0
until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do
echo "Wait for mongo"
NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ ))
sleep 1
done
if (( NEXT_WAIT_TIME == 30 )); then
echo "Timeout"
exit -1
fi
}
function kill_mongo {
kill -2 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'`
}
function start_mongo {
mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork
}
trap Cleanup EXIT
Cleanup() {
set +e
nomad stop nginx
nomad stop discovery
nomad stop broker
echo "db.dropDatabase()" | mongo ${database_name}
nomad stop nginx >/dev/null
nomad stop discovery >/dev/null
nomad stop broker >/dev/null
echo "db.dropDatabase()" | mongo --port 27016 ${database_name} >/dev/null
rm 1 1_1
kill_mongo
}
sed -i 's/27017/27016/g' discovery.json.tpl
start_mongo
wait_mongo
nomad run nginx.nmd
nomad run discovery.nmd
nomad run broker.nmd
......@@ -27,21 +56,26 @@ nomad run broker.nmd
echo hello1 > 1
echo hello1 > 1_1
for i in `seq 1 5`;
do
echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo --port 27016 ${database_name} >/dev/null
done
sleep 1
export PYTHONPATH=$1:${PYTHONPATH}
kill_mongo
#python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run broker_server_error
sleep 1
start_mongo
wait_mongo
python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single
#check datasets
echo "db.dropDatabase()" | mongo ${database_name}
echo "db.dropDatabase()" | mongo --port 27016 ${database_name} > /dev/null
sleep 1
......@@ -53,7 +87,7 @@ do
images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
done
images=${images#?}
echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo --port 27016 ${database_name} >/dev/null
done
......
......@@ -29,6 +29,16 @@ def assert_eq(val,expected,name):
print ('val: ', val,' expected: ',expected)
sys.exit(1)
def check_broker_server_error(broker,group_id_new):
try:
broker.get_last(group_id_new, meta_only=True)
except asapo_consumer.AsapoBrokerServerError as err:
print(err)
pass
else:
exit_on_noerr("AsapoBrokerServerError")
def check_single(broker,group_id_new):
_, meta = broker.get_next(group_id_new, meta_only=True)
......@@ -79,6 +89,22 @@ def check_single(broker,group_id_new):
assert_metaname(meta,"5","get next6")
assert_usermetadata(meta,"get next6")
try:
broker.get_next("bla", meta_only=True)
except asapo_consumer.AsapoWrongInputError as err:
print(err)
pass
else:
exit_on_noerr("wrong input")
try:
broker.get_last(group_id_new, meta_only=False)
except asapo_consumer.AsapoIOError as err:
print(err)
pass
else:
exit_on_noerr("io error")
images = broker.query_images("meta.test = 10")
assert_eq(len(images),5,"size of query answer 1")
......@@ -102,6 +128,17 @@ def check_single(broker,group_id_new):
else:
exit_on_noerr("wrong query")
broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000)
try:
broker.get_last(group_id_new, meta_only=True)
except asapo_consumer.AsapoBrokerServersNotFound as err:
print(err)
pass
else:
exit_on_noerr("AsapoBrokerServersNotFound")
def check_dataset(broker,group_id_new):
id, metas = broker.get_next_dataset(group_id_new)
assert_eq(id,1,"get_next_dataset1")
......@@ -146,6 +183,9 @@ broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,1000
group_id_new = broker.generate_group_id()
if mode == "broker_server_error":
check_broker_server_error(broker,group_id_new)
if mode == "single":
check_single(broker,group_id_new)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment