diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 7cd881b32d7c91bfe278bf2f483a3df21026b73f..c925dd7a3d94652c3bd5bee246dea7de4f66c57a 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -31,7 +31,7 @@ type ServiceRecord struct { type InProcessingRecord struct { ID int `bson:"_id" json:"_id"` - Attempts int + ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"` Updated int64 } @@ -96,7 +96,6 @@ func (db *Mongodb) Connect(address string) (err error) { if db.client != nil { return &DBError{utils.StatusServiceUnavailable, already_connected_msg} } - db.client, err = mongo.NewClient(options.Client().SetConnectTimeout(20 * time.Second).ApplyURI("mongodb://" + address)) if err != nil { return err @@ -180,7 +179,7 @@ func duplicateError(err error) bool { if !ok1 { return false } - return strings.Contains(write_exception_error.Error(),"duplicate key") + return strings.Contains(write_exception_error.Error(), "duplicate key") } return command_error.Name == "DuplicateKey" } @@ -205,6 +204,11 @@ func (db *Mongodb) incrementField(dbname string, collection_name string, group_i // duplicateerror can happen because we set Upsert=true to allow insert pointer when it is absent. But then it will try to insert // pointer in case query found nothing, also when pointer exists and equal to max_ind. Here we have to return NoData if err == mongo.ErrNoDocuments || duplicateError(err) { + // try again without upsert - if the first error was due to missing pointer + opts = options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) + if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res);err2==nil { + return nil + } return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} } return &DBError{utils.StatusTransactionInterrupted, err.Error()} @@ -272,9 +276,9 @@ func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id str c := db.client.Database(dbname).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) _, err = c.InsertOne(context.Background(), &record) - if err==nil { - c = db.client.Database(dbname).Collection(inprocess_collection_name_prefix+group_id) - _,err_del := c.DeleteOne(context.Background(),bson.M{"_id":id}) + if err == nil { + c = db.client.Database(dbname).Collection(inprocess_collection_name_prefix + group_id) + _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": id}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } @@ -322,30 +326,12 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro return curPointer, max_ind, nil } -func processLastRecord(data []byte, collection_name string, err error) ([]byte, error) { - var r ServiceRecord - err = json.Unmarshal(data, &r) - if err != nil || r.Name != finish_substream_keyword { - return data, err - } - var next_substream string - next_substream, ok := r.Meta["next_substream"].(string) - if !ok { - next_substream = no_next_substream_keyword - } - - answer := encodeAnswer(r.ID, r.ID, next_substream) - log_str := "reached end of substream " + collection_name + " , next_substream: " + next_substream - logger.Debug(log_str) - return nil, &DBError{utils.StatusNoData, answer} -} - -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resendAfter int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resendAfter int,nResendAttempts int) (int, error) { var res InProcessingRecord opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) tNow := time.Now().Unix() - update := bson.M{"$set": bson.M{"updated": tNow}, "$inc": bson.M{"attempts": 1}} - q := bson.M{"updated": bson.M{"$lte": tNow - int64(resendAfter)}} + update := bson.M{"$set": bson.M{"updated": tNow}, "$inc": bson.M{"resendAttempts": 1}} + q := bson.M{"updated": bson.M{"$lte": tNow - int64(resendAfter)},"resendAttempts": bson.M{"$lt": nResendAttempts}} c := db.client.Database(dbname).Collection(collection_name) err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res) if err != nil { @@ -360,13 +346,13 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, resen return res.ID, nil } -func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) (error) { - if len(extra_param)==0 { +func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error { + if len(extra_param) == 0 { return nil } record := InProcessingRecord{ - id,0,time.Now().Unix(), + id, 0, time.Now().Unix(), } c := db.client.Database(db_name).Collection(collection_name) @@ -377,64 +363,120 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str return err } -func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool, extra_param string) ([]byte, error) { +func (db *Mongodb) getNextIndexesFromInprocessed(db_name string, collection_name string, group_id string, dataset bool, extra_param string, ignoreTimeout bool) (int, int, error) { + if len(extra_param) == 0 { + return 0, 0, nil + } + record_ind := 0 max_ind := 0 - if len(extra_param) > 0 { - resendAfter, _, err := extractsTwoIntsFromString(extra_param) + resendAfter, nResendAttempts, err := extractsTwoIntsFromString(extra_param) + if err != nil { + return 0, 0, err + } + tNow := time.Now().Unix() + if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { + record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, resendAfter,nResendAttempts) if err != nil { - return nil, err - } - tNow := time.Now().Unix() - if atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod) { - record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, resendAfter) - if err != nil { - log_str := "error getting unprocessed id " + db_name + ", groupid: " + group_id + ":" + err.Error() - logger.Debug(log_str) - return nil, err - } - } - if record_ind != 0 { - max_ind, err = db.getMaxIndex(db_name, collection_name, dataset) - if err != nil { - return nil, err - } + log_str := "error getting unprocessed id " + db_name + ", groupid: " + group_id + ":" + err.Error() + logger.Debug(log_str) + return 0, 0, err } } - - if record_ind == 0 { - var curPointer LocationPointer - var err error - curPointer, max_ind, err = db.getCurrentPointer(db_name, collection_name, group_id, dataset) + if record_ind != 0 { + max_ind, err = db.getMaxIndex(db_name, collection_name, dataset) if err != nil { - log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() - logger.Debug(log_str) - return nil, err + return 0, 0, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id + } else { + atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix()) + } + + return record_ind, max_ind, nil + +} + +func (db *Mongodb) getNextIndexesFromCurPointer(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { + curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset) + if err != nil { + log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) - record_ind = curPointer.Value + return 0, 0, err } + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id + logger.Debug(log_str) + return curPointer.Value, max_ind, nil +} +func (db *Mongodb) getNextIndexes(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { + nextInd, maxInd, err := db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, false) + if err != nil { + return 0, 0, err + } - data, err := db.getRecordByIDRow(db_name, collection_name, record_ind, max_ind, dataset) - if record_ind != max_ind { - if err==nil { - err_update := db.InsertToInprocessIfNeeded(db_name,inprocess_collection_name_prefix+group_id,record_ind,extra_param) - if err_update!=nil { - return nil,err_update + if nextInd == 0 { + nextInd, maxInd, err = db.getNextIndexesFromCurPointer(db_name, collection_name, group_id, dataset, extra_param) + if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData { + var err_inproc error + nextInd, maxInd, err_inproc = db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + if err_inproc != nil { + return 0, 0, err_inproc + } + if nextInd == 0 { + return 0, 0, err } } + } + return nextInd, maxInd, nil +} + +func (db *Mongodb) processLastRecord(data []byte, err error, db_name string, collection_name string, + group_id string, dataset bool, extra_param string) ([]byte, error) { + var r ServiceRecord + err = json.Unmarshal(data, &r) + if err != nil || r.Name != finish_substream_keyword { return data, err } - data,err = processLastRecord(data, collection_name, err) - if err==nil { - err_update := db.InsertToInprocessIfNeeded(db_name,inprocess_collection_name_prefix+group_id,record_ind,extra_param) - if err_update!=nil { - return nil,err_update - } + var next_substream string + next_substream, ok := r.Meta["next_substream"].(string) + if !ok { + next_substream = no_next_substream_keyword } + answer := encodeAnswer(r.ID, r.ID, next_substream) + log_str := "reached end of substream " + collection_name + " , next_substream: " + next_substream + logger.Debug(log_str) + + + var err_inproc error + nextInd, maxInd, err_inproc := db.getNextIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + if err_inproc != nil { + return nil, err_inproc + } + if nextInd != 0 { + return db.getRecordByIDRow(db_name, collection_name, nextInd, maxInd, dataset) + } + + return nil, &DBError{utils.StatusNoData, answer} +} + +func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool, extra_param string) ([]byte, error) { + nextInd, maxInd, err := db.getNextIndexes(db_name, collection_name, group_id, dataset, extra_param) + if err != nil { + return nil, err + } + + data, err := db.getRecordByIDRow(db_name, collection_name, nextInd, maxInd, dataset) + if nextInd == maxInd { + data, err = db.processLastRecord(data, err,db_name,collection_name,group_id,dataset,extra_param) + } + + if err == nil { + err_update := db.InsertToInprocessIfNeeded(db_name, inprocess_collection_name_prefix+group_id, nextInd, extra_param) + if err_update != nil { + return nil, err_update + } + } return data, err } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 54a923b8708cd5ec65d80f6da90f283032189a69..987aa3daa39595f2cee3e4d71bcf6e78c3030bb9 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -5,6 +5,7 @@ package database import ( "asapo_common/utils" "encoding/json" + "fmt" "github.com/stretchr/testify/assert" "sync" "testing" @@ -138,7 +139,7 @@ func TestMongoDBGetNextOK(t *testing.T) { assert.Equal(t, string(rec1_expect), string(res)) } -/* + func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -151,7 +152,7 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next1\"}", err.(*DBError).Message) } -*/ + func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -189,19 +190,27 @@ func insertRecords(n int) { for ind, record := range records { record.ID = ind + 1 record.Name = string(ind) - db.insertRecord(dbname, collection, &record) + if err:= db.insertRecord(dbname, collection, &record);err!=nil { + fmt.Println("error at insert ",ind) + } } - time.Sleep(time.Millisecond*100) } -func getRecords(n int) []int { +func getRecords(n int, resend bool) []int { results := make([]int, n) var wg sync.WaitGroup wg.Add(n) + extra_param:="" + if resend { + extra_param="0_1" + } for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, _:= db.ProcessRequest(dbname, collection, groupId, "next", "") + res_bin, err:= db.ProcessRequest(dbname, collection, groupId, "next", extra_param) + if err!=nil { + fmt.Println("error at read ",i) + } var res TestRecord json.Unmarshal(res_bin, &res) if res.ID>0 { @@ -220,11 +229,27 @@ func TestMongoDBGetNextInParallel(t *testing.T) { n := 100 insertRecords(n) - results := getRecords(n) + results := getRecords(n,false) assert.Equal(t, n, getNOnes(results)) } +func TestMongoDBGetNextInParallelWithResend(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 100}) + db.Connect(dbaddress) + defer cleanup() + n := 100 + insertRecords(n) + + results := getRecords(n,true) + results2 := getRecords(n,true) + + assert.Equal(t, n, getNOnes(results),"first") + assert.Equal(t, n, getNOnes(results2),"second") +} + + + func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -249,7 +274,7 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) { n := 100 insertRecords(n) - results := getRecords(n) + results := getRecords(n,false) assert.Equal(t, n, getNOnes(results)) } @@ -742,3 +767,122 @@ func TestMongoDBLastAcks(t *testing.T) { } +func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec1_expect), string(res1)) +} + +func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + _, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + + assert.Nil(t, err) + assert.Nil(t, err1) + assert.NotNil(t, err2) + if err2!=nil { + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err2.Error()) + } + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec1_expect), string(res1)) +} + +func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + time.Sleep(time.Second) + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec2_expect), string(res1)) + assert.Equal(t, string(rec1_expect), string(res2)) +} + +func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + time.Sleep(time.Second) + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec1_expect), string(res1)) + assert.Equal(t, string(rec2_expect), string(res2)) +} + + +func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 10}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec1_expect), string(res1)) +} + +func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 10}) + db.Connect(dbaddress) + defer cleanup() + err := db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + assert.Nil(t, err) + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.Equal(t, string(rec1_expect), string(res)) + assert.Equal(t, string(rec2_expect), string(res1)) + assert.Equal(t, string(rec1_expect), string(res2)) +} + + + +func TestMongoDBAckDeletesInprocessed(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + db.ProcessRequest(dbname, collection, groupId, "ackimage", "1") + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + + assert.NotNil(t, err) + if err!=nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.Error()) + } +} \ No newline at end of file diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index 649764fe780a09a35d67b065344474ce166b8f91..1a559d891aa5f7953983c48588e8ea2d5029597c 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -20,6 +20,7 @@ type serverSettings struct { Port int LogLevel string discoveredDbAddress string + CheckResendInterval int } func (s *serverSettings) GetDatabaseServer() string { @@ -72,7 +73,7 @@ func InitDB(dbAgent database.Agent) (err error) { log.Debug("Got mongodb server: " + settings.discoveredDbAddress) } - db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: 10}) + db.SetSettings(database.DBSettings{ReadFromInprocessPeriod: settings.CheckResendInterval}) return db.Connect(settings.GetDatabaseServer()) } diff --git a/broker/src/asapo_broker/server/server_nottested.go b/broker/src/asapo_broker/server/server_nottested.go index 5a4d32102e14453feddd1b3c86162b4e464222c2..0c512a1606d8df566218d5619ab93022a44046c3 100644 --- a/broker/src/asapo_broker/server/server_nottested.go +++ b/broker/src/asapo_broker/server/server_nottested.go @@ -53,6 +53,10 @@ func ReadConfig(fname string) (log.Level, error) { return log.FatalLevel, errors.New("Server port not set") } + if settings.CheckResendInterval<0 { + return log.FatalLevel, errors.New("Resend interval must be not negative") + } + if settings.PerformanceDbName == "" { return log.FatalLevel, errors.New("PerformanceDbName not set") } diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-broker.json b/deploy/asapo_helm_chart/asapo/configs/asapo-broker.json index 3753878d2cbce843fa612b36b264e83c5a227172..63452b6f31afeef34bfcdb1f8c6a3f74fc4c77b5 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-broker.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-broker.json @@ -4,6 +4,7 @@ "PerformanceDbServer":"{{ .Chart.Name }}-influxdb:{{ .Values.influxdb.influxdb.service.port }}", "PerformanceDbName": "asapo_brokers", "Port": {{ .Values.ownServices.broker.port }}, + "CheckResendInterval":10, "LogLevel":"debug", "SecretFile":"/etc/broker/auth_secret.key" } diff --git a/deploy/asapo_services/scripts/broker.json.tpl b/deploy/asapo_services/scripts/broker.json.tpl index 26aead08bc8f38911278adc11e9af75e4861bec4..9b0f75f1e72f49db9ae54c8e3e85250466b58ed2 100644 --- a/deploy/asapo_services/scripts/broker.json.tpl +++ b/deploy/asapo_services/scripts/broker.json.tpl @@ -2,6 +2,7 @@ "DatabaseServer":"auto", "DiscoveryServer": "localhost:8400/asapo-discovery", "PerformanceDbServer":"localhost:8400/influxdb", + "CheckResendInterval":10, "PerformanceDbName": "asapo_brokers", "Port":{{ env "NOMAD_PORT_broker" }}, "LogLevel":"info", diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 7fafcb321b9687a0afb267bdd5708accf84ac147..0d13b0c9af680611df24e9f467039726e3fa8567 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -166,6 +166,23 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str nacks = broker->GetUnacknowledgedTupleIds(group_id,"stream1",0,0,&err); M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); + +// resend + broker->ResetLastReadMarker(group_id); + broker->SetResendNacs(true,0,1); + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNextBeforeResend no error"); + M_AssertTrue(fi.name == "1", "GetNextBeforeResend filename"); + + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNextWithResend no error"); + M_AssertTrue(fi.name == "1", "GetNextWithResend filename"); + + broker->SetResendNacs(false,0,1); + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNextAfterResend no error"); + M_AssertTrue(fi.name == "2", "GetNextAfterResend filename"); + } diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 179fd15327546302e11c023ad37357cf4521f304..26422e76d13f7a6fd152e8c1834110e52ac49575 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -152,6 +152,19 @@ def check_single(broker,group_id): nacks = broker.get_unacknowledged_tuple_ids(group_id) assert_eq(len(nacks),4,"nacks stream1 size = 4 after ack") +#resend + broker.reset_lastread_marker(group_id) + broker.set_resend_nacs(True,0,1) + _, meta = broker.get_next(group_id, meta_only=True) + assert_metaname(meta,"1","get next before resend") + + _, meta = broker.get_next(group_id, meta_only=True) + assert_metaname(meta,"1","get next with resend") + + _, meta = broker.get_next(group_id, meta_only=True) + assert_metaname(meta,"2","get next after resend") + + #images images = broker.query_images("meta.test = 10") diff --git a/tests/automatic/settings/broker_settings.json b/tests/automatic/settings/broker_settings.json index 769f171b7924765b97faf72819aa5690e9902cb9..a80cbbccdaa650fbfc2da3570dceb3eb392acbae 100644 --- a/tests/automatic/settings/broker_settings.json +++ b/tests/automatic/settings/broker_settings.json @@ -4,5 +4,6 @@ "PerformanceDbName": "db_test", "Port":5005, "LogLevel":"info", + "CheckResendInterval":0, "SecretFile":"auth_secret.key" } \ No newline at end of file diff --git a/tests/automatic/settings/broker_settings.json.tpl b/tests/automatic/settings/broker_settings.json.tpl index 3edfc9b888491cd19af803f02bdc9dda650bb95c..81860d6aef6faaeb4c81bf0462500a440456ce1e 100644 --- a/tests/automatic/settings/broker_settings.json.tpl +++ b/tests/automatic/settings/broker_settings.json.tpl @@ -2,6 +2,7 @@ "DatabaseServer":"auto", "DiscoveryServer": "localhost:8400/asapo-discovery", "PerformanceDbServer": "localhost:8086", + "CheckResendInterval":0, "PerformanceDbName": "db_test", "Port":{{ env "NOMAD_PORT_broker" }}, "LogLevel":"info", diff --git a/tests/manual/broker_debug_local/broker.json b/tests/manual/broker_debug_local/broker.json index df0effdb32e30400a8c2b0b8168cee944d171749..11c716e064c29638fa1dc000fce31b35aece8f69 100644 --- a/tests/manual/broker_debug_local/broker.json +++ b/tests/manual/broker_debug_local/broker.json @@ -2,6 +2,7 @@ "DatabaseServer":"auto", "DiscoveryServer": "localhost:8400/discovery", "PerformanceDbServer": "localhost:8086", + "CheckResendInterval":10, "PerformanceDbName": "db_test", "Port": 5005, "LogLevel":"info", diff --git a/tests/manual/performance_broker/settings.json b/tests/manual/performance_broker/settings.json index e982871610bb8de89de2f0ef140793e51361f82e..76e84d085f3e8550ddfb0d3e54b15ff18059c116 100644 --- a/tests/manual/performance_broker/settings.json +++ b/tests/manual/performance_broker/settings.json @@ -4,5 +4,6 @@ "PerformanceDbName": "db_test", "Port":5005, "LogLevel":"info", + "CheckResendInterval":10, "SecretFile":"auth_secret.key" } \ No newline at end of file diff --git a/tests/manual/performance_full_chain_simple/broker.json b/tests/manual/performance_full_chain_simple/broker.json index e982871610bb8de89de2f0ef140793e51361f82e..76e84d085f3e8550ddfb0d3e54b15ff18059c116 100644 --- a/tests/manual/performance_full_chain_simple/broker.json +++ b/tests/manual/performance_full_chain_simple/broker.json @@ -4,5 +4,6 @@ "PerformanceDbName": "db_test", "Port":5005, "LogLevel":"info", + "CheckResendInterval":10, "SecretFile":"auth_secret.key" } \ No newline at end of file diff --git a/tests/manual/receiver_debug_local/broker.json.tpl b/tests/manual/receiver_debug_local/broker.json.tpl index 56694d8538be26718c9e32bc69a538ea20abc987..9c840220c40110c613cc41dd55d82b5704311823 100644 --- a/tests/manual/receiver_debug_local/broker.json.tpl +++ b/tests/manual/receiver_debug_local/broker.json.tpl @@ -5,5 +5,6 @@ "PerformanceDbName": "db_test", "Port":{{ env "NOMAD_PORT_broker" }}, "LogLevel":"info", + "CheckResendInterval":10, "SecretFile":"auth_secret.key" } \ No newline at end of file