diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 9d73138625ac91f549fc6a5b6d37a9a6f50db8dc..24c7dde96cad848bc8cad0c98c0ea4f96cc169b1 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -16,7 +16,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" ) @@ -86,7 +85,7 @@ type Mongodb struct { client *mongo.Client timeout time.Duration settings DBSettings - lastReadFromInprocess int64 + lastReadFromInprocess map[string]int64 } func (db *Mongodb) SetSettings(settings DBSettings) { @@ -103,6 +102,9 @@ func (db *Mongodb) Ping() (err error) { } func (db *Mongodb) Connect(address string) (err error) { + dbSessionLock.Lock() + defer dbSessionLock.Unlock() + if db.client != nil { return &DBError{utils.StatusServiceUnavailable, already_connected_msg} } @@ -118,19 +120,21 @@ func (db *Mongodb) Connect(address string) (err error) { return err } - atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix()) + if db.lastReadFromInprocess == nil { + db.lastReadFromInprocess = make(map[string]int64, 100) + } return db.Ping() } func (db *Mongodb) Close() { + dbSessionLock.Lock() + defer dbSessionLock.Unlock() if db.client != nil { - dbSessionLock.Lock() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() db.client.Disconnect(ctx) db.client = nil - dbSessionLock.Unlock() } } @@ -357,7 +361,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1) + err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true) return []byte(""), err } @@ -438,7 +442,7 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay return res.ID, nil } -func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int) error { +func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int, replaceIfExist bool) error { record := InProcessingRecord{ id, nResendAttempts, 0, time.Now().UnixNano() + int64(delayMs*1e6), } @@ -446,7 +450,11 @@ func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name strin c := db.client.Database(db_name).Collection(collection_name) _, err := c.InsertOne(context.TODO(), &record) if duplicateError(err) { - return nil + if !replaceIfExist { + return nil + } + _, err := c.ReplaceOne(context.TODO(), bson.M{"_id": id}, &record) + return err } return err } @@ -460,7 +468,7 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str return err } - return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts) + return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts, false) } @@ -476,7 +484,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi nResendAttempts = -1 } tNow := time.Now().Unix() - if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { + dbSessionLock.Lock() + t := db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId] + dbSessionLock.Unlock() + if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, delayMs, nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() @@ -490,7 +501,9 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi return 0, 0, err } } else { - atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix()) + dbSessionLock.Lock() + db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId] = time.Now().Unix() + dbSessionLock.Unlock() } return record_ind, max_ind, nil @@ -814,7 +827,7 @@ func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) e return err } if !exist { - return &DBError{utils.StatusWrongInput, "stream "+request.DbCollectionName+" does not exist"} + return &DBError{utils.StatusWrongInput, "stream " + request.DbCollectionName + " does not exist"} } } return db.deleteCollection(request, dataCol) @@ -843,14 +856,14 @@ func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) e return nil } -func (db *Mongodb) deleteServiceMeta(request Request) (error) { +func (db *Mongodb) deleteServiceMeta(request Request) error { err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName) if err != nil { - return err + return err } err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName) if err != nil { - return err + return err } return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+request.DbCollectionName+"$") } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 66a78e1a5f94f0d06fbe4d7b3da938c413d98127..d6e7b0d717eeb69edd8699444d4231c72dcf5b73 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -1122,10 +1122,14 @@ func TestMongoDBNegAck(t *testing.T) { db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time message from negack _, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + _, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) assert.NotNil(t, err1) + assert.Nil(t, err2) + if err1 != nil { assert.Equal(t, utils.StatusNoData, err1.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err1.Error()) diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 27350fed5c270b2555d511eb70dcc0cb5ef64453..7e001f3397feef9dc0384da6d38aa21078caa5e3 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -42,7 +42,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n if len(group_id) > 0 && len (group_id) < 100 && IsLetterOrNumbers(group_id) { return true } - err_str := "wrong groupid " + group_id + err_str := "wrong groupid name, check length or allowed charecters in " + group_id log_str := "processing get " + op + " request in " + db_name + " at " + settings.GetDatabaseServer() + ": " + err_str logger.Error(log_str) w.WriteHeader(http.StatusBadRequest)