diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc28876fe5513b5153ad7706cf0e5320a364636..1b04c08a883546bb55ed6daf6289f6bb9da1ea14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,12 @@ ## 21.03.1 +IMPROVEMENTS +* Consumer API - retry file delivery/reading with timeout (can be useful for the case file arrives after metadta ingested asapo, e.g. for slow NFS transfer,...) + BUG FIXES -* fix LDAP authorization for raw data type Producers +* Core services: fix LDAP authorization for raw data type Producers +* Consumer API: fix race condition in GetStreamList/get_stream_list +* Producer API: fix segfault in send_stream_finished_flag ## 21.03.0 diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index ce1ce2cdac11aa19bba2c9fec166d4997a63b6f2..4d32341c6044570d9c3028f5770e7f08e8c72c63 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -28,14 +28,14 @@ type StreamsRecord struct { type Streams struct { records map[string]StreamsRecord - lastUpdated int64 + lastUpdated map[string]int64 } -var streams = Streams{lastUpdated: 0, records: make(map[string]StreamsRecord, 0)} +var streams = Streams{lastUpdated: make(map[string]int64, 0), records: make(map[string]StreamsRecord, 0)} var streamsLock sync.Mutex func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { - if ss.lastUpdated < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { + if ss.lastUpdated[db_name] < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { return StreamsRecord{}, errors.New("cache expired") } rec, ok := ss.records[db_name] @@ -173,7 +173,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err res :=StreamsRecord{} utils.DeepCopy(rec,&res) ss.records[db_name] = res - ss.lastUpdated = time.Now().UnixNano() + ss.lastUpdated[db_name] = time.Now().UnixNano() } return rec, nil } diff --git a/tests/automatic/bug_fixes/CMakeLists.txt b/tests/automatic/bug_fixes/CMakeLists.txt index 17a66a5243a33d9371e73664d479faca25d20793..6cc3dbaa7a47a0f8687396b687db47de0129ec79 100644 --- a/tests/automatic/bug_fixes/CMakeLists.txt +++ b/tests/automatic/bug_fixes/CMakeLists.txt @@ -2,6 +2,7 @@ if (UNIX) add_subdirectory(receiver_cpu_usage) if (BUILD_PYTHON) add_subdirectory(consumer_python_memleak) + add_subdirectory(streamlist_python_multithread) add_subdirectory(error-sending-data-using-callback-method) endif() endif() diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt b/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..32b3c885e3a01f74c55a3d657af5851b686deba2 --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME consumer_python_memleak) + + +prepare_asapo() + +if (UNIX) + get_target_property(PYTHON_LIBS python-lib-consumer BINARY_DIR) + get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR) +else() + get_target_property(PYTHON_LIBS asapo_consumer BINARY_DIR) + get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR) +endif() + +add_script_test("${TARGET_NAME}" "${PYTHON_LIBS} ${PYTHON_LIBS_PRODUCER} ${Python_EXECUTABLE}" nomem) + +configure_file(magic_producer.py magic_producer.py COPYONLY) + diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh b/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..162da7d2d5a70f669602064b4184af61ac4446d2 --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/check_linux.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -e + +trap Cleanup EXIT + +endpoint=127.0.0.1:8400 +beamtime_id=asapo_test +token=$ASAPO_TEST_RW_TOKEN + +Cleanup() { + echo cleanup + nomad stop nginx + nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill + nomad stop broker + nomad stop discovery + nomad stop authorizer + nomad stop receiver + echo "db.dropDatabase()" | mongo ${beamtime_id}_source_1 + echo "db.dropDatabase()" | mongo ${beamtime_id}_source_2 +} + +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd +nomad run receiver_tcp.nmd +nomad run authorizer.nmd + +sleep 1 + +export PYTHONPATH=$1:$2:${PYTHONPATH} +export Python_EXECUTABLE=$3 + +$Python_EXECUTABLE magic_producer.py $endpoint $beamtime_id $token > out +cat out +cat out | grep "5 : number of streams source_1: 5" +cat out | grep "5 : number of streams source_2: 5" diff --git a/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py b/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py new file mode 100644 index 0000000000000000000000000000000000000000..3f658c26143f291c2ab8fa723ccc83bc82a5a750 --- /dev/null +++ b/tests/automatic/bug_fixes/streamlist_python_multithread/magic_producer.py @@ -0,0 +1,41 @@ +from concurrent.futures import ThreadPoolExecutor +import threading +import asapo_producer +import asapo_consumer +from time import sleep +from datetime import datetime +import sys + +endpoint, beamtime, token = sys.argv[1:] + +def dummy_producer(data_source): + def callback(header, err): + if err is not None: + print("could not sent: ", header, err) +# else: +# print("successfuly sent: ", header) + producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', data_source, '', 5, + 60000) # source type 'processed' to write to the core filesystem + producer.set_log_level("none") + consumer = asapo_consumer.create_consumer(endpoint, "", False, beamtime, data_source, token, 3000) + + for j in range(5): + stream = datetime.now().strftime('%Y%m%dT_%H%M%S') +# print(f"{data_source} new stream {stream}") + for i in range(5): + producer.send(i + 1, f"{data_source}_{stream}_{i}", None, + callback=callback, + ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, + stream=stream) + sleep(0.6) + producer.send_stream_finished_flag(stream=stream, last_id=i + 1) + print(j+1,": number of streams",f"{data_source}: ", len(consumer.get_stream_list())) + +def main(): + with ThreadPoolExecutor(max_workers=3) as executor: + task1 = executor.submit(dummy_producer, "source_1") + task2 = executor.submit(dummy_producer, "source_2") + + +if __name__ == '__main__': + main() diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index 5d679b0a5ef01997aaa7a9945148075509789041..844e51b378243114eed735074a11e6dbb81e02dc 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -31,7 +31,7 @@ for /F %%N in ('find /C "} server warning: ignoring duplicate record" ^< "out"') echo %NUM% | findstr 2 || goto error for /F %%N in ('find /C "} server warning: duplicated request" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 2 || goto error +echo %NUM% | findstr 1 || goto error findstr /I /L /C:"Finished successfully" out || goto :error