From 5d38d75287ec3123531e17c633b6af7badf96c79 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 29 Jun 2021 15:45:21 +0200
Subject: [PATCH] update tests, changelog

---
 CHANGELOG.md                                  |  6 +++-
 producer/api/python/asapo_producer.pxd        |  2 ++
 producer/api/python/asapo_producer.pyx.in     | 28 ++++++++++++-------
 .../producer/python_api/producer_api.py       |  6 ++--
 4 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 12ad1db73..3c188b0e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
-## 21.06.0 (in progress)
+## 21.06.0
+
+FEATURES
+* Consumer API: C client
+* Producer API: An option to automatically generate message ids (use sparingly; may reduce performance)
 
 IMPROVEMENTS
 * Consumer/Producer API - allow any characters in source/stream/group names
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index e50f762bc..fc762012b 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -75,6 +75,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
     string user_metadata
     uint64_t dataset_substream
     uint64_t dataset_size
+    bool auto_id
+
 
 cdef extern from "asapo/asapo_producer.h" namespace "asapo":
   struct  GenericRequestHeader:
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index f681fd636..d0101a01b 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -1,5 +1,6 @@
 #distutils: language=c++
 
+
 cimport asapo_producer
 import numpy as np
 cimport numpy as np
@@ -139,8 +140,8 @@ cdef class PyProducer:
             return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported}
         else:
             return {'client': _str(client_info)}
-    def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
+    def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
         if data is None:
             message_header.data_size = 0
         else:
@@ -158,11 +159,12 @@ cdef class PyProducer:
         if callback != None:
             Py_XINCREF(<PyObject*>callback)
         return
-    cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode):
+    cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode,auto_id):
         cdef MessageHeader message_header
         message_header.message_id = id
         message_header.file_name = _bytes(exposed_path)
         message_header.user_metadata = _bytes(user_meta) if user_meta!=None else ""
+        message_header.auto_id = auto_id
         if dataset == None:
             message_header.dataset_substream = 0
             message_header.dataset_size = 0
@@ -171,8 +173,8 @@ cdef class PyProducer:
             message_header.dataset_size = dataset[1]
         return message_header
 
-    def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
+    def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None, auto_id = False):
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
         message_header.data_size = len(data)
         err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream),
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
@@ -231,7 +233,8 @@ cdef class PyProducer:
             throw_exception(err)
         if callback != None:
             Py_XINCREF(<PyObject*>callback)
-    def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None):
+    def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE,
+     stream = "default", callback=None, auto_id = False):
         """
          :param id: unique data id
          :type id: int
@@ -249,14 +252,16 @@ cdef class PyProducer:
          :type stream: string
          :param callback: callback function, default None
          :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
+         :param auto_id: a flag to assign message ids automatically; id must be 0 when auto_id is True
+         :type auto_id: bool
          :raises:
             AsapoWrongInputError: wrong input (authorization, meta, ...)
             AsapoProducerError: actually should not happen
         """
         if type(data) == np.ndarray or data == None:
-            self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
+            self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
         elif type(data) == bytes:
-            self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
+            self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback,auto_id)
         else:
             raise(AsapoProducerError("wrong data type: " + str(type(data))))
     def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
@@ -371,7 +376,8 @@ cdef class PyProducer:
         if err:
             throw_exception(err)
         return json.loads(_str(info.Json()))
-    def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None):
+    def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None,
+        ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None, auto_id = False):
         """
          :param id: unique data id
          :type id: int
@@ -389,13 +395,15 @@ cdef class PyProducer:
          :type stream: string
          :param callback: callback function, default None
          :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
+         :param auto_id: a flag to assign message ids automatically; id must be 0 when auto_id is True
+         :type auto_id: bool
          :raises:
             AsapoWrongInputError: wrong input (authorization, meta, ...)
             AsapoLocalIOError: problems reading file to send
             AsapoProducerError: actually should not happen
         """
 
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode,auto_id)
         message_header.data_size = 0
         err = self.c_producer.get().SendFile(message_header, _bytes(local_path), ingest_mode, _bytes(stream),
             unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 6f4bad158..beb45a12d 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -117,8 +117,8 @@ else:
 # wait before sending to another stream so we sure that this stream appears later
 producer.wait_requests_finished(50000)
 
-# send to another stream
-producer.send(1, "processed/" + data_source + "/" + "file9", None,
+# send to another stream with auto id
+producer.send(0, "processed/" + data_source + "/" + "file9", None,auto_id = True,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback)
 
 # wait normal requests finished before sending duplicates
@@ -153,7 +153,7 @@ n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
 
 # send another data to stream stream
-producer.send(2, "processed/" + data_source + "/" + "file10", None,
+producer.send(0, "processed/" + data_source + "/" + "file10", None, auto_id = True,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream/test $", callback=callback)
 
 producer.wait_requests_finished(50000)
-- 
GitLab