diff --git a/CHANGELOG.md b/CHANGELOG.md index 599cc91dddf87c4040683cca14c6dd088a28286e..bda0b8455636386cb4cd8411dd980d74af731cdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ##20.09.0 FEATURES -* implemented data resend - if data is not acknowledged during a given period it will be redelivered +* implemented data resend - data will be redelivered if it is not acknowledged during a given period or a consumer sends a negative acknowledgment BUG FIXES * fix data query images when beamtime_id starts with number diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index c925dd7a3d94652c3bd5bee246dea7de4f66c57a..000ced97dce81ef41d45ee57f1665730646c98e0 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -11,6 +11,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "math" "sort" "strconv" "strings" @@ -31,10 +32,19 @@ type ServiceRecord struct { type InProcessingRecord struct { ID int `bson:"_id" json:"_id"` + MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` - Updated int64 + DelaySec int64 `bson:"delaySec" json:"delaySec"` } +type NegAckParamsRecord struct { + ID int `bson:"_id" json:"_id"` + MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"` + ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` + DelaySec int64 `bson:"delaySec" json:"delaySec"` +} + + type Nacks struct { Unacknowledged []int `json:"unacknowledged"` } @@ -265,20 +275,36 @@ func (db *Mongodb) getRecordByID(dbname string, collection_name string, group_id } +func (db *Mongodb) negAckRecord(dbname string, group_id string, input_str string) ([]byte, error) { + input := struct { + Id int + Params struct { + DelaySec int + } + }{} + + err := json.Unmarshal([]byte(input_str), &input) + if err != nil { + return nil, &DBError{utils.StatusWrongInput, err.Error()} + } + + err = db.InsertRecordToInprocess(dbname,inprocess_collection_name_prefix+group_id,input.Id,input.Params.DelaySec, 1) + return []byte(""), err +} + + func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id string, id_str string) ([]byte, error) { - id, err := strconv.Atoi(id_str) + var record ID + err := json.Unmarshal([]byte(id_str),&record) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - record := struct { - Id int `bson:"_id"` - }{id} c := db.client.Database(dbname).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) _, err = c.InsertOne(context.Background(), &record) if err == nil { c = db.client.Database(dbname).Collection(inprocess_collection_name_prefix + group_id) - _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": id}) + _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } @@ -326,12 +352,18 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro return curPointer, max_ind, nil } -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resendAfter int,nResendAttempts int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delaySec int,nResendAttempts int) (int, error) { var res InProcessingRecord opts :=
options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) tNow := time.Now().Unix() - update := bson.M{"$set": bson.M{"updated": tNow}, "$inc": bson.M{"resendAttempts": 1}} - q := bson.M{"updated": bson.M{"$lte": tNow - int64(resendAfter)},"resendAttempts": bson.M{"$lt": nResendAttempts}} + var update bson.M + if nResendAttempts==0 { + update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}} + } else { + update = bson.M{"$set": bson.M{"delaySec": tNow + int64(delaySec) ,"maxResendAttempts":nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}} + } + + q := bson.M{"delaySec": bson.M{"$lte": tNow},"$expr": bson.M{"$lt": []string{"$resendAttempts","$maxResendAttempts"}}} c := db.client.Database(dbname).Collection(collection_name) err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { @@ -346,13 +378,9 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resen return res.ID, nil } -func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error { - if len(extra_param) == 0 { - return nil - } - +func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string,id int,delaySec int, nResendAttempts int) error { record := InProcessingRecord{ - id, 0, time.Now().Unix(), + id, nResendAttempts, 0,time.Now().Unix()+int64(delaySec), } c := db.client.Database(db_name).Collection(collection_name) @@ -363,20 +391,33 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str return err } -func (db *Mongodb) getNextIndexesFromInprocessed(db_name string, collection_name string, group_id string, dataset bool, extra_param string, ignoreTimeout bool) (int, int, error) { +func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error { if len(extra_param) == 0 { - return 0, 0, nil + return nil } - - record_ind := 0 - max_ind := 0 - resendAfter, nResendAttempts, err := extractsTwoIntsFromString(extra_param) + delaySec, nResendAttempts, err := extractsTwoIntsFromString(extra_param) if err != nil { - return 0, 0, err + return err + } + + return db.InsertRecordToInprocess(db_name,collection_name,id,delaySec, nResendAttempts) + +} + +func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(db_name string, collection_name string, group_id string, dataset bool, extra_param string, ignoreTimeout bool) (int, int, error) { + var record_ind, max_ind, delaySec, nResendAttempts int + var err error + if len(extra_param) != 0 { + delaySec, nResendAttempts, err = extractsTwoIntsFromString(extra_param) + if err != nil { + return 0, 0, err + } + } else { + nResendAttempts = -1 } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, resendAfter,nResendAttempts) + record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, delaySec,nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) @@ -396,7 +437,7 @@ func (db *Mongodb) getNextIndexesFromInprocessed(db_name string, collection_name } -func (db *Mongodb) getNextIndexesFromCurPointer(db_name string, collection_name string, group_id string, dataset bool, extra_param 
string) (int, int, error) { +func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset) if err != nil { log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() @@ -408,17 +449,17 @@ func (db *Mongodb) getNextIndexesFromCurPointer(db_name string, collection_name return curPointer.Value, max_ind, nil } -func (db *Mongodb) getNextIndexes(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { - nextInd, maxInd, err := db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, false) +func (db *Mongodb) getNextAndMaxIndexes(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { + nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, false) if err != nil { return 0, 0, err } if nextInd == 0 { - nextInd, maxInd, err = db.getNextIndexesFromCurPointer(db_name, collection_name, group_id, dataset, extra_param) + nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(db_name, collection_name, group_id, dataset, extra_param) if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData { var err_inproc error - nextInd, maxInd, err_inproc = db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) if err_inproc != nil { return 0, 0, err_inproc } @@ -449,7 +490,7 @@ func (db *Mongodb) processLastRecord(data []byte, err error, db_name string, col var err_inproc error - nextInd, maxInd, err_inproc := db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) if err_inproc != nil { return nil, err_inproc } @@ -461,7 +502,7 @@ func (db *Mongodb) processLastRecord(data []byte, err error, db_name string, col } func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool, extra_param string) ([]byte, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexes(db_name, collection_name, group_id, dataset, extra_param) + nextInd, maxInd, err := db.getNextAndMaxIndexes(db_name, collection_name, group_id, dataset, extra_param) if err != nil { return nil, err } @@ -512,8 +553,17 @@ func (db *Mongodb) resetCounter(db_name string, collection_name string, group_id } err = db.setCounter(db_name, collection_name, group_id, id) + if err!= nil { + return []byte(""), err + } - return []byte(""), err + c := db.client.Database(db_name).Collection(inprocess_collection_name_prefix + group_id) + _, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}}) + if err_del != nil { + return nil, &DBError{utils.StatusWrongInput, err_del.Error()} + } + + return []byte(""), nil } func (db *Mongodb) getMeta(dbname string, id_str string) ([]byte, error) { @@ -749,6 +799,8 @@ func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_ return db.getSubstreams(db_name) case "ackimage": return db.ackRecord(db_name, collection_name, group_id, extra_param) + case
"negackimage": + return db.negAckRecord(db_name, group_id, extra_param) case "nacks": return db.nacks(db_name, collection_name, group_id, extra_param) case "lastack": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 987aa3daa39595f2cee3e4d71bcf6e78c3030bb9..13f5e8090dddda68eb4680600c70b8d80137dabe 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -684,8 +684,8 @@ func TestMongoDBAckImage(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - - res, err := db.ProcessRequest(dbname, collection, groupId, "ackimage", "1") + query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" + res, err := db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str) nacks,_ := db.getNacks(dbname,collection,groupId,1,1) assert.Nil(t, err) assert.Equal(t, "", string(res)) @@ -721,9 +721,9 @@ func TestMongoDBNacks(t *testing.T) { insertRecords(10) } if (test.ackRecords) { - db.ackRecord(dbname, collection, groupId,"2") - db.ackRecord(dbname, collection, groupId,"3") - db.ackRecord(dbname, collection, groupId,"4") + db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}") } res, err := db.ProcessRequest(dbname, collection, groupId, "nacks", test.rangeString) if test.ok { @@ -755,9 +755,9 @@ func TestMongoDBLastAcks(t *testing.T) { insertRecords(10) } if (test.ackRecords) { - db.ackRecord(dbname, collection, groupId,"2") - db.ackRecord(dbname, collection, groupId,"3") - db.ackRecord(dbname, collection, groupId,"4") + db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}") } res, err := db.ProcessRequest(dbname, collection, groupId, "lastack", "") assert.Nil(t, err, test.test) @@ -869,20 +869,71 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) { assert.Equal(t, string(rec1_expect), string(res2)) } - - func TestMongoDBAckDeletesInprocessed(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - db.ProcessRequest(dbname, collection, groupId, "ackimage", "1") + query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" + db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str) _, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - assert.NotNil(t, err) if err!=nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.Error()) } +} + + +func TestMongoDBNegAck(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + inputParams := struct { + Id int + Params struct { + DelaySec int + } + }{} + inputParams.Id = 1 + inputParams.Params.DelaySec=0 + + + db.insertRecord(dbname, collection, &rec1) + db.ProcessRequest(dbname, collection, groupId, "next", "") + bparam,_:= json.Marshal(&inputParams) + db.ProcessRequest(dbname, collection, groupId, "negackimage", string(bparam)) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") // first time 
image from negack + _, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "") // second time nothing + + assert.Nil(t, err) + assert.Equal(t, string(rec1_expect), string(res)) + assert.NotNil(t, err1) + if err1!=nil { + assert.Equal(t, utils.StatusNoData, err1.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err1.Error()) + } +} + +func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + db.ProcessRequest(dbname, collection, groupId, "resetcounter", "0") + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + res3, err3 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.Nil(t, err3) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec1_expect), string(res1)) + assert.Equal(t, string(rec1_expect), string(res2)) + assert.Equal(t, string(rec1_expect), string(res3)) } \ No newline at end of file diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 2411265668315b21280d1a7f8401b75ea991ef37..c0a4eba12b086ee9be0ecc32efc2795ae6afef28 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -46,7 +46,7 @@ var testsGetCommand = []struct { {"nacks", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/nacks","","0_0"}, {"next", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/next","",""}, {"next", expectedSubstream, expectedGroupID, expectedSubstream + "/" + - expectedGroupID + "/next","&resend_nacks=true&resend_after=10&resend_attempts=3","10_3"}, + expectedGroupID + "/next","&resend_nacks=true&delay_sec=10&resend_attempts=3","10_3"}, {"size", expectedSubstream, "", expectedSubstream + "/size","","0"}, {"substreams", "0", "", "0/substreams","",""}, {"lastack", expectedSubstream, expectedGroupID, expectedSubstream + "/" + expectedGroupID + "/lastack","",""}, diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 41dd9f1dbfacdd249060cc85ab2462e9812d3c4e..8297dc97322cb60595b9fbb82bd3a5c046d1d986 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -9,11 +9,11 @@ import ( func extractResend(r *http.Request) (string) { keys := r.URL.Query() resend := keys.Get("resend_nacks") - resend_after := keys.Get("resend_after") + delay_sec := keys.Get("delay_sec") resend_attempts := keys.Get("resend_attempts") resend_params := "" if len(resend)!=0 { - resend_params=resend_after+"_"+resend_attempts + resend_params=delay_sec+"_"+resend_attempts } return resend_params } diff --git a/broker/src/asapo_broker/server/post_op_image.go b/broker/src/asapo_broker/server/post_op_image.go index 5c1bafd82533fc65ed86ff8d3aacb7576522b45d..0f3b22198f867ead06f380b220b713b1bbd8c048 100644 --- a/broker/src/asapo_broker/server/post_op_image.go +++ b/broker/src/asapo_broker/server/post_op_image.go @@ -2,13 +2,15 @@ package server import ( "encoding/json" - "fmt" 
"io/ioutil" "net/http" + "strconv" ) type ImageOp struct { + Id int Op string + Params map[string]interface{} `json:",omitempty"` } func routeImageOp(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) @@ -17,20 +19,25 @@ func routeImageOp(w http.ResponseWriter, r *http.Request) { return } - id, ok := extractRequestParametersID(r) + id_str, ok := extractRequestParametersID(r) if !ok { w.WriteHeader(http.StatusBadRequest) return } + id, err := strconv.Atoi(id_str) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + var op ImageOp err = json.Unmarshal(body, &op) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - - fmt.Println(op) - - processRequest(w, r, "ackimage", id, true) + op.Id = id + bOp,_ := json.Marshal(&op) + processRequest(w, r, op.Op, string(bOp), true) } diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go index 4c2cf86917ff9308d9e8751acb43337cec5912ba..fbba23069496813d34f5aae1ca30cd5387ac3c45 100644 --- a/broker/src/asapo_broker/server/post_op_image_test.go +++ b/broker/src/asapo_broker/server/post_op_image_test.go @@ -32,9 +32,9 @@ func TestImageOpTestSuite(t *testing.T) { suite.Run(t, new(ImageOpTestSuite)) } -func (suite *ImageOpTestSuite) TestImageOpOK() { - query_str := "{\"Op\":\"Acknowledge\"}" - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "ackimage", "1").Return([]byte(""), nil) +func (suite *ImageOpTestSuite) TestAckImageOpOK() { + query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "ackimage", query_str).Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackimage"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) suite.Equal(http.StatusOK, w.Code, "ackimage OK") diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index 1a559d891aa5f7953983c48588e8ea2d5029597c..288202d8dde6562cddb7c7f33117df235f5569ee 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -9,6 +9,8 @@ import ( "net/http" ) +const kDefaultresendInterval = 10 + var db database.Agent type serverSettings struct { @@ -20,7 +22,14 @@ type serverSettings struct { Port int LogLevel string discoveredDbAddress string - CheckResendInterval int + CheckResendInterval *int +} + +func (s *serverSettings) GetResendInterval() int { + if s.CheckResendInterval==nil { + return kDefaultresendInterval + } + return *s.CheckResendInterval } func (s *serverSettings) GetDatabaseServer() string { @@ -73,7 +82,7 @@ func InitDB(dbAgent database.Agent) (err error) { log.Debug("Got mongodb server: " + settings.discoveredDbAddress) } - db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: settings.CheckResendInterval}) + db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: settings.GetResendInterval()}) return db.Connect(settings.GetDatabaseServer()) } diff --git a/broker/src/asapo_broker/server/server_nottested.go b/broker/src/asapo_broker/server/server_nottested.go index 0c512a1606d8df566218d5619ab93022a44046c3..ffa4df68f9fab3e6fc40dbdbd68c66d8a43a4dc7 100644 --- a/broker/src/asapo_broker/server/server_nottested.go +++ b/broker/src/asapo_broker/server/server_nottested.go @@ -53,8 +53,8 @@ func 
ReadConfig(fname string) (log.Level, error) { return log.FatalLevel, errors.New("Server port not set") } - if settings.CheckResendInterval<0 { - return log.FatalLevel, errors.New("Resend interval must be not negative") + if settings.CheckResendInterval==nil || *settings.CheckResendInterval<0 { + return log.FatalLevel, errors.New("Resend interval must be set and non-negative") } if settings.PerformanceDbName == "" { diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 78c0df9f768204bc191ee6de57dd0de5b0e0aee1..eb8eac957124c369462eedadd83925f2443d3583 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -32,6 +32,17 @@ class DataBroker { */ virtual Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) = 0; + //! Negative acknowledge data tuple for specific group id and substream. + /*! + \param group_id - group id to use. + \param id - data tuple id + \param delay_sec - data tuple will be redelivered after delay, 0 to redeliver immediately + \param substream (optional) - substream + \return nullptr if the command was successful, otherwise error. + */ + virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, std::string substream = kDefaultSubstream) = 0; + + //! Get unacknowledged tuple for specific group id and substream. /*! \param group_id - group id to use. @@ -162,10 +173,10 @@ class DataBroker { //! Configure resending nonacknowledged data /*! \param resend - where to resend - \param resend_after - how many seconds to wait for acknowledgment - \param resend_attempts - how many time to resend. + \param delay_sec - how many seconds to wait before resending + \param resend_attempts - how many resend attempts to make */ - virtual void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) = 0; + virtual void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) = 0; virtual ~DataBroker() = default; // needed for unique_ptr to delete itself diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 95d5b1a5e80f58635bf9361b772091a2f5c0652f..b8d9679b758ff54399152560bdbbbb07b53a6401 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -198,8 +198,8 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); if (request_suffix == "next" && resend_) { - ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&resend_after=" + - std::to_string(resend_after_) + "&resend_attempts=" + std::to_string(resend_attempts_); + ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_sec=" + + std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_); } RequestOutput output; err = ProcessRequest(&output, ri, &current_broker_uri_); @@ -651,7 +651,7 @@ Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::stri +"/" + std::move(substream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; - ri.body = "{\"Op\":\"Acknowledge\"}"; + ri.body = "{\"Op\":\"ackimage\"}"; Error err; BrokerRequestWithTimeout(ri, &err); @@ -717,10 +717,26 @@ uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, Erro return GetLastAcknowledgedTulpeId(std::move(group_id),
kDefaultSubstream, error); } -void ServerDataBroker::SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) { +void ServerDataBroker::SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) { resend_ = resend; - resend_after_ = resend_after; + delay_sec_ = delay_sec; resend_attempts_ = resend_attempts; } +Error ServerDataBroker::NegativeAcknowledge(std::string group_id, + uint64_t id, + uint64_t delay_sec, + std::string substream) { + RequestInfo ri; + ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + + +"/" + std::move(substream) + + "/" + std::move(group_id) + "/" + std::to_string(id); + ri.post = true; + ri.body = R"({"Op":"negackimage","Params":{"DelaySec":)"+std::to_string(delay_sec)+"}}"; + + Error err; + BrokerRequestWithTimeout(ri, &err); + return err; +} + } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 55f032db33b5eff8b7fa5a397f499e741e05a1fd..00b640475ff98d6626cb70e89ecf04238b5eb8c6 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -52,6 +52,7 @@ class ServerDataBroker final : public asapo::DataBroker { SourceCredentials source); Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) override; + Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec, std::string substream = kDefaultSubstream) override; IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, @@ -100,7 +101,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error RetrieveData(FileInfo* info, FileData* data) override; std::vector<std::string> GetSubstreamList(Error* err) override; - void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) override; + void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; @@ -149,7 +150,7 @@ class ServerDataBroker final : public asapo::DataBroker { RequestInfo CreateFileTransferRequest(const FileInfo* info) const; uint64_t resend_timout_ = 0; bool resend_ = false; - uint64_t resend_after_; + uint64_t delay_sec_; uint64_t resend_attempts_; }; diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 0b442d023674217d113adf45d4a277aac9111aae..b19620c467582a64b400469adf053b09879f4911 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1161,7 +1161,7 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetTokenAgainIfTransferFailed) { TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) { MockGetBrokerUri(); - auto expected_acknowledge_command = "{\"Op\":\"Acknowledge\"}"; + auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"+expected_substream+"/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" @@ -1178,7 +1178,7 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) { TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) { MockGetBrokerUri(); - auto expected_acknowledge_command = "{\"Op\":\"Acknowledge\"}"; + auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; EXPECT_CALL(mock_http_client, 
Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" @@ -1252,7 +1252,7 @@ TEST_F(ServerDataBrokerTests, ResendNacks) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + expected_group_id + "/next?token=" - + expected_token+"&resend_nacks=true&resend_after=10&resend_attempts=3", _, + + expected_token+"&resend_nacks=true&delay_sec=10&resend_attempts=3", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1263,4 +1263,20 @@ TEST_F(ServerDataBrokerTests, ResendNacks) { } +TEST_F(ServerDataBrokerTests, NegativeAcknowledgeUsesCorrectUri) { + MockGetBrokerUri(); + auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})"; + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"+expected_substream+"/" + + expected_group_id + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token,_,expected_neg_acknowledge_command, _,_)).WillOnce(DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return(""))); + + auto err = data_broker->NegativeAcknowledge(expected_group_id, expected_dataset_id,10, expected_substream); + + ASSERT_THAT(err, Eq(nullptr)); +} + } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 373ed8de567cbbf3296f7c0eabcdee97bfece366..afe1db63f8ed665c99a75e84d6012939fd02961c 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -56,6 +56,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: Error SetLastReadMarker(uint64_t value, string group_id,string substream) Error ResetLastReadMarker(string group_id,string substream) Error Acknowledge(string group_id, uint64_t id, string substream) + Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_sec, string substream) uint64_t GetLastAcknowledgedTulpeId(string group_id, string substream, Error* error) IdList GetUnacknowledgedTupleIds(string group_id, string substream, uint64_t from_id, uint64_t to_id, Error* error) string GenerateNewGroupId(Error* err) @@ -66,7 +67,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id,string group_id,string substream, Error* err) Error RetrieveData(FileInfo* info, FileData* data) vector[string] GetSubstreamList(Error* err) - void SetResendNacs(bool resend, uint64_t resend_after, uint64_t resend_attempts) + void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) cdef extern from "asapo_consumer.h" namespace "asapo" nogil: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 75c7be9a21ed9b56fa96ea517589388af873746e..3b3ac2efb227c99fcac32e7f9e9fe6e302688cd8 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -201,7 +201,6 @@ cdef class PyDataBroker: for substream in substreams: list.append(_str(substream)) return list - def acknowledge(self, group_id, uint64_t id, substream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_substream = _bytes(substream) @@ -210,9 +209,17 @@ cdef class PyDataBroker: err = self.c_broker.Acknowledge(b_group_id,id,b_substream) if err: throw_exception(err) - def set_resend_nacs(self,bool resend, uint64_t resend_after, uint64_t resend_attempts): + def 
neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, substream = "default"): + cdef string b_group_id = _bytes(group_id) + cdef string b_substream = _bytes(substream) + cdef Error err + with nogil: + err = self.c_broker.NegativeAcknowledge(b_group_id,id,delay_sec,b_substream) + if err: + throw_exception(err) + def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts): with nogil: - self.c_broker.SetResendNacs(resend,resend_after,resend_attempts) + self.c_broker.SetResendNacs(resend,delay_sec,resend_attempts) def get_last_acknowledged_tuple_id(self, group_id, substream = "default"): cdef string b_group_id = _bytes(group_id) diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 0d13b0c9af680611df24e9f467039726e3fa8567..0ba8d1b644717f3e8587ad5c152025ddd8057031 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -167,7 +167,18 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str nacks = broker->GetUnacknowledgedTupleIds(group_id,"stream1",0,0,&err); M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); -// resend +// negative acks + broker->ResetLastReadMarker(group_id); + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNextNegAckBeforeResend no error"); + M_AssertTrue(fi.name == "1", "GetNextNegAckBeforeResend filename"); + err = broker->NegativeAcknowledge(group_id,1,0); + M_AssertTrue(err == nullptr, "NegativeAcknowledge no error"); + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNextNegAckWithResend no error"); + M_AssertTrue(fi.name == "1", "GetNextNegAckWithResend filename"); + +// automatic resend broker->ResetLastReadMarker(group_id); broker->SetResendNacs(true,0,1); err = broker->GetNext(&fi, group_id, nullptr); diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 26422e76d13f7a6fd152e8c1834110e52ac49575..e0901a0bb9b9d950bb30565f56d7b33e8d45e59e 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -152,6 +152,14 @@ def check_single(broker,group_id): nacks = broker.get_unacknowledged_tuple_ids(group_id) assert_eq(len(nacks),4,"nacks stream1 size = 4 after ack") +# neg acks + broker.reset_lastread_marker(group_id) + _, meta = broker.get_next(group_id, meta_only=True) + assert_metaname(meta,"1","get next neg ack before resend") + broker.reset_lastread_marker(group_id) + _, meta = broker.get_next(group_id, meta_only=True) + assert_metaname(meta,"1","get next neg ack with resend") + #resend broker.reset_lastread_marker(group_id) broker.set_resend_nacs(True,0,1)
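
Usage sketch (not part of the patch): how the negative-acknowledge flow looks from the consumer side, using only the Python binding names introduced above (get_next, acknowledge, neg_acknowledge, set_resend_nacs). This is a minimal sketch, assuming a `broker` instance and `group_id` obtained via the usual asapo_consumer factory calls (not shown in this diff), that the tuple id is available as meta['_id'], and that process() stands in for user code.

def process(meta):
    pass                                           # hypothetical user processing

_, meta = broker.get_next(group_id, meta_only=True)
try:
    process(meta)
    broker.acknowledge(group_id, meta['_id'])      # done with this tuple
except Exception:
    # hand the tuple back to the broker; redeliver it after a 5 s delay
    broker.neg_acknowledge(group_id, meta['_id'], 5)

# alternatively, let the broker redeliver unacknowledged tuples automatically:
# wait 10 s before each redelivery, with at most 3 redelivery attempts
broker.set_resend_nacs(True, 10, 3)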
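For reference, a sketch of the broker wire format these calls exercise, matching the request strings visible in the broker and consumer tests above. The broker address, beamtime/stream/substream/group ids and token are placeholders; requests/json are used only for illustration.

import json
import requests

broker_uri = "http://localhost:8400"               # assumed broker address
base = broker_uri + "/database/beamtime_id/stream/substream/group_id"

# acknowledge tuple 1 (the id comes from the URL, the operation from the body)
requests.post(base + "/1?token=secret", data=json.dumps({"Op": "ackimage"}))

# negatively acknowledge tuple 1; redeliver it after 10 seconds
requests.post(base + "/1?token=secret",
              data=json.dumps({"Op": "negackimage", "Params": {"DelaySec": 10}}))

# get the next tuple with automatic resend of unacknowledged data enabled
requests.get(base + "/next?token=secret&resend_nacks=true&delay_sec=10&resend_attempts=3")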