From 054de10b1184d3eb8c5beb749db56292637f7fc8 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Thu, 17 Dec 2020 15:56:21 +0100 Subject: [PATCH] fix memory leak in python consumer package --- consumer/api/python/asapo_consumer.pyx.in | 56 +++++++++++------------ 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 0cc14340b..fc5ea16b1 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -103,7 +103,7 @@ cdef throw_exception(Error& err, res = None): raise AsapoConsumerError(error_string) cdef class PyDataBroker: - cdef DataBroker* c_broker + cdef unique_ptr[DataBroker] c_broker def _op(self, op, group_id, substream, meta_only, uint64_t id): cdef FileInfo info cdef string b_group_id = _bytes(group_id) @@ -114,13 +114,13 @@ cdef class PyDataBroker: cdef np.npy_intp dims[1] if op == "next": with nogil: - err = self.c_broker.GetNext(&info, b_group_id,b_substream, p_data) + err = self.c_broker.get().GetNext(&info, b_group_id,b_substream, p_data) elif op == "last": with nogil: - err = self.c_broker.GetLast(&info, b_substream, p_data) + err = self.c_broker.get().GetLast(&info, b_substream, p_data) elif op == "id": with nogil: - err = self.c_broker.GetById(id, &info, b_substream, p_data) + err = self.c_broker.get().GetById(id, &info, b_substream, p_data) if err: throw_exception(err) info_str = _str(info.Json()) @@ -146,7 +146,7 @@ cdef class PyDataBroker: cdef Error err cdef FileData data with nogil: - err = self.c_broker.RetrieveData(&info, &data) + err = self.c_broker.get().RetrieveData(&info, &data) if err: throw_exception(err) cdef np.npy_intp dims[1] @@ -162,17 +162,17 @@ cdef class PyDataBroker: cdef uint64_t size cdef string b_substream = _bytes(substream) with nogil: - size = self.c_broker.GetCurrentSize(b_substream,&err) + size = self.c_broker.get().GetCurrentSize(b_substream,&err) err_str = _str(GetErrorString(&err)) if err: throw_exception(err) return size def set_timeout(self,timeout): - self.c_broker.SetTimeout(timeout) + self.c_broker.get().SetTimeout(timeout) def force_no_rdma(self): - self.c_broker.ForceNoRdma() + self.c_broker.get().ForceNoRdma() def current_connection_type(self): - cdef NetworkConnectionType connection_type = self.c_broker.CurrentConnectionType() + cdef NetworkConnectionType connection_type = self.c_broker.get().CurrentConnectionType() cdef int cased = <int>connection_type cdef string result = "Unknown" if cased == <int>NetworkConnectionType_kUndefined: @@ -188,7 +188,7 @@ cdef class PyDataBroker: cdef Error err cdef uint64_t id = value with nogil: - err = self.c_broker.SetLastReadMarker(id,b_group_id,b_substream) + err = self.c_broker.get().SetLastReadMarker(id,b_group_id,b_substream) if err: throw_exception(err) return @@ -197,7 +197,7 @@ cdef class PyDataBroker: cdef string b_substream = _bytes(substream) cdef Error err with nogil: - err = self.c_broker.ResetLastReadMarker(b_group_id,b_substream) + err = self.c_broker.get().ResetLastReadMarker(b_group_id,b_substream) if err: throw_exception(err) return @@ -205,7 +205,7 @@ cdef class PyDataBroker: cdef Error err cdef string group_id with nogil: - group_id = self.c_broker.GenerateNewGroupId(&err) + group_id = self.c_broker.get().GenerateNewGroupId(&err) if err: throw_exception(err) return _str(group_id) @@ -214,7 +214,7 @@ cdef class PyDataBroker: cdef vector[StreamInfo] substreams cdef string b_from_substream = _bytes(from_substream) with nogil: - substreams = self.c_broker.GetSubstreamList(b_from_substream,&err) + substreams = self.c_broker.get().GetSubstreamList(b_from_substream,&err) if err: throw_exception(err) list = [] @@ -226,7 +226,7 @@ cdef class PyDataBroker: cdef string b_substream = _bytes(substream) cdef Error err with nogil: - err = self.c_broker.Acknowledge(b_group_id,id,b_substream) + err = self.c_broker.get().Acknowledge(b_group_id,id,b_substream) if err: throw_exception(err) def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, substream = "default"): @@ -234,12 +234,12 @@ cdef class PyDataBroker: cdef string b_substream = _bytes(substream) cdef Error err with nogil: - err = self.c_broker.NegativeAcknowledge(b_group_id,id,delay_sec,b_substream) + err = self.c_broker.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_substream) if err: throw_exception(err) def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts): with nogil: - self.c_broker.SetResendNacs(resend,delay_sec,resend_attempts) + self.c_broker.get().SetResendNacs(resend,delay_sec,resend_attempts) def get_last_acknowledged_tuple_id(self, group_id, substream = "default"): cdef string b_group_id = _bytes(group_id) @@ -247,7 +247,7 @@ cdef class PyDataBroker: cdef Error err cdef uint64_t id with nogil: - id = self.c_broker.GetLastAcknowledgedTulpeId(b_group_id,b_substream,&err) + id = self.c_broker.get().GetLastAcknowledgedTulpeId(b_group_id,b_substream,&err) if err: throw_exception(err) return id @@ -258,7 +258,7 @@ cdef class PyDataBroker: cdef string b_substream = _bytes(substream) cdef IdList ids with nogil: - ids = self.c_broker.GetUnacknowledgedTupleIds(b_group_id, b_substream, from_id, to_id, &err) + ids = self.c_broker.get().GetUnacknowledgedTupleIds(b_group_id, b_substream, from_id, to_id, &err) if err: throw_exception(err) list = [] @@ -272,7 +272,7 @@ cdef class PyDataBroker: cdef Error err cdef FileInfos file_infos with nogil: - file_infos = self.c_broker.QueryImages(b_query,b_substream,&err) + file_infos = self.c_broker.get().QueryImages(b_query,b_substream,&err) if err: throw_exception(err) json_list = [] @@ -287,13 +287,13 @@ cdef class PyDataBroker: cdef Error err if op == "next": with nogil: - dataset = self.c_broker.GetNextDataset(b_group_id,b_substream, min_size, &err) + dataset = self.c_broker.get().GetNextDataset(b_group_id,b_substream, min_size, &err) elif op == "last": with nogil: - dataset = self.c_broker.GetLastDataset(b_substream, min_size, &err) + dataset = self.c_broker.get().GetLastDataset(b_substream, min_size, &err) elif op == "id": with nogil: - dataset = self.c_broker.GetDatasetById(id, b_substream, min_size, &err) + dataset = self.c_broker.get().GetDatasetById(id, b_substream, min_size, &err) json_list = [] for fi in dataset.content: json_list.append(json.loads(_str(fi.Json()))) @@ -311,14 +311,14 @@ cdef class PyDataBroker: cdef Error err cdef string meta_str with nogil: - meta_str = self.c_broker.GetBeamtimeMeta(&err) + meta_str = self.c_broker.get().GetBeamtimeMeta(&err) if err: throw_exception(err) meta = json.loads(_str(meta_str)) del meta['_id'] return meta def interrupt_current_operation(self): - self.c_broker.InterruptCurrentOperation() + self.c_broker.get().InterruptCurrentOperation() cdef class __PyDataBrokerFactory: cdef DataBrokerFactory c_factory def __cinit__(self): @@ -333,14 +333,12 @@ cdef class __PyDataBrokerFactory: source.user_token = _bytes(token) source.stream = _bytes(stream) cdef Error err - cdef unique_ptr[DataBroker] c_broker + broker = PyDataBroker() with nogil: - c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err) + broker.c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err) if err: throw_exception(err) - broker = PyDataBroker() - broker.c_broker = c_broker.release() - broker.c_broker.SetTimeout(timeout) + broker.c_broker.get().SetTimeout(timeout) return broker def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms): -- GitLab