diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d5eafefdbd9fef7adbf9073f039828fa77af421..e6b3d88008fd71db9416646043ae082b7c5d371a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ BREAKING CHANGES * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before * Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions * Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id) + #### renaming - Producer API +* stream -> data_source, substream -> stream + #### renaming - Consumer API +* BUG FIXES * fix memory leak bug in Python consumer library (lead to problems when creating many consumer instances) diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go index 394b652a7c3c45b7cef75545c3ebb235e2839122..edcf0b703767f142d746a56235155bef6b942e91 100644 --- a/authorizer/src/asapo_authorizer/server/authorize.go +++ b/authorizer/src/asapo_authorizer/server/authorize.go @@ -13,7 +13,7 @@ import ( type SourceCredentials struct { BeamtimeId string Beamline string - Stream string + DataSource string Token string Type string } @@ -30,8 +30,8 @@ func getSourceCredentials(request authorizationRequest) (SourceCredentials, erro return SourceCredentials{}, errors.New("cannot get source credentials from " + request.SourceCredentials) } creds := SourceCredentials{vals[1], vals[2], vals[3], vals[4],vals[0]} - if creds.Stream == "" { - creds.Stream = "detector" + if creds.DataSource == "" { + creds.DataSource = "detector" } if creds.Beamline == "" { @@ -124,7 +124,7 @@ func findBeamtimeMetaFromBeamline(beamline string) (beamtimeMeta, error) { func alwaysAllowed(creds SourceCredentials) (beamtimeMeta, bool) { for _, pair := range settings.AlwaysAllowedBeamtimes { if pair.BeamtimeId == creds.BeamtimeId { - pair.Stream = creds.Stream + pair.DataSource = creds.DataSource pair.Type = creds.Type return pair, true } @@ -198,7 +198,7 @@ func findMeta(creds SourceCredentials) (beamtimeMeta, error) { return beamtimeMeta{}, err } - meta.Stream = creds.Stream + meta.DataSource = creds.DataSource meta.Type = creds.Type return meta, nil diff --git a/authorizer/src/asapo_authorizer/server/authorize_test.go b/authorizer/src/asapo_authorizer/server/authorize_test.go index d3b8e36294e79e8c99a39c16a128135ba64b4671..4085b1b5b473ebbae4a06998e73e509ddee12c9b 100644 --- a/authorizer/src/asapo_authorizer/server/authorize_test.go +++ b/authorizer/src/asapo_authorizer/server/authorize_test.go @@ -58,16 +58,16 @@ var credTests = [] struct { ok bool message string } { - {"processed%asapo_test%auto%%", SourceCredentials{"asapo_test","auto","detector","","processed"},true,"auto beamline, stream and no token"}, - {"processed%asapo_test%auto%%token", SourceCredentials{"asapo_test","auto","detector","token","processed"},true,"auto beamline, stream"}, - {"processed%asapo_test%auto%stream%", SourceCredentials{"asapo_test","auto","stream","","processed"},true,"auto beamline, no token"}, - {"processed%asapo_test%auto%stream%token", SourceCredentials{"asapo_test","auto","stream","token","processed"},true,"auto beamline,stream, token"}, - {"processed%asapo_test%beamline%stream%token", SourceCredentials{"asapo_test","beamline","stream","token","processed"},true,"all set"}, - {"processed%auto%beamline%stream%token", SourceCredentials{"auto","beamline","stream","token","processed"},true,"auto beamtime"}, - {"raw%auto%auto%stream%token", SourceCredentials{},false,"auto beamtime and beamline"}, - {"raw%%beamline%stream%token", SourceCredentials{"auto","beamline","stream","token","raw"},true,"empty beamtime"}, - {"raw%asapo_test%%stream%token", SourceCredentials{"asapo_test","auto","stream","token","raw"},true,"empty bealine"}, - {"raw%%%stream%token", SourceCredentials{},false,"both empty"}, + {"processed%asapo_test%auto%%", SourceCredentials{"asapo_test","auto","detector","","processed"},true,"auto beamline, source and no token"}, + {"processed%asapo_test%auto%%token", SourceCredentials{"asapo_test","auto","detector","token","processed"},true,"auto beamline, source"}, + {"processed%asapo_test%auto%source%", SourceCredentials{"asapo_test","auto","source","","processed"},true,"auto beamline, no token"}, + {"processed%asapo_test%auto%source%token", SourceCredentials{"asapo_test","auto","source","token","processed"},true,"auto beamline,source, token"}, + {"processed%asapo_test%beamline%source%token", SourceCredentials{"asapo_test","beamline","source","token","processed"},true,"all set"}, + {"processed%auto%beamline%source%token", SourceCredentials{"auto","beamline","source","token","processed"},true,"auto beamtime"}, + {"raw%auto%auto%source%token", SourceCredentials{},false,"auto beamtime and beamline"}, + {"raw%%beamline%source%token", SourceCredentials{"auto","beamline","source","token","raw"},true,"empty beamtime"}, + {"raw%asapo_test%%source%token", SourceCredentials{"asapo_test","auto","source","token","raw"},true,"empty bealine"}, + {"raw%%%source%token", SourceCredentials{},false,"both empty"}, } func TestSplitCreds(t *testing.T) { @@ -150,46 +150,46 @@ var authTests = [] struct { source_type string beamtime_id string beamline string - stream string + dataSource string token string originHost string status int message string answer string }{ - {"processed","test","auto","stream", prepareToken("test"),"127.0.0.2",http.StatusOK,"user stream with correct token", - `{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, - {"processed","test_online","auto","stream", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"with online path, processed type", - `{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"","source-type":"processed"}`}, - {"processed","test1","auto","stream", prepareToken("test1"),"127.0.0.1",http.StatusUnauthorized,"correct token, beamtime not found", + {"processed","test","auto","dataSource", prepareToken("test"),"127.0.0.2",http.StatusOK,"user source with correct token", + `{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, + {"processed","test_online","auto","dataSource", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"with online path, processed type", + `{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"","source-type":"processed"}`}, + {"processed","test1","auto","dataSource", prepareToken("test1"),"127.0.0.1",http.StatusUnauthorized,"correct token, beamtime not found", ""}, - {"processed","test","auto","stream", prepareToken("wrong"),"127.0.0.1",http.StatusUnauthorized,"user stream with wrong token", + {"processed","test","auto","dataSource", prepareToken("wrong"),"127.0.0.1",http.StatusUnauthorized,"user source with wrong token", ""}, - {"processed","test","bl1","stream", prepareToken("test"),"127.0.0.1",http.StatusOK,"correct beamline given", - `{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, - {"processed","test","bl2","stream", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"incorrect beamline given", + {"processed","test","bl1","dataSource", prepareToken("test"),"127.0.0.1",http.StatusOK,"correct beamline given", + `{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, + {"processed","test","bl2","dataSource", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"incorrect beamline given", ""}, - {"processed","auto","p07", "stream",prepareToken("bl_p07"),"127.0.0.1",http.StatusOK,"beamtime found", - `{"beamtimeId":"11111111","beamline":"p07","stream":"stream","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"","source-type":"processed"}`}, - {"processed","auto","p07", "stream",prepareToken("bl_p06"),"127.0.0.1",http.StatusUnauthorized,"wrong token", + {"processed","auto","p07", "dataSource",prepareToken("bl_p07"),"127.0.0.1",http.StatusOK,"beamtime found", + `{"beamtimeId":"11111111","beamline":"p07","dataSource":"dataSource","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"","source-type":"processed"}`}, + {"processed","auto","p07", "dataSource",prepareToken("bl_p06"),"127.0.0.1",http.StatusUnauthorized,"wrong token", ""}, - {"processed","auto","p08", "stream",prepareToken("bl_p08"),"127.0.0.1",http.StatusUnauthorized,"beamtime not found", + {"processed","auto","p08", "dataSource",prepareToken("bl_p08"),"127.0.0.1",http.StatusUnauthorized,"beamtime not found", ""}, - {"raw","test_online","auto","stream", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"raw type", - `{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`}, - {"raw","test_online","auto","stream", "","127.0.0.1",http.StatusOK,"raw type", - `{"beamtimeId":"test_online","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`}, - {"raw","auto","p07","stream", "","127.0.0.1",http.StatusOK,"raw type, auto beamtime", - `{"beamtimeId":"11111111","beamline":"p07","stream":"stream","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"./p07/current","source-type":"raw"}`}, + {"raw","test_online","auto","dataSource", prepareToken("test_online"),"127.0.0.1",http.StatusOK,"raw type", + `{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`}, + {"raw","test_online","auto","dataSource", "","127.0.0.1",http.StatusOK,"raw type", + `{"beamtimeId":"test_online","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test_online","beamline-path":"./bl1/current","source-type":"raw"}`}, + {"raw","auto","p07","dataSource", "","127.0.0.1",http.StatusOK,"raw type, auto beamtime", + `{"beamtimeId":"11111111","beamline":"p07","dataSource":"dataSource","core-path":"asap3/petra3/gpfs/p07/2020/data/11111111","beamline-path":"./p07/current","source-type":"raw"}`}, {"raw","auto","p07","noldap", "","127.0.0.1",http.StatusNotFound,"no conection to ldap", ""}, - {"raw","test_online","auto","stream", "","127.0.0.2",http.StatusUnauthorized,"raw type, wrong origin host", + {"raw","test_online","auto","dataSource", "","127.0.0.2",http.StatusUnauthorized,"raw type, wrong origin host", ""}, - {"raw","test","auto","stream", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"raw when not online", + {"raw","test","auto","dataSource", prepareToken("test"),"127.0.0.1",http.StatusUnauthorized,"raw when not online", ""}, - {"processed","test","auto","stream", "","127.0.0.1:1001",http.StatusOK,"processed without token", - `{"beamtimeId":"test","beamline":"bl1","stream":"stream","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, - {"processed","test","auto","stream", "","127.0.0.2",http.StatusUnauthorized,"processed without token, wrong host", + {"processed","test","auto","dataSource", "","127.0.0.1:1001",http.StatusOK,"processed without token", + `{"beamtimeId":"test","beamline":"bl1","dataSource":"dataSource","core-path":"./tf/gpfs/bl1/2019/data/test","beamline-path":"","source-type":"processed"}`}, + {"processed","test","auto","dataSource", "","127.0.0.2",http.StatusUnauthorized,"processed without token, wrong host", ""}, } @@ -222,7 +222,7 @@ func TestAuthorize(t *testing.T) { bl = "bl1" } expected_filter:="a3"+bl+"-hosts" - if test.stream == "noldap" { + if test.dataSource == "noldap" { err := &common.ServerError{utils.StatusServiceUnavailable,""} mockClient.On("GetAllowedIpsForBeamline", expected_uri, expected_base,expected_filter).Return([]string{}, err) } else { @@ -230,7 +230,7 @@ func TestAuthorize(t *testing.T) { } } - request := makeRequest(authorizationRequest{test.source_type+"%"+test.beamtime_id+"%"+test.beamline+"%"+test.stream+"%"+test.token,test.originHost}) + request := makeRequest(authorizationRequest{test.source_type+"%"+test.beamtime_id+"%"+test.beamline+"%"+test.dataSource+"%"+test.token,test.originHost}) w := doPostRequest("/authorize",request) body, _ := ioutil.ReadAll(w.Body) diff --git a/authorizer/src/asapo_authorizer/server/server.go b/authorizer/src/asapo_authorizer/server/server.go index e5f7518738ecce6cc70722de09161b39369f698e..7dc7aca8c467718c6324eb55e131b2da8c8defb7 100644 --- a/authorizer/src/asapo_authorizer/server/server.go +++ b/authorizer/src/asapo_authorizer/server/server.go @@ -8,7 +8,7 @@ import ( type beamtimeMeta struct { BeamtimeId string `json:"beamtimeId"` Beamline string `json:"beamline"` - Stream string `json:"stream"` + DataSource string `json:"dataSource"` OfflinePath string `json:"core-path"` OnlinePath string `json:"beamline-path"` Type string `json:"source-type"` diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index f971d71994e0ba718f383dd41d7fb65c351fc331..a59d65bca869786882a1bcff2d5d5d5f652f2840 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -8,49 +8,49 @@ var listRoutes = utils.Routes{ utils.Route{ "GetNext", "Get", - "/database/{dbname}/{stream}/{substream}/{groupid}/next", + "/database/{dbname}/{datasource}/{substream}/{groupid}/next", routeGetNext, }, utils.Route{ "GetSize", "Get", - "/database/{dbname}/{stream}/{substream}/size", + "/database/{dbname}/{datasource}/{substream}/size", routeGetSize, }, utils.Route{ "GetSubstreams", "Get", - "/database/{dbname}/{stream}/{substream}/substreams", + "/database/{dbname}/{datasource}/{substream}/substreams", routeGetSubstreams, }, utils.Route{ "GetLast", "Get", - "/database/{dbname}/{stream}/{substream}/0/last", + "/database/{dbname}/{datasource}/{substream}/0/last", routeGetLast, }, utils.Route{ "GetLastAck", "Get", - "/database/{dbname}/{stream}/{substream}/{groupid}/lastack", + "/database/{dbname}/{datasource}/{substream}/{groupid}/lastack", routeGetLastAck, }, utils.Route{ "GetNacks", "Get", - "/database/{dbname}/{stream}/{substream}/{groupid}/nacks", + "/database/{dbname}/{datasource}/{substream}/{groupid}/nacks", routeGetNacks, }, utils.Route{ "GetID", "Get", - "/database/{dbname}/{stream}/{substream}/0/{id}", + "/database/{dbname}/{datasource}/{substream}/0/{id}", routeGetByID, }, utils.Route{ "GetMeta", "Get", - "/database/{dbname}/{stream}/{substream}/0/meta/{id}", + "/database/{dbname}/{datasource}/{substream}/0/meta/{id}", routeGetMeta, }, utils.Route{ @@ -62,19 +62,19 @@ var listRoutes = utils.Routes{ utils.Route{ "QueryImages", "Post", - "/database/{dbname}/{stream}/{substream}/0/queryimages", + "/database/{dbname}/{datasource}/{substream}/0/queryimages", routeQueryImages, }, utils.Route{ "ResetConter", "Post", - "/database/{dbname}/{stream}/{substream}/{groupid}/resetcounter", + "/database/{dbname}/{datasource}/{substream}/{groupid}/resetcounter", routeResetCounter, }, utils.Route{ "ImageOp", "Post", - "/database/{dbname}/{stream}/{substream}/{groupid}/{id}", + "/database/{dbname}/{datasource}/{substream}/{groupid}/{id}", routeImageOp, }, utils.Route{ diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 4adf102b6d48b8319780b1af186122251a45bdc9..ab1c4d5f213257f5a9c92d7874aa636fbb11caae 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -13,7 +13,7 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string vars := mux.Vars(r) db_name, ok1 := vars["dbname"] - stream, ok3 := vars["stream"] + datasource, ok3 := vars["datasource"] substream, ok4 := vars["substream"] ok2 := true @@ -21,7 +21,7 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string if needGroupID { group_id, ok2 = vars["groupid"] } - return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4 + return db_name, datasource, substream, group_id, ok1 && ok2 && ok3 && ok4 } func IsLetterOrNumbers(s string) bool { @@ -52,7 +52,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) { r.Header.Set("Content-type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") - db_name, stream, substream, group_id, ok := extractRequestParameters(r, needGroupID) + db_name, datasource, substream, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { w.WriteHeader(http.StatusBadRequest) return @@ -68,7 +68,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par } request := database.Request{} - request.DbName = db_name+"_"+stream + request.DbName = db_name+"_"+datasource request.Op = op request.ExtraParam = extra_param request.DbCollectionName = substream diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 5aa7b28fc2a622dc88a6ec6f0c02c1951517c233..24605fdc0378378341d3fbb4e4d15ec13d4bc973 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -20,7 +20,7 @@ var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtime const expectedGroupID = "bid2a5auidddp1vl71d0" const wrongGroupID = "_bid2a5auidddp1vl71" -const expectedStream = "stream" +const expectedStream = "datasource" const expectedSubstream = "substream" func prepareTestAuth() { diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index c17921eb780068ba8ba2c85c996e62f16457baf8..738b3d8b1aa11e8104d4d4158ff1f431365233bb 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -80,10 +80,10 @@ Error GetSourceTypeFromString(std::string stype,SourceType *type); std::string GetStringFromSourceType(SourceType type); struct SourceCredentials { - SourceCredentials(SourceType type, std::string beamtime, std::string beamline, std::string stream, std::string token): + SourceCredentials(SourceType type, std::string beamtime, std::string beamline, std::string data_source, std::string token): beamtime_id{std::move(beamtime)}, beamline{std::move(beamline)}, - stream{std::move(stream)}, + data_source{std::move(data_source)}, user_token{std::move(token)}, type{type}{}; SourceCredentials() {}; @@ -92,11 +92,11 @@ struct SourceCredentials { static const std::string kDefaultBeamtimeId; std::string beamtime_id; std::string beamline; - std::string stream; + std::string data_source; std::string user_token; SourceType type = SourceType::kProcessed; std::string GetString() { - return (type==SourceType::kRaw?std::string("raw"):std::string("processed")) + "%"+ beamtime_id + "%" + beamline + "%" + stream + "%" + user_token; + return (type==SourceType::kRaw?std::string("raw"):std::string("processed")) + "%"+ beamtime_id + "%" + beamline + "%" + data_source + "%" + user_token; }; }; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 590ffcc55812248869f589ab3e2a5be152e86dee..b5189be3796ba86631e50da95234707b090e4973 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -114,8 +114,8 @@ ServerDataBroker::ServerDataBroker(std::string server_uri, // net_client__ will be lazy initialized - if (source_credentials_.stream.empty()) { - source_credentials_.stream = SourceCredentials::kDefaultStream; + if (source_credentials_.data_source.empty()) { + source_credentials_.data_source = SourceCredentials::kDefaultStream; } } @@ -246,7 +246,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g interrupt_flag_= false; std::string request_suffix = OpToUriCmd(op); std::string request_group = OpToUriCmd(op); - std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/" + std::move(substream); uint64_t elapsed_ms = 0; Error no_data_error; @@ -529,7 +529,7 @@ Error ServerDataBroker::ResetLastReadMarker(std::string group_id, std::string su Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/" + std::move(substream) + "/" + std::move(group_id) + "/resetcounter"; ri.extra_params = "&value=" + std::to_string(value); ri.post = true; @@ -541,7 +541,7 @@ Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/size"; auto responce = BrokerRequestWithTimeout(ri, err); if (*err) { @@ -575,7 +575,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon std::string substream, bool dataset, uint64_t min_size) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/" + std::move( group_id) + "/" + std::to_string(id); @@ -591,7 +591,7 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon std::string ServerDataBroker::GetBeamtimeMeta(Error* err) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/default/0/meta/0"; + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/default/0/meta/0"; return BrokerRequestWithTimeout(ri, err); } @@ -608,7 +608,7 @@ DataSet DecodeDatasetFromResponse(std::string response, Error* err) { FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream, Error* err) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/" + std::move(substream) + "/0/queryimages"; ri.post = true; ri.body = std::move(query); @@ -692,7 +692,7 @@ StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) { StreamInfos ServerDataBroker::GetSubstreamList(std::string from, Error* err) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/substreams"; + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/0/substreams"; ri.post = false; if (!from.empty()) { ri.extra_params = "&from=" + from; @@ -762,7 +762,7 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData* Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::string substream) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; @@ -779,7 +779,7 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, uint64_t to_id, Error* error) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/" + std::move(group_id) + "/nacks"; ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id); @@ -807,7 +807,7 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/" + std::move(group_id) + "/lastack"; @@ -843,7 +843,7 @@ Error ServerDataBroker::NegativeAcknowledge(std::string group_id, uint64_t delay_sec, std::string substream) { RequestInfo ri; - ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(substream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 54d3075ab968ac11b284d0f77be9a53756f94c12..b1a3ce36d82634cc840af17ce57f146e3a89c31a 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -72,7 +72,7 @@ class ServerDataBrokerTests : public Test { std::string expected_filename = "filename"; std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename; std::string expected_group_id = "groupid"; - std::string expected_stream = "stream"; + std::string expected_data_source = "source"; std::string expected_substream = "substream"; std::string expected_metadata = "{\"meta\":1}"; std::string expected_query_string = "bla"; @@ -93,14 +93,14 @@ class ServerDataBrokerTests : public Test { expected_path, true, asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "", - expected_stream, expected_token}) + expected_data_source, expected_token}) }; fts_data_broker = std::unique_ptr<ServerDataBroker>{ new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "", - expected_stream, expected_token}) + expected_data_source, expected_token}) }; data_broker->io__ = std::unique_ptr<IO>{&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient>{&mock_http_client}; @@ -215,7 +215,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/next?token=" + expected_token, _, _)).WillOnce(DoAll( @@ -228,7 +228,7 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUriWithSubstream) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/next?token=" + expected_token, _, _)).WillOnce(DoAll( @@ -242,7 +242,7 @@ TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token=" + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -411,7 +411,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2,\"next_substream\":\"""\"}"))); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::NotFound), @@ -595,7 +595,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) { data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), @@ -610,7 +610,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) { data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), @@ -624,7 +624,7 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUriWithSubstream) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll( @@ -639,7 +639,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUri) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/size?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -655,7 +655,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUriWithSubstream) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/size?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -671,7 +671,7 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeErrorOnWrongResponce) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/size?token=" + expected_token, _, _)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::Unauthorized), @@ -687,7 +687,7 @@ TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/size?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -705,7 +705,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string( expected_dataset_id) + "?token=" + expected_token, _, @@ -724,7 +724,7 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), @@ -740,7 +740,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), @@ -756,7 +756,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), @@ -772,7 +772,7 @@ TEST_F(ServerDataBrokerTests, GetMetaDataOK) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/meta/0?token=" + expected_token, _, _)).WillOnce(DoAll( @@ -878,7 +878,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) { auto responce_string = "[" + json1 + "," + json2 + "]"; EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0" + "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), @@ -899,7 +899,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/0" + "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), @@ -917,7 +917,7 @@ TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) { TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/next?token=" + expected_token + "&dataset=true&minsize=0", _, _)).WillOnce(DoAll( @@ -1046,7 +1046,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token=" + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token=" + expected_token + "&dataset=true&minsize=2", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1059,7 +1059,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/0/last?token=" + expected_token + "&dataset=true&minsize=1", _, _)).WillOnce(DoAll( @@ -1073,7 +1073,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token + "&dataset=true" + "&minsize=0", _, _)).WillOnce(DoAll( @@ -1089,7 +1089,7 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { std::string return_substreams = R"({"substreams":[{"lastId":123,"name":"test","timestampCreated":1000000},{"name":"test1","timestampCreated":2000000}]})"; EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams" + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/substreams" + "?token=" + expected_token + "&from=stream_from", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1108,7 +1108,7 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUriWithoutFrom) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams" + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/0/substreams" + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1250,7 +1250,7 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetTokenAgainIfTransferFailed) { TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) { MockGetBrokerUri(); auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" @@ -1268,7 +1268,7 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) { MockGetBrokerUri(); auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll( @@ -1283,7 +1283,7 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) { void ServerDataBrokerTests::ExpectIdList(bool error) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1301,7 +1301,7 @@ TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) { } void ServerDataBrokerTests::ExpectLastAckId(bool empty_response) { - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1339,7 +1339,7 @@ TEST_F(ServerDataBrokerTests, GetByIdErrorsForId0) { TEST_F(ServerDataBrokerTests, ResendNacks) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id + "/next?token=" + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _, _)).WillOnce(DoAll( @@ -1354,7 +1354,7 @@ TEST_F(ServerDataBrokerTests, ResendNacks) { TEST_F(ServerDataBrokerTests, NegativeAcknowledgeUsesCorrectUri) { MockGetBrokerUri(); auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_substream + "/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index a1182ee75b5608f5d796cfcaf762a930446b7215..9cb5824c0e5d511feeb01c1af82a06ba5ba11e92 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -43,7 +43,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": FileInfos content struct SourceCredentials: string beamtime_id - string stream + string data_source string user_token cppclass StreamInfo: string Json(bool add_last_id) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index fc5ea16b1cad54f0611ef4bb4d251c5148bac97e..69e32ee50b6477d87b2ce2f1f83bb25fea0369f4 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -324,14 +324,14 @@ cdef class __PyDataBrokerFactory: def __cinit__(self): with nogil: self.c_factory = DataBrokerFactory() - def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout): + def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout): cdef string b_server_name = _bytes(server_name) cdef string b_source_path = _bytes(source_path) cdef bool b_has_filesystem = has_filesystem cdef SourceCredentials source source.beamtime_id = _bytes(beamtime_id) source.user_token = _bytes(token) - source.stream = _bytes(stream) + source.data_source = _bytes(data_source) cdef Error err broker = PyDataBroker() with nogil: @@ -341,7 +341,7 @@ cdef class __PyDataBrokerFactory: broker.c_broker.get().SetTimeout(timeout) return broker -def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms): +def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms): """ :param server_name: Server endpoint (hostname:port) :type server_name: string @@ -353,7 +353,7 @@ def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stre :rtype: Tuple with broker object and error. """ factory = __PyDataBrokerFactory() - return factory.create_server_broker(server_name,source_path,has_filesystem, beamtime_id,stream,token,timeout_ms) + return factory.create_server_broker(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 9399cd327647cc6976d9ae9f485ec69fc5f55fcb..2fa855984f37a6ef627f3f64a0005642f2331dd1 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -186,8 +186,8 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { return ProducerErrorTemplates::kWrongInput.Generate("credentials already set"); } - if (source_cred.stream.empty()) { - source_cred.stream = SourceCredentials::kDefaultStream; + if (source_cred.data_source.empty()) { + source_cred.data_source = SourceCredentials::kDefaultStream; } if (source_cred.beamline.empty()) { diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index f1b400c10cf70eaf222f9ddb420f48bc4c3ab433..c267b701d20be5772ac95a7d593a3f25cd1171ee 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -55,7 +55,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": struct SourceCredentials: string beamtime_id string beamline - string stream + string data_source string user_token SourceType type diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 1040a6fb22a2389428d6d740973388f382f3c916..9e49f3187c41ead4499d3f8dfa76fabe147cc939 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -320,7 +320,7 @@ cdef class PyProducer: if self.c_producer.get() is not NULL: self.c_producer.get().StopThreads__() @staticmethod - def __create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,timeout_sec): + def __create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): pyProd = PyProducer() cdef Error err cdef SourceType source_type @@ -331,14 +331,14 @@ cdef class PyProducer: source.beamtime_id = beamtime_id source.beamline = beamline source.user_token = token - source.stream = stream + source.data_source = data_source source.type = source_type pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,timeout_sec,&err) if err: throw_exception(err) return pyProd -def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,timeout_sec): +def create_producer(endpoint,type,beamtime_id,beamline,data_source,token,nthreads,timeout_sec): """ :param endpoint: server endpoint (url:port) :type endpoint: string @@ -348,8 +348,8 @@ def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,tim :type beamtime_id: string :param beamline: beamline name, can be "auto" if beamtime_id is given :type beamline: string - :param stream: stream to producer data to - :type stream: string + :param data_source: name of the data source that produces data + :type data_source: string :param token: authorization token :type token: string :param nthreads: ingest mode flag @@ -360,7 +360,7 @@ def create_producer(endpoint,type,beamtime_id,beamline,stream,token,nthreads,tim AsapoWrongInputError: wrong input (number of threads, ,,,) AsapoProducerError: actually should not happen """ - return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(stream),_bytes(token),nthreads,timeout_sec) + return PyProducer.__create_producer(_bytes(endpoint),_bytes(type),_bytes(beamtime_id),_bytes(beamline),_bytes(data_source),_bytes(token),nthreads,timeout_sec) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index c07c249fb79d4141c504988614860d0e64d0fc1b..808567d3d74a109f5784ab76608faf032b5f9c55 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -129,11 +129,11 @@ const std::string& Request::GetMetaData() const { const CustomRequestData& Request::GetCustomData() const { return request_header_.custom_data; } -const std::string& Request::GetStream() const { - return stream_; +const std::string& Request::GetDataSource() const { + return data_source_; } -void Request::SetStream(std::string stream) { - stream_ = std::move(stream); +void Request::SetDataSource(std::string data_source) { + data_source_ = std::move(data_source); } void Request::UnlockDataBufferIfNeeded() { diff --git a/receiver/src/request.h b/receiver/src/request.h index 2bd6d5796a9f4eb74a1b83618751d7c511d07ebb..4811b01aae16b999fa8bee9c1b93f0b51287ba2a 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -57,8 +57,8 @@ class Request { VIRTUAL void SetSourceType(SourceType); VIRTUAL SourceType GetSourceType() const; - VIRTUAL const std::string& GetStream() const; - VIRTUAL void SetStream(std::string stream); + VIRTUAL const std::string& GetDataSource() const; + VIRTUAL void SetDataSource(std::string data_source); VIRTUAL void SetMetadata(std::string metadata); VIRTUAL void SetOnlinePath(std::string facility); @@ -88,7 +88,7 @@ class Request { RequestHandlerList handlers_; std::string origin_uri_; std::string beamtime_id_; - std::string stream_; + std::string data_source_; std::string beamline_; std::string offline_path_; std::string online_path_; diff --git a/receiver/src/request_handler/request_handler_authorize.cpp b/receiver/src/request_handler/request_handler_authorize.cpp index fa321e304b037acebed1c5fd351d920bc2738711..f13fc1ae28a41cedcd44ba348544e4c1e6c9d4fa 100644 --- a/receiver/src/request_handler/request_handler_authorize.cpp +++ b/receiver/src/request_handler/request_handler_authorize.cpp @@ -45,7 +45,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr JsonStringParser parser{response}; (err = parser.GetString("beamtimeId", &beamtime_id_)) || - (err = parser.GetString("stream", &stream_)) || + (err = parser.GetString("dataSource", &data_source_)) || (err = parser.GetString("core-path", &offline_path_)) || (err = parser.GetString("beamline-path", &online_path_)) || (err = parser.GetString("source-type", &stype)) || @@ -55,7 +55,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr return ErrorFromAuthorizationServerResponse(err, code); } else { log__->Debug(std::string("authorized connection from ") + request->GetOriginUri() +"source type: "+stype+ " beamline: " + - beamline_ + ", beamtime id: " + beamtime_id_ + ", stream: " + stream_); + beamline_ + ", beamtime id: " + beamtime_id_ + ", data soucre: " + data_source_); } last_updated_ = system_clock::now(); @@ -106,7 +106,7 @@ Error RequestHandlerAuthorize::ProcessOtherRequest(Request* request) const { } request->SetBeamtimeId(beamtime_id_); request->SetBeamline(beamline_); - request->SetStream(stream_); + request->SetDataSource(data_source_); request->SetOfflinePath(offline_path_); request->SetOnlinePath(online_path_); request->SetSourceType(source_type_); diff --git a/receiver/src/request_handler/request_handler_authorize.h b/receiver/src/request_handler/request_handler_authorize.h index 481927cf0a904c63c10cba11d439725108334786..1798ea8fb31de1e8d0e99459bcf447b2763085ea 100644 --- a/receiver/src/request_handler/request_handler_authorize.h +++ b/receiver/src/request_handler/request_handler_authorize.h @@ -21,7 +21,7 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { std::unique_ptr<HttpClient>http_client__; private: mutable std::string beamtime_id_; - mutable std::string stream_; + mutable std::string data_source_; mutable std::string beamline_; mutable std::string offline_path_; mutable std::string online_path_; diff --git a/receiver/src/request_handler/request_handler_db.cpp b/receiver/src/request_handler/request_handler_db.cpp index c26ea6e286145dba691d7b453032a2f4eb588e67..821f0d770a551e27e2a1c0737e4902ce46e5519e 100644 --- a/receiver/src/request_handler/request_handler_db.cpp +++ b/receiver/src/request_handler/request_handler_db.cpp @@ -8,8 +8,8 @@ namespace asapo { Error RequestHandlerDb::ProcessRequest(Request* request) const { if (db_name_.empty()) { db_name_ = request->GetBeamtimeId(); - auto stream = request->GetStream(); - db_name_ += "_" + stream; + auto data_source = request->GetDataSource(); + db_name_ += "_" + data_source; } diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index 7b72249a58fc9bfa4250b6f1e9f0d6b4ea6748c0..4845de5770ef6339dd6fb6d818f26d7ff2e8be47 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -71,7 +71,7 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(GetSlotId, uint64_t()); MOCK_CONST_METHOD0(GetData, void* ()); MOCK_CONST_METHOD0(GetBeamtimeId, const std::string & ()); - MOCK_CONST_METHOD0(GetStream, const std::string & ()); + MOCK_CONST_METHOD0(GetDataSource, const std::string & ()); MOCK_CONST_METHOD0(GetMetaData, const std::string & ()); MOCK_CONST_METHOD0(GetBeamline, const std::string & ()); MOCK_CONST_METHOD0(GetOpCode, asapo::Opcode ()); @@ -87,7 +87,7 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(GetCustomData_t, const uint64_t* ()); MOCK_CONST_METHOD0(GetMessage, const char* ()); MOCK_METHOD1(SetBeamtimeId, void (std::string)); - MOCK_METHOD1(SetStream, void (std::string)); + MOCK_METHOD1(SetDataSource, void (std::string)); MOCK_METHOD1(SetBeamline, void (std::string)); MOCK_METHOD1(SetOnlinePath, void (std::string)); MOCK_METHOD1(SetOfflinePath, void (std::string)); diff --git a/receiver/unittests/request_handler/test_request_handler_authorizer.cpp b/receiver/unittests/request_handler/test_request_handler_authorizer.cpp index 304d0af27007b777e578ccb3a95a50dd2b383c9e..c1e5a97410fb279f029ee9a3447c67ba4abce3a2 100644 --- a/receiver/unittests/request_handler/test_request_handler_authorizer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_authorizer.cpp @@ -64,7 +64,7 @@ class AuthorizerHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_stream = "stream"; + std::string expected_data_source = "source"; std::string expected_beamline = "beamline"; std::string expected_beamline_path = "/beamline/p01/current"; std::string expected_core_path = "/gpfs/blabla"; @@ -77,7 +77,7 @@ class AuthorizerHandlerTests : public Test { void MockRequestData(); void SetUp() override { GenericRequestHeader request_header; - expected_source_credentials = "processed%"+expected_beamtime_id + "%stream%token"; + expected_source_credentials = "processed%"+expected_beamtime_id + "%source%token"; expect_request_string = std::string("{\"SourceCredentials\":\"") + expected_source_credentials + "\",\"OriginHost\":\"" + expected_producer_uri + "\"}"; @@ -111,7 +111,7 @@ class AuthorizerHandlerTests : public Test { DoAll(SetArgPointee<4>(nullptr), SetArgPointee<3>(code), Return("{\"beamtimeId\":\"" + expected_beamtime_id + - "\",\"stream\":" + "\"" + expected_stream + + "\",\"dataSource\":" + "\"" + expected_data_source + "\",\"beamline-path\":" + "\"" + expected_beamline_path + "\",\"core-path\":" + "\"" + expected_core_path + "\",\"source-type\":" + "\"" + expected_source_type_str + @@ -123,7 +123,7 @@ class AuthorizerHandlerTests : public Test { HasSubstr(std::to_string(int(code))), HasSubstr(expected_source_type_str), HasSubstr(expected_beamtime_id), - HasSubstr(expected_stream), + HasSubstr(expected_data_source), HasSubstr(expected_producer_uri), HasSubstr(expected_authorization_server)))); } else { @@ -131,7 +131,7 @@ class AuthorizerHandlerTests : public Test { HasSubstr(expected_beamtime_id), HasSubstr(expected_beamline), HasSubstr(expected_source_type_str), - HasSubstr(expected_stream), + HasSubstr(expected_data_source), HasSubstr(expected_producer_uri)))); } } @@ -156,7 +156,7 @@ class AuthorizerHandlerTests : public Test { if (!error && code == HttpCode::OK && set_request) { EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id)); - EXPECT_CALL(*mock_request, SetStream(expected_stream)); + EXPECT_CALL(*mock_request, SetDataSource(expected_data_source)); EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path)); EXPECT_CALL(*mock_request, SetOnlinePath(expected_beamline_path)); EXPECT_CALL(*mock_request, SetBeamline(expected_beamline)); @@ -265,7 +265,7 @@ TEST_F(AuthorizerHandlerTests, DataTransferRequestAuthorizeUsesCachedValue) { EXPECT_CALL(mock_http_client, Post_t(_, _, _, _, _)).Times(0); EXPECT_CALL(*mock_request, SetBeamtimeId(expected_beamtime_id)); EXPECT_CALL(*mock_request, SetBeamline(expected_beamline)); - EXPECT_CALL(*mock_request, SetStream(expected_stream)); + EXPECT_CALL(*mock_request, SetDataSource(expected_data_source)); EXPECT_CALL(*mock_request, SetOnlinePath(expected_beamline_path)); EXPECT_CALL(*mock_request, SetOfflinePath(expected_core_path)); EXPECT_CALL(*mock_request, SetSourceType(expected_source_type)); diff --git a/receiver/unittests/request_handler/test_request_handler_db.cpp b/receiver/unittests/request_handler/test_request_handler_db.cpp index a84aa60b1e0364b0fb017b933030d1577d49dd17..0cda482d766393ddc76ef4532943cd2781991d57 100644 --- a/receiver/unittests/request_handler/test_request_handler_db.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db.cpp @@ -66,8 +66,8 @@ class DbHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_stream = "stream"; - std::string expected_default_stream = "detector"; + std::string expected_stream = "source"; + std::string expected_default_source = "detector"; std::string expected_discovery_server = "discovery"; std::string expected_database_server = "127.0.0.1:27017"; @@ -81,7 +81,7 @@ class DbHandlerTests : public Test { handler.http_client__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); - ON_CALL(*mock_request, GetStream()).WillByDefault(ReturnRef(expected_stream)); + ON_CALL(*mock_request, GetDataSource()).WillByDefault(ReturnRef(expected_stream)); } void TearDown() override { @@ -151,7 +151,7 @@ TEST_F(DbHandlerTests, ProcessRequestDiscoversMongoDbAddress) { .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()) + EXPECT_CALL(*mock_request, GetDataSource()) .WillOnce(ReturnRef(expected_stream)) ; @@ -188,7 +188,7 @@ TEST_F(DbHandlerTests, ProcessRequestCallsConnectDbWhenNotConnected) { .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()) + EXPECT_CALL(*mock_request, GetDataSource()) .WillOnce(ReturnRef(expected_stream)) ; diff --git a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp index 5f02a4d1dfb6ef1f181331d7fe6e3bc787d3f99e..780e78bd6caa7a2c492be55e64aab4314da7d5ea 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp @@ -73,8 +73,8 @@ class DbCheckRequestHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_default_stream = "detector"; - std::string expected_stream = "stream"; + std::string expected_default_source = "detector"; + std::string expected_data_source = "source"; std::string expected_host_uri = "127.0.0.1:1234"; uint64_t expected_port = 1234; uint64_t expected_buf_id = 18446744073709551615ull; @@ -148,7 +148,7 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()) + EXPECT_CALL(*mock_request, GetDataSource()) .WillOnce(ReturnRef(stream)) ; } @@ -198,7 +198,7 @@ FileInfo DbCheckRequestHandlerTests::PrepareFileInfo() { } void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool expect_compare ) { - ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream, expect_compare); + ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source, expect_compare); EXPECT_CALL(mock_db, GetById_t(expected_collection_name, expected_id, _)). WillOnce(DoAll( SetArgPointee<2>(expected_file_info), @@ -207,7 +207,7 @@ void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool } void DbCheckRequestHandlerTests::MockGetSetByID(asapo::ErrorInterface* error, bool expect_compare ) { - ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_stream, expect_compare); + ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source, expect_compare); EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_subset_id, expected_id, _)). WillOnce(DoAll( SetArgPointee<3>(expected_file_info), diff --git a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp index 120dfb7b2b69a08b59d1a3fa476d8d617d632479..35d6ae14b89b988f5d096aad273fb4da6634d2de 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp @@ -62,7 +62,7 @@ class DbMetaLastStreamTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_stream = "stream"; + std::string expected_data_source = "stream"; std::string info_str = R"({"lastId":10,"name":"substream","timestampCreated":1000000,"timestampLast":2000000})"; asapo::StreamInfo expected_stream_info; void SetUp() override { @@ -89,9 +89,9 @@ TEST_F(DbMetaLastStreamTests, CallsUpdate) { .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()).WillOnce(ReturnRef(expected_stream)); + EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source)); - EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)). WillOnce(testing::Return(nullptr)); diff --git a/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp index d80d877bb2a943edd4f99a69a8c4c70b7ebd87af..7cec10a2cef7c4edb6a13e6840cd67fab9675a1b 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_meta_writer.cpp @@ -62,7 +62,7 @@ class DbMetaWriterHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_stream = "stream"; + std::string expected_data_source = "source"; std::string meta_str = R"("info":"stat","int_data":0,"float_data":0.1,"bool":false)"; const uint8_t* expected_meta = reinterpret_cast<const uint8_t*>(meta_str.c_str()); @@ -91,12 +91,12 @@ TEST_F(DbMetaWriterHandlerTests, CallsUpdate) { .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()) - .WillOnce(ReturnRef(expected_stream)) + EXPECT_CALL(*mock_request, GetDataSource()) + .WillOnce(ReturnRef(expected_data_source)) ; - EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)). WillOnce(testing::Return(nullptr)); diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp index 5bcd03d12892f15eecff516ddb31035247db166c..bd0aee05660fba242ccc2e44e27aa87c028637ba 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp @@ -63,7 +63,7 @@ class DbMetaStreamInfoTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_stream = "stream"; + std::string expected_data_source = "source"; std::string info_str = R"({"lastId":10,"name":"substream","timestampCreated":1000000,"timestampLast":2000000})"; asapo::StreamInfo expected_stream_info; void SetUp() override { @@ -90,13 +90,13 @@ TEST_F(DbMetaStreamInfoTests, CallsUpdate) { .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()).WillOnce(ReturnRef(expected_stream)); + EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source)); EXPECT_CALL(*mock_request, GetSubstream()).Times(2) .WillRepeatedly(Return(expected_substream)) ; - EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)). WillOnce(testing::Return(nullptr)); diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index d291c10754d9e6188930109e1a055ad18092eef6..8ac7d78863637b20581a7ab2231fd7178aff8b2d 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -74,8 +74,8 @@ class DbWriterHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; - std::string expected_default_stream = "detector"; - std::string expected_stream = "stream"; + std::string expected_default_source = "detector"; + std::string expected_data_source = "source"; std::string expected_host_ip = "127.0.0.1"; uint64_t expected_port = 1234; uint64_t expected_buf_id = 18446744073709551615ull; @@ -129,7 +129,7 @@ MATCHER_P(CompareFileInfo, file, "") { } -void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std::string& stream) { +void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std::string& data_source) { EXPECT_CALL(*mock_request, WasAlreadyProcessed()) .WillOnce(Return(false)) @@ -139,8 +139,8 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std: .WillOnce(ReturnRef(expected_beamtime_id)) ; - EXPECT_CALL(*mock_request, GetStream()) - .WillOnce(ReturnRef(stream)) + EXPECT_CALL(*mock_request, GetDataSource()) + .WillOnce(ReturnRef(data_source)) ; @@ -149,7 +149,7 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std: ; std::string db_name = expected_beamtime_id; - db_name += "_" + stream; + db_name += "_" + data_source; EXPECT_CALL(mock_db, Connect_t(config.database_uri, db_name)). WillOnce(testing::Return(nullptr)); @@ -203,7 +203,7 @@ void DbWriterHandlerTests::ExpectLogger() { EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"), HasSubstr(config.database_uri), HasSubstr(expected_beamtime_id), - HasSubstr(expected_stream), + HasSubstr(expected_data_source), HasSubstr(expected_collection_name) ) ) @@ -213,7 +213,7 @@ void DbWriterHandlerTests::ExpectLogger() { TEST_F(DbWriterHandlerTests, CallsInsert) { - ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream); + ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source); auto file_info = PrepareFileInfo(); EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareFileInfo(file_info), false)). @@ -225,13 +225,13 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { TEST_F(DbWriterHandlerTests, CallsInsertSubset) { - ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_default_stream); + ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source); auto file_info = PrepareFileInfo(); EXPECT_CALL(mock_db, InsertAsSubset_t(expected_collection_name, CompareFileInfo(file_info), expected_subset_id, expected_subset_size, false)). - WillOnce(testing::Return(nullptr)); + WillOnce(testing::Return( nullptr)); ExpectLogger(); handler.ProcessRequest(mock_request.get()); @@ -239,7 +239,7 @@ TEST_F(DbWriterHandlerTests, CallsInsertSubset) { void DbWriterHandlerTests::ExpectDuplicatedID() { - ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream); + ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_data_source); auto file_info = PrepareFileInfo(); EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareFileInfo(file_info), false)). diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 59ce937bdf33c689cbfcb35e0ecef8445fd135af..402f00defbbb24ca2f4a685e2efd1960ec9b42bb 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -224,9 +224,9 @@ TEST_F(RequestTests, SetGetBeamtimeId) { TEST_F(RequestTests, SetGetStream) { - request->SetStream("stream"); + request->SetDataSource("stream"); - ASSERT_THAT(request->GetStream(), "stream"); + ASSERT_THAT(request->GetDataSource(), "stream"); }