diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 814d1a321e701f4e1e91dc5dc958f319a9ee4966..a620d929c1eca25b868a4dcede96cd5321e4b228 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -16,6 +16,18 @@ type Request struct { ExtraParam string } +type ExtraParamId struct { + Id int `json:"id"` + IdKey string `json:"idKey"` +} + +type ExtraParamNext struct { + IdKey string `json:"id_key"` + Resend bool `json:"resend"` + DelayMs int `json:"delay_ms"` + ResendAttempts int `json:"resend_attempts"` +} + func (request *Request) Logger() logger.Logger { return logger.WithFields(map[string]interface{}{ "beamtime": request.Beamtime, diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index b59bf4c8bba62a2c0d462cb0c49c79450d6c65b4..ce7ddcc202107e985424cee5725885b7766eaa71 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -26,6 +26,7 @@ type ID struct { type MessageRecord struct { ID int `json:"_id"` + MessageID int `json:"message_id"` Timestamp int `bson:"timestamp" json:"timestamp"` Name string `json:"name"` Meta map[string]interface{} `json:"meta"` @@ -201,25 +202,31 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M { } q = bson.M{"$or": []interface{}{bson.M{"name": finish_stream_keyword}, q}} } else { - q = nil + q = bson.M{} } return q } -func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { +func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) { c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) q := maxIndexQuery(request, returnIncompete) - opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) - var result ID - err = c.FindOne(context.TODO(), q, opts).Decode(&result) + opts := options.FindOne().SetSort(bson.M{idKey: -1}).SetProjection(bson.D{{idKey, 1}}) + var result map[string]interface{} + err := c.FindOne(context.TODO(), q, opts).Decode(&result) if err == mongo.ErrNoDocuments { return 0, nil - } - - return result.ID, err + if err != nil { + return 0, err + } + maxId, ok := result[idKey].(int32) + if !ok { + return 0, errors.New("cannot get max index by " + idKey) + } + return int(maxId), nil } + func duplicateError(err error) bool { command_error, ok := err.(mongo.CommandError) if !ok { @@ -242,7 +249,8 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) { } func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int) error { - if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil { + // ToDo check tat idKey should be _id + if res, err := db.getRecordFromDb(request, max_ind, max_ind, "_id"); err == nil { if err2 := checkStreamFinished(request, max_ind, max_ind, res); err2 != nil { return err2 } @@ -312,8 +320,8 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool return false } -func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) { - q := bson.M{"_id": id} +func (db *Mongodb) getRecordFromDb(request Request, id, id_max int, idKey string) (res map[string]interface{}, err error) { + q := bson.M{idKey: id} c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { @@ -324,17 +332,21 @@ func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[str return res, err } -func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, error) { - res, err := db.getRecordFromDb(request, id, id_max) +func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int, idKey string) ([]byte, error) { + res, err := db.getRecordFromDb(request, id, id_max, idKey) if err != nil { return nil, err } + // Copy value of d key to _id to keep back compatibility + if res["message_id"] != nil { + res["_id"] = res["message_id"] + } if err := checkStreamFinished(request, id, id_max, res); err != nil { return nil, err } - request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db") + request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db by", idKey) record, err := utils.MapToJson(&res) if err != nil { @@ -372,17 +384,18 @@ func (db *Mongodb) getEarliestRawRecord(dbname string, collection_name string) ( } func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { - id, err := strconv.Atoi(request.ExtraParam) + var params ExtraParamId + err := json.Unmarshal([]byte(request.ExtraParam), ¶ms) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(request, true) + max_ind, err := db.getMaxIndex(request, true, params.IdKey) if err != nil { return nil, err } - return db.getRecordByIDRaw(request, id, max_ind) + return db.getRecordByIDRaw(request, params.Id, max_ind, params.IdKey) } func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { @@ -438,8 +451,8 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { return nil } -func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(request, true) +func (db *Mongodb) getCurrentPointer(request Request, collectionName string, idKey string) (LocationPointer, int, error) { + max_ind, err := db.getMaxIndex(request, true, idKey) if err != nil { return LocationPointer{}, 0, err } @@ -450,7 +463,7 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err var curPointer LocationPointer err = db.changeField(request, fieldChangeRequest{ - collectionName: pointer_collection_name, + collectionName: collectionName, fieldName: pointer_field_name, op: field_op_inc, max_ind: max_ind}, &curPointer) @@ -503,36 +516,22 @@ func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name strin return err } -func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error { - if len(extra_param) == 0 { +func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, params ExtraParamNext) error { + if !params.Resend { return nil } - delayMs, nResendAttempts, err := extractsTwoIntsFromString(extra_param) - if err != nil { - return err - } - - return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts, false) - + return db.InsertRecordToInprocess(db_name, collection_name, id, params.DelayMs, params.ResendAttempts, false) } -func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { - var record_ind, max_ind, delayMs, nResendAttempts int +func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, params ExtraParamNext, ignoreTimeout bool) (int, int, error) { + var record_ind, max_ind int var err error - if len(request.ExtraParam) != 0 { - delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) - if err != nil { - return 0, 0, err - } - } else { - nResendAttempts = -1 - } tNow := time.Now().Unix() dbSessionLock.Lock() t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] dbSessionLock.Unlock() if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts, + record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, params.DelayMs, params.ResendAttempts, request.Logger()) if err != nil { request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting unprocessed message") @@ -540,7 +539,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } } if record_ind != 0 { - max_ind, err = db.getMaxIndex(request, true) + max_ind, err = db.getMaxIndex(request, true, params.IdKey) if err != nil { return 0, 0, err } @@ -554,9 +553,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } -func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) { - curPointer, max_ind, err := db.getCurrentPointer(request) +func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request, collectionName, idKey string) (int, int, error) { + curPointer, max_ind, err := db.getCurrentPointer(request, collectionName, idKey) if err != nil { + request.Logger().Debug("error getting next pointer", err) request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting next pointer") return 0, 0, err } @@ -564,17 +564,17 @@ func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int return curPointer.Value, max_ind, nil } -func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, false) +func (db *Mongodb) getNextAndMaxIndexes(request Request, params ExtraParamNext) (int, int, error) { + nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, params, false) if err != nil { return 0, 0, err } if nextInd == 0 { - nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request) + nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey) if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData { var err_inproc error - nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, true) + nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, params, true) if err_inproc != nil { return 0, 0, err_inproc } @@ -604,16 +604,19 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) { return r, true } -func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror error) ([]byte, error) { +func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext, originalerror error, ignoreTimeout bool) ([]byte, int, error) { + // Get next index from inprocessed collection and + // return corresponding data var err_inproc error - nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, true) + nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, params, ignoreTimeout) if err_inproc != nil { - return nil, err_inproc + return nil, 0, err_inproc } if nextInd != 0 { - return db.getRecordByIDRaw(request, nextInd, maxInd) + data, err := db.getRecordByIDRaw(request, nextInd, maxInd, params.IdKey) + return data, nextInd, err } else { - return nil, originalerror + return nil, 0, originalerror } } @@ -632,18 +635,25 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf } func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexes(request) + var params ExtraParamNext + err := json.Unmarshal([]byte(request.ExtraParam), ¶ms) + if err != nil { + return nil, errors.New("fails to extract request parameters: " + err.Error()) + } + request.Logger().Debug("get parameters ", params.IdKey) + + nextInd, maxInd, err := db.getNextAndMaxIndexes(request, params) if err != nil { return nil, err } - data, err := db.getRecordByIDRaw(request, nextInd, maxInd) + data, err := db.getRecordByIDRaw(request, nextInd, maxInd, params.IdKey) if err != nil { - data, err = db.tryGetRecordFromInprocessed(request, err) + data, nextInd, err = db.getRecordFromInprocessed(request, params, err, true) } if err == nil { - err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam) + err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, params) if err_update != nil { return nil, err_update } @@ -651,16 +661,16 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { return data, err } -func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { - max_ind, err := db.getMaxIndex(request, false) +func (db *Mongodb) getLastRecord(request Request, idKey string) ([]byte, error) { + max_ind, err := db.getMaxIndex(request, false, idKey) if err != nil { return nil, err } - return db.getRecordByIDRaw(request, max_ind, max_ind) + return db.getRecordByIDRaw(request, max_ind, max_ind, idKey) } -func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { - max_ind, err := db.getMaxIndex(request, false) +func (db *Mongodb) getLastRecordInGroup(request Request, idKey string) ([]byte, error) { + max_ind, err := db.getMaxIndex(request, false, idKey) if err != nil { return nil, err } @@ -676,7 +686,7 @@ func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { if err != nil { return nil, err } - return db.getRecordByIDRaw(request, max_ind, max_ind) + return db.getRecordByIDRaw(request, max_ind, max_ind, idKey) } func getSizeFilter(request Request) bson.M { @@ -837,7 +847,7 @@ func (db *Mongodb) getNacksLimits(request Request) (int, int, error) { } if to == 0 { - to, err = db.getMaxLimitWithoutEndOfStream(request, err) + to, err = db.getMaxLimitWithoutEndOfStream(request, err, "_id") if err != nil { return 0, 0, err } @@ -845,12 +855,12 @@ func (db *Mongodb) getNacksLimits(request Request) (int, int, error) { return from, to, nil } -func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error) (int, error) { - maxInd, err := db.getMaxIndex(request, true) +func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error, idKey string) (int, error) { + maxInd, err := db.getMaxIndex(request, true, idKey) if err != nil { return 0, err } - _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd) + _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd, idKey) if last_err != nil && maxInd > 0 { maxInd = maxInd - 1 } @@ -1106,9 +1116,9 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { case "id": return db.getRecordByID(request) case "last": - return db.getLastRecord(request) + return db.getLastRecord(request, "_id") case "groupedlast": - return db.getLastRecordInGroup(request) + return db.getLastRecordInGroup(request, "_id") case "resetcounter": return db.resetCounter(request) case "size": diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go index 34ddb643e9b2b12e7b8e848a01b6770766d2c37f..6fb4f652bc256bf3828bf0618da9f624949680d9 100644 --- a/broker/src/asapo_broker/server/get_id.go +++ b/broker/src/asapo_broker/server/get_id.go @@ -1,8 +1,11 @@ package server import ( + "asapo_broker/database" + "encoding/json" "github.com/gorilla/mux" "net/http" + "strconv" ) func extractRequestParametersID(r *http.Request) (string, bool) { @@ -14,11 +17,32 @@ func extractRequestParametersID(r *http.Request) (string, bool) { return id_str, true } +func getExtraParameters(r *http.Request) (string, bool) { + vars := mux.Vars(r) + id_str, ok := vars["id"] + if !ok { + return "0", ok + } + id_int, err := strconv.Atoi(id_str) + if err != nil { + return "0", false + } + id_key, id_ok := vars["id_key"] + if !id_ok { + id_key = "message_id" + } + extraParam := database.ExtraParamId{ + Id: id_int, + IdKey: id_key} + encoded, _ := json.Marshal(extraParam) + return string(encoded), true +} + func routeGetByID(w http.ResponseWriter, r *http.Request) { - id, ok := extractRequestParametersID(r) + extraParam, ok := getExtraParameters(r) if !ok { w.WriteHeader(http.StatusBadRequest) return } - processRequest(w, r, "id", id, false) + processRequest(w, r, "id", extraParam, false) } diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 3f051f57e72398d1eeb8222fecbed005de4dc3f0..c76dfc4ce029eed7de284dd417608f80b068e2ce 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -1,21 +1,42 @@ package server import ( + "asapo_broker/database" + "encoding/json" "net/http" + "strconv" ) -func extractResend(r *http.Request) (string) { +func extractParams(r *http.Request) string { keys := r.URL.Query() - resend := keys.Get("resend_nacks") - delay_ms := keys.Get("delay_ms") - resend_attempts := keys.Get("resend_attempts") - resend_params := "" - if len(resend)!=0 { - resend_params=delay_ms+"_"+resend_attempts + delay_ms, err := strconv.Atoi(keys.Get("delay_ms")) + if err != nil { + delay_ms = 0 } - return resend_params + resend_attempts, err := strconv.Atoi(keys.Get("resend_attempts")) + if err != nil { + resend_attempts = 0 + } + resend := true + if len(keys.Get("resend_nacks")) == 0 { + resend = false + resend_attempts = -1 + } + IdKey := keys.Get("id_key") + if IdKey == "" { + IdKey = "_id" + } + extraParam := database.ExtraParamNext{ + IdKey: IdKey, + Resend: resend, + DelayMs: delay_ms, + ResendAttempts: resend_attempts} + + encoded, _ := json.Marshal(extraParam) + return string(encoded) + } func routeGetNext(w http.ResponseWriter, r *http.Request) { - processRequest(w, r, "next", extractResend(r), true) + processRequest(w, r, "next", extractParams(r), true) }