diff --git a/CHANGELOG.md b/CHANGELOG.md index 12ad1db7311c7d265de8aaedc4365224a4ddc0fa..3c188b0e09c768093bdd52be7c808130be703289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ -## 21.06.0 (in progress) +## 21.06.0 + +FEATURES +* Consumer API: C client +* Producer API: An option to automatically generate message id (use sparingly, reduced performance possible) IMPROVEMENTS * Consumer/Producer API - allow any characters in source/stream/group names diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index e50f762bc05d4b47cb920d36cc6e58e4a7f2d764..fc762012bc9d27f6974625d37184df6e5528e6c6 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -75,6 +75,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": string user_metadata uint64_t dataset_substream uint64_t dataset_size + bool auto_id + cdef extern from "asapo/asapo_producer.h" namespace "asapo": struct GenericRequestHeader: diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index f681fd636d495403c55920bb580428332bfb8b27..d0101a01b7e853d53350f2413296230c7fe6cff7 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -1,5 +1,6 @@ #distutils: language=c++ + cimport asapo_producer import numpy as np cimport numpy as np @@ -139,8 +140,8 @@ cdef class PyProducer: return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} else: return {'client': _str(client_info)} - def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) if data is None: message_header.data_size = 0 else: @@ -158,11 +159,12 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) return - cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode): + cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode,auto_id): cdef MessageHeader message_header message_header.message_id = id message_header.file_name = _bytes(exposed_path) message_header.user_metadata = _bytes(user_meta) if user_meta!=None else "" + message_header.auto_id = auto_id if dataset == None: message_header.dataset_substream = 0 message_header.dataset_size = 0 @@ -171,8 +173,8 @@ cdef class PyProducer: message_header.dataset_size = dataset[1] return message_header - def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False): + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) message_header.data_size = len(data) err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream), unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, @@ -231,7 +233,8 @@ cdef class PyProducer: throw_exception(err) if callback != None: Py_XINCREF(<PyObject*>callback) - def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): + def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, + stream = "default", callback=None, auto_id = False): """ :param id: unique data id :type id: int @@ -249,14 +252,16 @@ cdef class PyProducer: :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True + :type auto_id: Boolean :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen """ if type(data) == np.ndarray or data == None: - self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) + self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) elif type(data) == bytes: - self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback) + self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id) else: raise(AsapoProducerError("wrong data type: " + str(type(data)))) def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None): @@ -371,7 +376,8 @@ cdef class PyProducer: if err: throw_exception(err) return json.loads(_str(info.Json())) - def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): + def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, + ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None, auto_id = False): """ :param id: unique data id :type id: int @@ -389,13 +395,15 @@ cdef class PyProducer: :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :param auto_id: a flag to assign ids automatically, id must be 0 when auto_id = True + :type auto_id: Boolean :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoLocalIOError: problems reading file to send AsapoProducerError: actually should not happen """ - cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) + cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id) message_header.data_size = 0 err = self.c_producer.get().SendFile(message_header, _bytes(local_path), ingest_mode, _bytes(stream), unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 6f4bad1586c4f404f4c98afa3ba1157fa5efe18e..beb45a12dd9d5544496655a28f9ca709a017ba14 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -117,8 +117,8 @@ else: # wait before sending to another stream so we sure that this stream appears later producer.wait_requests_finished(50000) -# send to another stream -producer.send(1, "processed/" + data_source + "/" + "file9", None, +# send to another stream with auto id +producer.send(0, "processed/" + data_source + "/" + "file9", None,auto_id = True, ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback) # wait normal requests finished before sending duplicates @@ -153,7 +153,7 @@ n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") # send another data to stream stream -producer.send(2, "processed/" + data_source + "/" + "file10", None, +producer.send(0, "processed/" + data_source + "/" + "file10", None, auto_id = True, ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback) producer.wait_requests_finished(50000)