Skip to content
Snippets Groups Projects
Commit d33db4a0 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

substreams for broker

parent fbca258c
No related branches found
No related tags found
No related merge requests found
Showing
with 170 additions and 141 deletions
package database
type Agent interface {
ProcessRequest(db_name string, group_id string, op string, extra string) ([]byte, error)
ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra string) ([]byte, error)
Ping() error
Connect(string) error
Close()
......
......@@ -24,7 +24,7 @@ func (db *MockedDatabase) Ping() error {
return args.Error(0)
}
func (db *MockedDatabase) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
args := db.Called(db_name, group_id, op, extra_param)
func (db *MockedDatabase) ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
args := db.Called(db_name, data_collection_name, group_id, op, extra_param)
return args.Get(0).([]byte), args.Error(1)
}
......@@ -26,7 +26,7 @@ type LocationPointer struct {
Value int `bson:"current_pointer"`
}
const data_collection_name = "data_default"
const data_collection_name_prefix = "data_"
const meta_collection_name = "meta"
const pointer_collection_name = "current_location"
const pointer_field_name = "current_pointer"
......@@ -135,12 +135,12 @@ func (db *Mongodb) deleteAllRecords(dbname string) (err error) {
return db.client.Database(dbname).Drop(context.TODO())
}
func (db *Mongodb) insertRecord(dbname string, s interface{}) error {
func (db *Mongodb) insertRecord(dbname string, collection_name string, s interface{}) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
c := db.client.Database(dbname).Collection(data_collection_name)
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
_, err := c.InsertOne(context.TODO(), s)
return err
......@@ -157,8 +157,8 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error {
return err
}
func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err error) {
c := db.client.Database(dbname).Collection(data_collection_name)
func (db *Mongodb) getMaxIndex(dbname string, collection_name string, dataset bool) (max_id int, err error) {
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
var q bson.M
if dataset {
q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}
......@@ -175,27 +175,27 @@ func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err err
return result.ID, err
}
func (db *Mongodb) createLocationPointers(dbname string, group_id string) (err error) {
func (db *Mongodb) createLocationPointers(dbname string, collection_name string, group_id string) (err error) {
opts := options.Update().SetUpsert(true)
update := bson.M{"$inc": bson.M{pointer_field_name: 0}}
q := bson.M{"_id": group_id}
q := bson.M{"_id": group_id + "_" + collection_name}
c := db.client.Database(dbname).Collection(pointer_collection_name)
_, err = c.UpdateOne(context.TODO(), q, update, opts)
return
}
func (db *Mongodb) setCounter(dbname string, group_id string, ind int) (err error) {
func (db *Mongodb) setCounter(dbname string, collection_name string, group_id string, ind int) (err error) {
update := bson.M{"$set": bson.M{pointer_field_name: ind}}
c := db.client.Database(dbname).Collection(pointer_collection_name)
q := bson.M{"_id": group_id}
q := bson.M{"_id": group_id + "_" + collection_name}
_, err = c.UpdateOne(context.TODO(), q, update, options.Update())
return
}
func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, res interface{}) (err error) {
func (db *Mongodb) incrementField(dbname string, collection_name string, group_id string, max_ind int, res interface{}) (err error) {
update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
q := bson.M{"_id": group_id, pointer_field_name: bson.M{"$lt": max_ind}}
q := bson.M{"_id": group_id + "_" + collection_name, pointer_field_name: bson.M{"$lt": max_ind}}
c := db.client.Database(dbname).Collection(pointer_collection_name)
err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
......@@ -219,7 +219,7 @@ func encodeAnswer(id, id_max int) string {
return string(answer)
}
func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool) ([]byte, error) {
func (db *Mongodb) getRecordByIDRow(dbname string, collection_name string, id, id_max int, dataset bool) ([]byte, error) {
var res map[string]interface{}
var q bson.M
if dataset {
......@@ -228,7 +228,7 @@ func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool)
q = bson.M{"_id": id}
}
c := db.client.Database(dbname).Collection(data_collection_name)
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
if err != nil {
answer := encodeAnswer(id, id_max)
......@@ -241,41 +241,41 @@ func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool)
return utils.MapToJson(&res)
}
func (db *Mongodb) getRecordByID(dbname string, group_id string, id_str string, dataset bool) ([]byte, error) {
func (db *Mongodb) getRecordByID(dbname string, collection_name string, group_id string, id_str string, dataset bool) ([]byte, error) {
id, err := strconv.Atoi(id_str)
if err != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
max_ind, err := db.getMaxIndex(dbname, dataset)
max_ind, err := db.getMaxIndex(dbname, collection_name, dataset)
if err != nil {
return nil, err
}
return db.getRecordByIDRow(dbname, id, max_ind, dataset)
return db.getRecordByIDRow(dbname, collection_name, id, max_ind, dataset)
}
func (db *Mongodb) needCreateLocationPointersInDb(group_id string) bool {
func (db *Mongodb) needCreateLocationPointersInDb(collection_name string, group_id string) bool {
dbPointersLock.RLock()
needCreate := !db.db_pointers_created[group_id]
needCreate := !db.db_pointers_created[group_id+"_"+collection_name]
dbPointersLock.RUnlock()
return needCreate
}
func (db *Mongodb) setLocationPointersCreateFlag(group_id string) {
func (db *Mongodb) setLocationPointersCreateFlag(collection_name string, group_id string) {
dbPointersLock.Lock()
if db.db_pointers_created == nil {
db.db_pointers_created = make(map[string]bool)
}
db.db_pointers_created[group_id] = true
db.db_pointers_created[group_id+"_"+collection_name] = true
dbPointersLock.Unlock()
}
func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string, group_id string) {
if db.needCreateLocationPointersInDb(group_id) {
db.createLocationPointers(db_name, group_id)
db.setLocationPointersCreateFlag(group_id)
func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string, collection_name string, group_id string) {
if db.needCreateLocationPointersInDb(collection_name, group_id) {
db.createLocationPointers(db_name, collection_name, group_id)
db.setLocationPointersCreateFlag(collection_name, group_id)
}
}
......@@ -287,7 +287,7 @@ func (db *Mongodb) getParentDB() *Mongodb {
}
}
func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id string) error {
func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, collection_name string, group_id string) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
......@@ -297,19 +297,19 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id
}
if len(group_id) > 0 {
db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, group_id)
db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, collection_name, group_id)
}
return nil
}
func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bool) (LocationPointer, int, error) {
max_ind, err := db.getMaxIndex(db_name, dataset)
func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, group_id string, dataset bool) (LocationPointer, int, error) {
max_ind, err := db.getMaxIndex(db_name, collection_name, dataset)
if err != nil {
return LocationPointer{}, 0, err
}
var curPointer LocationPointer
err = db.incrementField(db_name, group_id, max_ind, &curPointer)
err = db.incrementField(db_name, collection_name, group_id, max_ind, &curPointer)
if err != nil {
return LocationPointer{}, 0, err
}
......@@ -317,8 +317,8 @@ func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bo
return curPointer, max_ind, nil
}
func (db *Mongodb) getNextRecord(db_name string, group_id string, dataset bool) ([]byte, error) {
curPointer, max_ind, err := db.getCurrentPointer(db_name, group_id, dataset)
func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) {
curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset)
if err != nil {
log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error()
logger.Debug(log_str)
......@@ -326,23 +326,23 @@ func (db *Mongodb) getNextRecord(db_name string, group_id string, dataset bool)
}
log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id
logger.Debug(log_str)
return db.getRecordByIDRow(db_name, curPointer.Value, max_ind, dataset)
return db.getRecordByIDRow(db_name, collection_name, curPointer.Value, max_ind, dataset)
}
func (db *Mongodb) getLastRecord(db_name string, group_id string, dataset bool) ([]byte, error) {
max_ind, err := db.getMaxIndex(db_name, dataset)
func (db *Mongodb) getLastRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) {
max_ind, err := db.getMaxIndex(db_name, collection_name, dataset)
if err != nil {
return nil, err
}
res, err := db.getRecordByIDRow(db_name, max_ind, max_ind, dataset)
res, err := db.getRecordByIDRow(db_name, collection_name, max_ind, max_ind, dataset)
db.setCounter(db_name, group_id, max_ind)
db.setCounter(db_name, collection_name, group_id, max_ind)
return res, err
}
func (db *Mongodb) getSize(db_name string) ([]byte, error) {
c := db.client.Database(db_name).Collection(data_collection_name)
func (db *Mongodb) getSize(db_name string, collection_name string) ([]byte, error) {
c := db.client.Database(db_name).Collection(data_collection_name_prefix + collection_name)
var rec SizeRecord
var err error
......@@ -354,13 +354,13 @@ func (db *Mongodb) getSize(db_name string) ([]byte, error) {
return json.Marshal(&rec)
}
func (db *Mongodb) resetCounter(db_name string, group_id string, id_str string) ([]byte, error) {
func (db *Mongodb) resetCounter(db_name string, collection_name string, group_id string, id_str string) ([]byte, error) {
id, err := strconv.Atoi(id_str)
if err != nil {
return nil, err
}
err = db.setCounter(db_name, group_id, id)
err = db.setCounter(db_name, collection_name, group_id, id)
return []byte(""), err
}
......@@ -391,7 +391,7 @@ func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, e
return nil, &DBError{utils.StatusNoData, err.Error()}
}
func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) {
func (db *Mongodb) queryImages(dbname string, collection_name string, query string) ([]byte, error) {
var res []map[string]interface{}
q, sort, err := db.BSONFromSQL(dbname, query)
if err != nil {
......@@ -400,7 +400,7 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
c := db.client.Database(dbname).Collection(data_collection_name)
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
opts := options.Find()
if len(sort) > 0 {
......@@ -427,32 +427,32 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) {
}
func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) {
dataset := false
if strings.HasSuffix(op, "_dataset") {
dataset = true
op = op[:len(op)-8]
}
if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil {
if err := db.checkDatabaseOperationPrerequisites(db_name, collection_name, group_id); err != nil {
return nil, err
}
switch op {
case "next":
return db.getNextRecord(db_name, group_id, dataset)
return db.getNextRecord(db_name, collection_name, group_id, dataset)
case "id":
return db.getRecordByID(db_name, group_id, extra_param, dataset)
return db.getRecordByID(db_name, collection_name, group_id, extra_param, dataset)
case "last":
return db.getLastRecord(db_name, group_id, dataset)
return db.getLastRecord(db_name, collection_name, group_id, dataset)
case "resetcounter":
return db.resetCounter(db_name, group_id, extra_param)
return db.resetCounter(db_name, collection_name, group_id, extra_param)
case "size":
return db.getSize(db_name)
return db.getSize(db_name, collection_name)
case "meta":
return db.getMeta(db_name, extra_param)
case "queryimages":
return db.queryImages(db_name, extra_param)
return db.queryImages(db_name, collection_name, extra_param)
}
return nil, errors.New("Wrong db operation: " + op)
......
......@@ -27,6 +27,8 @@ type TestDataset struct {
var db Mongodb
const dbname = "run1"
const collection = "substream"
const collection2 = "substream2"
const dbaddress = "127.0.0.1:27017"
const groupId = "bid2a5auidddp1vl71d0"
const metaID = 0
......@@ -70,24 +72,31 @@ func TestMongoDBConnectOK(t *testing.T) {
}
func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) {
_, err := db.ProcessRequest(dbname, groupId, "next", "")
_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
}
func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) {
_, err := db.ProcessRequest(dbname, "", "meta", "0")
_, err := db.ProcessRequest(dbname, collection, "", "meta", "0")
assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
}
func TestMongoDBQueryImagesErrorWhenNotConnected(t *testing.T) {
_, err := db.ProcessRequest(dbname, "", "queryimages", "0")
_, err := db.ProcessRequest(dbname, collection, "", "queryimages", "0")
assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
}
func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
_, err := db.ProcessRequest("", groupId, "next", "")
_, err := db.ProcessRequest("", collection, groupId, "next", "")
assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
}
func TestMongoDBGetNextErrorWhenWrongDatacollectionname(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
_, err := db.ProcessRequest(dbname, "", groupId, "next", "")
assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
}
......@@ -95,15 +104,15 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) {
db.Connect(dbaddress)
db.databases = append(db.databases, dbname)
defer cleanup()
_, err := db.ProcessRequest(dbname, groupId, "next", "")
_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
}
func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec2)
_, err := db.ProcessRequest(dbname, groupId, "next", "")
db.insertRecord(dbname, collection, &rec2)
_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2}", err.Error())
}
......@@ -111,8 +120,8 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
func TestMongoDBGetNextOK(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
res, err := db.ProcessRequest(dbname, groupId, "next", "")
db.insertRecord(dbname, collection, &rec1)
res, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
}
......@@ -120,9 +129,9 @@ func TestMongoDBGetNextOK(t *testing.T) {
func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
db.ProcessRequest(dbname, groupId, "next", "")
_, err := db.ProcessRequest(dbname, groupId, "next", "")
db.insertRecord(dbname, collection, &rec1)
db.ProcessRequest(dbname, collection, groupId, "next", "")
_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}", err.(*DBError).Message)
......@@ -131,10 +140,10 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
func TestMongoDBGetNextCorrectOrder(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec2)
db.insertRecord(dbname, &rec1)
res1, _ := db.ProcessRequest(dbname, groupId, "next", "")
res2, _ := db.ProcessRequest(dbname, groupId, "next", "")
db.insertRecord(dbname, collection, &rec2)
db.insertRecord(dbname, collection, &rec1)
res1, _ := db.ProcessRequest(dbname, collection, groupId, "next", "")
res2, _ := db.ProcessRequest(dbname, collection, groupId, "next", "")
assert.Equal(t, string(rec1_expect), string(res1))
assert.Equal(t, string(rec2_expect), string(res2))
}
......@@ -154,7 +163,7 @@ func insertRecords(n int) {
for ind, record := range records {
record.ID = ind
record.FName = string(ind)
db.insertRecord(dbname, &record)
db.insertRecord(dbname, collection, &record)
}
}
......@@ -166,7 +175,7 @@ func getRecords(n int) []int {
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
res_bin, _ := db.ProcessRequest(dbname, groupId, "next", "")
res_bin, _ := db.ProcessRequest(dbname, collection, groupId, "next", "")
var res TestRecord
json.Unmarshal(res_bin, &res)
results[res.ID] = 1
......@@ -191,8 +200,8 @@ func TestMongoDBGetNextInParallel(t *testing.T) {
func TestMongoDBgetRecordByID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
res, err := db.getRecordByID(dbname, "", "1", false)
db.insertRecord(dbname, collection, &rec1)
res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1")
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
}
......@@ -200,8 +209,8 @@ func TestMongoDBgetRecordByID(t *testing.T) {
func TestMongoDBgetRecordByIDFails(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
_, err := db.getRecordByID(dbname, "", "2", false)
db.insertRecord(dbname, collection, &rec1)
_, err := db.ProcessRequest(dbname, collection, groupId, "id", "2")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1}", err.Error())
}
......@@ -209,17 +218,36 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) {
func TestMongoDBGetRecordNext(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
res, err := db.ProcessRequest(dbname, groupId, "next", "0")
db.insertRecord(dbname, collection, &rec1)
res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0")
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
}
func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection2, &rec_dataset1)
res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0")
res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "0")
var res_ds TestDataset
json.Unmarshal(res_string, &res_ds)
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
assert.Nil(t, err2)
assert.Equal(t, rec_dataset1, res_ds)
}
func TestMongoDBGetRecordID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
res, err := db.ProcessRequest(dbname, groupId, "id", "1")
db.insertRecord(dbname, collection, &rec1)
res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1")
assert.Nil(t, err)
assert.Equal(t, string(rec1_expect), string(res))
}
......@@ -227,18 +255,18 @@ func TestMongoDBGetRecordID(t *testing.T) {
func TestMongoDBWrongOp(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
_, err := db.ProcessRequest(dbname, groupId, "bla", "0")
db.insertRecord(dbname, collection, &rec1)
_, err := db.ProcessRequest(dbname, collection, groupId, "bla", "0")
assert.NotNil(t, err)
}
func TestMongoDBGetRecordLast(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
db.insertRecord(dbname, &rec2)
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec2)
res, err := db.ProcessRequest(dbname, groupId, "last", "0")
res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
assert.Nil(t, err)
assert.Equal(t, string(rec2_expect), string(res))
}
......@@ -246,16 +274,16 @@ func TestMongoDBGetRecordLast(t *testing.T) {
func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
db.insertRecord(dbname, &rec2)
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec2)
res, err := db.ProcessRequest(dbname, groupId, "last", "0")
res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0")
assert.Nil(t, err)
assert.Equal(t, string(rec2_expect), string(res))
db.insertRecord(dbname, &rec3)
db.insertRecord(dbname, collection, &rec3)
res, err = db.ProcessRequest(dbname, groupId, "next", "0")
res, err = db.ProcessRequest(dbname, collection, groupId, "next", "0")
assert.Nil(t, err)
assert.Equal(t, string(rec3_expect), string(res))
......@@ -264,11 +292,11 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
func TestMongoDBGetSize(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
db.insertRecord(dbname, &rec2)
db.insertRecord(dbname, &rec3)
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec2)
db.insertRecord(dbname, collection, &rec3)
res, err := db.ProcessRequest(dbname, "", "size", "0")
res, err := db.ProcessRequest(dbname, collection, "", "size", "0")
assert.Nil(t, err)
assert.Equal(t, string(recs1_expect), string(res))
}
......@@ -277,10 +305,10 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
// to have empty collection
db.insertRecord(dbname, &rec1)
db.client.Database(dbname).Collection(data_collection_name).DeleteOne(context.TODO(), bson.M{"_id": 1}, options.Delete())
db.insertRecord(dbname, collection, &rec1)
db.client.Database(dbname).Collection(data_collection_name_prefix+collection).DeleteOne(context.TODO(), bson.M{"_id": 1}, options.Delete())
res, err := db.ProcessRequest(dbname, "", "size", "0")
res, err := db.ProcessRequest(dbname, collection, "", "size", "0")
assert.Nil(t, err)
assert.Equal(t, string(recs2_expect), string(res))
}
......@@ -288,7 +316,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) {
func TestMongoDBGetSizeNoDatabase(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
_, err := db.ProcessRequest(dbname, "", "size", "0")
_, err := db.ProcessRequest(dbname, collection, "", "size", "0")
assert.NotNil(t, err)
}
......@@ -305,25 +333,25 @@ func TestMongoPingNotConected(t *testing.T) {
}
func TestMongoDBgetRecordByIDNotConnected(t *testing.T) {
_, err := db.ProcessRequest(dbname, "", "id", "2")
_, err := db.ProcessRequest(dbname, collection, "", "id", "2")
assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code)
}
func TestMongoDBResetCounter(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec1)
db.insertRecord(dbname, &rec2)
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec2)
res1, err1 := db.ProcessRequest(dbname, groupId, "next", "0")
res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0")
assert.Nil(t, err1)
assert.Equal(t, string(rec1_expect), string(res1))
_, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "1")
_, err_reset := db.ProcessRequest(dbname, collection, groupId, "resetcounter", "1")
assert.Nil(t, err_reset)
res2, err2 := db.ProcessRequest(dbname, groupId, "next", "0")
res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0")
assert.Nil(t, err2)
assert.Equal(t, string(rec2_expect), string(res2))
......@@ -336,7 +364,7 @@ func TestMongoDBGetMetaOK(t *testing.T) {
rec_expect, _ := json.Marshal(rec1)
db.insertMeta(dbname, &rec1)
res, err := db.ProcessRequest(dbname, "", "meta", metaID_str)
res, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str)
assert.Nil(t, err)
assert.Equal(t, string(rec_expect), string(res))
......@@ -346,7 +374,7 @@ func TestMongoDBGetMetaErr(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
_, err := db.ProcessRequest(dbname, "", "meta", metaID_str)
_, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str)
assert.NotNil(t, err)
}
......@@ -408,10 +436,10 @@ func TestMongoDBQueryImagesOK(t *testing.T) {
defer cleanup()
// logger.SetLevel(logger.DebugLevel)
db.insertRecord(dbname, &recq1)
db.insertRecord(dbname, &recq2)
db.insertRecord(dbname, &recq3)
db.insertRecord(dbname, &recq4)
db.insertRecord(dbname, collection, &recq1)
db.insertRecord(dbname, collection, &recq2)
db.insertRecord(dbname, collection, &recq3)
db.insertRecord(dbname, collection, &recq4)
for _, test := range tests {
// info, _ := db.client.BuildInfo()
......@@ -420,7 +448,7 @@ func TestMongoDBQueryImagesOK(t *testing.T) {
// continue
// }
res_string, err := db.ProcessRequest(dbname, "", "queryimages", test.query)
res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query)
var res []TestRecordMeta
json.Unmarshal(res_string, &res)
// fmt.Println(string(res_string))
......@@ -444,9 +472,9 @@ func TestMongoDBGetDataset(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec_dataset1)
db.insertRecord(dbname, collection, &rec_dataset1)
res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0")
res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0")
assert.Nil(t, err)
......@@ -460,9 +488,9 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec_dataset1_incomplete)
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0")
res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0")
assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0}", err.(*DBError).Message)
......@@ -474,10 +502,10 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec_dataset1)
db.insertRecord(dbname, &rec_dataset2)
db.insertRecord(dbname, collection, &rec_dataset1)
db.insertRecord(dbname, collection, &rec_dataset2)
res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0")
res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0")
assert.Nil(t, err)
......@@ -491,10 +519,10 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec_dataset1)
db.insertRecord(dbname, &rec_dataset3)
db.insertRecord(dbname, collection, &rec_dataset1)
db.insertRecord(dbname, collection, &rec_dataset3)
res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0")
res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0")
assert.Nil(t, err)
......@@ -507,9 +535,9 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) {
func TestMongoDBGetDatasetID(t *testing.T) {
db.Connect(dbaddress)
defer cleanup()
db.insertRecord(dbname, &rec_dataset1)
db.insertRecord(dbname, collection, &rec_dataset1)
res_string, err := db.ProcessRequest(dbname, groupId, "id_dataset", "1")
res_string, err := db.ProcessRequest(dbname, collection, groupId, "id_dataset", "1")
assert.Nil(t, err)
......
......@@ -39,7 +39,7 @@ func TestGetIDTestSuite(t *testing.T) {
}
func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "id", "1").Return([]byte("Hello"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "id", "1").Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request")))
w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix)
......
......@@ -33,7 +33,7 @@ func TestGetLastTestSuite(t *testing.T) {
}
func (suite *GetLastTestSuite) TestGetLastCallsCorrectRoutine() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "last", "0").Return([]byte("Hello"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "last", "0").Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request last")))
w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/last" + correctTokenSuffix)
......
......@@ -33,7 +33,7 @@ func TestGetMetaTestSuite(t *testing.T) {
}
func (suite *GetMetaTestSuite) TestGetMetaOK() {
suite.mock_db.On("ProcessRequest", expectedDBName, "", "meta", "0").Return([]byte("{\"test\":10}"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, "default", "", "meta", "0").Return([]byte("{\"test\":10}"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta")))
w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/default/0/meta/0" + correctTokenSuffix)
......
......@@ -33,7 +33,7 @@ func TestGetNextTestSuite(t *testing.T) {
}
func (suite *GetNextTestSuite) TestGetNextCallsCorrectRoutine() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next")))
w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
......
......@@ -33,7 +33,7 @@ func TestGetSizeTestSuite(t *testing.T) {
}
func (suite *GetSizeTestSuite) TestGetSizeOK() {
suite.mock_db.On("ProcessRequest", expectedDBName, "", "size", "0").Return([]byte("{\"size\":10}"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "size", "0").Return([]byte("{\"size\":10}"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request size")))
w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/size" + correctTokenSuffix)
......
......@@ -32,7 +32,7 @@ var listRoutes = utils.Routes{
utils.Route{
"GetMeta",
"Get",
"/database/{dbname}/{stream}/default/0/meta/{id}",
"/database/{dbname}/{stream}/{substream}/0/meta/{id}",
routeGetMeta,
},
utils.Route{
......
......@@ -34,7 +34,7 @@ func TestQueryTestSuite(t *testing.T) {
func (suite *QueryTestSuite) TestQueryOK() {
query_str := "aaaa"
suite.mock_db.On("ProcessRequest", expectedDBName, "", "queryimages", query_str).Return([]byte("{}"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "queryimages", query_str).Return([]byte("{}"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages")))
w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/0/queryimages"+correctTokenSuffix, "POST", query_str)
......
......@@ -33,7 +33,7 @@ func TestResetCounterTestSuite(t *testing.T) {
}
func (suite *ResetCounterTestSuite) TestResetCounterOK() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter")))
w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST")
......
......@@ -10,18 +10,19 @@ import (
"net/http"
)
func extractRequestParameters(r *http.Request, needGroupID bool) (string, string, string, bool) {
func extractRequestParameters(r *http.Request, needGroupID bool) (string, string, string, string, bool) {
vars := mux.Vars(r)
db_name, ok1 := vars["dbname"]
stream, ok3 := vars["stream"]
substream, ok4 := vars["substream"]
ok2 := true
group_id := ""
if needGroupID {
group_id, ok2 = vars["groupid"]
}
return db_name, stream, group_id, ok1 && ok2 && ok3
return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4
}
func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_name string, op string) bool {
......@@ -42,7 +43,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n
func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) {
r.Header.Set("Content-type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
db_name, stream, group_id, ok := extractRequestParameters(r, needGroupID)
db_name, stream, substream, group_id, ok := extractRequestParameters(r, needGroupID)
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
......@@ -61,7 +62,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
op = op + "_dataset"
}
answer, code := processRequestInDb(db_name+"_"+stream, group_id, op, extra_param)
answer, code := processRequestInDb(db_name+"_"+stream, substream, group_id, op, extra_param)
w.WriteHeader(code)
w.Write(answer)
}
......@@ -98,9 +99,9 @@ func reconnectIfNeeded(db_error error) {
}
}
func processRequestInDb(db_name string, group_id string, op string, extra_param string) (answer []byte, code int) {
func processRequestInDb(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, code int) {
statistics.IncreaseCounter()
answer, err := db.ProcessRequest(db_name, group_id, op, extra_param)
answer, err := db.ProcessRequest(db_name, data_collection_name, group_id, op, extra_param)
log_str := "processing request " + op + " in " + db_name + " at " + settings.GetDatabaseServer()
if err != nil {
go reconnectIfNeeded(err)
......
......@@ -121,7 +121,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
}
func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""),
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""),
&database.DBError{utils.StatusWrongInput, ""})
logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
......@@ -132,7 +132,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
}
func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""),
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""),
&database.DBError{utils.StatusServiceUnavailable, ""})
logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
......@@ -145,7 +145,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
}
func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""), errors.New(""))
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), errors.New(""))
logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
......@@ -157,7 +157,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
}
func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
......@@ -171,7 +171,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() {
}
func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil)
suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName)))
doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment