diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index ef9942af1639567b05b649e2ed3b49c09ff3d4bf..03502c18abb204fe87c9651ee796f5fc521d0cba 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -89,22 +89,24 @@ cdef python_exception_from_error(Error& err): cdef throw_exception(Error& err): raise python_exception_from_error(err) -cdef void* data_pointer_nparray(data) except? NULL: - if data is None: - return <void*>NULL - data_char = data.view(np.int8) - try: - data_char.shape=(-1) - except: - raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") - cdef char[::1] arr_memview = data_char - return <void*>&arr_memview[0] - -cdef void* data_pointer_bytes(data): - if data is None: - return <void*>NULL - cdef const unsigned char[::1] arr_memview = data - return <void*>&arr_memview[0] +cdef void* data_pointer(data) except? NULL: + if data is None: + return <void*>NULL + + # create a 1D view of unsigned bytes of the underlying data because the following typed + # memoryview only accepts 1D arrays + try: + reshaped_view = memoryview(data).cast("B") + except Exception as err: + raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from err + + cdef const unsigned char[::1] memview = reshaped_view + return <void*>&memview[0] + +def data_nbytes(data): + if data is None: + return 0 + return memoryview(data).nbytes cdef class PyProducer: cdef unique_ptr[Producer] c_producer @@ -142,19 +144,20 @@ cdef class PyProducer: return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} else: return {'client': _str(client_info)} - def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + def __send(self, id, exposed_path, data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) - if data is None: - message_header.data_size = 0 - else: - message_header.data_size = data.nbytes - err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream), - unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, + try: + message_header.data_size = data_nbytes(data) + except Exception as ex: + raise AsapoProducerError("wrong data type: " + str(type(data))) from ex + + err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream), + unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data, <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) - if data is not None: - Py_XINCREF(<PyObject*>data) + + Py_XINCREF(<PyObject*>data) if callback != None: Py_XINCREF(<PyObject*>callback) return @@ -172,18 +175,6 @@ cdef class PyProducer: message_header.dataset_size = dataset[1] return message_header - def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) - message_header.data_size = len(data) - err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream), - unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, - <void*>self,<void*>callback, <void*>data)) - if err: - throw_exception(err) - Py_XINCREF(<PyObject*>data) - if callback != None: - Py_XINCREF(<PyObject*>callback) - return def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None): """ :param stream: stream name, default "default" @@ -240,7 +231,7 @@ cdef class PyProducer: :param exposed_path: Path which will be exposed to consumers :type exposed_path: string :param data: data to send - :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode + :type data: contiguous buffer like numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode :param user_meta: user metadata, default None :type user_meta: JSON string :param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None @@ -257,12 +248,7 @@ cdef class PyProducer: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen """ - if type(data) == np.ndarray or data == None: - self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) - elif type(data) == bytes: - self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) - else: - raise(AsapoProducerError("wrong data type: " + str(type(data)))) + self.__send(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None): """ :param stream: stream name @@ -469,15 +455,10 @@ cdef class PyProducer: cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil: self.c_callback_python(py_callback,None,payload,err) - cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil: - self.c_callback_python(py_callback,nd_array,payload,err) - if nd_array is not None: - Py_XDECREF(<PyObject*>nd_array) + cdef void c_callback_data(self,py_callback,data,RequestCallbackPayload payload, Error err) with gil: + self.c_callback_python(py_callback,data,payload,err) + Py_XDECREF(<PyObject*>data) - cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil: - self.c_callback_python(py_callback,bytes_array,payload,err) - if bytes_array is not None: - Py_XDECREF(<PyObject*>bytes_array) def cleanup(self): with nogil: if self.c_producer.get() is not NULL: diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 51552b6eb0ce43d9eb1fd742872b1ab1859d0c95..6bef80f4d18633847bd24c22fc7e19ab922df4ff 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -27,8 +27,8 @@ echo test > file1 $1 $3 $data_source $beamtime_id "127.0.0.1:8400" &> out || cat out cat out -echo count successfully send, expect 17 -cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 17 +echo count successfully send, expect 19 +cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 19 echo count wrong input, expect 11 cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 11 @@ -39,7 +39,7 @@ cat out | grep "already have record with same id" | wc -l | tee /dev/stderr | gr echo count duplicates, expect 6 cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 6 echo count data in callback, expect 6 -cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 6 +cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 8 echo check found local io error cat out | grep "local i/o error" cat out | grep "Finished successfully" diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index beb45a12dd9d5544496655a28f9ca709a017ba14..14f1a18a7f7284afda97927b3f51132ec8a1959c 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -105,6 +105,18 @@ else: print("should be error sending non-cont array") sys.exit(1) + +# send data from read-only array +read_only_data = np.frombuffer(data.tobytes(), dtype=np.uint8) +producer.send(11, "processed/" + data_source + "/" + "file11", memoryview(data), + ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) + + +# send data from memoryview +producer.send(12, "processed/" + data_source + "/" + "file12", memoryview(data), + ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) + + try: producer.send(0, "processed/" + data_source + "/" + "file6", b"hello", ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) @@ -164,7 +176,7 @@ assert_eq(n, 0, "requests in queue") data = np.arange(1000000, dtype=np.float64) producer.set_requests_queue_limits(0,1) try: - producer.send(11, "processed/bla", data) + producer.send(13, "processed/bla", data) except asapo_producer.AsapoRequestsPoolIsFull as e: print(e) else: @@ -189,7 +201,7 @@ assert_eq(no_meta, None, "no meta") #stream infos info = producer.stream_info() -assert_eq(info['lastId'], 10, "stream_info last id") +assert_eq(info['lastId'], 12, "stream_info last id") assert_eq(info['name'], "default", "stream_info name") assert_eq(info['timestampCreated']/1000000000<time.time(),True , "stream_info time created") assert_eq(info['timestampCreated']/1000000000>time.time()-10,True , "stream_info time created")