diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 4c67b594795c3969babb0e937842088a5f1a9303..9bc02800e7944e485a7ed1e32b8d9486ca24a695 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -30,7 +30,7 @@ transfer_data=int(transfer_data)>0 broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000) -producer = asapo_producer.create_producer(source,beamtime, stream_out, token, nthreads, 600) +producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600) group_id = broker.generate_group_id() diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index c382900c7fa8626c543e55accfa5b9cfac4607ea..6a5421ce23148bda3a3fe852b1d20a134679d55b 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -291,7 +291,7 @@ cdef class PyProducer: throw_exception(err) return pyProd -def create_producer(endpoint,beamtime_id='auto',beamline='auto',stream='detector',token='',nthreads=1,timeout_sec=3600): +def create_producer(endpoint,beamtime_id,beamline,stream,token,nthreads,timeout_sec): """ :param endpoint: server endpoint (url:port) :type endpoint: string diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index 9c35a498eef4841de94d406b8180b65612738644..c36683a76fc29260017313a842829f84172c97ae 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -72,26 +72,32 @@ Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) con return Authorize(request, request->GetMessage()); } -Error RequestHandlerAuthorize::ProcessReAuthorization(const Request* request, Error err) const { +Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const { + std::string old_beamtimeId = beamtime_id_; + auto err = Authorize(request, cached_source_credentials_.c_str()); if (err == asapo::ReceiverErrorTemplates::kAuthorizationFailure || ( - err==nullptr && request->GetBeamtimeId()!=beamtime_id_)) { + err==nullptr && old_beamtimeId!=beamtime_id_)) { return asapo::ReceiverErrorTemplates::kReAuthorizationFailure.Generate(); } return err; } +bool RequestHandlerAuthorize::NeedReauthorize() const { + uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> + (system_clock::now() - last_updated_).count(); + return elapsed_ms >= GetReceiverConfig()->authorization_interval_ms; +} + + Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const { if (cached_source_credentials_.empty()) { return ReceiverErrorTemplates::kAuthorizationFailure.Generate(); } - uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - last_updated_).count(); - if (elapsed_ms >= GetReceiverConfig()->authorization_interval_ms) { - auto err = Authorize(request, cached_source_credentials_.c_str()); - auto reauth_err = ProcessReAuthorization(request, std::move(err)); - if (reauth_err) { - return reauth_err; + if (NeedReauthorize()) { + auto err = ProcessReAuthorization(request); + if (err) { + return err; } } request->SetBeamtimeId(beamtime_id_); @@ -119,5 +125,4 @@ StatisticEntity RequestHandlerAuthorize::GetStatisticEntity() const { return StatisticEntity::kNetwork; } - } \ No newline at end of file diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h index 642a1919ca3e97cfd599fe7ed32be2672b649b32..c0bcd062b046094ab4cdb26422df9489cdda7c44 100644 --- a/receiver/src/request_handler_authorize.h +++ b/receiver/src/request_handler_authorize.h @@ -31,7 +31,8 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { Error ProcessOtherRequest(Request* request) const; Error Authorize(Request* request, const char* source_credentials) const; Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const; - Error ProcessReAuthorization(const Request* request, Error err) const; + Error ProcessReAuthorization(Request* request) const; + bool NeedReauthorize() const; std::string GetRequestString(const Request* request, const char* source_credentials) const; }; diff --git a/receiver/unittests/test_request_handler_authorizer.cpp b/receiver/unittests/test_request_handler_authorizer.cpp index b2a444115054ef7f4624fb66fb5e051fae979080..c66100cfab1a868ce74aa07e21a1c3db128de5cf 100644 --- a/receiver/unittests/test_request_handler_authorizer.cpp +++ b/receiver/unittests/test_request_handler_authorizer.cpp @@ -144,12 +144,12 @@ class AuthorizerHandlerTests : public Test { MockAuthRequest(error, code); return handler.ProcessRequest(mock_request.get()); } - Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,std::string return_beamtime_id="beamtime_id") { + Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,bool set_request=true) { EXPECT_CALL(*mock_request, GetOpCode()) .WillOnce(Return(asapo::kOpcodeTransferData)) ; - if (!error && code == HttpCode::OK && return_beamtime_id==expected_beamtime_id) { + if (!error && code == HttpCode::OK && set_request) { EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id)); EXPECT_CALL(*mock_request, SetStream(expected_stream)); EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path)); @@ -234,10 +234,6 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturns401) { TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId) { MockFirstAuthorization(false); - EXPECT_CALL(*mock_request, GetBeamtimeId()) - .WillOnce(ReturnRef(expected_beamtime_id)) - ; - auto err = MockRequestAuthorization(false); ASSERT_THAT(err, Eq(nullptr)); @@ -245,11 +241,9 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId TEST_F(AuthorizerHandlerTests, RequestAuthorizeReturnsDifferentBeamtimeId) { MockFirstAuthorization(false); - EXPECT_CALL(*mock_request, GetBeamtimeId()) - .WillOnce(ReturnRef("beamtime_id2")) - ; - auto err = MockRequestAuthorization(false,HttpCode::OK,"beamtime_id2"); + expected_beamtime_id = "different_id"; + auto err = MockRequestAuthorization(false,HttpCode::OK,false); ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReAuthorizationFailure)); } diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py index 8f8864647e2c59cc86b4b7a24b7d25b25a30ad29..0f048aeed2d6f2236d10dbb2197490b5cb038cfc 100644 --- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py +++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py @@ -25,7 +25,7 @@ class AsapoSender: def _callback(self, header, err): print ("hello self callback") -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("debug") sender = AsapoSender(producer) diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py index f7c959370bdc4f68d6f5512af9fba4c65761bb63..f52b8f4460d60254bbdbbccec174f9d0ccc84c1c 100644 --- a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py +++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py @@ -28,7 +28,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout) -producer = asapo_producer.create_producer(source,beamtime, "", token, 1, 600) +producer = asapo_producer.create_producer(source,beamtime,'auto', "", token, 1, 600) producer.set_log_level("debug") group_id = broker.generate_group_id() diff --git a/tests/automatic/producer/aai/check_linux.sh b/tests/automatic/producer/aai/check_linux.sh index 826762621fdbeebf242bc9776ac51c012161d448..553319166de9bb1bfa933bac07b0da0eb141c33e 100644 --- a/tests/automatic/producer/aai/check_linux.sh +++ b/tests/automatic/producer/aai/check_linux.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -#set -e +set -e trap Cleanup EXIT @@ -30,8 +30,6 @@ Cleanup() { export PYTHONPATH=$2:${PYTHONPATH} -echo "db.${beamtime_id}_${stream}.insert({dummy:1})" | mongo ${beamtime_id}_${stream} >/dev/null - nomad run authorizer.nmd >/dev/null nomad run nginx.nmd >/dev/null nomad run receiver.nmd >/dev/null @@ -46,7 +44,7 @@ echo test > file1 $1 $3 $beamline $token $stream "127.0.0.1:8400" > out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 4 +cat out | grep "successfuly sent" | wc -l | grep 3 cat out | grep "reauthorization" | wc -l | grep 1 cat out | grep "duplicated" | wc -l | grep 2 diff --git a/tests/automatic/producer/aai/check_windows.bat b/tests/automatic/producer/aai/check_windows.bat index 2acbb41ac42accc9c8f43a63135325be12cd1a0f..ea077946cbae3ecd7a9f29da0f914ca244bf5a7e 100644 --- a/tests/automatic/producer/aai/check_windows.bat +++ b/tests/automatic/producer/aai/check_windows.bat @@ -33,7 +33,7 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 4 || goto error +echo %NUM% | findstr 3 || goto error for /F %%N in ('find /C "reauthorization" ^< "out"') do set NUM=%%N echo %NUM% | findstr 1 || goto error diff --git a/tests/automatic/producer/aai/producer_aai.py b/tests/automatic/producer/aai/producer_aai.py index 57b3696cd5a2f6334256579dbdd5c342e30934c8..54bc6f7758aa18a9e72de3fa91005df18037581a 100644 --- a/tests/automatic/producer/aai/producer_aai.py +++ b/tests/automatic/producer/aai/producer_aai.py @@ -26,7 +26,7 @@ def callback(header,err): lock.release() -producer = asapo_producer.create_producer(endpoint,beamline=beamline, stream=stream, token=token, nthreads=nthreads, timeout_sec=60) +producer = asapo_producer.create_producer(endpoint,'auto',beamline, stream, token, nthreads, 60) producer.set_log_level("debug") @@ -35,11 +35,13 @@ producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", producer.wait_requests_finished(10000) +time.sleep(2) -#send single file to other beamtime - should be warning on duplicated request +#send single file to other beamtime - should be warning on duplicated request (same beamtime, no reauthorization) producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) producer.wait_requests_finished(10000) + fname = 'beamline/p07/current/beamtime-metadata-11111111.json' with open(fname) as json_file: data = json.load(json_file) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 084af55680c72cb65808c84fef9c30f1245a3f11..a0f559f6f50a69e4e9bf17ea8af062308c3e8212 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -24,7 +24,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,60) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads,60) producer.set_log_level("debug") @@ -96,7 +96,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0,0) + producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0,0) except asapo_producer.AsapoWrongInputError as e: print(e) else: diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py index b5354b29daf550583db5cd6a3efc6fa4000bdf2e..2d364a7d7e4827f655e79b38229d88b7db457214 100644 --- a/tests/manual/python_tests/producer/test.py +++ b/tests/manual/python_tests/producer/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads ,0) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads ,0) producer.set_log_level("info") diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py index 99d063b467a4f2fe9cfc498a94ab1252e333f638..9e420f33ccef2879ae05e46fa3a616edf241d88d 100644 --- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py +++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py @@ -27,7 +27,7 @@ def assert_err(err): print(err) sys.exit(1) -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("debug") diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py index e15ce89b8b6214fe763b6eb8f66697e9404e0dc9..85ccd36c07dfe93c0018d1bb017fedbb8e18f11b 100644 --- a/tests/manual/python_tests/producer_wait_threads/producer_api.py +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime, 'auto', stream, token, nthreads, 600) producer.set_log_level("info") @@ -63,7 +63,7 @@ if n!=0: # create with error try: - producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0, 600) + producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0, 600) except Exception as Asapo: print(e) else: diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py index 70c04381ed06d5b10819b35146a6f064b2c26f79..5ebe7b95caec871caed2240f232a8494b05857bf 100644 --- a/tests/manual/python_tests/producer_wait_threads/test.py +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -22,7 +22,7 @@ def callback(header,err): print ("successfuly sent: ",header) lock.release() -producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) +producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600) producer.set_log_level("info")