diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh index cef2a59dde2e09fa8b3b27d164aab095721420af..55199387c0f92e082635db355ac7ebc4c0713f88 100644 --- a/examples/pipeline/in_to_out/check_linux.sh +++ b/examples/pipeline/in_to_out/check_linux.sh @@ -56,7 +56,7 @@ done sleep 1 -$1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out $token 2 1000 1 > out +$1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out $token 2 1000 25000 1 > out cat out cat out | grep "Processed 3 file(s)" cat out | grep "Sent 3 file(s)" @@ -67,7 +67,7 @@ cat ${receiver_folder}/file1_${stream_out} | grep hello1 cat ${receiver_folder}/file2_${stream_out} | grep hello2 cat ${receiver_folder}/file3_${stream_out} | grep hello3 -$1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out2 $token 2 1000 0 > out2 +$1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out2 $token 2 1000 25000 0 > out2 cat out2 test ! -f ${receiver_folder}/file1_${stream_out2} echo "db.data.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep ./file1 diff --git a/examples/pipeline/in_to_out/check_windows.bat b/examples/pipeline/in_to_out/check_windows.bat index 30a6407fab67ffb8ed1cc2b636bf52c5f7b86e97..b4ebe9fbf68587810aad8b9e869fcce75a1b9614 100644 --- a/examples/pipeline/in_to_out/check_windows.bat +++ b/examples/pipeline/in_to_out/check_windows.bat @@ -36,7 +36,7 @@ echo hello2 > file2 echo hello3 > file3 -"%1" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out% %token% 2 1000 1 > out +"%1" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out% %token% 2 1000 25000 1 > out type out findstr /I /L /C:"Processed 3 file(s)" out || goto :error findstr /I /L /C:"Sent 3 file(s)" out || goto :error @@ -48,7 +48,7 @@ findstr /I /L /C:"hello2" %receiver_folder%\file2_%stream_out% || goto :error findstr /I /L /C:"hello3" %receiver_folder%\file3_%stream_out% || goto :error -"%1" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out2% %token% 2 1000 0 > out2 +"%1" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out2% %token% 2 1000 25000 0 > out2 type out2 findstr /I /L /C:"Processed 3 file(s)" out2 || goto :error findstr /I /L /C:"Sent 3 file(s)" out2 || goto :error diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index ac2ee3e5c4e899e37b7493d150afce9379b4e9d5..0a15cac5f0d3d194b063cc1784e0cf6e733fad25 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -33,6 +33,7 @@ struct Args { std::string stream_out; std::string token; int timeout_ms; + int timeout_ms_producer; int nthreads; bool transfer_data; }; @@ -206,7 +207,7 @@ void WaitProducerThreadsFinished(const Args& args, int nfiles) { } std::this_thread::sleep_for(std::chrono::milliseconds(100)); elapsed_ms += 100; - if (elapsed_ms > args.timeout_ms) { + if (elapsed_ms > args.timeout_ms_producer) { std::cerr << "Stream out exit on timeout " << std::endl; break; } @@ -218,9 +219,9 @@ void WaitProducerThreadsFinished(const Args& args, int nfiles) { int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; - if (argc != 10) { + if (argc != 11) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <transfer data>" + + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <timeout ms producer> <transfer data>" << std::endl; exit(EXIT_FAILURE); @@ -233,7 +234,8 @@ int main(int argc, char* argv[]) { args.token = std::string{argv[6]}; args.nthreads = atoi(argv[7]); args.timeout_ms = atoi(argv[8]); - args.transfer_data = atoi(argv[9]) == 1; + args.timeout_ms_producer = atoi(argv[9]); + args.transfer_data = atoi(argv[10]) == 1; auto producer = CreateProducer(args); files_sent = 0; diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh index 9feb505bf0fdf909e5bce54591b4a380cdfdbd0f..2d3b4a8440d6a2bb426bf41ffa89c1a33accf902 100644 --- a/examples/pipeline/in_to_out_python/check_linux.sh +++ b/examples/pipeline/in_to_out_python/check_linux.sh @@ -6,6 +6,7 @@ stream_in=detector stream_out=stream timeout=2 +timeout_producer=25 nthreads=4 indatabase_name=${beamtime_id}_${stream_in} @@ -60,7 +61,7 @@ sleep 1 export PYTHONPATH=$2:$3:${PYTHONPATH} -$1 $4 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out $token $timeout $nthreads 1 > out +$1 $4 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out $token $timeout $timeout_producer $nthreads 1 > out cat out cat out | grep "Processed 3 file(s)" cat out | grep "Sent 3 file(s)" diff --git a/examples/pipeline/in_to_out_python/check_windows.bat b/examples/pipeline/in_to_out_python/check_windows.bat index a4d705c93e6929c1a2eb77b5de8f6fd08aa2542e..1fda1fe94737c58f13a6249573b6e406a7f3670e 100644 --- a/examples/pipeline/in_to_out_python/check_windows.bat +++ b/examples/pipeline/in_to_out_python/check_windows.bat @@ -17,6 +17,7 @@ SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" SET timeout=2 +SET timeout_producer=25 SET nthreads=4 c:\opt\consul\nomad run discovery.nmd @@ -37,7 +38,7 @@ echo hello3 > file3 set PYTHONPATH=%2;%3 -"%1" "%4" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out% %token% %timeout% %nthreads% 1 > out +"%1" "%4" 127.0.0.1:8400 %source_path% %beamtime_id% %stream_in% %stream_out% %token% %timeout% %timeout_producer% %nthreads% 1 > out type out findstr /I /L /C:"Processed 3 file(s)" out || goto :error diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index c8d5aed3fcfd9f9d0d0d38c45ecfc068c37cb981..e2e2d603173f1ed7d21bce94a4fcce936736bd61 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -29,8 +29,9 @@ def wait_send(n_files, timeout_s): break time.sleep(1) -source, path, beamtime,stream_in, stream_out, token, timeout_s,nthreads, transfer_data = sys.argv[1:] +source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] timeout_s=int(timeout_s) +timeout_s_producer=int(timeout_s_producer) nthreads=int(nthreads) transfer_data=int(transfer_data)>0 @@ -60,7 +61,7 @@ while True: break -wait_send(n_recv,timeout_s) +wait_send(n_recv,timeout_s_producer) print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)")