Commit 3ea74236 authored by Tim Schoof's avatar Tim Schoof
Browse files

Remove pipeline step option and always use group id

There doesn't seem to be an obvious use case for setting the pipeline
step explicitly. Therefore, removing the option improves the usability.
parent fe472149
Pipeline #30831 passed with stage
in 1 minute and 3 seconds
......@@ -109,6 +109,12 @@ class Application:
group_id = str(binascii.crc32(sender_data_source.encode()))
self.options["receiver"]["group_id"] = group_id
# Use the receiver group_id (if available) as the default pipeline step
# Otherwise the default "auto" is used
if "group_id" in self.options["receiver"]:
self.options["receiver"]["pipeline_step"] = self.options["receiver"]["group_id"]
self.options["sender"]["pipeline_step"] = self.options["receiver"]["group_id"]
def _setup_logging(self):
log_level = self.options["log_level"]
format = (
......
import logging
import attr
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
......@@ -59,10 +60,10 @@ class SimpleAsapoReceiver:
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
n_resend_nacs=Config(
        "Number of tries to resend unacknowledged messages", type=int, default=0)
        "Number of tries to resend unacknowledged messages", type=int, default=0),
# Use attr to hide the argument from the command line and config file
pipeline_step=attr.ib(type=str, default="auto"),
))
group_id = Config(
"The data_source data is divided between all workers with the same "
......@@ -73,10 +74,9 @@ class SimpleAsapoReceiver:
"The name of the stream.", type=str, default="default", init=False)
data_source = Config(
"Name of input data_source", type=str, default="")
pipeline_step = Config(
"Name of the worker node in a pipeline", type=str, default="auto")
n_resend_nacs = Config(
        "Number of tries to resend unacknowledged messages", type=int, default=0)
pipeline_step = attr.ib(type=str, default="auto")
@group_id.default
def _generate_group_id(self):
......@@ -416,14 +416,15 @@ class AsapoMetadataReceiver:
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False, init=False),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
# Use attr to hide the argument from the command line and config file
pipeline_step=attr.ib(type=str, default="auto"),
))
group_id = Config(
"The data_source data is divided between all workers with the same "
"group_id. If not given, a unique group id will be generated "
"and the worker will receive the complete data_source.",
type=str)
pipeline_step = attr.ib(type=str, default="auto")
@group_id.default
def _generate_group_id(self):
......
import logging
import threading
import time
import attr
import asapo_consumer
import asapo_producer
from AsapoWorker.configurable import Configurable, Config
......@@ -81,8 +82,8 @@ class AsapoSender:
timeout_producer=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=30000),
pipeline_step=Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
# Use attr to hide the argument from the command line and config file
pipeline_step=attr.ib(type=str, default="auto"),
))
consumer = Config(
        "An ASAP::O consumer", type=asapo_consumer.PyConsumer,
......@@ -120,8 +121,7 @@ class AsapoSender:
retry_delay = Config(
"Seconds between retries in case of connection problems",
type=float, default=3)
pipeline_step = Config(
"Name of the worker node in a pipeline", type=str, default="auto"),
pipeline_step = attr.ib(type=str, default="auto")
_n_queued = Config(
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment