Commit 93143d1a authored by Tim Schoof's avatar Tim Schoof
Browse files

Implement asapo api changes

parent 2b0e3a3a
...@@ -7,14 +7,15 @@ from AsapoWorker.errors import ( ...@@ -7,14 +7,15 @@ from AsapoWorker.errors import (
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def create_broker(source, path, beamtime, stream, token, timeout): def create_broker(
source, path, has_filesystem, beamtime, stream, token, timeout):
log.info( log.info(
"Create new broker (source=%s, path=%s, beamtime=%s, " "Create new broker (source=%s, path=%s, has_filesystem=%s, "
"stream=%s, token=%s, timeout=%i).", "beamtime=%s, stream=%s, token=%s, timeout=%i).",
source, path, beamtime, stream, token, timeout) source, path, has_filesystem, beamtime, stream, token, timeout)
try: try:
broker = asapo_consumer.create_server_broker( broker = asapo_consumer.create_server_broker(
source, path, beamtime, stream, token, timeout) source, path, has_filesystem, beamtime, stream, token, timeout)
except asapo_consumer.AsapoWrongInputError as err: except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Cannot create broker") from err raise ConfigurationError("Cannot create broker") from err
except asapo_consumer.AsapoConsumerError as err: except asapo_consumer.AsapoConsumerError as err:
...@@ -44,6 +45,9 @@ class AsapoReceiver: ...@@ -44,6 +45,9 @@ class AsapoReceiver:
token=Config("Beamtime access token", type=str), token=Config("Beamtime access token", type=str),
stream=Config( stream=Config(
"Name of input stream", type=str, default=""), "Name of input stream", type=str, default=""),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False),
timeout=Config( timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before " "Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000) "exception is thrown", type=float, default=3000)
......
...@@ -9,24 +9,27 @@ from AsapoWorker.errors import StreamError, ConfigurationError ...@@ -9,24 +9,27 @@ from AsapoWorker.errors import StreamError, ConfigurationError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def create_producer(source, beamtime, stream, token, nthreads=1): def create_producer(
source, beamtime, beamline, stream, token, nthreads=1, timeout=3000):
timeout = timeout/1000
log.info( log.info(
"Create new producer (source=%s, beamtime=%s, " "Create new producer (source=%s, beamtime=%s, beamline=%s, "
"stream=%s, token=%s, nthreads=%i).", "stream=%s, token=%s, nthreads=%i, timeout=%s).",
source, beamtime, stream, token, nthreads) source, beamtime, beamline, stream, token, nthreads, timeout)
producer = asapo_producer.create_producer( producer = asapo_producer.create_producer(
source, beamtime, stream, token, nthreads) source, beamtime, beamline, stream, token, nthreads, timeout)
return producer return producer
def create_broker(source, path, beamtime, stream, token, timeout): def create_broker(
source, path, has_filesystem, beamtime, stream, token, timeout):
log.info( log.info(
"Create new broker (source=%s, path=%s, beamtime=%s, " "Create new broker (source=%s, path=%s, has_filesystem=%s, "
"stream=%s, token=%s, timeout=%i).", "beamtime=%s, stream=%s, token=%s, timeout=%i).",
source, path, beamtime, stream, token, timeout) source, path, has_filesystem, beamtime, stream, token, timeout)
try: try:
broker = asapo_consumer.create_server_broker( broker = asapo_consumer.create_server_broker(
source, path, beamtime, stream, token, timeout) source, path, has_filesystem, beamtime, stream, token, timeout)
except asapo_consumer.AsapoWrongInputError as err: except asapo_consumer.AsapoWrongInputError as err:
raise ConfigurationError("Cannot create broker") from err raise ConfigurationError("Cannot create broker") from err
except asapo_consumer.AsapoConsumerError as err: except asapo_consumer.AsapoConsumerError as err:
...@@ -64,10 +67,14 @@ class AsapoSender: ...@@ -64,10 +67,14 @@ class AsapoSender:
source=Config("ASAP::O endpoint", type=str), source=Config("ASAP::O endpoint", type=str),
beamtime=Config("Beamtime ID", type=str), beamtime=Config("Beamtime ID", type=str),
token=Config("Beamtime access token", type=str), token=Config("Beamtime access token", type=str),
beamline=Config("Beamline", type=str, default="auto"),
stream=Config( stream=Config(
"Name of output stream", type=str, default=""), "Name of output stream", type=str, default=""),
nthreads=Config( nthreads=Config(
"Number of threads", type=int, default=1) "Number of threads", type=int, default=1),
timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
)) ))
broker = Config( broker = Config(
"An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker, "An ASAP::O consumer broker", type=asapo_consumer.PyDataBroker,
...@@ -78,6 +85,9 @@ class AsapoSender: ...@@ -78,6 +85,9 @@ class AsapoSender:
token=Config("Beamtime access token", type=str), token=Config("Beamtime access token", type=str),
stream=Config( stream=Config(
"Name of output stream", type=str, default=""), "Name of output stream", type=str, default=""),
has_filesystem=Config(
"Read files directly from filesystem",
type=bool, default=False),
timeout=Config( timeout=Config(
"Allowed time in milliseconds for ASAP::O data access before " "Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000) "exception is thrown", type=float, default=3000)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment