From b332bfc3bae19a19cc9d8d3b7ac7d00a020b4ff0 Mon Sep 17 00:00:00 2001
From: Carsten Patzke <carsten.patzke@desy.de>
Date: Fri, 6 Aug 2021 13:13:38 +0200
Subject: [PATCH] Added python consumer code

---
 consumer/api/python/asapo_consumer.pxd        |  2 ++
 consumer/api/python/asapo_consumer.pyx.in     | 22 ++++++++++++++++---
 .../send_recv_streams/check_linux.sh          |  3 ++-
 .../send_recv_streams_python/check_linux.sh   |  2 ++
 .../send_recv_streams.py                      |  4 ++--
 .../check_linux.sh                            |  2 ++
 6 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 1347d8862..ea5e84009 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -42,6 +42,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
     uint64_t expected_size
     MessageMetas content
   struct  SourceCredentials:
+    string instance_id
+    string pipeline_step
     string beamtime_id
     string data_source
     string user_token
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index f4232d7c7..1a983b0d0 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -395,11 +395,13 @@ cdef class __PyConsumerFactory:
     def __cinit__(self):
         with nogil:
             self.c_factory = ConsumerFactory()
-    def create_consumer(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
+    def create_consumer(self,server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout):
         cdef string b_server_name = _bytes(server_name)
         cdef string b_source_path = _bytes(source_path)
         cdef bool b_has_filesystem = has_filesystem
         cdef SourceCredentials source
+        source.instance_id = _bytes(instance_id)
+        source.pipeline_step = _bytes(pipeline_step)
         source.beamtime_id = _bytes(beamtime_id)
         source.user_token = _bytes(token)
         source.data_source = _bytes(data_source)
@@ -412,7 +414,7 @@ cdef class __PyConsumerFactory:
         consumer.c_consumer.get().SetTimeout(timeout)
         return consumer
 
-def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
+def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms):
     """
       :param server_name: Server endpoint (hostname:port)
       :type server_name: string
@@ -420,11 +422,25 @@ def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_sour
       :type source_path: string
       :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data
       :type has_filesystem: bool
+      :param instance_id: instance id, can be "auto", will create a combination out of hostname and pid
+      :type instance_id: string
+      :param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then
+      :type pipeline_step: string
+      :param beamline: beamline name, can be "auto" if beamtime_id is given
+      :type beamline: string
+      :param data_source: name of the data source that produces data
+      :type data_source: string
+      :param token: authorization token
+      :type token: string
+      :param nthreads: ingest mode flag
+      :type nthreads: int
+      :param timeout_ms: send requests timeout in milliseconds
+      :type timeout_ms: int
       :return: consumer object and error. (None,err) if case of error, (consumer, None) if success
       :rtype: Tuple with consumer object and error.
 	"""
     factory = __PyConsumerFactory()
-    return factory.create_consumer(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms)
+    return factory.create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms)
 
 
 __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@"
diff --git a/tests/automatic/full_chain/send_recv_streams/check_linux.sh b/tests/automatic/full_chain/send_recv_streams/check_linux.sh
index d1e48ef1f..c5e1265cb 100644
--- a/tests/automatic/full_chain/send_recv_streams/check_linux.sh
+++ b/tests/automatic/full_chain/send_recv_streams/check_linux.sh
@@ -12,13 +12,14 @@ token=$ASAPO_TEST_RW_TOKEN
 beamline=test
 
 set -e
+set -o pipefail
 
 trap Cleanup EXIT
 
 network_type=$2
 
 Cleanup() {
-    set +e
+  set +e
 	echo "db.dropDatabase()" | mongo ${indatabase_name}
 }
 
diff --git a/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh b/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh
index 7e9b4f3db..46f17934f 100644
--- a/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh
+++ b/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh
@@ -10,11 +10,13 @@ token=$ASAPO_TEST_RW_TOKEN
 beamline=test
 
 set -e
+set -o pipefail
 
 trap Cleanup EXIT
 
 Cleanup() {
   set +e
+  set +o pipefail
 	echo "db.dropDatabase()" | mongo ${indatabase_name}
 }
 
diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
index 5f48b974a..d603ab01f 100644
--- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
+++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
@@ -25,8 +25,8 @@ def callback(header,err):
 
 source, beamtime, token = sys.argv[1:]
 
-consumer = asapo_consumer.create_consumer(source,".",True, beamtime,"",token,timeout)
-producer  = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600000)
+consumer = asapo_consumer.create_consumer(source,".",True,"auto","Consumer",beamtime,"",token,timeout)
+producer = asapo_producer.create_producer(source,'processed',"auto","Producer",beamtime,'auto', "", token, 1, 600000)
 producer.set_log_level("debug")
 
 group_id  = consumer.generate_group_id()
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh
index c6decd530..d6b7a52ed 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh
@@ -1,6 +1,7 @@
 #!/usr/bin/env bash
 
 set -e
+set -o pipefail
 
 trap Cleanup EXIT
 
@@ -26,6 +27,7 @@ mkdir -p /tmp/asapo/test_in/processed
 Cleanup() {
     echo cleanup
     set +e
+    set +o pipefail
     if [[ $network_type == "fabric" ]]; then
       nomad stop receiver
       nomad run receiver_tcp.nmd
-- 
GitLab