Skip to content
Snippets Groups Projects
Commit b332bfc3 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

Added python consumer code

parent cf69e02b
No related branches found
No related tags found
No related merge requests found
......@@ -42,6 +42,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
uint64_t expected_size
MessageMetas content
struct SourceCredentials:
string instance_id
string pipeline_step
string beamtime_id
string data_source
string user_token
......
......@@ -395,11 +395,13 @@ cdef class __PyConsumerFactory:
def __cinit__(self):
with nogil:
self.c_factory = ConsumerFactory()
def create_consumer(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
def create_consumer(self,server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout):
cdef string b_server_name = _bytes(server_name)
cdef string b_source_path = _bytes(source_path)
cdef bool b_has_filesystem = has_filesystem
cdef SourceCredentials source
source.instance_id = _bytes(instance_id)
source.pipeline_step = _bytes(pipeline_step)
source.beamtime_id = _bytes(beamtime_id)
source.user_token = _bytes(token)
source.data_source = _bytes(data_source)
......@@ -412,7 +414,7 @@ cdef class __PyConsumerFactory:
consumer.c_consumer.get().SetTimeout(timeout)
return consumer
def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
def create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms):
"""
:param server_name: Server endpoint (hostname:port)
:type server_name: string
......@@ -420,11 +422,25 @@ def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_sour
:type source_path: string
:param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data
:type has_filesystem: bool
:param instance_id: instance id, can be "auto", will create a combination out of hostname and pid
:type instance_id: string
:param pipeline_step: pipeline step id, can be "auto", "DefaultStep" is used then
:type pipeline_step: string
:param beamline: beamline name, can be "auto" if beamtime_id is given
:type beamline: string
:param data_source: name of the data source that produces data
:type data_source: string
:param token: authorization token
:type token: string
:param nthreads: ingest mode flag
:type nthreads: int
:param timeout_ms: send requests timeout in milliseconds
:type timeout_ms: int
:return: consumer object and error. (None,err) if case of error, (consumer, None) if success
:rtype: Tuple with consumer object and error.
"""
factory = __PyConsumerFactory()
return factory.create_consumer(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms)
return factory.create_consumer(server_name,source_path,has_filesystem,instance_id,pipeline_step,beamtime_id,data_source,token,timeout_ms)
__version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@"
......@@ -12,13 +12,14 @@ token=$ASAPO_TEST_RW_TOKEN
beamline=test
set -e
set -o pipefail
trap Cleanup EXIT
network_type=$2
Cleanup() {
set +e
set +e
echo "db.dropDatabase()" | mongo ${indatabase_name}
}
......
......@@ -10,11 +10,13 @@ token=$ASAPO_TEST_RW_TOKEN
beamline=test
set -e
set -o pipefail
trap Cleanup EXIT
Cleanup() {
set +e
set +o pipefail
echo "db.dropDatabase()" | mongo ${indatabase_name}
}
......
......@@ -25,8 +25,8 @@ def callback(header,err):
source, beamtime, token = sys.argv[1:]
consumer = asapo_consumer.create_consumer(source,".",True, beamtime,"",token,timeout)
producer = asapo_producer.create_producer(source,'processed',beamtime,'auto', "", token, 1, 600000)
consumer = asapo_consumer.create_consumer(source,".",True,"auto","Consumer",beamtime,"",token,timeout)
producer = asapo_producer.create_producer(source,'processed',"auto","Producer",beamtime,'auto', "", token, 1, 600000)
producer.set_log_level("debug")
group_id = consumer.generate_group_id()
......
#!/usr/bin/env bash
set -e
set -o pipefail
trap Cleanup EXIT
......@@ -26,6 +27,7 @@ mkdir -p /tmp/asapo/test_in/processed
Cleanup() {
echo cleanup
set +e
set +o pipefail
if [[ $network_type == "fabric" ]]; then
nomad stop receiver
nomad run receiver_tcp.nmd
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment