Skip to content
Snippets Groups Projects
Commit f2b6fb01 authored by Tim Schoof's avatar Tim Schoof
Browse files

Allow sending any object supporting the buffer interface

This includes read-only buffers.

The code split between bytes and numpy array is removed and all objects
are now handled equally.

Furthermore, it seems more correct to increase the ref count for None
objects as well, for as long as a pointer to that object is in use.
parent 4fe89582
No related branches found
No related tags found
No related merge requests found
......@@ -89,22 +89,24 @@ cdef python_exception_from_error(Error& err):
cdef throw_exception(Error& err):
    # Convert the native error into the matching Python exception and raise it.
    py_exc = python_exception_from_error(err)
    raise py_exc
cdef void* data_pointer_nparray(data) except? NULL:
    """Return the address of the first byte of ``data`` viewed as int8.

    :param data: object providing ``view`` and an assignable ``shape``
        (presumably a numpy array - confirm against callers), or None
    :return: NULL when ``data`` is None, otherwise a pointer to element 0 of
        the flattened int8 view
    :raises AsapoWrongInputError: when the int8 view cannot be reshaped to 1D
        in place
    """
    if data is None:
        return <void*>NULL
    data_char = data.view(np.int8)
    try:
        # Reshape by assigning to .shape so that a layout which cannot be
        # flattened in place produces an error here.
        data_char.shape = (-1)
    except Exception as err:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit are not swallowed, and keep the original cause attached.
        raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from err
    cdef char[::1] arr_memview = data_char
    return <void*>&arr_memview[0]
cdef void* data_pointer_bytes(data) except? NULL:
    """Return the address of the first byte of a contiguous byte buffer.

    :param data: object accepted by a ``const unsigned char[::1]`` typed
        memoryview, or None
    :return: NULL when ``data`` is None, otherwise a pointer to element 0

    The ``except? NULL`` clause lets an exception raised while acquiring or
    indexing the buffer reach the caller, as in the sibling pointer helpers.
    """
    if data is None:
        return <void*>NULL
    cdef const unsigned char[::1] arr_memview = data
    return <void*>&arr_memview[0]
cdef void* data_pointer(data) except? NULL:
    """Return the address of the first byte of any buffer-providing object.

    :param data: object accepted by ``memoryview()``, or None
    :return: NULL when ``data`` is None, otherwise a pointer to byte 0 of the
        flattened unsigned-byte view
    :raises AsapoWrongInputError: when the flat unsigned-byte view cannot be
        created
    """
    if data is None:
        return <void*>NULL
    # The typed memoryview declared below is one-dimensional, so the buffer is
    # first re-interpreted as a flat sequence of unsigned bytes.
    try:
        flat_bytes = memoryview(data).cast("B")
    except Exception as cast_error:
        raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from cast_error
    cdef const unsigned char[::1] byte_view = flat_bytes
    return <void*>&byte_view[0]
cdef Py_ssize_t data_nbytes(data) except -1:
    """Return the size in bytes of ``data``'s buffer.

    :param data: object accepted by ``memoryview()``, or None
    :return: 0 when ``data`` is None, otherwise ``memoryview(data).nbytes``

    The return type is ``Py_ssize_t`` rather than ``int`` so that buffers of
    2 GiB and more do not overflow the C conversion of the byte count.
    Exceptions raised by ``memoryview(data)`` propagate via ``except -1``.
    """
    if data is None:
        return 0
    return memoryview(data).nbytes
