From 4bd835ef5c6163452d9f8b9cba37bc0d6dabdd46 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 26 Feb 2020 19:15:59 +0100 Subject: [PATCH] fix logic, update tests --- .../pipeline/in_to_out_python/in_to_out.py | 2 +- producer/api/python/asapo_producer.pyx.in | 2 +- receiver/src/request_handler_authorize.cpp | 25 +++++++++++-------- receiver/src/request_handler_authorize.h | 3 ++- .../test_request_handler_authorizer.cpp | 14 +++-------- .../bugfix_callback.py | 2 +- .../send_recv_substreams.py | 2 +- tests/automatic/producer/aai/check_linux.sh | 6 ++--- .../automatic/producer/aai/check_windows.bat | 2 +- tests/automatic/producer/aai/producer_aai.py | 6 +++-- .../producer/python_api/producer_api.py | 4 +-- tests/manual/python_tests/producer/test.py | 2 +- .../producer_wait_bug_mongo/test.py | 2 +- .../producer_wait_threads/producer_api.py | 4 +-- .../producer_wait_threads/test.py | 2 +- 15 files changed, 39 insertions(+), 39 deletions(-) diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 4c67b5947..9bc02800e 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -30,7 +30,7 @@ transfer_data=int(transfer_data)>0 broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000) -producer = asapo_producer.create_producer(source,beamtime, stream_out, token, nthreads, 600) +producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600) group_id = broker.generate_group_id() diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index c382900c7..6a5421ce2 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -291,7 +291,7 @@ cdef class PyProducer: throw_exception(err) return pyProd -def create_producer(endpoint,beamtime_id='auto',beamline='auto',stream='detector',token='',nthreads=1,timeout_sec=3600): +def create_producer(endpoint,beamtime_id,beamline,stream,token,nthreads,timeout_sec): """ :param endpoint: server endpoint (url:port) :type endpoint: string diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index 9c35a498e..c36683a76 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -72,26 +72,32 @@ Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) con return Authorize(request, request->GetMessage()); } -Error RequestHandlerAuthorize::ProcessReAuthorization(const Request* request, Error err) const { +Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const { + std::string old_beamtimeId = beamtime_id_; + auto err = Authorize(request, cached_source_credentials_.c_str()); if (err == asapo::ReceiverErrorTemplates::kAuthorizationFailure || ( - err==nullptr && request->GetBeamtimeId()!=beamtime_id_)) { + err==nullptr && old_beamtimeId!=beamtime_id_)) { return asapo::ReceiverErrorTemplates::kReAuthorizationFailure.Generate(); } return err; } +bool RequestHandlerAuthorize::NeedReauthorize() const { + uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> + (system_clock::now() - last_updated_).count(); + return elapsed_ms >= GetReceiverConfig()->authorization_interval_ms; +} + + Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const { if (cached_source_credentials_.empty()) { return ReceiverErrorTemplates::kAuthorizationFailure.Generate(); } - uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - last_updated_).count(); - if (elapsed_ms >= GetReceiverConfig()->authorization_interval_ms) { - auto err = Authorize(request, cached_source_credentials_.c_str()); - auto reauth_err = ProcessReAuthorization(request, std::move(err)); - if (reauth_err) { - return reauth_err; + if (NeedReauthorize()) { + auto err = ProcessReAuthorization(request); + if (err) { + return err; } } request->SetBeamtimeId(beamtime_id_); @@ -119,5 +125,4 @@ StatisticEntity RequestHandlerAuthorize::GetStatisticEntity() const { return StatisticEntity::kNetwork; } - } \ No newline at end of file diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h index 642a1919c..c0bcd062b 100644 --- a/receiver/src/request_handler_authorize.h +++ b/receiver/src/request_handler_authorize.h @@ -31,7 +31,8 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { Error ProcessOtherRequest(Request* request) const; Error Authorize(Request* request, const char* source_credentials) const; Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const; - Error ProcessReAuthorization(const Request* request, Error err) const; + Error ProcessReAuthorization(Request* request) const; + bool NeedReauthorize() const; std::string GetRequestString(const Request* request, const char* source_credentials) const; }; diff --git a/receiver/unittests/test_request_handler_authorizer.cpp b/receiver/unittests/test_request_handler_authorizer.cpp index b2a444115..c66100cfa 100644 --- a/receiver/unittests/test_request_handler_authorizer.cpp +++ b/receiver/unittests/test_request_handler_authorizer.cpp @@ -144,12 +144,12 @@ class AuthorizerHandlerTests : public Test { MockAuthRequest(error, code); return handler.ProcessRequest(mock_request.get()); } - Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,std::string return_beamtime_id="beamtime_id") { + Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,bool set_request=true) { EXPECT_CALL(*mock_request, GetOpCode()) .WillOnce(Return(asapo::kOpcodeTransferData)) ; - if (!error && code == HttpCode::OK && return_beamtime_id==expected_beamtime_id) { + if (!error && code == HttpCode::OK && set_request) { EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id)); EXPECT_CALL(*mock_request, SetStream(expected_stream)); EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path)); @@ -234,10 +234,6 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturns401) { TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId) { MockFirstAuthorization(false); - EXPECT_CALL(*mock_request, GetBeamtimeId()) - .WillOnce(ReturnRef(expected_beamtime_id)) - ; - auto err = MockRequestAuthorization(false); ASSERT_THAT(err, Eq(nullptr)); @@ -245,11 +241,9 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId TEST_F(AuthorizerHandlerTests, RequestAuthorizeReturnsDifferentBeamtimeId) { MockFirstAuthorization(false); - EXPECT_CALL(*mock_request, GetBeamtimeId()) - .WillOnce(ReturnRef("beamtime_id2")) - ; - auto err = MockRequestAuthorization(false,HttpCode::OK,"beamtime_id2"); + expected_beamtime_id = "different_id"; + auto err = MockRequestAuthorization(false,HttpCode::OK,false); ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReAuthorizationFailure)); } diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py index 8f8864647..0f048aeed 100644 --- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py +++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py @@ -25,7 +25,7 @@ class AsapoSender: def _callback(self, header, err): print ("hello self callback") -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("debug") sender = AsapoSender(producer) diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py index f7c959370..f52b8f446 100644 --- a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py +++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py @@ -28,7 +28,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout) -producer = asapo_producer.create_producer(source,beamtime, "", token, 1, 600) +producer = asapo_producer.create_producer(source,beamtime,'auto', "", token, 1, 600) producer.set_log_level("debug") group_id = broker.generate_group_id() diff --git a/tests/automatic/producer/aai/check_linux.sh b/tests/automatic/producer/aai/check_linux.sh index 826762621..553319166 100644 --- a/tests/automatic/producer/aai/check_linux.sh +++ b/tests/automatic/producer/aai/check_linux.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -#set -e +set -e trap Cleanup EXIT @@ -30,8 +30,6 @@ Cleanup() { export PYTHONPATH=$2:${PYTHONPATH} -echo "db.${beamtime_id}_${stream}.insert({dummy:1})" | mongo ${beamtime_id}_${stream} >/dev/null - nomad run authorizer.nmd >/dev/null nomad run nginx.nmd >/dev/null nomad run receiver.nmd >/dev/null @@ -46,7 +44,7 @@ echo test > file1 $1 $3 $beamline $token $stream "127.0.0.1:8400" > out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 4 +cat out | grep "successfuly sent" | wc -l | grep 3 cat out | grep "reauthorization" | wc -l | grep 1 cat out | grep "duplicated" | wc -l | grep 2 diff --git a/tests/automatic/producer/aai/check_windows.bat b/tests/automatic/producer/aai/check_windows.bat index 2acbb41ac..ea077946c 100644 --- a/tests/automatic/producer/aai/check_windows.bat +++ b/tests/automatic/producer/aai/check_windows.bat @@ -33,7 +33,7 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 4 || goto error +echo %NUM% | findstr 3 || goto error for /F %%N in ('find /C "reauthorization" ^< "out"') do set NUM=%%N echo %NUM% | findstr 1 || goto error diff --git a/tests/automatic/producer/aai/producer_aai.py b/tests/automatic/producer/aai/producer_aai.py index 57b3696cd..54bc6f775 100644 --- a/tests/automatic/producer/aai/producer_aai.py +++ b/tests/automatic/producer/aai/producer_aai.py @@ -26,7 +26,7 @@ def callback(header,err): lock.release() -producer = asapo_producer.create_producer(endpoint,beamline=beamline, stream=stream, token=token, nthreads=nthreads, timeout_sec=60) +producer = asapo_producer.create_producer(endpoint,'auto',beamline, stream, token, nthreads, 60) producer.set_log_level("debug") @@ -35,11 +35,13 @@ producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", producer.wait_requests_finished(10000) +time.sleep(2) -#send single file to other beamtime - should be warning on duplicated request +#send single file to other beamtime - should be warning on duplicated request (same beamtime, no reauthorization) producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) producer.wait_requests_finished(10000) + fname = 'beamline/p07/current/beamtime-metadata-11111111.json' with open(fname) as json_file: data = json.load(json_file) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 084af5568..a0f559f6f 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -24,7 +24,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,60) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads,60) producer.set_log_level("debug") @@ -96,7 +96,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0,0) + producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0,0) except asapo_producer.AsapoWrongInputError as e: print(e) else: diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py index b5354b29d..2d364a7d7 100644 --- a/tests/manual/python_tests/producer/test.py +++ b/tests/manual/python_tests/producer/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads ,0) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads ,0) producer.set_log_level("info") diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py index 99d063b46..9e420f33c 100644 --- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py +++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("debug") diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py index e15ce89b8..85ccd36c0 100644 --- a/tests/manual/python_tests/producer_wait_threads/producer_api.py +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime, 'auto', stream, token, nthreads, 600) producer.set_log_level("info") @@ -63,7 +63,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0, 600) + producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0, 600) except Exception as Asapo: print(e) else: diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py index 70c04381e..5ebe7b95c 100644 --- a/tests/manual/python_tests/producer_wait_threads/test.py +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("info") -- GitLab