diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 5cfbbd2b9bea36a989123be775bd8ddb01cacf69..227381d70bb37640eb5c4f4ca26df45849e95581 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -223,7 +223,7 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M { return q } -func getIntVal(request Request, val interface{}) (int32, error) { +func getInt32Val(request Request, val interface{}) (int32, error) { var result int32 switch v := val.(type) { default: @@ -238,6 +238,21 @@ func getIntVal(request Request, val interface{}) (int32, error) { return result, nil } +func getIntVal(request Request, val interface{}) (int, error) { + var result int + switch v := val.(type) { + default: + request.Logger().Debug("unexpected type %T", v) + return 0, errors.New("cannot convert value to int") + case int32: + result, _ = val.(int) + case int64: + val, _ := val.(int64) + result = int(val) + } + return result, nil +} + func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) { c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) q := maxIndexQuery(request, returnIncompete) @@ -251,7 +266,7 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey stri if err != nil { return 0, err } - maxId, ok := getIntVal(request, result[idKey]) + maxId, ok := getInt32Val(request, result[idKey]) if ok != nil { return 0, errors.New("cannot get max index by " + idKey) } @@ -667,6 +682,7 @@ func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue, } // Return record based on current pointer, index of the record and error +// Return actual value of _id, even if request is done for time_id // If obtaining current pointer fails, record is corrupted or thisis end of stream: return 0 instead of record id // If record is missing, or incomplete return valid record id func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) { @@ -677,6 +693,13 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex } var res map[string]interface{} res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey) + var idx int + if params.IdKey == "_id" { + idx = nextInd + } + if err == nil && params.IdKey == "time_id" { + idx, _ = getIntVal(request, res["_id"]) + } // Missing Id! Since time_Ids are sequential, jump to the next available id // Update current pointer to next available id if err != nil && params.IdKey == "time_id" { @@ -689,9 +712,13 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex if err != nil { return nil, 0, err } - nextValue, ok := getIntVal(request, res["time_id"]) + nextValue, ok := getInt32Val(request, res["time_id"]) + if ok != nil { + return nil, 0, errors.New("failed to get next available id. Extraction of id fails") + } + idx, ok = getIntVal(request, res["_id"]) if ok != nil { - return nil, 0, errors.New("failed to next next available id. Extraction of id fails") + return nil, 0, errors.New("failed to get _id. Extraction of _id fails") } updateErr := db.updateCurrentPointerIfEqualDB(request, curPointer.Value, int(nextValue)) // If update fails, another broker already change the value. Go to the next one. @@ -703,7 +730,7 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex } else { // In this case we can not guess, what is the next if err != nil { - return nil, nextInd, err + return nil, idx, err } } @@ -711,7 +738,7 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex return nil, 0, err } - request.Logger().WithFields(map[string]interface{}{"id": nextInd}).Debug("got record from db by id key: ", + request.Logger().WithFields(map[string]interface{}{"id": idx}).Debug("got record from db by id key: ", params.IdKey) record, err := utils.MapToJson(&res) @@ -720,9 +747,9 @@ func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNex return nil, 0, err } if recordContainsPartialData(request, res) { - return nil, nextInd, &DBError{utils.StatusPartialData, string(record)} + return nil, idx, &DBError{utils.StatusPartialData, string(record)} } else { - return record, nextInd, nil + return record, idx, nil } }