cdef class PyProducer:
cdef unique_ptr[Producer] c_producer
......@@ -142,19 +144,20 @@ cdef class PyProducer:
return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported}
else:
return {'client': _str(client_info)}
def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
def __send(self, id, exposed_path, data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
if data is None:
message_header.data_size = 0
else:
message_header.data_size = data.nbytes
err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
try:
message_header.data_size = data_nbytes(data)
except Exception as err:
raise AsapoProducerError("wrong data type: " + str(type(data))) from err
err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data,
<void*>self,<void*>callback, <void*>data))
if err:
throw_exception(err)
if data is not None:
Py_XINCREF(<PyObject*>data)
Py_XINCREF(<PyObject*>data)
if callback != None:
Py_XINCREF(<PyObject*>callback)
return
......@@ -172,18 +175,6 @@ cdef class PyProducer:
message_header.dataset_size = dataset[1]
return message_header
# Send a bytes payload: build the message header, pass the raw byte pointer of
# `data` to the native producer and take extra references on the objects that
# are handed over as raw pointers.
def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
# The payload size is the length of the object being sent.
message_header.data_size = len(data)
err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
<void*>self,<void*>callback, <void*>data))
# A truthy error returned by Send__ is converted into a Python exception.
if err:
throw_exception(err)
# Extra reference on the payload; c_callback_bytesaddr, registered above,
# performs the matching Py_XDECREF.
Py_XINCREF(<PyObject*>data)
# Extra reference on the user callback, taken only when one was supplied.
if callback != None:
Py_XINCREF(<PyObject*>callback)
return
def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None):
"""
:param stream: stream name, default "default"
......@@ -240,7 +231,7 @@ cdef class PyProducer:
:param exposed_path: Path which will be exposed to consumers
:type exposed_path: string
:param data: data to send
:type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
:type data: contiguous buffer like numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
:param user_meta: user metadata, default None
:type user_meta: JSON string
:param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None
......@@ -257,12 +248,7 @@ cdef class PyProducer:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
elif type(data) == bytes:
self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
else:
raise(AsapoProducerError("wrong data type: " + str(type(data))))
self.__send(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
"""
:param stream: stream name
......@@ -469,15 +455,10 @@ cdef class PyProducer:
# Callback entry point that acquires the GIL and forwards to c_callback_python,
# passing None as the data argument.
cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil:
self.c_callback_python(py_callback,None,payload,err)
cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil:
    # Callback entry point for requests sent with an array payload: forwards to
    # c_callback_python and then drops one reference on the payload object.
    try:
        self.c_callback_python(py_callback,nd_array,payload,err)
    finally:
        # Release the reference even if the forwarded call raises; otherwise
        # the payload object would never be freed.
        if nd_array is not None:
            Py_XDECREF(<PyObject*>nd_array)
cdef void c_callback_data(self,py_callback,data,RequestCallbackPayload payload, Error err) with gil:
    # Callback entry point for requests sent with a data object: forwards to
    # c_callback_python and then drops one reference on the data object
    # (None included, since Py_XDECREF is called unconditionally).
    try:
        self.c_callback_python(py_callback,data,payload,err)
    finally:
        # Release the reference even if the forwarded call raises; otherwise
        # the data object would never be freed.
        Py_XDECREF(<PyObject*>data)
cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil:
    # Callback entry point for requests sent with a bytes payload: forwards to
    # c_callback_python and then drops one reference on the payload object.
    try:
        self.c_callback_python(py_callback,bytes_array,payload,err)
    finally:
        # Release the reference even if the forwarded call raises; otherwise
        # the payload object would never be freed.
        if bytes_array is not None:
            Py_XDECREF(<PyObject*>bytes_array)
def cleanup(self):
with nogil:
if self.c_producer.get() is not NULL:
......
......@@ -105,6 +105,18 @@ else:
print("should be error sending non-cont array")
sys.exit(1)
# send data from read-only array
# (np.frombuffer over an immutable bytes object yields a non-writeable array;
# pass that array itself so the read-only buffer path is actually exercised)
read_only_data = np.frombuffer(data.tobytes(), dtype=np.uint8)
producer.send(11, "processed/" + data_source + "/" + "file5", read_only_data,
              ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)

# send data from memoryview
producer.send(12, "processed/" + data_source + "/" + "file5", memoryview(data),
              ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
try:
producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment