diff --git a/3d_party/mongo-c-driver/install.cmd b/3d_party/mongo-c-driver/install.cmd index 1aef675599fa2960b20123a54ab9d88ac2bd8fb9..6ee758b070dd44636b93d7ccba0db53cc3489d6d 100644 --- a/3d_party/mongo-c-driver/install.cmd +++ b/3d_party/mongo-c-driver/install.cmd @@ -1,20 +1,12 @@ :: download and untar mongoc driver sources to dir -:: https://github.com/mongodb/mongo-c-driver/releases/download/1.9.0/mongo-c-driver-1.9.0.tar.gz +:: https://github.com/mongodb/mongo-c-driver/releases/download/1.15.2/mongo-c-driver-1.15.2.tar.gz :: set directory with mongoc driver sources -SET dir=c:\tmp\mongo-c-driver-1.9.0 +SET dir=c:\tmp\mongo-c-driver-1.15.2 set mypath=%cd% cd /d %dir% -:: install libbson -cd src\libbson -cmake "-DCMAKE_INSTALL_PREFIX=C:\mongo-c-driver" ^ - "-DCMAKE_BUILD_TYPE=Release" ^ - "-DCMAKE_C_FLAGS_RELEASE=/MT" -cmake --build . --config Release -cmake --build . --target install --config Release - :: install mongoc cd %dir% cmake "-DCMAKE_INSTALL_PREFIX=C:\mongo-c-driver" ^ diff --git a/3d_party/mongo-c-driver/install.sh b/3d_party/mongo-c-driver/install.sh index 33d1d5d8a87be513db430128f461da90cb405513..eba4e08f6b6e3ecc45f7e7e3075a63b9c93b0f89 100755 --- a/3d_party/mongo-c-driver/install.sh +++ b/3d_party/mongo-c-driver/install.sh @@ -1,10 +1,11 @@ #!/usr/bin/env bash cd $1 -wget https://github.com/mongodb/mongo-c-driver/releases/download/1.9.0/mongo-c-driver-1.9.0.tar.gz -tar xzf mongo-c-driver-1.9.0.tar.gz -cd mongo-c-driver-1.9.0 -./configure --disable-automatic-init-and-cleanup --enable-static=yes --enable-shared=no --enable-examples=no --enable-ssl=no --enable-sasl=no +wget https://github.com/mongodb/mongo-c-driver/releases/download/1.15.2/mongo-c-driver-1.15.2.tar.gz +tar xzf mongo-c-driver-1.15.2.tar.gz +cd mongo-c-driver-1.15.2 + +cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_SSL=OFF -DENABLE_SASL=OFF -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DMONGOC_ENABLE_STATIC=ON . make #sudo make install diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 342eb2816c6895f70d07a0139b6b9ac8a8731e2e..6099eecf043b68848dd17762797dd6f9b56bd82e 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -5,7 +5,6 @@ type Agent interface { Ping() error Connect(string) error Close() - Copy() Agent } type DBError struct { diff --git a/broker/src/asapo_broker/database/database_test.go b/broker/src/asapo_broker/database/database_test.go index 3ca42ab570bc3747bc7d90ba4db3483da902e6a7..c8c68dc82c95a88793f5fd996b2cfc1f72f90ff8 100644 --- a/broker/src/asapo_broker/database/database_test.go +++ b/broker/src/asapo_broker/database/database_test.go @@ -16,7 +16,6 @@ func TestMockDataBase(t *testing.T) { db.Connect("") db.Close() - db.Copy() db.Ping() var err DBError err.Error() diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index 7edb3bb83cb530c8543506769b5296f9cb2b8c7b..9c389a7a423e29e288b1bdbb710b7b8be5c157ed 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -24,11 +24,6 @@ func (db *MockedDatabase) Ping() error { return args.Error(0) } -func (db *MockedDatabase) Copy() Agent { - db.Called() - return db -} - func (db *MockedDatabase) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) { args := db.Called(db_name, group_id, op, extra_param) return args.Get(0).([]byte), args.Error(1) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index db9e3e96b63dad9e8f965b3f6be736d834c4fbc3..41ad420530fb38257e856f64624df2bdbb0631a1 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -5,26 +5,32 @@ package database import ( "asapo_common/logger" "asapo_common/utils" + "context" "encoding/json" "errors" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "strconv" "strings" "sync" "time" ) -type Pointer struct { - ID int `bson:"_id"` - Value int `bson:"current_pointer"` +type ID struct { + ID int `bson:"_id"` +} + +type LocationPointer struct { + GroupID string `bson:"_id"` + Value int `bson:"current_pointer"` } const data_collection_name = "data" const meta_collection_name = "meta" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" -const no_session_msg = "database session not created" +const no_session_msg = "database client not created" const wrong_id_type = "wrong id type" const already_connected_msg = "already connected" @@ -37,24 +43,13 @@ type SizeRecord struct { } type Mongodb struct { - session *mgo.Session + client *mongo.Client timeout time.Duration databases []string parent_db *Mongodb db_pointers_created map[string]bool } -func (db *Mongodb) Copy() Agent { - new_db := new(Mongodb) - if db.session != nil { - dbSessionLock.RLock() - new_db.session = db.session.Copy() - dbSessionLock.RUnlock() - } - new_db.parent_db = db - return new_db -} - func (db *Mongodb) databaseInList(dbname string) bool { dbListLock.RLock() defer dbListLock.RUnlock() @@ -63,16 +58,20 @@ func (db *Mongodb) databaseInList(dbname string) bool { func (db *Mongodb) updateDatabaseList() (err error) { dbListLock.Lock() - db.databases, err = db.session.DatabaseNames() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + db.databases, err = db.client.ListDatabaseNames(ctx, bson.M{}) dbListLock.Unlock() return err } func (db *Mongodb) Ping() (err error) { - if db.session == nil { + if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - return db.session.Ping() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + return db.client.Ping(ctx, nil) } func (db *Mongodb) dataBaseExist(dbname string) (err error) { @@ -92,18 +91,26 @@ func (db *Mongodb) dataBaseExist(dbname string) (err error) { } func (db *Mongodb) Connect(address string) (err error) { - if db.session != nil { + if db.client != nil { return &DBError{utils.StatusServiceUnavailable, already_connected_msg} } - db.session, err = mgo.DialWithTimeout(address, time.Second) + db.client, err = mongo.NewClient(options.Client().SetConnectTimeout(20 * time.Second).ApplyURI("mongodb://" + address)) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + err = db.client.Connect(ctx) if err != nil { + db.client = nil return err } - // db.session.SetSafe(&mgo.Safe{J: true}) + // db.client.SetSafe(&mgo.Safe{J: true}) if err := db.updateDatabaseList(); err != nil { + db.Close() return err } @@ -111,90 +118,94 @@ func (db *Mongodb) Connect(address string) (err error) { } func (db *Mongodb) Close() { - if db.session != nil { + if db.client != nil { dbSessionLock.Lock() - db.session.Close() - db.session = nil + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + db.client.Disconnect(ctx) + db.client = nil dbSessionLock.Unlock() } } func (db *Mongodb) deleteAllRecords(dbname string) (err error) { - if db.session == nil { + if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - return db.session.DB(dbname).DropDatabase() + return db.client.Database(dbname).Drop(context.TODO()) } func (db *Mongodb) insertRecord(dbname string, s interface{}) error { - if db.session == nil { + if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - c := db.session.DB(dbname).C(data_collection_name) + c := db.client.Database(dbname).Collection(data_collection_name) - return c.Insert(s) + _, err := c.InsertOne(context.TODO(), s) + return err } func (db *Mongodb) insertMeta(dbname string, s interface{}) error { - if db.session == nil { + if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - c := db.session.DB(dbname).C(meta_collection_name) + c := db.client.Database(dbname).Collection(meta_collection_name) - return c.Insert(s) + _, err := c.InsertOne(context.TODO(), s) + return err } func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err error) { - c := db.session.DB(dbname).C(data_collection_name) - var id Pointer + c := db.client.Database(dbname).Collection(data_collection_name) var q bson.M if dataset { q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} } else { q = nil } - err = c.Find(q).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) - if err == mgo.ErrNotFound { + opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) + var result ID + err = c.FindOne(context.TODO(), q, opts).Decode(&result) + if err == mongo.ErrNoDocuments { return 0, nil } - return id.ID, err + + return result.ID, err } func (db *Mongodb) createLocationPointers(dbname string, group_id string) (err error) { - change := mgo.Change{ - Update: bson.M{"$inc": bson.M{pointer_field_name: 0}}, - Upsert: true, - } + opts := options.Update().SetUpsert(true) + update := bson.M{"$inc": bson.M{pointer_field_name: 0}} q := bson.M{"_id": group_id} - c := db.session.DB(dbname).C(pointer_collection_name) - var res map[string]interface{} - _, err = c.Find(q).Apply(change, &res) - return err + c := db.client.Database(dbname).Collection(pointer_collection_name) + _, err = c.UpdateOne(context.TODO(), q, update, opts) + return } func (db *Mongodb) setCounter(dbname string, group_id string, ind int) (err error) { update := bson.M{"$set": bson.M{pointer_field_name: ind}} - c := db.session.DB(dbname).C(pointer_collection_name) - return c.UpdateId(group_id, update) + c := db.client.Database(dbname).Collection(pointer_collection_name) + q := bson.M{"_id": group_id} + _, err = c.UpdateOne(context.TODO(), q, update, options.Update()) + return } func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, res interface{}) (err error) { update := bson.M{"$inc": bson.M{pointer_field_name: 1}} - change := mgo.Change{ - Update: update, - Upsert: false, - ReturnNew: true, - } + opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) q := bson.M{"_id": group_id, pointer_field_name: bson.M{"$lt": max_ind}} - c := db.session.DB(dbname).C(pointer_collection_name) - _, err = c.Find(q).Apply(change, res) - if err == mgo.ErrNotFound { - return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind)} - } else if err != nil { // we do not know if counter was updated + c := db.client.Database(dbname).Collection(pointer_collection_name) + + err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) + if err != nil { + if err == mongo.ErrNoDocuments { + return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind)} + } return &DBError{utils.StatusTransactionInterrupted, err.Error()} } + return nil } @@ -217,8 +228,8 @@ func (db *Mongodb) getRecordByIDRow(dbname string, id, id_max int, dataset bool) q = bson.M{"_id": id} } - c := db.session.DB(dbname).C(data_collection_name) - err := c.Find(q).One(&res) + c := db.client.Database(dbname).Collection(data_collection_name) + err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max) log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() @@ -277,7 +288,7 @@ func (db *Mongodb) getParentDB() *Mongodb { } func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id string) error { - if db.session == nil { + if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } @@ -291,16 +302,16 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id return nil } -func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bool) (Pointer, int, error) { +func (db *Mongodb) getCurrentPointer(db_name string, group_id string, dataset bool) (LocationPointer, int, error) { max_ind, err := db.getMaxIndex(db_name, dataset) if err != nil { - return Pointer{}, 0, err + return LocationPointer{}, 0, err } - var curPointer Pointer + var curPointer LocationPointer err = db.incrementField(db_name, group_id, max_ind, &curPointer) if err != nil { - return Pointer{}, 0, err + return LocationPointer{}, 0, err } return curPointer, max_ind, nil @@ -331,13 +342,15 @@ func (db *Mongodb) getLastRecord(db_name string, group_id string, dataset bool) } func (db *Mongodb) getSize(db_name string) ([]byte, error) { - c := db.session.DB(db_name).C(data_collection_name) + c := db.client.Database(db_name).Collection(data_collection_name) var rec SizeRecord var err error - rec.Size, err = c.Count() + + size, err := c.CountDocuments(context.TODO(), bson.M{}, options.Count()) if err != nil { return nil, err } + rec.Size = int(size) return json.Marshal(&rec) } @@ -360,8 +373,8 @@ func (db *Mongodb) getMeta(dbname string, id_str string) ([]byte, error) { var res map[string]interface{} q := bson.M{"_id": id} - c := db.session.DB(dbname).C(meta_collection_name) - err = c.Find(q).One(&res) + c := db.client.Database(dbname).Collection(meta_collection_name) + err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { log_str := "error getting meta with id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() logger.Debug(log_str) @@ -372,6 +385,12 @@ func (db *Mongodb) getMeta(dbname string, id_str string) ([]byte, error) { return utils.MapToJson(&res) } +func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, error) { + log_str := "error processing query: " + query + " for " + dbname + " : " + err.Error() + logger.Debug(log_str) + return nil, &DBError{utils.StatusNoData, err.Error()} +} + func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { var res []map[string]interface{} q, sort, err := db.BSONFromSQL(dbname, query) @@ -381,16 +400,21 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.session.DB(dbname).C(data_collection_name) + c := db.client.Database(dbname).Collection(data_collection_name) + opts := options.Find() + if len(sort) > 0 { - err = c.Find(q).Sort(sort).All(&res) + opts = opts.SetSort(sort) } else { - err = c.Find(q).All(&res) } + + cursor, err := c.Find(context.TODO(), q, opts) if err != nil { - log_str := "error processing query: " + query + " for " + dbname + " : " + err.Error() - logger.Debug(log_str) - return nil, &DBError{utils.StatusNoData, err.Error()} + return db.processQueryError(query, dbname, err) + } + err = cursor.All(context.TODO(), &res) + if err != nil { + return db.processQueryError(query, dbname, err) } log_str := "processed query " + query + " for " + dbname + " ,found" + strconv.Itoa(len(res)) + " records" diff --git a/broker/src/asapo_broker/database/mongodb_query.go b/broker/src/asapo_broker/database/mongodb_query.go index 373339543108a57ce4410bb5dc002d08f0c27976..e85d0135efa518861bfd1ce1d2fc17214a041694 100644 --- a/broker/src/asapo_broker/database/mongodb_query.go +++ b/broker/src/asapo_broker/database/mongodb_query.go @@ -178,39 +178,40 @@ func getBSONFromExpression(node sqlparser.Expr) (res bson.M, err error) { } } -func getSortBSONFromOrderArray(order_array sqlparser.OrderBy) (string, error) { +func getSortBSONFromOrderArray(order_array sqlparser.OrderBy) (bson.M, error) { if len(order_array) != 1 { - return "", errors.New("order by should have single column name") + return bson.M{}, errors.New("order by should have single column name") } order := order_array[0] val, ok := order.Expr.(*sqlparser.ColName) if !ok { - return "", errors.New("order be key name") + return bson.M{}, errors.New("order has to be key name") } name := keyFromColumnName(val) + sign := 1 if order.Direction == sqlparser.DescScr { - name = "-" + name + sign = -1 } - return name, nil + return bson.M{name: sign}, nil } -func (db *Mongodb) BSONFromSQL(dbname string, query string) (bson.M, string, error) { +func (db *Mongodb) BSONFromSQL(dbname string, query string) (bson.M, bson.M, error) { stmt, err := sqlparser.Parse("select * from " + dbname + " where " + query) if err != nil { - return bson.M{}, "", err + return bson.M{}, bson.M{}, err } sel, _ := stmt.(*sqlparser.Select) query_mongo, err := getBSONFromExpression(sel.Where.Expr) if err != nil || len(sel.OrderBy) == 0 { - return query_mongo, "", err + return query_mongo, bson.M{}, err } sort_mongo, err := getSortBSONFromOrderArray(sel.OrderBy) if err != nil { - return bson.M{}, "", err + return bson.M{}, bson.M{}, err } return query_mongo, sort_mongo, nil diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 44dc0c445849aaf3a91291efe1eb64935ae30d40..8a1136fd774d06f9880c1882f68222913063afe4 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -4,10 +4,11 @@ package database import ( "asapo_common/utils" + "context" "encoding/json" - "fmt" "github.com/stretchr/testify/assert" - "strings" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" "sync" "testing" ) @@ -45,13 +46,16 @@ var recs2 = SizeRecord{0} var recs2_expect, _ = json.Marshal(recs2) func cleanup() { + if db.client == nil { + return + } db.deleteAllRecords(dbname) db.db_pointers_created = nil db.Close() } -// these are tjhe integration tests. They assume mongo db is runnig on 127.0.0.1:27027 -// test names shlud contain MongoDB*** so that go test could find them: +// these are the integration tests. They assume mongo db is runnig on 127.0.0.1:27027 +// test names should contain MongoDB*** so that go test could find them: // go_integration_test(${TARGET_NAME}-connectdb "./..." "MongoDBConnect") func TestMongoDBConnectFails(t *testing.T) { err := db.Connect("blabla") @@ -65,12 +69,6 @@ func TestMongoDBConnectOK(t *testing.T) { assert.Nil(t, err) } -func TestMongoCopyWhenNoSession(t *testing.T) { - db_new := db.Copy() - err := db_new.Connect("sss") - assert.NotNil(t, err) -} - func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { _, err := db.ProcessRequest(dbname, groupId, "next", "") assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) @@ -130,14 +128,6 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}", err.(*DBError).Message) } -//func TestMongoDBGetNextErrorOnDataAtAll(t *testing.T) { -// db.Connect(dbaddress) -// defer cleanup() -// _, err := db.GetNextRecord(dbname, groupId, false) -// assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) -// assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0}", err.(*DBError).Message) -//} - func TestMongoDBGetNextCorrectOrder(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -288,7 +278,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { defer cleanup() // to have empty collection db.insertRecord(dbname, &rec1) - db.session.DB(dbname).C(data_collection_name).RemoveId(1) + db.client.Database(dbname).Collection(data_collection_name).DeleteOne(context.TODO(), bson.M{"_id": 1}, options.Delete()) res, err := db.ProcessRequest(dbname, "", "size", "0") assert.Nil(t, err) @@ -424,18 +414,19 @@ func TestMongoDBQueryImagesOK(t *testing.T) { db.insertRecord(dbname, &recq4) for _, test := range tests { - info, _ := db.session.BuildInfo() - if strings.Contains(test.query, "NOT REGEXP") && !info.VersionAtLeast(4, 0, 7) { - fmt.Println("Skipping NOT REGEXP test since it is not supported by this mongodb version") - continue - } + // info, _ := db.client.BuildInfo() + // if strings.Contains(test.query, "NOT REGEXP") && !info.VersionAtLeast(4, 0, 7) { + // fmt.Println("Skipping NOT REGEXP test since it is not supported by this mongodb version") + // continue + // } res_string, err := db.ProcessRequest(dbname, "", "queryimages", test.query) var res []TestRecordMeta json.Unmarshal(res_string, &res) + // fmt.Println(string(res_string)) if test.ok { - assert.Nil(t, err) - assert.Equal(t, test.res, res, test.query) + assert.Nil(t, err, test.query) + assert.Equal(t, test.res, res) } else { assert.NotNil(t, err, test.query) assert.Equal(t, 0, len(res)) diff --git a/broker/src/asapo_broker/server/get_health.go b/broker/src/asapo_broker/server/get_health.go index b124010030ae05fcfa9a919dee0b9e88f7cd2e60..ce5f45ee531005cb5817410052b89b3449761028 100644 --- a/broker/src/asapo_broker/server/get_health.go +++ b/broker/src/asapo_broker/server/get_health.go @@ -5,9 +5,7 @@ import ( ) func routeGetHealth(w http.ResponseWriter, r *http.Request) { - db_new := db.Copy() - defer db_new.Close() - err := db_new.Ping() + err := db.Ping() if err != nil { ReconnectDb() } diff --git a/broker/src/asapo_broker/server/get_health_test.go b/broker/src/asapo_broker/server/get_health_test.go index 60a91692df0f5a60caee4d4bef3e5f9e4483dc36..3675fcbb0a3e6defc90f17b1ae51cbd5f7bfbefb 100644 --- a/broker/src/asapo_broker/server/get_health_test.go +++ b/broker/src/asapo_broker/server/get_health_test.go @@ -33,15 +33,13 @@ func TestGetHealthTestSuite(t *testing.T) { func (suite *GetHealthTestSuite) TestGetHealthOk() { suite.mock_db.On("Ping").Return(nil) - ExpectCopyClose(suite.mock_db) - w := doRequest("/health") suite.Equal(http.StatusNoContent, w.Code) } func (suite *GetHealthTestSuite) TestGetHealthTriesToReconnectsToDataBase() { suite.mock_db.On("Ping").Return(errors.New("ping error")) - ExpectCopyCloseReconnect(suite.mock_db) + ExpectReconnect(suite.mock_db) w := doRequest("/health") suite.Equal(http.StatusNoContent, w.Code) diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go index 2991d2e803563f8a2985b21fb1af98a8ed99f3be..97c9bf5fea8c82195f84be2dd2663c91c37810fb 100644 --- a/broker/src/asapo_broker/server/get_id_test.go +++ b/broker/src/asapo_broker/server/get_id_test.go @@ -15,11 +15,6 @@ func TestGetIdWithoutDatabaseName(t *testing.T) { assert.Equal(t, http.StatusNotFound, w.Code, "no database name") } -func ExpectCopyCloseOnID(mock_db *database.MockedDatabase) { - mock_db.On("Copy").Return(mock_db) - mock_db.On("Close").Return() -} - type GetIDTestSuite struct { suite.Suite mock_db *database.MockedDatabase @@ -31,7 +26,6 @@ func (suite *GetIDTestSuite) SetupTest() { suite.mock_db = new(database.MockedDatabase) db = suite.mock_db logger.SetMockLog() - ExpectCopyCloseOnID(suite.mock_db) } func (suite *GetIDTestSuite) TearDownTest() { @@ -47,7 +41,6 @@ func TestGetIDTestSuite(t *testing.T) { func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "id", "1").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetImage OK") diff --git a/broker/src/asapo_broker/server/get_last_test.go b/broker/src/asapo_broker/server/get_last_test.go index 91f08959bbab1da60e0243d414bd0b3a800c931b..3d9d58cd1aa9e75b46dc9466a02b15aba1e61748 100644 --- a/broker/src/asapo_broker/server/get_last_test.go +++ b/broker/src/asapo_broker/server/get_last_test.go @@ -35,7 +35,6 @@ func TestGetLastTestSuite(t *testing.T) { func (suite *GetLastTestSuite) TestGetLastCallsCorrectRoutine() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "last", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request last"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/last" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetLast OK") diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go index 00a9ee4e9905e56711edf5484442c9c1230ad9bf..cc60001fc71aad00f30d0d94b79b9f0b66cb89a7 100644 --- a/broker/src/asapo_broker/server/get_meta_test.go +++ b/broker/src/asapo_broker/server/get_meta_test.go @@ -35,7 +35,6 @@ func TestGetMetaTestSuite(t *testing.T) { func (suite *GetMetaTestSuite) TestGetMetaOK() { suite.mock_db.On("ProcessRequest", expectedDBName, "", "meta", "0").Return([]byte("{\"test\":10}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/0/meta/0" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetSize OK") diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go index 6e5954cff2046fcee10c1e06b96f67646e8ff6fd..be9d664d87a6ed752fe668df2d7bf623f3b3ec88 100644 --- a/broker/src/asapo_broker/server/get_next_test.go +++ b/broker/src/asapo_broker/server/get_next_test.go @@ -35,7 +35,6 @@ func TestGetNextTestSuite(t *testing.T) { func (suite *GetNextTestSuite) TestGetNextCallsCorrectRoutine() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetNext OK") diff --git a/broker/src/asapo_broker/server/get_size_test.go b/broker/src/asapo_broker/server/get_size_test.go index 709b49cd17e2eeb6b7801d7e4f25469ce3fc5e68..e5ce8fadc643ad0ac5afdad5280ed16f1cdd3fda 100644 --- a/broker/src/asapo_broker/server/get_size_test.go +++ b/broker/src/asapo_broker/server/get_size_test.go @@ -35,7 +35,6 @@ func TestGetSizeTestSuite(t *testing.T) { func (suite *GetSizeTestSuite) TestGetSizeOK() { suite.mock_db.On("ProcessRequest", expectedDBName, "", "size", "0").Return([]byte("{\"size\":10}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request size"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/size" + correctTokenSuffix) suite.Equal(http.StatusOK, w.Code, "GetSize OK") diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index bd96adbbc98bf98841cb3f8148372a9da7861bc7..335a024e3ac9918fcbb06ec8404170d6e3b25f5c 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -36,7 +36,6 @@ func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" suite.mock_db.On("ProcessRequest", expectedDBName, "", "queryimages", query_str).Return([]byte("{}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/0/queryimages"+correctTokenSuffix, "POST", query_str) suite.Equal(http.StatusOK, w.Code, "Query OK") diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index e0b67f29a1092ce9da74a28c5686581688fc0430..e6a27a1b4666d936f8957501b0a0fd724e45d211 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -35,7 +35,6 @@ func TestResetCounterTestSuite(t *testing.T) { func (suite *ResetCounterTestSuite) TestResetCounterOK() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST") suite.Equal(http.StatusOK, w.Code, "ResetCounter OK") diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index ba380a26c50763f1389a6bad8e10e0bbb97d15b1..fa07b77d81c1594c69fc0495f72e64c971ce982d 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -99,10 +99,8 @@ func reconnectIfNeeded(db_error error) { } func processRequestInDb(db_name string, group_id string, op string, extra_param string) (answer []byte, code int) { - db_new := db.Copy() - defer db_new.Close() statistics.IncreaseCounter() - answer, err := db_new.ProcessRequest(db_name, group_id, op, extra_param) + answer, err := db.ProcessRequest(db_name, group_id, op, extra_param) log_str := "processing request " + op + " in " + db_name + " at " + settings.GetDatabaseServer() if err != nil { go reconnectIfNeeded(err) diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index d5ded161802379748f6e88d1d80323af019cdc01..a21310e738ee04f70bcb9aa15988d86b4b8436c5 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -75,14 +75,8 @@ func TestProcessRequestWithoutDatabaseName(t *testing.T) { assert.Equal(t, http.StatusNotFound, w.Code, "no database name") } -func ExpectCopyClose(mock_db *database.MockedDatabase) { - mock_db.On("Copy").Return(mock_db) +func ExpectReconnect(mock_db *database.MockedDatabase) { mock_db.On("Close").Return() -} - -func ExpectCopyCloseReconnect(mock_db *database.MockedDatabase) { - mock_db.On("Copy").Return(mock_db) - mock_db.On("Close").Twice().Return() mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil) } @@ -130,7 +124,6 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() &database.DBError{utils.StatusWrongInput, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) - ExpectCopyClose(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -142,7 +135,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { &database.DBError{utils.StatusServiceUnavailable, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) - ExpectCopyCloseReconnect(suite.mock_db) + ExpectReconnect(suite.mock_db) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -155,7 +148,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) - ExpectCopyCloseReconnect(suite.mock_db) + ExpectReconnect(suite.mock_db) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) time.Sleep(time.Second) @@ -165,7 +158,6 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) - ExpectCopyClose(suite.mock_db) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(1, statistics.GetCounter(), "ProcessRequest increases counter") @@ -180,7 +172,6 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName))) - ExpectCopyClose(suite.mock_db) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") } diff --git a/common/cpp/src/database/CMakeLists.txt b/common/cpp/src/database/CMakeLists.txt index 30ebedb96509db3002438899291c0a8a77fbc71e..261134f12893116d9577afb3db0d0a1eadac4bb5 100644 --- a/common/cpp/src/database/CMakeLists.txt +++ b/common/cpp/src/database/CMakeLists.txt @@ -6,7 +6,7 @@ set(SOURCE_FILES ################################ # Library ################################ -find_package (libmongoc-static-1.0 1.9 REQUIRED) +find_package (libmongoc-static-1.0 1.15 REQUIRED) message ("-- mongoc found version \"${MONGOC_STATIC_VERSION}\"") message ("-- mongoc include path \"${MONGOC_STATIC_INCLUDE_DIRS}\"") message ("-- mongoc libraries \"${MONGOC_STATIC_LIBRARIES}\"") diff --git a/tests/automatic/broker/get_meta/check_linux.sh b/tests/automatic/broker/get_meta/check_linux.sh index 583d7a5edc9db7eeb6373874d99b6ee28e859f88..9c9b5c23fa61de1c8ddc2182dfae4bb86c96059f 100644 --- a/tests/automatic/broker/get_meta/check_linux.sh +++ b/tests/automatic/broker/get_meta/check_linux.sh @@ -21,6 +21,6 @@ $1 -config settings.json & sleep 0.3 brokerid=`echo $!` -curl -v --silent 127.0.0.1:5005/database/test/stream/0/meta/0?token=$token --stderr - | grep '"data":"test"' -curl -v --silent 127.0.0.1:5005/database/test/stream/0/meta/1?token=$token --stderr - | grep 'not found' +curl -v --silent 127.0.0.1:5005/database/test/stream/0/meta/0?token=$token --stderr - | tee /dev/stderr | grep '"data":"test"' +curl -v --silent 127.0.0.1:5005/database/test/stream/0/meta/1?token=$token --stderr - | tee /dev/stderr | grep 'no documents' diff --git a/tests/automatic/broker/get_meta/check_windows.bat b/tests/automatic/broker/get_meta/check_windows.bat index 4fc55dd325f417040ec10c21648641390a47bb2d..3c6a4389155db64ef2c74a015df26abadf38d987 100644 --- a/tests/automatic/broker/get_meta/check_windows.bat +++ b/tests/automatic/broker/get_meta/check_windows.bat @@ -14,7 +14,7 @@ start /B "" "%full_name%" -config settings.json ping 1.0.0.0 -n 1 -w 100 > nul C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/stream/0/meta/0?token=%token% --stderr - | findstr /c:\"_id\":0 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/stream/0/meta/1?token=%token% --stderr - | findstr /c:"not found" || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/stream/0/meta/1?token=%token% --stderr - | findstr /c:"no documents" || goto :error goto :clean diff --git a/tests/manual/broker_debug_local/asapo b/tests/manual/broker_debug_local/asapo new file mode 100755 index 0000000000000000000000000000000000000000..f7072c1eedeea64f1e2d2251d3ed14d4d59be014 Binary files /dev/null and b/tests/manual/broker_debug_local/asapo differ diff --git a/tests/manual/broker_debug_local/auth_secret.key b/tests/manual/broker_debug_local/auth_secret.key new file mode 100644 index 0000000000000000000000000000000000000000..1d100e0ec247d5df6a06e5029a392b93b2a6fbe2 --- /dev/null +++ b/tests/manual/broker_debug_local/auth_secret.key @@ -0,0 +1 @@ +12ljzgneasfd diff --git a/tests/manual/broker_debug_local/authorizer.json.tpl b/tests/manual/broker_debug_local/authorizer.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..7c3a796d20b0bdb556fc1d88f82e13922b5d140d --- /dev/null +++ b/tests/manual/broker_debug_local/authorizer.json.tpl @@ -0,0 +1,10 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"}, + {"BeamtimeId":"asapo_test1","Beamline":"test1"}, + {"BeamtimeId":"asapo_test2","Beamline":"test2"}], + "SecretFile":"auth_secret.key" +} + + diff --git a/tests/manual/broker_debug_local/authorizer.nmd b/tests/manual/broker_debug_local/authorizer.nmd new file mode 100644 index 0000000000000000000000000000000000000000..8b32105cf2e4644e97b6fda5e4aba06c6c269822 --- /dev/null +++ b/tests/manual/broker_debug_local/authorizer.nmd @@ -0,0 +1,55 @@ +job "authorizer" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "authorizer" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/authorizer/asapo-authorizer", + args = ["-config","${NOMAD_TASK_DIR}/authorizer.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "authorizer" { + static = "5007" + } + } + } + + service { + name = "authorizer" + port = "authorizer" + check { + name = "alive" + type = "http" + path = "/health-check" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/authorizer.json.tpl" + destination = "local/authorizer.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/auth_secret.key" + destination = "auth_secret.key" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/tests/manual/broker_debug_local/broker.json b/tests/manual/broker_debug_local/broker.json new file mode 100644 index 0000000000000000000000000000000000000000..2827fe3d41de76cd52030923f3456ceb58bd09da --- /dev/null +++ b/tests/manual/broker_debug_local/broker.json @@ -0,0 +1,9 @@ +{ + "DatabaseServer":"auto", + "DiscoveryServer": "localhost:8400/discovery", + "PerformanceDbServer": "localhost:8086", + "PerformanceDbName": "db_test", + "port": 5005, + "LogLevel":"info", + "SecretFile":"auth_secret.key" +} diff --git a/tests/manual/broker_debug_local/broker.nmd b/tests/manual/broker_debug_local/broker.nmd new file mode 100644 index 0000000000000000000000000000000000000000..6028eac3d3eb065186062d7447943f23aa22f8a5 --- /dev/null +++ b/tests/manual/broker_debug_local/broker.nmd @@ -0,0 +1,55 @@ +job "broker" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "broker" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/broker/asapo-broker", + args = ["-config","${NOMAD_TASK_DIR}/broker.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "broker" { + static = "5005" + } + } + } + + service { + name = "asapo-broker" + port = "broker" + check { + name = "alive" + type = "http" + path = "/health" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/tests/manual/broker_debug_local/broker.json.tpl" + destination = "local/broker.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + template { + source = "/home/yakubov/projects/asapo/tests/manual/broker_debug_local/auth_secret.key" + destination = "auth_secret.key" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/tests/manual/broker_debug_local/clean_db.sh b/tests/manual/broker_debug_local/clean_db.sh new file mode 100755 index 0000000000000000000000000000000000000000..359d98df324b2135bc015a6c244ec3b63e79e47a --- /dev/null +++ b/tests/manual/broker_debug_local/clean_db.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +beamtime_id=asapo_test + + +echo "db.dropDatabase()" | mongo ${beamtime_id}_detector diff --git a/tests/manual/broker_debug_local/discovery.json b/tests/manual/broker_debug_local/discovery.json new file mode 100644 index 0000000000000000000000000000000000000000..bde0a2b819081929bc7ac4ef3c081d6ec60b13c2 --- /dev/null +++ b/tests/manual/broker_debug_local/discovery.json @@ -0,0 +1,19 @@ +{ + "Mode": "static", + "Receiver": { + "StaticEndpoints": [ + "127.0.0.1:22001" + ], + "MaxConnections": 32 + }, + "Broker": { + "StaticEndpoint": "127.0.0.1:5005" + }, + "Mongo": { + "StaticEndpoint": "127.0.0.1:27017" + }, + "Port": 5900, + "LogLevel":"debug" +} + + diff --git a/tests/manual/broker_debug_local/discovery.json.tpl b/tests/manual/broker_debug_local/discovery.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..fb4358e09bdc8921c8b76be4022190c954ee22c0 --- /dev/null +++ b/tests/manual/broker_debug_local/discovery.json.tpl @@ -0,0 +1,19 @@ +{ + "Mode": "static", + "Receiver": { + "StaticEndpoints": [ + "127.0.0.1:22001" + ], + "MaxConnections": 32 + }, + "Broker": { + "StaticEndpoint": "localhost:5005" + }, + "Mongo": { + "StaticEndpoint": "asapo-services:27017" + }, + "Port": {{ env "NOMAD_PORT_discovery" }}, + "LogLevel":"debug" +} + + diff --git a/tests/manual/broker_debug_local/discovery.nmd b/tests/manual/broker_debug_local/discovery.nmd new file mode 100644 index 0000000000000000000000000000000000000000..522d4753f290d2005f849d8a01727f54344e8500 --- /dev/null +++ b/tests/manual/broker_debug_local/discovery.nmd @@ -0,0 +1,49 @@ +job "discovery" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "discovery" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/discovery/asapo-discovery", + args = ["-config","${NOMAD_TASK_DIR}/discovery.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "discovery" { + static = "5006" + } + } + } + + service { + name = "discovery" + port = "discovery" + check { + name = "alive" + type = "http" + path = "/receivers" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/tests/manual/broker_debug_local/discovery.json.tpl" + destination = "local/discovery.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/broker_debug_local/nginx.conf.tpl b/tests/manual/broker_debug_local/nginx.conf.tpl new file mode 100644 index 0000000000000000000000000000000000000000..b784d07256fc978f12d4aa94d8970a435b087df2 --- /dev/null +++ b/tests/manual/broker_debug_local/nginx.conf.tpl @@ -0,0 +1,91 @@ +worker_processes 1; + +events { + worker_connections 1024; +} + +http { +# include mime.types; +# default_type application/octet-stream; + +# sendfile on; +# tcp_nopush on; + +# keepalive_timeout 0; +# keepalive_timeout 65; + + resolver 127.0.0.1:8600 valid=1s; + server { + listen {{ env "NOMAD_PORT_nginx" }}; + set $discovery_endpoint discovery.service.asapo; + set $authorizer_endpoint authorizer.service.asapo; + set $fluentd_endpoint fluentd.service.asapo; + set $kibana_endpoint kibana.service.asapo; + set $grafana_endpoint grafana.service.asapo; + set $mongo_endpoint mongo.service.asapo; + set $influxdb_endpoint influxdb.service.asapo; + set $elasticsearch_endpoint elasticsearch.service.asapo; + + location /mongo/ { + rewrite ^/mongo(/.*) $1 break; + proxy_pass http://$mongo_endpoint:27017$uri$is_args$args; + } + + location /influxdb/ { + rewrite ^/influxdb(/.*) $1 break; + proxy_pass http://$influxdb_endpoint:8086$uri$is_args$args; + } + + location /elasticsearch/ { + rewrite ^/elasticsearch(/.*) $1 break; + proxy_pass http://$elasticsearch_endpoint:9200$uri$is_args$args; + } + + location /discovery/ { + rewrite ^/discovery(/.*) $1 break; + proxy_pass http://$discovery_endpoint:5006$uri$is_args$args; + } + + location /logs/ { + rewrite ^/logs(/.*) $1 break; + proxy_pass http://$fluentd_endpoint:9880$uri$is_args$args; + } + + location /logsview/ { + proxy_pass http://$kibana_endpoint:5601$uri$is_args$args; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $http_host; + } + + location /performance/ { + rewrite ^/performance(/.*) $1 break; + proxy_pass http://$grafana_endpoint:3000$uri$is_args$args; + } + + location /authorizer/ { + rewrite ^/authorizer(/.*) $1 break; + proxy_pass http://$authorizer_endpoint:5007$uri$is_args$args; + } + + location /nginx-health { + return 200 "healthy\n"; + } + } +} + +stream { + resolver 127.0.0.1:8600 valid=1s; + + map $remote_addr $upstream { + default fluentd.service.asapo; + } + + + server { + listen 9881; + proxy_pass $upstream:24224; + } +} + + diff --git a/tests/manual/broker_debug_local/nginx.nmd b/tests/manual/broker_debug_local/nginx.nmd new file mode 100644 index 0000000000000000000000000000000000000000..b424e53874d17c7b0612106225f0250fa274fac4 --- /dev/null +++ b/tests/manual/broker_debug_local/nginx.nmd @@ -0,0 +1,63 @@ +job "nginx" { + datacenters = ["dc1"] + + type = "service" + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "nginx" { + count = 1 + + restart { + attempts = 2 + interval = "30m" + delay = "15s" + mode = "fail" + } + + task "nginx" { + driver = "raw_exec" + + config { + command = "nginx", + args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + port "nginx" { + static = 8400 + } + } + } + + service { + port = "nginx" + name = "nginx" + check { + name = "alive" + type = "http" + path = "/nginx-health" + timeout = "2s" + interval = "10s" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/nginx.conf.tpl" + destination = "local/nginx.conf" + change_mode = "restart" + } + + + } + } +} diff --git a/tests/manual/broker_debug_local/nginx_kill.nmd b/tests/manual/broker_debug_local/nginx_kill.nmd new file mode 100644 index 0000000000000000000000000000000000000000..cb3abbac259780ce7366042f24a19d635f032994 --- /dev/null +++ b/tests/manual/broker_debug_local/nginx_kill.nmd @@ -0,0 +1,17 @@ +job "nginx_kill" { + datacenters = ["dc1"] + + type = "batch" + + group "nginx_kill" { + count = 1 + + task "nginx_kill" { + driver = "raw_exec" + config { + command = "killall", + args = ["nginx"] + } + } + } +} diff --git a/tests/manual/broker_debug_local/receiver.json b/tests/manual/broker_debug_local/receiver.json new file mode 100644 index 0000000000000000000000000000000000000000..ace0fbecf3ab6726516b2e530ecbabe4ea6da2ad --- /dev/null +++ b/tests/manual/broker_debug_local/receiver.json @@ -0,0 +1,25 @@ +{ + "PerformanceDbServer":"localhost:8086", + "PerformanceDbName": "db_test", + "DatabaseServer":"localhost:27017", + "DiscoveryServer": "localhost:8400/discovery", + "AdvertiseIP":"127.0.0.1", + "DataServer": { + "NThreads": 2, + "ListenPort": 22000 + }, + "DataCache": { + "Use": true, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/authorizer", + "AuthorizationInterval": 10000, + "ListenPort": 22001, + "Tag": "22001", + "WriteToDisk": true, + "ReceiveToDiskThresholdMB":50, + "WriteToDb": true, + "LogLevel" : "debug", + "RootFolder" : "/tmp/asapo/receiver/files" +} diff --git a/tests/manual/broker_debug_local/receiver.json.tpl b/tests/manual/broker_debug_local/receiver.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..215df3b161a1b3957ddabe70c0ffb83828a6365a --- /dev/null +++ b/tests/manual/broker_debug_local/receiver.json.tpl @@ -0,0 +1,25 @@ +{ + "AdvertiseIP": "127.0.0.1", + "PerformanceDbServer":"localhost:8086", + "PerformanceDbName": "db_test", + "DatabaseServer":"auto", + "DiscoveryServer": "localhost:8400/discovery", + "DataServer": { + "NThreads": 2, + "ListenPort": {{ env "NOMAD_PORT_recv_ds" }} + }, + "DataCache": { + "Use": true, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/authorizer", + "AuthorizationInterval": 10000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "WriteToDisk": true, + "ReceiveToDiskThresholdMB":50, + "WriteToDb": true, + "LogLevel" : "debug", + "RootFolder" : "/tmp/asapo/receiver/files" +} diff --git a/tests/manual/broker_debug_local/receiver.nmd b/tests/manual/broker_debug_local/receiver.nmd new file mode 100644 index 0000000000000000000000000000000000000000..bb4cfe877376ba98f0137725db8c074f5a93d75a --- /dev/null +++ b/tests/manual/broker_debug_local/receiver.nmd @@ -0,0 +1,47 @@ +job "receiver" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "receiver" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver", + args = ["${NOMAD_TASK_DIR}/receiver.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "recv" {} + port "recv_ds" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "alive" + type = "tcp" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/producer_receiver/check_monitoring/receiver.json.tpl" + destination = "local/receiver.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/broker_debug_local/start_broker.sh b/tests/manual/broker_debug_local/start_broker.sh new file mode 100755 index 0000000000000000000000000000000000000000..020e1b1c8a91816f3c4bd019a47da976e601eb87 --- /dev/null +++ b/tests/manual/broker_debug_local/start_broker.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + + +exec=/home/yakubov/projects/asapo/cmake-build-debug/broker/asapo-broker + + +$exec -config broker.json diff --git a/tests/manual/broker_debug_local/start_getnext.sh b/tests/manual/broker_debug_local/start_getnext.sh new file mode 100755 index 0000000000000000000000000000000000000000..a9efa78f00c7c85e8422b776b05c40f8f1d74c28 --- /dev/null +++ b/tests/manual/broker_debug_local/start_getnext.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +beamtime_id=asapo_test + +token= +timeout=100 +metaonly=0 +nthreads=4 +token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk= + +exec=/home/yakubov/projects/asapo/cmake-build-debug/examples/consumer/getnext_broker/getnext_broker + +$exec localhost:8400 /tmp/asapo/receiver/files/test/asapo_test asapo_test $nthreads $token $timeout $metaonly diff --git a/tests/manual/broker_debug_local/start_services.sh b/tests/manual/broker_debug_local/start_services.sh new file mode 100755 index 0000000000000000000000000000000000000000..0de9567bff5b2665537da0cba9fca69a1b386877 --- /dev/null +++ b/tests/manual/broker_debug_local/start_services.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +nomad run authorizer.nmd +nomad run discovery.nmd +#nomad run broker.nmd +nomad run receiver.nmd +nomad run nginx.nmd diff --git a/tests/manual/broker_debug_local/stop_services.sh b/tests/manual/broker_debug_local/stop_services.sh new file mode 100755 index 0000000000000000000000000000000000000000..db503c59efa2b699d0563487cd44dd1315248a48 --- /dev/null +++ b/tests/manual/broker_debug_local/stop_services.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +nomad stop -purge authorizer +nomad stop -purge discovery +nomad stop -purge broker +nomad stop -purge nginx +nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill