Skip to content
Snippets Groups Projects
Commit d09702d0 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Fix acknowledgement for get next available. In-process table now contain only _id, but not time_id.

parent 82672ffc
No related branches found
No related tags found
No related merge requests found
......@@ -223,7 +223,7 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
return q
}
func getIntVal(request Request, val interface{}) (int32, error) {
func getInt32Val(request Request, val interface{}) (int32, error) {
var result int32
switch v := val.(type) {
default:
......@@ -238,6 +238,21 @@ func getIntVal(request Request, val interface{}) (int32, error) {
return result, nil
}
func getIntVal(request Request, val interface{}) (int, error) {
var result int
switch v := val.(type) {
default:
request.Logger().Debug("unexpected type %T", v)
return 0, errors.New("cannot convert value to int")
case int32:
result, _ = val.(int)
case int64:
val, _ := val.(int64)
result = int(val)
}
return result, nil
}
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) {
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
q := maxIndexQuery(request, returnIncompete)
......@@ -251,7 +266,7 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey stri
if err != nil {
return 0, err
}
maxId, ok := getIntVal(request, result[idKey])
maxId, ok := getInt32Val(request, result[idKey])
if ok != nil {
return 0, errors.New("cannot get max index by " + idKey)
}
......@@ -667,6 +682,7 @@ func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue,
}
// Return record based on current pointer, index of the record and error
// Return actual value of _id, even if request is done for time_id
// If obtaining current pointer fails, record is corrupted or thisis end of stream: return 0 instead of record id
// If record is missing, or incomplete return valid record id
func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) {
......@@ -677,6 +693,13 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
}
var res map[string]interface{}
res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey)
var idx int
if params.IdKey == "_id" {
idx = nextInd
}
if err == nil && params.IdKey == "time_id" {
idx, _ = getIntVal(request, res["_id"])
}
// Missing Id! Since time_Ids are sequential, jump to the next available id
// Update current pointer to next available id
if err != nil && params.IdKey == "time_id" {
......@@ -689,9 +712,13 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
if err != nil {
return nil, 0, err
}
nextValue, ok := getIntVal(request, res["time_id"])
nextValue, ok := getInt32Val(request, res["time_id"])
if ok != nil {
return nil, 0, errors.New("failed to get next available id. Extraction of id fails")
}
idx, ok = getIntVal(request, res["_id"])
if ok != nil {
return nil, 0, errors.New("failed to next next available id. Extraction of id fails")
return nil, 0, errors.New("failed to get _id. Extraction of _id fails")
}
updateErr := db.updateCurrentPointerIfEqualDB(request, curPointer.Value, int(nextValue))
// If update fails, another broker already change the value. Go to the next one.
......@@ -703,7 +730,7 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
} else {
// In this case we can not guess, what is the next
if err != nil {
return nil, nextInd, err
return nil, idx, err
}
}
......@@ -711,7 +738,7 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
return nil, 0, err
}
request.Logger().WithFields(map[string]interface{}{"id": nextInd}).Debug("got record from db by id key: ",
request.Logger().WithFields(map[string]interface{}{"id": idx}).Debug("got record from db by id key: ",
params.IdKey)
record, err := utils.MapToJson(&res)
......@@ -720,9 +747,9 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex
return nil, 0, err
}
if recordContainsPartialData(request, res) {
return nil, nextInd, &DBError{utils.StatusPartialData, string(record)}
return nil, idx, &DBError{utils.StatusPartialData, string(record)}
} else {
return record, nextInd, nil
return record, idx, nil
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment