diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 99e4f0a58fa1964e7eeb44373b1277653e798a25..f5beccfa79491a8daf8ec539b28ba813f0b4a2c7 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -20,7 +20,7 @@ function(prepare_asapo) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin.in receiver.json.tpl @ONLY) endif() - configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 9f3dd53983a2efd148777a5060f7451a5f0dfbe7..efbfec7760794874aa191e7f97e9a37d0c477da3 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -1,7 +1,7 @@ package database type Agent interface { - GetRecordFromDb(db_name string, op string, id int) ([]byte, error) + GetRecordFromDb(db_name string, group_id string, op string, id int) ([]byte, error) Connect(string) error Close() Copy() Agent diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index 81ec5978cc7fc2fb095508f340834d170e888689..2f6b668283aefc2858c1652dc60e216ce8ce4bfa 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -24,7 +24,7 @@ func (db *MockedDatabase) Copy() Agent { return db } -func (db *MockedDatabase) GetRecordFromDb(db_name string, op string, id int) (answer []byte, err error) { - args := db.Called(db_name, op, id) +func (db *MockedDatabase) GetRecordFromDb(db_name string, group_id string, op string, id int) (answer []byte, err error) { + args := db.Called(db_name, group_id, op, id) return args.Get(0).([]byte), args.Error(1) } diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 9e290cf9f2f83292a8dbda095444bb2e40cb7613..3582a0ca948ee018cab4c3b1f7ab18f9b36f7d17 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -127,32 +127,32 @@ func (db *Mongodb) getMaxIndex(dbname string) (max_id int, err error) { return id.ID, nil } -func (db *Mongodb) createLocationPointers(dbname string) (err error) { +func (db *Mongodb) createLocationPointers(dbname string, group_id string) (err error) { change := mgo.Change{ Update: bson.M{"$inc": bson.M{pointer_field_name: 0}}, Upsert: true, } - q := bson.M{"_id": 0} + q := bson.M{"_id": group_id} c := db.session.DB(dbname).C(pointer_collection_name) var res map[string]interface{} _, err = c.Find(q).Apply(change, &res) return err } -func (db *Mongodb) setCounter(dbname string, ind int) (err error) { +func (db *Mongodb) setCounter(dbname string, group_id string, ind int) (err error) { update := bson.M{"$set": bson.M{pointer_field_name: ind}} c := db.session.DB(dbname).C(pointer_collection_name) - return c.UpdateId(0, update) + return c.UpdateId(group_id, update) } -func (db *Mongodb) incrementField(dbname string, max_ind int, res interface{}) (err error) { +func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, res interface{}) (err error) { update := bson.M{"$inc": bson.M{pointer_field_name: 1}} change := mgo.Change{ Update: update, Upsert: false, ReturnNew: true, } - q := bson.M{"_id": 0, pointer_field_name: bson.M{"$lt": max_ind}} + q := bson.M{"_id": group_id, pointer_field_name: bson.M{"$lt": max_ind}} c := db.session.DB(dbname).C(pointer_collection_name) _, err = c.Find(q).Apply(change, res) if err == mgo.ErrNotFound { @@ -186,26 +186,26 @@ func (db *Mongodb) GetRecordByID(dbname string, id int, returnID bool) ([]byte, return utils.MapToJson(&res) } -func (db *Mongodb) needCreateLocationPointersInDb(db_name string) bool { +func (db *Mongodb) needCreateLocationPointersInDb(group_id string) bool { dbPointersLock.RLock() - needCreate := !db.db_pointers_created[db_name] + needCreate := !db.db_pointers_created[group_id] dbPointersLock.RUnlock() return needCreate } -func (db *Mongodb) SetLocationPointersCreateFlag(db_name string) { +func (db *Mongodb) SetLocationPointersCreateFlag(group_id string) { dbPointersLock.Lock() if db.db_pointers_created == nil { db.db_pointers_created = make(map[string]bool) } - db.db_pointers_created[db_name] = true + db.db_pointers_created[group_id] = true dbPointersLock.Unlock() } -func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string) { - if db.needCreateLocationPointersInDb(db_name) { - db.createLocationPointers(db_name) - db.SetLocationPointersCreateFlag(db_name) +func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string, group_id string) { + if db.needCreateLocationPointersInDb(group_id) { + db.createLocationPointers(db_name, group_id) + db.SetLocationPointersCreateFlag(group_id) } } @@ -217,7 +217,7 @@ func (db *Mongodb) getParentDB() *Mongodb { } } -func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string) error { +func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id string) error { if db.session == nil { return &DBError{utils.StatusError, no_session_msg} } @@ -226,18 +226,18 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string) error { return &DBError{utils.StatusWrongInput, err.Error()} } - db.getParentDB().generateLocationPointersInDbIfNeeded(db_name) + db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, group_id) return nil } -func (db *Mongodb) getCurrentPointer(db_name string) (Pointer, error) { +func (db *Mongodb) getCurrentPointer(db_name string, group_id string) (Pointer, error) { max_ind, err := db.getMaxIndex(db_name) if err != nil { return Pointer{}, err } var curPointer Pointer - err = db.incrementField(db_name, max_ind, &curPointer) + err = db.incrementField(db_name, group_id, max_ind, &curPointer) if err != nil { return Pointer{}, err } @@ -245,51 +245,51 @@ func (db *Mongodb) getCurrentPointer(db_name string) (Pointer, error) { return curPointer, nil } -func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) { +func (db *Mongodb) GetNextRecord(db_name string, group_id string) ([]byte, error) { - if err := db.checkDatabaseOperationPrerequisites(db_name); err != nil { + if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err } - curPointer, err := db.getCurrentPointer(db_name) + curPointer, err := db.getCurrentPointer(db_name, group_id) if err != nil { - log_str := "error getting next pointer for " + db_name + ":" + err.Error() + log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) return nil, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id logger.Debug(log_str) return db.GetRecordByID(db_name, curPointer.Value, true) } -func (db *Mongodb) GetLastRecord(db_name string) ([]byte, error) { +func (db *Mongodb) GetLastRecord(db_name string, group_id string) ([]byte, error) { - if err := db.checkDatabaseOperationPrerequisites(db_name); err != nil { + if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err } max_ind, err := db.getMaxIndex(db_name) if err != nil { - log_str := "error getting last pointer for " + db_name + ":" + err.Error() + log_str := "error getting last pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) return nil, err } res, err := db.GetRecordByID(db_name, max_ind, false) - db.setCounter(db_name, max_ind) + db.setCounter(db_name, group_id, max_ind) return res, err } -func (db *Mongodb) GetRecordFromDb(db_name string, op string, id int) (answer []byte, err error) { +func (db *Mongodb) GetRecordFromDb(db_name string, group_id string, op string, id int) (answer []byte, err error) { switch op { case "next": - return db.GetNextRecord(db_name) + return db.GetNextRecord(db_name, group_id) case "id": return db.GetRecordByID(db_name, id, true) case "last": - return db.GetLastRecord(db_name) + return db.GetLastRecord(db_name, group_id) } return nil, errors.New("Wrong db operation: " + op) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index b5b5c18429815bc56c450990ef2db0cd798f4296..038dea77ee6e18392aa6287e2722efa679a3b666 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -19,6 +19,7 @@ var db Mongodb const dbname = "run1" const dbaddress = "127.0.0.1:27017" +const groupId = "bid2a5auidddp1vl71d0" var rec1 = TestRecord{1, "aaa"} var rec2 = TestRecord{2, "bbb"} @@ -49,14 +50,14 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.GetNextRecord("") + _, err := db.GetNextRecord("", groupId) assert.Equal(t, utils.StatusError, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.GetNextRecord("") + _, err := db.GetNextRecord("", groupId) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } @@ -64,7 +65,7 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) { db.Connect(dbaddress) db.databases = append(db.databases, dbname) defer cleanup() - _, err := db.GetNextRecord(dbname) + _, err := db.GetNextRecord(dbname, groupId) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -72,7 +73,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec2) - _, err := db.GetNextRecord(dbname) + _, err := db.GetNextRecord(dbname, groupId) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"id\":1}", err.Error()) } @@ -81,7 +82,7 @@ func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetNextRecord(dbname) + res, err := db.GetNextRecord(dbname, groupId) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -90,8 +91,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - db.GetNextRecord(dbname) - _, err := db.GetNextRecord(dbname) + db.GetNextRecord(dbname, groupId) + _, err := db.GetNextRecord(dbname, groupId) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -100,8 +101,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) { defer cleanup() db.InsertRecord(dbname, &rec2) db.InsertRecord(dbname, &rec1) - res1, _ := db.GetNextRecord(dbname) - res2, _ := db.GetNextRecord(dbname) + res1, _ := db.GetNextRecord(dbname, groupId) + res2, _ := db.GetNextRecord(dbname, groupId) assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -133,7 +134,7 @@ func getRecords(n int) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, _ := db.GetNextRecord(dbname) + res_bin, _ := db.GetNextRecord(dbname, groupId) var res TestRecord json.Unmarshal(res_bin, &res) results[res.ID] = 1 @@ -177,7 +178,7 @@ func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetRecordFromDb(dbname, "next", 0) + res, err := db.GetRecordFromDb(dbname, groupId, "next", 0) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -186,7 +187,7 @@ func TestMongoDBGetRecordID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetRecordFromDb(dbname, "id", 1) + res, err := db.GetRecordFromDb(dbname, groupId, "id", 1) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -195,7 +196,7 @@ func TestMongoDBWrongOp(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - _, err := db.GetRecordFromDb(dbname, "bla", 0) + _, err := db.GetRecordFromDb(dbname, groupId, "bla", 0) assert.NotNil(t, err) } @@ -205,7 +206,7 @@ func TestMongoDBGetRecordLast(t *testing.T) { db.InsertRecord(dbname, &rec1) db.InsertRecord(dbname, &rec2) - res, err := db.GetRecordFromDb(dbname, "last", 0) + res, err := db.GetRecordFromDb(dbname, groupId, "last", 0) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -216,13 +217,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.InsertRecord(dbname, &rec1) db.InsertRecord(dbname, &rec2) - res, err := db.GetRecordFromDb(dbname, "last", 0) + res, err := db.GetRecordFromDb(dbname, groupId, "last", 0) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) db.InsertRecord(dbname, &rec3) - res, err = db.GetRecordFromDb(dbname, "next", 0) + res, err = db.GetRecordFromDb(dbname, groupId, "next", 0) assert.Nil(t, err) assert.Equal(t, string(rec3_expect), string(res)) diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go index adffdf262ff5fc9ea157990d616e9f624ef50a4b..80a30ddbe2a5ba632e9da206937e186b1ebace60 100644 --- a/broker/src/asapo_broker/server/get_id.go +++ b/broker/src/asapo_broker/server/get_id.go @@ -22,5 +22,5 @@ func routeGetByID(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - getImage(w, r, "id", id) + getImage(w, r, "id", id, false) } diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go index dff12942c66fd9f5874e25c67669769624595612..dc09a5a7ee99d7d5bfd9d4e8aa5fbf6985f39122 100644 --- a/broker/src/asapo_broker/server/get_id_test.go +++ b/broker/src/asapo_broker/server/get_id_test.go @@ -45,7 +45,7 @@ func TestGetIDTestSuite(t *testing.T) { } func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "id", 1).Return([]byte("Hello"), nil) + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "", "id", 1).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get id request"))) ExpectCopyClose(suite.mock_db) diff --git a/broker/src/asapo_broker/server/get_image.go b/broker/src/asapo_broker/server/get_image.go index 79f1ceaae687fe0d3deb376432d78e6e30eba8f6..66a8bd8f7c07d8dd442f30e38d9acebff6a1ec66 100644 --- a/broker/src/asapo_broker/server/get_image.go +++ b/broker/src/asapo_broker/server/get_image.go @@ -4,19 +4,43 @@ import ( "asapo_broker/database" "asapo_common/logger" "asapo_common/utils" + "fmt" "github.com/gorilla/mux" + "github.com/rs/xid" "net/http" ) -func extractRequestParameters(r *http.Request) (string, bool) { +func extractRequestParameters(r *http.Request, needGroupID bool) (string, string, bool) { vars := mux.Vars(r) - db_name, ok := vars["dbname"] - return db_name, ok + db_name, ok1 := vars["dbname"] + ok2 := true + group_id := "" + if needGroupID { + group_id, ok2 = vars["groupid"] + } + return db_name, group_id, ok1 && ok2 } -func getImage(w http.ResponseWriter, r *http.Request, op string, id int) { +func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_name string, op string) bool { + if !needGroupID { + return true + } + if _, err := xid.FromString(group_id); err != nil { + err_str := "wrong groupid " + group_id + log_str := "processing get " + op + " request in " + db_name + " at " + settings.BrokerDbAddress + ": " + err_str + logger.Error(log_str) + fmt.Println(log_str) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err_str)) + return false + } + return true +} + +func getImage(w http.ResponseWriter, r *http.Request, op string, id int, needGroupID bool) { r.Header.Set("Content-type", "application/json") - db_name, ok := extractRequestParameters(r) + w.Header().Set("Access-Control-Allow-Origin", "*") + db_name, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { w.WriteHeader(http.StatusBadRequest) return @@ -27,7 +51,11 @@ func getImage(w http.ResponseWriter, r *http.Request, op string, id int) { return } - answer, code := getRecord(db_name, op, id) + if !checkGroupID(w, needGroupID, group_id, db_name, op) { + return + } + + answer, code := getRecord(db_name, group_id, op, id) w.WriteHeader(code) w.Write(answer) } @@ -46,11 +74,11 @@ func returnError(err error, log_str string) (answer []byte, code int) { return []byte(err.Error()), code } -func getRecord(db_name string, op string, id int) (answer []byte, code int) { +func getRecord(db_name string, group_id string, op string, id int) (answer []byte, code int) { db_new := db.Copy() defer db_new.Close() statistics.IncreaseCounter() - answer, err := db_new.GetRecordFromDb(db_name, op, id) + answer, err := db_new.GetRecordFromDb(db_name, group_id, op, id) log_str := "processing get " + op + " request in " + db_name + " at " + settings.BrokerDbAddress if err != nil { return returnError(err, log_str) diff --git a/broker/src/asapo_broker/server/get_image_test.go b/broker/src/asapo_broker/server/get_image_test.go index cd885e7e6cb402cd8df6c4e3df08c25713310821..6d583c4cf60d2c9cbbad374930ea05c4218ad433 100644 --- a/broker/src/asapo_broker/server/get_image_test.go +++ b/broker/src/asapo_broker/server/get_image_test.go @@ -16,6 +16,9 @@ import ( var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId string +const expectedGroupID = "bid2a5auidddp1vl71d0" +const wrongGroupID = "bid2a5auidddp1vl71" + func prepareTestAuth() { expectedBeamtimeId = "beamtime_id" auth = utils.NewHMACAuth("secret") @@ -46,9 +49,14 @@ func containsMatcher(substrings ...string) func(str string) bool { } } -func doRequest(path string) *httptest.ResponseRecorder { +func doRequest(path string, method ...string) *httptest.ResponseRecorder { + m := "GET" + if len(method) > 0 { + m = method[0] + } + mux := utils.NewRouter(listRoutes) - req, _ := http.NewRequest("GET", path, nil) + req, _ := http.NewRequest(m, path, nil) w := httptest.NewRecorder() mux.ServeHTTP(w, req) return w @@ -90,7 +98,7 @@ func TestGetImageTestSuite(t *testing.T) { func (suite *GetImageTestSuite) TestGetImageWithWrongToken() { logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token"))) - w := doRequest("/database/" + expectedBeamtimeId + "/next" + suffixWithWrongToken) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + suffixWithWrongToken) suite.Equal(http.StatusUnauthorized, w.Code, "wrong token") } @@ -98,37 +106,43 @@ func (suite *GetImageTestSuite) TestGetImageWithWrongToken() { func (suite *GetImageTestSuite) TestGetImageWithNoToken() { logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("cannot extract"))) - w := doRequest("/database/" + expectedBeamtimeId + "/next" + wrongTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + wrongTokenSuffix) suite.Equal(http.StatusUnauthorized, w.Code, "no token") } func (suite *GetImageTestSuite) TestGetImageWithWrongDatabaseName() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "next", 0).Return([]byte(""), + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, expectedGroupID, "next", 0).Return([]byte(""), &database.DBError{utils.StatusWrongInput, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusBadRequest, w.Code, "wrong database name") } func (suite *GetImageTestSuite) TestGetImageWithInternalDBError() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "next", 0).Return([]byte(""), errors.New("")) + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, expectedGroupID, "next", 0).Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get next request"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusInternalServerError, w.Code, "internal error") } func (suite *GetImageTestSuite) TestGetImageAddsCounter() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "next", 0).Return([]byte("Hello"), nil) + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, expectedGroupID, "next", 0).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request in "+expectedBeamtimeId))) ExpectCopyClose(suite.mock_db) - doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix) + doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(1, statistics.GetCounter(), "GetImage increases counter") } + +func (suite *GetImageTestSuite) TestGetImageWrongGroupID() { + logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong groupid"))) + w := doRequest("/database/" + expectedBeamtimeId + "/" + wrongGroupID + "/next" + correctTokenSuffix) + suite.Equal(http.StatusBadRequest, w.Code, "wrong group id") +} diff --git a/broker/src/asapo_broker/server/get_last.go b/broker/src/asapo_broker/server/get_last.go index 19b270b58e5968088f6a9f9155448bcd795c33bc..1242ad6bb8179c9e1fb9215e504747586294f58a 100644 --- a/broker/src/asapo_broker/server/get_last.go +++ b/broker/src/asapo_broker/server/get_last.go @@ -5,5 +5,5 @@ import ( ) func routeGetLast(w http.ResponseWriter, r *http.Request) { - getImage(w, r, "last", 0) + getImage(w, r, "last", 0, true) } diff --git a/broker/src/asapo_broker/server/get_last_test.go b/broker/src/asapo_broker/server/get_last_test.go index 05d03460d73b503fb7cc1e2aef4979433452ab40..ea355ca7c7562dbed4d84bb518cf0f19af3fd67c 100644 --- a/broker/src/asapo_broker/server/get_last_test.go +++ b/broker/src/asapo_broker/server/get_last_test.go @@ -33,11 +33,11 @@ func TestGetLastTestSuite(t *testing.T) { } func (suite *GetLastTestSuite) TestGetLastCallsCorrectRoutine() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "last", 0).Return([]byte("Hello"), nil) + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, expectedGroupID, "last", 0).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get last request"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/" + expectedBeamtimeId + "/last" + correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/last" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetLast OK") suite.Equal("Hello", string(w.Body.Bytes()), "GetLast sends data") } diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 661555bfecaa93152c39fcf96578aa6675d91908..535c54954a2f60748f9587a67046cf2208c5faea 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -5,5 +5,5 @@ import ( ) func routeGetNext(w http.ResponseWriter, r *http.Request) { - getImage(w, r, "next", 0) + getImage(w, r, "next", 0, true) } diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go index 04c681c61666d3418dcee1efc7ef3f7924279c25..45cead5a16bf8d8b10d6e5e274f03d188ae6c699 100644 --- a/broker/src/asapo_broker/server/get_next_test.go +++ b/broker/src/asapo_broker/server/get_next_test.go @@ -33,11 +33,11 @@ func TestGetNextTestSuite(t *testing.T) { } func (suite *GetNextTestSuite) TestGetNextCallsCorrectRoutine() { - suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, "next", 0).Return([]byte("Hello"), nil) + suite.mock_db.On("GetRecordFromDb", expectedBeamtimeId, expectedGroupID, "next", 0).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get next request"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/" + expectedBeamtimeId + "/next" + correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetNext OK") suite.Equal("Hello", string(w.Body.Bytes()), "GetNext sends data") } diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 23c73d9daa12e6363ee14613cc354d293840be72..486905245bf5dd46be582120e55dd6b5ea624091 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -8,13 +8,13 @@ var listRoutes = utils.Routes{ utils.Route{ "GetNext", "Get", - "/database/{dbname}/next", + "/database/{dbname}/{groupid}/next", routeGetNext, }, utils.Route{ "GetLast", "Get", - "/database/{dbname}/last", + "/database/{dbname}/{groupid}/last", routeGetLast, }, utils.Route{ @@ -23,7 +23,12 @@ var listRoutes = utils.Routes{ "/database/{dbname}/{id}", routeGetByID, }, - + utils.Route{ + "CreateGroup", + "Post", + "/creategroup", + routeCreateGroupID, + }, utils.Route{ "Health", "Get", diff --git a/broker/src/asapo_broker/server/post_create_group.go b/broker/src/asapo_broker/server/post_create_group.go new file mode 100644 index 0000000000000000000000000000000000000000..b9cfb51f7d10ef99083ffa9a28ec937efff816da --- /dev/null +++ b/broker/src/asapo_broker/server/post_create_group.go @@ -0,0 +1,14 @@ +package server + +import ( + "asapo_common/logger" + "github.com/rs/xid" + "net/http" +) + +func routeCreateGroupID(w http.ResponseWriter, r *http.Request) { + guid := xid.New() + w.Write([]byte(guid.String())) + logger.Debug("generated new group: " + guid.String()) + statistics.IncreaseCounter() +} diff --git a/broker/src/asapo_broker/server/post_create_group_test.go b/broker/src/asapo_broker/server/post_create_group_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5f19da351eedf60bc8992a3b7827982d659be194 --- /dev/null +++ b/broker/src/asapo_broker/server/post_create_group_test.go @@ -0,0 +1,34 @@ +package server + +import ( + "asapo_common/logger" + "github.com/rs/xid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "net/http" + "testing" +) + +func GetObjectID(t *testing.T) (xid.ID, error) { + w := doRequest("/creategroup", "POST") + assert.Equal(t, http.StatusOK, w.Code, "New Group OK") + return xid.FromString(w.Body.String()) +} + +func TestGetNewGroup(t *testing.T) { + statistics.Reset() + logger.SetMockLog() + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("generated new group"))) + + id1, err := GetObjectID(t) + assert.Nil(t, err, "first is ObjectID") + + id2, err := GetObjectID(t) + assert.Nil(t, err, "second is ObjectID") + + assert.NotEqual(t, id1.String(), id2.String()) + assert.Equal(t, id1.Counter()+1, id2.Counter()) + assert.Equal(t, 2, statistics.GetCounter(), "creategroup increases counter") + + logger.UnsetMockLog() +} diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index 6111559f39b5dbe1ba4e17881c05bb2ffb5887f5..8e80571f6f2463c2ff0706474e0f10f159277932 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -38,8 +38,9 @@ void SetCurlOptions(CURL* curl, bool post, const std::string& data, const std::s if (post) { curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str()); + } else { + curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); } - } HttpCode GetResponseCode(CURL* curl) { diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index c9a497d61ef7bf279b03e5c0f3c0c63d9ab18dc9..0410877de75c546f0cf13c9fd69d973d2bf56ed0 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -6,12 +6,16 @@ #include <chrono> #include <iomanip> #include <numeric> +#include <mutex> #include "asapo_worker.h" using std::chrono::high_resolution_clock; using asapo::Error; +std::string group_id = ""; +std::mutex lock; + struct Params { std::string server; std::string file_path; @@ -46,8 +50,20 @@ std::vector<std::thread> StartThreads(const Params& params, broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + while (true) { - err = broker->GetNext(&fi, params.read_data ? &data : nullptr); + err = broker->GetNext(&fi, group_id, params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { diff --git a/examples/worker/getnext_broker_python/check_linux.sh b/examples/worker/getnext_broker_python/check_linux.sh index 8008fce0aa221ffcacc88558e3ed52e325a26f84..707c48bbc60f6bf4cf1443d357b35f041709587f 100644 --- a/examples/worker/getnext_broker_python/check_linux.sh +++ b/examples/worker/getnext_broker_python/check_linux.sh @@ -3,7 +3,7 @@ source_path=dummy database_name=test_run token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= - +group_id=bif31l2uiddd4r0q6b40 #set -e @@ -31,20 +31,25 @@ sleep 1 export PYTHONPATH=$1:${PYTHONPATH} -python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id > out cat out cat out | grep '"size": 100' cat out | grep '"_id": 1' -python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id> out cat out cat out | grep '"_id": 2' -python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run > out +python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run $group_id> out cat out cat out | grep '"_id": 3' +python3 getnext.py 127.0.0.1:8400 $source_path $database_name $token_test_run new> out +cat out +cat out | grep '"_id": 1' + + #echo $? diff --git a/examples/worker/getnext_broker_python/check_windows.bat b/examples/worker/getnext_broker_python/check_windows.bat index 4f327539e76cf03437162c28fd1f1222103c0494..7bc8389650b19bd367e203d5c4c9d7a98324a621 100644 --- a/examples/worker/getnext_broker_python/check_windows.bat +++ b/examples/worker/getnext_broker_python/check_windows.bat @@ -3,6 +3,7 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= +set group_id=bif31l2uiddd4r0q6b40 c:\opt\consul\nomad run discovery.nmd c:\opt\consul\nomad run broker.nmd @@ -14,20 +15,26 @@ for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x set PYTHONPATH=%1 -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"100" || goto :error type out | findstr /c:"\"_id\": 1" || goto :error -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"\"_id\": 2" || goto :error -python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% > out +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% %group_id% > out type out type out | findstr /c:"\"_id\": 3" || goto :error +python3 getnext.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% new > out +type out +type out | findstr /c:"100" || goto :error +type out | findstr /c:"\"_id\": 1" || goto :error + + goto :clean :error diff --git a/examples/worker/getnext_broker_python/getnext.py b/examples/worker/getnext_broker_python/getnext.py index b8d10262d1ece33fc11337e484819b4d0dab63f7..f124d484aa41995fa3b8e34b77d378a1a3dd409d 100644 --- a/examples/worker/getnext_broker_python/getnext.py +++ b/examples/worker/getnext_broker_python/getnext.py @@ -4,11 +4,21 @@ import asapo_worker import json import sys -source, path, beamtime, token = sys.argv[1:] +source, path, beamtime, token, group_id = sys.argv[1:] broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000) -_, meta, err = broker.get_next(meta_only=True) + +if group_id == "new": + group_id_new, err = broker.generate_group_id() + if err != None: + print ('cannot generate group id, err: ', err) + else: + print ('generated group id: ', group_id_new) +else: + group_id_new = group_id + +_, meta, err = broker.get_next(group_id_new, meta_only=True) if err != None: print ('err: ', err) else: diff --git a/examples/worker/process_folder/process_folder.cpp b/examples/worker/process_folder/process_folder.cpp index 63a6955dd066b4c616e3b054c5aea1551b6dae54..9c390e9458be7681c6b0fcd687e6b517d9ed0f99 100644 --- a/examples/worker/process_folder/process_folder.cpp +++ b/examples/worker/process_folder/process_folder.cpp @@ -57,7 +57,7 @@ void ReadAllData(std::unique_ptr<asapo::DataBroker>* broker, Statistics* statist int nfiles = 0; uint64_t size = 0; - while ((err = (*broker)->GetNext(&file_info, &file_data)) == nullptr) { + while ((err = (*broker)->GetNext(&file_info, "", &file_data)) == nullptr) { nfiles++; size += file_info.size; } diff --git a/tests/automatic/broker/check_monitoring/check_linux.sh b/tests/automatic/broker/check_monitoring/check_linux.sh index c992ca452b3b6229712eedb9046c0ed7a56429f3..2f71cb601c61e4a9d7aac62586fbe123ef0b91fe 100644 --- a/tests/automatic/broker/check_monitoring/check_linux.sh +++ b/tests/automatic/broker/check_monitoring/check_linux.sh @@ -24,13 +24,16 @@ sleep 0.3 brokerid=`echo $!` +groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup` + + for i in `seq 1 50`; do - curl --silent 127.0.0.1:5005/database/data/next?token=$token >/dev/null 2>&1 & + curl --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token >/dev/null 2>&1 & done sleep 3 -influx -execute "select sum(rate) from RequestsRate" -database=${database_name} -format=json | jq .results[0].series[0].values[0][1] | tee /dev/stderr | grep 50 +influx -execute "select sum(rate) from RequestsRate" -database=${database_name} -format=json | jq .results[0].series[0].values[0][1] | tee /dev/stderr | grep 51 diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh index 4f0781386f675ce4f5567b3645461f7c623a0064..fd98f97807a31bd55096a47ba17e97db5e08ecff 100644 --- a/tests/automatic/broker/get_last/check_linux.sh +++ b/tests/automatic/broker/get_last/check_linux.sh @@ -23,16 +23,23 @@ sleep 0.3 brokerid=`echo $!` -curl -v --silent 127.0.0.1:5005/database/data/last?token=$token --stderr - +groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup` -curl -v --silent 127.0.0.1:5005/database/data/last?token=$token --stderr - | grep '"_id":2' -curl -v --silent 127.0.0.1:5005/database/data/last?token=$token --stderr - | grep '"_id":2' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - + +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - | grep '"_id":2' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - | grep '"_id":2' echo "db.data.insert({"_id":3})" | mongo ${database_name} -curl -v --silent 127.0.0.1:5005/database/data/last?token=$token --stderr - | grep '"_id":3' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - | grep '"_id":3' echo "db.data.insert({"_id":4})" | mongo ${database_name} -curl -v --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep '"_id":4' -curl -v --silent 127.0.0.1:5005/database/data/last?token=$token --stderr - | grep '"_id":4' \ No newline at end of file +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep '"_id":4' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - | grep '"_id":4' + +#with a new group +groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup` +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep '"_id":1' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/last?token=$token --stderr - | grep '"_id":4' \ No newline at end of file diff --git a/tests/automatic/broker/get_last/check_windows.bat b/tests/automatic/broker/get_last/check_windows.bat index 8661112fa16ba17bfc9517887cd42b5773a69eb3..4a396ec7656c3bebfd80d2a712206ec3f477ff2a 100644 --- a/tests/automatic/broker/get_last/check_windows.bat +++ b/tests/automatic/broker/get_last/check_windows.bat @@ -10,22 +10,29 @@ set short_name="%~nx1" "%2" token -secret broker_secret.key data > token set /P token=< token - - start /B "" "%full_name%" -config settings.json - ping 1.0.0.0 -n 1 -w 100 > nul -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/last?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/last?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error +C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid +set /P groupid=< groupid + + +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error echo db.data.insert({"_id":3}) | %mongo_exe% %database_name% || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/last?token=%token% --stderr - | findstr /c:\"_id\":3 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":3 || goto :error echo db.data.insert({"_id":4}) | %mongo_exe% %database_name% || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:\"_id\":4 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/last?token=%token% --stderr - | findstr /c:\"_id\":4 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":4 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4 || goto :error + + +C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid +set /P groupid=< groupid +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4 || goto :error goto :clean @@ -37,4 +44,5 @@ exit /b 1 :clean Taskkill /IM "%short_name%" /F echo db.dropDatabase() | %mongo_exe% %database_name% -del /f token \ No newline at end of file +del /f token +del /f groupid \ No newline at end of file diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index f4bf78a6f74689bfbe02418955317dca3472eb62..e41fa62d85758b820530ff0fe146c863aae27fc2 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -22,9 +22,12 @@ $1 -config settings.json & sleep 0.3 brokerid=`echo $!` +groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup` +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep '"_id":1' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep '"_id":2' +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep "not found" -curl -v --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep '"_id":1' -curl -v --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep '"_id":2' - -curl -v --silent 127.0.0.1:5005/database/data/next?token=$token --stderr - | grep "not found" +# with a new group +groupid=`curl -d '' --silent 127.0.0.1:5005/creategroup` +curl -v --silent 127.0.0.1:5005/database/data/${groupid}/next?token=$token --stderr - | grep '"_id":1' \ No newline at end of file diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat index dfa4ffa8535ce466d68a547151f8d9b19d81cde2..187de2ac8e9eef0fec2d3bdbadfa6529754b3549 100644 --- a/tests/automatic/broker/get_next/check_windows.bat +++ b/tests/automatic/broker/get_next/check_windows.bat @@ -10,15 +10,19 @@ set short_name="%~nx1" "%2" token -secret broker_secret.key data > token set /P token=< token - - start /B "" "%full_name%" -config settings.json ping 1.0.0.0 -n 1 -w 100 > nul -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:\"_id\":1 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next?token=%token% --stderr - | findstr /c:"not found" || goto :error +C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid +set /P groupid=< groupid +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":2 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:"not found" || goto :error + +C:\Curl\curl.exe -d '' --silent 127.0.0.1:5005/creategroup > groupid +set /P groupid=< groupid +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":1 || goto :error goto :clean @@ -29,4 +33,5 @@ exit /b 1 :clean Taskkill /IM "%short_name%" /F echo db.dropDatabase() | %mongo_exe% %database_name% -del /f token \ No newline at end of file +del /f token +del /f groupid \ No newline at end of file diff --git a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp index d4592188276de1cd5899cdc8b0019debb9b54a19..ecacc919da9e5e29112383e56f04bca66f292442 100644 --- a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp @@ -50,11 +50,11 @@ Args GetArgs(int argc, char* argv[]) { void GetAllFromBroker(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); - + auto group_id = broker->GenerateNewGroupId(&err); std::vector<asapo::FileInfos>file_infos(args.nthreads); auto exec_next = [&](int i) { asapo::FileInfo fi; - while ((err = broker->GetNext(&fi, nullptr)) == nullptr) { + while ((err = broker->GetNext(&fi, group_id, nullptr)) == nullptr) { file_infos[i].emplace_back(fi); } printf("%s\n", err->Explain().c_str()); diff --git a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp index 9c7fb9634e8ce90e4f8f07d811943d0131ba0a8f..434fc7ea0d437e8674565ba774805ce7265df888 100644 --- a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp +++ b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp @@ -45,7 +45,7 @@ void GetAllFromBroker(const Args& args) { std::vector<asapo::FileInfo>file_infos(args.nthreads); auto exec_next = [&](int i) { - broker->GetNext(&file_infos[i], nullptr); + broker->GetNext(&file_infos[i], "", nullptr); }; std::vector<std::thread> threads; diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 9bdb7e7ad5f833fea1a9d1f8c51078ca8bf85467..95c374790fa7cec7e696c10b2414d919178f840a 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -9,6 +9,11 @@ #include "asapo_worker.h" +#include <mutex> + +std::string group_id = ""; +std::mutex lock; + using std::chrono::high_resolution_clock; using asapo::Error; @@ -46,10 +51,22 @@ std::vector<std::thread> StartThreads(const Params& params, broker->SetTimeout(params.timeout_ms); asapo::FileData data; + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + auto start = high_resolution_clock::now(); while (std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count() < params.timeout_ms) { - err = broker->GetLast(&fi, params.read_data ? &data : nullptr); + err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 6f6e468b1ceda55668f045edad5f3f3a577aaf92..50d6b90cea1a9377f5a96da2e2757a2a1d57f28a 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -34,19 +34,25 @@ class DataBroker { //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; //! Receive next image. + /*! + \param err - return nullptr of operation succeed, error otherwise. + \return group id if OK, "" otherwise. + */ + virtual std::string GenerateNewGroupId(Error* err) = 0; + //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param data - where to store image data. Can be set to nullptr only image metadata is needed. - \return Error if both pointers are nullptr or data cannot be read, WorkerErrorCode::OK otherwise. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetNext(FileInfo* info, FileData* data) = 0; + virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param data - where to store image data. Can be set to nullptr only image metadata is needed. - \return Error if both pointers are nullptr or data cannot be read, WorkerErrorCode::OK otherwise. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetLast(FileInfo* info, FileData* data) = 0; + virtual Error GetLast(FileInfo* info, std::string group_id, FileData* data) = 0; virtual ~DataBroker() = default; // needed for unique_ptr to delete itself }; diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 6214c505f8e68b9216df612f152a4581f8459193..d2a47b65e3a2fa6040e8143186fe5ad784ddd2c9 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -61,7 +61,7 @@ Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, Fi } -Error FolderDataBroker::GetNext(FileInfo* info, FileData* data) { +Error FolderDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* data) { // could probably use atomic here, but just to make sure (tests showed no performance difference) mutex_.lock(); uint64_t nfile_to_get = ++current_file_; @@ -70,9 +70,14 @@ Error FolderDataBroker::GetNext(FileInfo* info, FileData* data) { return GetFileByIndex(nfile_to_get, info, data); } -Error FolderDataBroker::GetLast(FileInfo* info, FileData* data) { +Error FolderDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { uint64_t nfile_to_get = filelist_.size() - 1; return GetFileByIndex(nfile_to_get, info, data); } +std::string FolderDataBroker::GenerateNewGroupId(Error* err) { + *err = nullptr; + return ""; +} + } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index cbf2005a86f67eac1c049544d6ae49cb11ad6284..78a2591f848124b7644364a1ebacb71f61491089 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -14,9 +14,11 @@ class FolderDataBroker final : public asapo::DataBroker { public: explicit FolderDataBroker(const std::string& source_name); Error Connect() override; - Error GetNext(FileInfo* info, FileData* data) override; - Error GetLast(FileInfo* info, FileData* data) override; + Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; + Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case + std::string GenerateNewGroupId(Error* err) + override; // return "0" always and no error - no group ids for folder datra broker std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch private: diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 99d11e598dbe65e474ba62a27ebfacce75d1a5cf..db61b1a8c8f18d9bbcdbf7e5e8da9db36d3f8505 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -36,7 +36,7 @@ Error HttpCodeToWorkerError(const HttpCode& code) { message = WorkerErrorMessage::kNoData; return TextErrorWithType(message, ErrorType::kEndOfFile); default: - message = WorkerErrorMessage::kErrorReadingSource; + message = WorkerErrorMessage::kUnknownIOError; break; } return Error{new HttpError(message, code)}; @@ -88,10 +88,14 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + token_; } -Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri) { +Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri, bool post) { Error err; HttpCode code; - *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err); + if (post) { + *response = httpclient__->Post(RequestWithToken(request_uri), "", &code, &err); + } else { + *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err); + } if (err != nullptr) { current_broker_uri_ = ""; return err; @@ -106,7 +110,7 @@ Error ServerDataBroker::GetBrokerUri() { std::string request_uri = server_uri_ + "/discovery/broker"; Error err; - err = ProcessRequest(¤t_broker_uri_, request_uri); + err = ProcessRequest(¤t_broker_uri_, request_uri, false); if (err != nullptr || current_broker_uri_.empty()) { current_broker_uri_ = ""; return TextError("cannot get broker uri from " + server_uri_); @@ -115,15 +119,15 @@ Error ServerDataBroker::GetBrokerUri() { } -Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, GetImageServerOperation op) { - std::string request_suffix = OpToUriCmd(op); +Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op) { + std::string request_suffix = std::move(group_id) + "/" + OpToUriCmd(op); uint64_t elapsed_ms = 0; std::string response; while (true) { auto err = GetBrokerUri(); if (err == nullptr) { std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/"; - err = ProcessRequest(&response, request_api + request_suffix); + err = ProcessRequest(&response, request_api + request_suffix, false); if (err == nullptr) { break; } @@ -145,12 +149,12 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, GetImageServerOper return nullptr; } -Error ServerDataBroker::GetNext(FileInfo* info, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetNext, info, data); +Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, FileData* data) { + return GetImageFromServer(GetImageServerOperation::GetNext, std::move(group_id), info, data); } -Error ServerDataBroker::GetLast(FileInfo* info, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetLast, info, data); +Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { + return GetImageFromServer(GetImageServerOperation::GetLast, std::move(group_id), info, data); } std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { @@ -163,12 +167,13 @@ std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { return ""; } -Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, FileInfo* info, FileData* data) { +Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, + FileData* data) { if (info == nullptr) { return TextError(WorkerErrorMessage::kWrongInput); } - auto err = GetFileInfoFromServer(info, op); + auto err = GetFileInfoFromServer(info, std::move(group_id), op); if (err != nullptr) { return err; } @@ -202,4 +207,24 @@ Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* dat return net_client__->GetData(info, data); } +std::string ServerDataBroker::GenerateNewGroupId(Error* err) { + uint64_t elapsed_ms = 0; + std::string response; + while (elapsed_ms <= timeout_ms_) { + *err = GetBrokerUri(); + if (*err == nullptr) { + std::string request = current_broker_uri_ + "/creategroup"; + *err = ProcessRequest(&response, request, true); + if (*err == nullptr) { + return response; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; + } + + *err = TextErrorWithType("exit on timeout, last error: " + (*err)->Explain(), asapo::ErrorType::kTimeOut); + return ""; +} + } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 34f8eebd3aee1e7311e28aff4ed7a5ebd739c771..80dfe0d7c5199daaa194ee396fd1a99991acbfee 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -19,8 +19,9 @@ class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, std::string source_name, std::string token); Error Connect() override; - Error GetNext(FileInfo* info, FileData* data) override; - Error GetLast(FileInfo* info, FileData* data) override; + Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; + Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; + std::string GenerateNewGroupId(Error* err) override; void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch @@ -28,12 +29,12 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<NetClient> net_client__; private: std::string RequestWithToken(std::string uri); - Error GetFileInfoFromServer(FileInfo* info, GetImageServerOperation op); + Error GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); - Error ProcessRequest(std::string* response, std::string request_uri); - Error GetImageFromServer(GetImageServerOperation op, FileInfo* info, FileData* data); + Error ProcessRequest(std::string* response, std::string request_uri, bool post); + Error GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, FileData* data); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); std::string OpToUriCmd(GetImageServerOperation op); diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index f7e3de194c8847bdca89990300e3dec47ac2462c..6d8beba9697d9be7c19c4e3a0654e7c191df891f 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -134,7 +134,7 @@ TEST_F(FolderDataBrokerTests, ConnectReturnsUnknownIOError) { } TEST_F(FolderDataBrokerTests, GetNextWithoutConnectReturnsError) { - auto err = data_broker->GetNext(nullptr, nullptr); + auto err = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kSourceNotConnected)); } @@ -142,7 +142,7 @@ TEST_F(FolderDataBrokerTests, GetNextWithoutConnectReturnsError) { TEST_F(FolderDataBrokerTests, GetNextWithNullPointersReturnsError) { data_broker->Connect(); - auto err = data_broker->GetNext(nullptr, nullptr); + auto err = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kWrongInput)); } @@ -151,7 +151,7 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("1")); @@ -164,7 +164,7 @@ TEST_F(FolderDataBrokerTests, GetLastReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetLast(&fi, nullptr); + auto err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("3")); @@ -176,9 +176,9 @@ TEST_F(FolderDataBrokerTests, GetLastSecondTimeReturnsSameFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetLast(&fi, nullptr); + auto err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); - err = data_broker->GetLast(&fi, nullptr); + err = data_broker->GetLast(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("3")); @@ -191,9 +191,9 @@ TEST_F(FolderDataBrokerTests, GetLastSecondTimeReturnsSameFileInfo) { TEST_F(FolderDataBrokerTests, SecondNextReturnsAnotherFileInfo) { data_broker->Connect(); FileInfo fi; - data_broker->GetNext(&fi, nullptr); + data_broker->GetNext(&fi, "", nullptr); - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("2")); @@ -204,7 +204,7 @@ TEST_F(FolderDataBrokerTests, GetNextFromEmptyFolderReturnsError) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetNext(&fi, nullptr); + auto err = data_broker->GetNext(&fi, "", nullptr); ASSERT_TRUE(asapo::ErrorTemplates::kEndOfFile == err); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kNoData)); @@ -216,7 +216,7 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsErrorWhenFilePermissionsDenied) { FileInfo fi; FileData data; - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::IOErrorTemplates::kPermissionDenied.Generate()->Explain())); } @@ -244,7 +244,7 @@ TEST_F(GetDataFromFileTests, GetNextCallsGetDataFileWithFileName) { EXPECT_CALL(mock, GetDataFromFile_t(std::string("/path/to/file") + asapo::kPathSeparator + "1", _, _)). WillOnce(DoAll(testing::SetArgPointee<2>(static_cast<SimpleError*>(nullptr)), testing::Return(nullptr))); - data_broker->GetNext(&fi, &data); + data_broker->GetNext(&fi, "", &data); } @@ -252,7 +252,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsDataAndInfo) { EXPECT_CALL(mock, GetDataFromFile_t(_, _, _)). WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'}))); - data_broker->GetNext(&fi, &data); + data_broker->GetNext(&fi, "", &data); ASSERT_THAT(data[0], Eq('1')); ASSERT_THAT(fi.name, Eq("1")); @@ -264,7 +264,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotReadData) { WillOnce(DoAll(testing::SetArgPointee<2>(asapo::IOErrorTemplates::kReadError.Generate().release()), testing::Return(nullptr))); - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::IOErrorTemplates::kReadError.Generate()->Explain())); } @@ -274,7 +274,7 @@ TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotAllocateData) { WillOnce(DoAll(testing::SetArgPointee<2>(asapo::ErrorTemplates::kMemoryAllocationError.Generate().release()), testing::Return(nullptr))); - auto err = data_broker->GetNext(&fi, &data); + auto err = data_broker->GetNext(&fi, "", &data); ASSERT_THAT(err->Explain(), Eq(asapo::ErrorTemplates::kMemoryAllocationError.Generate()->Explain())); } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 036c08d42c9d20c556bf1e6f1e3a23a6ee915da7..7c7216e33bba4f5b45c56cad457e71e2ee378f79 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -61,6 +61,8 @@ class ServerDataBrokerTests : public Test { std::string expected_path = "/tmp/beamline/beamtime"; std::string expected_filename = "filename"; std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename; + std::string expected_group_id = "groupid"; + static const uint64_t expected_buf_id = 123; void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { @@ -123,30 +125,32 @@ TEST_F(ServerDataBrokerTests, CanConnect) { } TEST_F(ServerDataBrokerTests, GetImageReturnsErrorOnWrongInput) { - auto return_code = data_broker->GetNext(nullptr, nullptr); + auto return_code = data_broker->GetNext(nullptr, "", nullptr); ASSERT_THAT(return_code->Explain(), Eq(asapo::WorkerErrorMessage::kWrongInput)); } TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/next?token=" + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token=" + + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/last?token=" + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/last?token=" + + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - data_broker->GetLast(&info, nullptr); + data_broker->GetLast(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClient) { @@ -157,7 +161,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); @@ -171,7 +175,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNotAuthorized) { SetArgPointee<2>(nullptr), Return(""))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("authorization")); @@ -186,7 +190,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsWrongResponseFromHttpClient) { SetArgPointee<2>(nullptr), Return("id"))); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse")); } @@ -199,7 +203,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerAddressNotFound) { Return(""))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); } @@ -212,7 +216,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerUriEmpty) { Return(""))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); } @@ -222,12 +226,12 @@ TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) { MockGet("error_response"); data_broker->SetTimeout(100); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).Times(0); MockGet("error_response"); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetBrokerUriAgainAfterConnectionError) { @@ -235,12 +239,12 @@ TEST_F(ServerDataBrokerTests, GetBrokerUriAgainAfterConnectionError) { MockGetError(); data_broker->SetTimeout(0); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); MockGetBrokerUri(); MockGet("error_response"); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClientUntilTimeout) { @@ -258,7 +262,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEOFFromHttpClientUntilTimeout) { Return("{\"id\":1}"))); data_broker->SetTimeout(100); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); } @@ -271,7 +275,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) { MockGet(json); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err, Eq(nullptr)); @@ -284,7 +288,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) { TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) { MockGetBrokerUri(); MockGet("error_response"); - auto err = data_broker->GetNext(&info, nullptr); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); ASSERT_THAT(err->Explain(), Eq(asapo::WorkerErrorMessage::kErrorReadingSource)); } @@ -296,7 +300,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDtataNeeded) { EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0); EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); - data_broker->GetNext(&info, nullptr); + data_broker->GetNext(&info, expected_group_id, nullptr); } TEST_F(ServerDataBrokerTests, GetImageTriesToGetDataFromMemoryCache) { @@ -309,7 +313,7 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetDataFromMemoryCache) { EXPECT_CALL(mock_netclient, GetData_t(&info, &data)).WillOnce(Return(nullptr)); MockReadDataFromFile(0); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); ASSERT_THAT(info.buf_id, Eq(expected_buf_id)); @@ -327,7 +331,7 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfCannotReadFromCache) { &data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); MockReadDataFromFile(); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); ASSERT_THAT(info.buf_id, Eq(0)); } @@ -344,7 +348,41 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfZeroBufId) { MockReadDataFromFile(); - data_broker->GetNext(&info, &data); + data_broker->GetNext(&info, expected_group_id, &data); +} + + +TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), "", _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<2>(HttpCode::BadRequest), + SetArgPointee<3>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + asapo::Error err; + auto groupid = data_broker->GenerateNewGroupId(&err); + if (err != nullptr ) { + ASSERT_THAT(err->Explain(), HasSubstr("timeout")); + } + ASSERT_THAT(groupid, Eq("")); +} + + +TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), "", _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(expected_group_id))); + + data_broker->SetTimeout(100); + asapo::Error err; + auto groupid = data_broker->GenerateNewGroupId(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(groupid, Eq(expected_group_id)); } diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 06c2a9e5eb8eb299e1f4ac0f578b2bf0e0b9b784..025c84eeee84604225cb8d4abc751cfb902f182d 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -28,9 +28,9 @@ cdef extern from "asapo_worker.h" namespace "asapo": cdef cppclass DataBroker: DataBroker() except + void SetTimeout(uint64_t timeout_ms) - Error GetNext(FileInfo* info, FileData* data) - Error GetLast(FileInfo* info, FileData* data) - + Error GetNext(FileInfo* info, string group_id, FileData* data) + Error GetLast(FileInfo* info, string group_id, FileData* data) + string GenerateNewGroupId(Error* err) cdef extern from "asapo_worker.h" namespace "asapo": cdef cppclass DataBrokerFactory: diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index 8313f7539b6a79b57d9050faa6d1b64a8cb18b67..2bb14cdedb4d813d3de1d0fab71f714dbf62fa2b 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -26,15 +26,15 @@ cdef bytes _bytes(s): cdef class PyDataBroker: cdef DataBroker* c_broker - def _op(self, op, meta_only): + def _op(self, op, group_id, meta_only): cdef FileInfo info cdef FileData data cdef Error err cdef np.npy_intp dims[1] if op == "next": - err = self.c_broker.GetNext(&info,<FileData*>NULL if meta_only else &data) + err = self.c_broker.GetNext(&info, _bytes(group_id), <FileData*>NULL if meta_only else &data) elif op == "last": - err = self.c_broker.GetLast(&info,<FileData*>NULL if meta_only else &data) + err = self.c_broker.GetLast(&info,_bytes(group_id), <FileData*>NULL if meta_only else &data) err_str = _str(GetErrorString(&err)) if err_str.strip(): return None,None,err_str @@ -49,13 +49,19 @@ cdef class PyDataBroker: del meta['lastchange'] arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) return arr,meta,None - - - def get_next(self, meta_only = True): - return self._op("next",meta_only) - def get_last(self, meta_only = True): - return self._op("last",meta_only) - + def get_next(self, group_id, meta_only = True): + return self._op("next",group_id,meta_only) + def get_last(self, group_id, meta_only = True): + return self._op("last",group_id,meta_only) + def generate_group_id(self): + cdef Error err + cdef string group_id + group_id = self.c_broker.GenerateNewGroupId(&err) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None, err_str + else: + return _str(group_id), None cdef class PyDataBrokerFactory: cdef DataBrokerFactory c_factory