diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index ef9942af1639567b05b649e2ed3b49c09ff3d4bf..ba9a5a837725b2ef9c1f6824de7b838d297578cb 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -89,22 +89,24 @@ cdef python_exception_from_error(Error& err): cdef throw_exception(Error& err): raise python_exception_from_error(err) -cdef void* data_pointer_nparray(data) except? NULL: - if data is None: - return <void*>NULL - data_char = data.view(np.int8) - try: - data_char.shape=(-1) - except: - raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") - cdef char[::1] arr_memview = data_char - return <void*>&arr_memview[0] - -cdef void* data_pointer_bytes(data): - if data is None: - return <void*>NULL - cdef const unsigned char[::1] arr_memview = data - return <void*>&arr_memview[0] +cdef void* data_pointer(data) except? NULL: + if data is None: + return <void*>NULL + + # create a 1D view of unsigned bytes of the underlying data because the following typed + # memoryview only accepts 1D arrays + try: + reshaped_view = memoryview(data).cast("B") + except Exception as err: + raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from err + + cdef const unsigned char[::1] memview = reshaped_view + return <void*>&memview[0] + +cdef int data_nbytes(data) except -1: + if data is None: + return 0 + return memoryview(data).nbytes cdef class PyProducer: cdef unique_ptr[Producer] c_producer @@ -142,19 +144,20 @@ cdef class PyProducer: return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} else: return {'client': _str(client_info)} - def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + def __send(self, id, exposed_path, data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) - if data is None: - message_header.data_size = 0 - else: - message_header.data_size = data.nbytes - err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream), - unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, + try: + message_header.data_size = data_nbytes(data) + except Exception as err: + raise AsapoProducerError("wrong data type: " + str(type(data))) from err + + err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream), + unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data, <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) - if data is not None: - Py_XINCREF(<PyObject*>data) + + Py_XINCREF(<PyObject*>data) if callback != None: Py_XINCREF(<PyObject*>callback) return @@ -172,18 +175,6 @@ cdef class PyProducer: message_header.dataset_size = dataset[1] return message_header - def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) - message_header.data_size = len(data) - err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream), - unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, - <void*>self,<void*>callback, <void*>data)) - if err: - throw_exception(err) - Py_XINCREF(<PyObject*>data) - if callback != None: - Py_XINCREF(<PyObject*>callback) - return def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None): """ :param stream: stream name, default "default" @@ -240,7 +231,7 @@ cdef class PyProducer: :param exposed_path: Path which will be exposed to consumers :type exposed_path: string :param data: data to send - :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode + :type data: contiguous buffer like numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode :param user_meta: user metadata, default None :type user_meta: JSON string :param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None @@ -257,12 +248,7 @@ cdef class PyProducer: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen """ - if type(data) == np.ndarray or data == None: - self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) - elif type(data) == bytes: - self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) - else: - raise(AsapoProducerError("wrong data type: " + str(type(data)))) + self.__send(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None): """ :param stream: stream name @@ -469,15 +455,10 @@ cdef class PyProducer: cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil: self.c_callback_python(py_callback,None,payload,err) - cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil: - self.c_callback_python(py_callback,nd_array,payload,err) - if nd_array is not None: - Py_XDECREF(<PyObject*>nd_array) + cdef void c_callback_data(self,py_callback,data,RequestCallbackPayload payload, Error err) with gil: + self.c_callback_python(py_callback,data,payload,err) + Py_XDECREF(<PyObject*>data) - cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil: - self.c_callback_python(py_callback,bytes_array,payload,err) - if bytes_array is not None: - Py_XDECREF(<PyObject*>bytes_array) def cleanup(self): with nogil: if self.c_producer.get() is not NULL: diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index beb45a12dd9d5544496655a28f9ca709a017ba14..989014d9ca379329cd4e6312797db1d012ccea58 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -105,6 +105,18 @@ else: print("should be error sending non-cont array") sys.exit(1) + +# send data from read-only array +read_only_data = np.frombuffer(data.tobytes(), dtype=np.uint8) +producer.send(11, "processed/" + data_source + "/" + "file5", memoryview(data), + ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) + + +# send data from memoryview +producer.send(12, "processed/" + data_source + "/" + "file5", memoryview(data), + ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) + + try: producer.send(0, "processed/" + data_source + "/" + "file6", b"hello", ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)