diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 7af62697c5909510e4436ac32afa1e87385e764d..ce7dfbc01799da5297753a7ad3ac4695f2aba5d0 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -381,7 +381,12 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) {
 
     log_str := "processed query " + query + " for " + dbname + " ,found" + strconv.Itoa(len(res)) + " records"
     logger.Debug(log_str)
-    return utils.MapToJson(&res)
+    if res != nil {
+        return utils.MapToJson(&res)
+    } else {
+        return []byte("[]"), nil
+    }
+
 }
 
 func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 7c537a36a7d2575fb34e7c71a3fec413d2a74dd2..8d9f50151b2d0c5d24fb9f4cb401a2ee10f05744 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -356,6 +356,7 @@ var tests = []struct {
     ok bool
 }{
     {"meta.counter = 10", []TestRecordMeta{recq1, recq3}, true},
+    {"meta.counter = 18", []TestRecordMeta{}, true},
     {"meta.counter = 11", []TestRecordMeta{recq2}, true},
     {"meta.counter > 11", []TestRecordMeta{recq4}, true},
     {"meta.counter < 11", []TestRecordMeta{recq1, recq3}, true},
diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp
index cc33e937c6d8cb15cca739ff4e444467441b1562..34e0ce74e0cb9dea06c4e4ca3f7c87f35fabb2f2 100644
--- a/common/cpp/src/json_parser/rapid_json.cpp
+++ b/common/cpp/src/json_parser/rapid_json.cpp
@@ -64,7 +64,7 @@ asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type,
         break;
     }
     if (!res) {
-        return TextError("wrong type: " + name + " in: " + json_);
+        return TextError("wrong type for: " + name + " in: " + json_);
     }
 
     return nullptr;
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 39c80543b6c572b4a3ca9ccdb9d5ae520ecdf835..49bf28341180e6afbe3635d8cf0b9dad8be060b0 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -105,7 +105,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
     for(uint64_t i = 0; i < iterations; i++) {
         auto buffer = CreateMemoryBuffer(number_of_byte);
         asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i + 1)};
-        std::string meta = "{\"dummy_meta\":\"test" + std::to_string(i + 1) + "\"}";
+        std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}";
         auto err = producer->SendData(event_header, std::move(buffer), std::move(meta), &ProcessAfterSend);
         if (err) {
             std::cerr << "Cannot send file: " << err << std::endl;
diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt
index 35c1e00165fbc6880dd9988907e659978ce95b04..035c7a4da4e7bc2d3914ed30b0d22e0aacc4cf89 100644
--- a/tests/automatic/full_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/CMakeLists.txt
@@ -1,4 +1,7 @@
 add_subdirectory(simple_chain)
+if (UNIX OR CMAKE_BUILD_TYPE STREQUAL "Release")
+    add_subdirectory(simple_chain_usermeta_python)
+endif()
 add_subdirectory(simple_chain_metadata)
 add_subdirectory(two_beamlines)
 add_subdirectory(simple_chain_filegen)
diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh
index c410fcf2869157b2f3657e5231f9acda572985e5..fa3fb0d06791a27a1e6606ecc30c826368cf790b 100644
--- a/tests/automatic/full_chain/simple_chain/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain/check_linux.sh
@@ -48,4 +48,4 @@ $1 localhost:8400 ${beamtime_id} 100 1000 4 0 100 &
 $2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 5000 1 > out
 cat out
 cat out | grep "Processed 1000 file(s)"
-cat out | grep "Cannot get metadata"
\ No newline at end of file
+cat out | grep "Cannot get metadata"
diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_usermeta_python/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..e11f1a5e978ed7649333954ca20860c58bc825f0
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_usermeta_python/CMakeLists.txt
@@ -0,0 +1,15 @@
+set(TARGET_NAME full_chain_usermeta_python)
+
+################################
+# Testing
+################################
+prepare_asapo()
+
+if (UNIX)
+    get_target_property(PYTHON_LIBS python-lib2 BINARY_DIR)
+else()
+    get_target_property(PYTHON_LIBS asapo_worker BINARY_DIR)
+endif()
+
+
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_PROPERTY:asapo,EXENAME> ${CMAKE_CURRENT_SOURCE_DIR} ${PYTHON_LIBS}" nomem)
diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/check_linux.sh b/tests/automatic/full_chain/simple_chain_usermeta_python/check_linux.sh
new file mode 100644
index 0000000000000000000000000000000000000000..b6209b156287f32c879e69a33d62288ecc8e643c
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_usermeta_python/check_linux.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+
+set -e
+
+trap Cleanup EXIT
+
+beamtime_id=asapo_test
+token=`$2 token -secret broker_secret.key $beamtime_id`
+
+monitor_database_name=db_test
+proxy_address=127.0.0.1:8400
+
+beamline=test
+receiver_root_folder=/tmp/asapo/receiver/files
+receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id}
+
+Cleanup() {
+    echo cleanup
+    rm -rf ${receiver_root_folder}
+    nomad stop nginx
+    nomad stop receiver
+    nomad stop discovery
+    nomad stop broker
+    nomad stop authorizer
+    rm -rf out
+    echo "db.dropDatabase()" | mongo ${beamtime_id}
+    influx -execute "drop database ${monitor_database_name}"
+}
+
+influx -execute "create database ${monitor_database_name}"
+echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id}
+
+nomad run nginx.nmd
+nomad run authorizer.nmd
+nomad run receiver.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+
+sleep 1
+
+#producer
+mkdir -p ${receiver_folder}
+$1 localhost:8400 ${beamtime_id} 100 100 1 0 100
+
+export PYTHONPATH=$4:${PYTHONPATH}
+
+
+python $3/get_user_meta.py $proxy_address $receiver_folder $beamtime_id $token new > out
+cat out
+cat out | grep "found images: 100"
+#cat out | grep "test100"
diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/check_windows.bat b/tests/automatic/full_chain/simple_chain_usermeta_python/check_windows.bat
new file mode 100644
index 0000000000000000000000000000000000000000..3cfc352e7b8bcedc077a2063f269ece07c68efec
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_usermeta_python/check_windows.bat
@@ -0,0 +1,52 @@
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+SET beamtime_id=asapo_test
+SET beamline=test
+SET receiver_root_folder=c:\tmp\asapo\receiver\files
+SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%"
+
+
+"%2" token -secret broker_secret.key %beamtime_id% > token
+set /P token=< token
+
+set proxy_address="127.0.0.1:8400"
+
+echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id%
+
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+REM producer
+mkdir %receiver_folder%
+"%1" %proxy_address% %beamtime_id% 100 100 4 0 100
+
+REM worker
+set PYTHONPATH=%4
+
+python3 %3/get_user_meta.py %proxy_address% %receiver_folder% %beamtime_id% %token% new > out
+type out
+type out | findstr /c:"found images: 100" || goto :error
+rem todo: implement orderby
+rem type out | findstr /c:"test100" || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop authorizer
+c:\opt\consul\nomad stop nginx
+rmdir /S /Q %receiver_root_folder%
+del /f token
+echo db.dropDatabase() | %mongo_exe% %beamtime_id%
+
+
diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py
new file mode 100644
index 0000000000000000000000000000000000000000..fc7c6193ccaf23c44b0328f407add0c5bf49d7c3
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py
@@ -0,0 +1,21 @@
+from __future__ import print_function
+
+import asapo_worker
+import json
+import sys
+
+source, path, beamtime, token, group_id = sys.argv[1:]
+
+broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000)
+
+
+images,err = broker.query_images("meta.user_meta regexp 'test*'")
+
+if err != None:
+    print ('err: ', err)
+else:
+    print ('found images:',len(images))
+    print (images[99]['meta']['user_meta'])
+
+
+
diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp
index 2a29082e49b9f462fe05e4ed26b613447d2a23b0..820c2afb6b42b03b219ba6cf3aff14313869ff27 100644
--- a/tests/automatic/worker/worker_api/worker_api.cpp
+++ b/tests/automatic/worker/worker_api/worker_api.cpp
@@ -76,6 +76,27 @@ void GetAllFromBroker(const Args& args) {
     M_AssertTrue(err == nullptr, "GetNext5 no error");
     M_AssertTrue(fi.name == "1", "GetNext5 filename");
 
+    auto images = broker->QueryImages("meta.test = 10", &err);
+    M_AssertTrue(err == nullptr, "query1");
+    M_AssertTrue(images.size() == 10, "size of query answer 1");
+
+    images = broker->QueryImages("meta.test = 10 AND name='1'", &err);
+    M_AssertTrue(err == nullptr, "query2");
+    M_AssertTrue(images.size() == 1, "size of query answer 2");
+    M_AssertTrue(fi.name == "1", "GetNext5 filename");
+
+
+    images = broker->QueryImages("meta.test = 11", &err);
+    M_AssertTrue(err == nullptr, "query3");
+    M_AssertTrue(images.size() == 0, "size of query answer 3");
+
+    images = broker->QueryImages("meta.test = 18", &err);
+    M_AssertTrue(err == nullptr, "query4");
+    M_AssertTrue(images.size() == 0, "size of query answer 4");
+
+    images = broker->QueryImages("bla", &err);
+    M_AssertTrue(err != nullptr, "query5");
+    M_AssertTrue(images.size() == 0, "size of query answer 5");
 }
 
 int main(int argc, char* argv[]) {
diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py
index c37876803873ac91ed48e177ffcb4be0b60b3d98..8f3e1bf0f2b0e22b95e6e80859be1f41f4aee760 100644
--- a/tests/automatic/worker/worker_api_python/worker_api.py
+++ b/tests/automatic/worker/worker_api_python/worker_api.py
@@ -4,12 +4,12 @@ import asapo_worker
 import json
 import sys
 
-def assert_err(err,name):
+def assert_noterr(err, name):
     if err != None:
         print (name + ' err: ', err)
         sys.exit(1)
 
-def assert_noterr(err,name):
+def assert_err(err, name):
     if err == None:
         print (name + ' err: ', err)
         sys.exit(1)
@@ -22,7 +22,7 @@ def assert_metaname(meta,compare,name):
         sys.exit(1)
 
 def assert_usermetadata(meta,name):
-    if meta['user_metadata']['test'] != 10:
+    if meta['meta']['test'] != 10:
         print ('meta: ', json.dumps(meta, indent=4, sort_keys=True))
         print ("error at "+name)
         print ('meta: ', json.dumps(meta, indent=4, sort_keys=True))
@@ -41,47 +41,70 @@ source, path, beamtime, token = sys.argv[1:]
 broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000)
 
 group_id_new, err = broker.generate_group_id()
-assert_err(err,"generate_group")
+assert_noterr(err, "generate_group")
 
 _, meta, err = broker.get_next(group_id_new, meta_only=True)
-assert_err(err,"get_next")
+assert_noterr(err, "get_next")
 assert_metaname(meta,"1","get next1")
 assert_usermetadata(meta,"get next1")
 
 _, meta, err = broker.get_next(group_id_new, meta_only=True)
-assert_err(err,"get_next2")
+assert_noterr(err, "get_next2")
 assert_metaname(meta,"2","get next2")
 assert_usermetadata(meta,"get next2")
 
 _, meta, err = broker.get_last(group_id_new, meta_only=True)
-assert_err(err,"get_last1")
+assert_noterr(err, "get_last1")
 assert_metaname(meta,"5","get last1")
 assert_usermetadata(meta,"get last1")
 
 _, meta, err = broker.get_next(group_id_new, meta_only=True)
-assert_noterr(err,"get_next3")
+assert_err(err, "get_next3")
 
 size,err = broker.get_ndatasets()
-assert_err(err,"get_ndatasets")
+assert_noterr(err, "get_ndatasets")
 assert_eq(size,5,"get_ndatasets")
 
 err = broker.reset_counter(group_id_new)
-assert_err(err,"reset_counter")
+assert_noterr(err, "reset_counter")
 
 _, meta, err = broker.get_next(group_id_new, meta_only=True)
-assert_err(err,"get_next4")
+assert_noterr(err, "get_next4")
 assert_metaname(meta,"1","get next4")
 assert_usermetadata(meta,"get next4")
 
 _, meta, err = broker.get_by_id(3, group_id_new, meta_only=True)
-assert_err(err,"get_by_id")
+assert_noterr(err, "get_by_id")
 assert_metaname(meta,"3","get get_by_id")
 assert_usermetadata(meta,"get get_by_id")
 
 _, meta, err = broker.get_next(group_id_new, meta_only=True)
-assert_err(err,"get_next5")
+assert_noterr(err, "get_next5")
 assert_metaname(meta,"4","get next5")
 assert_usermetadata(meta,"get next5")
+
+
+images,err = broker.query_images("meta.test = 10")
+assert_noterr(err, "query1")
+assert_eq(len(images),5,"size of query answer 1")
+for image in images:
+    assert_usermetadata(image,"query_images")
+
+
+images,err = broker.query_images("meta.test = 10 AND name='1'")
+assert_eq(len(images),1,"size of query answer 2 ")
+assert_noterr(err, "query2")
+
+for image in images:
+    assert_usermetadata(image,"query_images")
+
+images,err = broker.query_images("meta.test = 11")
+assert_eq(len(images),0,"size of query answer 3 ")
+assert_noterr(err, "query3")
+
+images,err = broker.query_images("bla")
+assert_err(err, "wrong query")
+
diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd
index 0cd548ea26769f1b699895c2cc94fb037c220e91..4b12bb5e14230ac6bc4d7733dd03968399a634f0 100644
--- a/worker/api/python/asapo_worker.pxd
+++ b/worker/api/python/asapo_worker.pxd
@@ -1,5 +1,6 @@
 from libcpp.memory cimport unique_ptr
 from libcpp.string cimport string
+from libcpp.vector cimport vector
 
 ctypedef unsigned char uint8_t
 
@@ -21,6 +22,9 @@ cdef extern from "asapo_worker.h" namespace "asapo":
 cdef extern from "asapo_worker.h" namespace "asapo":
     cppclass FileInfo:
         string Json()
+    cppclass FileInfos:
+        vector[FileInfo].iterator begin()
+        vector[FileInfo].iterator end()
 
 
@@ -35,6 +39,7 @@ cdef extern from "asapo_worker.h" namespace "asapo":
         Error ResetCounter(string group_id)
         string GenerateNewGroupId(Error* err)
         string GetBeamtimeMeta(Error* err)
+        FileInfos QueryImages(string query, Error* err)
 
 
 cdef extern from "asapo_worker.h" namespace "asapo":
diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in
index 6f66135190b183058305b851bed2ca52a3ea52b1..46b1f7845784068410bef088ccc74cd6558c1e26 100644
--- a/worker/api/python/asapo_worker.pyx.in
+++ b/worker/api/python/asapo_worker.pyx.in
@@ -45,8 +45,6 @@ cdef class PyDataBroker:
         del meta['buf_id']
         del meta['source']
         del meta['lastchange']
-        if 'meta' in meta:
-            meta['user_metadata'] = meta.pop('meta')
         if meta_only:
             return None,meta,None
         cdef char* ptr = <char*> data.release()
@@ -84,6 +82,20 @@ cdef class PyDataBroker:
             return None, err_str
         else:
             return _str(group_id), None
+    def query_images(self,query):
+        cdef Error err
+        cdef FileInfos file_infos
+        file_infos = self.c_broker.QueryImages(_bytes(query),&err)
+        err_str = _str(GetErrorString(&err))
+        if err_str.strip():
+            return None, err_str
+        else:
+            json_list = []
+            for fi in file_infos:
+                json_list.append(json.loads(_str(fi.Json())))
+            return json_list, None
+
+
     def get_beamtime_meta(self):
         cdef Error err
         cdef string meta_str
diff --git a/worker/tools/folder_to_db/unittests/test_folder_to_db.cpp b/worker/tools/folder_to_db/unittests/test_folder_to_db.cpp
index 5ff035dddf9e61938eaac7295ebdcb8486bf8bf7..c8afc46141d2c3bf2a7d0f68735bc83bf60c57e1 100644
--- a/worker/tools/folder_to_db/unittests/test_folder_to_db.cpp
+++ b/worker/tools/folder_to_db/unittests/test_folder_to_db.cpp
@@ -249,8 +249,8 @@ TEST_F(FolderDBConverterTests, ComputesStatistics) {
 
     asapo::FolderImportStatistics statistics;
-    statistics.time_read_folder = std::chrono::nanoseconds{-1};
-    statistics.time_import_files = std::chrono::nanoseconds{-1};
+    statistics.time_read_folder = std::chrono::nanoseconds{ -1};
+    statistics.time_import_files = std::chrono::nanoseconds{ -1};
 
     auto error = converter.Convert(uri, folder, db_name, &statistics);