Skip to content
Snippets Groups Projects
Commit c94c826c authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

create position pointer if not exist on each get_next request

parent a690cf54
No related branches found
No related tags found
No related merge requests found
......@@ -104,7 +104,7 @@ func (db *Mongodb) Close() {
}
}
func (db *Mongodb) deleteAllRecords(dbname string) (err error) {
func (db *Mongodb) dropDatabase(dbname string) (err error) {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
......@@ -146,6 +146,7 @@ func (db *Mongodb) getMaxIndex(dbname string, collection_name string, dataset bo
err = c.FindOne(context.TODO(), q, opts).Decode(&result)
if err == mongo.ErrNoDocuments {
return 0, nil
}
return result.ID, err
......@@ -168,15 +169,25 @@ func (db *Mongodb) setCounter(dbname string, collection_name string, group_id st
return
}
func duplicateError(err error) bool {
command_error, ok := err.(mongo.CommandError)
if (!ok) {
return false
}
return command_error.Name=="DuplicateKey"
}
func (db *Mongodb) incrementField(dbname string, collection_name string, group_id string, max_ind int, res interface{}) (err error) {
update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
q := bson.M{"_id": group_id + "_" + collection_name, pointer_field_name: bson.M{"$lt": max_ind}}
c := db.client.Database(dbname).Collection(pointer_collection_name)
err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
if err != nil {
if err == mongo.ErrNoDocuments {
// duplicateerror can happen because we set Upsert=true to allow insert pointer when it is absent. But then it will try to insert
// pointer in case query found nothing, also when pointer exists and equal to max_ind. Here we have to return NoData
if err == mongo.ErrNoDocuments || duplicateError(err) {
return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
}
return &DBError{utils.StatusTransactionInterrupted, err.Error()}
......@@ -289,6 +300,10 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro
return LocationPointer{}, 0, err
}
if (max_ind == 0) {
return LocationPointer{}, 0, &DBError{utils.StatusNoData, encodeAnswer(0, 0, "")}
}
var curPointer LocationPointer
err = db.incrementField(db_name, collection_name, group_id, max_ind, &curPointer)
if err != nil {
......
......@@ -52,7 +52,7 @@ func cleanup() {
if db.client == nil {
return
}
db.deleteAllRecords(dbname)
db.dropDatabase(dbname)
db.db_pointers_created = nil
db.Close()
}
......@@ -222,6 +222,47 @@ func TestMongoDBGetNextInParallel(t *testing.T) {
assert.Equal(t, n, getNOnes(results))
}
func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
insertRecords(10)
db.ProcessRequest(dbname, collection, groupId, "next", "0")
db.dropDatabase(dbname)
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec2)
res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
assert.Nil(t, err)
assert.Equal(t, string(rec2_expect), string(res))
}
func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
insertRecords(200)
db.ProcessRequest(dbname, collection, groupId, "next", "0")
db.dropDatabase(dbname)
n := 100
insertRecords(n)
results := getRecords(n)
assert.Equal(t, n, getNOnes(results))
}
func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
insertRecords(10)
db.ProcessRequest(dbname, collection, groupId, "next", "0")
db.dropDatabase(dbname)
_, err := db.ProcessRequest(dbname, collection, groupId, "next", "0")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error())
}
func TestMongoDBgetRecordByID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment