Skip to content
Snippets Groups Projects
Commit be1e4d4e authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

refactor nacks

parent 6c908956
No related branches found
No related tags found
No related merge requests found
......@@ -16,7 +16,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
......@@ -86,7 +85,7 @@ type Mongodb struct {
client *mongo.Client
timeout time.Duration
settings DBSettings
lastReadFromInprocess int64
lastReadFromInprocess map[string]int64
}
func (db *Mongodb) SetSettings(settings DBSettings) {
......@@ -103,6 +102,9 @@ func (db *Mongodb) Ping() (err error) {
}
func (db *Mongodb) Connect(address string) (err error) {
dbSessionLock.Lock()
defer dbSessionLock.Unlock()
if db.client != nil {
return &DBError{utils.StatusServiceUnavailable, already_connected_msg}
}
......@@ -118,19 +120,21 @@ func (db *Mongodb) Connect(address string) (err error) {
return err
}
atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix())
if db.lastReadFromInprocess == nil {
db.lastReadFromInprocess = make(map[string]int64, 100)
}
return db.Ping()
}
func (db *Mongodb) Close() {
dbSessionLock.Lock()
defer dbSessionLock.Unlock()
if db.client != nil {
dbSessionLock.Lock()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
db.client.Disconnect(ctx)
db.client = nil
dbSessionLock.Unlock()
}
}
......@@ -357,7 +361,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1)
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
return []byte(""), err
}
......@@ -438,7 +442,7 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay
return res.ID, nil
}
func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int) error {
func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int, replaceIfExist bool) error {
record := InProcessingRecord{
id, nResendAttempts, 0, time.Now().UnixNano() + int64(delayMs*1e6),
}
......@@ -446,7 +450,11 @@ func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name strin
c := db.client.Database(db_name).Collection(collection_name)
_, err := c.InsertOne(context.TODO(), &record)
if duplicateError(err) {
return nil
if !replaceIfExist {
return nil
}
_, err := c.ReplaceOne(context.TODO(), bson.M{"_id": id}, &record)
return err
}
return err
}
......@@ -460,7 +468,7 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str
return err
}
return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts)
return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts, false)
}
......@@ -476,7 +484,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
nResendAttempts = -1
}
tNow := time.Now().Unix()
if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
dbSessionLock.Lock()
t := db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId]
dbSessionLock.Unlock()
if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, delayMs, nResendAttempts)
if err != nil {
log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
......@@ -490,7 +501,9 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
return 0, 0, err
}
} else {
atomic.StoreInt64(&db.lastReadFromInprocess, time.Now().Unix())
dbSessionLock.Lock()
db.lastReadFromInprocess[request.DbCollectionName+"_"+request.GroupId] = time.Now().Unix()
dbSessionLock.Unlock()
}
return record_ind, max_ind, nil
......@@ -814,7 +827,7 @@ func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) e
return err
}
if !exist {
return &DBError{utils.StatusWrongInput, "stream "+request.DbCollectionName+" does not exist"}
return &DBError{utils.StatusWrongInput, "stream " + request.DbCollectionName + " does not exist"}
}
}
return db.deleteCollection(request, dataCol)
......@@ -843,14 +856,14 @@ func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) e
return nil
}
func (db *Mongodb) deleteServiceMeta(request Request) (error) {
func (db *Mongodb) deleteServiceMeta(request Request) error {
err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName)
if err != nil {
return err
return err
}
err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName)
if err != nil {
return err
return err
}
return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+request.DbCollectionName+"$")
}
......
......@@ -1122,10 +1122,14 @@ func TestMongoDBNegAck(t *testing.T) {
db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time message from negack
_, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing
db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)})
_, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
assert.NotNil(t, err1)
assert.Nil(t, err2)
if err1 != nil {
assert.Equal(t, utils.StatusNoData, err1.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err1.Error())
......
......@@ -42,7 +42,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n
if len(group_id) > 0 && len (group_id) < 100 && IsLetterOrNumbers(group_id) {
return true
}
err_str := "wrong groupid " + group_id
err_str := "wrong groupid name, check length or allowed charecters in " + group_id
log_str := "processing get " + op + " request in " + db_name + " at " + settings.GetDatabaseServer() + ": " + err_str
logger.Error(log_str)
w.WriteHeader(http.StatusBadRequest)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment