diff --git a/3d_party/libcurl/install.sh b/3d_party/libcurl/install.sh index 9b39b43b43efabdfe8e4d3be5a28f3a43ecab090..158608c29232b4d5086cb9238ff5c4f0d588acbf 100755 --- a/3d_party/libcurl/install.sh +++ b/3d_party/libcurl/install.sh @@ -4,7 +4,7 @@ cd $1 wget https://curl.haxx.se/download/curl-7.58.0.tar.gz tar xzf curl-7.58.0.tar.gz cd curl-7.58.0 -./configure --without-ssl --disable-shared --disable-manual --disable-ares --disable-cookies \ +./configure --without-ssl --disable-shared --disable-manual --disable-ares \ --disable-crypto-auth --disable-ipv6 --disable-proxy --disable-unix-sockets \ --without-libidn --without-librtmp --without-zlib --disable-ldap \ --disable-libcurl-option --prefix=`pwd`/../ diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a0fcc607fecc90f7c134e56fcfb32def7cea47c..1261502b518146b15dd8c7905f97e2b416d10d1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ FEATURES * introduced substreams for producer/consumer * introduced timeout for producer requests * producer accepts "auto" for beamtime, will automatically select a current one for a given beamline +* introduced file transfer service - possibility for consumer clients to receive data also in case filesystem is inaccessible + IMPROVEMENTS * switch to MongoDB 4.2 diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a19e04e8c3ba1fce6cbb406a14999b59b15d5f5..9eca15771bdc8a9f261a509e131bf3b21bb91a21 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,6 +94,8 @@ add_subdirectory(authorizer) add_subdirectory(asapo_tools) +add_subdirectory(file_transfer) + add_subdirectory(deploy) if(BUILD_INTEGRATION_TESTS) diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index c5163021646a4307e61dc521f47da6ab80e89db3..6249467e58838f0c041af6b76796ac7a507ff5aa 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -3,9 +3,21 @@ function(prepare_asapo) get_target_property(RECEIVER_NAME receiver-bin OUTPUT_NAME) get_target_property(DISCOVERY_FULLPATH asapo-discovery EXENAME) get_target_property(AUTHORIZER_FULLPATH asapo-authorizer EXENAME) + get_target_property(FILE_TRANSFER_FULLPATH asapo-file-transfer EXENAME) get_target_property(BROKER_FULLPATH asapo-broker EXENAME) set(WORK_DIR ${CMAKE_CURRENT_BINARY_DIR}) + file(TO_NATIVE_PATH ${CMAKE_CURRENT_BINARY_DIR}/asap3 ASAP3_FOLDER ) + file(TO_NATIVE_PATH ${CMAKE_CURRENT_BINARY_DIR}/beamline CURRENT_BEAMLINES_FOLDER ) + + if (WIN32) + string(REPLACE "\\" "\\\\" ASAP3_FOLDER "${ASAP3_FOLDER}") + string(REPLACE "\\" "\\\\" CURRENT_BEAMLINES_FOLDER "${CURRENT_BEAMLINES_FOLDER}") + endif() + + set (ASAP3_FOLDER "${ASAP3_FOLDER}" PARENT_SCOPE) + set (CURRENT_BEAMLINES_FOLDER "${CURRENT_BEAMLINES_FOLDER}" PARENT_SCOPE) + if(NOT DEFINED RECEIVER_USE_CACHE) set(RECEIVER_USE_CACHE true) endif() @@ -27,9 +39,11 @@ function(prepare_asapo) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/file_transfer.nmd.in file_transfer.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json.tpl discovery.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json.tpl broker.json.tpl COPYONLY) + configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/file_transfer_settings.json.tpl file_transfer.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/auth_secret.key auth_secret.key COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/nginx.conf.tpl nginx.conf.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx.nmd.in nginx.nmd @ONLY) diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go index 9574aa9e3d9566fbe4538fb53e1f0dc297d4a510..42ee3cd020546075e3a30e72a20fddd68eefe74c 100644 --- a/authorizer/src/asapo_authorizer/server/authorize.go +++ b/authorizer/src/asapo_authorizer/server/authorize.go @@ -3,7 +3,6 @@ package server import ( log "asapo_common/logger" "asapo_common/utils" - "encoding/json" "errors" "net/http" "path/filepath" @@ -48,11 +47,6 @@ func getSourceCredentials(request authorizationRequest) (SourceCredentials, erro return creds, nil } -func extractRequest(r *http.Request) (request authorizationRequest, err error) { - decoder := json.NewDecoder(r.Body) - err = decoder.Decode(&request) - return -} func splitHost(hostPort string) string { s := strings.Split(hostPort, ":") @@ -95,7 +89,7 @@ func beamtimeMetaFromMatch(match string) (beamtimeMeta, error) { return beamtimeMeta{}, errors.New("skipped fodler") } - bt.OfflinePath = match + bt.OfflinePath = settings.RootBeamtimesFolder+string(filepath.Separator)+match bt.Beamline, bt.BeamtimeId = vars[2], vars[5] return bt, nil @@ -172,10 +166,10 @@ func needHostAuthorization(creds SourceCredentials) bool { func authorizeByToken(creds SourceCredentials) error { var token_expect string if (creds.BeamtimeId != "auto") { - token_expect, _ = auth.GenerateToken(&creds.BeamtimeId) + token_expect, _ = authHMAC.GenerateToken(&creds.BeamtimeId) } else { key := "bl_" + creds.Beamline - token_expect, _ = auth.GenerateToken(&key) + token_expect, _ = authHMAC.GenerateToken(&key) } var err_string string @@ -255,33 +249,31 @@ func authorize(request authorizationRequest, creds SourceCredentials) (beamtimeM } func routeAuthorize(w http.ResponseWriter, r *http.Request) { - request, err := extractRequest(r) + var request authorizationRequest + err := utils.ExtractRequest(r,&request) if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) + utils.WriteServerError(w,err,http.StatusBadRequest) return } creds, err := getSourceCredentials(request) if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) + utils.WriteServerError(w,err,http.StatusBadRequest) return } beamtimeInfo, err := authorize(request, creds) if (err != nil) { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte(err.Error())) + utils.WriteServerError(w,err,http.StatusUnauthorized) return } res, err := utils.MapToJson(&beamtimeInfo) if err != nil { - w.WriteHeader(http.StatusInternalServerError) + utils.WriteServerError(w,err,http.StatusInternalServerError) return - } + w.WriteHeader(http.StatusOK) w.Write([]byte(res)) } diff --git a/authorizer/src/asapo_authorizer/server/authorize_test.go b/authorizer/src/asapo_authorizer/server/authorize_test.go index 3867622f1b4aca3fd437fc24d8b52e2e795b2630..ca2c21f7689272ce12281f597e91fcdda7bb4293 100644 --- a/authorizer/src/asapo_authorizer/server/authorize_test.go +++ b/authorizer/src/asapo_authorizer/server/authorize_test.go @@ -13,8 +13,8 @@ import ( ) func prepareToken(beamtime_or_beamline string) string{ - auth = utils.NewHMACAuth("secret") - token, _ := auth.GenerateToken(&beamtime_or_beamline) + authHMAC = utils.NewHMACAuth("secret") + token, _ := authHMAC.GenerateToken(&beamtime_or_beamline) return token } @@ -35,12 +35,12 @@ func containsMatcher(substr string) func(str string) bool { return func(str string) bool { return strings.Contains(str, substr) } } -func makeRequest(request authorizationRequest) string { +func makeRequest(request interface{}) string { buf, _ := utils.MapToJson(request) return string(buf) } -func doAuthorizeRequest(path string,buf string) *httptest.ResponseRecorder { +func doPostRequest(path string,buf string) *httptest.ResponseRecorder { mux := utils.NewRouter(listRoutes) req, _ := http.NewRequest("POST", path, strings.NewReader(buf)) w := httptest.NewRecorder() @@ -86,7 +86,7 @@ func TestSplitCreds(t *testing.T) { func TestAuthorizeDefaultOK(t *testing.T) { allowBeamlines([]beamtimeMeta{{"asapo_test","beamline","","2019","tf"}}) request := makeRequest(authorizationRequest{"asapo_test%%%","host"}) - w := doAuthorizeRequest("/authorize",request) + w := doPostRequest("/authorize",request) body, _ := ioutil.ReadAll(w.Body) @@ -135,7 +135,7 @@ func TestAuthorizeWithToken(t *testing.T) { for _, test := range authTests { request := makeRequest(authorizationRequest{test.beamtime_id+"%"+test.beamline+"%"+test.stream+"%"+test.token,"host"}) - w := doAuthorizeRequest("/authorize",request) + w := doPostRequest("/authorize",request) body, _ := ioutil.ReadAll(w.Body) if test.status==http.StatusOK { @@ -148,7 +148,7 @@ func TestAuthorizeWithToken(t *testing.T) { assert.Contains(t, body_str, "tf/gpfs/bl1/2019/data/test", "") if (test.beamtime_id == "test_online") { assert.Contains(t, body_str, "tf/gpfs/bl1/2019/data/test_online", "") - assert.Contains(t, body_str, "./bl1/current", "") + assert.Contains(t, body_str, "bl1/current", "") } else { assert.NotContains(t, body_str, "current", "") } @@ -222,7 +222,7 @@ func TestAuthorizeBeamline(t *testing.T) { for _, test := range authBeamlineTests { request := makeRequest(authorizationRequest{"auto%"+test.beamline+"%stream%"+test.token,"host"}) - w := doAuthorizeRequest("/authorize",request) + w := doPostRequest("/authorize",request) body, _ := ioutil.ReadAll(w.Body) body_str:=string(body) @@ -243,19 +243,19 @@ func TestAuthorizeBeamline(t *testing.T) { func TestNotAuthorized(t *testing.T) { request := makeRequest(authorizationRequest{"any_id%%%","host"}) - w := doAuthorizeRequest("/authorize",request) + w := doPostRequest("/authorize",request) assert.Equal(t, http.StatusUnauthorized, w.Code, "") } func TestAuthorizeWrongRequest(t *testing.T) { - w := doAuthorizeRequest("/authorize","babla") + w := doPostRequest("/authorize","babla") assert.Equal(t, http.StatusBadRequest, w.Code, "") } func TestAuthorizeWrongPath(t *testing.T) { - w := doAuthorizeRequest("/authorized","") + w := doPostRequest("/authorized","") assert.Equal(t, http.StatusNotFound, w.Code, "") } @@ -296,7 +296,7 @@ func TestAuthorizeWithFile(t *testing.T) { request := authorizationRequest{"11003924%%%","127.0.0.1"} - w := doAuthorizeRequest("/authorize",makeRequest(request)) + w := doPostRequest("/authorize",makeRequest(request)) body, _ := ioutil.ReadAll(w.Body) body_str:=string(body) @@ -309,7 +309,7 @@ func TestAuthorizeWithFile(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code, "") request = authorizationRequest{"wrong%%%","127.0.0.1"} - w = doAuthorizeRequest("/authorize",makeRequest(request)) + w = doPostRequest("/authorize",makeRequest(request)) assert.Equal(t, http.StatusUnauthorized, w.Code, "") os.Remove("127.0.0.1") @@ -339,7 +339,7 @@ func TestGetBeamtimeInfo(t *testing.T) { settings.RootBeamtimesFolder=test.root bt,err:= beamtimeMetaFromMatch(test.root+string(filepath.Separator)+test.fname) if test.ok { - assert.Equal(t,bt.OfflinePath,test.fname) + assert.Equal(t,bt.OfflinePath,test.root+string(filepath.Separator)+test.fname) assert.Equal(t,bt.Beamline,test.beamline) assert.Equal(t,bt.BeamtimeId,test.id) assert.Nil(t,err,"should not be error") diff --git a/authorizer/src/asapo_authorizer/server/folder_token.go b/authorizer/src/asapo_authorizer/server/folder_token.go new file mode 100644 index 0000000000000000000000000000000000000000..8c50ae06224924888ea1ce450a73e16e37731769 --- /dev/null +++ b/authorizer/src/asapo_authorizer/server/folder_token.go @@ -0,0 +1,115 @@ +package server + +import ( + "asapo_common/utils" + "net/http" + "time" + log "asapo_common/logger" + "errors" + "path/filepath" +) + +type folderTokenRequest struct { + Folder string + BeamtimeId string + Token string +} + +type folderToken struct { + Token string +} + +/*func routeFolderToken(w http.ResponseWriter, r *http.Request) { + utils.ProcessJWTAuth(processFolderTokenRequest,settings.secret)(w,r) +}*/ + +func prepareJWTToken(request folderTokenRequest) (string,error) { + var claims utils.CustomClaims + var extraClaim utils.FolderTokenTokenExtraClaim + + extraClaim.RootFolder = request.Folder + claims.ExtraClaims = &extraClaim + claims.Duration = time.Duration(settings.TokenDurationMin) * time.Minute + + return authJWT.GenerateToken(&claims) + +} + +func folderTokenResponce(token string) []byte{ + return []byte(token) +} + +func checkBeamtimeToken(request folderTokenRequest) error { + token_expect, _ := authHMAC.GenerateToken(&request.BeamtimeId) + var err_string string + if request.Token != token_expect { + err_string = "wrong token for beamtime " + request.BeamtimeId + log.Error(err_string) + return errors.New(err_string) + } + return nil +} + + +func extractFolderTokenrequest(r *http.Request) (folderTokenRequest,error) { + var request folderTokenRequest + err := utils.ExtractRequest(r,&request) + if err != nil { + return folderTokenRequest{},err + } + + if len(request.Folder)==0 ||len(request.BeamtimeId)==0 || len(request.Token) == 0 { + return folderTokenRequest{},errors.New("some request fields are empty") + } + return request,nil + +} + +func checkBeamtimeFolder(request folderTokenRequest) error { + beamtimeMeta, err := findMeta(SourceCredentials{request.BeamtimeId,"auto","",""}) + if err != nil { + log.Error("cannot get beamtime meta"+err.Error()) + return err + } + + folder := filepath.Clean(request.Folder) + if (folder != filepath.Clean(beamtimeMeta.OnlinePath) && folder != filepath.Clean(beamtimeMeta.OfflinePath)) { + err_string := folder + " does not match beamtime folders "+beamtimeMeta.OnlinePath+" or " +beamtimeMeta.OfflinePath + log.Error(err_string) + return errors.New(err_string) + } + + return nil +} + +func routeFolderToken(w http.ResponseWriter, r *http.Request) { + request, err := extractFolderTokenrequest(r) + if err != nil { + utils.WriteServerError(w,err,http.StatusBadRequest) + return + } + + err = checkBeamtimeToken(request) + if err != nil { + utils.WriteServerError(w,err,http.StatusUnauthorized) + return + } + + err = checkBeamtimeFolder(request) + if err != nil { + utils.WriteServerError(w,err,http.StatusUnauthorized) + return + } + + token, err := prepareJWTToken(request) + if err != nil { + utils.WriteServerError(w,err,http.StatusInternalServerError) + return + } + + log.Debug("generated folder token for beamtime " + request.BeamtimeId + ", folder " + request.Folder) + + answer := folderTokenResponce(token) + w.WriteHeader(http.StatusOK) + w.Write(answer) +} diff --git a/authorizer/src/asapo_authorizer/server/folder_token_test.go b/authorizer/src/asapo_authorizer/server/folder_token_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dbd48e2c42ad871d4157cf9b97e9af8677de9d99 --- /dev/null +++ b/authorizer/src/asapo_authorizer/server/folder_token_test.go @@ -0,0 +1,65 @@ +package server + +import ( + "asapo_common/utils" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "testing" + "os" + "path/filepath" + "fmt" +) + +var fodlerTokenTests = [] struct { + beamtime_id string + root_folder string + token string + status int + message string +}{ + {"test", "tf/gpfs/bl1/2019/data/test",prepareToken("test"),http.StatusOK,"beamtime found"}, + {"test_online", "bl1/current",prepareToken("test_online"),http.StatusOK,"online beamtime found"}, + {"test", "bl1/current",prepareToken("test"),http.StatusUnauthorized,"no online beamtime found"}, + {"test_online", "bl2/current",prepareToken("test_online"),http.StatusUnauthorized,"wrong online folder"}, + {"test", "tf/gpfs/bl1/2019/data/test1",prepareToken("test"),http.StatusUnauthorized,"wrong folder"}, + {"test", "tf/gpfs/bl1/2019/data/test",prepareToken("test1"),http.StatusUnauthorized,"wrong token"}, + {"11111111", "tf/gpfs/bl1/2019/data/test",prepareToken("11111111"),http.StatusBadRequest,"bad request"}, +} + +func TestFolderToken(t *testing.T) { + allowBeamlines([]beamtimeMeta{}) + settings.RootBeamtimesFolder ="." + settings.CurrentBeamlinesFolder="." + os.MkdirAll(filepath.Clean("tf/gpfs/bl1/2019/data/test"), os.ModePerm) + os.MkdirAll(filepath.Clean("tf/gpfs/bl1/2019/data/test_online"), os.ModePerm) + + os.MkdirAll(filepath.Clean("bl1/current"), os.ModePerm) + ioutil.WriteFile(filepath.Clean("bl1/current/beamtime-metadata-test_online.json"), []byte(beamtime_meta_online), 0644) + + defer os.RemoveAll("tf") + defer os.RemoveAll("bl1") + + for _, test := range fodlerTokenTests { + authJWT = utils.NewJWTAuth("secret") + abs_path:=settings.RootBeamtimesFolder + string(filepath.Separator)+test.root_folder + request := makeRequest(folderTokenRequest{abs_path,test.beamtime_id,test.token}) + if test.status == http.StatusBadRequest { + request =makeRequest(authorizationRequest{}) + } + w := doPostRequest("/folder",request) + if w.Code == http.StatusOK { + body, _ := ioutil.ReadAll(w.Body) + claims,_ := utils.CheckJWTToken(string(body),"secret") + var extra_claim utils.FolderTokenTokenExtraClaim + utils.MapToStruct(claims.(*utils.CustomClaims).ExtraClaims.(map[string]interface{}), &extra_claim) + assert.Equal(t, abs_path, extra_claim.RootFolder, test.message) + } else { + body, _ := ioutil.ReadAll(w.Body) + fmt.Println(string(body)) + } + + assert.Equal(t, test.status, w.Code, test.message) + } +} + diff --git a/authorizer/src/asapo_authorizer/server/listroutes.go b/authorizer/src/asapo_authorizer/server/listroutes.go index f32c5c8303ab09a9be2f4b2951f52deecd51bf24..9f8e562b5ddad13f54363f25599f16420440ca80 100644 --- a/authorizer/src/asapo_authorizer/server/listroutes.go +++ b/authorizer/src/asapo_authorizer/server/listroutes.go @@ -17,5 +17,10 @@ var listRoutes = utils.Routes{ "/health-check", routeGetHealth, }, - + utils.Route{ + "Folder Token", + "POST", + "/folder", + routeFolderToken, + }, } diff --git a/authorizer/src/asapo_authorizer/server/server.go b/authorizer/src/asapo_authorizer/server/server.go index e6b6f632a30cc32a3e8a910fde86a5d87f6318c8..c2fbe406905883e4e02a818d56880e8b0da71e43 100644 --- a/authorizer/src/asapo_authorizer/server/server.go +++ b/authorizer/src/asapo_authorizer/server/server.go @@ -1,6 +1,8 @@ package server -import "asapo_common/utils" +import ( +"asapo_common/utils" +) type beamtimeMeta struct { BeamtimeId string `json:"beamtimeId"` @@ -18,8 +20,10 @@ type serverSettings struct { CurrentBeamlinesFolder string AlwaysAllowedBeamtimes []beamtimeMeta SecretFile string + TokenDurationMin int } var settings serverSettings -var auth utils.Auth +var authHMAC utils.Auth +var authJWT utils.Auth diff --git a/authorizer/src/asapo_authorizer/server/server_nottested.go b/authorizer/src/asapo_authorizer/server/server_nottested.go index 55fee6f5d1d90cc6a6a1f57a0d31984c7810b704..4f693e3812b700507906c02936c915b00e3379de 100644 --- a/authorizer/src/asapo_authorizer/server/server_nottested.go +++ b/authorizer/src/asapo_authorizer/server/server_nottested.go @@ -18,12 +18,12 @@ func Start() { log.Fatal(http.ListenAndServe(":"+strconv.Itoa(settings.Port), http.HandlerFunc(mux.ServeHTTP))) } -func createAuth() (utils.Auth, error) { +func createAuth() (utils.Auth,utils.Auth, error) { secret, err := utils.ReadFirstStringFromFile(settings.SecretFile) if err != nil { - return nil, err + return nil,nil, err } - return utils.NewHMACAuth(secret), nil + return utils.NewHMACAuth(secret),utils.NewJWTAuth(secret), nil } @@ -41,7 +41,7 @@ func ReadConfig(fname string) (log.Level, error) { } var err error - auth, err = createAuth() + authHMAC,authJWT, err = createAuth() if err != nil { return log.FatalLevel, err } diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index e46b6ec7ae0ff3ec09ba260841c58c45624210ad..21f89e211ee62935aad0949e01277674114e3254 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -7,6 +7,8 @@ #include <vector> #include <string> +#include "error.h" + namespace asapo { std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec); @@ -36,6 +38,7 @@ inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) { using FileData = std::unique_ptr<uint8_t[]>; + using FileInfos = std::vector<FileInfo>; struct DataSet { diff --git a/common/cpp/include/http_client/http_client.h b/common/cpp/include/http_client/http_client.h index 9b9f5444b3e34a38fcedfe89a5d854c2bac1966e..5ea1c6b71c7efe91af9b797156b4071810627257 100644 --- a/common/cpp/include/http_client/http_client.h +++ b/common/cpp/include/http_client/http_client.h @@ -2,6 +2,7 @@ #define ASAPO_HTTP_CLIENT_H #include "common/error.h" +#include "common/data_structs.h" namespace asapo { @@ -10,10 +11,17 @@ enum class HttpCode; class HttpClient { public: virtual std::string Get(const std::string& uri, HttpCode* response_code, Error* err) const noexcept = 0; - virtual std::string Post(const std::string& uri, const std::string& data, HttpCode* response_code, + virtual std::string Post(const std::string& uri, const std::string& cookie, const std::string& data, + HttpCode* response_code, Error* err) const noexcept = 0; + virtual Error Post(const std::string& uri, const std::string& cookie, + const std::string& input_data, FileData* ouput_data, + uint64_t output_data_size, + HttpCode* response_code) const noexcept = 0; + virtual Error Post(const std::string& uri, const std::string& cookie, + const std::string& input_data, std::string output_file_name, + HttpCode* response_code) const noexcept = 0; virtual ~HttpClient() = default; - }; std::unique_ptr<HttpClient> DefaultHttpClient(); diff --git a/common/cpp/include/unittests/MockHttpClient.h b/common/cpp/include/unittests/MockHttpClient.h index d128e8a84fd669604c99afdda2272445e434f471..f201558565cb6f84ada470525b9c33d938d6429b 100644 --- a/common/cpp/include/unittests/MockHttpClient.h +++ b/common/cpp/include/unittests/MockHttpClient.h @@ -16,16 +16,35 @@ class MockHttpClient : public HttpClient { err->reset(error); return response; } - std::string Post(const std::string& uri, const std::string& data, HttpCode* code, Error* err) const noexcept override { + std::string Post(const std::string& uri, const std::string& cookie, const std::string& data, HttpCode* code, + Error* err) const noexcept override { ErrorInterface* error = nullptr; auto response = Post_t(uri, data, code, &error); err->reset(error); return response; } + + Error Post(const std::string& uri, const std::string& cookie, const std::string& input_data, FileData* ouput_data, + uint64_t output_data_size, + HttpCode* response_code) const noexcept override { + return Error{PostReturnArray_t(uri, cookie, input_data, ouput_data, output_data_size, response_code)}; + }; + + Error Post(const std::string& uri, const std::string& cookie, + const std::string& input_data, std::string output_file_name, + HttpCode* response_code) const noexcept override { + return nullptr; + }; + + MOCK_CONST_METHOD3(Get_t, std::string(const std::string& uri, HttpCode* code, ErrorInterface** err)); MOCK_CONST_METHOD4(Post_t, std::string(const std::string& uri, const std::string& data, HttpCode* code, ErrorInterface** err)); + MOCK_CONST_METHOD6(PostReturnArray_t, + ErrorInterface * (const std::string& uri, const std::string& cookie, const std::string& input_data, + FileData* ouput_data, uint64_t output_data_size, HttpCode* code)); + }; diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index cf335c73a960652c4fa2b1919508e39d2c039783..7698e7f81155926f3d7cdb51be28cfd79531e5f9 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -2,6 +2,8 @@ #include <cstring> #include "http_client/http_error.h" +#include "common/data_structs.h" +#include "io/io_factory.h" namespace asapo { @@ -20,20 +22,44 @@ CurlHttpClientInstance::~CurlHttpClientInstance() { curl_global_cleanup(); } -size_t curl_write( void* ptr, size_t size, size_t nmemb, void* buffer) { - auto strbuf = (std::string*)buffer; - strbuf->append((char*)ptr, size * nmemb); - return size * nmemb; +size_t curl_write( void* ptr, size_t size, size_t nmemb, void* data_container) { + auto container = (CurlDataContainer*)data_container; + size_t nbytes = size * nmemb; + switch (container->mode) { + case CurlDataMode::string: + container->string_buffer.append((char*)ptr, nbytes); + break; + case CurlDataMode::array: + if (container->bytes_received + nbytes > container->array_size) { + return -1; + } + memcpy(container->p_array->get(), ptr, nbytes); + container->bytes_received += nbytes; + break; + case CurlDataMode::file: + Error err; + container->io->Write(container->fd, ptr, nbytes, &err); + if (err) { + return -1; + } + break; + } + return nbytes; } -void SetCurlOptions(CURL* curl, bool post, const std::string& data, const std::string& uri, char* errbuf, - std::string* buffer) { +void SetCurlOptions(CURL* curl, bool post, const std::string& cookie, const std::string& data, const std::string& uri, + char* errbuf, + CurlDataContainer* data_container) { errbuf[0] = 0; curl_easy_setopt(curl, CURLOPT_URL, uri.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, buffer); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, data_container); curl_easy_setopt(curl, CURLOPT_FAILONERROR, 0L); curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf); + if (!cookie.empty()) { + curl_easy_setopt(curl, CURLOPT_COOKIE, cookie.c_str()); + } + //todo use a config parameter for this curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 5000L); @@ -58,50 +84,128 @@ std::string GetCurlError(CURL* curl, CURLcode res, const char* errbuf) { } } -Error ProcessCurlResponse(CURL* curl, CURLcode res, const char* errbuf, - std::string* buffer, HttpCode* response_code) { +Error ProcessCurlResponse(CURL* curl, CURLcode res, const char* errbuf, HttpCode* response_code) { if(res == CURLE_OK) { *response_code = GetResponseCode(curl); return nullptr; } else { - *buffer = GetCurlError(curl, res, errbuf); + auto err_string = GetCurlError(curl, res, errbuf); if (res == CURLE_COULDNT_CONNECT || res == CURLE_COULDNT_RESOLVE_HOST) { - return HttpErrorTemplates::kConnectionError.Generate(*buffer); + return HttpErrorTemplates::kConnectionError.Generate(err_string); } else { - return HttpErrorTemplates::kTransferError.Generate(*buffer); + return HttpErrorTemplates::kTransferError.Generate(err_string); } } } -std::string CurlHttpClient::Command(bool post, const std::string& uri, const std::string& data, HttpCode* response_code, - Error* err) const noexcept { +Error CurlHttpClient::Command(bool post, CurlDataContainer* data_container, const std::string& uri, + const std::string& cookie, + const std::string& data, HttpCode* response_code) const noexcept { std::lock_guard<std::mutex> lock{mutex_}; - - std::string buffer; char errbuf[CURL_ERROR_SIZE]; - - SetCurlOptions(curl_, post, data, uri, errbuf, &buffer); - + SetCurlOptions(curl_, post, cookie, data, uri, errbuf, data_container); auto res = curl_easy_perform(curl_); + return ProcessCurlResponse(curl_, res, errbuf, response_code); +} - *err = ProcessCurlResponse(curl_, res, errbuf, &buffer, response_code); +FileData AllocateMemory(uint64_t size, Error* err) { + FileData data; + try { + data = FileData{new uint8_t[(size_t)size +1 ]}; + } catch (...) { + *err = ErrorTemplates::kMemoryAllocationError.Generate(); + return nullptr; + } + *err = nullptr; + return data; +} - return buffer; +Error CurlHttpClient::Post(const std::string& uri, + const std::string& cookie, + const std::string& input_data, + FileData* output_data, + uint64_t output_data_size, + HttpCode* response_code) const noexcept { + Error err; + CurlDataContainer data_container; + data_container.mode = CurlDataMode::array; + uint64_t extended_size =output_data_size + 10000; // for error messages + *output_data = AllocateMemory(extended_size, &err); + if (err) { + return err; + } + data_container.p_array = output_data; + data_container.array_size = extended_size; + err = Command(true, &data_container, uri, cookie, input_data, response_code); + if (!err) { + if (*response_code == HttpCode::OK) { + if (output_data_size != data_container.bytes_received) { + return HttpErrorTemplates::kTransferError.Generate("received " + + std::to_string(data_container.bytes_received) + ", expected " + std::to_string(output_data_size) + "bytes"); + } + (*output_data)[output_data_size] = 0; // for reinterpret cast to string worked + } else { + (*output_data)[data_container.bytes_received] = 0; // for reinterpret cast to string worked + } + return nullptr; + } else { + return err; + } +} +Error CurlHttpClient::Post(const std::string& uri, + const std::string& cookie, + const std::string& input_data, + std::string output_file_name, + HttpCode* response_code) const noexcept { + Error err; + CurlDataContainer data_container; + data_container.mode = CurlDataMode::file; + data_container.io = io_.get(); + data_container.fd = io_->Open(output_file_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW | IO_OPEN_MODE_SET_LENGTH_0, + &err); + if (err) { + return err; + } + + err = Command(true, &data_container, uri, cookie, input_data, response_code); + io_->Close(data_container.fd, nullptr); + if (!err) { + return nullptr; + } else { + return err; + } } +std::string CurlHttpClient::StringPostGet(bool post, + const std::string& uri, + const std::string& cookie, + const std::string& data, + HttpCode* response_code, Error* err) const noexcept { + CurlDataContainer data_container; + data_container.mode = CurlDataMode::string; + *err = Command(post, &data_container, uri, cookie, data, response_code); + if (!*err) { + return data_container.string_buffer; + } else { + return ""; + } +} std::string CurlHttpClient::Get(const std::string& uri, HttpCode* response_code, Error* err) const noexcept { - return Command(false, uri, "", response_code, err); + return StringPostGet(false, uri, "", "", response_code, err); } -std::string CurlHttpClient::Post(const std::string& uri, const std::string& data, HttpCode* response_code, +std::string CurlHttpClient::Post(const std::string& uri, + const std::string& cookie, + const std::string& data, + HttpCode* response_code, Error* err) const noexcept { - return Command(true, uri, data, response_code, err); + return StringPostGet(true, uri, cookie, data, response_code, err); } -CurlHttpClient::CurlHttpClient() { +CurlHttpClient::CurlHttpClient(): io_{GenerateDefaultIO()} { curl_ = curl_easy_init(); if (!curl_) { throw "Cannot initialize curl"; @@ -109,12 +213,10 @@ CurlHttpClient::CurlHttpClient() { } - CurlHttpClient::~CurlHttpClient() { if (curl_) { curl_easy_cleanup(curl_); } } - } diff --git a/common/cpp/src/http_client/curl_http_client.h b/common/cpp/src/http_client/curl_http_client.h index a3627136e95b049438dcad1b7fffd2e828f3ba32..f437a1e4ce11fc6b7520a6125faafd0e9c4d1616 100644 --- a/common/cpp/src/http_client/curl_http_client.h +++ b/common/cpp/src/http_client/curl_http_client.h @@ -7,24 +7,54 @@ #include "http_client/http_client.h" #include "curl/curl.h" +#include "io/io.h" namespace asapo { +enum class CurlDataMode { + string, + array, + file +}; + +struct CurlDataContainer { + std::string string_buffer; + FileData* p_array; + uint64_t array_size; + uint64_t bytes_received = 0; + CurlDataMode mode; + FileDescriptor fd; + const IO* io; +}; + + class CurlHttpClient final : public HttpClient { public: CurlHttpClient(); std::string Get(const std::string& uri, HttpCode* response_code, Error* err) const noexcept override; - std::string Post(const std::string& uri, const std::string& data, HttpCode* response_code, + std::string Post(const std::string& uri, const std::string& cookie, const std::string& data, HttpCode* response_code, Error* err) const noexcept override; + Error Post(const std::string& uri, const std::string& cookie, const std::string& input_data, FileData* output_data, + uint64_t output_data_size, + HttpCode* response_code) const noexcept override; + Error Post(const std::string& uri, const std::string& cookie, + const std::string& input_data, std::string output_file_name, + HttpCode* response_code) const noexcept override; + + virtual ~CurlHttpClient(); private: - std::string Command(bool post, const std::string& uri, const std::string& data, HttpCode* response_code, - Error* err) const noexcept; + std::unique_ptr<IO> io_; + Error Command(bool post, CurlDataContainer* data_container, const std::string& uri, const std::string& cookie, + const std::string& data, HttpCode* response_code) const noexcept; + std::string StringPostGet(bool post, const std::string& uri, const std::string& cookie, + const std::string& data, HttpCode* response_code, Error* err) const noexcept; mutable std::mutex mutex_; CURL* curl_ = 0; }; + } #endif //ASAPO_CURL_HTTP_CLIENT_H diff --git a/common/cpp/src/logger/CMakeLists.txt b/common/cpp/src/logger/CMakeLists.txt index 600a1b66693026ba87d7db2e5769f2c3419aea7d..04b6fb251b4bf98b06390fc10894b6d5a6bae5f4 100644 --- a/common/cpp/src/logger/CMakeLists.txt +++ b/common/cpp/src/logger/CMakeLists.txt @@ -21,6 +21,6 @@ set(TEST_SOURCE_FILES ../../unittests/logger/test_logger.cpp link_libraries(${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) -set(TEST_LIBRARIES "${TARGET_NAME};curl_http_client") +set(TEST_LIBRARIES "${TARGET_NAME};curl_http_client;system_io") include_directories(${ASAPO_CXX_COMMON_INCLUDE_DIR} ${CMAKE_SOURCE_DIR}/3d_party/spd_log/include) gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}") diff --git a/common/cpp/src/logger/fluentd_sink.cpp b/common/cpp/src/logger/fluentd_sink.cpp index 540841f0a7ba650c3d1832a74a37381c85ec45c9..e291fc7de767793b4c14d0fcfe9761344fdcb30e 100644 --- a/common/cpp/src/logger/fluentd_sink.cpp +++ b/common/cpp/src/logger/fluentd_sink.cpp @@ -9,7 +9,7 @@ void FluentdSink::_sink_it(const spdlog::details::log_msg& msg) { HttpCode code; log_str.erase(log_str.find_last_not_of("\n\r\t") + 1); Error err; - httpclient__->Post(endpoint_uri_, log_str, &code, &err); + httpclient__->Post(endpoint_uri_, "", log_str, &code, &err); if (err) { std::cerr << "cannot send logs - " + err->Explain() << std::endl; } diff --git a/common/go/src/asapo_common/utils/authorization.go b/common/go/src/asapo_common/utils/authorization.go index 48ba8e4b8505ea1c502b7c34709796f889a07a68..aac273c5a213b2fd79abe03a55e025a37c666523 100644 --- a/common/go/src/asapo_common/utils/authorization.go +++ b/common/go/src/asapo_common/utils/authorization.go @@ -1,15 +1,16 @@ package utils import ( - "errors" - "net/http" - "net/url" - "strings" "context" - "github.com/dgrijalva/jwt-go" "crypto/hmac" "crypto/sha256" "encoding/base64" + "errors" + "github.com/dgrijalva/jwt-go" + "net/http" + "net/url" + "strings" + "time" ) type AuthorizationRequest struct { @@ -78,13 +79,10 @@ func ExtractAuthInfo(r *http.Request) (authType, token string, err error) { type CustomClaims struct { jwt.StandardClaims + Duration time.Duration ExtraClaims interface{} } -type JobClaim struct { - BeamtimeId string -} - type JWTAuth struct { Key string } @@ -103,9 +101,9 @@ func (t JWTAuth) GenerateToken(val ...interface{}) (string, error) { return "", errors.New("Wrong claims") } -// if claims.Duration > 0 { -// claims.ExpiresAt = time.Now().Add(claims.Duration).Unix() -// } + if claims.Duration > 0 { + claims.ExpiresAt = time.Now().Add(claims.Duration).Unix() + } token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) tokenString, err := token.SignedString([]byte(t.Key)) @@ -119,7 +117,10 @@ func (t JWTAuth) GenerateToken(val ...interface{}) (string, error) { func ProcessJWTAuth(fn http.HandlerFunc, key string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - + if (r.RequestURI == "/health-check") { // always allow /health-check request + fn(w,r) + return + } authType, token, err := ExtractAuthInfo(r) if err != nil { @@ -131,13 +132,13 @@ func ProcessJWTAuth(fn http.HandlerFunc, key string) http.HandlerFunc { if authType == "Bearer" { if claims, ok := CheckJWTToken(token, key); !ok { - http.Error(w, "Internal authorization error - tocken does not match", http.StatusUnauthorized) + http.Error(w, "Authorization error - tocken does not match", http.StatusUnauthorized) return } else { - ctx = context.WithValue(ctx, "JobClaim", claims) + ctx = context.WithValue(ctx, "TokenClaims", claims) } } else { - http.Error(w, "Internal authorization error - wrong auth type", http.StatusUnauthorized) + http.Error(w, "Authorization error - wrong auth type", http.StatusUnauthorized) return } fn(w, r.WithContext(ctx)) @@ -162,7 +163,7 @@ func CheckJWTToken(token, key string) (jwt.Claims, bool) { } func JobClaimFromContext(r *http.Request, val interface{}) error { - c := r.Context().Value("JobClaim") + c := r.Context().Value("TokenClaims") if c == nil { return errors.New("Empty context") @@ -192,7 +193,7 @@ func generateHMACToken(value string, key string) string { mac.Write([]byte(value)) return base64.URLEncoding.EncodeToString(mac.Sum(nil)) -} + } func (h HMACAuth) GenerateToken(val ...interface{}) (string, error) { if len(val) != 1 { @@ -204,6 +205,7 @@ func (h HMACAuth) GenerateToken(val ...interface{}) (string, error) { } sha := generateHMACToken(*value, h.Key) + return sha, nil } @@ -219,7 +221,7 @@ func ProcessHMACAuth(fn http.HandlerFunc, key string) http.HandlerFunc { // todo extract beamline from request value := "beamline" if authType == "HMAC-SHA-256" { - if !checkHMACToken(value, token, key) { + if !CheckHMACToken(value, token, key) { http.Error(w, "Internal authorization error - tocken does not match", http.StatusUnauthorized) return } @@ -231,7 +233,7 @@ func ProcessHMACAuth(fn http.HandlerFunc, key string) http.HandlerFunc { } } -func checkHMACToken(value string, token, key string) bool { +func CheckHMACToken(value string, token, key string) bool { if token == "" { return false diff --git a/common/go/src/asapo_common/utils/authorization_test.go b/common/go/src/asapo_common/utils/authorization_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d8a9b47745443a470d52051d73e2de11d36ab94d --- /dev/null +++ b/common/go/src/asapo_common/utils/authorization_test.go @@ -0,0 +1,86 @@ +package utils + +import ( + "net/http" + "testing" + "net/http/httptest" + "time" + "github.com/stretchr/testify/assert" +) + +type JobClaim struct { + AuthorizationResponce + JobInd string +} + + +func writeAuthResponse(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + var jc JobClaim + JobClaimFromContext(r, &jc) + w.Write([]byte(jc.UserName)) + w.Write([]byte(jc.JobInd)) +} + +func TestGenerateJWTToken(t *testing.T) { + + a := NewJWTAuth("hi") + token, _ := a.GenerateToken((&CustomClaims{Duration: 0, ExtraClaims: nil})) + assert.Equal(t, "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJEdXJhdGlvbiI"+ + "6MCwiRXh0cmFDbGFpbXMiOm51bGx9.JJcqNZciIDILk-A2sJZCY1sND458bcjNv6tXC2jxric", + token, "jwt token") + +} + +var HJWTAuthtests = []struct { + Mode string + Key string + User string + jobID string + Duration time.Duration + Answercode int + Message string +}{ + {"header", "hi", "testuser", "123", time.Hour, http.StatusOK, "correct auth - header"}, + {"cookie", "hi", "testuser", "123", time.Hour, http.StatusOK, "correct auth - cookie"}, + {"header", "hi", "testuser", "123", time.Microsecond, http.StatusUnauthorized, "token expired"}, + {"header", "hih", "testuser", "123", 1, http.StatusUnauthorized, "wrong key"}, + {"", "hi", "testuser", "123", 1, http.StatusUnauthorized, "auth no header"}, +} + +func TestProcessJWTAuth(t *testing.T) { + for _, test := range HJWTAuthtests { + req, _ := http.NewRequest("POST", "http://blabla", nil) + + var claim JobClaim + claim.UserName = test.User + claim.JobInd = test.jobID + + a := NewJWTAuth(test.Key) + + token, _ := a.GenerateToken((&CustomClaims{Duration: test.Duration, ExtraClaims: &claim})) + if test.Mode == "header" { + req.Header.Add("Authorization", "Bearer "+token) + } + + if test.Mode == "cookie" { + c := http.Cookie{Name: "Authorization", Value: "Bearer "+token} + req.AddCookie(&c) + } + + w := httptest.NewRecorder() + if test.Duration == time.Microsecond { + if testing.Short() { + continue + } + time.Sleep(time.Second) + } + ProcessJWTAuth(http.HandlerFunc(writeAuthResponse), "hi")(w, req) + assert.Equal(t, test.Answercode, w.Code, test.Message) + if w.Code == http.StatusOK { + assert.Contains(t, w.Body.String(), test.User, test.Message) + assert.Contains(t, w.Body.String(), test.jobID, test.Message) + } + } +} + diff --git a/common/go/src/asapo_common/utils/http.go b/common/go/src/asapo_common/utils/http.go new file mode 100644 index 0000000000000000000000000000000000000000..4e6f9736e6ff015ba66a5b56a6b32b9e3a1e4982 --- /dev/null +++ b/common/go/src/asapo_common/utils/http.go @@ -0,0 +1,16 @@ +package utils + +import ( + "encoding/json" + "net/http" +) + +func ExtractRequest(r *http.Request, request interface{}) error { + decoder := json.NewDecoder(r.Body) + return decoder.Decode(request) +} + +func WriteServerError(w http.ResponseWriter, err error,code int) { + w.WriteHeader(code) + w.Write([]byte(err.Error())) +} \ No newline at end of file diff --git a/common/go/src/asapo_common/utils/stucts.go b/common/go/src/asapo_common/utils/stucts.go index 9682dd622216a1dee1fd003c20d43499d9c4839c..e9c11a59e09cdd64e60cbffc4d996bdf1a036c66 100644 --- a/common/go/src/asapo_common/utils/stucts.go +++ b/common/go/src/asapo_common/utils/stucts.go @@ -16,10 +16,15 @@ type MongoInfo struct { StaticEndpoint string } +type FtsInfo struct { + StaticEndpoint string +} + type Settings struct { Receiver ReceiverInfo Broker BrokerInfo Mongo MongoInfo + FileTransferService FtsInfo ConsulEndpoints []string Mode string Port int @@ -51,3 +56,7 @@ func (settings *Settings) Validate() error { return nil } + +type FolderTokenTokenExtraClaim struct { + RootFolder string +} diff --git a/config/nomad/file_transfer.nmd.in b/config/nomad/file_transfer.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..2ff798a51180888984ab5b8660817551bb30f7f4 --- /dev/null +++ b/config/nomad/file_transfer.nmd.in @@ -0,0 +1,55 @@ +job "file_transfer" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "file_transfer" { + driver = "raw_exec" + + config { + command = "@FILE_TRANSFER_FULLPATH@", + args = ["-config","${NOMAD_TASK_DIR}/file_transfer.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "file_transfer" { + static = "5008" + } + } + } + + service { + name = "asapo-fts" + port = "file_transfer" + check { + name = "alive" + type = "http" + path = "/health-check" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "@WORK_DIR@/file_transfer.json.tpl" + destination = "local/file_transfer.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + template { + source = "@WORK_DIR@/auth_secret.key" + destination = "auth_secret.key" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h index e994a98025aa44e84a3f750f454b1cafd273ef8a..4f50ae3b0fb835cb25e8263d19de0b8252b0d883 100644 --- a/consumer/api/cpp/include/consumer/consumer_error.h +++ b/consumer/api/cpp/include/consumer/consumer_error.h @@ -54,7 +54,7 @@ auto const kInterruptedTransaction = ConsumerErrorTemplate{ }; auto const kUnavailableService = ConsumerErrorTemplate{ - "broker service unavailable", ConsumerErrorType::kUnavailableService + "service unavailable", ConsumerErrorType::kUnavailableService }; diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 539b688187dcb0bef2f9700d7a59d4fac1defa6e..f142bf497d4206b2b5697b1e579c87fa4497ad62 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -136,6 +136,7 @@ class DataBroker { class DataBrokerFactory { public: static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path, + bool has_filesystem, SourceCredentials source, Error* error) noexcept; diff --git a/consumer/api/cpp/src/data_broker.cpp b/consumer/api/cpp/src/data_broker.cpp index 48abcb63e5dc0ed2d694b33ab5291f162fae5aed..1f3b1db63f68a21992a76d3283271c5c96e13007 100644 --- a/consumer/api/cpp/src/data_broker.cpp +++ b/consumer/api/cpp/src/data_broker.cpp @@ -26,9 +26,10 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name, } std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name, std::string source_path, - SourceCredentials source, + bool has_filesystem, SourceCredentials source, Error* error) noexcept { - return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), std::move(source)); + return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem, + std::move(source)); } diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 4078698d39146906774f575d26508da0446db2f2..0a54c141dc93f7847e224ac89956c75f3230087e 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -1,4 +1,5 @@ #include "server_data_broker.h" +#include "server_data_broker.h" #include <chrono> @@ -13,6 +14,9 @@ using std::chrono::system_clock; namespace asapo { +const std::string ServerDataBroker::kBrokerServiceName = "broker"; +const std::string ServerDataBroker::kFileTransferServiceName = "fts"; + Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorData* data) { JsonStringParser parser(json_string); Error err; @@ -23,7 +27,7 @@ Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorDat return nullptr; } -Error ErrorFromNoDataResponse(const std::string& response) { +Error ConsumerErrorFromNoDataResponse(const std::string& response) { if (response.find("get_record_by_id") != std::string::npos) { ConsumerErrorData data; auto parse_error = GetNoDataResponseFromJson(response, &data); @@ -31,7 +35,7 @@ Error ErrorFromNoDataResponse(const std::string& response) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); } Error err; - if (data.id >= data.id_max ) { + if (data.id >= data.id_max) { err = data.next_substream.empty() ? ConsumerErrorTemplates::kEndOfStream.Generate() : ConsumerErrorTemplates::kStreamFinished.Generate(); } else { @@ -44,31 +48,48 @@ Error ErrorFromNoDataResponse(const std::string& response) { return ConsumerErrorTemplates::kNoData.Generate(); } -Error ErrorFromServerResponce(const std::string& response, const HttpCode& code) { +Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& code) { switch (code) { case HttpCode::OK: return nullptr; case HttpCode::BadRequest: - return ConsumerErrorTemplates::kWrongInput.Generate(response); + return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); case HttpCode::Unauthorized: - return ConsumerErrorTemplates::kWrongInput.Generate(response); + return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); case HttpCode::InternalServerError: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); case HttpCode::NotFound: - return ConsumerErrorTemplates::kUnavailableService.Generate(response); + return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); case HttpCode::Conflict: - return ErrorFromNoDataResponse(response); + return ConsumerErrorFromNoDataResponse(response->to_string()); default: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response); + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); + } +} +Error ConsumerErrorFromServerError(const Error& server_err) { + if (server_err == HttpErrorTemplates::kTransferError) { + return ConsumerErrorTemplates::kInterruptedTransaction.Generate( + "error processing request: " + server_err->Explain()); + } else { + return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + server_err->Explain()); } } +Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code) { + if (server_err != nullptr) { + return ConsumerErrorFromServerError(server_err); + } + return ConsumerErrorFromHttpCode(response, code); +} + ServerDataBroker::ServerDataBroker(std::string server_uri, std::string source_path, + bool has_filesystem, SourceCredentials source) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, net_client__{new TcpClient()}, -server_uri_{std::move(server_uri)}, source_path_{std::move(source_path)}, source_credentials_(std::move(source)) { + endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, +source_credentials_(std::move(source)) { if (source_credentials_.stream.empty()) { source_credentials_.stream = SourceCredentials::kDefaultStream; @@ -84,41 +105,62 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + source_credentials_.user_token; } -Error ServerDataBroker::ProcessRequest(std::string* response, const RequestInfo& request) { +Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { + Error err; + switch (request.output_mode) { + case OutputDataMode::string: + response->string_output = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, + request.cookie, + request.body, + code, + &err); + break; + case OutputDataMode::array: + err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, + request.body, &response->data_output, response->data_output_size, code); + break; + } + return err; +} + + +Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { + Error err; + response->string_output = + httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); + return err; +} + + +Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri) { Error err; HttpCode code; if (request.post) { - *response = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.body, &code, - &err); + err = ProcessPostRequest(request,response,&code); } else { - *response = httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, &code, &err); + err = ProcessGetRequest(request,response,&code); } - if (err != nullptr) { - current_broker_uri_ = ""; - if (err == HttpErrorTemplates::kTransferError) { - return ConsumerErrorTemplates::kInterruptedTransaction.Generate("error processing request: " + err->Explain()); - } else { - return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + err->Explain()); - } + if (err && service_uri) { + service_uri->clear(); } - return ErrorFromServerResponce(*response, code); + return ProcessRequestResponce(err, response, code); } -Error ServerDataBroker::GetBrokerUri() { - if (!current_broker_uri_.empty()) { +Error ServerDataBroker::DiscoverService(const std::string& service_name , std::string* uri_to_set) { + if (!uri_to_set->empty()) { return nullptr; } - RequestInfo ri; - ri.host = server_uri_; - ri.api = "/discovery/broker"; - + ri.host = endpoint_; + ri.api = "/discovery/" + service_name; + RequestOutput output; Error err; - err = ProcessRequest(¤t_broker_uri_, ri); - if (err != nullptr || current_broker_uri_.empty()) { - current_broker_uri_ = ""; - return ConsumerErrorTemplates::kUnavailableService.Generate(" on " + server_uri_ + err = ProcessRequest(&output, ri, nullptr); + *uri_to_set = std::move(output.string_output); + if (err != nullptr || uri_to_set->empty()) { + uri_to_set->clear(); + return ConsumerErrorTemplates::kUnavailableService.Generate(" on " + endpoint_ + (err != nullptr ? ": " + err->Explain() : "")); } @@ -148,22 +190,23 @@ RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool datas return ri; } - Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, std::string substream, GetImageServerOperation op, bool dataset) { std::string request_suffix = OpToUriCmd(op); std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(substream) + - + "/" + std::move(group_id) + "/"; + +"/" + std::move(group_id) + "/"; uint64_t elapsed_ms = 0; Error no_data_error; while (true) { auto start = system_clock::now(); - auto err = GetBrokerUri(); + auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); if (err == nullptr) { - auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); - err = ProcessRequest(response, ri); + auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); + RequestOutput output; + err = ProcessRequest(&output, ri, ¤t_broker_uri_); + *response = std::move(output.string_output); if (err == nullptr) { break; } @@ -186,7 +229,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g return no_data_error ? std::move(no_data_error) : std::move(err); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - start).count(); + elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count(); } return nullptr; } @@ -196,7 +239,12 @@ Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* } Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream), info, data); + return GetImageFromServer(GetImageServerOperation::GetNext, + 0, + std::move(group_id), + std::move(substream), + info, + data); } Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { @@ -204,7 +252,12 @@ Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* } Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream), info, data); + return GetImageFromServer(GetImageServerOperation::GetLast, + 0, + std::move(group_id), + std::move(substream), + info, + data); } std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { @@ -240,10 +293,19 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t if (!info->SetFromJson(response)) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate(std::string("malformed response:") + response); } - return GetDataIfNeeded(info, data); } +Error ServerDataBroker::GetDataFromFile(FileInfo* info, FileData* data) { + Error error; + *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); + if (error) { + return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); + } + return nullptr; +} + + Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { if (data == nullptr || info == nullptr) { return ConsumerErrorTemplates::kWrongInput.Generate("pointers are empty"); @@ -257,13 +319,11 @@ Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { } } - Error error; - *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); - if (error) { - return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); + if (has_filesystem_) { + return GetDataFromFile(info, data); } - return nullptr; + return GetDataFromFileTransferService(info, data, false); } Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { @@ -290,27 +350,57 @@ std::string ServerDataBroker::GenerateNewGroupId(Error* err) { return BrokerRequestWithTimeout(ri, err); } -std::string ServerDataBroker::AppendUri(std::string request_string) { - return current_broker_uri_ + "/" + std::move(request_string); -} - -std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Error* err) { +Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_name, + std::string* service_uri, + RequestInfo request, + RequestOutput* response) { uint64_t elapsed_ms = 0; - std::string response; + Error err; while (elapsed_ms <= timeout_ms_) { auto start = system_clock::now(); - *err = GetBrokerUri(); - if (*err == nullptr) { - request.host = current_broker_uri_; - *err = ProcessRequest(&response, request); - if (*err == nullptr || (*err) == ConsumerErrorTemplates::kWrongInput) { - return response; + err = DiscoverService(service_name, service_uri); + if (err == nullptr) { + request.host = *service_uri; + err = ProcessRequest(response, request, service_uri); + if (err == nullptr || err == ConsumerErrorTemplates::kWrongInput) { + break; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - start).count(); + elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count(); + } + return err; +} + +Error ServerDataBroker::FtsRequestWithTimeout(const FileInfo* info, FileData* data) { + RequestInfo ri = CreateFileTransferRequest(info); + RequestOutput response; + response.data_output_size = info->size; + auto err = ServiceRequestWithTimeout(kFileTransferServiceName, ¤t_fts_uri_, ri, &response); + if (err) { + return err; + } + *data = std::move(response.data_output); + return nullptr; +} + +RequestInfo ServerDataBroker::CreateFileTransferRequest(const FileInfo* info) const { + RequestInfo ri; + ri.api = "/transfer"; + ri.post = true; + ri.body = "{\"Folder\":\"" + source_path_ + "\",\"FileName\":\"" + info->name + "\"}"; + ri.cookie = "Authorization=Bearer " + folder_token_; + ri.output_mode = OutputDataMode::array; + return ri; +} + +std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Error* err) { + RequestOutput response; + *err = ServiceRequestWithTimeout(kBrokerServiceName, ¤t_broker_uri_, request, &response); + if (*err) { + return ""; } - return ""; + return std::move(response.string_output); } Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) { @@ -340,7 +430,7 @@ Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - + "/" + std::move(substream) + "/size"; + +"/" + std::move(substream) + "/size"; auto responce = BrokerRequestWithTimeout(ri, err); if (*err) { return 0; @@ -369,13 +459,12 @@ Error ServerDataBroker::GetById(uint64_t id, return GetImageFromServer(GetImageServerOperation::GetID, id, group_id, substream, info, data); } - Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id, std::string substream, bool dataset) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - + "/" + std::move(substream) + + +"/" + std::move(substream) + "/" + std::move( group_id) + "/" + std::to_string(id); if (dataset) { @@ -421,7 +510,6 @@ FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream return dataset.content; } - FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) { return QueryImages(std::move(query), kDefaultSubstream, err); } @@ -477,7 +565,6 @@ std::vector<std::string> ParseSubstreamsFromResponse(std::string response, Error return substreams; } - std::vector<std::string> ServerDataBroker::GetSubstreamList(Error* err) { RequestInfo ri; @@ -492,4 +579,45 @@ std::vector<std::string> ServerDataBroker::GetSubstreamList(Error* err) { return ParseSubstreamsFromResponse(std::move(response), err); } +Error ServerDataBroker::UpdateFolderTokenIfNeeded(bool ignore_existing) { + if (!folder_token_.empty() && !ignore_existing) { + return nullptr; + } + folder_token_.clear(); + + RequestOutput output; + RequestInfo ri = CreateFolderTokenRequest(); + auto err = ProcessRequest(&output, ri, nullptr); + if (err) { + return err; + } + folder_token_ = std::move(output.string_output); + return nullptr; +} + +RequestInfo ServerDataBroker::CreateFolderTokenRequest() const { + RequestInfo ri; + ri.host = endpoint_; + ri.api = "/authorizer/folder"; + ri.post = true; + ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + source_credentials_.user_token + "\"}"; + return ri; +} + +Error ServerDataBroker::GetDataFromFileTransferService(const FileInfo* info, FileData* data, + bool retry_with_new_token) { + auto err = UpdateFolderTokenIfNeeded(retry_with_new_token); + if (err) { + return err; + } + + err = FtsRequestWithTimeout(info, data); + if (err == ConsumerErrorTemplates::kWrongInput + && !retry_with_new_token) { // token expired? Refresh token and try again. + return GetDataFromFileTransferService(info, data, true); + } + return err; +} + } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index ba9ca04544bc02b1600ed462a1d40796f6e122a3..fb5111ebca6091f85f98711e1697761a250717cb 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -8,8 +8,6 @@ namespace asapo { -Error ErrorFromServerResponce(const std::string& response, const HttpCode& code); -Error ErrorFromNoDataResponse(const std::string& response); enum class GetImageServerOperation { GetNext, @@ -17,18 +15,44 @@ enum class GetImageServerOperation { GetID }; +enum class OutputDataMode { + string, + array, + file +}; + + struct RequestInfo { std::string host; std::string api; std::string extra_params; std::string body; + std::string cookie; + OutputDataMode output_mode = OutputDataMode::string; bool post = false; }; +struct RequestOutput { + std::string string_output; + FileData data_output; + uint64_t data_output_size; + const char* to_string() const { + if (!data_output) { + return string_output.c_str(); + } else { + return reinterpret_cast<char const*>(data_output.get()) ; + } + } +}; + +Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code); +Error ConsumerErrorFromNoDataResponse(const std::string& response); + class ServerDataBroker final : public asapo::DataBroker { public: - explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source); + explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, + SourceCredentials source); Error ResetLastReadMarker(std::string group_id) override; Error ResetLastReadMarker(std::string group_id, std::string substream) override; @@ -72,31 +96,47 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; private: + Error GetDataFromFileTransferService(const FileInfo* info, FileData* data, bool retry_with_new_token); + Error GetDataFromFile(FileInfo* info, FileData* data); + static const std::string kBrokerServiceName; + static const std::string kFileTransferServiceName; std::string RequestWithToken(std::string uri); Error GetRecordFromServer(std::string* info, std::string group_id, std::string substream, GetImageServerOperation op, bool dataset = false); Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string substream, bool dataset = false); Error GetDataIfNeeded(FileInfo* info, FileData* data); - Error GetBrokerUri(); + Error DiscoverService(const std::string& service_name, std::string* uri_to_set); bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri); - Error ProcessRequest(std::string* response, const RequestInfo& request); + Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri); Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, FileInfo* info, FileData* data); DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, Error* err); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); + Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, + RequestOutput* response); std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); - std::string AppendUri(std::string request_string); + Error FtsRequestWithTimeout(const FileInfo* info, FileData* data); + Error RequestDataFromFts(const FileInfo* info, FileData* data); + Error ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); + Error ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); + DataSet DecodeDatasetFromResponse(std::string response, Error* err); RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); std::string OpToUriCmd(GetImageServerOperation op); - std::string server_uri_; + Error UpdateFolderTokenIfNeeded(bool ignore_existing); + std::string endpoint_; std::string current_broker_uri_; + std::string current_fts_uri_; std::string source_path_; + bool has_filesystem_; SourceCredentials source_credentials_; uint64_t timeout_ms_ = 0; + std::string folder_token_; + RequestInfo CreateFolderTokenRequest() const; + RequestInfo CreateFileTransferRequest(const FileInfo* info) const; }; diff --git a/consumer/api/cpp/unittests/test_consumer_api.cpp b/consumer/api/cpp/unittests/test_consumer_api.cpp index d683a4233d2d5eb9b0260ca48ec8d9a5da4da060..90e5b0b0182c91f63a0a1b03d056a809b76c0d63 100644 --- a/consumer/api/cpp/unittests/test_consumer_api.cpp +++ b/consumer/api/cpp/unittests/test_consumer_api.cpp @@ -27,7 +27,7 @@ class DataBrokerFactoryTests : public Test { TEST_F(DataBrokerFactoryTests, CreateServerDataSource) { - auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); ASSERT_THAT(error, Eq(nullptr)); ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr)); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 26b9e61ebd8db83c175b4d3eac1ffd8952c12bb6..2821fcd1562b9f191e5edd3a6e74f970343e6f7d 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1,4 +1,5 @@ #include <gmock/gmock.h> +#include <gmock/gmock.h> #include "gtest/gtest.h" #include "consumer/data_broker.h" @@ -44,21 +45,26 @@ namespace { TEST(FolderDataBroker, Constructor) { auto data_broker = - std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", asapo::SourceCredentials{"beamtime_id", "", "", "token"})}; + std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", false, + asapo::SourceCredentials{"beamtime_id", "", "", "token"}) + }; ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::TcpClient*>(data_broker->net_client__.get()), Ne(nullptr)); } +const uint8_t expected_value = 1; + class ServerDataBrokerTests : public Test { public: - std::unique_ptr<ServerDataBroker> data_broker; + std::unique_ptr<ServerDataBroker> data_broker, fts_data_broker; NiceMock<MockIO> mock_io; NiceMock<MockHttpClient> mock_http_client; NiceMock<MockNetClient> mock_netclient; FileInfo info; std::string expected_server_uri = "test:8400"; std::string expected_broker_uri = "broker:5005"; + std::string expected_fts_uri = "fts:5008"; std::string expected_token = "token"; std::string expected_path = "/tmp/beamline/beamtime"; std::string expected_filename = "filename"; @@ -68,22 +74,40 @@ class ServerDataBrokerTests : public Test { std::string expected_substream = "substream"; std::string expected_metadata = "{\"meta\":1}"; std::string expected_query_string = "bla"; - + std::string expected_folder_token = "folder_token"; + std::string expected_beamtime_id = "beamtime_id"; + uint64_t expected_image_size = 100; uint64_t expected_dataset_id = 1; static const uint64_t expected_buf_id = 123; std::string expected_next_substream = "nextsubstream"; + std::string expected_fts_query_string = "{\"Folder\":\"" + expected_path + "\",\"FileName\":\"" + expected_filename + + "\"}"; + std::string expected_cookie = "Authorization=Bearer " + expected_folder_token; + + void AssertSingleFileTransfer(); void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", expected_stream, expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) + }; + fts_data_broker = std::unique_ptr<ServerDataBroker> { + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + fts_data_broker->io__ = std::unique_ptr<IO> {&mock_io}; + fts_data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; + fts_data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + } void TearDown() override { data_broker->io__.release(); data_broker->httpclient__.release(); data_broker->net_client__.release(); + fts_data_broker->io__.release(); + fts_data_broker->httpclient__.release(); + fts_data_broker->net_client__.release(); + } void MockGet(const std::string& response) { EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll( @@ -100,11 +124,25 @@ class ServerDataBrokerTests : public Test { Return("") )); } - void MockGetBrokerUri() { - EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).WillOnce(DoAll( + void MockGetServiceUri(std::string service, std::string result) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/" + service), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), - Return(expected_broker_uri))); + Return(result))); + } + + void MockBeforeFTS(FileData* data); + + void MockGetFTSUri() { + MockGetServiceUri("fts", expected_fts_uri); + } + + void ExpectFolderToken(); + void ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template); + void ExpectRepeatedFileTransfer(); + + void MockGetBrokerUri() { + MockGetServiceUri("broker", expected_broker_uri); } void MockReadDataFromFile(int times = 1) { if (times == 0) { @@ -117,7 +155,7 @@ class ServerDataBrokerTests : public Test { } FileInfo CreateFI(uint64_t buf_id = expected_buf_id) { FileInfo fi; - fi.size = 100; + fi.size = expected_image_size; fi.id = 1; fi.buf_id = buf_id; fi.name = expected_filename; @@ -137,7 +175,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { data_broker->httpclient__.release(); data_broker->net_client__.release(); data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -1002,7 +1040,97 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { } +void ServerDataBrokerTests::MockBeforeFTS(FileData* data) { + auto to_send = CreateFI(); + auto json = to_send.Json(); + MockGet(json); + + EXPECT_CALL(mock_netclient, GetData_t(&info, + data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); +} + +void ServerDataBrokerTests::ExpectFolderToken() { + std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" + expected_beamtime_id + + "\",\"Token\":\"" + expected_token + "\"}"; + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/authorizer/folder"), + expected_folder_query_string, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(expected_folder_token) + )); + +} + +ACTION_P(AssignArg3, assign) { + if (assign) { + asapo::FileData data = asapo::FileData{new uint8_t[1] }; + data[0] = expected_value; + *arg3 = std::move(data); + } +} +void ServerDataBrokerTests::ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template) { + EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/transfer"), + expected_cookie, expected_fts_query_string, _, expected_image_size, _)).WillOnce(DoAll( + SetArgPointee<5>(HttpCode::OK), + AssignArg3(p_err_template == nullptr), + Return(p_err_template == nullptr ? nullptr : p_err_template->Generate().release()) + )); +} +void ServerDataBrokerTests::ExpectRepeatedFileTransfer() { + EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/transfer"), + expected_cookie, expected_fts_query_string, _, expected_image_size, _)). + WillOnce(DoAll( + SetArgPointee<5>(HttpCode::Unauthorized), + Return(nullptr))). + WillOnce(DoAll( + SetArgPointee<5>(HttpCode::OK), + Return(nullptr) + )); +} + +void ServerDataBrokerTests::AssertSingleFileTransfer() { + asapo::FileData data = asapo::FileData{new uint8_t[1] }; + MockGetBrokerUri(); + MockBeforeFTS(&data); + ExpectFolderToken(); + MockGetFTSUri(); + ExpectFileTransfer(nullptr); + + fts_data_broker->GetNext(&info, expected_group_id, &data); + + ASSERT_THAT(data[0], Eq(expected_value)); + Mock::VerifyAndClearExpectations(&mock_http_client); + Mock::VerifyAndClearExpectations(&mock_netclient); + Mock::VerifyAndClearExpectations(&mock_io); +} + + +TEST_F(ServerDataBrokerTests, GetImageUsesFileTransferServiceIfCannotReadFromCache) { + AssertSingleFileTransfer(); +} + +TEST_F(ServerDataBrokerTests, GetImageReusesTokenAndUri) { + AssertSingleFileTransfer(); + + asapo::FileData data = asapo::FileData{new uint8_t[1] }; + MockBeforeFTS(&data); + ExpectFileTransfer(nullptr); + + auto err = fts_data_broker->GetNext(&info, expected_group_id, &data); +} + +TEST_F(ServerDataBrokerTests, GetImageTriesToGetTokenAgainIfTransferFailed) { + AssertSingleFileTransfer(); + + asapo::FileData data; + MockBeforeFTS(&data); + ExpectRepeatedFileTransfer(); + ExpectFolderToken(); + + auto err = fts_data_broker->GetNext(&info, expected_group_id, &data); +} } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 1475ad22f0e0ba37df4ba88ee97aa8dc7e3f6fa4..e55b036a581cc5c8f1ec013be8b9869d6902f0fc 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -66,7 +66,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef cppclass DataBrokerFactory: DataBrokerFactory() except + - unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,SourceCredentials source,Error* error) + unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,bool has_filesystem,SourceCredentials source,Error* error) cdef extern from "asapo_consumer.h" namespace "asapo": diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index ad31fd10332652e785bc61d6ea31274262efe2d9..c26126f2ac5d76086bff9059c6cb211e9ed94d91 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -256,9 +256,10 @@ cdef class __PyDataBrokerFactory: def __cinit__(self): with nogil: self.c_factory = DataBrokerFactory() - def create_server_broker(self,server_name,source_path,beamtime_id,stream,token,timeout): + def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout): cdef string b_server_name = _bytes(server_name) cdef string b_source_path = _bytes(source_path) + cdef bool b_has_filesystem = has_filesystem cdef SourceCredentials source source.beamtime_id = _bytes(beamtime_id) source.user_token = _bytes(token) @@ -266,7 +267,7 @@ cdef class __PyDataBrokerFactory: cdef Error err cdef unique_ptr[DataBroker] c_broker with nogil: - c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,source,&err) + c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err) broker = PyDataBroker() broker.c_broker = c_broker.release() broker.c_broker.SetTimeout(timeout) @@ -274,17 +275,19 @@ cdef class __PyDataBrokerFactory: throw_exception(err) return broker -def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout_ms): +def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms): """ :param server_name: Server endpoint (hostname:port) :type server_name: string :param source_path: Path to the folder to read data from :type source_path: string + :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data + :type has_filesystem: bool :return: Broker object and error. (None,err) if case of error, (broker, None) if success :rtype: Tuple with broker object and error. """ factory = __PyDataBrokerFactory() - return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) + return factory.create_server_broker(_bytes(server_name),_bytes(source_path),has_filesystem, _bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) __version__ = "@ASAPO_VERSION_PYTHON@" diff --git a/discovery/src/asapo_discovery/request_handler/request_handler.go b/discovery/src/asapo_discovery/request_handler/request_handler.go index 25302df2d75146a268977466cbb15ce80750f906..8949bf179280329cc83a36bd17e0a5c4a8a416e0 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler.go @@ -6,6 +6,7 @@ type Agent interface { GetReceivers(bool) ([]byte, error) GetBroker() ([]byte, error) GetMongo() ([]byte, error) + GetFts() ([]byte, error) Init(settings utils.Settings) error } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go index 147970c5ce8fbfc33156db874c6c6f7070833816..3fa6862e3c944fe5cf5efbe52ffe913840ad49af 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go @@ -119,6 +119,26 @@ func (rh *ConsulRequestHandler) GetMongo() ([]byte, error) { return nil, nil } +func (rh *ConsulRequestHandler) GetFts() ([]byte, error) { + if len(rh.staticHandler.fts)>0 { + return rh.staticHandler.GetFts() + } + + if (rh.client == nil) { + return nil, errors.New("consul client not connected") + } + response, err := rh.GetServices("asapo-fts",false) + if err != nil { + return nil, err + } + size := len(response) + if size ==0 { + return []byte(""),nil + }else { + return []byte(response[counter.Next(size)]),nil + } + return nil, nil +} func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) { config := api.DefaultConfig() diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go index 3c258e725bda542998de47e9cea4c3c35ac22831..234e17192a496021cf3eef4c52f21b0ba03818b3 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go @@ -51,8 +51,8 @@ func (suite *ConsulHandlerTestSuite) SetupTest() { suite.registerAgents("asapo-receiver") suite.registerAgents("asapo-broker") + suite.registerAgents("asapo-fts") suite.registerAgents("mongo") - } func (suite *ConsulHandlerTestSuite) TearDownTest() { @@ -62,6 +62,8 @@ func (suite *ConsulHandlerTestSuite) TearDownTest() { suite.client.Agent().ServiceDeregister("asapo-broker1235") suite.client.Agent().ServiceDeregister("mongo1234") suite.client.Agent().ServiceDeregister("mongo1235") + suite.client.Agent().ServiceDeregister("asapo-fts1234") + suite.client.Agent().ServiceDeregister("asapo-fts1235") } @@ -146,20 +148,19 @@ func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() { } - func (suite *ConsulHandlerTestSuite) TestGetMongoRoundRobin() { suite.handler.Init(consul_settings) res, err := suite.handler.GetMongo() suite.NoError(err, "") - suite.Equal("127.0.0.1:1235", string(res), "uris") + suite.Equal("127.0.0.1:1234", string(res), "uris") res, err = suite.handler.GetMongo() suite.NoError(err, "") - suite.Equal("127.0.0.1:1234", string(res), "uris") + suite.Equal("127.0.0.1:1235", string(res), "uris") res, err = suite.handler.GetMongo() suite.NoError(err, "") - suite.Equal("127.0.0.1:1235", string(res), "uris") + suite.Equal("127.0.0.1:1234", string(res), "uris") } func (suite *ConsulHandlerTestSuite) TestGetMongoStatic() { @@ -196,4 +197,31 @@ func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() { suite.Equal("", string(res), "uris") } +func (suite *ConsulHandlerTestSuite) TestGetFtsRoundRobin() { + suite.handler.Init(consul_settings) + res, err := suite.handler.GetFts() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1235", string(res), "uris") + + res, err = suite.handler.GetFts() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1234", string(res), "uris") + + res, err = suite.handler.GetFts() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1235", string(res), "uris") +} + +func (suite *ConsulHandlerTestSuite) TestGetFtsStatic() { + consul_settings.FileTransferService.StaticEndpoint="127.0.0.1:0000" + suite.handler.Init(consul_settings) + res, err := suite.handler.GetFts() + suite.NoError(err, "") + suite.Equal("127.0.0.1:0000", string(res), "uris") + + res, err = suite.handler.GetFts() + suite.NoError(err, "") + suite.Equal("127.0.0.1:0000", string(res), "uris") +} + diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static.go b/discovery/src/asapo_discovery/request_handler/request_handler_static.go index e3d606a5decbea9cdc0a45496c9aa92090d1cbdb..b603c85939882e700c3a7affb5c06e80ef07eb4a 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static.go @@ -8,6 +8,7 @@ type StaticRequestHandler struct { receiverResponce Responce broker string mongo string + fts string } @@ -23,6 +24,9 @@ func (rh *StaticRequestHandler) GetMongo() ([]byte, error) { return []byte(rh.mongo),nil } +func (rh *StaticRequestHandler) GetFts() ([]byte, error) { + return []byte(rh.fts),nil +} func (rh *StaticRequestHandler) Init(settings utils.Settings) error { @@ -30,6 +34,6 @@ func (rh *StaticRequestHandler) Init(settings utils.Settings) error { rh.receiverResponce.Uris = settings.Receiver.StaticEndpoints rh.broker = settings.Broker.StaticEndpoint rh.mongo = settings.Mongo.StaticEndpoint - + rh.fts = settings.FileTransferService.StaticEndpoint return nil } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go index 83972b343cbdd96b7274ac05f9e6943d7b5650ec..7409996fa663b31d4c0ff6fe746091852ad6fe3d 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go @@ -11,7 +11,7 @@ var uris = []string{"ip1","ip2"} const max_conn = 1 var static_settings utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:max_conn,StaticEndpoints:uris},Broker:utils.BrokerInfo{ - StaticEndpoint:"ip_broker"}, Mongo:utils.MongoInfo{StaticEndpoint:"ip_mongo"}} + StaticEndpoint:"ip_broker"}, Mongo:utils.MongoInfo{StaticEndpoint:"ip_mongo"}, FileTransferService:utils.FtsInfo{StaticEndpoint:"ip_fts"}} @@ -42,3 +42,11 @@ func TestStaticHandlerGetMongoOK(t *testing.T) { assert.Equal(t,string(res), "ip_mongo") assert.Nil(t, err) } + + +func TestStaticHandlerGetFtsOK(t *testing.T) { + rh.Init(static_settings) + res,err := rh.GetFts() + assert.Equal(t,string(res), "ip_fts") + assert.Nil(t, err) +} \ No newline at end of file diff --git a/discovery/src/asapo_discovery/server/get_receivers.go b/discovery/src/asapo_discovery/server/get_receivers.go index a35bfc14ed4c303bea1b278d81dbb39adb7303f9..056d303705207bc88c8955368a090ca63ee1e4a7 100644 --- a/discovery/src/asapo_discovery/server/get_receivers.go +++ b/discovery/src/asapo_discovery/server/get_receivers.go @@ -18,7 +18,9 @@ func getService(service string) (answer []byte, code int) { case "mongo": answer, err = requestHandler.GetMongo() break - + case "fts": + answer, err = requestHandler.GetFts() + break default: err = errors.New("wrong request: "+service) } @@ -53,3 +55,10 @@ func routeGetMongo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(code) w.Write(answer) } + +func routeGetFileTransferService(w http.ResponseWriter, r *http.Request) { + r.Header.Set("Content-type", "application/json") + answer,code := getService("fts") + w.WriteHeader(code) + w.Write(answer) +} \ No newline at end of file diff --git a/discovery/src/asapo_discovery/server/listroutes.go b/discovery/src/asapo_discovery/server/listroutes.go index 4eb1f5641f6ab8cad8a02d85eee32f706234fd48..6ae466fa6445b8dd3f14f83d6d93a3e37f6bc63e 100644 --- a/discovery/src/asapo_discovery/server/listroutes.go +++ b/discovery/src/asapo_discovery/server/listroutes.go @@ -23,5 +23,11 @@ var listRoutes = utils.Routes{ "/mongo", routeGetMongo, }, + utils.Route{ + "GetFTS", + "Get", + "/fts", + routeGetFileTransferService, + }, } diff --git a/discovery/src/asapo_discovery/server/routes_test.go b/discovery/src/asapo_discovery/server/routes_test.go index d838ee181fc3fa1f3d63e1f43798dab129dcf608..eeac5da5567e184b7f23a6a04f3f4b7b6d67b5fb 100644 --- a/discovery/src/asapo_discovery/server/routes_test.go +++ b/discovery/src/asapo_discovery/server/routes_test.go @@ -31,7 +31,8 @@ type GetReceiversTestSuite struct { func (suite *GetReceiversTestSuite) SetupTest() { requestHandler = new(request_handler.StaticRequestHandler) var s utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:10,StaticEndpoints:[]string{"ip1","ip2"}}, - Broker:utils.BrokerInfo{StaticEndpoint:"ip_broker"},Mongo:utils.MongoInfo{StaticEndpoint:"ip_mongo"}} + Broker:utils.BrokerInfo{StaticEndpoint:"ip_broker"},Mongo:utils.MongoInfo{StaticEndpoint:"ip_mongo"}, + FileTransferService:utils.FtsInfo{StaticEndpoint:"ip_fts"}} requestHandler.Init(s) logger.SetMockLog() @@ -86,3 +87,13 @@ func (suite *GetReceiversTestSuite) TestGetMongo() { suite.Equal(w.Body.String(), "ip_mongo", "result") assertExpectations(suite.T()) } + +func (suite *GetReceiversTestSuite) TestGetFts() { + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get fts"))) + + w := doRequest("/fts") + + suite.Equal(http.StatusOK, w.Code, "code ok") + suite.Equal(w.Body.String(), "ip_fts", "result") + assertExpectations(suite.T()) +} diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index 242c82a898d51e1c2e6cbdebdc4a0a04688502bf..a792e1f2199cab85cfa32f01e1ae6cfca3e43b14 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -54,7 +54,7 @@ std::vector<std::thread> StartThreads(const Args& params, auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { asapo::FileInfo fi; Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, asapo::SourceCredentials{params.beamtime_id, "", params.stream, params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); diff --git a/examples/consumer/getnext_broker_python/getnext.py b/examples/consumer/getnext_broker_python/getnext.py index 070030af19c698902f4798178f047a198ced1c5d..10d6517cdedc885ff9ddf5ec79828d5908fb8d3c 100644 --- a/examples/consumer/getnext_broker_python/getnext.py +++ b/examples/consumer/getnext_broker_python/getnext.py @@ -6,7 +6,7 @@ import sys source, path, beamtime, token, group_id = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) if group_id == "new": diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index d9bd76f51c1e6f5775b0012b3a9f480908123f98..b5813df8032e5d14159db08ea0eea294d30c19c2 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -64,7 +64,7 @@ int ProcessError(const Error& err) { } BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, true, asapo::SourceCredentials{args.beamtime_id, "", args.stream_in, args.token}, err); if (*err) { return nullptr; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 9bc02800e7944e485a7ed1e32b8d9486ca24a695..e3444b92a91b28ab1dcd5aa83ab6cfa9fdc8318e 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -28,7 +28,7 @@ timeout_s_producer=int(timeout_s_producer) nthreads=int(nthreads) transfer_data=int(transfer_data)>0 -broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000) +broker = asapo_consumer.create_server_broker(source,path, True,beamtime,stream_in,token,timeout_s*1000) producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600) diff --git a/file_transfer/CMakeLists.txt b/file_transfer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b420835f5162dedcedb6c7e80ceb560644e1a166 --- /dev/null +++ b/file_transfer/CMakeLists.txt @@ -0,0 +1,38 @@ +set (TARGET_NAME asapo-file-transfer) + +if (NOT "$ENV{GOPATH}" STREQUAL "") + set(GOPATH $ENV{GOPATH}) +endif() + +if (NOT GOPATH) + message (FATAL_ERROR "GOPATH not set") +endif() + +message(STATUS "global gopath ${GOPATH}") + +IF(WIN32) + set (gopath "${GOPATH}\;${CMAKE_CURRENT_SOURCE_DIR}\;${CMAKE_SOURCE_DIR}/common/go") + set (exe_name "${TARGET_NAME}.exe") +ELSE() + set (gopath ${GOPATH}:${CMAKE_CURRENT_SOURCE_DIR}:${CMAKE_SOURCE_DIR}/common/go) + set (exe_name "${TARGET_NAME}") +ENDIF() + +include(testing_go) + +configure_file(docker/Dockerfile . COPYONLY) + +add_custom_target(asapo-file-transfer ALL + COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath} + go build ${GO_OPTS} -o ${exe_name} asapo_file_transfer/main + VERBATIM) +define_property(TARGET PROPERTY EXENAME + BRIEF_DOCS <executable name> + FULL_DOCS <full-doc>) + +set_target_properties(asapo-file-transfer PROPERTIES EXENAME ${CMAKE_CURRENT_BINARY_DIR}/${exe_name}) + + +install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${exe_name} DESTINATION bin) + +gotest(${TARGET_NAME} "./...") diff --git a/file_transfer/docker/Dockerfile b/file_transfer/docker/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..71200554d9efe0c45347be32108514620bf9238c --- /dev/null +++ b/file_transfer/docker/Dockerfile @@ -0,0 +1,3 @@ +FROM busybox:glibc +ADD asapo-file-transfer / +CMD ["/asapo-file-transfer","-config","/var/lib/file_transfer/config.json"] diff --git a/file_transfer/src/asapo_file_transfer/main/file_transfer.go b/file_transfer/src/asapo_file_transfer/main/file_transfer.go new file mode 100644 index 0000000000000000000000000000000000000000..52dd23ab3ca954a1c62a148be85222a0c8dce56f --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/main/file_transfer.go @@ -0,0 +1,39 @@ +//+build !test + +package main + +import ( + log "asapo_common/logger" + "asapo_file_transfer/server" + "asapo_common/version" + "flag" + "os" +) + +func PrintUsage() { + log.Fatal("Usage: " + os.Args[0] + " -config <config file>") +} + +func main() { + var fname = flag.String("config", "", "config file path") + + if ret := version.ShowVersion(os.Stdout, "ASAPO File Transfer Service"); ret { + return + } + + log.SetSoucre("file tranfer") + + flag.Parse() + if *fname == "" { + PrintUsage() + } + + logLevel, err := server.ReadConfig(*fname) + if err != nil { + log.Fatal(err.Error()) + } + + log.SetLevel(logLevel) + + server.Start() +} diff --git a/file_transfer/src/asapo_file_transfer/server/get_health.go b/file_transfer/src/asapo_file_transfer/server/get_health.go new file mode 100644 index 0000000000000000000000000000000000000000..b7d9f2446fb62c2c3e7d353172978d4a9682e832 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/get_health.go @@ -0,0 +1,11 @@ +package server + +import ( + "net/http" +) + + +func routeGetHealth(w http.ResponseWriter, r *http.Request) { + r.Header.Set("Content-type", "application/json") + w.WriteHeader(http.StatusNoContent) +} diff --git a/file_transfer/src/asapo_file_transfer/server/get_health_test.go b/file_transfer/src/asapo_file_transfer/server/get_health_test.go new file mode 100644 index 0000000000000000000000000000000000000000..fc8d6c2fad3f5fb5e09c5ee74d799d34d9bf8a30 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/get_health_test.go @@ -0,0 +1,18 @@ +package server + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "testing" + "net/http/httptest" + "asapo_common/utils" +) + + +func TestGetNext(t *testing.T) { + mux := utils.NewRouter(listRoutes) + req, _ := http.NewRequest("GET", "/health-check", nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + assert.Equal(t, http.StatusNoContent, w.Code) +} diff --git a/file_transfer/src/asapo_file_transfer/server/listroutes.go b/file_transfer/src/asapo_file_transfer/server/listroutes.go new file mode 100644 index 0000000000000000000000000000000000000000..03fba92d86e41b3418eba91878c4253cb5ffb2f0 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/listroutes.go @@ -0,0 +1,20 @@ +package server + +import ( + "asapo_common/utils" +) + +var listRoutes = utils.Routes{ + utils.Route{ + "Transfer File", + "POST", + "/transfer", + routeFileTransfer, + }, + utils.Route{ + "HealthCheck", + "Get", + "/health-check", + routeGetHealth, + }, +} diff --git a/file_transfer/src/asapo_file_transfer/server/server.go b/file_transfer/src/asapo_file_transfer/server/server.go new file mode 100644 index 0000000000000000000000000000000000000000..0c07eac6ff8932c2e785aebb66f2984e17782615 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/server.go @@ -0,0 +1,16 @@ +package server + +import ( +"asapo_common/utils" +) + +type serverSettings struct { + Port int + LogLevel string + SecretFile string + key string +} + +var settings serverSettings +var authJWT utils.Auth + diff --git a/file_transfer/src/asapo_file_transfer/server/server_nottested.go b/file_transfer/src/asapo_file_transfer/server/server_nottested.go new file mode 100644 index 0000000000000000000000000000000000000000..a55f7df422932df339e39f61d2836fb91e2d06df --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/server_nottested.go @@ -0,0 +1,43 @@ +//+build !test + +package server + +import ( + log "asapo_common/logger" + "asapo_common/utils" + "asapo_common/version" + "errors" + "net/http" + "strconv" +) + +func Start() { + mux := utils.NewRouter(listRoutes) + log.Info("Starting ASAPO Authorizer, version " + version.GetVersion()) + log.Info("Listening on port: " + strconv.Itoa(settings.Port)) + log.Fatal(http.ListenAndServe(":"+strconv.Itoa(settings.Port), utils.ProcessJWTAuth(mux.ServeHTTP, settings.key))) +} + +func ReadConfig(fname string) (log.Level, error) { + if err := utils.ReadJsonFromFile(fname, &settings); err != nil { + return log.FatalLevel, err + } + + if settings.Port == 0 { + return log.FatalLevel, errors.New("Server port not set") + } + + if settings.SecretFile == "" { + return log.FatalLevel, errors.New("Secret file not set") + } + + var err error + settings.key, err = utils.ReadFirstStringFromFile(settings.SecretFile) + if err != nil { + return log.FatalLevel, errors.New("Cannot read secret from file " + err.Error()) + } + + level, err := log.LevelFromString(settings.LogLevel) + + return level, err +} diff --git a/file_transfer/src/asapo_file_transfer/server/transfer.go b/file_transfer/src/asapo_file_transfer/server/transfer.go new file mode 100644 index 0000000000000000000000000000000000000000..8bd115d12771f4ec23cfb9b09cfeddd3a579dc92 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/transfer.go @@ -0,0 +1,79 @@ +package server + +import ( + log "asapo_common/logger" + "asapo_common/utils" + "errors" + "net/http" + "os" + "path" +) + + +type fileTransferRequest struct { + Folder string + FileName string +} + + +func Exists(name string) bool { + fi, err := os.Stat(name) + return !os.IsNotExist(err) && !fi.IsDir() +} + + +func checkClaim(r *http.Request,request* fileTransferRequest) (int,error) { + var extraClaim utils.FolderTokenTokenExtraClaim + if err := utils.JobClaimFromContext(r, &extraClaim); err != nil { + return http.StatusInternalServerError,err + } + if extraClaim.RootFolder!=request.Folder { + err_txt := "access forbidden for folder "+request.Folder + log.Error("cannot transfer file: "+err_txt) + return http.StatusUnauthorized, errors.New(err_txt) + } + return http.StatusOK,nil +} + +func checkFileExists(r *http.Request,name string) (int,error) { + if !Exists(name) { + err_txt := "file "+name+" does not exist" + log.Error("cannot transfer file: "+err_txt) + return http.StatusBadRequest,errors.New(err_txt) + } + return http.StatusOK,nil + +} + +func checkRequest(r *http.Request) (string,int,error) { + var request fileTransferRequest + err := utils.ExtractRequest(r,&request) + if err != nil { + return "",http.StatusBadRequest,err + } + + if status,err := checkClaim(r,&request); err != nil { + return "",status,err + } + fullName := request.Folder+string(os.PathSeparator)+request.FileName + if status,err := checkFileExists(r,fullName); err != nil { + return "",status,err + } + return fullName,http.StatusOK,nil +} + +func serveFile(w http.ResponseWriter, r *http.Request, fullName string) { + _, file := path.Split(fullName) + w.Header().Set("Content-Disposition", "attachment; filename=\""+file+"\"") + log.Debug("Transferring file " + fullName) + http.ServeFile(w,r, fullName) +} + +func routeFileTransfer(w http.ResponseWriter, r *http.Request) { + fullName, status,err := checkRequest(r); + if err != nil { + utils.WriteServerError(w,err,status) + return + } + serveFile(w,r,fullName) +} diff --git a/file_transfer/src/asapo_file_transfer/server/transfer_test.go b/file_transfer/src/asapo_file_transfer/server/transfer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1f6169ce9761d1c6ad99b75c0e8dfe4da67c9a72 --- /dev/null +++ b/file_transfer/src/asapo_file_transfer/server/transfer_test.go @@ -0,0 +1,87 @@ +package server + +import ( + "asapo_common/utils" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "strings" + "testing" + "io/ioutil" + "time" + "os" + "path/filepath" +) + + +type request struct { + path string + cmd string + answer int + message string +} + +func containsMatcher(substr string) func(str string) bool { + return func(str string) bool { return strings.Contains(str, substr) } +} + +func makeRequest(request interface{}) string { + buf, _ := utils.MapToJson(request) + return string(buf) +} + +func prepareToken(folder string) string{ + auth := utils.NewJWTAuth("key") + + var claims utils.CustomClaims + var extraClaim utils.FolderTokenTokenExtraClaim + extraClaim.RootFolder = folder + claims.ExtraClaims = &extraClaim + claims.Duration = time.Duration(1) * time.Minute + token,_ := auth.GenerateToken(&claims) + return token +} + +func doPostRequest(path string,buf string,token string) *httptest.ResponseRecorder { + mux := utils.NewRouter(listRoutes) + req, _ := http.NewRequest("POST", path, strings.NewReader(buf)) + req.Header.Add("Authorization", "Bearer "+token) + w := httptest.NewRecorder() + utils.ProcessJWTAuth(mux.ServeHTTP, "key")(w,req) + return w +} + +var transferFileTests = [] struct { + folder string + fname string + token string + status int + message string +}{ + {"folder","exists", prepareToken("folder"),http.StatusOK,"file transferred"}, + {"folder","not_exists", prepareToken("folder"),http.StatusBadRequest,"file not exists"}, + {"wrong_folder","p07", prepareToken("folder"),http.StatusUnauthorized,"wrong folder"}, + {"folder","p07", "wrong token",http.StatusUnauthorized,"wrong token"}, +} + +func TestTransferFile(t *testing.T) { + os.MkdirAll(filepath.Clean("folder"), os.ModePerm) + ioutil.WriteFile(filepath.Clean("folder/exists"), []byte("hello"), 0644) + defer os.RemoveAll("folder") + + for _, test := range transferFileTests { + request := makeRequest(fileTransferRequest{test.folder,test.fname}) + w := doPostRequest("/transfer",request,test.token) + if test.status==http.StatusOK { + body, _ := ioutil.ReadAll(w.Body) + body_str:=string(body) + assert.Equal(t, test.status, w.Code, test.message) + assert.Contains(t, w.Header().Get("Content-Disposition"),test.fname, test.message) + assert.Equal(t, "hello", body_str, test.message) + + } + assert.Equal(t, test.status, w.Code, test.message) + } +} + + diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index d158882d38756fe96e90f281f98db7ac24f4023f..8829abb5de4484a3ac7c53c5cabef333b19f078f 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -31,7 +31,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr Error err; std::string request_string = GetRequestString(request, source_credentials); - auto response = http_client__->Post(GetReceiverConfig()->authorization_server + "/authorize", request_string, &code, + auto response = http_client__->Post(GetReceiverConfig()->authorization_server + "/authorize", "", request_string, &code, &err); if (err || code != HttpCode::OK) { auto auth_error = ErrorFromAuthorizationServerResponse(err, code); diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp index 66bac257969cc08d84f1873147417ebcd9e813d4..4de06b6f272166a0283b8ab245ce205a235009b1 100644 --- a/receiver/src/statistics_sender_influx_db.cpp +++ b/receiver/src/statistics_sender_influx_db.cpp @@ -22,7 +22,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) HttpCode code; Error err; auto response = httpclient__->Post(GetReceiverConfig()->performance_db_uri + "/write?db=" + - GetReceiverConfig()->performance_db_name, StatisticsToString(statistic), + GetReceiverConfig()->performance_db_name, "", StatisticsToString(statistic), &code, &err); std::string msg = "sending statistics to " + GetReceiverConfig()->performance_db_name + " at " + GetReceiverConfig()->performance_db_uri; @@ -56,7 +56,7 @@ std::string StatisticsSenderInfluxDb::StatisticsToString(const StatisticsToSend& StatisticsSenderInfluxDb::StatisticsSenderInfluxDb(): httpclient__{DefaultHttpClient()}, log__{GetDefaultReceiverLogger()} { HttpCode code; Error err; - auto response = httpclient__->Post(GetReceiverConfig()->performance_db_uri + "/query", + auto response = httpclient__->Post(GetReceiverConfig()->performance_db_uri + "/query", "", "q=create database " + GetReceiverConfig()->performance_db_name, &code, &err); std::string msg = "initializing statistics for " + GetReceiverConfig()->performance_db_name + " at " + GetReceiverConfig()->performance_db_uri; diff --git a/tests/automatic/CMakeLists.txt b/tests/automatic/CMakeLists.txt index 2f73aa174c687d7ea3102923a9af08b706ad8d85..7e926f7928815a5d64b056fb3bd22442a6ce462e 100644 --- a/tests/automatic/CMakeLists.txt +++ b/tests/automatic/CMakeLists.txt @@ -29,6 +29,9 @@ add_subdirectory(spd_logger) add_subdirectory(producer) +add_subdirectory(file_transfer_service) + + if (UNIX) add_subdirectory(high_avail) endif() diff --git a/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh b/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh index 6b81cebc39ec499ecc2b31c09bedf4f2579b644d..edf36d29606436cd033909680251472d05767f59 100644 --- a/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh +++ b/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh @@ -53,4 +53,4 @@ leak=$(( $mem2 - $mem1 )) cat out echo leak: $leak -test $leak -eq 0 \ No newline at end of file +test $leak -lt 300000 \ No newline at end of file diff --git a/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py b/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py index 0ff83df0d938ce8dec651b8a979dd0a78c2643af..2f5eaac9bc5eeda3289c08e2de21e7828b169f98 100644 --- a/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py +++ b/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py @@ -5,7 +5,7 @@ import sys source, path, beamtime, token = sys.argv[1:] broker = asapo_consumer.create_server_broker( - source, path, beamtime, "stream", token, 1000) + source, path,True, beamtime, "stream", token, 1000) group_id = broker.generate_group_id() print('generated group id: ', group_id) diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 2a85a1d351632ccba6ebf6ae131c2b589c172fb6..73ebf67fe962db006eb8520a4a6a0a6600eae6d9 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -186,7 +186,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); broker->SetTimeout(100); auto group_id = broker->GenerateNewGroupId(&err); diff --git a/tests/automatic/consumer/consumer_api_python/CMakeLists.txt b/tests/automatic/consumer/consumer_api_python/CMakeLists.txt index 03b6ff48320be3947dcf696aa94c97a5efb63147..f63e4734aabc8983a8dd9b7bb01d4a2c12724e07 100644 --- a/tests/automatic/consumer/consumer_api_python/CMakeLists.txt +++ b/tests/automatic/consumer/consumer_api_python/CMakeLists.txt @@ -9,6 +9,8 @@ else() get_target_property(PYTHON_LIBS asapo_consumer BINARY_DIR) endif() -add_script_test("${TARGET_NAME}" "${PYTHON_LIBS} ${Python_EXECUTABLE}" nomem) -configure_file(consumer_api.py consumer_api.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/authorizer_settings.json.tpl.in authorizer.json.tpl @ONLY) + + +add_script_test("${TARGET_NAME}" "${PYTHON_LIBS} ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}" nomem) diff --git a/tests/automatic/consumer/consumer_api_python/authorizer_settings.json.tpl.in b/tests/automatic/consumer/consumer_api_python/authorizer_settings.json.tpl.in new file mode 100644 index 0000000000000000000000000000000000000000..7b88592e44067c5c1f3d3f2cca3c849f7313f8e5 --- /dev/null +++ b/tests/automatic/consumer/consumer_api_python/authorizer_settings.json.tpl.in @@ -0,0 +1,8 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "RootBeamtimesFolder":"@ASAP3_FOLDER@", + "CurrentBeamlinesFolder":"@CURRENT_BEAMLINES_FOLDER@", + "SecretFile":"auth_secret.key", + "TokenDurationMin":600 +} diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index df7485701ab514ba84090defcef0fdb2807f07e7..04f3c058ba108f46132e138160a9900abbeb1dd8 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -1,12 +1,13 @@ #!/usr/bin/env bash -source_path=. beamtime_id=test_run +source_path=`pwd`/asap3/petra3/gpfs/p01/2019/data/$beamtime_id stream=detector database_name=${beamtime_id}_${stream} token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= set -e + trap Cleanup EXIT Cleanup() { @@ -15,16 +16,22 @@ Cleanup() { nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill nomad stop discovery >/dev/null nomad stop broker >/dev/null + nomad stop file_transfer >/dev/null + nomad stop authorizer >/dev/null echo "db.dropDatabase()" | mongo ${database_name} >/dev/null - rm 1 1_1 + rm $source_path/1 $source_path/1_1 } nomad run nginx.nmd nomad run discovery.nmd nomad run broker.nmd +nomad run file_transfer.nmd +nomad run authorizer.nmd + -echo hello1 > 1 -echo hello1 > 1_1 +mkdir -p $source_path +echo -n hello1 > $source_path/1 +echo -n hello1 > $source_path/1_1 for i in `seq 1 5`; do @@ -45,8 +52,7 @@ sleep 1 export PYTHONPATH=$1:${PYTHONPATH} export Python_EXECUTABLE=$2 -$Python_EXECUTABLE consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single - +$Python_EXECUTABLE $3/consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single #check datasets echo "db.dropDatabase()" | mongo ${database_name} > /dev/null @@ -65,4 +71,4 @@ do done -$Python_EXECUTABLE consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run datasets +$Python_EXECUTABLE $3/consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run datasets diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat index 1ac06c62adf06199eecc36522b8e12e65924c43f..86d08b03e970cd32a50f1ff9e9bbe555d67ae7f7 100644 --- a/tests/automatic/consumer/consumer_api_python/check_windows.bat +++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat @@ -1,5 +1,8 @@ -SET source_path=. +setlocal SET beamtime_id=test_run +SET source_path=%cd%\asap3\petra3\gpfs\p01\2019\data\%beamtime_id% +set source_path=%source_path:\=\\% + SET stream=detector SET database_name=%beamtime_id%_%stream% @@ -10,6 +13,8 @@ set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= c:\opt\consul\nomad run discovery.nmd c:\opt\consul\nomad run broker.nmd c:\opt\consul\nomad run nginx.nmd +c:\opt\consul\nomad run file_transfer.nmd +c:\opt\consul\nomad run authorizer.nmd ping 1.0.0.0 -n 10 -w 100 > nul @@ -20,19 +25,21 @@ for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error -set PYTHONPATH=%1 +mkdir %source_path% + -echo hello1 > 1 -echo hello1 > 1_1 +set PYTHONPATH=%1 +echo | set /p dummyName="hello1" > %source_path%\1 +echo | set /p dummyName="hello1" > %source_path%\1_1 -python consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id% %token_test_run% single || goto :error +python %3/consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id% %token_test_run% single || goto :error echo db.dropDatabase() | %mongo_exe% %database_name% for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error -python consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id% %token_test_run% datasets || goto :error +python %3/consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id% %token_test_run% datasets || goto :error goto :clean @@ -46,5 +53,10 @@ c:\opt\consul\nomad stop discovery c:\opt\consul\nomad stop broker c:\opt\consul\nomad stop nginx c:\opt\consul\nomad run nginx_kill.nmd && c:\opt\consul\nomad stop -yes -purge nginx_kill +c:\opt\consul\nomad stop file_transfer +c:\opt\consul\nomad stop authorizer + echo db.dropDatabase() | %mongo_exe% %database_name% -del "1 1_1" +del c:\tmp\asapo\consumer_test\files\1 +del c:\tmp\asapo\consumer_test\files\1_1 + diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 10349aec51fd140f36f2fe1f807cd22a19489ee2..706828c68e38fd4d3b5a06274a9304d7880c5905 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -26,11 +26,18 @@ def assert_usermetadata(meta,name): def assert_eq(val,expected,name): + print ("asserting eq for "+name) if val != expected: print ("error at "+name) print ('val: ', val,' expected: ',expected) sys.exit(1) +def check_file_transfer_service(broker,group_id): + broker.set_timeout(1000) + data, meta = broker.get_by_id(1, group_id, meta_only=False) + assert_eq(data.tostring().decode("utf-8"),"hello1","check_file_transfer_service ok") + + def check_single(broker,group_id_new): _, meta = broker.get_next(group_id_new, meta_only=True) @@ -141,7 +148,7 @@ def check_single(broker,group_id_new): else: exit_on_noerr("wrong query") - broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000) + broker = asapo_consumer.create_server_broker("bla",path, True, beamtime,"",token,1000) try: broker.get_last(group_id_new, meta_only=True) except asapo_consumer.AsapoUnavailableServiceError as err: @@ -192,12 +199,16 @@ def check_dataset(broker,group_id_new): source, path, beamtime, token, mode = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) +broker_fts = asapo_consumer.create_server_broker(source,path,False, beamtime,"",token,60000) group_id_new = broker.generate_group_id() +group_id_fts = broker_fts.generate_group_id() + if mode == "single": check_single(broker,group_id_new) + check_file_transfer_service(broker_fts,group_id_fts) if mode == "datasets": check_dataset(broker,group_id_new) diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index 3f06dda46b641e892bc57064e1d506ee7fde8a59..deea0768f29ff3844d6f3c7d1ac50cf1487b2535 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -49,7 +49,7 @@ Args GetArgs(int argc, char* argv[]) { void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); auto group_id = broker->GenerateNewGroupId(&err); broker->SetTimeout(10000); std::vector<asapo::FileInfos>file_infos(args.nthreads); diff --git a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt index 94651979681b9fb6c0f2d4a577d5c307d6fd5fca..506902ea3b035cda2fe255ef5606afd12d1f518a 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt +++ b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt @@ -18,12 +18,8 @@ target_link_libraries(${TARGET_NAME} test_common asapo-consumer) # Testing ################################ -#add_test_setup_cleanup(${TARGET_NAME}) -#add_integration_test(${TARGET_NAME} get_httpbin "GET http://httpbin.org body 200") -add_integration_test(${TARGET_NAME} get_badaddress "GET google.com/badaddress found 404") -#add_integration_test(${TARGET_NAME} post "POST http://httpbin.org/post data 200") +prepare_asapo() -add_integration_test(${TARGET_NAME} post_badaddress "POST google.com/badaddress found 404") +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/authorizer_settings.json.tpl.in authorizer.json.tpl @ONLY) -#add_integration_test(${TARGET_NAME} get_badaddress2 "GET 111 clienterror 404") -#add_integration_test(${TARGET_NAME} post_badaddress2 "POST 111 clienterror 404") \ No newline at end of file +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" nomem) diff --git a/tests/automatic/curl_http_client/curl_http_client_command/authorizer_settings.json.tpl.in b/tests/automatic/curl_http_client/curl_http_client_command/authorizer_settings.json.tpl.in new file mode 100644 index 0000000000000000000000000000000000000000..7b88592e44067c5c1f3d3f2cca3c849f7313f8e5 --- /dev/null +++ b/tests/automatic/curl_http_client/curl_http_client_command/authorizer_settings.json.tpl.in @@ -0,0 +1,8 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "RootBeamtimesFolder":"@ASAP3_FOLDER@", + "CurrentBeamlinesFolder":"@CURRENT_BEAMLINES_FOLDER@", + "SecretFile":"auth_secret.key", + "TokenDurationMin":600 +} diff --git a/tests/automatic/curl_http_client/curl_http_client_command/check_linux.sh b/tests/automatic/curl_http_client/curl_http_client_command/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..989a4562e6b4e5708dc3d000e31b1394c4b38ece --- /dev/null +++ b/tests/automatic/curl_http_client/curl_http_client_command/check_linux.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +beamtime_id=aaa +file_transfer_folder=`pwd`/asap3/petra3/gpfs/p01/2019/data/$beamtime_id + +Cleanup() { + echo cleanup + nomad stop authorizer + nomad stop file_transfer + rm -rf $file_transfer_folder bbb +} + +nomad run authorizer.nmd +nomad run file_transfer.nmd +sleep 1 + +mkdir -p $file_transfer_folder +echo -n hello > $file_transfer_folder/aaa + +$1 127.0.0.1:5007 127.0.0.1:5008 $file_transfer_folder aaa +cat bbb | tee /dev/stderr | grep hello + + diff --git a/tests/automatic/curl_http_client/curl_http_client_command/check_windows.bat b/tests/automatic/curl_http_client/curl_http_client_command/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..1748f6e8c9c643754a6f2c383c12dc154c1ed4f7 --- /dev/null +++ b/tests/automatic/curl_http_client/curl_http_client_command/check_windows.bat @@ -0,0 +1,31 @@ +setlocal +SET beamtime_id=aaa +SET file_transfer_folder=%cd%\asap3\petra3\gpfs\p01\2019\data\%beamtime_id% +set file_transfer_folder=%file_transfer_folder:\=\\% + + + +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run file_transfer.nmd + +ping 1.0.0.0 -n 1 -w 100 > nul + +mkdir %file_transfer_folder% +echo | set /p dummyName="hello" > %file_transfer_folder%\aaa + +"%1" 127.0.0.1:5007 127.0.0.1:5008 %file_transfer_folder% aaa || goto :error + +type bbb | findstr /c:"hello" || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop authorizer +c:\opt\consul\nomad stop file_transfer +rmdir /S /Q %file_transfer_folder% +del /f bbb + diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index 49efa4afae83f9f87a6a00f93e10a697126ed803..264d88c6befd767c99e69ae1b6dcbc52d35c3fd8 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -3,15 +3,17 @@ #include "consumer/data_broker.h" #include "testing.h" #include "../../../consumer/api/cpp/src/server_data_broker.h" +#include "preprocessor/definitions.h" using asapo::M_AssertEq; using asapo::M_AssertContains; +using asapo::M_AssertTrue; struct Args { - std::string command; - std::string uri; - int code; - std::string answer; + std::string uri_authorizer; + std::string uri_fts; + std::string folder; + std::string fname; }; Args GetArgs(int argc, char* argv[]) { @@ -19,38 +21,55 @@ Args GetArgs(int argc, char* argv[]) { std::cout << "Wrong number of arguments" << std::endl; exit(EXIT_FAILURE); } - std::string command{argv[1]}; - std::string uri{argv[2]}; - std::string answer {argv[3]}; - int code = std::stoi(argv[4]); - return Args{command, uri, code, answer}; + std::string uri_authorizer{argv[1]}; + std::string uri_fts{argv[2]}; + std::string folder{argv[3]}; + std::string fname{argv[4]}; + return Args{uri_authorizer, uri_fts, folder, fname}; } int main(int argc, char* argv[]) { auto args = GetArgs(argc, argv); - + auto token = "bnCXpOdBV90wU1zybEw1duQNSORuwaKz6oDHqmL35p0="; //token for aaa + std::string authorize_request = "{\"Folder\":\"" + args.folder + "\",\"BeamtimeId\":\"aaa\",\"Token\":\"" + token + + "\"}"; asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", asapo::SourceCredentials{"", "", "", ""}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri_authorizer, "", true, asapo::SourceCredentials{"", "", "", ""}, &err); auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get()); asapo::HttpCode code; std::string response; - if (args.command == "GET") { - response = server_broker->httpclient__->Get(args.uri, &code, &err); - } else if (args.command == "POST") { - response = server_broker->httpclient__->Post(args.uri, "testdata", &code, &err); + std::string input_data; + auto folder_token = server_broker->httpclient__->Post(args.uri_authorizer + "/folder", "", authorize_request, &code, + &err); + M_AssertTrue(err == nullptr); + M_AssertTrue(code == asapo::HttpCode::OK); + if (err) { + std::cout << err->Explain(); } - if (err != nullptr) { - M_AssertEq("clienterror", args.answer); - M_AssertContains(response, "Could"); - return 0; - } + server_broker->httpclient__->Post(args.uri_authorizer + "/folder", "", "", &code, &err); + M_AssertTrue(code == asapo::HttpCode::BadRequest); + + server_broker->httpclient__->Post(args.uri_authorizer + "/bla", "", "", &code, &err); + M_AssertTrue(code == asapo::HttpCode::NotFound); + +// check post with data + std::string transfer = "{\"Folder\":\"" + args.folder + "\",\"FileName\":\"aaa\"}"; + std::string cookie = "Authorization=Bearer " + folder_token + ";"; + auto content = server_broker->httpclient__->Post(args.uri_fts + "/transfer", cookie, transfer, &code, &err); + M_AssertEq("hello", content); + M_AssertTrue(code == asapo::HttpCode::OK); + + asapo::FileData data; + err = server_broker->httpclient__->Post(args.uri_fts + "/transfer", cookie, transfer, &data, 5, &code); + M_AssertEq( "hello", reinterpret_cast<char const*>(data.get())); + M_AssertTrue(code == asapo::HttpCode::OK); - M_AssertContains(response, args.answer); - M_AssertEq(static_cast<int>(code), args.code); + err = server_broker->httpclient__->Post(args.uri_fts + "/transfer", cookie, transfer, "bbb", &code); + M_AssertTrue(code == asapo::HttpCode::OK); return 0; } diff --git a/tests/automatic/file_transfer_service/CMakeLists.txt b/tests/automatic/file_transfer_service/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..798692340aa9b1edf63ff14ecd7c9f39ee57830b --- /dev/null +++ b/tests/automatic/file_transfer_service/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(rest_api) diff --git a/tests/automatic/file_transfer_service/rest_api/CMakeLists.txt b/tests/automatic/file_transfer_service/rest_api/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..7ead397a72d3fdf0c9d4dca1b8701a22b29e281f --- /dev/null +++ b/tests/automatic/file_transfer_service/rest_api/CMakeLists.txt @@ -0,0 +1,9 @@ +set(TARGET_NAME file_transfer_rest_api) + +################################ +# Testing +################################ +prepare_asapo() + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/authorizer_settings.json.tpl.in authorizer.json.tpl @ONLY) +add_script_test("${TARGET_NAME}" "" nomem) diff --git a/tests/automatic/file_transfer_service/rest_api/authorizer_settings.json.tpl.in b/tests/automatic/file_transfer_service/rest_api/authorizer_settings.json.tpl.in new file mode 100644 index 0000000000000000000000000000000000000000..7b88592e44067c5c1f3d3f2cca3c849f7313f8e5 --- /dev/null +++ b/tests/automatic/file_transfer_service/rest_api/authorizer_settings.json.tpl.in @@ -0,0 +1,8 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "RootBeamtimesFolder":"@ASAP3_FOLDER@", + "CurrentBeamlinesFolder":"@CURRENT_BEAMLINES_FOLDER@", + "SecretFile":"auth_secret.key", + "TokenDurationMin":600 +} diff --git a/tests/automatic/file_transfer_service/rest_api/check_linux.sh b/tests/automatic/file_transfer_service/rest_api/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..89f45c4c8440887d845f022b61d7c1296e25e024 --- /dev/null +++ b/tests/automatic/file_transfer_service/rest_api/check_linux.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +file_transfer_folder=`pwd`/asap3/petra3/gpfs/p01/2019/data/aaa + + +Cleanup() { + echo cleanup + nomad stop authorizer + nomad stop file_transfer + rm -rf $file_transfer_folder aaa big_file +} + +nomad run authorizer.nmd +nomad run file_transfer.nmd + +sleep 1 + +mkdir -p $file_transfer_folder + +token=bnCXpOdBV90wU1zybEw1duQNSORuwaKz6oDHqmL35p0= #token for aaa +folder_token=`curl --silent --data "{\"Folder\":\"$file_transfer_folder\",\"BeamtimeId\":\"aaa\",\"Token\":\"$token\"}" 127.0.0.1:5007/folder` +echo $folder_token + +echo hello > $file_transfer_folder/aaa + +curl -o aaa --silent -H "Authorization: Bearer ${folder_token}" --data "{\"Folder\":\"$file_transfer_folder\",\"FileName\":\"aaa\",\"Token\":\"$folder_token\"}" 127.0.0.1:5008/transfer --stderr - | tee /dev/stderr + +cat aaa | grep hello + +dd if=/dev/zero of=$file_transfer_folder/big_file bs=1 count=0 seek=5368709120 + +curl -vvv -o big_file -H "Authorization: Bearer ${folder_token}" --data "{\"Folder\":\"$file_transfer_folder\",\"FileName\":\"big_file\",\"Token\":\"$folder_token\"}" 127.0.0.1:5008/transfer --stderr - | tee /dev/stderr + +ls -ln big_file | awk '{ print $5 }' | tee /dev/stderr | grep 5368709120 + + + diff --git a/tests/automatic/file_transfer_service/rest_api/check_windows.bat b/tests/automatic/file_transfer_service/rest_api/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..a193eaefc45e3e90dd296b4e8d248ff40fec2e5e --- /dev/null +++ b/tests/automatic/file_transfer_service/rest_api/check_windows.bat @@ -0,0 +1,32 @@ +setlocal +SET beamtime_id=aaa +SET file_transfer_folder=%cd%\asap3\petra3\gpfs\p01\2019\data\%beamtime_id% +set file_transfer_folder=%file_transfer_folder:\=\\% + + +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run file_transfer.nmd + +ping 1.0.0.0 -n 1 -w 100 > nul + +set token=bnCXpOdBV90wU1zybEw1duQNSORuwaKz6oDHqmL35p0= + +mkdir %file_transfer_folder% + +C:\Curl\curl.exe --silent --data "{\"Folder\":\"%file_transfer_folder%\",\"BeamtimeId\":\"aaa\",\"Token\":\"%token%\"}" 127.0.0.1:5007/folder > token +set /P folder_token=< token + +echo hello > %file_transfer_folder%\aaa + +C:\Curl\curl.exe --silent -H "Authorization: Bearer %folder_token%" --data "{\"Folder\":\"%file_transfer_folder%\",\"FileName\":\"aaa\",\"Token\":\"%folder_token%\"}" 127.0.0.1:5008/transfer --stderr - | findstr hello || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop authorizer +c:\opt\consul\nomad stop file_transfer +rmdir /S /Q %file_transfer_folder% diff --git a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp index 34a1623ad0c80f60e60b985f9aacfd4f53aa3895..333d139fde158448629714a472e9c1ea631c94b1 100644 --- a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp +++ b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp @@ -35,7 +35,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { } BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, asapo::SourceCredentials{args.beamtime_id, "", "", args.token}, err); if (*err) { return nullptr; diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py index f52b8f4460d60254bbdbbccec174f9d0ccc84c1c..60ad65264b71f728a5f16c2a8babfb3d03d9c2f4 100644 --- a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py +++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py @@ -27,7 +27,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout) +broker = asapo_consumer.create_server_broker(source,".",True, beamtime,"",token,timeout) producer = asapo_producer.create_producer(source,beamtime,'auto', "", token, 1, 600) producer.set_log_level("debug") diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py index a3ac8309400f77fd7d0b9181719173e530015023..5fb497e041b462bf7290286f805bf1fe646c0737 100644 --- a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py +++ b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py @@ -6,7 +6,7 @@ import sys source, path, beamtime, token, group_id = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) images = broker.query_images("meta.user_meta regexp 'test*' order by _id") diff --git a/tests/automatic/settings/authorizer_settings.json.tpl.lin b/tests/automatic/settings/authorizer_settings.json.tpl.lin index 07988026d1779f11f3ae53d801eafd26abe6b553..bfab06d1339cfb1a3cb8de2e8da40cf1f5a9bbee 100644 --- a/tests/automatic/settings/authorizer_settings.json.tpl.lin +++ b/tests/automatic/settings/authorizer_settings.json.tpl.lin @@ -4,5 +4,6 @@ "AlwaysAllowedBeamtimes":[{"beamtimeId":"asapo_test","beamline":"test","core-path":"/tmp/asapo/receiver/files/test_facility/gpfs/test/2019/data/asapo_test"}, {"beamtimeId":"asapo_test1","beamline":"test1","core-path":"/tmp/asapo/receiver/files/test_facility/gpfs/test1/2019/data/asapo_test1"}, {"beamtimeId":"asapo_test2","beamline":"test2","core-path":"/tmp/asapo/receiver/files/test_facility/gpfs/test2/2019/data/asapo_test2"}], - "SecretFile":"auth_secret.key" + "SecretFile":"auth_secret.key", + "TokenDurationMin":600 } diff --git a/tests/automatic/settings/authorizer_settings.json.tpl.win b/tests/automatic/settings/authorizer_settings.json.tpl.win index 418da1031bbdd84616f635be9d83b3635232e036..755b4d25c697a13124477123b9f20a0e50035322 100644 --- a/tests/automatic/settings/authorizer_settings.json.tpl.win +++ b/tests/automatic/settings/authorizer_settings.json.tpl.win @@ -4,5 +4,6 @@ "AlwaysAllowedBeamtimes":[{"beamtimeId":"asapo_test","beamline":"test","core-path":"c:\\tmp\\asapo\\receiver\\files\\test_facility\\gpfs\\test\\2019\\data\\asapo_test"}, {"beamtimeId":"asapo_test1","beamline":"test1","core-path":"c:\\tmp\\asapo\\receiver\\files\\test_facility\\gpfs\\test1\\2019\\data\\asapo_test1"}, {"beamtimeId":"asapo_test2","beamline":"test2","core-path":"c:\\tmp\\asapo\\receiver\\files\\test_facility\\gpfs\\test2\\2019\\data\\asapo_test2"}], - "SecretFile":"auth_secret.key" + "SecretFile":"auth_secret.key", + "TokenDurationMin":600 } diff --git a/tests/automatic/settings/file_transfer_settings.json.tpl b/tests/automatic/settings/file_transfer_settings.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..0248e349825302f6a72d3f01fae4c230962d4f2b --- /dev/null +++ b/tests/automatic/settings/file_transfer_settings.json.tpl @@ -0,0 +1,5 @@ +{ + "Port": {{ env "NOMAD_PORT_file_transfer" }}, + "LogLevel":"debug", + "SecretFile":"auth_secret.key" +} diff --git a/tests/automatic/spd_logger/console/CMakeLists.txt b/tests/automatic/spd_logger/console/CMakeLists.txt index fa35faabf13c3ef6bf36738361f598cf9f87458d..398f0e7815c8b2ceea4747799949dc47cc61e5ea 100644 --- a/tests/automatic/spd_logger/console/CMakeLists.txt +++ b/tests/automatic/spd_logger/console/CMakeLists.txt @@ -5,7 +5,7 @@ set(SOURCE_FILES spd_logger_console.cpp) ################################ # Executable and link ################################ -add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:curl_http_client>) +add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:system_io>) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) diff --git a/tests/automatic/spd_logger/fluentd/CMakeLists.txt b/tests/automatic/spd_logger/fluentd/CMakeLists.txt index e4a0513d0549af6e95c07b412ab4c145596c842f..017f3905bcdc7fdd44dab3d7a6a86c2e809494bf 100644 --- a/tests/automatic/spd_logger/fluentd/CMakeLists.txt +++ b/tests/automatic/spd_logger/fluentd/CMakeLists.txt @@ -5,7 +5,7 @@ set(SOURCE_FILES spd_logger_fluentd.cpp) ################################ # Executable and link ################################ -add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:curl_http_client>) +add_executable(${TARGET_NAME} ${SOURCE_FILES} $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:system_io>) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 4272ae17c239cd8a7139d03835619af80761046a..1adcda25b2a1edee2db8379dfabc6229fa565987 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -47,7 +47,7 @@ std::vector<std::thread> StartThreads(const Args& params, auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { asapo::FileInfo fi; Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, asapo::SourceCredentials{params.beamtime_id, "", "", params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; diff --git a/tests/manual/python_tests/plot_images_online.py b/tests/manual/python_tests/plot_images_online.py index 829200981948f473083b41ce4bca327306cefa30..31b0e88e01bf99908bfb377d785331ba77541d34 100644 --- a/tests/manual/python_tests/plot_images_online.py +++ b/tests/manual/python_tests/plot_images_online.py @@ -9,7 +9,7 @@ import matplotlib.pyplot as plt #dset = f.create_dataset("mydataset", data = d1) #f.close() -broker, err = asapo_consumer.create_server_broker("psana002:8400", "/tmp", "asapo_test2","","yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", 1000000) +broker, err = asapo_consumer.create_server_broker("psana002:8400", "/tmp", True, "asapo_test2","","yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", 1000000) last_id = 0 while True: diff --git a/tests/manual/python_tests/test_p.py b/tests/manual/python_tests/test_p.py index 337cac212e868892737fcd60258076b9e890361b..764a2f377f2c06a94d46ba85d698935200d825c3 100644 --- a/tests/manual/python_tests/test_p.py +++ b/tests/manual/python_tests/test_p.py @@ -11,7 +11,7 @@ beamtime = "asapo_test" token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc=" broker, err = asapo_consumer.create_server_broker( - source, path, beamtime, token, 1000) + source, path, True, beamtime, token, 1000) group_id, err = broker.generate_group_id() if err is not None: