diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 6099eecf043b68848dd17762797dd6f9b56bd82e..452419aeb1b0e1e00677a26fe1632e8cb76cc87c 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -1,7 +1,7 @@ package database type Agent interface { - ProcessRequest(db_name string, group_id string, op string, extra string) ([]byte, error) + ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra string) ([]byte, error) Ping() error Connect(string) error Close() diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index 9c389a7a423e29e288b1bdbb710b7b8be5c157ed..be469668f5f8be780961c32ea7ce1656f76f7fc7 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -24,7 +24,7 @@ func (db *MockedDatabase) Ping() error { return args.Error(0) } -func (db *MockedDatabase) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) { - args := db.Called(db_name, group_id, op, extra_param) +func (db *MockedDatabase) ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { + args := db.Called(db_name, data_collection_name, group_id, op, extra_param) return args.Get(0).([]byte), args.Error(1) } diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 5cc0ad09f74a047d56aab26639f9bedd9bb05e55..95ff954acdb07ec65e783db11760e76e971f1ab9 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -26,7 +26,7 @@ type LocationPointer struct { Value int `bson:"current_pointer"` } -const data_collection_name = "data_default" +const data_collection_name_prefix = "data_" const meta_collection_name = "meta" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" @@ -135,12 +135,12 @@ func (db *Mongodb) deleteAllRecords(dbname string) (err error) { return db.client.Database(dbname).Drop(context.TODO()) } -func (db *Mongodb) insertRecord(dbname string, s interface{}) error { +func (db *Mongodb) insertRecord(dbname string, collection_name string, s interface{}) error { if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - c := db.client.Database(dbname).Collection(data_collection_name) + c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) _, err := c.InsertOne(context.TODO(), s) return err @@ -157,8 +157,8 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error { return err } -func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err error) { - c := db.client.Database(dbname).Collection(data_collection_name) +func (db *Mongodb) getMaxIndex(dbname string, collection_name string, dataset bool) (max_id int, err error) { + c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) var q bson.M if dataset { q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} @@ -175,27 +175,27 @@ func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err err return result.ID, err } -func (db *Mongodb) createLocationPointers(dbname string, group_id string) (err error) { +func (db *Mongodb) createLocationPointers(dbname string, collection_name string, group_id string) (err error) { opts := options.Update().SetUpsert(true) update := bson.M{"$inc": bson.M{pointer_field_name: 0}} - q := bson.M{"_id": group_id} + q := bson.M{"_id": group_id + "_" + collection_name} c := db.client.Database(dbname).Collection(pointer_collection_name) _, err = c.UpdateOne(context.TODO(), q, update, opts) return } -func (db *Mongodb) setCounter(dbname string, group_id string, ind int) (err error) { +func (db *Mongodb) setCounter(dbname string, collection_name string, group_id string, ind int) (err error) { update := bson.M{"$set": bson.M{pointer_field_name: ind}} c := db.client.Database(dbname).Collection(pointer_collection_name) - q := bson.M{"_id": group_id} + q := bson.M{"_id": group_id + "_" + collection_name} _, err = c.UpdateOne(context.TODO(), q, update, options.Update()) return } -func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, res interface{}) (err error) { +func (db *Mongodb) incrementField(dbname string, collection_name string, group_id string, max_ind int, res interface{}) (err error) { update := bson.M{"$inc": bson.M{pointer_field_name: 1}} opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) - q := bson.M{"_id": group_id, pointer_field_name: bson.M{"$lt": max_ind}} + q := bson.M{"_id": group_id + "_" + collection_name, pointer_field_name: bson.M{"$lt": max_ind}} c := db.client.Database(dbname).Collection(pointer_collection_name) err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) @@ -219,7 +219,7 @@ func encodeAnswer(id, id_max int) string { return string(answer) } -func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool) ([]byte, error) { +func (db *Mongodb) getRecordByIDRow(dbname string, collection_name string, id, id_max int, dataset bool) ([]byte, error) { var res map[string]interface{} var q bson.M if dataset { @@ -228,7 +228,7 @@ func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool) q = bson.M{"_id": id} } - c := db.client.Database(dbname).Collection(data_collection_name) + c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max) @@ -241,41 +241,41 @@ func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool) return utils.MapToJson(&res) } -func (db *Mongodb) getRecordByID(dbname string, group_id string, id_str string, dataset bool) ([]byte, error) { +func (db *Mongodb) getRecordByID(dbname string, collection_name string, group_id string, id_str string, dataset bool) ([]byte, error) { id, err := strconv.Atoi(id_str) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(dbname, dataset) + max_ind, err := db.getMaxIndex(dbname, collection_name, dataset) if err != nil { return nil, err } - return db.getRecordByIDRow(dbname, id, max_ind, dataset) + return db.getRecordByIDRow(dbname, collection_name, id, max_ind, dataset) } -func (db *Mongodb) needCreateLocationPointersInDb(group_id string) bool { +func (db *Mongodb) needCreateLocationPointersInDb(collection_name string, group_id string) bool { dbPointersLock.RLock() - needCreate := !db.db_pointers_created[group_id] + needCreate := !db.db_pointers_created[group_id+"_"+collection_name] dbPointersLock.RUnlock() return needCreate } -func (db *Mongodb) setLocationPointersCreateFlag(group_id string) { +func (db *Mongodb) setLocationPointersCreateFlag(collection_name string, group_id string) { dbPointersLock.Lock() if db.db_pointers_created == nil { db.db_pointers_created = make(map[string]bool) } - db.db_pointers_created[group_id] = true + db.db_pointers_created[group_id+"_"+collection_name] = true dbPointersLock.Unlock() } -func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string, group_id string) { - if db.needCreateLocationPointersInDb(group_id) { - db.createLocationPointers(db_name, group_id) - db.setLocationPointersCreateFlag(group_id) +func (db *Mongodb) generateLocationPointersInDbIfNeeded(db_name string, collection_name string, group_id string) { + if db.needCreateLocationPointersInDb(collection_name, group_id) { + db.createLocationPointers(db_name, collection_name, group_id) + db.setLocationPointersCreateFlag(collection_name, group_id) } } @@ -287,7 +287,7 @@ func (db *Mongodb) getParentDB() *Mongodb { } } -func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id string) error { +func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, collection_name string, group_id string) error { if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } @@ -297,19 +297,19 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id } if len(group_id) > 0 { - db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, group_id) + db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, collection_name, group_id) } return nil } -func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bool) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(db_name, dataset) +func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, group_id string, dataset bool) (LocationPointer, int, error) { + max_ind, err := db.getMaxIndex(db_name, collection_name, dataset) if err != nil { return LocationPointer{}, 0, err } var curPointer LocationPointer - err = db.incrementField(db_name, group_id, max_ind, &curPointer) + err = db.incrementField(db_name, collection_name, group_id, max_ind, &curPointer) if err != nil { return LocationPointer{}, 0, err } @@ -317,8 +317,8 @@ func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bo return curPointer, max_ind, nil } -func (db *Mongodb) getNextRecord(db_name string, group_id string, dataset bool) ([]byte, error) { - curPointer, max_ind, err := db.getCurrentPointer(db_name, group_id, dataset) +func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) { + curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset) if err != nil { log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) @@ -326,23 +326,23 @@ func (db *Mongodb) getNextRecord(db_name string, group_id string, dataset bool) } log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id logger.Debug(log_str) - return db.getRecordByIDRow(db_name, curPointer.Value, max_ind, dataset) + return db.getRecordByIDRow(db_name, collection_name, curPointer.Value, max_ind, dataset) } -func (db *Mongodb) getLastRecord(db_name string, group_id string, dataset bool) ([]byte, error) { - max_ind, err := db.getMaxIndex(db_name, dataset) +func (db *Mongodb) getLastRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) { + max_ind, err := db.getMaxIndex(db_name, collection_name, dataset) if err != nil { return nil, err } - res, err := db.getRecordByIDRow(db_name, max_ind, max_ind, dataset) + res, err := db.getRecordByIDRow(db_name, collection_name, max_ind, max_ind, dataset) - db.setCounter(db_name, group_id, max_ind) + db.setCounter(db_name, collection_name, group_id, max_ind) return res, err } -func (db *Mongodb) getSize(db_name string) ([]byte, error) { - c := db.client.Database(db_name).Collection(data_collection_name) +func (db *Mongodb) getSize(db_name string, collection_name string) ([]byte, error) { + c := db.client.Database(db_name).Collection(data_collection_name_prefix + collection_name) var rec SizeRecord var err error @@ -354,13 +354,13 @@ func (db *Mongodb) getSize(db_name string) ([]byte, error) { return json.Marshal(&rec) } -func (db *Mongodb) resetCounter(db_name string, group_id string, id_str string) ([]byte, error) { +func (db *Mongodb) resetCounter(db_name string, collection_name string, group_id string, id_str string) ([]byte, error) { id, err := strconv.Atoi(id_str) if err != nil { return nil, err } - err = db.setCounter(db_name, group_id, id) + err = db.setCounter(db_name, collection_name, group_id, id) return []byte(""), err } @@ -391,7 +391,7 @@ func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, e return nil, &DBError{utils.StatusNoData, err.Error()} } -func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { +func (db *Mongodb) queryImages(dbname string, collection_name string, query string) ([]byte, error) { var res []map[string]interface{} q, sort, err := db.BSONFromSQL(dbname, query) if err != nil { @@ -400,7 +400,7 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.client.Database(dbname).Collection(data_collection_name) + c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) opts := options.Find() if len(sort) > 0 { @@ -427,32 +427,32 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { } -func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) { +func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { dataset := false if strings.HasSuffix(op, "_dataset") { dataset = true op = op[:len(op)-8] } - if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { + if err := db.checkDatabaseOperationPrerequisites(db_name, collection_name, group_id); err != nil { return nil, err } switch op { case "next": - return db.getNextRecord(db_name, group_id, dataset) + return db.getNextRecord(db_name, collection_name, group_id, dataset) case "id": - return db.getRecordByID(db_name, group_id, extra_param, dataset) + return db.getRecordByID(db_name, collection_name, group_id, extra_param, dataset) case "last": - return db.getLastRecord(db_name, group_id, dataset) + return db.getLastRecord(db_name, collection_name, group_id, dataset) case "resetcounter": - return db.resetCounter(db_name, group_id, extra_param) + return db.resetCounter(db_name, collection_name, group_id, extra_param) case "size": - return db.getSize(db_name) + return db.getSize(db_name, collection_name) case "meta": return db.getMeta(db_name, extra_param) case "queryimages": - return db.queryImages(db_name, extra_param) + return db.queryImages(db_name, collection_name, extra_param) } return nil, errors.New("Wrong db operation: " + op) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 8a1136fd774d06f9880c1882f68222913063afe4..16bda6a5af9f419b8cd40047f9c10d9c06196710 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -27,6 +27,8 @@ type TestDataset struct { var db Mongodb const dbname = "run1" +const collection = "substream" +const collection2 = "substream2" const dbaddress = "127.0.0.1:27017" const groupId = "bid2a5auidddp1vl71d0" const metaID = 0 @@ -70,24 +72,31 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, groupId, "next", "") + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, "", "meta", "0") + _, err := db.ProcessRequest(dbname, collection, "", "meta", "0") assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBQueryImagesErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, "", "queryimages", "0") + _, err := db.ProcessRequest(dbname, collection, "", "queryimages", "0") assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest("", groupId, "next", "") + _, err := db.ProcessRequest("", collection, groupId, "next", "") + assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) +} + +func TestMongoDBGetNextErrorWhenWrongDatacollectionname(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + _, err := db.ProcessRequest(dbname, "", groupId, "next", "") assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } @@ -95,15 +104,15 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) { db.Connect(dbaddress) db.databases = append(db.databases, dbname) defer cleanup() - _, err := db.ProcessRequest(dbname, groupId, "next", "") + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec2) - _, err := db.ProcessRequest(dbname, groupId, "next", "") + db.insertRecord(dbname, collection, &rec2) + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2}", err.Error()) } @@ -111,8 +120,8 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - res, err := db.ProcessRequest(dbname, groupId, "next", "") + db.insertRecord(dbname, collection, &rec1) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -120,9 +129,9 @@ func TestMongoDBGetNextOK(t *testing.T) { func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - db.ProcessRequest(dbname, groupId, "next", "") - _, err := db.ProcessRequest(dbname, groupId, "next", "") + db.insertRecord(dbname, collection, &rec1) + db.ProcessRequest(dbname, collection, groupId, "next", "") + _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}", err.(*DBError).Message) @@ -131,10 +140,10 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { func TestMongoDBGetNextCorrectOrder(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec2) - db.insertRecord(dbname, &rec1) - res1, _ := db.ProcessRequest(dbname, groupId, "next", "") - res2, _ := db.ProcessRequest(dbname, groupId, "next", "") + db.insertRecord(dbname, collection, &rec2) + db.insertRecord(dbname, collection, &rec1) + res1, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") + res2, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -154,7 +163,7 @@ func insertRecords(n int) { for ind, record := range records { record.ID = ind record.FName = string(ind) - db.insertRecord(dbname, &record) + db.insertRecord(dbname, collection, &record) } } @@ -166,7 +175,7 @@ func getRecords(n int) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, _ := db.ProcessRequest(dbname, groupId, "next", "") + res_bin, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") var res TestRecord json.Unmarshal(res_bin, &res) results[res.ID] = 1 @@ -191,8 +200,8 @@ func TestMongoDBGetNextInParallel(t *testing.T) { func TestMongoDBgetRecordByID(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - res, err := db.getRecordByID(dbname, "", "1", false) + db.insertRecord(dbname, collection, &rec1) + res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1") assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -200,8 +209,8 @@ func TestMongoDBgetRecordByID(t *testing.T) { func TestMongoDBgetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - _, err := db.getRecordByID(dbname, "", "2", false) + db.insertRecord(dbname, collection, &rec1) + _, err := db.ProcessRequest(dbname, collection, groupId, "id", "2") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1}", err.Error()) } @@ -209,17 +218,36 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) { func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - res, err := db.ProcessRequest(dbname, groupId, "next", "0") + db.insertRecord(dbname, collection, &rec1) + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0") + assert.Nil(t, err) + assert.Equal(t, string(rec1_expect), string(res)) +} + +func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection2, &rec_dataset1) + + res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0") + res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "0") + var res_ds TestDataset + json.Unmarshal(res_string, &res_ds) + assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) + + assert.Nil(t, err2) + assert.Equal(t, rec_dataset1, res_ds) + } func TestMongoDBGetRecordID(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - res, err := db.ProcessRequest(dbname, groupId, "id", "1") + db.insertRecord(dbname, collection, &rec1) + res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1") assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -227,18 +255,18 @@ func TestMongoDBGetRecordID(t *testing.T) { func TestMongoDBWrongOp(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - _, err := db.ProcessRequest(dbname, groupId, "bla", "0") + db.insertRecord(dbname, collection, &rec1) + _, err := db.ProcessRequest(dbname, collection, groupId, "bla", "0") assert.NotNil(t, err) } func TestMongoDBGetRecordLast(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - db.insertRecord(dbname, &rec2) + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, groupId, "last", "0") + res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0") assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -246,16 +274,16 @@ func TestMongoDBGetRecordLast(t *testing.T) { func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - db.insertRecord(dbname, &rec2) + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, groupId, "last", "0") + res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0") assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) - db.insertRecord(dbname, &rec3) + db.insertRecord(dbname, collection, &rec3) - res, err = db.ProcessRequest(dbname, groupId, "next", "0") + res, err = db.ProcessRequest(dbname, collection, groupId, "next", "0") assert.Nil(t, err) assert.Equal(t, string(rec3_expect), string(res)) @@ -264,11 +292,11 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { func TestMongoDBGetSize(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - db.insertRecord(dbname, &rec2) - db.insertRecord(dbname, &rec3) + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + db.insertRecord(dbname, collection, &rec3) - res, err := db.ProcessRequest(dbname, "", "size", "0") + res, err := db.ProcessRequest(dbname, collection, "", "size", "0") assert.Nil(t, err) assert.Equal(t, string(recs1_expect), string(res)) } @@ -277,10 +305,10 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { db.Connect(dbaddress) defer cleanup() // to have empty collection - db.insertRecord(dbname, &rec1) - db.client.Database(dbname).Collection(data_collection_name).DeleteOne(context.TODO(), bson.M{"_id": 1}, options.Delete()) + db.insertRecord(dbname, collection, &rec1) + db.client.Database(dbname).Collection(data_collection_name_prefix+collection).DeleteOne(context.TODO(), bson.M{"_id": 1}, options.Delete()) - res, err := db.ProcessRequest(dbname, "", "size", "0") + res, err := db.ProcessRequest(dbname, collection, "", "size", "0") assert.Nil(t, err) assert.Equal(t, string(recs2_expect), string(res)) } @@ -288,7 +316,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { func TestMongoDBGetSizeNoDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, "", "size", "0") + _, err := db.ProcessRequest(dbname, collection, "", "size", "0") assert.NotNil(t, err) } @@ -305,25 +333,25 @@ func TestMongoPingNotConected(t *testing.T) { } func TestMongoDBgetRecordByIDNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, "", "id", "2") + _, err := db.ProcessRequest(dbname, collection, "", "id", "2") assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBResetCounter(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec1) - db.insertRecord(dbname, &rec2) + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) - res1, err1 := db.ProcessRequest(dbname, groupId, "next", "0") + res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0") assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) - _, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "1") + _, err_reset := db.ProcessRequest(dbname, collection, groupId, "resetcounter", "1") assert.Nil(t, err_reset) - res2, err2 := db.ProcessRequest(dbname, groupId, "next", "0") + res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0") assert.Nil(t, err2) assert.Equal(t, string(rec2_expect), string(res2)) @@ -336,7 +364,7 @@ func TestMongoDBGetMetaOK(t *testing.T) { rec_expect, _ := json.Marshal(rec1) db.insertMeta(dbname, &rec1) - res, err := db.ProcessRequest(dbname, "", "meta", metaID_str) + res, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -346,7 +374,7 @@ func TestMongoDBGetMetaErr(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, "", "meta", metaID_str) + _, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str) assert.NotNil(t, err) } @@ -408,10 +436,10 @@ func TestMongoDBQueryImagesOK(t *testing.T) { defer cleanup() // logger.SetLevel(logger.DebugLevel) - db.insertRecord(dbname, &recq1) - db.insertRecord(dbname, &recq2) - db.insertRecord(dbname, &recq3) - db.insertRecord(dbname, &recq4) + db.insertRecord(dbname, collection, &recq1) + db.insertRecord(dbname, collection, &recq2) + db.insertRecord(dbname, collection, &recq3) + db.insertRecord(dbname, collection, &recq4) for _, test := range tests { // info, _ := db.client.BuildInfo() @@ -420,7 +448,7 @@ func TestMongoDBQueryImagesOK(t *testing.T) { // continue // } - res_string, err := db.ProcessRequest(dbname, "", "queryimages", test.query) + res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query) var res []TestRecordMeta json.Unmarshal(res_string, &res) // fmt.Println(string(res_string)) @@ -444,9 +472,9 @@ func TestMongoDBGetDataset(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0") assert.Nil(t, err) @@ -460,9 +488,9 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0") assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0}", err.(*DBError).Message) @@ -474,10 +502,10 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec_dataset1) - db.insertRecord(dbname, &rec_dataset2) + db.insertRecord(dbname, collection, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0") assert.Nil(t, err) @@ -491,10 +519,10 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec_dataset1) - db.insertRecord(dbname, &rec_dataset3) + db.insertRecord(dbname, collection, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset3) - res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0") assert.Nil(t, err) @@ -507,9 +535,9 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { func TestMongoDBGetDatasetID(t *testing.T) { db.Connect(dbaddress) defer cleanup() - db.insertRecord(dbname, &rec_dataset1) + db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(dbname, groupId, "id_dataset", "1") + res_string, err := db.ProcessRequest(dbname, collection, groupId, "id_dataset", "1") assert.Nil(t, err) diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go index bb3c5b43a01e17352b4e612de7b6cdd14eccf542..b53aa0ea650b1e34ee7251cd34f7d13c8689ea90 100644 --- a/broker/src/asapo_broker/server/get_id_test.go +++ b/broker/src/asapo_broker/server/get_id_test.go @@ -39,7 +39,7 @@ func TestGetIDTestSuite(t *testing.T) { } func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "id", "1").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "id", "1").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix) diff --git a/broker/src/asapo_broker/server/get_last_test.go b/broker/src/asapo_broker/server/get_last_test.go index 60d1ff4e1fa6e9d30aedef935805de44bfa40977..ed3f6a468c5b40663a89099f3c3450c94110c31c 100644 --- a/broker/src/asapo_broker/server/get_last_test.go +++ b/broker/src/asapo_broker/server/get_last_test.go @@ -33,7 +33,7 @@ func TestGetLastTestSuite(t *testing.T) { } func (suite *GetLastTestSuite) TestGetLastCallsCorrectRoutine() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "last", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "last", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request last"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/last" + correctTokenSuffix) diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go index aef6f7e2ebff2214a50a17ee3ad104cc8683e57e..51ed9827df0b08100456edfe15d59e85d878c587 100644 --- a/broker/src/asapo_broker/server/get_meta_test.go +++ b/broker/src/asapo_broker/server/get_meta_test.go @@ -33,7 +33,7 @@ func TestGetMetaTestSuite(t *testing.T) { } func (suite *GetMetaTestSuite) TestGetMetaOK() { - suite.mock_db.On("ProcessRequest", expectedDBName, "", "meta", "0").Return([]byte("{\"test\":10}"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, "default", "", "meta", "0").Return([]byte("{\"test\":10}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/default/0/meta/0" + correctTokenSuffix) diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go index 2da6bb2fba6656f1e5aa82766f6702f94531b71b..9321a438e6fed9c928a4646ba22772a81a454045 100644 --- a/broker/src/asapo_broker/server/get_next_test.go +++ b/broker/src/asapo_broker/server/get_next_test.go @@ -33,7 +33,7 @@ func TestGetNextTestSuite(t *testing.T) { } func (suite *GetNextTestSuite) TestGetNextCallsCorrectRoutine() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix) diff --git a/broker/src/asapo_broker/server/get_size_test.go b/broker/src/asapo_broker/server/get_size_test.go index a52808356a12015044a40c433c51764a92379efd..86cacbb85560d4a2f0778571f767f46890414d3d 100644 --- a/broker/src/asapo_broker/server/get_size_test.go +++ b/broker/src/asapo_broker/server/get_size_test.go @@ -33,7 +33,7 @@ func TestGetSizeTestSuite(t *testing.T) { } func (suite *GetSizeTestSuite) TestGetSizeOK() { - suite.mock_db.On("ProcessRequest", expectedDBName, "", "size", "0").Return([]byte("{\"size\":10}"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "size", "0").Return([]byte("{\"size\":10}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request size"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/size" + correctTokenSuffix) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index b14482705a1e0a315d83321d7927cadf3f451a15..7a4d9bb7234783e1bdd42487ae1a1e0c81a51904 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -32,7 +32,7 @@ var listRoutes = utils.Routes{ utils.Route{ "GetMeta", "Get", - "/database/{dbname}/{stream}/default/0/meta/{id}", + "/database/{dbname}/{stream}/{substream}/0/meta/{id}", routeGetMeta, }, utils.Route{ diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index 227f0d8f694e158b2b6816d76c8444f60a6c2dab..51ed9c45fd32814564fb87ccf9fc17b783897a2e 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -34,7 +34,7 @@ func TestQueryTestSuite(t *testing.T) { func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" - suite.mock_db.On("ProcessRequest", expectedDBName, "", "queryimages", query_str).Return([]byte("{}"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "queryimages", query_str).Return([]byte("{}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages"))) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/0/queryimages"+correctTokenSuffix, "POST", query_str) diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index 74cfec68fe084710b03ac2a6eac58f4153c028bb..bb2f2b2a224ea666e0c543046dd4264f2539ad72 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -33,7 +33,7 @@ func TestResetCounterTestSuite(t *testing.T) { } func (suite *ResetCounterTestSuite) TestResetCounterOK() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST") diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index fa07b77d81c1594c69fc0495f72e64c971ce982d..022439f8dd20467862367cafe34ff82b2dfc8acd 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -10,18 +10,19 @@ import ( "net/http" ) -func extractRequestParameters(r *http.Request, needGroupID bool) (string, string, string, bool) { +func extractRequestParameters(r *http.Request, needGroupID bool) (string, string, string, string, bool) { vars := mux.Vars(r) db_name, ok1 := vars["dbname"] stream, ok3 := vars["stream"] + substream, ok4 := vars["substream"] ok2 := true group_id := "" if needGroupID { group_id, ok2 = vars["groupid"] } - return db_name, stream, group_id, ok1 && ok2 && ok3 + return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4 } func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_name string, op string) bool { @@ -42,7 +43,7 @@ func checkGroupID(w http.ResponseWriter, needGroupID bool, group_id string, db_n func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) { r.Header.Set("Content-type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") - db_name, stream, group_id, ok := extractRequestParameters(r, needGroupID) + db_name, stream, substream, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { w.WriteHeader(http.StatusBadRequest) return @@ -61,7 +62,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par op = op + "_dataset" } - answer, code := processRequestInDb(db_name+"_"+stream, group_id, op, extra_param) + answer, code := processRequestInDb(db_name+"_"+stream, substream, group_id, op, extra_param) w.WriteHeader(code) w.Write(answer) } @@ -98,9 +99,9 @@ func reconnectIfNeeded(db_error error) { } } -func processRequestInDb(db_name string, group_id string, op string, extra_param string) (answer []byte, code int) { +func processRequestInDb(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, code int) { statistics.IncreaseCounter() - answer, err := db.ProcessRequest(db_name, group_id, op, extra_param) + answer, err := db.ProcessRequest(db_name, data_collection_name, group_id, op, extra_param) log_str := "processing request " + op + " in " + db_name + " at " + settings.GetDatabaseServer() if err != nil { go reconnectIfNeeded(err) diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 219e6a5da5d6eed21c70c2e59c6a5fa8ef780de5..818d0040983b9f286ebea5625921b6c9891b0931 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -121,7 +121,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""), + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), &database.DBError{utils.StatusWrongInput, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -132,7 +132,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() } func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""), + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -145,7 +145,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte(""), errors.New("")) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) @@ -157,7 +157,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -171,7 +171,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")