diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 5df85c2ddd38784896fbf0535220006415fd1287..2ec1142d4dde6c50ea2557dff8ea0af6401d939c 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -1,5 +1,7 @@ package database +import "asapo_common/utils" + type Request struct { DbName string DbCollectionName string @@ -31,3 +33,13 @@ type DBError struct { func (err *DBError) Error() string { return err.Message } + +func GetStatusCodeFromError(err error) int { + err_db, ok := err.(*DBError) + if ok { + return err_db.Code + } else { + return utils.StatusServiceUnavailable + } +} + diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 041f6b0b3bf7ff4c0925f731c0bc44e0bc03204e..83d57f167c7643264ab5f34a7d8c4b7c59ff8c6b 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -8,7 +8,9 @@ import ( "context" "encoding/json" "errors" + "fmt" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "math" @@ -159,14 +161,19 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error { return err } -func (db *Mongodb) getMaxIndex(request Request) (max_id int, err error) { +func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) var q bson.M - if request.DatasetOp { - q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} + if request.DatasetOp && !returnIncompete { + if request.MinDatasetSize>0 { + q = bson.M{"size": bson.M{"$gte": request.MinDatasetSize}} + } else { + q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} + } } else { q = nil } + opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) var result ID err = c.FindOne(context.TODO(), q, opts).Decode(&result) @@ -236,24 +243,47 @@ func encodeAnswer(id, id_max int, next_substream string) string { func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) { var res map[string]interface{} - var q bson.M - if request.DatasetOp { - q = bson.M{"$and": []bson.M{bson.M{"_id": id}, bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}}} - } else { - q = bson.M{"_id": id} - } + q := bson.M{"_id": id} c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max, "") log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error() + fmt.Println(err) logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, answer} } - log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName - logger.Debug(log_str) - return utils.MapToJson(&res) + + partialData := false + if request.DatasetOp { + imgs,ok1 :=res["images"].(primitive.A) + expectedSize,ok2 := utils.InterfaceToInt64(res["size"]) + if !ok1 || !ok2 { + return nil, &DBError{utils.StatusTransactionInterrupted, "getRecordByIDRow: cannot parse database response" } + } + nImages := len(imgs) + if (request.MinDatasetSize==0 && int64(nImages)!=expectedSize) || (request.MinDatasetSize==0 && nImages<request.MinDatasetSize) { + partialData = true + } + } + + if partialData { + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName + logger.Debug(log_str) + } else { + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName + logger.Debug(log_str) + } + + answer,err := utils.MapToJson(&res) + if err!=nil { + return nil,err + } + if partialData { + return nil,&DBError{utils.StatusPartialData, string(answer)} + } + return answer,nil } func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) { @@ -278,7 +308,7 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(request) + max_ind, err := db.getMaxIndex(request,true) if err != nil { return nil, err } @@ -338,7 +368,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { } func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(request) + max_ind, err := db.getMaxIndex(request,true) if err != nil { return LocationPointer{}, 0, err } @@ -429,7 +459,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } } if record_ind != 0 { - max_ind, err = db.getMaxIndex(request) + max_ind, err = db.getMaxIndex(request, true) if err != nil { return 0, 0, err } @@ -512,7 +542,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { } data, err := db.getRecordByIDRow(request, nextInd, maxInd) - if nextInd == maxInd { + if nextInd == maxInd && GetStatusCodeFromError(err)!=utils.StatusPartialData { data, err = db.processLastRecord(request,data, err) } @@ -526,7 +556,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { } func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { - max_ind, err := db.getMaxIndex(request) + max_ind, err := db.getMaxIndex(request, false) if err != nil { return nil, err } @@ -669,7 +699,7 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) { } if to == 0 { - to, err = db.getMaxIndex(request) + to, err = db.getMaxIndex(request, true) if err != nil { return nil, err } diff --git a/broker/src/asapo_broker/database/mongodb_substreams.go b/broker/src/asapo_broker/database/mongodb_substreams.go index a96b89b8b4bc83959995457579c679ffda2c8b79..999e6fa17b1c2b07b24db67e2d0166d7291d336d 100644 --- a/broker/src/asapo_broker/database/mongodb_substreams.go +++ b/broker/src/asapo_broker/database/mongodb_substreams.go @@ -3,6 +3,7 @@ package database import ( + "asapo_common/utils" "context" "errors" "go.mongodb.org/mongo-driver/bson" @@ -76,14 +77,10 @@ func updateTimestamps(db *Mongodb, db_name string, rec *SubstreamsRecord) { } res, err := db.getEarliestRecord(db_name, record.Name) if err == nil { - ts,ok:=res["timestamp"].(int64) - var tsint float64 - if !ok { // we need this (at least for tests) since by default values are float in mongo - tsint,ok = res["timestamp"].(float64) - ts = int64(tsint) - } + ts,ok:=utils.InterfaceToInt64(res["timestamp"]) if ok { - rec.Substreams[i].Timestamp = ts } + rec.Substreams[i].Timestamp = ts + } } } } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 22a1ffd8f4b2fe07840a23ed48c6f9c6a81e1b6a..2b06197fb1ef6907b1d5e45addd36d0add381ca5 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -13,15 +13,15 @@ import ( ) type TestRecord struct { - ID int `bson:"_id" json:"_id"` + ID int64 `bson:"_id" json:"_id"` Meta map[string]string `bson:"meta" json:"meta"` Name string `bson:"name" json:"name"` Timestamp int64 `bson:"timestamp" json:"timestamp"` } type TestDataset struct { - ID int `bson:"_id" json:"_id"` - Size int `bson:"size" json:"size"` + ID int64 `bson:"_id" json:"_id"` + Size int64 `bson:"size" json:"size"` Images []TestRecord `bson:"images" json:"images"` } @@ -187,7 +187,7 @@ func getNOnes(array []int) int { func insertRecords(n int) { records := make([]TestRecord, n) for ind, record := range records { - record.ID = ind + 1 + record.ID = int64(ind) + 1 record.Name = string(ind) if err := db.insertRecord(dbname, collection, &record); err != nil { fmt.Println("error at insert ", ind) @@ -589,14 +589,31 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true}) - assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) - assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message) + assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) + var res TestDataset + json.Unmarshal([]byte(err.(*DBError).Message), &res) + assert.Equal(t, rec_dataset1_incomplete, res) +} + +func TestMongoDBReturnInCompletedDataset(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + + res_string, err := db.ProcessRequest(Request{DbName: dbname, + DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true, MinDatasetSize: 1}) + + assert.Nil(t, err) + var res TestDataset + json.Unmarshal(res_string, &res) - assert.Equal(t, "", string(res_string)) + assert.Equal(t, rec_dataset1_incomplete, res) } + func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -614,6 +631,24 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { assert.Equal(t, rec_dataset1, res) } +func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(dbname, collection, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset2) + + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", + DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"}) + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset2, res) +} + func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -647,6 +682,38 @@ func TestMongoDBGetDatasetID(t *testing.T) { } +func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"}) + + assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) + + var res TestDataset + json.Unmarshal([]byte(err.(*DBError).Message), &res) + + assert.Equal(t, rec_dataset1_incomplete, res) + +} + +func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true,MinDatasetSize: 3,ExtraParam: "1"}) + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset1_incomplete, res) + +} + type Substream struct { name string records []TestRecord diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 5b4bf2f3c3d09a97cb26166a53d634e487a01e50..4adf102b6d48b8319780b1af186122251a45bdc9 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -83,18 +83,9 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par w.Write(answer) } -func getStatusCodeFromDbError(err error) int { - err_db, ok := err.(*database.DBError) - if ok { - return err_db.Code - } else { - return utils.StatusServiceUnavailable - } -} - func returnError(err error, log_str string) (answer []byte, code int) { - code = getStatusCodeFromDbError(err) - if code != utils.StatusNoData { + code = database.GetStatusCodeFromError(err) + if code != utils.StatusNoData && code != utils.StatusPartialData{ logger.Error(log_str + " - " + err.Error()) } else { logger.Debug(log_str + " - " + err.Error()) @@ -103,7 +94,7 @@ func returnError(err error, log_str string) (answer []byte, code int) { } func reconnectIfNeeded(db_error error) { - code := getStatusCodeFromDbError(db_error) + code := database.GetStatusCodeFromError(db_error) if code != utils.StatusServiceUnavailable { return } diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go index 94f0fdfa695044120d6c7c19ef334d0c102a1fa8..9b7dc20936f9c4da8d56825b2cafcc2ea317dcf9 100644 --- a/common/go/src/asapo_common/utils/helpers.go +++ b/common/go/src/asapo_common/utils/helpers.go @@ -25,6 +25,16 @@ func MapToJson(res interface{}) ([]byte, error) { } } +func InterfaceToInt64(val interface{}) (int64, bool) { + val64, ok := val.(int64) + var valf64 float64 + if !ok { // we need this (at least for tests) since by default values are float in mongo + valf64, ok = val.(float64) + val64 = int64(valf64) + } + return val64, ok +} + func ReadJsonFromFile(fname string, config interface{}) error { content, err := ioutil.ReadFile(fname) if err != nil { diff --git a/common/go/src/asapo_common/utils/status_codes.go b/common/go/src/asapo_common/utils/status_codes.go index 9f6e061622fe87e82f779e6bc871cca099b29dc9..7002a963e250b20b9af88c1125b54c2993215aca 100644 --- a/common/go/src/asapo_common/utils/status_codes.go +++ b/common/go/src/asapo_common/utils/status_codes.go @@ -9,7 +9,8 @@ const ( const ( //error codes StatusTransactionInterrupted = http.StatusInternalServerError - StatusServiceUnavailable = http.StatusNotFound + StatusServiceUnavailable = http.StatusNotFound StatusWrongInput = http.StatusBadRequest StatusNoData = http.StatusConflict + StatusPartialData = http.StatusPartialContent )