From 4322d2df1d761da0f0ffc4c620885790c27b35ab Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 11 Feb 2020 13:23:32 +0100
Subject: [PATCH] substreams in Python producer API, test

---
 CHANGELOG.md                                  |  1 +
 consumer/api/python/asapo_consumer.pxd        |  2 +
 consumer/api/python/asapo_consumer.pyx.in     | 37 ++++++++----
 producer/api/python/asapo_producer.pxd        |  5 +-
 producer/api/python/asapo_producer.pyx.in     | 45 ++++++++++----
 tests/automatic/full_chain/CMakeLists.txt     |  3 +-
 .../CMakeLists.txt                            | 14 +++++
 .../check_linux.sh                            | 37 ++++++++++++
 .../check_windows.bat                         | 36 +++++++++++
 .../send_recv_substreams.py                   | 60 +++++++++++++++++++
 10 files changed, 213 insertions(+), 27 deletions(-)
 create mode 100644 tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt
 create mode 100644 tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh
 create mode 100644 tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat
 create mode 100644 tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8c67a30f..513b2886d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ IMPROVEMENTS
 * receiver use ASAP3 directory structure to save files to
 * API documentation is available at [asapo-docs.desy.de](asapo-docs.desy.de)
 * switch to using cmake 3.7+
+* error messages in Python as Python strings, not byte objects
 
 BUG FIXES
 * consumer operation timout - take duration of the operation into account
\ No newline at end of file
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 91544e739..1475ad22f 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -72,6 +72,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
 cdef extern from "asapo_consumer.h" namespace "asapo":
   ErrorTemplateInterface kNoData "asapo::ConsumerErrorTemplates::kNoData"
   ErrorTemplateInterface kEndOfStream "asapo::ConsumerErrorTemplates::kEndOfStream"
+  ErrorTemplateInterface kStreamFinished "asapo::ConsumerErrorTemplates::kStreamFinished"
   ErrorTemplateInterface kUnavailableService "asapo::ConsumerErrorTemplates::kUnavailableService"
   ErrorTemplateInterface kInterruptedTransaction "asapo::ConsumerErrorTemplates::kInterruptedTransaction"
   ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError"
@@ -79,4 +80,5 @@ cdef extern from "asapo_consumer.h" namespace "asapo":
   cdef cppclass ConsumerErrorData:
     uint64_t id
     uint64_t id_max
+    string next_substream
 
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index 791c75456..ad31fd103 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -37,12 +37,22 @@ class AsapoWrongInputError(AsapoConsumerError):
 class AsapoInterruptedTransactionError(AsapoConsumerError):
   pass
 
+# NOTE(review): AsapoStreamFinishedError is defined once, below, with its
+# id_max/next_substream attributes (a duplicate stub definition was removed).
+
 class AsapoUnavailableServiceError(AsapoConsumerError):
   pass
 
 class AsapoLocalIOError(AsapoConsumerError):
   pass
 
+class AsapoStreamFinishedError(AsapoConsumerError):
+  def __init__(self,message,id_max=None,next_substream=None):
+    AsapoConsumerError.__init__(self,message)
+    self.id_max = id_max
+    self.next_substream = _str(next_substream) if next_substream is not None else None
+
+
 class AsapoEndOfStreamError(AsapoConsumerError):
   def __init__(self,message,id_max=None):
     AsapoConsumerError.__init__(self,message)
@@ -63,6 +73,12 @@ cdef throw_exception(Error& err):
                 raise AsapoEndOfStreamError(error_string,data.id_max)
             else:
                 raise AsapoEndOfStreamError(error_string)
+    elif err == kStreamFinished:
+            data=<ConsumerErrorData*>err.get().GetCustomData()
+            if data != NULL:
+                raise AsapoStreamFinishedError(error_string,data.id_max,data.next_substream)
+            else:
+                raise AsapoStreamFinishedError(error_string)
     elif err == kNoData:
             data=<ConsumerErrorData*>err.get().GetCustomData()
             if data != NULL:
@@ -81,7 +97,6 @@ cdef throw_exception(Error& err):
         raise AsapoConsumerError(error_string)
 
 cdef class PyDataBroker:
-    _default_stream = "default"
     cdef DataBroker* c_broker
     def _op(self, op, group_id, substream, meta_only, uint64_t id):
         cdef FileInfo info
@@ -111,11 +126,11 @@ cdef class PyDataBroker:
         arr =  np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
         PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
         return arr,meta
-    def get_next(self, group_id, substream = _default_stream, meta_only = True):
+    def get_next(self, group_id, substream = "default", meta_only = True):
         return self._op("next",group_id,substream,meta_only,0)
-    def get_last(self, group_id, substream = _default_stream, meta_only = True):
+    def get_last(self, group_id, substream = "default", meta_only = True):
         return self._op("last",group_id,substream,meta_only,0)
-    def get_by_id(self,id,group_id, substream = _default_stream,meta_only = True):
+    def get_by_id(self,id,group_id, substream = "default",meta_only = True):
         return self._op("id",group_id,substream,meta_only,id)
     def retrieve_data(self,meta):
         json_str = json.dumps(meta)
@@ -134,7 +149,7 @@ cdef class PyDataBroker:
         arr =  np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
         PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
         return arr
-    def get_current_size(self, substream = _default_stream):
+    def get_current_size(self, substream = "default"):
         cdef Error err
         cdef uint64_t size
         cdef string b_substream = _bytes(substream)
@@ -146,7 +161,7 @@ cdef class PyDataBroker:
         return size
     def set_timeout(self,timeout):
         self.c_broker.SetTimeout(timeout)
-    def set_lastread_marker(self,value,group_id, substream = _default_stream):
+    def set_lastread_marker(self,value,group_id, substream = "default"):
         cdef string b_group_id = _bytes(group_id)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -156,7 +171,7 @@ cdef class PyDataBroker:
         if err:
             throw_exception(err)
         return
-    def reset_lastread_marker(self,group_id, substream = _default_stream):
+    def reset_lastread_marker(self,group_id, substream = "default"):
         cdef string b_group_id = _bytes(group_id)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -185,7 +200,7 @@ cdef class PyDataBroker:
             list.append(_str(substream))
         return list
 
-    def query_images(self,query, substream = _default_stream):
+    def query_images(self,query, substream = "default"):
         cdef string b_query = _bytes(query)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -219,11 +234,11 @@ cdef class PyDataBroker:
         for fi in dataset.content:
             json_list.append(json.loads(_str(fi.Json())))
         return dataset.id, json_list
-    def get_next_dataset(self, group_id, substream = _default_stream):
+    def get_next_dataset(self, group_id, substream = "default"):
         return self._op_dataset("next",group_id,substream,0)
-    def get_last_dataset(self, group_id, substream = _default_stream):
+    def get_last_dataset(self, group_id, substream = "default"):
         return self._op_dataset("last",group_id,substream,0)
-    def get_dataset_by_id(self, id, group_id, substream = _default_stream):
+    def get_dataset_by_id(self, id, group_id, substream = "default"):
         return self._op_dataset("id",group_id,substream,id)
     def get_beamtime_meta(self):
         cdef Error err
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index ce6912c33..4ce8fda3b 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -89,12 +89,13 @@ cdef extern from "asapo_producer.h" namespace "asapo" nogil:
     cppclass Producer:
         @staticmethod
         unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
-        Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback)
-        Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
+        Error SendFile(const EventHeader& event_header, string substream, string full_path, uint64_t ingest_mode,RequestCallback callback)
+        Error SendData__(const EventHeader& event_header, string substream, void* data, uint64_t ingest_mode,RequestCallback callback)
         void StopThreads__()
         void SetLogLevel(LogLevel level)
         uint64_t  GetRequestsQueueSize()
         Error WaitRequestsFinished(uint64_t timeout_ms)
+        Error SendSubstreamFinishedFlag(string substream, uint64_t last_id, string next_substream, RequestCallback callback)
 
 cdef extern from "asapo_producer.h" namespace "asapo":
     uint64_t kDefaultIngestMode
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index e74837c9c..3af967b8d 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -48,14 +48,15 @@ class AsapoTimeOutError(AsapoProducerError):
   pass
 
 cdef python_exception_from_error(Error& err):
+    error_string =  _str(err.get().Explain())
     if err == kTimeout:
-                return AsapoTimeOutError(err.get().Explain())
+                return AsapoTimeOutError(error_string)
     elif err == kWrongInput:
-            return AsapoWrongInputError(err.get().Explain())
+            return AsapoWrongInputError(error_string)
     elif err == kLocalIOError:
-            return AsapoLocalIOError(err.get().Explain())
+            return AsapoLocalIOError(error_string)
     else:
-        return AsapoProducerError(err.get().Explain())
+        return AsapoProducerError(error_string)
 
 cdef throw_exception(Error& err):
     raise python_exception_from_error(err)
@@ -99,14 +100,14 @@ cdef class PyProducer:
             return
          self.c_producer.get().SetLogLevel(log_level)
 
-    def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,substream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_id = id
         if data is None:
             event_header.file_size = 0
         else:
             event_header.file_size = data.nbytes
-        err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode,
+        err = self.c_producer.get().SendData__(event_header, _bytes(substream), data_pointer_nparray(data),ingest_mode,
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
              <void*>self,<void*>callback, <void*>data))
         if err:
@@ -133,10 +134,10 @@ cdef class PyProducer:
             event_header.subset_size = subset[1]
         return event_header
 
-    def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None, substream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_size = len(data)
-        err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode,
+        err = self.c_producer.get().SendData__(event_header,_bytes(substream),  data_pointer_bytes(data), ingest_mode,
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
              <void*>self,<void*>callback, <void*>data))
         if err:
@@ -146,7 +147,7 @@ cdef class PyProducer:
             Py_XINCREF(<PyObject*>callback)
         return
 
-    def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def send_data(self, id, exposed_path, data, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
          :param id: unique data id
          :type id: int
@@ -167,13 +168,31 @@ cdef class PyProducer:
             AsapoProducerError: actually should not happen
         """
         if type(data) == np.ndarray or data == None:
-            self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
+            self.__send_np_array(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
         elif type(data) == bytes:
-            self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
+            self.__send_bytes(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
         else:
             raise(AsapoProducerError("wrong data type: " + str(type(data))))
+    def send_substream_finished_flag(self, substream, last_id, next_substream = None, callback = None):
+        """
+         :param substream: substream name
+         :type substream: string
+         :param last_id: id of the last record
+         :param next_substream: name of the next substream or None
+         :type next_substream: string
+         :param callback: callback function, default None
+         :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
+         :raises:
+            AsapoWrongInputError: wrong input (authorization, meta, ...)
+            AsapoProducerError: actually should not happen
+        """
+        err = self.c_producer.get().SendSubstreamFinishedFlag(_bytes(substream), last_id,_bytes(next_substream) if next_substream != None else "",
+        unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
+        if err:
+            throw_exception(err)
+
 
-    def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def send_file(self, id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
          :param id: unique data id
          :type id: int
@@ -197,7 +216,7 @@ cdef class PyProducer:
 
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_size = 0
-        err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode,
+        err = self.c_producer.get().SendFile(event_header, _bytes(substream), _bytes(local_path), ingest_mode,
             unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
         if err:
             throw_exception(err)
diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt
index c177a9f4a..ecf39cced 100644
--- a/tests/automatic/full_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/CMakeLists.txt
@@ -11,4 +11,5 @@ add_subdirectory(simple_chain_filegen_multisource)
 add_subdirectory(simple_chain_filegen_readdata_cache)
 add_subdirectory(simple_chain_filegen_readdata_file)
 add_subdirectory(simple_chain_dataset)
-add_subdirectory(send_recv_substreams)
\ No newline at end of file
+add_subdirectory(send_recv_substreams)
+add_subdirectory(send_recv_substreams_python)
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt b/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt
new file mode 100644
index 000000000..4312c3ca1
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt
@@ -0,0 +1,14 @@
+set(TARGET_NAME send_recv_substreams_python)
+prepare_asapo()
+
+if (UNIX)
+    get_target_property(PYTHON_LIBS_CONSUMER python-lib BINARY_DIR)
+    get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR)
+else()
+    get_target_property(PYTHON_LIBS_CONSUMER asapo_consumer BINARY_DIR)
+    get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR)
+endif()
+
+file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/send_recv_substreams.py TEST_SCRIPT )
+
+add_script_test("${TARGET_NAME}" "${Python_EXECUTABLE} ${PYTHON_LIBS_CONSUMER} ${PYTHON_LIBS_PRODUCER} ${TEST_SCRIPT} " nomem)
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh b/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh
new file mode 100644
index 000000000..fed341d8c
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+
+source_path=.
+beamtime_id=asapo_test
+stream_in=detector
+
+indatabase_name=${beamtime_id}_${stream_in}
+token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+beamline=test
+
+set -e
+
+trap Cleanup EXIT
+
+Cleanup() {
+    set +e
+    nomad stop nginx
+    nomad run nginx_kill.nmd  && nomad stop -yes -purge nginx_kill
+    nomad stop discovery
+    nomad stop broker
+    nomad stop receiver
+    nomad stop authorizer
+	echo "db.dropDatabase()" | mongo ${indatabase_name}
+}
+
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+nomad run receiver.nmd
+nomad run authorizer.nmd
+
+
+export PYTHONPATH=$2:$3:${PYTHONPATH}
+
+
+$1 $4 127.0.0.1:8400 $beamtime_id $token
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat b/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat
new file mode 100644
index 000000000..aa79cf59d
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat
@@ -0,0 +1,36 @@
+SET source_path=.
+SET beamtime_id=asapo_test
+SET stream_in=detector
+
+SET indatabase_name=%beamtime_id%_%stream_in%
+
+SET token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+SET beamline=test
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
+
+SET PYTHONPATH=%2;%3
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+
+"%1" "%4" 127.0.0.1:8400 %beamtime_id% %token% || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
+c:\opt\consul\nomad run nginx_kill.nmd  && c:\opt\consul\nomad stop -yes -purge nginx_kill
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop authorizer
+
+echo db.dropDatabase() | %mongo_exe% %indatabase_name%
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py
new file mode 100644
index 000000000..a8081ed28
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py
@@ -0,0 +1,60 @@
+from __future__ import print_function
+
+import asapo_consumer
+import asapo_producer
+import sys
+import os
+
+
+import threading
+lock = threading.Lock()
+
+timeout = 10 * 1000
+
+def assert_eq(val,expected,name):
+    if val != expected:
+        print ("error at "+name)
+        print ('val: ', val,' expected: ',expected)
+        sys.exit(1)
+
+def callback(header,err):
+    lock.acquire() # to print
+    if err is not None:
+        print("could not send: ",header,err)
+    else:
+        print ("successfully sent: ",header)
+    lock.release()
+
+source, beamtime, token = sys.argv[1:]
+
+broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout)
+producer  = asapo_producer.create_producer(source,beamtime, "", token, 1)
+producer.set_log_level("debug")
+
+group_id  = broker.generate_group_id()
+
+n_send = 10
+
+for i in range(n_send):
+    producer.send_data(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,substream = "substream", callback = callback)
+
+producer.send_substream_finished_flag("substream", 10, next_substream = "next_substream", callback = callback)
+producer.wait_requests_finished(timeout)
+
+n_recv = 0
+substream_finished=False
+while True:
+    try:
+        data, meta = broker.get_next(group_id,substream = "substream", meta_only=True)
+        print ("received: ",meta)
+        n_recv = n_recv + 1
+    except  asapo_consumer.AsapoStreamFinishedError as finished_substream:
+        substream_finished = True
+        assert_eq(finished_substream.id_max, 11, "last id")
+        assert_eq(finished_substream.next_substream, "next_substream", "next substream")
+        break
+
+assert_eq(n_recv, n_send, "send=recv")
+assert_eq(substream_finished, True, "substream finished")
+
+
-- 
GitLab