Skip to content
Snippets Groups Projects
Commit 0430ec13 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'fix_callback' into 'develop'

Fix callback

See merge request !265
parents 8f800f19 8385e182
No related branches found
No related tags found
1 merge request!265Fix callback
Pipeline #129124 failed
......@@ -153,7 +153,7 @@ cdef class PyProducer:
err = self.c_producer.get().Send__(message_header, data_pointer(data),ingest_mode,_bytes(stream),
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_data,
<void*>self,<void*>callback, <void*>data))
<void*>self,<void*>callback if callback != None else NULL, <void*>data))
if err:
throw_exception(err)
......
......@@ -28,6 +28,9 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo
RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func,
void* nd_array) {
if (py_func == NULL) {
return nullptr;
}
RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void {
callback(c_self, py_func, nd_array, std::move(payload), std::move(err));
};
......
# conftest.py
import pytest
import logging
import os
from pathlib import Path
import sys
if build_dir_env := os.environ.get("BUILD_DIR"):
build_dir = Path(build_dir_env)
assert build_dir.exists()
else:
build_dir = Path()
sys.path.insert(0, str(build_dir / "producer/api/python/"))
sys.path.insert(0, str(build_dir / "consumer/api/python/"))
import asapo_producer
import asapo_consumer
log = logging.getLogger(__name__)
......@@ -25,3 +41,24 @@ def pytest_configure(config):
@pytest.fixture
def asapo_config(request):
return request.config
@pytest.fixture
def producer(asapo_config):
return asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto",
asapo_config.data_source,
asapo_config.token, asapo_config.n_threads, asapo_config.timeout)
@pytest.fixture
def consumer(asapo_config):
return asapo_consumer.create_consumer(asapo_config.host, "auto",
False, asapo_config.beamtime, asapo_config.data_source,
asapo_config.token, timeout_ms=100)
@pytest.fixture(autouse=True)
def cleanup_db(asapo_config):
log.info(f"Run test for consumer {asapo_consumer.__version__} and producer {asapo_producer.__version__}")
os.makedirs(asapo_config.data_path, exist_ok=True)
yield None
import pytest
import logging
from time import time
import json
log = logging.getLogger(__name__)
# There tests uses global variable and therefore can not be run in-parallel
callback_results = {'payload': None, 'err': None}
def callback(payload, err):
callback_results['payload'] = payload
callback_results['err'] = err
log.debug(f"callback: {payload}, {err}")
def test_no_callback(producer, consumer):
stream = f"{time()}"
producer.send(1, f"processed/tilt_4_008_{stream}_1.h5",
b"test", user_meta=json.dumps({"ggg": 123}), stream=stream)
producer.send_file(2, f"{__file__}", f"processed/tilt_4_008_{stream}_2.h5",
user_meta=json.dumps({"ggg": 123}), stream=stream)
producer.send_stream_finished_flag(stream, 2)
producer.send_beamtime_meta(json.dumps({'bbb': 234}),
mode='update')
producer.send_stream_meta(json.dumps({'sss': 345}),
mode='update',
stream=stream)
producer.wait_requests_finished(50000)
# sleep(1)
meta = producer.get_stream_meta(stream=stream)
log.info(f"Stream meta {meta}")
assert meta['sss'] == 345
meta = consumer.get_beamtime_meta()
log.info(f"Beamtime meta {meta}")
_, meta = consumer.get_by_id(1, stream=stream, meta_only=False)
log.info(f"Meta {meta}")
_, meta = consumer.get_by_id(2, stream=stream, meta_only=False)
log.info(f"Meta {meta}")
def test_with_callback(producer, consumer):
stream = f"{time()}"
producer.send(1, f"processed/tilt_4_008_{stream}.h5",
b"test", user_meta=json.dumps({"ggg": 123}), stream=stream,
callback=callback)
producer.wait_requests_finished(50000)
assert callback_results['payload']['id'] == 1
producer.send(1, f"processed/tilt_4_008_{stream}.h5",
b"test", user_meta=json.dumps({"ggg": 123}), stream=stream,
callback=callback)
producer.wait_requests_finished(50000)
assert "duplicated request" in str(callback_results['err'])
producer.send_stream_finished_flag(stream, 2, callback=callback)
producer.wait_requests_finished(50000)
assert callback_results['payload']['id'] == 3
producer.send_beamtime_meta(json.dumps({'bbb': 234}),
mode='update', callback=callback)
producer.wait_requests_finished(50000)
assert callback_results['payload']['id'] == 0
producer.send_stream_meta(json.dumps({'sss': 345}),
mode='update',
stream=stream, callback=callback)
producer.wait_requests_finished(50000)
assert callback_results['payload']['buffer'] == f"{stream}.meta"
import pytest
import os
import logging
import sys
from pathlib import Path
from time import time
import json
# Using the environment variable `BUILD_DIR` enables to run the tests from a directory
# different from the directory where `ASAP::O` was built. In particular, the GitLab CI
# job `test-services-linux-debug` defined in `.gitlab-ci.yml` runs the tests from the
# project root directory.
if build_dir_env := os.environ.get("BUILD_DIR"):
build_dir = Path(build_dir_env)
assert build_dir.exists()
else:
build_dir = Path()
sys.path.insert(0, str(build_dir / "producer/api/python/"))
sys.path.insert(0, str(build_dir / "consumer/api/python/"))
import asapo_producer
import asapo_consumer
......@@ -33,27 +17,6 @@ def callback(payload, err):
log.debug(f"callback: {payload.get('id')}, {err}")
@pytest.fixture(autouse=True)
def cleanup_db(asapo_config):
log.info(f"Run test for consumer {asapo_consumer.__version__} and producer {asapo_producer.__version__}")
os.makedirs(asapo_config.data_path, exist_ok=True)
yield None
@pytest.fixture
def producer(asapo_config):
return asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto",
asapo_config.data_source,
asapo_config.token, asapo_config.n_threads, asapo_config.timeout)
@pytest.fixture
def consumer(asapo_config):
return asapo_consumer.create_consumer(asapo_config.host, "auto",
False, asapo_config.beamtime, asapo_config.data_source,
asapo_config.token, timeout_ms=100)
def send_data(producer, idx, stream, dataset_substream=None,
file_name="processed/tilt_4_008_{stream}_{substream}_data_00000{idx}.h5",
user_meta={"ggg": 123}, ingest_mode=asapo_producer.DEFAULT_INGEST_MODE,
......
import pytest
import json
import numpy as np
import os
from time import time
import logging
import sys
from datetime import datetime
from pathlib import Path
log = logging.getLogger(__name__)
# Using the environment variable `BUILD_DIR` enables to run the tests from a directory
# different from the directory where `ASAP::O` was built. In particular, the GitLab CI
# job `test-services-linux-debug` defined in `.gitlab-ci.yml` runs the tests from the
# project root directory.
if build_dir_env := os.environ.get("BUILD_DIR"):
build_dir = Path(build_dir_env)
assert build_dir.exists()
else:
build_dir = Path()
sys.path.insert(0, str(build_dir / "producer/api/python/"))
sys.path.insert(0, str(build_dir / "consumer/api/python/"))
import asapo_producer
import asapo_consumer
......@@ -41,28 +23,6 @@ def callback(payload, err):
log.debug(f"successfuly sent: {payload.get('id')}")
@pytest.fixture(autouse=True)
def cleanup_db(asapo_config):
log.info(f"Run test for consumer {asapo_consumer.__version__} and producer {asapo_producer.__version__}")
os.makedirs(asapo_config.data_path, exist_ok=True)
yield None
os.system(f"rm -fr {asapo_config.data_path}")
@pytest.fixture
def producer(asapo_config):
return asapo_producer.create_producer(asapo_config.host, 'processed', asapo_config.beamtime, "auto",
asapo_config.data_source,
asapo_config.token, 1, 6000)
@pytest.fixture
def consumer(asapo_config):
return asapo_consumer.create_consumer(asapo_config.host, "auto",
False, asapo_config.beamtime, asapo_config.data_source,
asapo_config.token, 100)
def send_data(producer, idx, stream, substream=None, n_modules=3):
producer.send(idx, # message number. Should be unique and ordered.
f"processed/tilt_4_008_{stream}_{substream}_data_00000{idx}.h5",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment