Skip to content
Snippets Groups Projects
Commit ceca65db authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix bug with multidimensional numpy array and wait threads function

parent 4dd55501
No related branches found
No related tags found
No related merge requests found
Showing
with 425 additions and 26 deletions
......@@ -49,9 +49,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
requests_in_progress_--;
request_handler->TearDownProcessingRequestLocked(success);
if (!success) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
PutRequestBackToQueue(std::move(request));
condition_.notify_all();
thread_info->lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
thread_info->lock.lock();
}
}
......@@ -107,11 +109,11 @@ Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) {
}
void RequestPool::StopThreads() {
log__->Debug("trying to stop threads");
mutex_.lock();
quit_ = true;
mutex_.unlock();
condition_.notify_all();
for(size_t i = 0; i < threads_.size(); i++) {
if(threads_[i].joinable()) {
log__->Debug("finishing thread " + std::to_string(i));
......
......@@ -60,10 +60,14 @@ cdef python_exception_from_error(Error& err):
cdef throw_exception(Error& err):
raise python_exception_from_error(err)
cdef void* data_pointer_nparray(data):
cdef void* data_pointer_nparray(data) except? NULL:
if data is None:
return <void*>NULL
data_char = data.view(np.int8)
try:
data_char.shape=(-1)
except:
raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?")
cdef char[::1] arr_memview = data_char
return <void*>&arr_memview[0]
......@@ -183,7 +187,7 @@ cdef class PyProducer:
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoLocalIOError: problems reading file to send
AsapoLocalIOError: problems reading file to send
AsapoProducerError: actually should not happen
"""
......@@ -201,7 +205,7 @@ cdef class PyProducer:
:param timeout_ms: timeout in milliseconds
:type timeout_ms: int
:raises:
AsapoTimeoutError: requests not finished for a given timeout
AsapoTimeoutError: requests not finished for a given timeout
"""
cdef Error err
cdef uint64_t timeout = timeout_ms
......
......@@ -38,5 +38,5 @@ sleep 1
$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out
cat out
cat out | grep "successfuly sent" | wc -l | grep 7
cat out | grep "successfuly sent" | wc -l | grep 8
cat out | grep "local i/o error"
\ No newline at end of file
......@@ -27,12 +27,10 @@ set PYTHONPATH=%2
type out
set NUM=0
for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N
echo %NUM% | findstr 7 || goto error
echo %NUM% | findstr 8 || goto error
goto :clean
:error
call :clean
exit /b 1
......
......@@ -57,10 +57,25 @@ producer.send_data(6, stream+"/"+"file7",None,
#send single file/wrong filename
producer.send_file(1, local_path = "./file2", exposed_path = stream+"/"+"file1", callback = callback)
x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
producer.send_data(8, stream+"/"+"file8",x,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
try:
x = x.T
producer.send_data(8, stream+"/"+"file8",x,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
except asapo_producer.AsapoWrongInputError as e:
print(e)
else:
print("should be error sending non-cont array")
sys.exit(1)
producer.wait_requests_finished(50000)
n = producer.get_requests_queue_size()
if n!=0:
print("number of remaining requestst should be zero, got ",n)
print("number of remaining requests should be zero, got ",n)
sys.exit(1)
......
/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so
\ No newline at end of file
......@@ -5,7 +5,10 @@
"UseIBAddress": false
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel":"debug"
"LogLevel":"debug",
"Mongo": {
"StaticEndpoint": "127.0.0.1:27017"
}
}
......@@ -10,12 +10,10 @@ lock = threading.Lock()
endpoint = "127.0.0.1:8400"
beamtime = "asapo_test1"
stream = sys.argv[1]
stream = "detector"
token = ""
nthreads = 8
def callback(header,err):
lock.acquire() # to print
if err is not None:
......@@ -29,41 +27,50 @@ def assert_err(err):
print(err)
sys.exit(1)
producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
assert_err(err)
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
producer.set_log_level("info")
#send single file
err = producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
assert_err(err)
producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
#send subsets
producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
#send meta only
err = producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
assert_err(err)
data = np.arange(10,dtype=np.float64)
#send data from array
err = producer.send_data(4, stream+"/"+"file5",data,
producer.send_data(4, stream+"/"+"file5",data,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
assert_err(err)
#send data from string
err = producer.send_data(5, stream+"/"+"file6",b"hello",
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
assert_err(err)
#send metadata only
err = producer.send_data(6, stream+"/"+"file7",None,
producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
assert_err(err)
time.sleep(1)
x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
producer.send_data(4, stream+"/"+"file5",x,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
try:
x = x.T
producer.send_data(4, stream+"/"+"file5",x,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
except:
pass
else:
print ("sohuld be exception")
producer.wait_requests_finished(1000)
/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so
\ No newline at end of file
{
"Port": {{ env "NOMAD_PORT_authorizer" }},
"LogLevel":"debug",
"AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"},
{"BeamtimeId":"asapo_test1","Beamline":"test1"},
{"BeamtimeId":"asapo_test2","Beamline":"test2"}],
"SecretFile":"auth_secret.key"
}
# Nomad job: ASAPO authorizer service.
# Runs the locally built binary via raw_exec, registers a Consul service with
# an HTTP health check, and renders its JSON config plus the auth secret from
# templates, sending SIGHUP to the process when either changes.
job "authorizer" {
  datacenters = ["dc1"]
  type = "service"

  group "group" {
    count = 1

    task "asapo-authorizer" {
      driver = "raw_exec"

      config {
        # NOTE(review): path is hard-coded to a developer build tree.
        command = "/home/yakubov/projects/asapo/cmake-build-debug/authorizer/asapo-authorizer",
        args = ["-config","${NOMAD_TASK_DIR}/authorizer.json"]
      }

      resources {
        cpu = 500 # 500 MHz
        memory = 256 # 256MB
        network {
          # Fixed port so other services can reach the authorizer directly.
          port "authorizer" {
            static = "5007"
          }
        }
      }

      service {
        name = "authorizer"
        port = "authorizer"
        check {
          name = "alive"
          type = "http"
          path = "/health-check"
          interval = "10s"
          timeout = "2s"
          initial_status = "passing"
        }
      }

      # Render config from template; SIGHUP the service on change.
      template {
        source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/authorizer.json.tpl"
        destination = "local/authorizer.json"
        change_mode = "signal"
        change_signal = "SIGHUP"
      }

      # Deploy the shared secret used to validate tokens alongside the config.
      template {
        source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/auth_secret.key"
        destination = "auth_secret.key"
        change_mode = "signal"
        change_signal = "SIGHUP"
      }
    }
  }
}
#!/usr/bin/env bash
# Drop the MongoDB test databases left behind by the python2/python3 runs.
beamtime_id=asapo_test1
for suffix in python2 python3; do
    echo "db.dropDatabase()" | mongo ${beamtime_id}_${suffix}
done
{
"Mode": "consul",
"Receiver": {
"MaxConnections": 32,
"UseIBAddress": false
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel":"debug"
}
# Nomad job: ASAPO discovery service.
# Health-checked via its /receivers endpoint; re-reads its rendered JSON
# config on SIGHUP when the template output changes.
job "discovery" {
  datacenters = ["dc1"]
  type = "service"

  group "group" {
    count = 1

    task "discovery" {
      driver = "raw_exec"

      config {
        # NOTE(review): path is hard-coded to a developer build tree.
        command = "/home/yakubov/projects/asapo/cmake-build-debug/discovery/asapo-discovery",
        args = ["-config","${NOMAD_TASK_DIR}/discovery.json"]
      }

      resources {
        cpu = 500 # 500 MHz
        memory = 256 # 256MB
        network {
          # Fixed port so clients can reach discovery without lookup.
          port "discovery" {
            static = "5006"
          }
        }
      }

      service {
        name = "discovery"
        port = "discovery"
        check {
          name = "alive"
          type = "http"
          path = "/receivers"
          interval = "10s"
          timeout = "2s"
          initial_status = "passing"
        }
      }

      # Render config from the manual-test template; SIGHUP on change.
      template {
        source = "/home/yakubov/projects/asapo/tests/manual/python_tests/producer/discovery.json.tpl"
        destination = "local/discovery.json"
        change_mode = "signal"
        change_signal = "SIGHUP"
      }
    }
  }
}
test1
# nginx template for the ASAPO dev deployment: one reverse proxy fronting all
# services on a single port, plus a TCP stream proxy for fluentd forwarding.
worker_processes 1;

events {
    worker_connections 1024;
}

http {
    # include mime.types;
    # default_type application/octet-stream;
    # sendfile on;
    # tcp_nopush on;
    # keepalive_timeout 0;
    # keepalive_timeout 65;

    # Resolve *.service.asapo names through the local Consul DNS agent; short
    # TTL so endpoint changes propagate quickly.
    resolver 127.0.0.1:8600 valid=1s;

    server {
        listen {{ env "NOMAD_PORT_nginx" }};

        # Storing upstream hosts in variables makes nginx re-resolve them at
        # request time (via the resolver above) instead of once at startup.
        set $discovery_endpoint discovery.service.asapo;
        set $authorizer_endpoint authorizer.service.asapo;
        set $fluentd_endpoint fluentd.service.asapo;
        set $kibana_endpoint kibana.service.asapo;
        set $grafana_endpoint grafana.service.asapo;
        set $mongo_endpoint mongo.service.asapo;
        set $influxdb_endpoint influxdb.service.asapo;
        set $elasticsearch_endpoint elasticsearch.service.asapo;

        # Each location strips its own prefix and proxies the remainder to
        # the corresponding backend service.
        location /mongo/ {
            rewrite ^/mongo(/.*) $1 break;
            proxy_pass http://$mongo_endpoint:27017$uri$is_args$args;
        }

        location /influxdb/ {
            rewrite ^/influxdb(/.*) $1 break;
            proxy_pass http://$influxdb_endpoint:8086$uri$is_args$args;
        }

        location /elasticsearch/ {
            rewrite ^/elasticsearch(/.*) $1 break;
            proxy_pass http://$elasticsearch_endpoint:9200$uri$is_args$args;
        }

        location /discovery/ {
            rewrite ^/discovery(/.*) $1 break;
            proxy_pass http://$discovery_endpoint:5006$uri$is_args$args;
        }

        location /logs/ {
            rewrite ^/logs(/.*) $1 break;
            proxy_pass http://$fluentd_endpoint:9880$uri$is_args$args;
        }

        # Kibana UI; forwards client address headers for correct logging.
        location /logsview/ {
            proxy_pass http://$kibana_endpoint:5601$uri$is_args$args;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $http_host;
        }

        location /performance/ {
            rewrite ^/performance(/.*) $1 break;
            proxy_pass http://$grafana_endpoint:3000$uri$is_args$args;
        }

        location /authorizer/ {
            rewrite ^/authorizer(/.*) $1 break;
            proxy_pass http://$authorizer_endpoint:5007$uri$is_args$args;
        }

        # Liveness endpoint used by the Nomad service check.
        location /nginx-health {
            return 200 "healthy\n";
        }
    }
}

# Raw TCP proxy: forwards fluentd traffic on 9881 to the fluentd service.
stream {
    resolver 127.0.0.1:8600 valid=1s;

    # map (rather than a plain set) so the name is resolved per-connection
    # through the resolver above.
    map $remote_addr $upstream {
        default fluentd.service.asapo;
    }

    server {
        listen 9881;
        proxy_pass $upstream:24224;
    }
}
# Nomad job: nginx reverse proxy fronting all ASAPO services on port 8400.
job "nginx" {
  datacenters = ["dc1"]
  type = "service"

  # Rolling update policy: one instance at a time, no automatic rollback.
  update {
    max_parallel = 1
    min_healthy_time = "10s"
    healthy_deadline = "3m"
    auto_revert = false
  }

  group "nginx" {
    count = 1

    # Retry a failed task twice within 30 minutes, then mark the alloc failed.
    restart {
      attempts = 2
      interval = "30m"
      delay = "15s"
      mode = "fail"
    }

    task "nginx" {
      driver = "raw_exec"

      config {
        # Uses the system nginx binary with the templated config below.
        command = "nginx",
        args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"]
      }

      resources {
        cpu = 500 # 500 MHz
        memory = 256 # 256MB
        network {
          mbits = 10
          # Well-known entry point for the whole deployment.
          port "nginx" {
            static = 8400
          }
        }
      }

      service {
        port = "nginx"
        name = "nginx"
        check {
          name = "alive"
          type = "http"
          path = "/nginx-health"
          timeout = "2s"
          interval = "10s"
        }
      }

      template {
        source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/nginx.conf.tpl"
        destination = "local/nginx.conf"
        # Restart nginx (rather than signal it) when the config changes.
        change_mode = "restart"
      }
    }
  }
}
{
"AdvertiseIP": "127.0.0.1",
"PerformanceDbServer":"localhost:8086",
"PerformanceDbName": "db_test",
"DatabaseServer":"localhost:27017",
"DiscoveryServer": "localhost:8400/discovery",
"DataServer": {
"NThreads": 2,
"ListenPort": {{ env "NOMAD_PORT_recv_ds" }}
},
"DataCache": {
"Use": true,
"SizeGB": 1,
"ReservedShare": 10
},
"AuthorizationServer": "localhost:8400/authorizer",
"AuthorizationInterval": 10000,
"ListenPort": {{ env "NOMAD_PORT_recv" }},
"Tag": "{{ env "NOMAD_ADDR_recv" }}",
"WriteToDisk": true,
"ReceiveToDiskThresholdMB":50,
"WriteToDb": true,
"LogLevel" : "debug",
"RootFolder" : "/tmp/asapo/receiver/files"
}
# Nomad job: ASAPO receiver service.
# Ports are dynamically assigned by Nomad (recv for data intake, recv_ds for
# the receiver's data server) and exported to the config via NOMAD_PORT_* in
# the rendered template; registered in Consul as "asapo-receiver".
job "receiver" {
  datacenters = ["dc1"]
  type = "service"

  group "group" {
    count = 1

    task "receiver" {
      driver = "raw_exec"

      config {
        # NOTE(review): path is hard-coded to a developer build tree.
        command = "/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver",
        args = ["${NOMAD_TASK_DIR}/receiver.json"]
      }

      resources {
        cpu = 500 # 500 MHz
        memory = 256 # 256MB
        network {
          # No static values: Nomad picks free ports for both listeners.
          port "recv" {}
          port "recv_ds" {}
        }
      }

      service {
        name = "asapo-receiver"
        port = "recv"
        check {
          name = "alive"
          type = "tcp"
          # NOTE(review): a 10000s interval effectively disables the liveness
          # probe after the initial "passing" status — confirm intentional.
          interval = "10000s"
          timeout = "2s"
          initial_status = "passing"
        }
      }

      template {
        source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/receiver.json.tpl"
        destination = "local/receiver.json"
        change_mode = "signal"
        change_signal = "SIGHUP"
      }
    }
  }
}
#!/usr/bin/env bash
# Make the freshly built producer Python bindings importable.
export PYTHONPATH=/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python:${PYTHONPATH}
# The receiver writes into this folder; create it up front.
mkdir -p /tmp/asapo/receiver/files/test1/asapo_test1
# Run the test once per interpreter; the argument presumably tags the mongo
# database name (matches the *_python2/_python3 names in clean_db.sh) — confirm.
for interp in python python3; do
    case $interp in
        python)  tag=python2 ;;
        python3) tag=python3 ;;
    esac
    $interp test.py $tag
done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment