@@ -2,4 +2,4 @@ del test_file
icacls test_noaccess /grant:r *S-1-1-0:F
rmdir /S /Q test_noaccess
rmdir /S /Q folder
\ No newline at end of file
rmdir /S /Q folder
@@ -2,4 +2,3 @@
mkdir test_noaccess
chmod -rx test_noaccess
mkdir test_noaccess
icacls test_noaccess /deny *S-1-1-0:(GW)
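# stress_test.py: ASAPO producer/consumer stress test (filename as referenced in the pytest wrapper further below)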
import argparse
import json
import sys
import threading
import time
import uuid
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import Manager, set_start_method
from pathlib import Path
import asapo_consumer
import asapo_producer
import numpy as np
required_options = {
    "endpoint",
    "beamline",
    "token",
}
default_config = {
    "token_file": None,
    "beamtime": "auto",
    "n_threads": 4,
    "producer_timeout": 60_000,
    "consumer_timeout": 3_000,
    "buffer_size": 1000,
    "write_to_core": False,
    "do_not_store_in_filesystem": False,
    "has_filesystem": False,
    "filesystem_path": "auto",
    "ordered_consumer": False,
    "meta_only_consumer": False,
    "substream_consumer": False,
    "resend_timeout": 10000,
    "resend_attempts": 3,
    "debug": False,
}
def parse_args(argv):
    parser = argparse.ArgumentParser()
    # Performance test options
    parser.add_argument(
        "--data_sources",
        type=int,
        help="Number of data sources used in parallel",
        default=1,
    )
    parser.add_argument(
        "--streams", type=int, help="Number of streams per data_source", default=1
    )
    parser.add_argument(
        "--concurrent_streams",
        type=int,
        help="Number of streams per data_source to use in parallel",
        default=1,
    )
    parser.add_argument(
        "--messages", type=int, help="Number of messages per stream", default=1
    )
    parser.add_argument(
        "--consumer_groups", type=int, help="Number of consumer groups", default=1
    )
    parser.add_argument(
        "--consumers",
        type=int,
        help="Number of parallel consumers per concurrent stream and consumer group",
        default=1,
    )
    parser.add_argument(
        "--producers",
        type=int,
        help="Number of parallel producers per concurrent stream",
        default=1,
    )
    parser.add_argument(
        "--messages_per_second",
        type=int,
        help="If > 0, maximum number of messages a producer will send per second",
        default=0,
    )
    parser.add_argument(
        "--dataset_size",
        type=int,
        help="If > 0, datasets of the specified size will be sent and received",
        default=0,
    )
    parser.add_argument(
        "--data_n_bytes",
        type=int,
        help="Size of the data blob in bytes (the actual size might be larger if the given value is too small)",
        default=1000,
    )
    # ASAPO options
    parser.add_argument("--endpoint", type=str, help="URL of the asapo endpoint")
    parser.add_argument(
        "--beamtime",
        type=str,
        help=f"The current beamtime (default: '{default_config['beamtime']}')",
    )
    parser.add_argument("--beamline", type=str, help="The beamline of the detector")
    parser.add_argument("--token", type=str, help="Asapo token for authentication")
    parser.add_argument(
        "--token_file",
        type=str,
        help=(
            "File containing the Asapo token for authentication. "
            "Can be used instead of --token"
        ),
    )
    parser.add_argument(
        "--n_threads",
        type=int,
        help=f"Number of producer threads (default: {default_config['n_threads']})",
    )
    parser.add_argument(
        "--producer_timeout",
        type=int,
        help=f"Producer timeout in milliseconds (default: {default_config['producer_timeout']})",
    )
    parser.add_argument(
        "--buffer_size",
        type=int,
        help=(
            "Number of messages that can be queued "
            f"(default: {default_config['buffer_size']})"
        ),
    )
    parser.add_argument(
        "--write_to_core",
        action="store_true",
        help="Let ASAPO::O write to core instead of beamline FS",
    )
    parser.add_argument(
        "--do_not_store_in_filesystem",
        action="store_true",
        help="ASAPO::O will not write the data to disk",
    )
    parser.add_argument(
        "--consumer_timeout",
        type=int,
        help=f"Consumer timeout in milliseconds (default: {default_config['consumer_timeout']})",
    )
    parser.add_argument(
        "--has_filesystem",
        action="store_true",
        help=f"Consumers have direct access to the filesystem (default: {default_config['has_filesystem']})",
    )
    parser.add_argument(
        "--filesystem_path",
        type=str,
        help=f"Filesystem path for direct access to the data (default: {default_config['filesystem_path']})",
    )
    parser.add_argument(
        "--ordered_consumer",
        action="store_true",
        help=f"Use ordered=True for get_next (default: {default_config['ordered_consumer']})",
    )
    parser.add_argument(
        "--meta_only_consumer",
        action="store_true",
        help=f"Use meta_only=True for get_next (default: {default_config['meta_only_consumer']})",
    )
    parser.add_argument(
        "--substream_consumer",
        action="store_true",
        help=f"Start consumers for each substream (default: {default_config['substream_consumer']})",
    )
    parser.add_argument(
        "--resend_timeout",
        type=int,
        help=f"Delay in milliseconds before unacknowledged messages are resent (default: {default_config['resend_timeout']})",
    )
    parser.add_argument(
        "--resend_attempts",
        type=int,
        help=f"Maximum number of times unacknowledged messages are resent (default: {default_config['resend_attempts']})",
    )
    args = vars(parser.parse_args(argv))
    return args
def load_token_file(filename):
    return Path(filename).read_text().strip()
def get_config(argv, default_config=default_config):
    args = parse_args(argv)
    # Start with the default config
    config = default_config.copy()
    # Overwrite config with command line arguments
    for key, value in args.items():
        if value is not None:
            config[key] = value
    token_file = config["token_file"]
    if token_file:
        if "token" in config:
            raise ValueError("token_file and token cannot be used at the same time")
        config["token"] = load_token_file(token_file)
    missing_required_options = required_options.difference(config)
    if missing_required_options:
        raise KeyError(f"Missing required options: {missing_required_options}")
    return config
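# Worker run in a separate process: creates an ASAPO producer for `data_source`,
# sends the messages selected by `indices` to every stream in `stream_names`
# (optionally rate-limited via `send_interval`), and returns the number of
# messages that were sent without error.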
def producer_process(
    barrier,
    producer_config,
    data_source,
    stream_names,
    ingest_mode,
    n_messages_per_stream,
    substream,
    indices,
    send_interval,
    data_n_bytes,
):
    lock = threading.Lock()
    counter = 0
    try:
        # Using a callback is useful for three reasons:
        # * Older versions don't work without a callback
        # * Callbacks might impact performance and therefore should be included in a stress test
        # * It allows counting the number of messages sent without errors
        def callback(payload, err):
            nonlocal counter
            if err:
                print(err)
            else:
                # TODO: This ignores messages that have been sent with a warning
                with lock:
                    counter += 1

        producer = asapo_producer.create_producer(
            **producer_config, data_source=data_source
        )
        print("Producer initialized")
        barrier.wait()
        next_send_time = time.time()
        for stream in stream_names:
            for i in indices:
                index = i + 1
                assert index <= n_messages_per_stream
                if substream:
                    name = (
                        f"processed/{data_source}/{stream}-{substream[0]}/{index}.txt"
                    )
                else:
                    name = f"processed/{data_source}/{stream}/{index}.txt"
                encoded_name = name.encode()
                # Use uninitialized memory because we do not care about the content
                data = np.empty(
                    max(2 * len(encoded_name) + 2, data_n_bytes), dtype="uint8"
                )
                # Write the name to the beginning and end of the buffer so that we can check
                # at the receiver side that we did not receive partial data
                memoryview(data)[: len(encoded_name)] = encoded_name
                memoryview(data)[-len(encoded_name) :] = encoded_name
                # Delay the time until the end of the next send interval
                sleep_time = next_send_time - time.time()
                if sleep_time > 0:
                    time.sleep(sleep_time)
                user_meta = {
                    "name": name,
                    # Add a custom timestamp to measure the time from the send call to the return from receive
                    # The timestamp that ASAPO adds to the message could already be delayed
                    "timestamp": time.time(),
                }
                user_meta_json = json.dumps(user_meta)
                producer.send(
                    index,
                    name,
                    data,
                    stream=stream,
                    dataset=substream,
                    user_meta=user_meta_json,
                    ingest_mode=ingest_mode,
                    callback=callback,
                )
                next_send_time += send_interval
            producer.send_stream_finished_flag(stream, n_messages_per_stream)
        producer.wait_requests_finished(producer_config["timeout_ms"])
    except Exception as err:
        print("ERROR:", repr(err))
    return counter
def get_next_dataset(
    consumer,
    group_id,
    stream,
    min_size,
    meta_only,
    ordered=False,
    substream=0,
):
    message = consumer.get_next_dataset(
        group_id, min_size, stream=stream, ordered=ordered, substream=substream
    )
    return retrieve_dataset(consumer, message, meta_only)
def retrieve_dataset(consumer, message, meta_only):
    current_id = message["id"]
    dataset_size = message["expected_size"]
    metadata_list = message["content"]
    substream_messages = []
    for meta in metadata_list:
        assert current_id == meta["_id"]
        assert 0 < meta["dataset_substream"] <= dataset_size
        if meta_only:
            data = None
        else:
            data = consumer.retrieve_data(meta)
        substream_messages.append((data, meta))
    return substream_messages
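# Worker run in a separate process: creates an ASAPO consumer for `data_source` and
# `group_id`, reads every stream in `stream_names` until the stream-finished flag is
# reached, checks the payload against the name embedded by the producer, and returns
# the received indices per stream together with the measured per-message latencies.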
def consumer_process(
    barrier,
    consumer_config,
    data_source,
    group_id,
    stream_names,
    n_messages_per_stream,
    dataset_size,
    ordered_consumer,
    meta_only,
    substream,
    resend_timeout,
    resend_attempts,
):
    received_indices_per_stream = {}
    latencies = []
    try:
        consumer = asapo_consumer.create_consumer(
            **consumer_config, data_source=data_source
        )
        if resend_attempts > 0:
            consumer.set_resend_nacs(True, resend_timeout, resend_attempts)
        barrier.wait()
        for stream in stream_names:
            received_indices = np.zeros(n_messages_per_stream, dtype=bool)
            while True:
                try:
                    if dataset_size > 0:
                        messages = get_next_dataset(
                            consumer,
                            group_id,
                            stream=stream,
                            meta_only=meta_only,
                            ordered=ordered_consumer,
                            min_size=0,  # only complete datasets (ignored if substream)
                            substream=substream,
                        )
                        if substream > 0:
                            assert len(messages) == 1
                        else:
                            assert len(messages) == dataset_size
                    else:
                        message = consumer.get_next(
                            group_id,
                            stream=stream,
                            meta_only=meta_only,
                            ordered=ordered_consumer,
                        )
                        messages = [message]
                    # All indices should be the same (checked in `get_next_dataset`),
                    # therefore just take the first
                    index = messages[0][1]["_id"]
                    if resend_attempts == 0:
                        # There should be no duplicated messages
                        assert not received_indices[index - 1]
                    for data, meta in messages:
                        if not meta_only:
                            encoded_name = meta["name"].encode()
                            assert memoryview(data)[: len(encoded_name)] == encoded_name
                            assert (
                                memoryview(data)[-len(encoded_name) :] == encoded_name
                            )
                    if not received_indices[index - 1]:
                        # Avoid measuring latency for resent messages because these skew
                        # the statistics (messages resent to different consumers are
                        # still counted because there is no easy way to prevent that)
                        now = time.time()
                        latencies.append(now - meta["meta"]["timestamp"])
                    received_indices[index - 1] = True
                    if resend_attempts > 0:
                        consumer.acknowledge(group_id, index, stream=stream)
                except asapo_consumer.AsapoEndOfStreamError:
                    continue
                except asapo_consumer.AsapoStreamFinishedError:
                    break
            received_indices_per_stream[stream] = received_indices
    except Exception as err:
        print("ERROR:", repr(err))
    return {data_source: {group_id: received_indices_per_stream}}, latencies
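# Orchestrates a single benchmark run: submits all producer and consumer workers to a
# ProcessPoolExecutor, releases them together via a shared barrier, and afterwards
# reports throughput, completeness, and latency statistics.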
def run_benchmark(
    n_data_sources,
    n_streams,
    n_concurrent_streams,
    n_messages_per_stream,
    n_group_ids,
    n_producers_per_concurrent_stream,
    n_consumers_per_concurrent_stream_and_group_id,
    dataset_size,
    data_n_bytes,
    messages_per_second,
    data_source_prefix,
    producer_config,
    ingest_mode,
    consumer_config,
    ordered_consumer,
    meta_only_consumer,
    substream_consumer,
    resend_timeout,
    resend_attempts,
):
    if n_streams % n_concurrent_streams != 0:
        raise ValueError("n_streams must be a multiple of n_concurrent_streams")
    if substream_consumer and dataset_size == 0:
        raise ValueError("substream_consumer is only supported for dataset_size > 0")
    n_substreams = 1 if dataset_size == 0 else dataset_size
    n_messages_total = n_data_sources * n_streams * n_substreams * n_messages_per_stream
    n_producers = (
        n_data_sources
        * n_substreams
        * n_concurrent_streams
        * n_producers_per_concurrent_stream
    )
    if substream_consumer:
        n_substream_consumers = n_substreams
    else:
        n_substream_consumers = 1
    n_consumers = (
        n_data_sources
        * n_concurrent_streams
        * n_group_ids
        * n_substream_consumers
        * n_consumers_per_concurrent_stream_and_group_id
    )
    print("Number of producers:", n_producers)
    print("Number of consumers:", n_consumers)
    n_processes = n_producers + n_consumers
    if messages_per_second <= 0:
        send_interval = 0
    else:
        send_interval = 1 / messages_per_second
    manager = Manager()
    barrier = manager.Barrier(n_processes + 1)
    producer_futures = []
    consumer_futures = []
    with ProcessPoolExecutor(max_workers=n_processes + 1) as pool:
        print(f"Starting {n_processes} processes...")
        for data_source_idx in range(n_data_sources):
            data_source = f"{data_source_prefix}_{data_source_idx}"
            for concurrent_stream_idx in range(n_concurrent_streams):
                stream_names = [
                    f"stream_{i}"
                    for i in range(
                        concurrent_stream_idx, n_streams, n_concurrent_streams
                    )
                ]
                for substream_idx in range(1, n_substreams + 1):
                    substream = (
                        None if dataset_size == 0 else (substream_idx, dataset_size)
                    )
                    for producer_idx in range(n_producers_per_concurrent_stream):
                        indices = range(
                            producer_idx,
                            n_messages_per_stream,
                            n_producers_per_concurrent_stream,
                        )
                        future = pool.submit(
                            producer_process,
                            barrier,
                            producer_config,
                            data_source,
                            stream_names,
                            ingest_mode,
                            n_messages_per_stream,
                            substream=substream,
                            indices=indices,
                            send_interval=send_interval,
                            data_n_bytes=data_n_bytes,
                        )
                        producer_futures.append(future)
                for group_id_idx in range(n_group_ids):
                    group_id = f"group_id_{group_id_idx}"
                    for substream_consumer_idx in range(1, n_substream_consumers + 1):
                        substream = substream_consumer_idx if substream_consumer else 0
                        for consumer_idx in range(
                            n_consumers_per_concurrent_stream_and_group_id
                        ):
                            future = pool.submit(
                                consumer_process,
                                barrier,
                                consumer_config,
                                data_source,
                                group_id,
                                stream_names,
                                n_messages_per_stream,
                                dataset_size=dataset_size,
                                ordered_consumer=ordered_consumer,
                                meta_only=meta_only_consumer,
                                substream=substream,
                                resend_timeout=resend_timeout,
                                resend_attempts=resend_attempts,
                            )
                            consumer_futures.append(future)
        assert len(producer_futures) == n_producers
        assert len(consumer_futures) == n_consumers
        print("Waiting for processes to initialize...")
        barrier.wait()
        start = time.time()
        print("Waiting for producers to finish...")
        wait(producer_futures)
        producer_duration = time.time() - start
        n_messages_sent = sum(future.result() for future in producer_futures)
        producer_rate = n_messages_sent / producer_duration
        print(
            f"Producers finished sending {n_messages_sent} messages in {producer_duration} s: {producer_rate} messages/s"
        )
        print("Waiting for consumers to finish...")
        wait(consumer_futures)
        consumer_duration = time.time() - start
    received_messages = defaultdict(
        lambda: defaultdict(
            lambda: defaultdict(lambda: np.zeros(n_messages_per_stream, dtype=bool))
        )
    )
    combined_latencies = []
    for future in consumer_futures:
        data_source_dict, latencies = future.result()
        combined_latencies.extend(latencies)
        for data_source, group_id_dict in data_source_dict.items():
            for group_id, stream_dict in group_id_dict.items():
                for stream, hits in stream_dict.items():
                    if resend_attempts == 0:
                        # There should be no duplicated messages
                        assert not np.any(
                            received_messages[data_source][group_id][stream] & hits
                        )
                    received_messages[data_source][group_id][stream] |= hits
    total_received = 0
    for data_source, data_source_dict in received_messages.items():
        for group_id, group_id_dict in data_source_dict.items():
            for stream, hits in group_id_dict.items():
                n = np.sum(hits)
                total_received += (
                    n * n_substreams
                )  # currently, only complete datasets are received
                if n != n_messages_per_stream:
                    print(
                        f"ERROR: Only received {n} messages for stream {stream} and group_id {group_id} of data_source {data_source}"
                    )
    consumer_rate = total_received / consumer_duration
    print(
        f"Consumers finished receiving {total_received} messages in {consumer_duration} s: {consumer_rate} messages/s"
    )
    print("Latency statistics:")
    print("mean: ", np.mean(combined_latencies))
    print("min: ", np.min(combined_latencies))
    print("max: ", np.max(combined_latencies))
    print("median:", np.median(combined_latencies))
    print("95%: ", np.percentile(combined_latencies, 95))
    assert n_messages_sent == n_messages_total
    assert total_received == n_messages_total * n_group_ids
def main(argv=sys.argv[1:]):
    config = get_config(argv)
    if config["write_to_core"] and config["do_not_store_in_filesystem"]:
        raise ValueError(
            "Conflicting options: '--write_to_core' and '--do_not_store_in_filesystem'"
        )
    if config["write_to_core"]:
        ingest_mode = (
            asapo_producer.DEFAULT_INGEST_MODE
            | asapo_producer.INGEST_MODE_WRITE_RAW_DATA_TO_OFFLINE_FS
        )
    elif config["do_not_store_in_filesystem"]:
        ingest_mode = (
            asapo_producer.INGEST_MODE_TRANSFER_DATA
            | asapo_producer.INGEST_MODE_STORE_IN_DATABASE
        )
    else:
        ingest_mode = asapo_producer.DEFAULT_INGEST_MODE
    producer_config = {
        "endpoint": config["endpoint"],
        "type": "processed",
        "beamtime_id": config["beamtime"],
        "beamline": config["beamline"],
        "token": config["token"],
        "nthreads": config["n_threads"],
        "timeout_ms": config["producer_timeout"],
    }
    consumer_config = {
        "server_name": config["endpoint"],
        "beamtime_id": config["beamtime"],
        "token": config["token"],
        "timeout_ms": config["consumer_timeout"],
        "has_filesystem": config["has_filesystem"],
        "source_path": config["filesystem_path"],
    }
    data_source_prefix = "data_source_" + str(uuid.uuid4())
    print("Using data_source uuid", data_source_prefix)
    run_benchmark(
        n_data_sources=config["data_sources"],
        n_streams=config["streams"],
        n_concurrent_streams=config["concurrent_streams"],
        n_messages_per_stream=config["messages"],
        n_group_ids=config["consumer_groups"],
        n_producers_per_concurrent_stream=config["producers"],
        n_consumers_per_concurrent_stream_and_group_id=config["consumers"],
        dataset_size=config["dataset_size"],
        data_n_bytes=config["data_n_bytes"],
        messages_per_second=config["messages_per_second"],
        data_source_prefix=data_source_prefix,
        producer_config=producer_config,
        ingest_mode=ingest_mode,
        consumer_config=consumer_config,
        ordered_consumer=config["ordered_consumer"],
        meta_only_consumer=config["meta_only_consumer"],
        substream_consumer=config["substream_consumer"],
        resend_timeout=config["resend_timeout"],
        resend_attempts=config["resend_attempts"],
    )
if __name__ == "__main__":
    set_start_method("spawn")
    main()
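# Pytest wrapper (same directory as stress_test.py): runs the stress test as a
# subprocess with randomly sampled argument combinations.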
import random
import subprocess
import sys
from pathlib import Path
import pytest
@pytest.mark.parametrize("i", range(10))
def test_stress_test_random_arguments(i):
    asapo_options = [
        "--endpoint",
        "localhost:8400",
        "--beamline",
        "auto",
        "--beamtime",
        "asapo_test",
        "--token_file",
        Path(__file__).parent.parent / "automatic/pytests/standalone_token.txt",
        "--consumer_timeout",
        "10000",
    ]
    possible_arguments = [
        ["--data_sources", "2"],
        ["--streams", "2"],
        ["--messages", "10"],
        ["--consumer_groups", "2"],
        ["--consumers", "2"],
        ["--producers", "2"],
        ["--messages_per_second", "100"],
        ["--dataset_size", "2"],
        ["--data_n_bytes", "2000"],
        ["--ordered_consumer"],
        ["--meta_only_consumer"],
        ["--resend_attempts", "0"],
    ]
    arguments = random.sample(
        possible_arguments, random.randint(0, len(possible_arguments))
    )
    arguments = [arg for args in arguments for arg in args]
    if "--streams" in arguments:
        if random.random() < 0.5:
            arguments.extend(["--concurrent_streams", "2"])
    if "--dataset_size" in arguments:
        if random.random() < 0.5:
            arguments.extend(["--substream_consumer"])
    arguments.extend(asapo_options)
    print(arguments)
    proc = subprocess.run(
        [sys.executable, Path(__file__).parent / "stress_test.py"] + arguments
    )
    assert proc.returncode == 0
@@ -120,5 +120,3 @@
fun:fi_accept
fun:rxm_msg_process_connreq
}