Skip to content
Snippets Groups Projects
Commit 37186bc5 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix streamlist bug

parent 3e2a612f
No related branches found
No related tags found
No related merge requests found
## 21.03.1 ## 21.03.1
IMPROVEMENTS
* Consumer API - retry file delivery/reading with timeout (can be useful for the case file arrives after metadta ingested asapo, e.g. for slow NFS transfer,...)
BUG FIXES BUG FIXES
* fix LDAP authorization for raw data type Producers * Core services: fix LDAP authorization for raw data type Producers
* Consumer API: fix race condition in GetStreamList/get_stream_list
* Producer API: fix segfault in send_stream_finished_flag
## 21.03.0 ## 21.03.0
......
...@@ -28,14 +28,14 @@ type StreamsRecord struct { ...@@ -28,14 +28,14 @@ type StreamsRecord struct {
type Streams struct { type Streams struct {
records map[string]StreamsRecord records map[string]StreamsRecord
lastUpdated int64 lastUpdated map[string]int64
} }
var streams = Streams{lastUpdated: 0, records: make(map[string]StreamsRecord, 0)} var streams = Streams{lastUpdated: make(map[string]int64, 0), records: make(map[string]StreamsRecord, 0)}
var streamsLock sync.Mutex var streamsLock sync.Mutex
func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if ss.lastUpdated < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { if ss.lastUpdated[db_name] < time.Now().UnixNano()-int64(updatePeriodMs*1000000) {
return StreamsRecord{}, errors.New("cache expired") return StreamsRecord{}, errors.New("cache expired")
} }
rec, ok := ss.records[db_name] rec, ok := ss.records[db_name]
...@@ -173,7 +173,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err ...@@ -173,7 +173,7 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err
res :=StreamsRecord{} res :=StreamsRecord{}
utils.DeepCopy(rec,&res) utils.DeepCopy(rec,&res)
ss.records[db_name] = res ss.records[db_name] = res
ss.lastUpdated = time.Now().UnixNano() ss.lastUpdated[db_name] = time.Now().UnixNano()
} }
return rec, nil return rec, nil
} }
......
...@@ -2,6 +2,7 @@ if (UNIX) ...@@ -2,6 +2,7 @@ if (UNIX)
add_subdirectory(receiver_cpu_usage) add_subdirectory(receiver_cpu_usage)
if (BUILD_PYTHON) if (BUILD_PYTHON)
add_subdirectory(consumer_python_memleak) add_subdirectory(consumer_python_memleak)
add_subdirectory(streamlist_python_multithread)
add_subdirectory(error-sending-data-using-callback-method) add_subdirectory(error-sending-data-using-callback-method)
endif() endif()
endif() endif()
......
set(TARGET_NAME consumer_python_memleak)
prepare_asapo()
if (UNIX)
get_target_property(PYTHON_LIBS python-lib-consumer BINARY_DIR)
get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR)
else()
get_target_property(PYTHON_LIBS asapo_consumer BINARY_DIR)
get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR)
endif()
add_script_test("${TARGET_NAME}" "${PYTHON_LIBS} ${PYTHON_LIBS_PRODUCER} ${Python_EXECUTABLE}" nomem)
configure_file(magic_producer.py magic_producer.py COPYONLY)
#!/usr/bin/env bash
set -e
trap Cleanup EXIT
endpoint=127.0.0.1:8400
beamtime_id=asapo_test
token=$ASAPO_TEST_RW_TOKEN
Cleanup() {
echo cleanup
nomad stop nginx
nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill
nomad stop broker
nomad stop discovery
nomad stop authorizer
nomad stop receiver
echo "db.dropDatabase()" | mongo ${beamtime_id}_source_1
echo "db.dropDatabase()" | mongo ${beamtime_id}_source_2
}
nomad run nginx.nmd
nomad run discovery.nmd
nomad run broker.nmd
nomad run receiver_tcp.nmd
nomad run authorizer.nmd
sleep 1
export PYTHONPATH=$1:$2:${PYTHONPATH}
export Python_EXECUTABLE=$3
$Python_EXECUTABLE magic_producer.py $endpoint $beamtime_id $token > out
cat out
cat out | grep "5 : number of streams source_1: 5"
cat out | grep "5 : number of streams source_2: 5"
from concurrent.futures import ThreadPoolExecutor
import threading
import asapo_producer
import asapo_consumer
from time import sleep
from datetime import datetime
import sys
endpoint, beamtime, token = sys.argv[1:]
def dummy_producer(data_source):
def callback(header, err):
if err is not None:
print("could not sent: ", header, err)
# else:
# print("successfuly sent: ", header)
producer = asapo_producer.create_producer(endpoint, 'processed', beamtime, 'auto', data_source, '', 5,
60000) # source type 'processed' to write to the core filesystem
producer.set_log_level("none")
consumer = asapo_consumer.create_consumer(endpoint, "", False, beamtime, data_source, token, 3000)
for j in range(5):
stream = datetime.now().strftime('%Y%m%dT_%H%M%S')
# print(f"{data_source} new stream {stream}")
for i in range(5):
producer.send(i + 1, f"{data_source}_{stream}_{i}", None,
callback=callback,
ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,
stream=stream)
sleep(0.6)
producer.send_stream_finished_flag(stream=stream, last_id=i + 1)
print(j+1,": number of streams",f"{data_source}: ", len(consumer.get_stream_list()))
def main():
with ThreadPoolExecutor(max_workers=3) as executor:
task1 = executor.submit(dummy_producer, "source_1")
task2 = executor.submit(dummy_producer, "source_2")
if __name__ == '__main__':
main()
...@@ -31,7 +31,7 @@ for /F %%N in ('find /C "} server warning: ignoring duplicate record" ^< "out"') ...@@ -31,7 +31,7 @@ for /F %%N in ('find /C "} server warning: ignoring duplicate record" ^< "out"')
echo %NUM% | findstr 2 || goto error echo %NUM% | findstr 2 || goto error
for /F %%N in ('find /C "} server warning: duplicated request" ^< "out"') do set NUM=%%N for /F %%N in ('find /C "} server warning: duplicated request" ^< "out"') do set NUM=%%N
echo %NUM% | findstr 2 || goto error echo %NUM% | findstr 1 || goto error
findstr /I /L /C:"Finished successfully" out || goto :error findstr /I /L /C:"Finished successfully" out || goto :error
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment