diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 814d1a321e701f4e1e91dc5dc958f319a9ee4966..56a07d25a143165a12b0129aa647f723e253d7cb 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -16,6 +16,18 @@ type Request struct { ExtraParam string } +type ExtraParamId struct { + Id int `json:"id"` + IdKey string `json:"id_key"` +} + +type ExtraParamNext struct { + IdKey string `json:"id_key"` + Resend bool `json:"resend"` + DelayMs int `json:"delay_ms"` + ResendAttempts int `json:"resend_attempts"` +} + func (request *Request) Logger() logger.Logger { return logger.WithFields(map[string]interface{}{ "beamtime": request.Beamtime, diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index b59bf4c8bba62a2c0d462cb0c49c79450d6c65b4..fe5046e5e49c7926561036973106fa528b80aaf9 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -26,6 +27,7 @@ type ID struct { type MessageRecord struct { ID int `json:"_id"` + MessageID int `json:"message_id"` Timestamp int `bson:"timestamp" json:"timestamp"` Name string `json:"name"` Meta map[string]interface{} `json:"meta"` @@ -201,25 +203,46 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M { } q = bson.M{"$or": []interface{}{bson.M{"name": finish_stream_keyword}, q}} } else { - q = nil + q = bson.M{} } return q } -func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { +func getIntVal(request Request, val interface{}) (int32, error) { + var result int32 + switch v := val.(type) { + default: + request.Logger().Debug("unexpected type %T", v) + return 0, errors.New("cannot convert value to int") + case int32: + result, _ = val.(int32) + case int64: + val, _ := val.(int64) + result = int32(val) + } + return result, nil +} + +func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) { c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) q := maxIndexQuery(request, returnIncompete) - opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) - var result ID - err = c.FindOne(context.TODO(), q, opts).Decode(&result) + opts := options.FindOne().SetSort(bson.M{idKey: -1}).SetProjection(bson.D{{idKey, 1}}) + var result map[string]interface{} + err := c.FindOne(context.TODO(), q, opts).Decode(&result) if err == mongo.ErrNoDocuments { return 0, nil - } - - return result.ID, err + if err != nil { + return 0, err + } + maxId, ok := getIntVal(request, result[idKey]) + if ok != nil { + return 0, errors.New("cannot get max index by " + idKey) + } + return int(maxId), nil } + func duplicateError(err error) bool { command_error, ok := err.(mongo.CommandError) if !ok { @@ -242,7 +265,8 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) { } func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int) error { - if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil { + // ToDo check tat idKey should be _id + if res, err := db.getRecordFromDb(request, max_ind, max_ind, "_id"); err == nil { if err2 := checkStreamFinished(request, max_ind, max_ind, res); err2 != nil { return err2 } @@ -312,8 +336,8 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool return false } -func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) { - q := bson.M{"_id": id} +func (db *Mongodb) getRecordFromDb(request Request, id, id_max int, idKey string) (res map[string]interface{}, err error) { + q := bson.M{idKey: id} c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { @@ -324,17 +348,21 @@ func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[str return res, err } -func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, error) { - res, err := db.getRecordFromDb(request, id, id_max) +func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int, idKey string) ([]byte, error) { + res, err := db.getRecordFromDb(request, id, id_max, idKey) if err != nil { return nil, err } + // Copy value of d key to _id to keep back compatibility + if res["message_id"] != nil { + res["_id"] = res["message_id"] + } if err := checkStreamFinished(request, id, id_max, res); err != nil { return nil, err } - request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db") + request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db by", idKey) record, err := utils.MapToJson(&res) if err != nil { @@ -372,17 +400,18 @@ func (db *Mongodb) getEarliestRawRecord(dbname string, collection_name string) ( } func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { - id, err := strconv.Atoi(request.ExtraParam) + var params ExtraParamId + err := json.Unmarshal([]byte(request.ExtraParam), ¶ms) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(request, true) + max_ind, err := db.getMaxIndex(request, true, params.IdKey) if err != nil { return nil, err } - return db.getRecordByIDRaw(request, id, max_ind) + return db.getRecordByIDRaw(request, params.Id, max_ind, params.IdKey) } func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { @@ -438,8 +467,8 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { return nil } -func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(request, true) +func (db *Mongodb) getCurrentPointer(request Request, collectionName string, idKey string) (LocationPointer, int, error) { + max_ind, err := db.getMaxIndex(request, true, idKey) if err != nil { return LocationPointer{}, 0, err } @@ -450,7 +479,7 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err var curPointer LocationPointer err = db.changeField(request, fieldChangeRequest{ - collectionName: pointer_collection_name, + collectionName: collectionName, fieldName: pointer_field_name, op: field_op_inc, max_ind: max_ind}, &curPointer) @@ -503,36 +532,22 @@ func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name strin return err } -func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error { - if len(extra_param) == 0 { +func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, params ExtraParamNext) error { + if !params.Resend { return nil } - delayMs, nResendAttempts, err := extractsTwoIntsFromString(extra_param) - if err != nil { - return err - } - - return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts, false) - + return db.InsertRecordToInprocess(db_name, collection_name, id, params.DelayMs, params.ResendAttempts, false) } -func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { - var record_ind, max_ind, delayMs, nResendAttempts int +func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, params ExtraParamNext, ignoreTimeout bool) (int, int, error) { + var record_ind, max_ind int var err error - if len(request.ExtraParam) != 0 { - delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) - if err != nil { - return 0, 0, err - } - } else { - nResendAttempts = -1 - } tNow := time.Now().Unix() dbSessionLock.Lock() t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] dbSessionLock.Unlock() if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts, + record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, params.DelayMs, params.ResendAttempts, request.Logger()) if err != nil { request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting unprocessed message") @@ -540,7 +555,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } } if record_ind != 0 { - max_ind, err = db.getMaxIndex(request, true) + max_ind, err = db.getMaxIndex(request, true, params.IdKey) if err != nil { return 0, 0, err } @@ -554,9 +569,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } -func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) { - curPointer, max_ind, err := db.getCurrentPointer(request) +func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request, collectionName, idKey string) (int, int, error) { + curPointer, max_ind, err := db.getCurrentPointer(request, collectionName, idKey) if err != nil { + request.Logger().Debug("error getting next pointer", err) request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting next pointer") return 0, 0, err } @@ -564,17 +580,17 @@ func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int return curPointer.Value, max_ind, nil } -func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, false) +func (db *Mongodb) getNextAndMaxIndexes(request Request, params ExtraParamNext) (int, int, error) { + nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, params, false) if err != nil { return 0, 0, err } if nextInd == 0 { - nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request) + nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey) if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData { var err_inproc error - nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, true) + nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, params, true) if err_inproc != nil { return 0, 0, err_inproc } @@ -604,16 +620,108 @@ func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) { return r, true } -func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror error) ([]byte, error) { +func (db *Mongodb) getNextRecordDB(request Request, currentPointer int) (map[string]interface{}, error) { + var res map[string]interface{} + filter := bson.D{{"_id", bson.D{{"$gt", currentPointer}}}} + opts := options.FindOne().SetSort(bson.M{"_id": 1}) + coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) + err := coll.FindOne(context.TODO(), filter, opts).Decode(&res) + return res, err +} + +func (db *Mongodb) getCurrentPointerDB(request Request) (LocationPointer, error) { + var curPointer LocationPointer + filter := bson.M{"_id": request.GroupId + "_" + request.Stream} + opts := options.FindOne() + coll := db.client.Database(request.DbName()).Collection(pointer_collection_name) + err := coll.FindOne(context.TODO(), filter, opts).Decode(&curPointer) + return curPointer, err +} + +func (db *Mongodb) updateCurrentPointerIfEqualDB(request Request, currentValue, nextValue int) error { + update := bson.M{"$set": bson.M{pointer_field_name: nextValue}} + opts := options.Update().SetUpsert(false) + coll := db.client.Database(request.DbName()).Collection(pointer_collection_name) + filter := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: currentValue} + _, err := coll.UpdateOne(context.TODO(), filter, update, opts) + return err +} + +func (db *Mongodb) getRecordFromCurPointer(request Request, params ExtraParamNext) ([]byte, int, error) { + + nextInd, maxInd, err := db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, params.IdKey) + if err != nil { + return nil, 0, err + } + var res map[string]interface{} + res, err = db.getRecordFromDb(request, nextInd, maxInd, params.IdKey) + // Missing Id! Since Ids are sequential, jump to the next available id + // Update current pointer to next available id + if err != nil && params.IdKey == "_id" { + for { + curPointer, err := db.getCurrentPointerDB(request) + if err != nil { + return nil, 0, err + } + res, err = db.getNextRecordDB(request, curPointer.Value) + if err != nil { + return nil, 0, err + } + nextValue, ok := getIntVal(request, res["_id"]) + if ok != nil { + return nil, 0, errors.New("failed to next next available id. Extraction of id fails") + } + updateErr := db.updateCurrentPointerIfEqualDB(request, curPointer.Value, int(nextValue)) + // If update fails, another broker already change the value. Go to the next one. + if updateErr == nil { + nextInd = int(nextValue) + break + } + } + } else { + // In this case we can not guess, what is the next + if err != nil { + return nil, 0, err + } + } + + // Copy value of d key to _id to keep back compatibility + if res["message_id"] != nil { + res["_id"] = res["message_id"] + } + if err := checkStreamFinished(request, nextInd, maxInd, res); err != nil { + return nil, 0, err + } + + request.Logger().WithFields(map[string]interface{}{"id": nextInd}).Debug("got record from db by id key: ", + params.IdKey) + + record, err := utils.MapToJson(&res) + if err != nil { + return nil, 0, err + } + if recordContainsPartialData(request, res) { + return nil, 0, &DBError{utils.StatusPartialData, string(record)} + } else { + fmt.Println("Return record without error: ", string(record)) + return record, nextInd, nil + } + +} + +func (db *Mongodb) getRecordFromInprocessed(request Request, params ExtraParamNext, originalerror error, ignoreTimeout bool) ([]byte, int, error) { + // Get next index from inprocessed collection and + // return corresponding data var err_inproc error - nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, true) + nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, params, ignoreTimeout) if err_inproc != nil { - return nil, err_inproc + return nil, 0, err_inproc } if nextInd != 0 { - return db.getRecordByIDRaw(request, nextInd, maxInd) + data, err := db.getRecordByIDRaw(request, nextInd, maxInd, params.IdKey) + return data, nextInd, err } else { - return nil, originalerror + return nil, 0, originalerror } } @@ -632,18 +740,34 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf } func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexes(request) + var params ExtraParamNext + err := json.Unmarshal([]byte(request.ExtraParam), ¶ms) if err != nil { - return nil, err + return nil, errors.New("fails to extract request parameters: " + err.Error()) } + request.Logger().Debug("get next by : ", params.IdKey) - data, err := db.getRecordByIDRaw(request, nextInd, maxInd) + var data []byte + var nextInd int + data, nextInd, err = db.getRecordFromInprocessed(request, params, nil, false) if err != nil { - data, err = db.tryGetRecordFromInprocessed(request, err) + return nil, err + } + + if data == nil { + data, nextInd, err = db.getRecordFromCurPointer(request, params) + if err != nil { + request.Logger().Debug("error getting record from current pointer: ", err) + return nil, err + } + } + + if data == nil { + data, nextInd, err = db.getRecordFromInprocessed(request, params, err, true) } if err == nil { - err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam) + err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, params) if err_update != nil { return nil, err_update } @@ -651,16 +775,16 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { return data, err } -func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { - max_ind, err := db.getMaxIndex(request, false) +func (db *Mongodb) getLastRecord(request Request, idKey string) ([]byte, error) { + max_ind, err := db.getMaxIndex(request, false, idKey) if err != nil { return nil, err } - return db.getRecordByIDRaw(request, max_ind, max_ind) + return db.getRecordByIDRaw(request, max_ind, max_ind, idKey) } -func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { - max_ind, err := db.getMaxIndex(request, false) +func (db *Mongodb) getLastRecordInGroup(request Request, idKey string) ([]byte, error) { + max_ind, err := db.getMaxIndex(request, false, idKey) if err != nil { return nil, err } @@ -676,7 +800,7 @@ func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { if err != nil { return nil, err } - return db.getRecordByIDRaw(request, max_ind, max_ind) + return db.getRecordByIDRaw(request, max_ind, max_ind, idKey) } func getSizeFilter(request Request) bson.M { @@ -837,7 +961,7 @@ func (db *Mongodb) getNacksLimits(request Request) (int, int, error) { } if to == 0 { - to, err = db.getMaxLimitWithoutEndOfStream(request, err) + to, err = db.getMaxLimitWithoutEndOfStream(request, err, "_id") if err != nil { return 0, 0, err } @@ -845,12 +969,12 @@ func (db *Mongodb) getNacksLimits(request Request) (int, int, error) { return from, to, nil } -func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error) (int, error) { - maxInd, err := db.getMaxIndex(request, true) +func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error, idKey string) (int, error) { + maxInd, err := db.getMaxIndex(request, true, idKey) if err != nil { return 0, err } - _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd) + _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd, idKey) if last_err != nil && maxInd > 0 { maxInd = maxInd - 1 } @@ -1106,9 +1230,9 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { case "id": return db.getRecordByID(request) case "last": - return db.getLastRecord(request) + return db.getLastRecord(request, "_id") case "groupedlast": - return db.getLastRecordInGroup(request) + return db.getLastRecordInGroup(request, "_id") case "resetcounter": return db.resetCounter(request) case "size": diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index b0ee5d04fcebda10ac4c128beddd3c05bf0eec46..9ccc2ea5b24f74a2e9945b9021d2c568a57aef4b 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -44,13 +44,13 @@ var testsGetCommand = []struct { externalParam string }{ {"last", expectedSource, expectedStream, "", expectedStream + "/0/last", "", "0"}, - {"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "", "1"}, + {"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "&id=1", "{\"id\":1,\"id_key\":\"message_id\"}"}, {"meta", expectedSource, "default", "", "default/0/meta/0", "", "0"}, {"nacks", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks", "", "0_0"}, {"groupedlast", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast", "", ""}, - {"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", ""}, + {"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"}, {"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + - expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "10000_3"}, + expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "{\"id_key\":\"_id\",\"resend\":true,\"delay_ms\":10000,\"resend_attempts\":3}"}, {"size", expectedSource, expectedStream, "", expectedStream + "/size", "", ""}, {"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"}, {"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"}, diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go index 34ddb643e9b2b12e7b8e848a01b6770766d2c37f..6fb4f652bc256bf3828bf0618da9f624949680d9 100644 --- a/broker/src/asapo_broker/server/get_id.go +++ b/broker/src/asapo_broker/server/get_id.go @@ -1,8 +1,11 @@ package server import ( + "asapo_broker/database" + "encoding/json" "github.com/gorilla/mux" "net/http" + "strconv" ) func extractRequestParametersID(r *http.Request) (string, bool) { @@ -14,11 +17,32 @@ func extractRequestParametersID(r *http.Request) (string, bool) { return id_str, true } +func getExtraParameters(r *http.Request) (string, bool) { + vars := mux.Vars(r) + id_str, ok := vars["id"] + if !ok { + return "0", ok + } + id_int, err := strconv.Atoi(id_str) + if err != nil { + return "0", false + } + id_key, id_ok := vars["id_key"] + if !id_ok { + id_key = "message_id" + } + extraParam := database.ExtraParamId{ + Id: id_int, + IdKey: id_key} + encoded, _ := json.Marshal(extraParam) + return string(encoded), true +} + func routeGetByID(w http.ResponseWriter, r *http.Request) { - id, ok := extractRequestParametersID(r) + extraParam, ok := getExtraParameters(r) if !ok { w.WriteHeader(http.StatusBadRequest) return } - processRequest(w, r, "id", id, false) + processRequest(w, r, "id", extraParam, false) } diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 3f051f57e72398d1eeb8222fecbed005de4dc3f0..c76dfc4ce029eed7de284dd417608f80b068e2ce 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -1,21 +1,42 @@ package server import ( + "asapo_broker/database" + "encoding/json" "net/http" + "strconv" ) -func extractResend(r *http.Request) (string) { +func extractParams(r *http.Request) string { keys := r.URL.Query() - resend := keys.Get("resend_nacks") - delay_ms := keys.Get("delay_ms") - resend_attempts := keys.Get("resend_attempts") - resend_params := "" - if len(resend)!=0 { - resend_params=delay_ms+"_"+resend_attempts + delay_ms, err := strconv.Atoi(keys.Get("delay_ms")) + if err != nil { + delay_ms = 0 } - return resend_params + resend_attempts, err := strconv.Atoi(keys.Get("resend_attempts")) + if err != nil { + resend_attempts = 0 + } + resend := true + if len(keys.Get("resend_nacks")) == 0 { + resend = false + resend_attempts = -1 + } + IdKey := keys.Get("id_key") + if IdKey == "" { + IdKey = "_id" + } + extraParam := database.ExtraParamNext{ + IdKey: IdKey, + Resend: resend, + DelayMs: delay_ms, + ResendAttempts: resend_attempts} + + encoded, _ := json.Marshal(extraParam) + return string(encoded) + } func routeGetNext(w http.ResponseWriter, r *http.Request) { - processRequest(w, r, "next", extractResend(r), true) + processRequest(w, r, "next", extractParams(r), true) } diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index a9c5f53f36bd0e760ee7bcb1d9bf624b0d50d9a0..06571264cad5bba847d6683c757d361de553238e 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -45,7 +45,7 @@ func (a *MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error }, nil } - return Token{}, &AuthorizationError{errors.New("wrong or expired JWT token"),http.StatusUnauthorized} + return Token{}, &AuthorizationError{errors.New("wrong or expired JWT token"), http.StatusUnauthorized} } func prepareTestAuth() { @@ -68,7 +68,7 @@ type request struct { func containsMatcherMap(substrings ...string) func(map[string]interface{}) bool { return func(vals map[string]interface{}) bool { - res,_:=utils.MapToJson(vals) + res, _ := utils.MapToJson(vals) for _, substr := range substrings { if !strings.Contains(string(res), substr) { return false @@ -89,7 +89,6 @@ func containsMatcherStr(substrings ...string) func(str string) bool { } } - func doRequest(path string, extra_params ...string) *httptest.ResponseRecorder { m := "GET" if len(extra_params) > 0 { @@ -167,7 +166,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next", + ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) @@ -176,7 +176,6 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("no data or partial data"))) - w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusConflict, w.Code, "wrong database name") @@ -184,12 +183,13 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next", + ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) - logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("WithFields", mock.Anything) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot process request"))) ExpectReconnect(suite.mock_db) @@ -202,7 +202,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next", + ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) @@ -220,7 +221,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next", + ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("WithFields", mock.Anything) @@ -232,7 +234,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next", + ExtraParam: "{\"id_key\":\"_id\",\"resend\":false,\"delay_ms\":0,\"resend_attempts\":-1}"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("WithFields", mock.Anything) @@ -259,7 +262,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() { query_str := "query_string" - expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId, DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("delete_stream"))) diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index d29301909599de07885df777baac9747216cfe6c..be8b8497c24a2e4c75e691cad8f3caa0c0909d62 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -35,6 +35,7 @@ class MessageMeta { std::chrono::system_clock::time_point timestamp; uint64_t size{0}; uint64_t id{0}; + uint64_t message_id{0}; std::string source; std::string ib_source; std::string metadata; diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index 30869d441d9c376a7637dbb239716819cf392e2a..e32d4c4c163d67f88d53fbab70667fd1b4bcdf41 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -64,6 +64,7 @@ std::string MessageMeta::Json() const { int64_t buf_id_int = static_cast<int64_t>(buf_id); std::string s = "{\"_id\":" + std::to_string(id) + "," + "\"message_id\":" + std::to_string(message_id) + "," "\"size\":" + std::to_string(size) + "," "\"name\":\"" + x + "\"," "\"timestamp\":" diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index c15dda36068bbb29866111bc28855998d9dc061b..1f5b037f1dc1dffd42e3bafa6958bae0af995c8a 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -87,10 +87,65 @@ Error MongoDBClient::UpdateCurrentCollectionIfNeeded(const std::string& collecti current_collection_name_ = collection_name; mongoc_collection_set_write_concern(current_collection_, write_concern_); + // Create index `message_id` for collection of asapo messages + // ToDo cache streams with already created indices + if (collection_name.rfind(kDBDataCollectionNamePrefix, 0) != 0){ + return nullptr; + } + CreateIndex(encoded_name); + return nullptr; } +Error MongoDBClient::CreateIndex(const std::string& collection_name) const { + + bson_t keys; + char *index_name; + bson_t *create_indexes; + bson_t reply; + char *reply_str; + bson_error_t error; + bool r; + mongoc_database_t *db; + + db = mongoc_client_get_database (client_, database_name_.c_str()); + + bson_init (&keys); + BSON_APPEND_INT32 (&keys, "message_id", 1); + index_name = mongoc_collection_keys_to_index_string (&keys); + create_indexes = BCON_NEW ("createIndexes", + BCON_UTF8 (collection_name.c_str()), + "indexes", + "[", + "{", + "key", + BCON_DOCUMENT (&keys), + "name", + BCON_UTF8 (index_name), + "unique", BCON_BOOL (true), + "}", + "]"); + + r = mongoc_database_write_command_with_opts ( + db, create_indexes, NULL , &reply, &error); + + reply_str = bson_as_json (&reply, NULL); + printf ("%s\n", reply_str); + + if (!r) { + fprintf (stderr, "Error in createIndexes: %s\n", error.message); + } + bson_free (index_name); + bson_free (reply_str); + bson_destroy (&reply); + bson_destroy (create_indexes); + // return DBErrorTemplates::kInsertError.Generate("cannot create index for collection: "+collection_name); + return nullptr; +} + + + Error MongoDBClient::TryConnectDatabase() { auto err = Ping(); if (err == nullptr) { @@ -347,6 +402,10 @@ Error MongoDBClient::InsertWithAutoId(const MessageMeta& file, auto meta_new = file; meta_new.id = id; + // Inset with auto ID + if (file.message_id == 0){ + meta_new.message_id = id; + } return Insert(current_collection_name_, meta_new, false, id_inserted); } @@ -469,7 +528,9 @@ Error MongoDBClient::InsertAsDatasetMessage(const std::string& collection, const return err; } auto query = - BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.id)), "}", "{", "messages.dataset_substream", + BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}", + "{", "message_id", BCON_INT64(static_cast<int64_t>(file.message_id)), "}", + "{", "messages.dataset_substream", "{", "$ne", BCON_INT64(static_cast<int64_t>(file.dataset_substream)), "}", "}", "]"); auto update = BCON_NEW ("$setOnInsert", "{", @@ -516,10 +577,18 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, filter = BCON_NEW ("_id", BCON_INT64(static_cast<int64_t>(id))); opts = BCON_NEW ("limit", BCON_INT64(1)); break; + case GetRecordMode::kByMessageId: + filter = BCON_NEW ("message_id", BCON_INT64(static_cast<int64_t>(id))); + opts = BCON_NEW ("limit", BCON_INT64(1)); + break; case GetRecordMode::kLast: filter = BCON_NEW (NULL); opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "_id", BCON_INT64(-1), "}"); break; + case GetRecordMode::kLastMessageId: + filter = BCON_NEW (NULL); + opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "message_id", BCON_INT64(-1), "}"); + break; case GetRecordMode::kEarliest: filter = BCON_NEW (NULL); opts = BCON_NEW ("limit", BCON_INT64(1), "sort", "{", "timestamp", BCON_INT64(1), "}"); @@ -553,7 +622,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, Error MongoDBClient::GetById(const std::string& collection, uint64_t id, MessageMeta* file) const { std::string record_str; - auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kById, &record_str); + auto err = GetRecordFromDb(collection, id, "", GetRecordMode::kByMessageId, &record_str); if (err) { return err; } @@ -643,9 +712,9 @@ Error UpdateStreamInfoFromLastRecord(const std::string& last_record_str, return DBErrorTemplates::kJsonParseError.Generate( "UpdateStreamInfoFromLastRecord: cannot parse timestamp in response: " + last_record_str); } - if (parser.GetUInt64("_id", &id) != nullptr) { + if (parser.GetUInt64("message_id", &id) != nullptr) { return DBErrorTemplates::kJsonParseError.Generate( - "UpdateStreamInfoFromLastRecord: cannot parse _id in response: " + last_record_str); + "UpdateStreamInfoFromLastRecord: cannot parse message_id in response: " + last_record_str); } info->timestamp_lastentry = timestamp_last; @@ -671,7 +740,7 @@ Error StreamInfoFromDbResponse(const std::string& last_record_str, Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* info) const { std::string last_record_str, earliest_record_str; - auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLast, &last_record_str); + auto err = GetRecordFromDb(collection, 0, "", GetRecordMode::kLastMessageId, &last_record_str); if (err) { if (err == DBErrorTemplates::kNoRecord) { // with noRecord error it will return last_id = 0 which can be used to understand that the stream is not started yet diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index d2a0621f9e382c9540f0c1cf3f7dc76986055616..e45d79521c83e1c8f94b8a343d971921d75895d7 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -35,9 +35,11 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>; enum class GetRecordMode { kById, + kByMessageId, kLast, kEarliest, kByStringId, + kLastMessageId, }; const size_t maxDbNameLength = 63; @@ -62,6 +64,7 @@ class MongoDBClient final : public Database { Error GetPersistedStreamsNumber(int* res) const override; Error PersistStream(const std::string& stream_name) const override; Error GetNextId(const std::string& collection, uint64_t* id) const; + Error CreateIndex(const std::string& collection_name) const; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 85222ab50bc9b905fb3644baee4ba8b3d8a7d59a..af3679afa3a7732254eca0fb9aad3ab79c8a5060 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -30,6 +30,7 @@ MessageMeta PrepareMessageMeta(bool includeNewStreamField = true) { MessageMeta message_meta; message_meta.size = 100; message_meta.id = 1; + message_meta.message_id = 1; message_meta.dataset_substream = 3; message_meta.name = std::string("folder") + asapo::kPathSeparator + "test"; message_meta.source = "host:1234"; @@ -57,10 +58,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) { std::string json = message_meta.Json(); if (asapo::kPathSeparator == '/') { ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); + R"({"_id":1,"message_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } else { ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); + R"({"_id":1,"message_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","ib_source":"","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } } diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 03b48601e4a9c2f0143697249cff6a6d91a12070..fbc6bebec3fc76fab9eb608be238f15f73578afa 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -149,7 +149,7 @@ class Consumer { */ virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0; - //! Receive next available message. + //! Receive next by ID message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. \param group_id - group id to use @@ -158,6 +158,7 @@ class Consumer { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; + virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) = 0; //! Retrieves message using message meta. /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 8710b5417b87bd7806a1481b93ef6fb2a2c9ae87..1f04b8036f9b735ad788fa7e13fa92789be62064 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -346,6 +346,11 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); } + if (op == GetMessageServerOperation::GetNextAvailable) { + ri.extra_params += "&id_key=_id"; + } else { + ri.extra_params += "&id_key=message_id"; + } RequestOutput output; err = ProcessRequest(&output, ri, ¤t_broker_uri_); *response = std::move(output.string_output); @@ -390,6 +395,24 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData data); } +Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) { + if ( ordered ) { + return GetMessageFromServer(GetMessageServerOperation::GetNext, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } else { + return GetMessageFromServer(GetMessageServerOperation::GetNextAvailable, + 0, + std::move(group_id), + std::move(stream), + info, + data); + } +} + Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup, 0, @@ -413,6 +436,8 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) { switch (op) { case GetMessageServerOperation::GetNext: return "next"; + case GetMessageServerOperation::GetNextAvailable: + return "next"; case GetMessageServerOperation::GetLastInGroup: return "groupedlast"; case GetMessageServerOperation::GetLast: diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index d1672ea1edcecede0470d4ffb93b443d4efa9b2b..f8556e58c3799f8af9fcde5a48feb937d7d423c2 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -13,6 +13,7 @@ namespace asapo { enum class GetMessageServerOperation { GetNext, + GetNextAvailable, GetLast, GetID, GetLastInGroup, @@ -74,6 +75,7 @@ class ConsumerImpl final : public asapo::Consumer { Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override; Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; + Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream, bool ordered) override; Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override; Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 15c27317a0b2f28e9fedf245c611ee4330b6ef3c..81972bdd9e717ec912068d59debdfa0cd159fb08 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -269,7 +269,7 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token }, "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=step"); + + "&instanceid=instance&pipelinestep=step&id_key=message_id"); } TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) { @@ -278,7 +278,7 @@ TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) { asapo::SourceType::kProcessed, "instance", "", "beamtime_id", "a", "b", expected_token }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=DefaultStep"); + + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id"); } TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) { @@ -287,7 +287,7 @@ TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) { asapo::SourceType::kProcessed, "instance", "auto", "beamtime_id", "a", "b", expected_token }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=instance&pipelinestep=DefaultStep"); + + "&instanceid=instance&pipelinestep=DefaultStep&id_key=message_id"); } /* @@ -329,7 +329,8 @@ TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { + expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token=" + expected_token - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -345,7 +346,8 @@ TEST_F(ConsumerImplTests, GetLastOnceUsesCorrectUri) { + expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token=" + expected_token - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -359,7 +361,8 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + expected_stream_encoded + "/0/last?token=" + expected_token - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -545,6 +548,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id" , _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::ServiceUnavailable), SetArgPointee<2>(nullptr), @@ -1065,7 +1069,8 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { "/stream/" + expected_group_id_encoded + "/next?token=" + expected_token + "&dataset=true&minsize=0" - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1202,7 +1207,8 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) { + expected_stream_encoded + "/0/last?token=" + expected_token + "&dataset=true&minsize=1" - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1219,7 +1225,8 @@ TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) { + expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token=" + expected_token + "&dataset=true&minsize=1" - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1525,7 +1532,7 @@ TEST_F(ConsumerImplTests, ResendNacks) { + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded - + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _, + + "&resend_nacks=true&delay_ms=10000&resend_attempts=3&id_key=message_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 05e32cf8beeaa82f58002979a657c0b257391938..9558fc785475727981f19ef96d151771a8fa6ece 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -72,7 +72,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void ForceNoRdma() Error DisableMonitoring(bool enabled) NetworkConnectionType CurrentConnectionType() - Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) + Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream, bool ordered) Error GetLast(MessageMeta* info, MessageData* data, string stream) Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream) Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 3ba383a617c7f7d80158cc26e09f34a41527315f..5c8b8b4a0bdc426936d035630d0945f3c7aa50d1 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -134,7 +134,10 @@ cdef class PyConsumer: cdef np.npy_intp dims[1] if op == "next": with nogil: - err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream) + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, True) + elif op == "next_available": + with nogil: + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data, b_stream, False) elif op == "last" and group_id == "": with nogil: err = self.c_consumer.get().GetLast(&info, p_data, b_stream) @@ -155,8 +158,11 @@ cdef class PyConsumer: arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) PyArray_ENABLEFLAGS(arr,np.NPY_ARRAY_OWNDATA) return arr,meta - def get_next(self, group_id, meta_only = True, stream = "default"): - return self._op("next",group_id,stream,meta_only,0) + def get_next(self, group_id, meta_only = True, stream = "default", ordered = True): + if ordered: + return self._op("next",group_id,stream,meta_only,0) + else: + return self._op("next_available",group_id,stream,meta_only,0) def get_last(self, meta_only = True, stream = "default", group_id = ""): return self._op("last",group_id,stream,meta_only,0) def get_by_id(self,uint64_t id,meta_only = True, stream = "default"): diff --git a/consumer/tools/folder_to_db/src/folder_db_importer.cpp b/consumer/tools/folder_to_db/src/folder_db_importer.cpp index c973afe04fbb867990e0ba4efeb6de34ffc66dfe..cdcf6b0700d2867682929785e698839123eb7b3b 100644 --- a/consumer/tools/folder_to_db/src/folder_db_importer.cpp +++ b/consumer/tools/folder_to_db/src/folder_db_importer.cpp @@ -102,6 +102,10 @@ Error FolderToDbImporter::ImportFilelist(const MessageMetas& file_list) const { MessageMetas FolderToDbImporter::GetFilesInFolder(const std::string& folder, Error* err) const { auto file_list = io__->FilesInFolder(folder, err); + // Set id to 0 to use autoincrementing id + for (auto i = 0; i < int(file_list.size()); i++) { + file_list[i].id = 0; + } return file_list; } diff --git a/examples/consumer/getnext_python/check_linux.sh b/examples/consumer/getnext_python/check_linux.sh index 5e7040bd39b6c5fc9190eab44b5d79c7f7753941..8ab598fcea4b1c91377d2bc0dd10c6db9b21544b 100644 --- a/examples/consumer/getnext_python/check_linux.sh +++ b/examples/consumer/getnext_python/check_linux.sh @@ -17,7 +17,7 @@ Cleanup() { for i in `seq 1 3`; do - echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done echo 'db.meta.insert({"_id":"bt","meta":{"meta_test":"test"}})' | mongo ${database_name} diff --git a/examples/consumer/getnext_python/getnext.py b/examples/consumer/getnext_python/getnext.py index e521c98f5be613f2a1f6d04acc2251858017d5b2..8fa7efb9c6a5bcff55b952201a7356a614cca230 100644 --- a/examples/consumer/getnext_python/getnext.py +++ b/examples/consumer/getnext_python/getnext.py @@ -22,5 +22,5 @@ print ('meta: ', json.dumps(meta, indent=4, sort_keys=True)) try: beamtime_meta = consumer.get_beamtime_meta() print ('beamtime meta: ', json.dumps(beamtime_meta, indent=4, sort_keys=True)) -except asapo_consumer.AsapoError as err: +except Exception as err: print ('error getting beamtime meta: ', err) diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh index ceaeccdacc4c7a1d7e4091715cb2a35acda58a56..0a41099f9993f2aed10c1c90ffb58a9d295aa948 100644 --- a/examples/pipeline/in_to_out/check_linux.sh +++ b/examples/pipeline/in_to_out/check_linux.sh @@ -42,7 +42,7 @@ echo hello3 > processed/file3 for i in `seq 1 3`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name} done sleep 1 @@ -52,7 +52,7 @@ cat out cat out | grep "Processed 3 file(s)" cat out | grep "Sent 3 file(s)" -echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${data_source_out} +echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${data_source_out} cat ${receiver_folder}/processed/file1_${data_source_out} | grep hello1 cat ${receiver_folder}/processed/file2_${data_source_out} | grep hello2 @@ -61,4 +61,4 @@ cat ${receiver_folder}/processed/file3_${data_source_out} | grep hello3 $1 127.0.0.1:8400 $source_path $beamtime_id $data_source_in $data_source_out2 $token 2 1000 25000 0 > out2 cat out2 test ! -f ${receiver_folder}/processed/file1_${data_source_out2} -echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1 +echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep processed/file1 diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh index c2d2fb48b33293d587a0b5dae855962f8e289977..6a3f447463bc13e2d2feddb01a47899c3bccc6fa 100644 --- a/examples/pipeline/in_to_out_python/check_linux.sh +++ b/examples/pipeline/in_to_out_python/check_linux.sh @@ -33,7 +33,7 @@ Cleanup() { echo "db.dropDatabase()" | mongo ${outdatabase_name} rm -rf processed rm -rf ${receiver_root_folder} -# rm -rf out + rm -rf out } @@ -46,7 +46,7 @@ echo hello3 > processed/file3 for i in `seq 1 3`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'processed/file$i'","timestamp":1,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${indatabase_name} done sleep 1 @@ -61,7 +61,8 @@ cat out | grep "Sent 5 file(s)" cat out | grep bt_meta cat out | grep st_meta -echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}" +echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} +echo "db.data_default.find({"message_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}" cat ${receiver_folder}/processed/file1_${data_source_out} | grep hello1 cat ${receiver_folder}/processed/file2_${data_source_out} | grep hello2 diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index 070559bff28c82e1dd053b6adce461d9835c615a..556ebd1797bb1e14ce5dfaaf60f53dc2b0bb37ec 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -77,7 +77,8 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co MessageMeta message_meta; message_meta.name = request->GetFileName(); message_meta.size = request->GetDataSize(); - message_meta.id = request->GetDataID(); + message_meta.id = 0; + message_meta.message_id = request->GetDataID(); message_meta.ingest_mode = request->GetIngestMode(); message_meta.buf_id = request->GetSlotId(); message_meta.stream = request->GetStream(); diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index d1f5aaa8c7bb9029a94a8faeee802898916fa912..c1a30692bad8377e9952f4b8a0e81f3c79be73aa 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -168,7 +168,8 @@ MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) { MessageMeta message_meta; message_meta.size = expected_file_size; message_meta.name = expected_file_name; - message_meta.id = expected_id; + message_meta.id = 0; + message_meta.message_id = expected_id; message_meta.ingest_mode = expected_ingest_mode; if (substream) { message_meta.dataset_substream = expected_substream; diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh index 6f50ea116a40be683a75de25dfc07d3d5f0b7b4e..aaf6f17f9293427d7820cfb4cb5ae14e4632ca14 100644 --- a/tests/automatic/broker/get_last/check_linux.sh +++ b/tests/automatic/broker/get_last/check_linux.sh @@ -12,8 +12,8 @@ Cleanup() { echo "db.dropDatabase()" | mongo ${database_name} } -echo "db.data_${stream}.insert({"_id":2})" | mongo ${database_name} -echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name} +echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(2)})" | mongo ${database_name} +echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(1)})" | mongo ${database_name} token=$BT_DATA_TOKEN @@ -22,21 +22,22 @@ echo found broker at $broker groupid=`curl -d '' --silent $broker/v0.2/creategroup` -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":2' -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":2' +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - -echo "db.data_${stream}.insert({"_id":3})" | mongo ${database_name} +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2' +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":2' -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":3' +echo "db.data_${stream}.insert({"_id":NumberInt(3),"message_id":NumberInt(3)})"| mongo ${database_name} -echo "db.data_${stream}.insert({"_id":4})" | mongo ${database_name} +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":3' -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=$token --stderr - | grep '"_id":1' -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":4' +echo "db.data_${stream}.insert({"_id":NumberInt(4),"message_id":NumberInt(4)})" | mongo ${database_name} + +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1' +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4' #with a new group groupid=`curl -d '' --silent $broker/v0.2/creategroup` -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=$token --stderr - | grep '"_id":1' -curl -v --silent $broker/v0.2/beamtime/data/detector/${stream}/0/last?token=$token --stderr - | grep '"_id":4' \ No newline at end of file +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | grep '"_id":1' +curl -v --silent "${broker}/v0.2/beamtime/data/detector/${stream}/0/last?token=${token}&id_key=message_id" --stderr - | grep '"_id":4' \ No newline at end of file diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index 840a0f12d7583792b841b8954309b553daacae3f..aec3b15000e64ca0b2e527f2e4cb790791dc4ce6 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -12,20 +12,19 @@ Cleanup() { echo "db.dropDatabase()" | mongo ${database_name} } -echo "db.data_${stream}.insert({"_id":2})" | mongo ${database_name} -echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name} +echo "db.data_${stream}.insert({"_id":NumberInt(2),"message_id":NumberInt(1)})" | mongo ${database_name} +echo "db.data_${stream}.insert({"_id":NumberInt(1),"message_id":NumberInt(2)})" | mongo ${database_name} token=$BT_DATA_TOKEN broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6` echo found broker at $broker - groupid=`curl -d '' --silent $broker/v0.3/creategroup` -curl -v --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":1' -curl -v --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":2' -curl -v --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"id_max":2' +curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":1' +curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":2' +curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"id_max":2' # with a new group groupid=`curl -d '' --silent $broker/v0.3/creategroup` -curl -v --silent $broker/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=$token --stderr - | tee /dev/stderr | grep '"_id":1' \ No newline at end of file +curl -v --silent "${broker}/v0.3/beamtime/data/source/${stream}/${groupid}/next?token=${token}&id_key=message_id" --stderr - | tee /dev/stderr | grep '"message_id":1' \ No newline at end of file diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh index 043ad990ffe3beb447df4176ee89263b913a9d63..63d46610c70a72ba4ddcdfc4f989f9371ddb0cb1 100644 --- a/tests/automatic/consumer/consumer_api/check_linux.sh +++ b/tests/automatic/consumer/consumer_api/check_linux.sh @@ -18,21 +18,21 @@ Cleanup() { for i in `seq 1 10`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":'10-$i',"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null done for i in `seq 1 5`; do - echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null + echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null done -echo 'db.data_stream1.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} >/dev/null +echo 'db.data_stream1.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} >/dev/null for i in `seq 1 5`; do - echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null + echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null done -echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} >/dev/null +echo 'db.data_stream2.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} >/dev/null echo hello1 > 1 @@ -49,10 +49,10 @@ do messages='' for j in `seq 1 3`; do - messages="$messages,{"_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null + messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null done messages=${messages#?} - echo 'db.data_default.insert({"_id":'$i',"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null done for i in `seq 1 5`; @@ -60,10 +60,10 @@ do messages='' for j in `seq 1 2`; do - messages="$messages,{"_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null + messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null done messages=${messages#?} - echo 'db.data_incomplete.insert({"_id":'$i',"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null + echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null done diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index dfef86461459c33ba13659055bedd5f77c70fe29..259ad6981cc62378b3d6e7749c77b7d83286512d 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -154,7 +154,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "GetNext stream2 no error"); M_AssertTrue(fi.name == "21", "GetNext stream2 filename"); - auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err); + auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, true, &err); M_AssertTrue(err == nullptr, "GetStreamList no error"); M_AssertTrue(streams.size() == 3, "streams.size"); M_AssertTrue(streams[0].name == "default", "streams0.name"); diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index 04ea2c9d197c2e75f3c2cf87fdd85dd14bd2c758..c6efb06f43e651db36e2385f63d1fd2053d10cb7 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -23,10 +23,10 @@ echo -n hello1 > $source_path/1_1 for i in `seq 1 5`; do - echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done -echo 'db.data_streamfts.insert({"_id":'1',"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} +echo 'db.data_streamfts.insert({"_id":NumberInt(1),"message_id":NumberInt(1),"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} echo 'db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}})' | mongo ${database_name} echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${database_name} @@ -34,16 +34,16 @@ echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${dat for i in `seq 1 5`; do - echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_stream1.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done for i in `seq 1 5`; do - echo 'db.data_stream2.insert({"_id":'$i',"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_stream2.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":6,"name":"'2$i'","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done -echo 'db.data_stream1.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} -echo 'db.data_stream2.insert({"_id":'6',"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} +echo 'db.data_stream1.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"ns"}})' | mongo ${database_name} +echo 'db.data_stream2.insert({"_id":NumberInt(6),"message_id":NumberInt(6),"size":0,"name":"asapo_finish_stream","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"next_stream":"asapo_no_next"}})' | mongo ${database_name} sleep 1 @@ -62,21 +62,22 @@ do messages='' for j in `seq 1 3`; do - messages="$messages,{"_id":$j,"size":6,"name":'${i}_${j}',"timestamp":0,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" + messages="$messages,{"_id":$j,"message_id":$j,"size":6,"name":'${i}_${j}',"timestamp":0,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" >/dev/null done messages=${messages#?} - echo 'db.data_default.insert({"_id":'$i',"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} >/dev/null done + for i in `seq 1 5`; do messages='' for j in `seq 1 2`; do - messages="$messages,{"_id":$j,"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" + messages="$messages,{"_id":NumberInt('$j'),"message_id":NumberInt('$j'),"size":6,"name":'${i}_${j}',"timestamp":1000,"source":'none',"buf_id":0,"dataset_substream":0,"meta":{"test":10}}" done messages=${messages#?} - echo 'db.data_incomplete.insert({"_id":'$i',"size":3,"messages":['$messages']})' | mongo ${database_name} + echo 'db.data_incomplete.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":3,"messages":['$messages']})' | mongo ${database_name} done diff --git a/tests/automatic/consumer/next_multithread_broker/check_linux.sh b/tests/automatic/consumer/next_multithread_broker/check_linux.sh index 02e48655045cd8e9b824f709a28616775546f997..600b77dfaed3ab66eb7d2940aa6060dc2d42616e 100644 --- a/tests/automatic/consumer/next_multithread_broker/check_linux.sh +++ b/tests/automatic/consumer/next_multithread_broker/check_linux.sh @@ -13,7 +13,7 @@ Cleanup() { for i in `seq 1 10`; do - echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done $@ 127.0.0.1:8400 test_run 4 10 $token_test_run diff --git a/tests/automatic/mongo_db/auto_id/auto_id.cpp b/tests/automatic/mongo_db/auto_id/auto_id.cpp index 337360aa8d5a365445ea6d8636d3c8ddb0f2baa9..7f24aa932928825dafc79319d9551075d13f6660 100644 --- a/tests/automatic/mongo_db/auto_id/auto_id.cpp +++ b/tests/automatic/mongo_db/auto_id/auto_id.cpp @@ -52,14 +52,14 @@ Args GetArgs(int argc, char* argv[]) { } void Insert(const asapo::MongoDBClient& db, const std::string& name, asapo::MessageMeta fi, const Args& args) { - auto start = fi.id; + auto start = fi.message_id; for (int i = 0; i < args.n_messages_per_thread; i++) { switch (args.mode) { case Mode::kTransaction: - fi.id = 0; + fi.message_id = 0; break; case Mode::kUpdateCounterThenIngest: - fi.id = start + static_cast<uint64_t>(i) + 1; + fi.message_id = start + static_cast<uint64_t>(i) + 1; break; } uint64_t inserted_id{0}; @@ -87,7 +87,8 @@ int main(int argc, char* argv[]) { fi.timestamp = std::chrono::system_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; - fi.id = static_cast<uint64_t>(args.n_messages_per_thread * i); + fi.id = 0; + fi.message_id = static_cast<uint64_t>(args.n_messages_per_thread * i); db.Connect("127.0.0.1", db_name); Insert(db, "stream", fi, args); }; diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index 0beb953e44fc5d3473d9e83a017005727998e172..0642a3246fa7bd29ca49a9e02b4c04981bb98d15 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -55,7 +55,8 @@ int main(int argc, char* argv[]) { asapo::MessageMeta fi; fi.size = 100; fi.name = "relpath/1"; - fi.id = static_cast<uint64_t>(args.file_id); + fi.id = 0; + fi.message_id = static_cast<uint64_t>(args.file_id); fi.timestamp = std::chrono::system_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; @@ -78,7 +79,7 @@ int main(int argc, char* argv[]) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto fi1 = fi; auto fi2 = fi; - fi2.id = 123; + fi2.message_id = 123; fi1.timestamp = std::chrono::system_clock::now(); fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1); fi2.name = asapo::kFinishStreamKeyword; @@ -92,7 +93,8 @@ int main(int argc, char* argv[]) { asapo::MessageMeta fi_db; asapo::MongoDBClient db_new; db_new.Connect("127.0.0.1", db_name); - err = db_new.GetById(std::string("data_") + stream_name, fi.id, &fi_db); + err = db_new.GetById(std::string("data_") + stream_name, fi.message_id, &fi_db); + fi_db.id = fi.id; M_AssertTrue(fi_db == fi, "get record from db"); M_AssertEq(nullptr, err); err = db_new.GetById(std::string("data_") + stream_name, 0, &fi_db); @@ -102,11 +104,11 @@ int main(int argc, char* argv[]) { err = db.GetStreamInfo(std::string("data_") + stream_name, &info); M_AssertEq(nullptr, err); - M_AssertEq(fi.id, info.last_id); + M_AssertEq(fi.message_id, info.last_id); err = db.GetLastStream(&info); M_AssertEq(nullptr, err); - M_AssertEq(fi2.id, info.last_id); + M_AssertEq(fi2.message_id, info.last_id); M_AssertEq("test1", info.name); M_AssertEq(true, info.finished); M_AssertEq("ns", info.next_stream); diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp index ee9cea6c0b567e6d3dc1ee7f614c58cc4124c416..9625eaf09a3a7ee79126484ce4a3f8d1992aa09a 100644 --- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp @@ -13,6 +13,7 @@ void Assert(const Error& error, const std::string& expect) { } else { result = error->Explain(); } + std::cout << "Assertion: " << result << std::endl; M_AssertContains(result, expect); } @@ -40,7 +41,8 @@ int main(int argc, char* argv[]) { fi.timestamp = std::chrono::system_clock::now(); fi.buf_id = 18446744073709551615ull; fi.source = "host:1234"; - fi.id = static_cast<uint64_t>(args.file_id); + fi.id = 0; + fi.message_id = static_cast<uint64_t>(args.file_id); fi.dataset_substream = 10; uint64_t dataset_size = 2; @@ -62,7 +64,12 @@ int main(int argc, char* argv[]) { if (args.keyword == "OK") { // check retrieve asapo::MessageMeta fi_db; - err = db.GetDataSetById("data_test", fi.dataset_substream, fi.id, &fi_db); + err = db.GetDataSetById("data_test", fi.dataset_substream, fi.message_id, &fi_db); + fi_db.id = 0; + if (err != nullptr) { + std::cout << "GetDataSetById failed: " << err->Explain() << std::endl; + } + M_AssertTrue(fi_db == fi, "get record from db"); M_AssertEq(nullptr, err); err = db.GetDataSetById("data_test", 0, 0, &fi_db); @@ -72,26 +79,30 @@ int main(int argc, char* argv[]) { err = db.GetStreamInfo("data_test", &info); M_AssertEq(nullptr, err); - M_AssertEq(fi.id, info.last_id); + M_AssertEq(fi.message_id, info.last_id); asapo::StreamInfo info_last; err = db.GetLastStream(&info_last); M_AssertEq(nullptr, err); M_AssertEq("test", info_last.name); - M_AssertEq(fi.id, info_last.last_id); + M_AssertEq(fi.message_id, info_last.last_id); M_AssertEq(false, info_last.finished); auto fi2 = fi; fi2.id = 123; + fi2.message_id = 123; fi2.timestamp = std::chrono::system_clock::now() + std::chrono::minutes(1); fi2.name = asapo::kFinishStreamKeyword; fi2.metadata = R"({"next_stream":"ns"})"; - db.Insert("data_test", fi2, false, nullptr); + err = db.Insert("data_test", fi2, false, nullptr); + if (err != nullptr) { + std::cout << "Insert failed: " << err->Explain() << std::endl; + } err = db.GetLastStream(&info_last); M_AssertEq(nullptr, err); M_AssertEq("test", info_last.name); - M_AssertEq(fi2.id, info_last.last_id); + M_AssertEq(fi2.message_id, info_last.last_id); M_AssertEq(true, info_last.finished); err = db.DeleteStream("test"); M_AssertEq(nullptr, err); diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 14f1a18a7f7284afda97927b3f51132ec8a1959c..bcd5f08de7296816ba6c34bbd8289d7be068c2e9 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -21,7 +21,7 @@ nthreads = 8 def assert_eq(val, expected, name): print("asserting eq for " + name) if val != expected: - print("error at " + name) + print("assertion error at: " + name) print('val: ', val, ' expected: ', expected) sys.exit(1) @@ -100,7 +100,7 @@ try: producer.send(8, "processed/" + data_source + "/" + "file8", x, ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) except asapo_producer.AsapoWrongInputError as e: - print(e) + print("Expected error: ", e) else: print("should be error sending non-cont array") sys.exit(1) @@ -121,7 +121,7 @@ try: producer.send(0, "processed/" + data_source + "/" + "file6", b"hello", ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback) except asapo_producer.AsapoWrongInputError as e: - print(e) + print("Expected error: ", e) else: print("should be error sending id 0 ") sys.exit(1) @@ -178,7 +178,7 @@ producer.set_requests_queue_limits(0,1) try: producer.send(13, "processed/bla", data) except asapo_producer.AsapoRequestsPoolIsFull as e: - print(e) + print("Expected error: ", e) else: print("should be AsapoRequestsPoolIsFull error ") sys.exit(1) @@ -237,7 +237,7 @@ producer.delete_stream('unknown_stream',error_on_not_exist = False) try: producer.delete_stream('unknown_stream',error_on_not_exist = True) except asapo_producer.AsapoWrongInputError as e: - print(e) + print("Expected error: ", e) else: print("should be error on delete unknown stream with flag") sys.exit(1) @@ -247,7 +247,7 @@ else: try: producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, 0, 0) except asapo_producer.AsapoWrongInputError as e: - print(e) + print("Expected error: ", e) else: print("should be error") sys.exit(1) diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh index 1adb4357351474943a6a5ff3406617e31c0c409b..7c1769c59954a6b1f5675b40b409046b4f2f944a 100644 --- a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh @@ -28,4 +28,4 @@ ls -ln ${receiver_folder}/processed/1_1 | awk '{ print $5 }'| grep 100000 ls -ln ${receiver_folder}/processed/1_2 | awk '{ print $5 }'| grep 100000 ls -ln ${receiver_folder}/processed/1_3 | awk '{ print $5 }'| grep 100000 -echo 'db.data_default.find({"messages._id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3 +echo 'db.data_default.find({"messages.message_id":{$gt:0}},{"messages.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3 diff --git a/tests/automatic/support/getnext/check_linux.sh b/tests/automatic/support/getnext/check_linux.sh index 2cb761da0fcdbc11a634e2c46dcfcb0417885c96..c880d75ddd1094e7d19df1b587ffd3fc30e224c2 100644 --- a/tests/automatic/support/getnext/check_linux.sh +++ b/tests/automatic/support/getnext/check_linux.sh @@ -17,7 +17,7 @@ Cleanup() { for i in `seq 1 3`; do - echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data_default.insert({"_id":NumberInt('$i'),"message_id":NumberInt('$i'),"size":100,"name":"'$i'","timestamp":0,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} done sleep 1