diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 24c7dde96cad848bc8cad0c98c0ea4f96cc169b1..4527693cfe7a9949dba17ddfe4ab2d20c7a3782d 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -373,13 +373,17 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) { } c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) _, err = c.InsertOne(context.Background(), &record) - - if err == nil { - c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) - _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) - if err_del != nil { - return nil, &DBError{utils.StatusWrongInput, err.Error()} + if err != nil { + if duplicateError(err) { + return nil, &DBError{utils.StatusWrongInput, "already acknowledged"} } + return nil, err + } + + c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) + _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) + if err_del != nil { + return nil, &DBError{utils.StatusWrongInput, err.Error()} } return []byte(""), err diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index d6e7b0d717eeb69edd8699444d4231c72dcf5b73..e1caa8553eca0693e4a60dbdde99a242983a7bff 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -1102,6 +1102,18 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) { } } +func TestMongoDBAckTwiceErrors(t *testing.T) { + db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + _,err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) +} + + func TestMongoDBNegAck(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) db.Connect(dbaddress) diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index bb4bd0436898721d3655aa9c5dd42f1cea3bf21d..e15030262a6599d559e00e276d660a85f2cf8449 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -179,6 +179,14 @@ def check_single(consumer, group_id): consumer.acknowledge(group_id, 1) + try: + consumer.acknowledge(group_id, 1) + except asapo_consumer.AsapoWrongInputError as err: + print(err) + pass + else: + exit_on_noerr("should be wrong input on second ack") + nacks = consumer.get_unacknowledged_messages(group_id) assert_eq(len(nacks), 4, "nacks default stream size = 4")