Skip to content
Snippets Groups Projects
Commit 054de10b authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix memory leak in python consumer package

parent 88fb0c18
No related branches found
No related tags found
No related merge requests found
......@@ -103,7 +103,7 @@ cdef throw_exception(Error& err, res = None):
raise AsapoConsumerError(error_string)
cdef class PyDataBroker:
cdef DataBroker* c_broker
cdef unique_ptr[DataBroker] c_broker
def _op(self, op, group_id, substream, meta_only, uint64_t id):
cdef FileInfo info
cdef string b_group_id = _bytes(group_id)
......@@ -114,13 +114,13 @@ cdef class PyDataBroker:
cdef np.npy_intp dims[1]
if op == "next":
with nogil:
err = self.c_broker.GetNext(&info, b_group_id,b_substream, p_data)
err = self.c_broker.get().GetNext(&info, b_group_id,b_substream, p_data)
elif op == "last":
with nogil:
err = self.c_broker.GetLast(&info, b_substream, p_data)
err = self.c_broker.get().GetLast(&info, b_substream, p_data)
elif op == "id":
with nogil:
err = self.c_broker.GetById(id, &info, b_substream, p_data)
err = self.c_broker.get().GetById(id, &info, b_substream, p_data)
if err:
throw_exception(err)
info_str = _str(info.Json())
......@@ -146,7 +146,7 @@ cdef class PyDataBroker:
cdef Error err
cdef FileData data
with nogil:
err = self.c_broker.RetrieveData(&info, &data)
err = self.c_broker.get().RetrieveData(&info, &data)
if err:
throw_exception(err)
cdef np.npy_intp dims[1]
......@@ -162,17 +162,17 @@ cdef class PyDataBroker:
cdef uint64_t size
cdef string b_substream = _bytes(substream)
with nogil:
size = self.c_broker.GetCurrentSize(b_substream,&err)
size = self.c_broker.get().GetCurrentSize(b_substream,&err)
err_str = _str(GetErrorString(&err))
if err:
throw_exception(err)
return size
def set_timeout(self,timeout):
self.c_broker.SetTimeout(timeout)
self.c_broker.get().SetTimeout(timeout)
def force_no_rdma(self):
self.c_broker.ForceNoRdma()
self.c_broker.get().ForceNoRdma()
def current_connection_type(self):
cdef NetworkConnectionType connection_type = self.c_broker.CurrentConnectionType()
cdef NetworkConnectionType connection_type = self.c_broker.get().CurrentConnectionType()
cdef int cased = <int>connection_type
cdef string result = "Unknown"
if cased == <int>NetworkConnectionType_kUndefined:
......@@ -188,7 +188,7 @@ cdef class PyDataBroker:
cdef Error err
cdef uint64_t id = value
with nogil:
err = self.c_broker.SetLastReadMarker(id,b_group_id,b_substream)
err = self.c_broker.get().SetLastReadMarker(id,b_group_id,b_substream)
if err:
throw_exception(err)
return
......@@ -197,7 +197,7 @@ cdef class PyDataBroker:
cdef string b_substream = _bytes(substream)
cdef Error err
with nogil:
err = self.c_broker.ResetLastReadMarker(b_group_id,b_substream)
err = self.c_broker.get().ResetLastReadMarker(b_group_id,b_substream)
if err:
throw_exception(err)
return
......@@ -205,7 +205,7 @@ cdef class PyDataBroker:
cdef Error err
cdef string group_id
with nogil:
group_id = self.c_broker.GenerateNewGroupId(&err)
group_id = self.c_broker.get().GenerateNewGroupId(&err)
if err:
throw_exception(err)
return _str(group_id)
......@@ -214,7 +214,7 @@ cdef class PyDataBroker:
cdef vector[StreamInfo] substreams
cdef string b_from_substream = _bytes(from_substream)
with nogil:
substreams = self.c_broker.GetSubstreamList(b_from_substream,&err)
substreams = self.c_broker.get().GetSubstreamList(b_from_substream,&err)
if err:
throw_exception(err)
list = []
......@@ -226,7 +226,7 @@ cdef class PyDataBroker:
cdef string b_substream = _bytes(substream)
cdef Error err
with nogil:
err = self.c_broker.Acknowledge(b_group_id,id,b_substream)
err = self.c_broker.get().Acknowledge(b_group_id,id,b_substream)
if err:
throw_exception(err)
def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, substream = "default"):
......@@ -234,12 +234,12 @@ cdef class PyDataBroker:
cdef string b_substream = _bytes(substream)
cdef Error err
with nogil:
err = self.c_broker.NegativeAcknowledge(b_group_id,id,delay_sec,b_substream)
err = self.c_broker.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_substream)
if err:
throw_exception(err)
def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts):
with nogil:
self.c_broker.SetResendNacs(resend,delay_sec,resend_attempts)
self.c_broker.get().SetResendNacs(resend,delay_sec,resend_attempts)
def get_last_acknowledged_tuple_id(self, group_id, substream = "default"):
cdef string b_group_id = _bytes(group_id)
......@@ -247,7 +247,7 @@ cdef class PyDataBroker:
cdef Error err
cdef uint64_t id
with nogil:
id = self.c_broker.GetLastAcknowledgedTulpeId(b_group_id,b_substream,&err)
id = self.c_broker.get().GetLastAcknowledgedTulpeId(b_group_id,b_substream,&err)
if err:
throw_exception(err)
return id
......@@ -258,7 +258,7 @@ cdef class PyDataBroker:
cdef string b_substream = _bytes(substream)
cdef IdList ids
with nogil:
ids = self.c_broker.GetUnacknowledgedTupleIds(b_group_id, b_substream, from_id, to_id, &err)
ids = self.c_broker.get().GetUnacknowledgedTupleIds(b_group_id, b_substream, from_id, to_id, &err)
if err:
throw_exception(err)
list = []
......@@ -272,7 +272,7 @@ cdef class PyDataBroker:
cdef Error err
cdef FileInfos file_infos
with nogil:
file_infos = self.c_broker.QueryImages(b_query,b_substream,&err)
file_infos = self.c_broker.get().QueryImages(b_query,b_substream,&err)
if err:
throw_exception(err)
json_list = []
......@@ -287,13 +287,13 @@ cdef class PyDataBroker:
cdef Error err
if op == "next":
with nogil:
dataset = self.c_broker.GetNextDataset(b_group_id,b_substream, min_size, &err)
dataset = self.c_broker.get().GetNextDataset(b_group_id,b_substream, min_size, &err)
elif op == "last":
with nogil:
dataset = self.c_broker.GetLastDataset(b_substream, min_size, &err)
dataset = self.c_broker.get().GetLastDataset(b_substream, min_size, &err)
elif op == "id":
with nogil:
dataset = self.c_broker.GetDatasetById(id, b_substream, min_size, &err)
dataset = self.c_broker.get().GetDatasetById(id, b_substream, min_size, &err)
json_list = []
for fi in dataset.content:
json_list.append(json.loads(_str(fi.Json())))
......@@ -311,14 +311,14 @@ cdef class PyDataBroker:
cdef Error err
cdef string meta_str
with nogil:
meta_str = self.c_broker.GetBeamtimeMeta(&err)
meta_str = self.c_broker.get().GetBeamtimeMeta(&err)
if err:
throw_exception(err)
meta = json.loads(_str(meta_str))
del meta['_id']
return meta
def interrupt_current_operation(self):
self.c_broker.InterruptCurrentOperation()
self.c_broker.get().InterruptCurrentOperation()
cdef class __PyDataBrokerFactory:
cdef DataBrokerFactory c_factory
def __cinit__(self):
......@@ -333,14 +333,12 @@ cdef class __PyDataBrokerFactory:
source.user_token = _bytes(token)
source.stream = _bytes(stream)
cdef Error err
cdef unique_ptr[DataBroker] c_broker
broker = PyDataBroker()
with nogil:
c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err)
broker.c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err)
if err:
throw_exception(err)
broker = PyDataBroker()
broker.c_broker = c_broker.release()
broker.c_broker.SetTimeout(timeout)
broker.c_broker.get().SetTimeout(timeout)
return broker
def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment