Skip to content
Snippets Groups Projects
Commit a18029f1 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix double ack

parent dead52d8
Branches
Tags
No related merge requests found
......@@ -373,13 +373,17 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
}
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
_, err = c.InsertOne(context.Background(), &record)
if err == nil {
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
if err != nil {
if duplicateError(err) {
return nil, &DBError{utils.StatusWrongInput, "already acknowledged"}
}
return nil, err
}
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
return []byte(""), err
......
......@@ -1102,6 +1102,18 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) {
}
}
func TestMongoDBAckTwiceErrors(t *testing.T) {
db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0})
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec1)
query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}"
db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
_,err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str})
assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
}
func TestMongoDBNegAck(t *testing.T) {
db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0})
db.Connect(dbaddress)
......
......@@ -179,6 +179,14 @@ def check_single(consumer, group_id):
consumer.acknowledge(group_id, 1)
try:
consumer.acknowledge(group_id, 1)
except asapo_consumer.AsapoWrongInputError as err:
print(err)
pass
else:
exit_on_noerr("should be wrong input on second ack")
nacks = consumer.get_unacknowledged_messages(group_id)
assert_eq(len(nacks), 4, "nacks default stream size = 4")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment