diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 1347d886237eb27133ff20d87f2a11d3e1e7a54f..ea5e840093c815ed52791e085e5d621e9fa8a986 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -42,6 +42,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
     uint64_t expected_size
     MessageMetas content
   struct SourceCredentials:
+    string instance_id
+    string pipeline_step
     string beamtime_id
     string data_source
     string user_token
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index f4232d7c74a914ee65d19a50f82d9782ac7fc977..1a983b0d0a3efb82f1f03ba15642ca0f09d01af0 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -395,11 +395,13 @@ cdef class __PyConsumerFactory:
     def __cinit__(self):
         with nogil:
             self.c_factory = ConsumerFactory()
-    def create_consumer(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
+    def create_consumer(self,server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout):
         cdef string b_server_name = _bytes(server_name)
         cdef string b_source_path = _bytes(source_path)
         cdef bool b_has_filesystem = has_filesystem
         cdef SourceCredentials source
+        source.instance_id = _bytes(instance_id)
+        source.pipeline_step = _bytes(pipeline_step)
         source.beamtime_id = _bytes(beamtime_id)
         source.user_token = _bytes(token)
         source.data_source = _bytes(data_source)
@@ -412,7 +414,7 @@ cdef class __PyConsumerFactory:
            consumer.c_consumer.get().SetTimeout(timeout)
        return consumer

-def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
+def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms):
     """
     :param server_name: Server endpoint (hostname:port)
     :type server_name: string
@@ -420,11 +422,23 @@ def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_sour
     :type source_path: string
     :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data
     :type has_filesystem: bool
+    :param instance_id: consumer instance id; if "auto", a combination of hostname and pid is used
+    :type instance_id: string
+    :param pipeline_step: pipeline step id; if "auto", "DefaultStep" is used
+    :type pipeline_step: string
+    :param beamtime_id: beamtime id
+    :type beamtime_id: string
+    :param data_source: name of the data source that produces data
+    :type data_source: string
+    :param token: authorization token
+    :type token: string
+    :param timeout_ms: timeout in milliseconds for requests to the server
+    :type timeout_ms: int
     :return: consumer object and error. (None,err) if case of error, (consumer, None) if success
     :rtype: Tuple with consumer object and error.
""" factory = __PyConsumerFactory() - return factory.create_consumer(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms) + return factory.create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/tests/automatic/full_chain/send_recv_streams/check_linux.sh b/tests/automatic/full_chain/send_recv_streams/check_linux.sh index d1e48ef1f0f726694ac5ae067b5e5dc3f932717b..c5e1265cbe94fd5ebe2696b9fd2f4830ce59b578 100644 --- a/tests/automatic/full_chain/send_recv_streams/check_linux.sh +++ b/tests/automatic/full_chain/send_recv_streams/check_linux.sh @@ -12,13 +12,14 @@ token=$ASAPO_TEST_RW_TOKEN beamline=test set -e +set -o pipefail trap Cleanup EXIT network_type=$2 Cleanup() { - set +e + set +e echo "db.dropDatabase()" | mongo ${indatabase_name} } diff --git a/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh b/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh index 7e9b4f3db1fc1c8868bfef0a128df7ed97fc7029..46f17934f7ecd52ef6f533bc58159f77a3a08920 100644 --- a/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh +++ b/tests/automatic/full_chain/send_recv_streams_python/check_linux.sh @@ -10,11 +10,13 @@ token=$ASAPO_TEST_RW_TOKEN beamline=test set -e +set -o pipefail trap Cleanup EXIT Cleanup() { set +e + set +o pipefail echo "db.dropDatabase()" | mongo ${indatabase_name} } diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py index 5f48b974a311c3bb9cc7278465c49ac3c0712f04..d603ab01f57ab6136aa096ae35fac4dd2c299aca 100644 --- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py +++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py @@ -25,8 +25,8 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] -consumer = asapo_consumer.create_consumer(source,".",True, beamtime,"",token,timeout) -producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600000) +consumer = asapo_consumer.create_consumer(source,".",True,"auto","Consumer",beamtime,"",token,timeout) +producer = asapo_producer.create_producer(source,'processed',"auto","Producer",beamtime,'auto', "", token, 1, 600000) producer.set_log_level("debug") group_id = consumer.generate_group_id() diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh index c6decd530ef5d84120d8f9aa5896645c917b1f30..d6b7a52ed24a3a1ec24e09c392038ef2f4b0f890 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh @@ -1,6 +1,7 @@ #!/usr/bin/env bash set -e +set -o pipefail trap Cleanup EXIT @@ -26,6 +27,7 @@ mkdir -p /tmp/asapo/test_in/processed Cleanup() { echo cleanup set +e + set +o pipefail if [[ $network_type == "fabric" ]]; then nomad stop receiver nomad run receiver_tcp.nmd