Skip to content
Snippets Groups Projects
Commit 5d38d752 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update tests, changelog

parent ebbc247b
Branches
Tags
No related merge requests found
## 21.06.0 (in progress)
## 21.06.0
FEATURES
* Consumer API: C client
* Producer API: An option to automatically generate message id (use sparingly, reduced performance possible)
IMPROVEMENTS
* Consumer/Producer API - allow any characters in source/stream/group names
......
......@@ -75,6 +75,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
string user_metadata
uint64_t dataset_substream
uint64_t dataset_size
bool auto_id
cdef extern from "asapo/asapo_producer.h" namespace "asapo":
struct GenericRequestHeader:
......
#distutils: language=c++
cimport asapo_producer
import numpy as np
cimport numpy as np
......@@ -139,8 +140,8 @@ cdef class PyProducer:
return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported}
else:
return {'client': _str(client_info)}
def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
if data is None:
message_header.data_size = 0
else:
......@@ -158,11 +159,12 @@ cdef class PyProducer:
if callback != None:
Py_XINCREF(<PyObject*>callback)
return
cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode):
cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode,auto_id):
cdef MessageHeader message_header
message_header.message_id = id
message_header.file_name = _bytes(exposed_path)
message_header.user_metadata = _bytes(user_meta) if user_meta!=None else ""
message_header.auto_id = auto_id
if dataset == None:
message_header.dataset_substream = 0
message_header.dataset_size = 0
......@@ -171,8 +173,8 @@ cdef class PyProducer:
message_header.dataset_size = dataset[1]
return message_header
def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
message_header.data_size = len(data)
err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
......@@ -231,7 +233,8 @@ cdef class PyProducer:
throw_exception(err)
if callback != None:
Py_XINCREF(<PyObject*>callback)
def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None):
def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE,
stream = "default", callback=None, auto_id = False):
"""
:param id: unique data id
:type id: int
......@@ -249,14 +252,16 @@ cdef class PyProducer:
:type stream: string
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
:param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True
:type auto_id: Boolean
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
elif type(data) == bytes:
self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
else:
raise(AsapoProducerError("wrong data type: " + str(type(data))))
def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
......@@ -371,7 +376,8 @@ cdef class PyProducer:
if err:
throw_exception(err)
return json.loads(_str(info.Json()))
def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None):
def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None,
ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None, auto_id = False):
"""
:param id: unique data id
:type id: int
......@@ -389,13 +395,15 @@ cdef class PyProducer:
:type stream: string
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
:param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True
:type auto_id: Boolean
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoLocalIOError: problems reading file to send
AsapoProducerError: actually should not happen
"""
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
message_header.data_size = 0
err = self.c_producer.get().SendFile(message_header, _bytes(local_path), ingest_mode, _bytes(stream),
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
......
......@@ -117,8 +117,8 @@ else:
# wait before sending to another stream so we sure that this stream appears later
producer.wait_requests_finished(50000)
# send to another stream
producer.send(1, "processed/" + data_source + "/" + "file9", None,
# send to another stream with auto id
producer.send(0, "processed/" + data_source + "/" + "file9", None,auto_id = True,
ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback)
# wait normal requests finished before sending duplicates
......@@ -153,7 +153,7 @@ n = producer.get_requests_queue_size()
assert_eq(n, 0, "requests in queue")
# send another data to stream stream
producer.send(2, "processed/" + data_source + "/" + "file10", None,
producer.send(0, "processed/" + data_source + "/" + "file10", None, auto_id = True,
ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback)
producer.wait_requests_finished(50000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment