From f2b6fb0107951462aca8272954a371d495cc5dbf Mon Sep 17 00:00:00 2001
From: Tim Schoof <tim.schoof@desy.de>
Date: Wed, 23 Nov 2022 18:27:13 +0100
Subject: [PATCH] Allow sending and object supporting the buffer interface

This includes read-only buffers.

The code split between bytes and numpy array is removed and all objects
are now handled equally.

Furthermore, it seems to be more correct to increase the ref count for
None objects as well, while a pointer to that object is used.
---
 producer/api/python/asapo_producer.pyx.in     | 85 +++++++------------
 .../producer/python_api/producer_api.py       | 12 +++
 2 files changed, 45 insertions(+), 52 deletions(-)

diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index ef9942af1..ba9a5a837 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -89,22 +89,24 @@ cdef python_exception_from_error(Error& err):
 cdef throw_exception(Error& err):
     raise python_exception_from_error(err)
 
-cdef void* data_pointer_nparray(data) except? NULL:
-  if data is None:
-    return <void*>NULL
-  data_char = data.view(np.int8)
-  try:
-    data_char.shape=(-1)
-  except:
-    raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?")
-  cdef char[::1] arr_memview = data_char
-  return <void*>&arr_memview[0]
-
-cdef void* data_pointer_bytes(data):
-  if data is None:
-    return <void*>NULL
-  cdef const unsigned char[::1] arr_memview = data
-  return <void*>&arr_memview[0]
+cdef void* data_pointer(data) except? NULL:
+    if data is None:
+        return <void*>NULL
+
+    # create a 1D view of unsigned bytes of the underlying data because the following typed
+    # memoryview only accepts 1D arrays
+    try:
+        reshaped_view = memoryview(data).cast("B")
+    except Exception as err:
+        raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from err
+
+    cdef const unsigned char[::1] memview = reshaped_view
+    return <void*>&memview[0]
+
+cdef int data_nbytes(data) except -1:
+    if data is None:
+        return 0
+    return memoryview(data).nbytes
 
 cdef class PyProducer:
     cdef unique_ptr[Producer] c_producer
@@ -142,19 +144,20 @@ cdef class PyProducer:
             return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported}
         else:
             return {'client': _str(client_info)}
-    def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
+    def __send(self, id, exposed_path, data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
         cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
-        if data is None:
-            message_header.data_size = 0
-        else:
-            message_header.data_size = data.nbytes
-        err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream),
-            unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
+        try:
+            message_header.data_size = data_nbytes(data)
+        except Exception as err:
+            raise AsapoProducerError("wrong data type: " + str(type(data))) from err
+
+        err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream),
+            unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data,
              <void*>self,<void*>callback, <void*>data))
         if err:
             throw_exception(err)
-        if data is not None:
-            Py_XINCREF(<PyObject*>data)
+
+        Py_XINCREF(<PyObject*>data)
         if callback != None:
             Py_XINCREF(<PyObject*>callback)
         return
@@ -172,18 +175,6 @@ cdef class PyProducer:
             message_header.dataset_size = dataset[1]
         return message_header
 
-    def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
-        message_header.data_size = len(data)
-        err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream),
-            unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
-             <void*>self,<void*>callback, <void*>data))
-        if err:
-            throw_exception(err)
-        Py_XINCREF(<PyObject*>data)
-        if callback != None:
-            Py_XINCREF(<PyObject*>callback)
-        return
     def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None):
         """
          :param stream: stream name, default "default"
@@ -240,7 +231,7 @@ cdef class PyProducer:
          :param exposed_path: Path which will be exposed to consumers
          :type exposed_path: string
          :param data: data to send
-         :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
+         :type data: contiguous buffer like numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
          :param user_meta: user metadata, default None
          :type user_meta: JSON string
          :param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None
@@ -257,12 +248,7 @@ cdef class PyProducer:
             AsapoWrongInputError: wrong input (authorization, meta, ...)
             AsapoProducerError: actually should not happen
         """
-        if type(data) == np.ndarray or data == None:
-            self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
-        elif type(data) == bytes:
-            self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
-        else:
-            raise(AsapoProducerError("wrong data type: " + str(type(data))))
+        self.__send(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
     def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
         """
          :param stream: stream name
@@ -469,15 +455,10 @@ cdef class PyProducer:
     cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil:
         self.c_callback_python(py_callback,None,payload,err)
 
-    cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil:
-        self.c_callback_python(py_callback,nd_array,payload,err)
-        if nd_array is not None:
-            Py_XDECREF(<PyObject*>nd_array)
+    cdef void c_callback_data(self,py_callback,data,RequestCallbackPayload payload, Error err) with gil:
+        self.c_callback_python(py_callback,data,payload,err)
+        Py_XDECREF(<PyObject*>data)
 
-    cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil:
-        self.c_callback_python(py_callback,bytes_array,payload,err)
-        if bytes_array is not None:
-            Py_XDECREF(<PyObject*>bytes_array)
     def cleanup(self):
         with  nogil:
             if self.c_producer.get() is not NULL:
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index beb45a12d..989014d9c 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -105,6 +105,18 @@ else:
     print("should be error sending non-cont array")
     sys.exit(1)
 
+
+# send data from read-only array
+read_only_data = np.frombuffer(data.tobytes(), dtype=np.uint8)
+producer.send(11, "processed/" + data_source + "/" + "file5", memoryview(data),
+                   ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
+
+
+# send data from memoryview
+producer.send(12, "processed/" + data_source + "/" + "file5", memoryview(data),
+                   ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
+
+
 try:
     producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
-- 
GitLab