Commit c07fb73e authored by Mikhail Karnevskiy

Merge branch 'feat/acknowledge_slim' into 'master'

Feat/acknowledge slim

See merge request fs-sc/asapoworker!30
parents bdfe112c cb06c034
import argparse
import binascii
import logging
import signal
import sys
from AsapoWorker.configuration import (
create_instance_from_configurable,
create_cli_parser_from_configurable_class,
@@ -75,7 +77,7 @@ class Application:
help="Scheme to chose new stream name")
parser.add_argument(
"--stream", type=str, default="default", metavar="STR",
"--stream", type=str, default="", metavar="STR",
help="The stream where streaming should start")
create_cli_parser_from_configurable_class(
@@ -139,6 +141,7 @@ class Application:
if not self.sender_consumer:
self.sender_consumer = sender.consumer
self.sender_producer = sender.producer
sender._acknowledge_received = consumer.acknowledge
else:
sender = None
@@ -149,17 +152,14 @@ class Application:
self.options["metadata_receiver"])
-    if not stream:
-        if "stream" in self.options:
-            stream = self.options["stream"]
-    if stream:
-        log.info("Setting stream=%s", stream)
-        consumer.stream = stream
-        if sender:
-            sender.stream = stream
+    stream = self.get_starting_stream(consumer)
     _set_start_position(
         self.options, consumer, sender, self.worker_class)
+    log.info("Setting stream=%s", stream)
+    consumer.stream = stream
+    if sender:
+        sender.stream = stream
worker = create_instance_from_configurable(
self.worker_class, self.options["worker"])
@@ -170,6 +170,7 @@ class Application:
# id will be ignored and the start id will be calculated from the
# acknowledged ids
_unset_start_position(self.options)
worker._acknowledge = consumer.acknowledge
if sender:
worker.sender = sender
@@ -191,6 +192,24 @@ class Application:
return streamer
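The two assignments above inject the consumer's acknowledge method into both the sender (_acknowledge_received) and the worker (_acknowledge), so input messages get acknowledged either automatically after a successful send or explicitly by the worker. A minimal sketch of this callback wiring, using stand-in classes rather than the real API:

# Sketch only: stand-in classes showing the callback injection done in Application.
class FakeConsumer:
    def acknowledge(self, name, ids):
        print("acknowledged ids=%s for data_source=%s" % (ids, name))

class FakeSender:
    _acknowledge_received = None  # injected after construction

class FakeWorker:
    _acknowledge = None  # injected after construction

consumer, sender, worker = FakeConsumer(), FakeSender(), FakeWorker()
sender._acknowledge_received = consumer.acknowledge  # acks fire from the send callback
worker._acknowledge = consumer.acknowledge           # worker may ack skipped messages
worker._acknowledge("processed", [1, 2])  # -> acknowledged ids=[1, 2] for data_source=processed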
def get_starting_stream(self, consumer):
log.info("Get starting receiver stream")
if self.options["stream"] != '':
return self.options["stream"]
try:
sender_data_source = self.options["sender"]["data_source"]
group_id = str(binascii.crc32(sender_data_source.encode()))
streams = consumer.get_stream_list()
for stream_info in streams:
msgs = consumer.get_unacknowledged_messages(group_id, stream=stream_info["name"])
if msgs:
log.info("Earliest non-processed stream=%s, number of messages=%s",
stream_info["name"], len(msgs))
return stream_info["name"]
except Exception as e:
log.error("Getting non-processed receiver stream fails messages=%s", e)
return 'default'
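get_starting_stream depends on the group id being reproducible: both this lookup and the sender-driven acknowledgment derive it from the CRC32 of the output data_source name, so every component addresses the same acknowledgment group. A small illustration (the data_source name is hypothetical):

import binascii

# Same derivation as in get_starting_stream and the receiver's acknowledge():
# a deterministic consumer group id from the output data_source name.
sender_data_source = "processed"  # hypothetical output data_source
group_id = str(binascii.crc32(sender_data_source.encode()))
# crc32 is stable across runs and hosts, so the receiver lookup and the
# sender-side acknowledgments always refer to the same group.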
def run(self):
if not self.initialized:
self.initialize()
......
import logging
import binascii
import asapo_consumer
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import (
@@ -117,6 +118,29 @@ class SimpleAsapoReceiver:
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get stream info") from err
def acknowledge(self, name, ids):
    group_id = str(binascii.crc32(name.encode()))
    for ack_id in ids:
        try:
            log.info("Acknowledging id=%s stream=%s group_id=%s", ack_id, self.stream, group_id)
            self.consumer.acknowledge(group_id, ack_id, stream=self.stream)
        except Exception as e:
            log.warning("Acknowledging id=%s stream=%s group_id=%s failed. Reason=%s",
                        ack_id, self.stream, group_id, e)
def get_unacknowledged_messages(self, group_id, stream):
try:
    return self.consumer.get_unacknowledged_messages(group_id, stream=stream)
except asapo_consumer.AsapoConsumerError as err:
raise StreamError("Failed to get unacknowledged messages") from err
def get_last_acknowledged_message(self, group_id):
try:
return self.consumer.get_last_acknowledged_message(group_id, stream=self.stream)
except Exception as err:
raise StreamError("Failed to get unacknowledged messages") from err
# TODO: Ensure also that indices are consecutive or start at 0
@Configurable
@@ -295,7 +319,7 @@ class SerialDatasetAsapoReceiver(SerialAsapoReceiver):
A wrapper for an ASAP::O consumer for dataset processing
This wrapper supports functionality of SerialAsapoReceiver but
expects to receive a dataset
"""
def _get_next(self, meta_only):
......
@@ -12,7 +12,7 @@ log = logging.getLogger(__name__)
def create_producer(
source, source_type, beamtime, beamline, data_source, token, nthreads=1,
timeout_producer=30000):
-    timeout = timeout_producer/1000
+    timeout = timeout_producer
log.info(
"Create new producer (source=%s, type=%s, beamtime=%s, beamline=%s, "
"data_source=%s, token=%s, nthreads=%i, timeout=%s).",
@@ -96,6 +96,8 @@ class AsapoSender:
"Allowed time in milliseconds for ASAP::O data access before "
"exception is thrown", type=float, default=3000)
))
data_source = Config(
"Name of output data_source", type=str, default="")
stream = Config(
"The name of the stream.", type=str, default="default", init=False)
ingest_mode = Config(
@@ -118,10 +120,16 @@ class AsapoSender:
"Length of queue of data waiting to be sent", type=int, default=0,
init=False)
_lock = Config(
"Used internally for concurrent access to the n_queued attribute",
"Used internally for concurrent access from callbacks",
factory=threading.Lock, init=False)
_group_id = Config(
"Generated group id for the ASAP::O consumer", type=str, init=False)
_ids_to_acknowledge = Config(
    "Dictionary mapping each input id awaiting acknowledgment to the "
    "set of output ids that must be sent successfully first", factory=dict,
    init=False)
_acknowledge_received = Config(
"Callback to acknowledge id", init=False, default=None)
@_group_id.default
def _generate_group_id(self):
@@ -145,12 +153,20 @@ class AsapoSender:
def __attrs_post_init__(self):
log.info("Receiver created with ingest_mode=%s", self.ingest_mode)
-    def send_data(self, data, metadata):
+    def send_data(self, data, metadata, acknowledge=None, ack_dependencies=None):
log.info(
"Sending data with id=%s name=%s",
metadata["_id"], metadata["name"])
if ack_dependencies is None:
    ack_dependencies = {metadata["_id"]}
if acknowledge is None:
    acknowledge = []
with self._lock:
    self._n_queued += 1
    # Register under the lock: _ids_to_acknowledge is also read and
    # pruned from the producer callback thread.
    for in_id in acknowledge:
        if in_id not in self._ids_to_acknowledge:
            self._ids_to_acknowledge[in_id] = ack_dependencies
try:
self.producer.send(
metadata['_id'], metadata['name'], data,
@@ -180,15 +196,30 @@ class AsapoSender:
def _callback(self, header, err):
header = {key: val for key, val in header.items() if key != 'data'}
out_id = header["id"]
with self._lock:
assert self._n_queued > 0
self._n_queued -= 1
ids_to_acknowledge = self.get_ids_to_acknowledge(out_id)
if err:
log.error(
"Sending data failed for header=%s with error='%s'",
header, err)
else:
log.info("Successfully sent data for header=%s", header)
if self._acknowledge_received:
self._acknowledge_received(self.data_source, ids_to_acknowledge)
def get_ids_to_acknowledge(self, out_id):
    ids_to_acknowledge = []
    for in_id, ack_ids in self._ids_to_acknowledge.items():
        ack_ids.discard(out_id)
        if not ack_ids:
            ids_to_acknowledge.append(in_id)
    for in_id in ids_to_acknowledge:
        del self._ids_to_acknowledge[in_id]
    return ids_to_acknowledge
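Together, send_data and _callback implement a small dependency tracker: each input id registered via acknowledge waits on a set of output ids (ack_dependencies), _callback drains those sets as sends complete, and an input id is forwarded for acknowledgment only once its set is empty. A self-contained sketch of the same bookkeeping:

# Stand-alone illustration of the acknowledgment bookkeeping in AsapoSender.
ids_to_acknowledge = {}

def register(acknowledge, ack_dependencies):
    # send_data: every listed input id waits on the same output ids.
    for in_id in acknowledge:
        ids_to_acknowledge.setdefault(in_id, set(ack_dependencies))

def on_sent(out_id):
    # _callback/get_ids_to_acknowledge: drop the completed output id and
    # collect input ids with no remaining dependencies.
    ready = []
    for in_id, deps in ids_to_acknowledge.items():
        deps.discard(out_id)
        if not deps:
            ready.append(in_id)
    for in_id in ready:
        del ids_to_acknowledge[in_id]
    return ready

register(acknowledge=[10, 11], ack_dependencies={100, 101})
assert on_sent(100) == []        # output 101 still pending
assert on_sent(101) == [10, 11]  # now both input ids can be acknowledged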
def wait(self, timeout=10):
with self._lock:
@@ -217,7 +248,7 @@ class AsapoSender:
log.info("Successfully finished sending all queued data")
def get_last(self, meta_only=True):
log.info("Requesting last record")
log.info("Requesting last record for stream=%s", self.stream)
try:
data, metadata = self.consumer.get_last(meta_only=meta_only, stream=self.stream)
except asapo_consumer.AsapoEndOfStreamError:
......
import json
import logging
from AsapoWorker.data_handler import get_filename_parts
from AsapoWorker.asapo_sender import AsapoSender
from AsapoWorker.configurable import Configurable, Config
from AsapoWorker.errors import ConfigurationError
log = logging.getLogger(__name__)
def to_set(a):
if a is None or isinstance(a, set):
return a
else:
return set(a)
@Configurable
class Worker:
@@ -13,6 +23,8 @@ class Worker:
meta_only = Config(
"Get only metadata from receiver",
type=bool, default=False, init=False)
_acknowledge = Config(
"Callback to acknowledge id", init=False, default=None)
def handle_error(self):
pass
@@ -68,13 +80,17 @@ class Worker:
"""
pass
-    def send(self, data, metadata):
+    def send(self, data, metadata, acknowledge=None, ack_dependencies=None):
if self.sender:
-        self.sender.send_data(data, metadata)
+        self.sender.send_data(
+            data, metadata, acknowledge=acknowledge,
+            ack_dependencies=to_set(ack_dependencies))
else:
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
def send_acknowledgement(self, acknowledge=None):
    if self._acknowledge:
        self._acknowledge(self.sender.data_source, acknowledge)
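Taken together, a worker can now tie acknowledgment of its input either to a successful downstream send or issue it directly when a message is skipped. A hedged sketch of both paths in a hypothetical Worker subclass (the process method and its skip criterion are illustrative):

class MyWorker(Worker):  # hypothetical subclass for illustration
    def process(self, data, metadata):
        if not data:  # illustrative skip criterion
            # Nothing to send: acknowledge the input id right away.
            self.send_acknowledgement([metadata["_id"]])
            return
        # Acknowledge the input only after the output id is sent successfully.
        self.send(data, metadata,
                  acknowledge=[metadata["_id"]],
                  ack_dependencies=[metadata["_id"]])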
@Configurable(kw_only=True)
class SimpleWorker(Worker):
@@ -135,7 +151,7 @@ class SimpleWorker(Worker):
raise ConfigurationError(
"Worker wants to send data, but no sender configured!")
-    def send(self, data, metadata, extra_meta=None):
+    def send(self, data, metadata, extra_meta=None, acknowledge=None):
"""Send the data to the configured output stream
The metadata and extra_meta arguments are passed to
@@ -150,10 +166,11 @@ class SimpleWorker(Worker):
extra_meta: dict (optional)
If given, the 'meta' entry of the output metadata is updated with
the content of this dict
acknowledge: list of receiver message ids to be acknowledged (optional)
"""
new_metadata = self.get_output_metadata(metadata, extra_meta)
-    super().send(data, new_metadata)
+    super().send(data, new_metadata, acknowledge=acknowledge)
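With SimpleWorker the output metadata is derived internally, so coupling acknowledgment to the send is a single call; a usage sketch (the id field access follows the metadata layout used above):

# Hypothetical call inside a SimpleWorker subclass: send the result and
# acknowledge the originating receiver id once the send has succeeded.
self.send(data, metadata, acknowledge=[metadata["_id"]])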
@Configurable(kw_only=True)
......