From ceca65db198dc3bdc8251e41bb459ba2f448a722 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 18 Nov 2019 15:28:59 +0100 Subject: [PATCH] fix bug with multidimentional numpy array and wait threads function --- common/cpp/src/request/request_pool.cpp | 6 +- producer/api/python/asapo_producer.pyx.in | 10 +- .../producer/python_api/check_linux.sh | 2 +- .../producer/python_api/check_windows.bat | 4 +- .../producer/python_api/producer_api.py | 17 +++- .../python_tests/producer/asapo_producer.so | 1 + .../python_tests/producer/discovery.json.tpl | 5 +- tests/manual/python_tests/producer/test.py | 37 +++++--- .../producer_wait_bug_mongo/asapo_producer.so | 1 + .../authorizer.json.tpl | 10 ++ .../producer_wait_bug_mongo/authorizer.nmd | 55 +++++++++++ .../producer_wait_bug_mongo/clean_db.sh | 7 ++ .../discovery.json.tpl | 11 +++ .../producer_wait_bug_mongo/discovery.nmd | 49 ++++++++++ .../producer_wait_bug_mongo/file1 | 1 + .../producer_wait_bug_mongo/nginx.conf.tpl | 91 +++++++++++++++++++ .../producer_wait_bug_mongo/nginx.nmd | 63 +++++++++++++ .../producer_wait_bug_mongo/receiver.json.tpl | 25 +++++ .../producer_wait_bug_mongo/receiver.nmd | 47 ++++++++++ .../producer_wait_bug_mongo/run.sh | 9 ++ .../producer_wait_bug_mongo/start_services.sh | 6 ++ .../producer_wait_bug_mongo/stop_services.sh | 7 ++ .../producer_wait_bug_mongo/test.py | 76 ++++++++++++++++ 23 files changed, 514 insertions(+), 26 deletions(-) create mode 120000 tests/manual/python_tests/producer/asapo_producer.so create mode 120000 tests/manual/python_tests/producer_wait_bug_mongo/asapo_producer.so create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/authorizer.json.tpl create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/authorizer.nmd create mode 100755 tests/manual/python_tests/producer_wait_bug_mongo/clean_db.sh create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/discovery.json.tpl create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/discovery.nmd create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/file1 create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/nginx.conf.tpl create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/nginx.nmd create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/receiver.nmd create mode 100755 tests/manual/python_tests/producer_wait_bug_mongo/run.sh create mode 100755 tests/manual/python_tests/producer_wait_bug_mongo/start_services.sh create mode 100755 tests/manual/python_tests/producer_wait_bug_mongo/stop_services.sh create mode 100644 tests/manual/python_tests/producer_wait_bug_mongo/test.py diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index 7d9b6c03b..44bbe5da2 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -49,9 +49,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ requests_in_progress_--; request_handler->TearDownProcessingRequestLocked(success); if (!success) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); PutRequestBackToQueue(std::move(request)); condition_.notify_all(); + thread_info->lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + thread_info->lock.lock(); } } @@ -107,11 +109,11 @@ Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) { } void RequestPool::StopThreads() { + log__->Debug("trying to stop threads"); mutex_.lock(); quit_ = true; mutex_.unlock(); condition_.notify_all(); - for(size_t i = 0; i < threads_.size(); i++) { if(threads_[i].joinable()) { log__->Debug("finishing thread " + std::to_string(i)); diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 46c2b859f..9a3dd7341 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -60,10 +60,14 @@ cdef python_exception_from_error(Error& err): cdef throw_exception(Error& err): raise python_exception_from_error(err) -cdef void* data_pointer_nparray(data): +cdef void* data_pointer_nparray(data) except? NULL: if data is None: return <void*>NULL data_char = data.view(np.int8) + try: + data_char.shape=(-1) + except: + raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") cdef char[::1] arr_memview = data_char return <void*>&arr_memview[0] @@ -183,7 +187,7 @@ cdef class PyProducer: :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) - AsapoLocalIOError: problems reading file to send + AsapoLocalIOError: problems reading file to send AsapoProducerError: actually should not happen """ @@ -201,7 +205,7 @@ cdef class PyProducer: :param timeout_ms: timeout in milliseconds :type timeout_ms: int :raises: - AsapoTimeoutError: requests not finished for a given timeout + AsapoTimeoutError: requests not finished for a given timeout """ cdef Error err cdef uint64_t timeout = timeout_ms diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index f3f67ec52..daa74995f 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -38,5 +38,5 @@ sleep 1 $1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 7 +cat out | grep "successfuly sent" | wc -l | grep 8 cat out | grep "local i/o error" \ No newline at end of file diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index 070a6e844..7038ac0ff 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -27,12 +27,10 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 7 || goto error - +echo %NUM% | findstr 8 || goto error goto :clean - :error call :clean exit /b 1 diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 8c2fab4ec..e6905705c 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -57,10 +57,25 @@ producer.send_data(6, stream+"/"+"file7",None, #send single file/wrong filename producer.send_file(1, local_path = "./file2", exposed_path = stream+"/"+"file1", callback = callback) +x = np.array([[1, 2, 3], [4, 5, 6]], np.float32) +producer.send_data(8, stream+"/"+"file8",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +try: + x = x.T + producer.send_data(8, stream+"/"+"file8",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) +except asapo_producer.AsapoWrongInputError as e: + print(e) +else: + print("should be error sending non-cont array") + sys.exit(1) + + producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() if n!=0: - print("number of remaining requestst should be zero, got ",n) + print("number of remaining requests should be zero, got ",n) sys.exit(1) diff --git a/tests/manual/python_tests/producer/asapo_producer.so b/tests/manual/python_tests/producer/asapo_producer.so new file mode 120000 index 000000000..cd186e180 --- /dev/null +++ b/tests/manual/python_tests/producer/asapo_producer.so @@ -0,0 +1 @@ +/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so \ No newline at end of file diff --git a/tests/manual/python_tests/producer/discovery.json.tpl b/tests/manual/python_tests/producer/discovery.json.tpl index 3f16854ef..488362383 100644 --- a/tests/manual/python_tests/producer/discovery.json.tpl +++ b/tests/manual/python_tests/producer/discovery.json.tpl @@ -5,7 +5,10 @@ "UseIBAddress": false }, "Port": {{ env "NOMAD_PORT_discovery" }}, - "LogLevel":"debug" + "LogLevel":"debug", + "Mongo": { + "StaticEndpoint": "127.0.0.1:27017" + } } diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py index a5960680a..83ea7b425 100644 --- a/tests/manual/python_tests/producer/test.py +++ b/tests/manual/python_tests/producer/test.py @@ -10,12 +10,10 @@ lock = threading.Lock() endpoint = "127.0.0.1:8400" beamtime = "asapo_test1" -stream = sys.argv[1] +stream = "detector" token = "" nthreads = 8 - - def callback(header,err): lock.acquire() # to print if err is not None: @@ -29,41 +27,50 @@ def assert_err(err): print(err) sys.exit(1) -producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) -assert_err(err) +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) producer.set_log_level("info") #send single file -err = producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) -assert_err(err) +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + #send subsets producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) #send meta only -err = producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) -assert_err(err) data = np.arange(10,dtype=np.float64) #send data from array -err = producer.send_data(4, stream+"/"+"file5",data, +producer.send_data(4, stream+"/"+"file5",data, ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) -assert_err(err) #send data from string err = producer.send_data(5, stream+"/"+"file6",b"hello", ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) -assert_err(err) #send metadata only -err = producer.send_data(6, stream+"/"+"file7",None, +producer.send_data(6, stream+"/"+"file7",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) -assert_err(err) -time.sleep(1) +x = np.array([[1, 2, 3], [4, 5, 6]], np.float32) +producer.send_data(4, stream+"/"+"file5",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +try: + x = x.T + producer.send_data(4, stream+"/"+"file5",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) +except: + pass +else: + print ("sohuld be exception") + + +producer.wait_requests_finished(1000) diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/asapo_producer.so b/tests/manual/python_tests/producer_wait_bug_mongo/asapo_producer.so new file mode 120000 index 000000000..cd186e180 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/asapo_producer.so @@ -0,0 +1 @@ +/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so \ No newline at end of file diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.json.tpl b/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.json.tpl new file mode 100644 index 000000000..7c3a796d2 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.json.tpl @@ -0,0 +1,10 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"}, + {"BeamtimeId":"asapo_test1","Beamline":"test1"}, + {"BeamtimeId":"asapo_test2","Beamline":"test2"}], + "SecretFile":"auth_secret.key" +} + + diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.nmd b/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.nmd new file mode 100644 index 000000000..e03b1538f --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/authorizer.nmd @@ -0,0 +1,55 @@ +job "authorizer" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "asapo-authorizer" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/authorizer/asapo-authorizer", + args = ["-config","${NOMAD_TASK_DIR}/authorizer.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "authorizer" { + static = "5007" + } + } + } + + service { + name = "authorizer" + port = "authorizer" + check { + name = "alive" + type = "http" + path = "/health-check" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/authorizer.json.tpl" + destination = "local/authorizer.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/auth_secret.key" + destination = "auth_secret.key" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/clean_db.sh b/tests/manual/python_tests/producer_wait_bug_mongo/clean_db.sh new file mode 100755 index 000000000..1f89519f8 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/clean_db.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +beamtime_id=asapo_test1 + + +echo "db.dropDatabase()" | mongo ${beamtime_id}_python2 +echo "db.dropDatabase()" | mongo ${beamtime_id}_python3 diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/discovery.json.tpl b/tests/manual/python_tests/producer_wait_bug_mongo/discovery.json.tpl new file mode 100644 index 000000000..4a72abf42 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/discovery.json.tpl @@ -0,0 +1,11 @@ +{ + "Mode": "consul", + "Receiver": { + "MaxConnections": 32, + "UseIBAddress": false + }, + "Port": {{ env "NOMAD_PORT_discovery" }}, + "LogLevel":"debug", +} + + diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/discovery.nmd b/tests/manual/python_tests/producer_wait_bug_mongo/discovery.nmd new file mode 100644 index 000000000..f95980bae --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/discovery.nmd @@ -0,0 +1,49 @@ +job "discovery" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "discovery" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/discovery/asapo-discovery", + args = ["-config","${NOMAD_TASK_DIR}/discovery.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "discovery" { + static = "5006" + } + } + } + + service { + name = "discovery" + port = "discovery" + check { + name = "alive" + type = "http" + path = "/receivers" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/tests/manual/python_tests/producer/discovery.json.tpl" + destination = "local/discovery.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/file1 b/tests/manual/python_tests/producer_wait_bug_mongo/file1 new file mode 100644 index 000000000..a5bce3fd2 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/file1 @@ -0,0 +1 @@ +test1 diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/nginx.conf.tpl b/tests/manual/python_tests/producer_wait_bug_mongo/nginx.conf.tpl new file mode 100644 index 000000000..b784d0725 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/nginx.conf.tpl @@ -0,0 +1,91 @@ +worker_processes 1; + +events { + worker_connections 1024; +} + +http { +# include mime.types; +# default_type application/octet-stream; + +# sendfile on; +# tcp_nopush on; + +# keepalive_timeout 0; +# keepalive_timeout 65; + + resolver 127.0.0.1:8600 valid=1s; + server { + listen {{ env "NOMAD_PORT_nginx" }}; + set $discovery_endpoint discovery.service.asapo; + set $authorizer_endpoint authorizer.service.asapo; + set $fluentd_endpoint fluentd.service.asapo; + set $kibana_endpoint kibana.service.asapo; + set $grafana_endpoint grafana.service.asapo; + set $mongo_endpoint mongo.service.asapo; + set $influxdb_endpoint influxdb.service.asapo; + set $elasticsearch_endpoint elasticsearch.service.asapo; + + location /mongo/ { + rewrite ^/mongo(/.*) $1 break; + proxy_pass http://$mongo_endpoint:27017$uri$is_args$args; + } + + location /influxdb/ { + rewrite ^/influxdb(/.*) $1 break; + proxy_pass http://$influxdb_endpoint:8086$uri$is_args$args; + } + + location /elasticsearch/ { + rewrite ^/elasticsearch(/.*) $1 break; + proxy_pass http://$elasticsearch_endpoint:9200$uri$is_args$args; + } + + location /discovery/ { + rewrite ^/discovery(/.*) $1 break; + proxy_pass http://$discovery_endpoint:5006$uri$is_args$args; + } + + location /logs/ { + rewrite ^/logs(/.*) $1 break; + proxy_pass http://$fluentd_endpoint:9880$uri$is_args$args; + } + + location /logsview/ { + proxy_pass http://$kibana_endpoint:5601$uri$is_args$args; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $http_host; + } + + location /performance/ { + rewrite ^/performance(/.*) $1 break; + proxy_pass http://$grafana_endpoint:3000$uri$is_args$args; + } + + location /authorizer/ { + rewrite ^/authorizer(/.*) $1 break; + proxy_pass http://$authorizer_endpoint:5007$uri$is_args$args; + } + + location /nginx-health { + return 200 "healthy\n"; + } + } +} + +stream { + resolver 127.0.0.1:8600 valid=1s; + + map $remote_addr $upstream { + default fluentd.service.asapo; + } + + + server { + listen 9881; + proxy_pass $upstream:24224; + } +} + + diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/nginx.nmd b/tests/manual/python_tests/producer_wait_bug_mongo/nginx.nmd new file mode 100644 index 000000000..b424e5387 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/nginx.nmd @@ -0,0 +1,63 @@ +job "nginx" { + datacenters = ["dc1"] + + type = "service" + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "nginx" { + count = 1 + + restart { + attempts = 2 + interval = "30m" + delay = "15s" + mode = "fail" + } + + task "nginx" { + driver = "raw_exec" + + config { + command = "nginx", + args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + port "nginx" { + static = 8400 + } + } + } + + service { + port = "nginx" + name = "nginx" + check { + name = "alive" + type = "http" + path = "/nginx-health" + timeout = "2s" + interval = "10s" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/nginx.conf.tpl" + destination = "local/nginx.conf" + change_mode = "restart" + } + + + } + } +} diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl new file mode 100644 index 000000000..bb59485af --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.json.tpl @@ -0,0 +1,25 @@ +{ + "AdvertiseIP": "127.0.0.1", + "PerformanceDbServer":"localhost:8086", + "PerformanceDbName": "db_test", + "DatabaseServer":"localhost:27017", + "DiscoveryServer": "localhost:8400/discovery", + "DataServer": { + "NThreads": 2, + "ListenPort": {{ env "NOMAD_PORT_recv_ds" }} + }, + "DataCache": { + "Use": true, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/authorizer", + "AuthorizationInterval": 10000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "WriteToDisk": true, + "ReceiveToDiskThresholdMB":50, + "WriteToDb": true, + "LogLevel" : "debug", + "RootFolder" : "/tmp/asapo/receiver/files" +} diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/receiver.nmd b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.nmd new file mode 100644 index 000000000..75fbca574 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/receiver.nmd @@ -0,0 +1,47 @@ +job "receiver" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "receiver" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver", + args = ["${NOMAD_TASK_DIR}/receiver.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "recv" {} + port "recv_ds" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "alive" + type = "tcp" + interval = "10000s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/receiver.json.tpl" + destination = "local/receiver.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/run.sh b/tests/manual/python_tests/producer_wait_bug_mongo/run.sh new file mode 100755 index 000000000..0a0f2a5a1 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/run.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +export PYTHONPATH=/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python:${PYTHONPATH} + +mkdir -p /tmp/asapo/receiver/files/test1/asapo_test1 + +python test.py python2 + + +python3 test.py python3 diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/start_services.sh b/tests/manual/python_tests/producer_wait_bug_mongo/start_services.sh new file mode 100755 index 000000000..bd0128b53 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/start_services.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +nomad run authorizer.nmd +nomad run discovery.nmd +nomad run nginx.nmd +nomad run receiver.nmd diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/stop_services.sh b/tests/manual/python_tests/producer_wait_bug_mongo/stop_services.sh new file mode 100755 index 000000000..1ce92c903 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/stop_services.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +nomad stop authorizer +nomad stop discovery +nomad stop nginx +nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill +nomad stop receiver diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py new file mode 100644 index 000000000..2725862d9 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py @@ -0,0 +1,76 @@ +from __future__ import print_function + +import asapo_producer +import sys +import time +import numpy as np +import threading +lock = threading.Lock() + + +endpoint = "127.0.0.1:8400" +beamtime = "asapo_test1" +stream = "detector" +token = "" +nthreads = 8 + +def callback(header,err): + lock.acquire() # to print + if err is not None: + print("could not sent: ",header,err) + else: + print ("successfuly sent: ",header) + lock.release() + +def assert_err(err): + if err is not None: + print(err) + sys.exit(1) + +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) + +producer.set_log_level("debug") + +#send single file +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + + +#send subsets +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) + +#send meta only +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +data = np.arange(10,dtype=np.float64) + +#send data from array +producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send data from string +err = producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send metadata only +producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + + +x = np.array([[1, 2, 3], [4, 5, 6]], np.float32) +producer.send_data(4, stream+"/"+"file5",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +try: + x = x.T + producer.send_data(4, stream+"/"+"file5",x, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) +except: + pass +else: + print ("should be exception") + + +producer.wait_requests_finished(1000) + -- GitLab