Skip to content
Snippets Groups Projects

Fix callback

Merged Mikhail Karnevskiy requested to merge fix_callback into develop
2 unresolved threads
Files
4
# conftest.py
import pytest
import logging
import os
from pathlib import Path
import sys
if build_dir_env := os.environ.get("BUILD_DIR"):
build_dir = Path(build_dir_env)
assert build_dir.exists()
else:
build_dir = Path()
sys.path.insert(0, str(build_dir / "producer/api/python/"))
sys.path.insert(0, str(build_dir / "consumer/api/python/"))
import asapo_producer
import asapo_consumer
log = logging.getLogger(__name__)
@@ -25,3 +41,24 @@ def pytest_configure(config):
@pytest.fixture
def asapo_config(request):
return request.config
@pytest.fixture
def producer(asapo_config):
return asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto",
asapo_config.data_source,
asapo_config.token, asapo_config.n_threads, asapo_config.timeout)
@pytest.fixture
def consumer(asapo_config):
return asapo_consumer.create_consumer(asapo_config.host, "auto",
False, asapo_config.beamtime, asapo_config.data_source,
asapo_config.token, timeout_ms=100)
@pytest.fixture(autouse=True)
def cleanup_db(asapo_config):
log.info(f"Run test for consumer {asapo_consumer.__version__} and producer {asapo_producer.__version__}")
os.makedirs(asapo_config.data_path, exist_ok=True)
yield None
Loading