Skip to content
Snippets Groups Projects
Commit cca1b644 authored by Tim Schoof's avatar Tim Schoof
Browse files

Release the Python GIL for all IO operations

parent 31dde5f5
Branches
Tags
No related merge requests found
......@@ -32,7 +32,7 @@ cdef extern from "asapo_worker.h" namespace "asapo":
FileInfos content
cdef extern from "asapo_worker.h" namespace "asapo":
cdef extern from "asapo_worker.h" namespace "asapo" nogil:
cdef cppclass DataBroker:
DataBroker() except +
void SetTimeout(uint64_t timeout_ms)
......@@ -50,7 +50,7 @@ cdef extern from "asapo_worker.h" namespace "asapo":
Error RetrieveData(FileInfo* info, FileData* data)
cdef extern from "asapo_worker.h" namespace "asapo":
cdef extern from "asapo_worker.h" namespace "asapo" nogil:
cdef cppclass DataBrokerFactory:
DataBrokerFactory() except +
unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,string beamtime_id,string token,Error* error)
......
......@@ -5,6 +5,7 @@ import numpy as np
cimport numpy as np
import json
from cpython.version cimport PY_MAJOR_VERSION
from libcpp.string cimport string
np.import_array()
......@@ -30,17 +31,22 @@ cdef bytes _bytes(s):
cdef class PyDataBroker:
cdef DataBroker* c_broker
def _op(self, op, group_id, meta_only,id):
def _op(self, op, group_id, meta_only, uint64_t id):
cdef FileInfo info
cdef string b_group_id = _bytes(group_id)
cdef FileData* p_data = <FileData*>NULL if meta_only else &data
cdef FileData data
cdef Error err
cdef np.npy_intp dims[1]
if op == "next":
err = self.c_broker.GetNext(&info, _bytes(group_id), <FileData*>NULL if meta_only else &data)
with nogil:
err = self.c_broker.GetNext(&info, b_group_id, p_data)
elif op == "last":
err = self.c_broker.GetLast(&info,_bytes(group_id), <FileData*>NULL if meta_only else &data)
with nogil:
err = self.c_broker.GetLast(&info, b_group_id, p_data)
elif op == "id":
err = self.c_broker.GetById(id, &info,_bytes(group_id), <FileData*>NULL if meta_only else &data)
with nogil:
err = self.c_broker.GetById(id, &info, b_group_id, p_data)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None,None,err_str
......@@ -66,7 +72,8 @@ cdef class PyDataBroker:
return None,"wrong metadata"
cdef Error err
cdef FileData data
err = self.c_broker.RetrieveData(&info, &data)
with nogil:
err = self.c_broker.RetrieveData(&info, &data)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None,err_str
......@@ -78,15 +85,19 @@ cdef class PyDataBroker:
return arr,None
def get_ndatasets(self):
cdef Error err
size = self.c_broker.GetNDataSets(&err)
cdef uint64_t size
with nogil:
size = self.c_broker.GetNDataSets(&err)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None,err_str
else:
return size,None
def reset_counter(self,group_id):
cdef string b_group_id = _bytes(group_id)
cdef Error err
err = self.c_broker.ResetCounter(_bytes(group_id))
with nogil:
err = self.c_broker.ResetCounter(b_group_id)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return err_str
......@@ -95,16 +106,19 @@ cdef class PyDataBroker:
def generate_group_id(self):
cdef Error err
cdef string group_id
group_id = self.c_broker.GenerateNewGroupId(&err)
with nogil:
group_id = self.c_broker.GenerateNewGroupId(&err)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None, err_str
else:
return _str(group_id), None
def query_images(self,query):
cdef string b_query = _bytes(query)
cdef Error err
cdef FileInfos file_infos
file_infos = self.c_broker.QueryImages(_bytes(query),&err)
with nogil:
file_infos = self.c_broker.QueryImages(b_query,&err)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None, err_str
......@@ -113,16 +127,20 @@ cdef class PyDataBroker:
for fi in file_infos:
json_list.append(json.loads(_str(fi.Json())))
return json_list, None
def _op_dataset(self, op, group_id, id):
def _op_dataset(self, op, group_id, uint64_t id):
cdef string b_group_id = _bytes(group_id)
cdef FileInfos file_infos
cdef DataSet dataset
cdef Error err
if op == "next":
dataset = self.c_broker.GetNextDataset(_bytes(group_id),&err)
with nogil:
dataset = self.c_broker.GetNextDataset(b_group_id, &err)
elif op == "last":
dataset = self.c_broker.GetLastDataset(_bytes(group_id),&err)
with nogil:
dataset = self.c_broker.GetLastDataset(b_group_id, &err)
elif op == "id":
dataset = self.c_broker.GetDatasetById(id,_bytes(group_id),&err)
with nogil:
dataset = self.c_broker.GetDatasetById(id, b_group_id, &err)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None, None, err_str
......@@ -144,7 +162,8 @@ cdef class PyDataBroker:
def get_beamtime_meta(self):
cdef Error err
cdef string meta_str
meta_str = self.c_broker.GetBeamtimeMeta(&err)
with nogil:
meta_str = self.c_broker.GetBeamtimeMeta(&err)
err_str = _str(GetErrorString(&err))
if err_str.strip():
return None, err_str
......@@ -156,10 +175,18 @@ cdef class PyDataBroker:
cdef class PyDataBrokerFactory:
cdef DataBrokerFactory c_factory
def __cinit__(self):
self.c_factory = DataBrokerFactory()
with nogil:
self.c_factory = DataBrokerFactory()
def create_server_broker(self,server_name,source_path,beamtime_id,token,timeout):
cdef string b_server_name = _bytes(server_name)
cdef string b_source_path = _bytes(source_path)
cdef string b_beamtime_id = _bytes(beamtime_id)
cdef string b_token = _bytes(token)
cdef Error err
cdef unique_ptr[DataBroker] c_broker = self.c_factory.CreateServerBroker(server_name,source_path,beamtime_id,token,&err)
cdef unique_ptr[DataBroker] c_broker
with nogil:
c_broker = self.c_factory.CreateServerBroker(
b_server_name, b_source_path, b_beamtime_id, b_token, &err)
broker = PyDataBroker()
broker.c_broker = c_broker.release()
broker.c_broker.SetTimeout(timeout)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment