Skip to content
Snippets Groups Projects
Commit 4bd835ef authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix logic, update tests

parent 4f0798fd
No related branches found
No related tags found
No related merge requests found
Showing
with 39 additions and 39 deletions
...@@ -30,7 +30,7 @@ transfer_data=int(transfer_data)>0 ...@@ -30,7 +30,7 @@ transfer_data=int(transfer_data)>0
broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000) broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000)
producer = asapo_producer.create_producer(source,beamtime, stream_out, token, nthreads, 600) producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600)
group_id = broker.generate_group_id() group_id = broker.generate_group_id()
......
...@@ -291,7 +291,7 @@ cdef class PyProducer: ...@@ -291,7 +291,7 @@ cdef class PyProducer:
throw_exception(err) throw_exception(err)
return pyProd return pyProd
def create_producer(endpoint,beamtime_id='auto',beamline='auto',stream='detector',token='',nthreads=1,timeout_sec=3600): def create_producer(endpoint,beamtime_id,beamline,stream,token,nthreads,timeout_sec):
""" """
:param endpoint: server endpoint (url:port) :param endpoint: server endpoint (url:port)
:type endpoint: string :type endpoint: string
......
...@@ -72,26 +72,32 @@ Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) con ...@@ -72,26 +72,32 @@ Error RequestHandlerAuthorize::ProcessAuthorizationRequest(Request* request) con
return Authorize(request, request->GetMessage()); return Authorize(request, request->GetMessage());
} }
Error RequestHandlerAuthorize::ProcessReAuthorization(const Request* request, Error err) const { Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const {
std::string old_beamtimeId = beamtime_id_;
auto err = Authorize(request, cached_source_credentials_.c_str());
if (err == asapo::ReceiverErrorTemplates::kAuthorizationFailure || ( if (err == asapo::ReceiverErrorTemplates::kAuthorizationFailure || (
err==nullptr && request->GetBeamtimeId()!=beamtime_id_)) { err==nullptr && old_beamtimeId!=beamtime_id_)) {
return asapo::ReceiverErrorTemplates::kReAuthorizationFailure.Generate(); return asapo::ReceiverErrorTemplates::kReAuthorizationFailure.Generate();
} }
return err; return err;
} }
bool RequestHandlerAuthorize::NeedReauthorize() const {
uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds>
(system_clock::now() - last_updated_).count();
return elapsed_ms >= GetReceiverConfig()->authorization_interval_ms;
}
Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const { Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const {
if (cached_source_credentials_.empty()) { if (cached_source_credentials_.empty()) {
return ReceiverErrorTemplates::kAuthorizationFailure.Generate(); return ReceiverErrorTemplates::kAuthorizationFailure.Generate();
} }
uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> if (NeedReauthorize()) {
(system_clock::now() - last_updated_).count(); auto err = ProcessReAuthorization(request);
if (elapsed_ms >= GetReceiverConfig()->authorization_interval_ms) { if (err) {
auto err = Authorize(request, cached_source_credentials_.c_str()); return err;
auto reauth_err = ProcessReAuthorization(request, std::move(err));
if (reauth_err) {
return reauth_err;
} }
} }
request->SetBeamtimeId(beamtime_id_); request->SetBeamtimeId(beamtime_id_);
...@@ -119,5 +125,4 @@ StatisticEntity RequestHandlerAuthorize::GetStatisticEntity() const { ...@@ -119,5 +125,4 @@ StatisticEntity RequestHandlerAuthorize::GetStatisticEntity() const {
return StatisticEntity::kNetwork; return StatisticEntity::kNetwork;
} }
} }
\ No newline at end of file
...@@ -31,7 +31,8 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { ...@@ -31,7 +31,8 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler {
Error ProcessOtherRequest(Request* request) const; Error ProcessOtherRequest(Request* request) const;
Error Authorize(Request* request, const char* source_credentials) const; Error Authorize(Request* request, const char* source_credentials) const;
Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const; Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const;
Error ProcessReAuthorization(const Request* request, Error err) const; Error ProcessReAuthorization(Request* request) const;
bool NeedReauthorize() const;
std::string GetRequestString(const Request* request, const char* source_credentials) const; std::string GetRequestString(const Request* request, const char* source_credentials) const;
}; };
......
...@@ -144,12 +144,12 @@ class AuthorizerHandlerTests : public Test { ...@@ -144,12 +144,12 @@ class AuthorizerHandlerTests : public Test {
MockAuthRequest(error, code); MockAuthRequest(error, code);
return handler.ProcessRequest(mock_request.get()); return handler.ProcessRequest(mock_request.get());
} }
Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,std::string return_beamtime_id="beamtime_id") { Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,bool set_request=true) {
EXPECT_CALL(*mock_request, GetOpCode()) EXPECT_CALL(*mock_request, GetOpCode())
.WillOnce(Return(asapo::kOpcodeTransferData)) .WillOnce(Return(asapo::kOpcodeTransferData))
; ;
if (!error && code == HttpCode::OK && return_beamtime_id==expected_beamtime_id) { if (!error && code == HttpCode::OK && set_request) {
EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id)); EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id));
EXPECT_CALL(*mock_request, SetStream(expected_stream)); EXPECT_CALL(*mock_request, SetStream(expected_stream));
EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path)); EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path));
...@@ -234,10 +234,6 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturns401) { ...@@ -234,10 +234,6 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturns401) {
TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId) { TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId) {
MockFirstAuthorization(false); MockFirstAuthorization(false);
EXPECT_CALL(*mock_request, GetBeamtimeId())
.WillOnce(ReturnRef(expected_beamtime_id))
;
auto err = MockRequestAuthorization(false); auto err = MockRequestAuthorization(false);
ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(err, Eq(nullptr));
...@@ -245,11 +241,9 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId ...@@ -245,11 +241,9 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeReturnsSameBeamtimeId
TEST_F(AuthorizerHandlerTests, RequestAuthorizeReturnsDifferentBeamtimeId) { TEST_F(AuthorizerHandlerTests, RequestAuthorizeReturnsDifferentBeamtimeId) {
MockFirstAuthorization(false); MockFirstAuthorization(false);
EXPECT_CALL(*mock_request, GetBeamtimeId())
.WillOnce(ReturnRef("beamtime_id2"))
;
auto err = MockRequestAuthorization(false,HttpCode::OK,"beamtime_id2"); expected_beamtime_id = "different_id";
auto err = MockRequestAuthorization(false,HttpCode::OK,false);
ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReAuthorizationFailure)); ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReAuthorizationFailure));
} }
......
...@@ -25,7 +25,7 @@ class AsapoSender: ...@@ -25,7 +25,7 @@ class AsapoSender:
def _callback(self, header, err): def _callback(self, header, err):
print ("hello self callback") print ("hello self callback")
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600)
producer.set_log_level("debug") producer.set_log_level("debug")
sender = AsapoSender(producer) sender = AsapoSender(producer)
......
...@@ -28,7 +28,7 @@ def callback(header,err): ...@@ -28,7 +28,7 @@ def callback(header,err):
source, beamtime, token = sys.argv[1:] source, beamtime, token = sys.argv[1:]
broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout) broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout)
producer = asapo_producer.create_producer(source,beamtime, "", token, 1, 600) producer = asapo_producer.create_producer(source,beamtime,'auto', "", token, 1, 600)
producer.set_log_level("debug") producer.set_log_level("debug")
group_id = broker.generate_group_id() group_id = broker.generate_group_id()
......
#!/usr/bin/env bash #!/usr/bin/env bash
#set -e set -e
trap Cleanup EXIT trap Cleanup EXIT
...@@ -30,8 +30,6 @@ Cleanup() { ...@@ -30,8 +30,6 @@ Cleanup() {
export PYTHONPATH=$2:${PYTHONPATH} export PYTHONPATH=$2:${PYTHONPATH}
echo "db.${beamtime_id}_${stream}.insert({dummy:1})" | mongo ${beamtime_id}_${stream} >/dev/null
nomad run authorizer.nmd >/dev/null nomad run authorizer.nmd >/dev/null
nomad run nginx.nmd >/dev/null nomad run nginx.nmd >/dev/null
nomad run receiver.nmd >/dev/null nomad run receiver.nmd >/dev/null
...@@ -46,7 +44,7 @@ echo test > file1 ...@@ -46,7 +44,7 @@ echo test > file1
$1 $3 $beamline $token $stream "127.0.0.1:8400" > out || cat out $1 $3 $beamline $token $stream "127.0.0.1:8400" > out || cat out
cat out cat out
cat out | grep "successfuly sent" | wc -l | grep 4 cat out | grep "successfuly sent" | wc -l | grep 3
cat out | grep "reauthorization" | wc -l | grep 1 cat out | grep "reauthorization" | wc -l | grep 1
cat out | grep "duplicated" | wc -l | grep 2 cat out | grep "duplicated" | wc -l | grep 2
...@@ -33,7 +33,7 @@ set PYTHONPATH=%2 ...@@ -33,7 +33,7 @@ set PYTHONPATH=%2
type out type out
set NUM=0 set NUM=0
for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N
echo %NUM% | findstr 4 || goto error echo %NUM% | findstr 3 || goto error
for /F %%N in ('find /C "reauthorization" ^< "out"') do set NUM=%%N for /F %%N in ('find /C "reauthorization" ^< "out"') do set NUM=%%N
echo %NUM% | findstr 1 || goto error echo %NUM% | findstr 1 || goto error
......
...@@ -26,7 +26,7 @@ def callback(header,err): ...@@ -26,7 +26,7 @@ def callback(header,err):
lock.release() lock.release()
producer = asapo_producer.create_producer(endpoint,beamline=beamline, stream=stream, token=token, nthreads=nthreads, timeout_sec=60) producer = asapo_producer.create_producer(endpoint,'auto',beamline, stream, token, nthreads, 60)
producer.set_log_level("debug") producer.set_log_level("debug")
...@@ -35,11 +35,13 @@ producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", ...@@ -35,11 +35,13 @@ producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1",
producer.wait_requests_finished(10000) producer.wait_requests_finished(10000)
time.sleep(2)
#send single file to other beamtime - should be warning on duplicated request #send single file to other beamtime - should be warning on duplicated request (same beamtime, no reauthorization)
producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
producer.wait_requests_finished(10000) producer.wait_requests_finished(10000)
fname = 'beamline/p07/current/beamtime-metadata-11111111.json' fname = 'beamline/p07/current/beamtime-metadata-11111111.json'
with open(fname) as json_file: with open(fname) as json_file:
data = json.load(json_file) data = json.load(json_file)
......
...@@ -24,7 +24,7 @@ def callback(header,err): ...@@ -24,7 +24,7 @@ def callback(header,err):
print ("successfuly sent: ",header) print ("successfuly sent: ",header)
lock.release() lock.release()
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,60) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads,60)
producer.set_log_level("debug") producer.set_log_level("debug")
...@@ -96,7 +96,7 @@ if n!=0: ...@@ -96,7 +96,7 @@ if n!=0:
# create with error # create with error
try: try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0,0) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0,0)
except asapo_producer.AsapoWrongInputError as e: except asapo_producer.AsapoWrongInputError as e:
print(e) print(e)
else: else:
......
...@@ -27,7 +27,7 @@ def assert_err(err): ...@@ -27,7 +27,7 @@ def assert_err(err):
print(err) print(err)
sys.exit(1) sys.exit(1)
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads ,0) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads ,0)
producer.set_log_level("info") producer.set_log_level("info")
......
...@@ -27,7 +27,7 @@ def assert_err(err): ...@@ -27,7 +27,7 @@ def assert_err(err):
print(err) print(err)
sys.exit(1) sys.exit(1)
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600)
producer.set_log_level("debug") producer.set_log_level("debug")
......
...@@ -22,7 +22,7 @@ def callback(header,err): ...@@ -22,7 +22,7 @@ def callback(header,err):
print ("successfuly sent: ",header) print ("successfuly sent: ",header)
lock.release() lock.release()
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) producer = asapo_producer.create_producer(endpoint,beamtime, 'auto', stream, token, nthreads, 600)
producer.set_log_level("info") producer.set_log_level("info")
...@@ -63,7 +63,7 @@ if n!=0: ...@@ -63,7 +63,7 @@ if n!=0:
# create with error # create with error
try: try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0, 600) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, 0, 600)
except Exception as Asapo: except Exception as Asapo:
print(e) print(e)
else: else:
......
...@@ -22,7 +22,7 @@ def callback(header,err): ...@@ -22,7 +22,7 @@ def callback(header,err):
print ("successfuly sent: ",header) print ("successfuly sent: ",header)
lock.release() lock.release()
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads, 600) producer = asapo_producer.create_producer(endpoint,beamtime,'auto', stream, token, nthreads, 600)
producer.set_log_level("info") producer.set_log_level("info")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment