Commit fe472149 authored by Tim Schoof's avatar Tim Schoof
Browse files

Add pipelin_step option to receiver and sender

parent 8e4f2444
......@@ -9,14 +9,17 @@ log = logging.getLogger(__name__)
def create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout, n_resend_nacs=0):
source, path, has_filesystem, beamtime, data_source, token, timeout,
pipeline_step="auto", n_resend_nacs=0):
log.info(
"Create new consumer (source=%s, path=%s, has_filesystem=%s, "
"beamtime=%s, data_source=%s, token=%s, timeout=%i).",
source, path, has_filesystem, beamtime, data_source, token, timeout)
"beamtime=%s, data_source=%s, token=%s, timeout=%i, pipeline_step=%s).",
source, path, has_filesystem, beamtime, data_source, token, timeout,
pipeline_step)
try:
consumer = asapo_consumer.create_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout)
source, path, has_filesystem, beamtime, data_source, token, timeout,
pipeline_step=pipeline_step)
if n_resend_nacs > 0:
log.info("Resend unacknowledged messages n=%s times", n_resend_nacs)
......@@ -31,10 +34,11 @@ def create_consumer(
def create_metadata_consumer(
source, path, has_filesystem, beamtime, data_source, token, timeout):
source, path, has_filesystem, beamtime, data_source, token, timeout,
pipeline_step="auto"):
return create_consumer(
source, path, has_filesystem, beamtime, data_source + "_metadata", token,
timeout)
timeout, pipeline_step=pipeline_step)
@Configurable
......@@ -55,6 +59,8 @@ class SimpleAsapoReceiver:
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
n_resend_nacs=Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
))
......@@ -67,6 +73,8 @@ class SimpleAsapoReceiver:
"The name of the stream.", type=str, default="default", init=False)
data_source = Config(
"Name of input data_source", type=str, default="")
pipeline_step = Config(
"Name of the worker node in a pipeline", type=str, default="auto")
n_resend_nacs = Config(
"Number if tries to resend unacknowledged messages", type=int, default=0)
......@@ -407,7 +415,9 @@ class AsapoMetadataReceiver:
"ASAP::O mount path", type=str, default="", init=False),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False, init=False)
type=bool, default=False, init=False),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
))
group_id = Config(
"The data_source data is divided between all workers with the same "
......
......@@ -11,14 +11,16 @@ log = logging.getLogger(__name__)
def create_producer(
source, source_type, beamtime, beamline, data_source, token, nthreads=1,
timeout_producer=30000):
timeout_producer=30000, pipeline_step="auto"):
timeout = timeout_producer
log.info(
"Create new producer (source=%s, type=%s, beamtime=%s, beamline=%s, "
"data_source=%s, token=%s, nthreads=%i, timeout=%s).",
source, source_type, beamtime, beamline, data_source, token, nthreads, timeout)
"data_source=%s, token=%s, nthreads=%i, timeout=%s, pipeline_step=%s).",
source, source_type, beamtime, beamline, data_source, token, nthreads, timeout,
pipeline_step)
producer = asapo_producer.create_producer(
source, source_type, beamtime, beamline, data_source, token, nthreads, timeout)
source, source_type, beamtime, beamline, data_source, token, nthreads, timeout,
pipeline_step=pipeline_step)
return producer
......@@ -78,7 +80,9 @@ class AsapoSender:
"Number of threads", type=int, default=1),
timeout_producer=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=30000)
"exception is thrown", type=float, default=30000),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
))
consumer = Config(
"An ASAP::O consumer consumer", type=asapo_consumer.PyConsumer,
......@@ -116,6 +120,8 @@ class AsapoSender:
retry_delay = Config(
"Seconds between retries in case of connection problems",
type=float, default=3)
pipeline_step = Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
_n_queued = Config(
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment