Skip to content
Snippets Groups Projects
Commit b3f49ab8 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

incomplete datasets in broker

parent 17ae5c6c
No related branches found
No related tags found
No related merge requests found
package database
import "asapo_common/utils"
type Request struct {
DbName string
DbCollectionName string
......@@ -31,3 +33,13 @@ type DBError struct {
func (err *DBError) Error() string {
return err.Message
}
func GetStatusCodeFromError(err error) int {
err_db, ok := err.(*DBError)
if ok {
return err_db.Code
} else {
return utils.StatusServiceUnavailable
}
}
......@@ -8,7 +8,9 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math"
......@@ -159,14 +161,19 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error {
return err
}
func (db *Mongodb) getMaxIndex(request Request) (max_id int, err error) {
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) {
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
var q bson.M
if request.DatasetOp {
q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}
if request.DatasetOp && !returnIncompete {
if request.MinDatasetSize>0 {
q = bson.M{"size": bson.M{"$gte": request.MinDatasetSize}}
} else {
q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}
}
} else {
q = nil
}
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
var result ID
err = c.FindOne(context.TODO(), q, opts).Decode(&result)
......@@ -236,24 +243,47 @@ func encodeAnswer(id, id_max int, next_substream string) string {
func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) {
var res map[string]interface{}
var q bson.M
if request.DatasetOp {
q = bson.M{"$and": []bson.M{bson.M{"_id": id}, bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}}}
} else {
q = bson.M{"_id": id}
}
q := bson.M{"_id": id}
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName)
err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
if err != nil {
answer := encodeAnswer(id, id_max, "")
log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error()
fmt.Println(err)
logger.Debug(log_str)
return nil, &DBError{utils.StatusNoData, answer}
}
log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
logger.Debug(log_str)
return utils.MapToJson(&res)
partialData := false
if request.DatasetOp {
imgs,ok1 :=res["images"].(primitive.A)
expectedSize,ok2 := utils.InterfaceToInt64(res["size"])
if !ok1 || !ok2 {
return nil, &DBError{utils.StatusTransactionInterrupted, "getRecordByIDRow: cannot parse database response" }
}
nImages := len(imgs)
if (request.MinDatasetSize==0 && int64(nImages)!=expectedSize) || (request.MinDatasetSize==0 && nImages<request.MinDatasetSize) {
partialData = true
}
}
if partialData {
log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
logger.Debug(log_str)
} else {
log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
logger.Debug(log_str)
}
answer,err := utils.MapToJson(&res)
if err!=nil {
return nil,err
}
if partialData {
return nil,&DBError{utils.StatusPartialData, string(answer)}
}
return answer,nil
}
func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map[string]interface{}, error) {
......@@ -278,7 +308,7 @@ func (db *Mongodb) getRecordByID(request Request) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
max_ind, err := db.getMaxIndex(request)
max_ind, err := db.getMaxIndex(request,true)
if err != nil {
return nil, err
}
......@@ -338,7 +368,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error {
}
func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) {
max_ind, err := db.getMaxIndex(request)
max_ind, err := db.getMaxIndex(request,true)
if err != nil {
return LocationPointer{}, 0, err
}
......@@ -429,7 +459,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
}
}
if record_ind != 0 {
max_ind, err = db.getMaxIndex(request)
max_ind, err = db.getMaxIndex(request, true)
if err != nil {
return 0, 0, err
}
......@@ -512,7 +542,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
}
data, err := db.getRecordByIDRow(request, nextInd, maxInd)
if nextInd == maxInd {
if nextInd == maxInd && GetStatusCodeFromError(err)!=utils.StatusPartialData {
data, err = db.processLastRecord(request,data, err)
}
......@@ -526,7 +556,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
}
func (db *Mongodb) getLastRecord(request Request) ([]byte, error) {
max_ind, err := db.getMaxIndex(request)
max_ind, err := db.getMaxIndex(request, false)
if err != nil {
return nil, err
}
......@@ -669,7 +699,7 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) {
}
if to == 0 {
to, err = db.getMaxIndex(request)
to, err = db.getMaxIndex(request, true)
if err != nil {
return nil, err
}
......
......@@ -3,6 +3,7 @@
package database
import (
"asapo_common/utils"
"context"
"errors"
"go.mongodb.org/mongo-driver/bson"
......@@ -76,14 +77,10 @@ func updateTimestamps(db *Mongodb, db_name string, rec *SubstreamsRecord) {
}
res, err := db.getEarliestRecord(db_name, record.Name)
if err == nil {
ts,ok:=res["timestamp"].(int64)
var tsint float64
if !ok { // we need this (at least for tests) since by default values are float in mongo
tsint,ok = res["timestamp"].(float64)
ts = int64(tsint)
}
ts,ok:=utils.InterfaceToInt64(res["timestamp"])
if ok {
rec.Substreams[i].Timestamp = ts }
rec.Substreams[i].Timestamp = ts
}
}
}
}
......
......@@ -13,15 +13,15 @@ import (
)
type TestRecord struct {
ID int `bson:"_id" json:"_id"`
ID int64 `bson:"_id" json:"_id"`
Meta map[string]string `bson:"meta" json:"meta"`
Name string `bson:"name" json:"name"`
Timestamp int64 `bson:"timestamp" json:"timestamp"`
}
type TestDataset struct {
ID int `bson:"_id" json:"_id"`
Size int `bson:"size" json:"size"`
ID int64 `bson:"_id" json:"_id"`
Size int64 `bson:"size" json:"size"`
Images []TestRecord `bson:"images" json:"images"`
}
......@@ -187,7 +187,7 @@ func getNOnes(array []int) int {
func insertRecords(n int) {
records := make([]TestRecord, n)
for ind, record := range records {
record.ID = ind + 1
record.ID = int64(ind) + 1
record.Name = string(ind)
if err := db.insertRecord(dbname, collection, &record); err != nil {
fmt.Println("error at insert ", ind)
......@@ -589,14 +589,31 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true})
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message)
assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
var res TestDataset
json.Unmarshal([]byte(err.(*DBError).Message), &res)
assert.Equal(t, rec_dataset1_incomplete, res)
}
func TestMongoDBReturnInCompletedDataset(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
res_string, err := db.ProcessRequest(Request{DbName: dbname,
DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true, MinDatasetSize: 1})
assert.Nil(t, err)
var res TestDataset
json.Unmarshal(res_string, &res)
assert.Equal(t, "", string(res_string))
assert.Equal(t, rec_dataset1_incomplete, res)
}
func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
......@@ -614,6 +631,24 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
assert.Equal(t, rec_dataset1, res)
}
func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec_dataset1)
db.insertRecord(dbname, collection, &rec_dataset2)
res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last",
DatasetOp:true,MinDatasetSize: 2,ExtraParam: "0"})
assert.Nil(t, err)
var res TestDataset
json.Unmarshal(res_string, &res)
assert.Equal(t, rec_dataset2, res)
}
func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
......@@ -647,6 +682,38 @@ func TestMongoDBGetDatasetID(t *testing.T) {
}
func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"})
assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code)
var res TestDataset
json.Unmarshal([]byte(err.(*DBError).Message), &res)
assert.Equal(t, rec_dataset1_incomplete, res)
}
func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true,MinDatasetSize: 3,ExtraParam: "1"})
assert.Nil(t, err)
var res TestDataset
json.Unmarshal(res_string, &res)
assert.Equal(t, rec_dataset1_incomplete, res)
}
type Substream struct {
name string
records []TestRecord
......
......@@ -83,18 +83,9 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
w.Write(answer)
}
func getStatusCodeFromDbError(err error) int {
err_db, ok := err.(*database.DBError)
if ok {
return err_db.Code
} else {
return utils.StatusServiceUnavailable
}
}
func returnError(err error, log_str string) (answer []byte, code int) {
code = getStatusCodeFromDbError(err)
if code != utils.StatusNoData {
code = database.GetStatusCodeFromError(err)
if code != utils.StatusNoData && code != utils.StatusPartialData{
logger.Error(log_str + " - " + err.Error())
} else {
logger.Debug(log_str + " - " + err.Error())
......@@ -103,7 +94,7 @@ func returnError(err error, log_str string) (answer []byte, code int) {
}
func reconnectIfNeeded(db_error error) {
code := getStatusCodeFromDbError(db_error)
code := database.GetStatusCodeFromError(db_error)
if code != utils.StatusServiceUnavailable {
return
}
......
......@@ -25,6 +25,16 @@ func MapToJson(res interface{}) ([]byte, error) {
}
}
func InterfaceToInt64(val interface{}) (int64, bool) {
val64, ok := val.(int64)
var valf64 float64
if !ok { // we need this (at least for tests) since by default values are float in mongo
valf64, ok = val.(float64)
val64 = int64(valf64)
}
return val64, ok
}
func ReadJsonFromFile(fname string, config interface{}) error {
content, err := ioutil.ReadFile(fname)
if err != nil {
......
......@@ -9,7 +9,8 @@ const (
const (
//error codes
StatusTransactionInterrupted = http.StatusInternalServerError
StatusServiceUnavailable = http.StatusNotFound
StatusServiceUnavailable = http.StatusNotFound
StatusWrongInput = http.StatusBadRequest
StatusNoData = http.StatusConflict
StatusPartialData = http.StatusPartialContent
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment