Skip to content
Snippets Groups Projects
Commit e47e3fb8 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'accept_more_python_objects' into 'develop'

Allow sending objects supporting the buffer interface

Closes #7

See merge request asapo/asapo!187
parents 9549a565 6117c9f1
No related branches found
No related tags found
No related merge requests found
......@@ -89,22 +89,24 @@ cdef python_exception_from_error(Error& err):
cdef throw_exception(Error& err):
    # Map the C++ Error onto the matching Python exception type and raise it.
    raise python_exception_from_error(err)
cdef void* data_pointer_nparray(data) except? NULL:
    """Return a raw pointer to the buffer of a numpy array (NULL for None).

    Raises AsapoWrongInputError when the array cannot be flattened
    without a copy (e.g. a non-contiguous array).
    """
    if data is None:
        return <void*>NULL
    # Byte-wise reinterpretation of the array; view() never copies.
    data_char = data.view(np.int8)
    try:
        # Assigning .shape reshapes in place and fails (rather than copies)
        # for non-contiguous arrays — exactly the case we want to reject.
        data_char.shape=(-1)
    except:
        raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?")
    cdef char[::1] arr_memview = data_char
    return <void*>&arr_memview[0]
cdef void* data_pointer_bytes(data):
    """Return a raw pointer to the contents of a bytes object (NULL for None)."""
    if data is None:
        return <void*>NULL
    # const memoryview: bytes objects expose a read-only buffer.
    cdef const unsigned char[::1] arr_memview = data
    return <void*>&arr_memview[0]
cdef void* data_pointer(data) except? NULL:
    """Return a raw pointer to the buffer underlying *data*.

    Accepts any object supporting the buffer protocol (numpy array,
    bytes, bytearray, memoryview, ...). Returns NULL for None
    (metadata-only sends). Raises AsapoWrongInputError when a zero-copy
    1D byte view cannot be created (e.g. non-contiguous array).
    """
    if data is None:
        return <void*>NULL
    # create a 1D view of unsigned bytes of the underlying data because the following typed
    # memoryview only accepts 1D arrays
    try:
        reshaped_view = memoryview(data).cast("B")
    except Exception as err:
        raise AsapoWrongInputError("cannot do no-copy flatten - non-contiguous array?") from err
    # const view, so read-only buffers (bytes, read-only arrays) are accepted too
    cdef const unsigned char[::1] memview = reshaped_view
    return <void*>&memview[0]
def data_nbytes(data):
    """Return the size in bytes of *data*'s buffer, or 0 when data is None.

    Propagates TypeError (from memoryview) when *data* does not support
    the buffer protocol.
    """
    return 0 if data is None else memoryview(data).nbytes
cdef class PyProducer:
cdef unique_ptr[Producer] c_producer
......@@ -142,19 +144,20 @@ cdef class PyProducer:
return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported}
else:
return {'client': _str(client_info)}
def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
def __send(self, id, exposed_path, data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
if data is None:
message_header.data_size = 0
else:
message_header.data_size = data.nbytes
err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
try:
message_header.data_size = data_nbytes(data)
except Exception as ex:
raise AsapoProducerError("wrong data type: " + str(type(data))) from ex
err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data,
<void*>self,<void*>callback, <void*>data))
if err:
throw_exception(err)
if data is not None:
Py_XINCREF(<PyObject*>data)
Py_XINCREF(<PyObject*>data)
if callback != None:
Py_XINCREF(<PyObject*>callback)
return
......@@ -172,18 +175,6 @@ cdef class PyProducer:
message_header.dataset_size = dataset[1]
return message_header
def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
    # Queue an asynchronous send of a bytes payload via the C++ producer.
    cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
    message_header.data_size = len(data)
    err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream),
        unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
            <void*>self,<void*>callback, <void*>data))
    if err:
        throw_exception(err)
    # Keep the payload (and callback) alive until the async send completes;
    # the matching Py_XDECREF happens in c_callback_bytesaddr.
    Py_XINCREF(<PyObject*>data)
    if callback != None:
        Py_XINCREF(<PyObject*>callback)
    return
def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None):
"""
:param stream: stream name, default "default"
......@@ -240,7 +231,7 @@ cdef class PyProducer:
:param exposed_path: Path which will be exposed to consumers
:type exposed_path: string
:param data: data to send
:type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
:type data: contiguous buffer like numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
:param user_meta: user metadata, default None
:type user_meta: JSON string
:param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None
......@@ -257,12 +248,7 @@ cdef class PyProducer:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
elif type(data) == bytes:
self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
else:
raise(AsapoProducerError("wrong data type: " + str(type(data))))
self.__send(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
"""
:param stream: stream name
......@@ -469,15 +455,10 @@ cdef class PyProducer:
cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil:
    # Completion callback for sends without an attached Python data object.
    self.c_callback_python(py_callback,None,payload,err)
cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil:
    # Invoke the user's callback for a numpy-array send, then release the
    # reference taken when the send was queued (__send_np_array only
    # INCREFs non-None data, hence the matching None check here).
    self.c_callback_python(py_callback,nd_array,payload,err)
    if nd_array is not None:
        Py_XDECREF(<PyObject*>nd_array)
cdef void c_callback_data(self,py_callback,data,RequestCallbackPayload payload, Error err) with gil:
    # Invoke the user's callback, then drop the reference that __send took
    # unconditionally on the data object; the INCREF/DECREF pair is
    # balanced even when data is None.
    self.c_callback_python(py_callback,data,payload,err)
    Py_XDECREF(<PyObject*>data)
cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil:
    # Invoke the user's callback for a bytes send, then release the
    # reference taken in __send_bytes when the send was queued.
    self.c_callback_python(py_callback,bytes_array,payload,err)
    if bytes_array is not None:
        Py_XDECREF(<PyObject*>bytes_array)
def cleanup(self):
with nogil:
if self.c_producer.get() is not NULL:
......
......@@ -27,8 +27,8 @@ echo test > file1
$1 $3 $data_source $beamtime_id "127.0.0.1:8400" &> out || cat out
cat out
echo count successfully send, expect 17
cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 17
echo count successfully send, expect 19
cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 19
echo count wrong input, expect 11
cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 11
......@@ -39,7 +39,7 @@ cat out | grep "already have record with same id" | wc -l | tee /dev/stderr | gr
echo count duplicates, expect 6
cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 6
echo count data in callback, expect 6
cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 6
cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 8
echo check found local io error
cat out | grep "local i/o error"
cat out | grep "Finished successfully"
......@@ -105,6 +105,18 @@ else:
print("should be error sending non-cont array")
sys.exit(1)
# send data from read-only array: np.frombuffer returns a read-only view,
# so this exercises the producer's const (read-only) buffer path.
# Bug fix: read_only_data was created but never used — the call below sent
# memoryview(data) instead, duplicating the memoryview case (id 12).
read_only_data = np.frombuffer(data.tobytes(), dtype=np.uint8)
producer.send(11, "processed/" + data_source + "/" + "file11", read_only_data,
              ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
# send data from memoryview (a zero-copy view over the numpy array's buffer)
producer.send(12, "processed/" + data_source + "/" + "file12", memoryview(data),
              ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
try:
producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
......@@ -164,7 +176,7 @@ assert_eq(n, 0, "requests in queue")
data = np.arange(1000000, dtype=np.float64)
producer.set_requests_queue_limits(0,1)
try:
producer.send(11, "processed/bla", data)
producer.send(13, "processed/bla", data)
except asapo_producer.AsapoRequestsPoolIsFull as e:
print(e)
else:
......@@ -189,7 +201,7 @@ assert_eq(no_meta, None, "no meta")
#stream infos
info = producer.stream_info()
assert_eq(info['lastId'], 10, "stream_info last id")
assert_eq(info['lastId'], 12, "stream_info last id")
assert_eq(info['name'], "default", "stream_info name")
assert_eq(info['timestampCreated']/1000000000<time.time(),True , "stream_info time created")
assert_eq(info['timestampCreated']/1000000000>time.time()-10,True , "stream_info time created")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment