diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 452419aeb1b0e1e00677a26fe1632e8cb76cc87c..7f31bf1204d256edcbe7f9c9e4f371ba8415d377 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -5,6 +5,11 @@ type Agent interface { Ping() error Connect(string) error Close() + SetSettings(settings DBSettings) +} + +type DBSettings struct { + ReadFromInprocessPeriod int } type DBError struct { diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index be469668f5f8be780961c32ea7ce1656f76f7fc7..3797f8afb6d9fa006fb3b30ac488d66a55c59c11 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -24,6 +24,11 @@ func (db *MockedDatabase) Ping() error { return args.Error(0) } +func (db *MockedDatabase) SetSettings(settings DBSettings) { + db.Called() +} + + func (db *MockedDatabase) ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { args := db.Called(db_name, data_collection_name, group_id, op, extra_param) return args.Get(0).([]byte), args.Error(1) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index ff9c815b12880cf595ee0f97955615f9a935beca..7cd881b32d7c91bfe278bf2f483a3df21026b73f 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -28,15 +29,20 @@ type ServiceRecord struct { Meta map[string]interface{} `json:"meta"` } +type InProcessingRecord struct { + ID int `bson:"_id" json:"_id"` + Attempts int + Updated int64 +} + type Nacks struct { - Unacknowledged []int `json:"unacknowledged"` + Unacknowledged []int `json:"unacknowledged"` } type LastAck struct { ID int `bson:"_id" json:"lastAckId"` } - type SubstreamsRecord struct { Substreams []string `bson:"substreams" json:"substreams"` } @@ -48,6 +54,7 @@ type LocationPointer struct { const data_collection_name_prefix = "data_" const acks_collection_name_prefix = "acks_" +const inprocess_collection_name_prefix = "inprocess_" const meta_collection_name = "meta" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" @@ -58,8 +65,6 @@ const already_connected_msg = "already connected" const finish_substream_keyword = "asapo_finish_substream" const no_next_substream_keyword = "asapo_no_next" -var dbListLock sync.RWMutex -var dbPointersLock sync.RWMutex var dbSessionLock sync.RWMutex type SizeRecord struct { @@ -67,9 +72,15 @@ type SizeRecord struct { } type Mongodb struct { - client *mongo.Client - timeout time.Duration - parent_db *Mongodb + client *mongo.Client + timeout time.Duration + parent_db *Mongodb + settings DBSettings + lastReadFromInprocess int64 +} + +func (db *Mongodb) SetSettings(settings DBSettings) { + db.settings = settings } func (db *Mongodb) Ping() (err error) { @@ -98,7 +109,8 @@ func (db *Mongodb) Connect(address string) (err error) { return err } - // db.client.SetSafe(&mgo.Safe{J: true}) + atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix()) + return db.Ping() } @@ -163,10 +175,14 @@ func (db *Mongodb) getMaxIndex(dbname string, collection_name string, dataset bo func duplicateError(err error) bool { command_error, ok := err.(mongo.CommandError) - if (!ok) { - return false + if !ok { + write_exception_error, ok1 := err.(mongo.WriteException) + if !ok1 { + return false + } + return strings.Contains(write_exception_error.Error(),"duplicate key") } - return command_error.Name=="DuplicateKey" + return command_error.Name == "DuplicateKey" } func (db *Mongodb) setCounter(dbname string, collection_name string, group_id string, ind int) (err error) { @@ -186,9 +202,9 @@ func (db *Mongodb) incrementField(dbname string, collection_name string, group_i err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) if err != nil { -// duplicateerror can happen because we set Upsert=true to allow insert pointer when it is absent. But then it will try to insert -// pointer in case query found nothing, also when pointer exists and equal to max_ind. Here we have to return NoData - if err == mongo.ErrNoDocuments || duplicateError(err) { + // duplicateerror can happen because we set Upsert=true to allow insert pointer when it is absent. But then it will try to insert + // pointer in case query found nothing, also when pointer exists and equal to max_ind. Here we have to return NoData + if err == mongo.ErrNoDocuments || duplicateError(err) { return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} } return &DBError{utils.StatusTransactionInterrupted, err.Error()} @@ -255,10 +271,17 @@ func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id str }{id} c := db.client.Database(dbname).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) _, err = c.InsertOne(context.Background(), &record) - return []byte(""),err -} + if err==nil { + c = db.client.Database(dbname).Collection(inprocess_collection_name_prefix+group_id) + _,err_del := c.DeleteOne(context.Background(),bson.M{"_id":id}) + if err_del != nil { + return nil, &DBError{utils.StatusWrongInput, err.Error()} + } + } + return []byte(""), err +} func (db *Mongodb) getParentDB() *Mongodb { if db.parent_db == nil { @@ -273,7 +296,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, collectio return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - if len(db_name)==0 || len(collection_name) ==0 { + if len(db_name) == 0 || len(collection_name) == 0 { return &DBError{utils.StatusWrongInput, "beamtime_id ans substream must be set"} } @@ -286,7 +309,7 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro return LocationPointer{}, 0, err } - if (max_ind == 0) { + if max_ind == 0 { return LocationPointer{}, 0, &DBError{utils.StatusNoData, encodeAnswer(0, 0, "")} } @@ -317,20 +340,102 @@ func processLastRecord(data []byte, collection_name string, err error) ([]byte, return nil, &DBError{utils.StatusNoData, answer} } -func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) { - curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset) +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resendAfter int) (int, error) { + var res InProcessingRecord + opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) + tNow := time.Now().Unix() + update := bson.M{"$set": bson.M{"updated": tNow}, "$inc": bson.M{"attempts": 1}} + q := bson.M{"updated": bson.M{"$lte": tNow - int64(resendAfter)}} + c := db.client.Database(dbname).Collection(collection_name) + err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { - log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() - logger.Debug(log_str) - return nil, err + if err == mongo.ErrNoDocuments { + return 0, nil + } + return 0, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id + + log_str := "got unprocessed id " + strconv.Itoa(res.ID) + " for " + dbname logger.Debug(log_str) - data, err := db.getRecordByIDRow(db_name, collection_name, curPointer.Value, max_ind, dataset) - if curPointer.Value != max_ind { + return res.ID, nil +} + +func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) (error) { + if len(extra_param)==0 { + return nil + } + + record := InProcessingRecord{ + id,0,time.Now().Unix(), + } + + c := db.client.Database(db_name).Collection(collection_name) + _, err := c.InsertOne(context.TODO(), &record) + if duplicateError(err) { + return nil + } + return err +} + +func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool, extra_param string) ([]byte, error) { + record_ind := 0 + max_ind := 0 + if len(extra_param) > 0 { + resendAfter, _, err := extractsTwoIntsFromString(extra_param) + if err != nil { + return nil, err + } + tNow := time.Now().Unix() + if atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod) { + record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, resendAfter) + if err != nil { + log_str := "error getting unprocessed id " + db_name + ", groupid: " + group_id + ":" + err.Error() + logger.Debug(log_str) + return nil, err + } + } + if record_ind != 0 { + max_ind, err = db.getMaxIndex(db_name, collection_name, dataset) + if err != nil { + return nil, err + } + } + } + + if record_ind == 0 { + var curPointer LocationPointer + var err error + curPointer, max_ind, err = db.getCurrentPointer(db_name, collection_name, group_id, dataset) + if err != nil { + log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() + logger.Debug(log_str) + return nil, err + } + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id + logger.Debug(log_str) + record_ind = curPointer.Value + } + + + data, err := db.getRecordByIDRow(db_name, collection_name, record_ind, max_ind, dataset) + if record_ind != max_ind { + if err==nil { + err_update := db.InsertToInprocessIfNeeded(db_name,inprocess_collection_name_prefix+group_id,record_ind,extra_param) + if err_update!=nil { + return nil,err_update + } + } return data, err } - return processLastRecord(data, collection_name, err) + data,err = processLastRecord(data, collection_name, err) + if err==nil { + err_update := db.InsertToInprocessIfNeeded(db_name,inprocess_collection_name_prefix+group_id,record_ind,extra_param) + if err_update!=nil { + return nil,err_update + } + } + + return data, err } func (db *Mongodb) getLastRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) { @@ -448,7 +553,6 @@ func (db *Mongodb) getSubstreams(db_name string) ([]byte, error) { return json.Marshal(&rec) } - func makeRange(min, max int) []int { a := make([]int, max-min+1) for i := range a { @@ -457,32 +561,32 @@ func makeRange(min, max int) []int { return a } -func extractsLimitsFromString(from_to string) (int,int,error) { +func extractsTwoIntsFromString(from_to string) (int, int, error) { s := strings.Split(from_to, "_") - if len(s)!=2 { - return 0,0,errors.New("wrong format: "+from_to) + if len(s) != 2 { + return 0, 0, errors.New("wrong format: " + from_to) } from, err := strconv.Atoi(s[0]) if err != nil { - return 0,0, err + return 0, 0, err } to, err := strconv.Atoi(s[1]) if err != nil { - return 0,0, err + return 0, 0, err } - return from,to,nil + return from, to, nil } -func (db *Mongodb) nacks(db_name string, collection_name string, group_id string,from_to string) ([]byte, error) { - from, to, err := extractsLimitsFromString(from_to) - if err!=nil { - return nil,err +func (db *Mongodb) nacks(db_name string, collection_name string, group_id string, from_to string) ([]byte, error) { + from, to, err := extractsTwoIntsFromString(from_to) + if err != nil { + return nil, err } - if from==0 { + if from == 0 { from = 1 } @@ -494,11 +598,11 @@ func (db *Mongodb) nacks(db_name string, collection_name string, group_id string } res := Nacks{[]int{}} - if (to == 0) { + if to == 0 { return utils.MapToJson(&res) } - res.Unacknowledged, err = db.getNacks(db_name,collection_name,group_id,from,to) + res.Unacknowledged, err = db.getNacks(db_name, collection_name, group_id, from, to) if err != nil { return nil, err } @@ -515,19 +619,18 @@ func (db *Mongodb) lastAck(db_name string, collection_name string, group_id stri if err == mongo.ErrNoDocuments { return utils.MapToJson(&result) } - if err!=nil { + if err != nil { return nil, err } return utils.MapToJson(&result) } - -func (db *Mongodb) getNacks(db_name string, collection_name string, group_id string, min_index,max_index int) ([]int,error) { +func (db *Mongodb) getNacks(db_name string, collection_name string, group_id string, min_index, max_index int) ([]int, error) { c := db.client.Database(db_name).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) - if (min_index > max_index) { + if min_index > max_index { return []int{}, errors.New("from index is greater than to index") } @@ -536,16 +639,15 @@ func (db *Mongodb) getNacks(db_name string, collection_name string, group_id str return []int{}, err } - if (size == 0) { - return makeRange(min_index,max_index), nil + if size == 0 { + return makeRange(min_index, max_index), nil } if min_index == 1 && int(size) == max_index { return []int{}, nil } - - matchStage := bson.D{{"$match", bson.D{{"_id", bson.D{{"$lt",max_index+1},{"$gt",min_index-1}}}}}} + matchStage := bson.D{{"$match", bson.D{{"_id", bson.D{{"$lt", max_index + 1}, {"$gt", min_index - 1}}}}}} groupStage := bson.D{ {"$group", bson.D{ {"_id", 0}, @@ -557,22 +659,22 @@ func (db *Mongodb) getNacks(db_name string, collection_name string, group_id str {"$project", bson.D{ {"_id", 0}, {"numbers", bson.D{ - {"$setDifference", bson.A{bson.D{{"$range",bson.A{min_index,max_index+1}}},"$numbers"}}, + {"$setDifference", bson.A{bson.D{{"$range", bson.A{min_index, max_index + 1}}}, "$numbers"}}, }}}, }} - query := mongo.Pipeline{matchStage, groupStage,projectStage} + query := mongo.Pipeline{matchStage, groupStage, projectStage} cursor, err := c.Aggregate(context.Background(), query) type res struct { Numbers []int } resp := []res{} - err = cursor.All(context.Background(),&resp) - if err!= nil || len(resp)!=1 { + err = cursor.All(context.Background(), &resp) + if err != nil || len(resp) != 1 { return []int{}, err } - return resp[0].Numbers,nil + return resp[0].Numbers, nil } func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { @@ -588,7 +690,7 @@ func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_ switch op { case "next": - return db.getNextRecord(db_name, collection_name, group_id, dataset) + return db.getNextRecord(db_name, collection_name, group_id, dataset, extra_param) case "id": return db.getRecordByID(db_name, collection_name, group_id, extra_param, dataset) case "last": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index f84efe1c0677ea8bd631793d59629a34f82ef1f2..54a923b8708cd5ec65d80f6da90f283032189a69 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "sync" "testing" + "time" ) type TestRecord struct { @@ -137,6 +138,7 @@ func TestMongoDBGetNextOK(t *testing.T) { assert.Equal(t, string(rec1_expect), string(res)) } +/* func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -149,7 +151,7 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next1\"}", err.(*DBError).Message) } - +*/ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -189,7 +191,7 @@ func insertRecords(n int) { record.Name = string(ind) db.insertRecord(dbname, collection, &record) } - + time.Sleep(time.Millisecond*100) } func getRecords(n int) []int { @@ -199,10 +201,12 @@ func getRecords(n int) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") + res_bin, _:= db.ProcessRequest(dbname, collection, groupId, "next", "") var res TestRecord json.Unmarshal(res_bin, &res) - results[res.ID-1] = 1 + if res.ID>0 { + results[res.ID-1] = 1 + } }() } wg.Wait() @@ -225,7 +229,7 @@ func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(dbname, collection, groupId, "next", "0") + db.ProcessRequest(dbname, collection, groupId, "next", "") db.dropDatabase(dbname) db.insertRecord(dbname, collection, &rec1) @@ -240,7 +244,7 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(200) - db.ProcessRequest(dbname, collection, groupId, "next", "0") + db.ProcessRequest(dbname, collection, groupId, "next", "") db.dropDatabase(dbname) n := 100 @@ -253,10 +257,10 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(dbname, collection, groupId, "next", "0") + db.ProcessRequest(dbname, collection, groupId, "next", "") db.dropDatabase(dbname) - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "0") + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error()) } @@ -284,7 +288,7 @@ func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0") + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -295,8 +299,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection2, &rec_dataset1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0") - res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "0") + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "") var res_ds TestDataset json.Unmarshal(res_string, &res_ds) @@ -348,7 +352,7 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.insertRecord(dbname, collection, &rec3) - res, err = db.ProcessRequest(dbname, collection, groupId, "next", "0") + res, err = db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Nil(t, err) assert.Equal(t, string(rec3_expect), string(res)) @@ -398,7 +402,7 @@ func TestMongoDBResetCounter(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) @@ -406,18 +410,19 @@ func TestMongoDBResetCounter(t *testing.T) { _, err_reset := db.ProcessRequest(dbname, collection, groupId, "resetcounter", "1") assert.Nil(t, err_reset) - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0") + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Nil(t, err2) assert.Equal(t, string(rec2_expect), string(res2)) } func TestMongoDBGetMetaOK(t *testing.T) { + recm := rec1 db.Connect(dbaddress) defer cleanup() - rec1.ID = metaID - rec_expect, _ := json.Marshal(rec1) - db.insertMeta(dbname, &rec1) + recm.ID = metaID + rec_expect, _ := json.Marshal(recm) + db.insertMeta(dbname, &recm) res, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str) @@ -546,7 +551,7 @@ func TestMongoDBGetDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "") assert.Nil(t, err) @@ -562,7 +567,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message) @@ -735,3 +740,5 @@ func TestMongoDBLastAcks(t *testing.T) { cleanup() } } + + diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 69f05497cb840c077cec02dae62e48a0cb4e463a..2411265668315b21280d1a7f8401b75ea991ef37 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -37,16 +37,19 @@ var testsGetCommand = []struct { substream string groupid string reqString string + queryParams string externalParam string }{ - {"last", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/last","0"}, - {"id", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/1","1"}, - {"meta", "default", "", "default/0/meta/0","0"}, - {"nacks", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/nacks","0_0"}, - {"next", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/next","0"}, - {"size", expectedSubstream, "", expectedSubstream + "/size","0"}, - {"substreams", "0", "", "0/substreams",""}, - {"lastack", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/lastack",""}, + {"last", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/last","","0"}, + {"id", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/1","","1"}, + {"meta", "default", "", "default/0/meta/0","","0"}, + {"nacks", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/nacks","","0_0"}, + {"next", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/next","",""}, + {"next", expectedSubstream, expectedGroupID, expectedSubstream + "/" + + expectedGroupID + "/next","&resend_nacks=true&resend_after=10&resend_attempts=3","10_3"}, + {"size", expectedSubstream, "", expectedSubstream + "/size","","0"}, + {"substreams", "0", "", "0/substreams","",""}, + {"lastack", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/lastack","",""}, } @@ -55,7 +58,7 @@ func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { for _, test := range testsGetCommand { suite.mock_db.On("ProcessRequest", expectedDBName, test.substream, test.groupid, test.command, test.externalParam).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) - w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + test.reqString+correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data") } diff --git a/broker/src/asapo_broker/server/get_health_test.go b/broker/src/asapo_broker/server/get_health_test.go index 3675fcbb0a3e6defc90f17b1ae51cbd5f7bfbefb..a318c8d953d8ce3abc179f7fbdc20076a17b3785 100644 --- a/broker/src/asapo_broker/server/get_health_test.go +++ b/broker/src/asapo_broker/server/get_health_test.go @@ -4,6 +4,7 @@ import ( "asapo_broker/database" "asapo_common/logger" "errors" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "net/http" "testing" @@ -39,6 +40,8 @@ func (suite *GetHealthTestSuite) TestGetHealthOk() { func (suite *GetHealthTestSuite) TestGetHealthTriesToReconnectsToDataBase() { suite.mock_db.On("Ping").Return(errors.New("ping error")) + suite.mock_db.On("SetSettings", mock.Anything).Return() + ExpectReconnect(suite.mock_db) w := doRequest("/health") diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index dc66ce677d872e3700d343739ca7b487d4577800..41dd9f1dbfacdd249060cc85ab2462e9812d3c4e 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -4,6 +4,21 @@ import ( "net/http" ) + + +func extractResend(r *http.Request) (string) { + keys := r.URL.Query() + resend := keys.Get("resend_nacks") + resend_after := keys.Get("resend_after") + resend_attempts := keys.Get("resend_attempts") + resend_params := "" + if len(resend)!=0 { + resend_params=resend_after+"_"+resend_attempts + } + return resend_params +} + + func routeGetNext(w http.ResponseWriter, r *http.Request) { - processRequest(w, r, "next", "0", true) + processRequest(w, r, "next", extractResend(r), true) } diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 3eec6bbbc6e69afaeeb1dfbf25a1cbfe636a18ec..1757decba923ba6e98e2b107b2f7b6c3f168b4f1 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -79,6 +79,8 @@ func TestProcessRequestWithoutDatabaseName(t *testing.T) { func ExpectReconnect(mock_db *database.MockedDatabase) { mock_db.On("Close").Return() mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil) + mock_db.On("SetSettings", mock.Anything).Return() + } type ProcessRequestTestSuite struct { @@ -121,7 +123,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next"))) @@ -132,7 +134,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() } func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -145,7 +147,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), errors.New("")) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) @@ -157,7 +159,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -171,7 +173,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index cc69db83fe47aa379794d2f3edfde231eb992b5b..649764fe780a09a35d67b065344474ce166b8f91 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -72,6 +72,8 @@ func InitDB(dbAgent database.Agent) (err error) { log.Debug("Got mongodb server: " + settings.discoveredDbAddress) } + db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: 10}) + return db.Connect(settings.GetDatabaseServer()) } diff --git a/broker/src/asapo_broker/server/server_test.go b/broker/src/asapo_broker/server/server_test.go index 39967ed7591e81cb5c6af5fe0dcb401cb1daaced..d1c9957df9504d5ad8deb6b6bc37c4136edc16ef 100644 --- a/broker/src/asapo_broker/server/server_test.go +++ b/broker/src/asapo_broker/server/server_test.go @@ -14,12 +14,16 @@ import ( func setup() *database.MockedDatabase { mock_db := new(database.MockedDatabase) mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil) + mock_db.On("SetSettings", mock.Anything).Return() + return mock_db } func setup_and_init(t *testing.T) *database.MockedDatabase { mock_db := new(database.MockedDatabase) mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil) + mock_db.On("SetSettings", mock.Anything).Return() + InitDB(mock_db) assertExpectations(t, mock_db) return mock_db @@ -50,6 +54,7 @@ func TestInitDBWithWrongAddress(t *testing.T) { for _, test := range initDBTests { mock_db.On("Connect", "0.0.0.0:0000").Return(test.answer) + mock_db.On("SetSettings", mock.Anything).Return() err := InitDB(mock_db) @@ -75,6 +80,8 @@ func TestInitDBWithAutoAddress(t *testing.T) { discoveryService = discoveryAPI{mock_server.Client(), mock_server.URL} mock_db.On("Connect", "0.0.0.0:0000").Return(nil) + mock_db.On("SetSettings", mock.Anything).Return() + err := InitDB(mock_db) assert.Equal(t, nil, err, "auto connect ok") @@ -101,6 +108,7 @@ func TestReconnectDB(t *testing.T) { mock_db.On("Close").Return() mock_db.On("Connect", "1.0.0.0:0000").Return(nil) + mock_db.On("SetSettings", mock.Anything).Return() err := ReconnectDb() assert.Equal(t, nil, err, "auto connect ok") diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 922e7c0124258609b1f3c4841dc920704d3adb07..78c0df9f768204bc191ee6de57dd0de5b0e0aee1 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -32,8 +32,6 @@ class DataBroker { */ virtual Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) = 0; - - //! Get unacknowledged tuple for specific group id and substream. /*! \param group_id - group id to use. @@ -112,7 +110,6 @@ class DataBroker { virtual DataSet GetLastDataset(std::string group_id, Error* err) = 0; virtual DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) = 0; - //! Receive dataset by id. /*! \param id - dataset id @@ -143,8 +140,6 @@ class DataBroker { virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) = 0; virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) = 0; - - //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. @@ -164,6 +159,15 @@ class DataBroker { virtual FileInfos QueryImages(std::string query, Error* err) = 0; virtual FileInfos QueryImages(std::string query, std::string substream, Error* err) = 0; + //! Configure resending nonacknowledged data + /*! + \param resend - where to resend + \param resend_after - how many seconds to wait for acknowledgment + \param resend_attempts - how many time to resend. + */ + virtual void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) = 0; + + virtual ~DataBroker() = default; // needed for unique_ptr to delete itself }; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 630954b36e466a0b0f6b77c85578cd48bf01f127..95d5b1a5e80f58635bf9361b772091a2f5c0652f 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -17,17 +17,17 @@ namespace asapo { const std::string ServerDataBroker::kBrokerServiceName = "asapo-broker"; const std::string ServerDataBroker::kFileTransferServiceName = "asapo-file-transfer"; -Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorData* data) { +Error GetNoDataResponseFromJson(const std::string &json_string, ConsumerErrorData* data) { JsonStringParser parser(json_string); Error err; if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max)) - || (err = parser.GetString("next_substream", &data->next_substream))) { + || (err = parser.GetString("next_substream", &data->next_substream))) { return err; } return nullptr; } -Error ConsumerErrorFromNoDataResponse(const std::string& response) { +Error ConsumerErrorFromNoDataResponse(const std::string &response) { if (response.find("get_record_by_id") != std::string::npos) { ConsumerErrorData data; auto parse_error = GetNoDataResponseFromJson(response, &data); @@ -42,40 +42,34 @@ Error ConsumerErrorFromNoDataResponse(const std::string& response) { err = ConsumerErrorTemplates::kNoData.Generate(); } ConsumerErrorData* error_data = new ConsumerErrorData{data}; - err->SetCustomData(std::unique_ptr<CustomErrorData> {error_data}); + err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data}); return err; } return ConsumerErrorTemplates::kNoData.Generate(); } -Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& code) { +Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode &code) { switch (code) { - case HttpCode::OK: - return nullptr; - case HttpCode::BadRequest: - return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); - case HttpCode::Unauthorized: - return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); - case HttpCode::InternalServerError: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); - case HttpCode::NotFound: - return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); - case HttpCode::Conflict: - return ConsumerErrorFromNoDataResponse(response->to_string()); - default: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); - } -} -Error ConsumerErrorFromServerError(const Error& server_err) { + case HttpCode::OK:return nullptr; + case HttpCode::BadRequest:return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); + case HttpCode::Unauthorized:return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); + case HttpCode::InternalServerError:return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); + case HttpCode::NotFound:return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); + case HttpCode::Conflict:return ConsumerErrorFromNoDataResponse(response->to_string()); + default:return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); + } +} +Error ConsumerErrorFromServerError(const Error &server_err) { if (server_err == HttpErrorTemplates::kTransferError) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate( - "error processing request: " + server_err->Explain()); + "error processing request: " + server_err->Explain()); } else { - return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + server_err->Explain()); + return ConsumerErrorTemplates::kUnavailableService.Generate( + "error processing request: " + server_err->Explain()); } } -Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code) { +Error ProcessRequestResponce(const Error &server_err, const RequestOutput* response, const HttpCode &code) { if (server_err != nullptr) { return ConsumerErrorFromServerError(server_err); } @@ -88,8 +82,8 @@ ServerDataBroker::ServerDataBroker(std::string server_uri, SourceCredentials source) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, net_client__{new TcpClient()}, - endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, -source_credentials_(std::move(source)) { + endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, + source_credentials_(std::move(source)) { if (source_credentials_.stream.empty()) { source_credentials_.stream = SourceCredentials::kDefaultStream; @@ -105,35 +99,34 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + source_credentials_.user_token; } -Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessPostRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code) { Error err; switch (request.output_mode) { - case OutputDataMode::string: - response->string_output = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, - request.cookie, - request.body, - code, - &err); - break; - case OutputDataMode::array: - err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, - request.body, &response->data_output, response->data_output_size, code); - break; + case OutputDataMode::string: + response->string_output = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, + request.cookie, + request.body, + code, + &err); + break; + case OutputDataMode::array: + err = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, + request.body, &response->data_output, response->data_output_size, code); + break; } return err; } - -Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessGetRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code) { Error err; response->string_output = httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); return err; } - -Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri) { +Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInfo &request, std::string* service_uri) { Error err; HttpCode code; if (request.post) { @@ -147,7 +140,7 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf return ProcessRequestResponce(err, response, code); } -Error ServerDataBroker::DiscoverService(const std::string& service_name, std::string* uri_to_set) { +Error ServerDataBroker::DiscoverService(const std::string &service_name, std::string* uri_to_set) { if (!uri_to_set->empty()) { return nullptr; } @@ -161,13 +154,13 @@ Error ServerDataBroker::DiscoverService(const std::string& service_name, std::st if (err != nullptr || uri_to_set->empty()) { uri_to_set->clear(); return ConsumerErrorTemplates::kUnavailableService.Generate(" on " + endpoint_ - + (err != nullptr ? ": " + err->Explain() - : "")); + + (err != nullptr ? ": " + err->Explain() + : "")); } return nullptr; } -bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri) { +bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string &response, std::string* redirect_uri) { if (*err == ConsumerErrorTemplates::kNoData) { auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData()); if (error_data == nullptr) { @@ -195,8 +188,8 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g bool dataset) { std::string request_suffix = OpToUriCmd(op); std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream - + "/" + std::move(substream) + - +"/" + std::move(group_id) + "/"; + + "/" + std::move(substream) + + +"/" + std::move(group_id) + "/"; uint64_t elapsed_ms = 0; Error no_data_error; while (true) { @@ -204,6 +197,10 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); + if (request_suffix == "next" && resend_) { + ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&resend_after=" + + std::to_string(resend_after_) + "&resend_attempts=" + std::to_string(resend_attempts_); + } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); *response = std::move(output.string_output); @@ -262,12 +259,9 @@ Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::strin std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) { switch (op) { - case GetImageServerOperation::GetNext: - return "next"; - case GetImageServerOperation::GetLast: - return "last"; - default: - return "last"; + case GetImageServerOperation::GetNext:return "next"; + case GetImageServerOperation::GetLast:return "last"; + default:return "last"; } } @@ -305,7 +299,6 @@ Error ServerDataBroker::GetDataFromFile(FileInfo* info, FileData* data) { return nullptr; } - Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { if (data == nullptr || info == nullptr) { return ConsumerErrorTemplates::kWrongInput.Generate("pointers are empty"); @@ -350,7 +343,7 @@ std::string ServerDataBroker::GenerateNewGroupId(Error* err) { return BrokerRequestWithTimeout(ri, err); } -Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_name, +Error ServerDataBroker::ServiceRequestWithTimeout(const std::string &service_name, std::string* service_uri, RequestInfo request, RequestOutput* response) { @@ -374,7 +367,7 @@ Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_nam Error ServerDataBroker::FtsSizeRequestWithTimeout(FileInfo* info) { RequestInfo ri = CreateFileTransferRequest(info); - ri.extra_params="&sizeonly=true"; + ri.extra_params = "&sizeonly=true"; ri.output_mode = OutputDataMode::string; RequestOutput response; auto err = ServiceRequestWithTimeout(kFileTransferServiceName, ¤t_fts_uri_, ri, &response); @@ -387,7 +380,6 @@ Error ServerDataBroker::FtsSizeRequestWithTimeout(FileInfo* info) { return err; } - Error ServerDataBroker::FtsRequestWithTimeout(FileInfo* info, FileData* data) { RequestInfo ri = CreateFileTransferRequest(info); RequestOutput response; @@ -434,7 +426,7 @@ Error ServerDataBroker::ResetLastReadMarker(std::string group_id, std::string su Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" - + std::move(substream) + "/" + std::move(group_id) + "/resetcounter"; + + std::move(substream) + "/" + std::move(group_id) + "/resetcounter"; ri.extra_params = "&value=" + std::to_string(value); ri.post = true; @@ -446,7 +438,7 @@ Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - +"/" + std::move(substream) + "/size"; + +"/" + std::move(substream) + "/size"; auto responce = BrokerRequestWithTimeout(ri, err); if (*err) { return 0; @@ -484,9 +476,9 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon bool dataset) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - +"/" + std::move(substream) + - "/" + std::move( - group_id) + "/" + std::to_string(id); + +"/" + std::move(substream) + + "/" + std::move( + group_id) + "/" + std::to_string(id); if (dataset) { ri.extra_params += "&dataset=true"; } @@ -517,7 +509,7 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error* FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream, Error* err) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - "/" + std::move(substream) + "/0/queryimages"; + "/" + std::move(substream) + "/0/queryimages"; ri.post = true; ri.body = std::move(query); @@ -580,7 +572,7 @@ std::vector<std::string> ParseSubstreamsFromResponse(std::string response, Error std::vector<std::string> substreams; *err = parser.GetArrayString("substreams", &substreams); if (*err) { - return std::vector<std::string> {}; + return std::vector<std::string>{}; } return substreams; } @@ -593,7 +585,7 @@ std::vector<std::string> ServerDataBroker::GetSubstreamList(Error* err) { auto response = BrokerRequestWithTimeout(ri, err); if (*err) { - return std::vector<std::string> {}; + return std::vector<std::string>{}; } return ParseSubstreamsFromResponse(std::move(response), err); @@ -620,14 +612,15 @@ RequestInfo ServerDataBroker::CreateFolderTokenRequest() const { ri.host = endpoint_; ri.api = "/asapo-authorizer/folder"; ri.post = true; - ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" - + - source_credentials_.user_token + "\"}"; + ri.body = + "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + + source_credentials_.user_token + "\"}"; return ri; } Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData* data, - bool retry_with_new_token) { + bool retry_with_new_token) { auto err = UpdateFolderTokenIfNeeded(retry_with_new_token); if (err) { return err; @@ -646,7 +639,7 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData* err = FtsRequestWithTimeout(info, data); if (err == ConsumerErrorTemplates::kWrongInput - && !retry_with_new_token) { // token expired? Refresh token and try again. + && !retry_with_new_token) { // token expired? Refresh token and try again. return GetDataFromFileTransferService(info, data, true); } return err; @@ -665,12 +658,16 @@ Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::stri return err; } -IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, Error* error) { +IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, + std::string substream, + uint64_t from_id, + uint64_t to_id, + Error* error) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + +"/" + std::move(substream) + "/" + std::move(group_id) + "/nacks"; - ri.extra_params = "&from=" + std::to_string(from_id)+"&to=" + std::to_string(to_id); + ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id); auto json_string = BrokerRequestWithTimeout(ri, error); if (*error) { @@ -686,7 +683,10 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, std::st return list; } -IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) { +IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, + uint64_t from_id, + uint64_t to_id, + Error* error) { return GetUnacknowledgedTupleIds(std::move(group_id), kDefaultSubstream, from_id, to_id, error); } @@ -708,7 +708,7 @@ uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std: } if (id == 0) { - *error=ConsumerErrorTemplates::kNoData.Generate(); + *error = ConsumerErrorTemplates::kNoData.Generate(); } return id; } @@ -717,4 +717,10 @@ uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, Erro return GetLastAcknowledgedTulpeId(std::move(group_id), kDefaultSubstream, error); } +void ServerDataBroker::SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) { + resend_ = resend; + resend_after_ = resend_after; + resend_attempts_ = resend_attempts; +} + } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index c26a1897c387b99cf24e2af49d5c244f29ea6d6d..55f032db33b5eff8b7fa5a397f499e741e05a1fd 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -8,150 +8,150 @@ namespace asapo { - enum class GetImageServerOperation { - GetNext, - GetLast, - GetID + GetNext, + GetLast, + GetID }; enum class OutputDataMode { - string, - array, - file + string, + array, + file }; - struct RequestInfo { - std::string host; - std::string api; - std::string extra_params; - std::string body; - std::string cookie; - OutputDataMode output_mode = OutputDataMode::string; - bool post = false; + std::string host; + std::string api; + std::string extra_params; + std::string body; + std::string cookie; + OutputDataMode output_mode = OutputDataMode::string; + bool post = false; }; struct RequestOutput { - std::string string_output; - FileData data_output; - uint64_t data_output_size; - const char* to_string() const { - if (!data_output) { - return string_output.c_str(); - } else { - return reinterpret_cast<char const*>(data_output.get()) ; - } - } + std::string string_output; + FileData data_output; + uint64_t data_output_size; + const char* to_string() const { + if (!data_output) { + return string_output.c_str(); + } else { + return reinterpret_cast<char const*>(data_output.get()); + } + } }; -Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code); -Error ConsumerErrorFromNoDataResponse(const std::string& response); - +Error ProcessRequestResponce(const Error &server_err, const RequestOutput* response, const HttpCode &code); +Error ConsumerErrorFromNoDataResponse(const std::string &response); class ServerDataBroker final : public asapo::DataBroker { - public: - explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, - SourceCredentials source); - - Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) override; - - IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, Error* error) override; - IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; - - uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) override; - uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) override; - - - Error ResetLastReadMarker(std::string group_id) override; - Error ResetLastReadMarker(std::string group_id, std::string substream) override; - - Error SetLastReadMarker(uint64_t value, std::string group_id) override; - Error SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) override; - - Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; - Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; - - Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; - Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; - - std::string GenerateNewGroupId(Error* err) override; - std::string GetBeamtimeMeta(Error* err) override; - - uint64_t GetCurrentSize(Error* err) override; - uint64_t GetCurrentSize(std::string substream, Error* err) override; - - - Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; - Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) override; - - void SetTimeout(uint64_t timeout_ms) override; - FileInfos QueryImages(std::string query, Error* err) override; - FileInfos QueryImages(std::string query, std::string substream, Error* err) override; - - DataSet GetNextDataset(std::string group_id, Error* err) override; - DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) override; - - DataSet GetLastDataset(std::string group_id, Error* err) override; - DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) override; - - DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; - DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) override; - - Error RetrieveData(FileInfo* info, FileData* data) override; - - std::vector<std::string> GetSubstreamList(Error* err) override; - - std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch - std::unique_ptr<HttpClient> httpclient__; - std::unique_ptr<NetClient> net_client__; - private: - Error GetDataFromFileTransferService(FileInfo* info, FileData* data, bool retry_with_new_token); - Error GetDataFromFile(FileInfo* info, FileData* data); - static const std::string kBrokerServiceName; - static const std::string kFileTransferServiceName; - std::string RequestWithToken(std::string uri); - Error GetRecordFromServer(std::string* info, std::string group_id, std::string substream, GetImageServerOperation op, - bool dataset = false); - Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string substream, - bool dataset = false); - Error GetDataIfNeeded(FileInfo* info, FileData* data); - Error DiscoverService(const std::string& service_name, std::string* uri_to_set); - bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri); - Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri); - Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, - FileInfo* info, FileData* data); - DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, - Error* err); - bool DataCanBeInBuffer(const FileInfo* info); - Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); - Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, - RequestOutput* response); - std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); - Error FtsRequestWithTimeout(FileInfo* info, FileData* data); - Error FtsSizeRequestWithTimeout(FileInfo* info); - Error ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); - Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); - - DataSet DecodeDatasetFromResponse(std::string response, Error* err); - RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); - std::string OpToUriCmd(GetImageServerOperation op); - Error UpdateFolderTokenIfNeeded(bool ignore_existing); - std::string endpoint_; - std::string current_broker_uri_; - std::string current_fts_uri_; - std::string source_path_; - bool has_filesystem_; - SourceCredentials source_credentials_; - uint64_t timeout_ms_ = 0; - std::string folder_token_; - RequestInfo CreateFolderTokenRequest() const; - RequestInfo CreateFileTransferRequest(const FileInfo* info) const; + public: + explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, + SourceCredentials source); + + Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) override; + + IdList GetUnacknowledgedTupleIds(std::string group_id, + std::string substream, + uint64_t from_id, + uint64_t to_id, + Error* error) override; + IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; + + uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) override; + uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) override; + + Error ResetLastReadMarker(std::string group_id) override; + Error ResetLastReadMarker(std::string group_id, std::string substream) override; + + Error SetLastReadMarker(uint64_t value, std::string group_id) override; + Error SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) override; + + Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; + Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; + + Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; + Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; + + std::string GenerateNewGroupId(Error* err) override; + std::string GetBeamtimeMeta(Error* err) override; + + uint64_t GetCurrentSize(Error* err) override; + uint64_t GetCurrentSize(std::string substream, Error* err) override; + + Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) override; + + void SetTimeout(uint64_t timeout_ms) override; + FileInfos QueryImages(std::string query, Error* err) override; + FileInfos QueryImages(std::string query, std::string substream, Error* err) override; + + DataSet GetNextDataset(std::string group_id, Error* err) override; + DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) override; + + DataSet GetLastDataset(std::string group_id, Error* err) override; + DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) override; + + DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; + DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) override; + + Error RetrieveData(FileInfo* info, FileData* data) override; + + std::vector<std::string> GetSubstreamList(Error* err) override; + void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) override; + + std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch + std::unique_ptr<HttpClient> httpclient__; + std::unique_ptr<NetClient> net_client__; + private: + Error GetDataFromFileTransferService(FileInfo* info, FileData* data, bool retry_with_new_token); + Error GetDataFromFile(FileInfo* info, FileData* data); + static const std::string kBrokerServiceName; + static const std::string kFileTransferServiceName; + std::string RequestWithToken(std::string uri); + Error GetRecordFromServer(std::string* info, std::string group_id, std::string substream, GetImageServerOperation op, + bool dataset = false); + Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string substream, + bool dataset = false); + Error GetDataIfNeeded(FileInfo* info, FileData* data); + Error DiscoverService(const std::string &service_name, std::string* uri_to_set); + bool SwitchToGetByIdIfNoData(Error* err, const std::string &response, std::string* redirect_uri); + Error ProcessRequest(RequestOutput* response, const RequestInfo &request, std::string* service_uri); + Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, + FileInfo* info, FileData* data); + DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, + Error* err); + bool DataCanBeInBuffer(const FileInfo* info); + Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); + Error ServiceRequestWithTimeout(const std::string &service_name, std::string* service_uri, RequestInfo request, + RequestOutput* response); + std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); + Error FtsRequestWithTimeout(FileInfo* info, FileData* data); + Error FtsSizeRequestWithTimeout(FileInfo* info); + Error ProcessPostRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code); + Error ProcessGetRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code); + + DataSet DecodeDatasetFromResponse(std::string response, Error* err); + RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); + std::string OpToUriCmd(GetImageServerOperation op); + Error UpdateFolderTokenIfNeeded(bool ignore_existing); + std::string endpoint_; + std::string current_broker_uri_; + std::string current_fts_uri_; + std::string source_path_; + bool has_filesystem_; + SourceCredentials source_credentials_; + uint64_t timeout_ms_ = 0; + std::string folder_token_; + RequestInfo CreateFolderTokenRequest() const; + RequestInfo CreateFileTransferRequest(const FileInfo* info) const; + uint64_t resend_timout_ = 0; + bool resend_ = false; + uint64_t resend_after_; + uint64_t resend_attempts_; }; - - - } - #endif //ASAPO_SERVER_DATA_BROKER_H diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 72ea91c999b5816c68e41f44e6fcc03d9d7f3182..0b442d023674217d113adf45d4a277aac9111aae 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1247,6 +1247,20 @@ TEST_F(ServerDataBrokerTests, GetByIdErrorsForId0) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } +TEST_F(ServerDataBrokerTests, ResendNacks) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + + expected_group_id + "/next?token=" + + expected_token+"&resend_nacks=true&resend_after=10&resend_attempts=3", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetResendNacs(true,10,3); + data_broker->GetNext(&info, expected_group_id, nullptr); +} } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 092b753df9665c36157f798e8da163b8130bacdb..373ed8de567cbbf3296f7c0eabcdee97bfece366 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -66,6 +66,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id,string group_id,string substream, Error* err) Error RetrieveData(FileInfo* info, FileData* data) vector[string] GetSubstreamList(Error* err) + void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) cdef extern from "asapo_consumer.h" namespace "asapo" nogil: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 5cf59f4b66c14365c2908258869658f4d910a412..75c7be9a21ed9b56fa96ea517589388af873746e 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -210,6 +210,9 @@ cdef class PyDataBroker: err = self.c_broker.Acknowledge(b_group_id,id,b_substream) if err: throw_exception(err) + def set_resend_nacs(self,bool resend, uint64_t resend_after, uint64_t resend_attempts): + with nogil: + self.c_broker.SetResendNacs(resend,resend_after,resend_attempts) def get_last_acknowledged_tuple_id(self, group_id, substream = "default"): cdef string b_group_id = _bytes(group_id)