From f3ea1aa9048a3c8acc88d66ced2753b8899da04e Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 18 Nov 2020 12:32:46 +0100 Subject: [PATCH] refactor broker --- broker/src/asapo_broker/database/database.go | 12 +- .../asapo_broker/database/mock_database.go | 4 +- broker/src/asapo_broker/database/mongodb.go | 38 +- .../src/asapo_broker/database/mongodb_test.go | 324 +++++++++--------- .../asapo_broker/server/get_commands_test.go | 2 +- broker/src/asapo_broker/server/get_next.go | 3 - .../asapo_broker/server/post_op_image_test.go | 2 +- .../server/post_query_images_test.go | 3 +- .../server/post_reset_counter_test.go | 4 +- .../asapo_broker/server/process_request.go | 21 +- .../server/process_request_test.go | 29 +- .../src/asapo_broker/server/request_common.go | 20 +- .../getnext_broker/getnext_broker.cpp | 2 +- .../consumer/consumer_api/consumer_api.cpp | 10 +- .../getlast_broker.cpp | 2 +- 15 files changed, 257 insertions(+), 219 deletions(-) diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 61ec11360..5df85c2dd 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -1,7 +1,17 @@ package database +type Request struct { + DbName string + DbCollectionName string + GroupId string + Op string + DatasetOp bool + MinDatasetSize int + ExtraParam string +} + type Agent interface { - ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra string) ([]byte, error) + ProcessRequest(request Request) ([]byte, error) Ping() error Connect(string) error Close() diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index 3797f8afb..574a8aa51 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -29,7 +29,7 @@ func (db *MockedDatabase) SetSettings(settings DBSettings) { } -func (db *MockedDatabase) ProcessRequest(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { - args := db.Called(db_name, data_collection_name, group_id, op, extra_param) +func (db *MockedDatabase) ProcessRequest(request Request) (answer []byte, err error) { + args := db.Called(request) return args.Get(0).([]byte), args.Error(1) } diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 576655bb4..5a73166b8 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -764,43 +764,37 @@ func (db *Mongodb) getSubstreams(db_name string, from string) ([]byte, error) { } -func (db *Mongodb) ProcessRequest(db_name string, collection_name string, group_id string, op string, extra_param string) (answer []byte, err error) { - dataset := false - if strings.HasSuffix(op, "_dataset") { - dataset = true - op = op[:len(op)-8] - } - - if err := db.checkDatabaseOperationPrerequisites(db_name, collection_name, group_id); err != nil { +func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { + if err := db.checkDatabaseOperationPrerequisites(request.DbName, request.DbCollectionName, request.GroupId); err != nil { return nil, err } - switch op { + switch request.Op { case "next": - return db.getNextRecord(db_name, collection_name, group_id, dataset, extra_param) + return db.getNextRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp, request.ExtraParam) case "id": - return db.getRecordByID(db_name, collection_name, group_id, extra_param, dataset) + return db.getRecordByID(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam, request.DatasetOp) case "last": - return db.getLastRecord(db_name, collection_name, group_id, dataset) + return db.getLastRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp) case "resetcounter": - return db.resetCounter(db_name, collection_name, group_id, extra_param) + return db.resetCounter(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) case "size": - return db.getSize(db_name, collection_name) + return db.getSize(request.DbName, request.DbCollectionName) case "meta": - return db.getMeta(db_name, extra_param) + return db.getMeta(request.DbName, request.ExtraParam) case "queryimages": - return db.queryImages(db_name, collection_name, extra_param) + return db.queryImages(request.DbName, request.DbCollectionName, request.ExtraParam) case "substreams": - return db.getSubstreams(db_name,extra_param) + return db.getSubstreams(request.DbName,request.ExtraParam) case "ackimage": - return db.ackRecord(db_name, collection_name, group_id, extra_param) + return db.ackRecord(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) case "negackimage": - return db.negAckRecord(db_name, group_id, extra_param) + return db.negAckRecord(request.DbName, request.GroupId, request.ExtraParam) case "nacks": - return db.nacks(db_name, collection_name, group_id, extra_param) + return db.nacks(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) case "lastack": - return db.lastAck(db_name, collection_name, group_id) + return db.lastAck(request.DbName, request.DbCollectionName, request.GroupId) } - return nil, errors.New("Wrong db operation: " + op) + return nil, errors.New("Wrong db operation: " + request.Op) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 8e2d97295..8a93623bb 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -13,10 +13,10 @@ import ( ) type TestRecord struct { - ID int `bson:"_id" json:"_id"` - Meta map[string]string `bson:"meta" json:"meta"` - Name string `bson:"name" json:"name"` - Timestamp int64 `bson:"timestamp" json:"timestamp"` + ID int `bson:"_id" json:"_id"` + Meta map[string]string `bson:"meta" json:"meta"` + Name string `bson:"name" json:"name"` + Timestamp int64 `bson:"timestamp" json:"timestamp"` } type TestDataset struct { @@ -37,10 +37,10 @@ const metaID_str = "0" var empty_next = map[string]string{"next_substream": ""} -var rec1 = TestRecord{1, empty_next, "aaa",0} -var rec_finished = TestRecord{2, map[string]string{"next_substream": "next1"}, finish_substream_keyword,0} -var rec2 = TestRecord{2, empty_next, "bbb",1} -var rec3 = TestRecord{3, empty_next, "ccc",2} +var rec1 = TestRecord{1, empty_next, "aaa", 0} +var rec_finished = TestRecord{2, map[string]string{"next_substream": "next1"}, finish_substream_keyword, 0} +var rec2 = TestRecord{2, empty_next, "bbb", 1} +var rec3 = TestRecord{3, empty_next, "ccc", 2} var rec1_expect, _ = json.Marshal(rec1) var rec2_expect, _ = json.Marshal(rec2) @@ -75,31 +75,31 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, collection, "", "meta", "0") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBQueryImagesErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, collection, "", "queryimages", "0") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest("", collection, groupId, "next", "") + _, err := db.ProcessRequest(Request{DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, "bla", groupId, "next", "") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error()) } @@ -107,26 +107,25 @@ func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, "bla", groupId, "last", "") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "bla", GroupId: groupId, Op: "last"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error()) } -func TestMongoDBGetByIdErrorWhenNonExistingDatacollectionname(t *testing.T) { +func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, collection, groupId, "id", "2") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_substream\":\"\"}", err.Error()) } - func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec2) - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"\"}", err.Error()) } @@ -135,20 +134,19 @@ func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } - func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - db.ProcessRequest(dbname, collection, groupId, "next", "") - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next1\"}", err.(*DBError).Message) @@ -158,8 +156,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(dbname, collection, groupId, "next", "") - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.(*DBError).Message) @@ -170,8 +168,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec1) - res1, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") - res2, _ := db.ProcessRequest(dbname, collection, groupId, "next", "") + res1, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + res2, _ := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -191,8 +189,8 @@ func insertRecords(n int) { for ind, record := range records { record.ID = ind + 1 record.Name = string(ind) - if err:= db.insertRecord(dbname, collection, &record);err!=nil { - fmt.Println("error at insert ",ind) + if err := db.insertRecord(dbname, collection, &record); err != nil { + fmt.Println("error at insert ", ind) } } } @@ -201,20 +199,20 @@ func getRecords(n int, resend bool) []int { results := make([]int, n) var wg sync.WaitGroup wg.Add(n) - extra_param:="" + extra_param := "" if resend { - extra_param="0_1" + extra_param = "0_1" } for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, err:= db.ProcessRequest(dbname, collection, groupId, "next", extra_param) - if err!=nil { - fmt.Println("error at read ",i) + res_bin, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param}) + if err != nil { + fmt.Println("error at read ", i) } var res TestRecord json.Unmarshal(res_bin, &res) - if res.ID>0 { + if res.ID > 0 { results[res.ID-1] = 1 } }() @@ -230,7 +228,7 @@ func TestMongoDBGetNextInParallel(t *testing.T) { n := 100 insertRecords(n) - results := getRecords(n,false) + results := getRecords(n, false) assert.Equal(t, n, getNOnes(results)) } @@ -242,26 +240,24 @@ func TestMongoDBGetNextInParallelWithResend(t *testing.T) { n := 100 insertRecords(n) - results := getRecords(n,true) - results2 := getRecords(n,true) + results := getRecords(n, true) + results2 := getRecords(n, true) - assert.Equal(t, n, getNOnes(results),"first") - assert.Equal(t, n, getNOnes(results2),"second") + assert.Equal(t, n, getNOnes(results), "first") + assert.Equal(t, n, getNOnes(results2), "second") } - - func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(dbname, collection, groupId, "next", "") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -270,12 +266,12 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(200) - db.ProcessRequest(dbname, collection, groupId, "next", "") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) n := 100 insertRecords(n) - results := getRecords(n,false) + results := getRecords(n, false) assert.Equal(t, n, getNOnes(results)) } @@ -283,20 +279,20 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(dbname, collection, groupId, "next", "") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.Error()) } - func TestMongoDBgetRecordByID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1") + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -305,7 +301,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(dbname, collection, groupId, "id", "2") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_substream\":\"\"}", err.Error()) } @@ -314,7 +310,7 @@ func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -325,8 +321,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection2, &rec_dataset1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") - res_string, err2 := db.ProcessRequest(dbname, collection2, groupId, "next_dataset", "") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + res_string, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection2, GroupId: groupId, Op: "next", DatasetOp: true}) var res_ds TestDataset json.Unmarshal(res_string, &res_ds) @@ -342,7 +338,7 @@ func TestMongoDBGetRecordID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "id", "1") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -351,7 +347,7 @@ func TestMongoDBWrongOp(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(dbname, collection, groupId, "bla", "0") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "bla"}) assert.NotNil(t, err) } @@ -361,7 +357,7 @@ func TestMongoDBGetRecordLast(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -372,13 +368,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "last", "0") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) db.insertRecord(dbname, collection, &rec3) - res, err = db.ProcessRequest(dbname, collection, groupId, "next", "") + res, err = db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec3_expect), string(res)) @@ -391,7 +387,7 @@ func TestMongoDBGetSize(t *testing.T) { db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec3) - res, err := db.ProcessRequest(dbname, collection, "", "size", "0") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs1_expect), string(res)) } @@ -400,7 +396,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { db.Connect(dbaddress) defer cleanup() - res, err := db.ProcessRequest(dbname, collection, "", "size", "0") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs2_expect), string(res)) } @@ -418,7 +414,7 @@ func TestMongoPingNotConected(t *testing.T) { } func TestMongoDBgetRecordByIDNotConnected(t *testing.T) { - _, err := db.ProcessRequest(dbname, collection, "", "id", "2") + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -428,15 +424,15 @@ func TestMongoDBResetCounter(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "") + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) - _, err_reset := db.ProcessRequest(dbname, collection, groupId, "resetcounter", "1") + _, err_reset := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"}) assert.Nil(t, err_reset) - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "") + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err2) assert.Equal(t, string(rec2_expect), string(res2)) @@ -450,7 +446,7 @@ func TestMongoDBGetMetaOK(t *testing.T) { rec_expect, _ := json.Marshal(recm) db.insertMeta(dbname, &recm) - res, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str}) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -460,7 +456,7 @@ func TestMongoDBGetMetaErr(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(dbname, collection, "", "meta", metaID_str) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "meta", ExtraParam: metaID_str}) assert.NotNil(t, err) } @@ -486,7 +482,7 @@ var tests = []struct { res []TestRecordMeta ok bool }{ - {"_id > 0", []TestRecordMeta{recq1, recq2,recq3,recq4}, true}, + {"_id > 0", []TestRecordMeta{recq1, recq2, recq3, recq4}, true}, {"meta.counter = 10", []TestRecordMeta{recq1, recq3}, true}, {"meta.counter = 10 ORDER BY _id DESC", []TestRecordMeta{recq3, recq1}, true}, {"meta.counter > 10 ORDER BY meta.counter DESC", []TestRecordMeta{recq4, recq2}, true}, @@ -535,7 +531,7 @@ func TestMongoDBQueryImagesOK(t *testing.T) { // continue // } - res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) // fmt.Println(string(res_string)) @@ -554,7 +550,7 @@ func TestMongoDBQueryImagesOnEmptyDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() for _, test := range tests { - res_string, err := db.ProcessRequest(dbname, collection, "", "queryimages", test.query) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) assert.Equal(t, 0, len(res)) @@ -577,7 +573,7 @@ func TestMongoDBGetDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "") + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Nil(t, err) @@ -593,7 +589,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "") + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message) @@ -608,7 +604,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0") + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp:true, ExtraParam: "0"}) assert.Nil(t, err) @@ -625,7 +621,7 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset3) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "last_dataset", "0") + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -640,7 +636,7 @@ func TestMongoDBGetDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(dbname, collection, groupId, "id_dataset", "1") + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "id", DatasetOp:true, ExtraParam: "1"}) assert.Nil(t, err) @@ -657,28 +653,29 @@ type Substream struct { } var testsSubstreams = []struct { - from string - substreams []Substream + from string + substreams []Substream expectedSubstreams SubstreamsRecord - test string - ok bool + test string + ok bool }{ - {"",[]Substream{},SubstreamsRecord{[]SubstreamInfo{}}, "no substreams", true}, - {"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0}}}, "one substream", true}, - {"",[]Substream{{"ss1",[]TestRecord{rec2,rec1}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1",Timestamp: 0},SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "two substreams", true}, - {"ss2",[]Substream{{"ss1",[]TestRecord{rec1,rec2}},{"ss2",[]TestRecord{rec2,rec3}}},SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss2",Timestamp: 1}}}, "with from", true}, + {"", []Substream{}, SubstreamsRecord{[]SubstreamInfo{}}, "no substreams", true}, + {"", []Substream{{"ss1", []TestRecord{rec2, rec1}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1", Timestamp: 0}}}, "one substream", true}, + {"", []Substream{{"ss1", []TestRecord{rec2, rec1}}, {"ss2", []TestRecord{rec2, rec3}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss1", Timestamp: 0}, SubstreamInfo{Name: "ss2", Timestamp: 1}}}, "two substreams", true}, + {"ss2", []Substream{{"ss1", []TestRecord{rec1, rec2}}, {"ss2", []TestRecord{rec2, rec3}}}, SubstreamsRecord{[]SubstreamInfo{SubstreamInfo{Name: "ss2", Timestamp: 1}}}, "with from", true}, } func TestMongoDBListSubstreams(t *testing.T) { for _, test := range testsSubstreams { db.Connect(dbaddress) for _, substream := range test.substreams { - for _,rec:= range substream.records { + for _, rec := range substream.records { db.insertRecord(dbname, substream.name, &rec) } } var rec_substreams_expect, _ = json.Marshal(test.expectedSubstreams) - res, err := db.ProcessRequest(dbname, "0", "0", "substreams", test.from) + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: "0", Op: "substreams", ExtraParam: test.from}) if test.ok { assert.Nil(t, err, test.test) assert.Equal(t, string(rec_substreams_expect), string(res), test.test) @@ -695,50 +692,51 @@ func TestMongoDBAckImage(t *testing.T) { db.insertRecord(dbname, collection, &rec1) query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" - res, err := db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str) - nacks,_ := db.getNacks(dbname,collection,groupId,1,1) + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str}) + nacks, _ := db.getNacks(dbname, collection, groupId, 1, 1) assert.Nil(t, err) assert.Equal(t, "", string(res)) assert.Equal(t, 0, len(nacks)) } var testsNacs = []struct { - rangeString string - resString string + rangeString string + resString string insertRecords bool - ackRecords bool - ok bool - test string + ackRecords bool + ok bool + test string }{ - {"0_0", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}",true,false,true,"whole range"}, - {"", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}",true,false,false,"empty string range"}, - {"0_5", "{\"unacknowledged\":[1,2,3,4,5]}",true,false,true,"to given"}, - {"5_0", "{\"unacknowledged\":[5,6,7,8,9,10]}",true,false,true,"from given"}, - {"3_7", "{\"unacknowledged\":[3,4,5,6,7]}",true,false,true,"range given"}, - {"1_1", "{\"unacknowledged\":[1]}",true,false,true,"single record"}, - {"3_1", "{\"unacknowledged\":[]}",true,false,false,"to lt from"}, - {"0_0", "{\"unacknowledged\":[]}",false,false,true,"no records"}, - {"0_0", "{\"unacknowledged\":[1,5,6,7,8,9,10]}",true,true,true,"skip acks"}, - {"2_4", "{\"unacknowledged\":[]}",true,true,true,"all acknowledged"}, - {"1_4", "{\"unacknowledged\":[1]}",true,true,true,"some acknowledged"}, + {"0_0", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}", true, false, true, "whole range"}, + {"", "{\"unacknowledged\":[1,2,3,4,5,6,7,8,9,10]}", true, false, false, "empty string range"}, + {"0_5", "{\"unacknowledged\":[1,2,3,4,5]}", true, false, true, "to given"}, + {"5_0", "{\"unacknowledged\":[5,6,7,8,9,10]}", true, false, true, "from given"}, + {"3_7", "{\"unacknowledged\":[3,4,5,6,7]}", true, false, true, "range given"}, + {"1_1", "{\"unacknowledged\":[1]}", true, false, true, "single record"}, + {"3_1", "{\"unacknowledged\":[]}", true, false, false, "to lt from"}, + {"0_0", "{\"unacknowledged\":[]}", false, false, true, "no records"}, + {"0_0", "{\"unacknowledged\":[1,5,6,7,8,9,10]}", true, true, true, "skip acks"}, + {"2_4", "{\"unacknowledged\":[]}", true, true, true, "all acknowledged"}, + {"1_4", "{\"unacknowledged\":[1]}", true, true, true, "some acknowledged"}, } - func TestMongoDBNacks(t *testing.T) { for _, test := range testsNacs { db.Connect(dbaddress) - if test.insertRecords { + if test.insertRecords { insertRecords(10) } - if (test.ackRecords) { - db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}") + if test.ackRecords { + db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}") } - res, err := db.ProcessRequest(dbname, collection, groupId, "nacks", test.rangeString) + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) if test.ok { assert.Nil(t, err, test.test) - assert.Equal(t, test.resString, string(res),test.test) + assert.Equal(t, test.resString, string(res), test.test) } else { assert.NotNil(t, err, test.test) } @@ -748,43 +746,43 @@ func TestMongoDBNacks(t *testing.T) { var testsLastAcs = []struct { insertRecords bool - ackRecords bool - resString string - test string + ackRecords bool + resString string + test string }{ - {false,false,"{\"lastAckId\":0}","empty db"}, - {true,false,"{\"lastAckId\":0}","no acks"}, - {true,true,"{\"lastAckId\":4}","last ack 4"}, + {false, false, "{\"lastAckId\":0}", "empty db"}, + {true, false, "{\"lastAckId\":0}", "no acks"}, + {true, true, "{\"lastAckId\":4}", "last ack 4"}, } - func TestMongoDBLastAcks(t *testing.T) { for _, test := range testsLastAcs { db.Connect(dbaddress) - if test.insertRecords { + if test.insertRecords { insertRecords(10) } - if (test.ackRecords) { - db.ackRecord(dbname, collection, groupId,"{\"Id\":2,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId,"{\"Id\":3,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId,"{\"Id\":4,\"Op\":\"ackimage\"}") + if test.ackRecords { + db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}") + db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}") } - res, err := db.ProcessRequest(dbname, collection, groupId, "lastack", "") + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "lastack"}) assert.Nil(t, err, test.test) - assert.Equal(t, test.resString, string(res),test.test) + assert.Equal(t, test.resString, string(res), test.test) cleanup() } } - func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) db.Connect(dbaddress) defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -797,14 +795,14 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) { db.Connect(dbaddress) defer cleanup() err := db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") - _, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + _, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) assert.NotNil(t, err2) - if err2!=nil { + if err2 != nil { assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err2.Error()) } assert.Equal(t, string(rec1_expect), string(res)) @@ -817,10 +815,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) time.Sleep(time.Second) - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -835,10 +833,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) time.Sleep(time.Second) - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "1_3") + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "1_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -847,15 +845,14 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { assert.Equal(t, string(rec2_expect), string(res2)) } - func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 10}) db.Connect(dbaddress) defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res)) @@ -868,9 +865,9 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -884,43 +881,43 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" - db.ProcessRequest(dbname, collection, groupId, "ackimage", query_str) - _, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_3") + + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.NotNil(t, err) - if err!=nil { + if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.Error()) } } - func TestMongoDBNegAck(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) db.Connect(dbaddress) defer cleanup() inputParams := struct { - Id int + Id int Params struct { DelaySec int } }{} inputParams.Id = 1 - inputParams.Params.DelaySec=0 - + inputParams.Params.DelaySec = 0 db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(dbname, collection, groupId, "next", "") - bparam,_:= json.Marshal(&inputParams) - db.ProcessRequest(dbname, collection, groupId, "negackimage", string(bparam)) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "") // first time image from negack - _, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "") // second time nothing + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + bparam, _ := json.Marshal(&inputParams) + + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackimage", ExtraParam: string(bparam)}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time image from negack + _, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) assert.NotNil(t, err1) - if err1!=nil { + if err1 != nil { assert.Equal(t, utils.StatusNoData, err1.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err1.Error()) } @@ -932,11 +929,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") - res1, err1 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") - db.ProcessRequest(dbname, collection, groupId, "resetcounter", "0") - res2, err2 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") - res3, err3 := db.ProcessRequest(dbname, collection, groupId, "next", "0_1") + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"}) + res2, err2 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res3, err3 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -946,4 +944,4 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec1_expect), string(res2)) assert.Equal(t, string(rec1_expect), string(res3)) -} \ No newline at end of file +} diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index c0a4eba12..08f789619 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -56,7 +56,7 @@ var testsGetCommand = []struct { func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { for _, test := range testsGetCommand { - suite.mock_db.On("ProcessRequest", expectedDBName, test.substream, test.groupid, test.command, test.externalParam).Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: test.substream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 8297dc973..9e588fb9a 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -4,8 +4,6 @@ import ( "net/http" ) - - func extractResend(r *http.Request) (string) { keys := r.URL.Query() resend := keys.Get("resend_nacks") @@ -18,7 +16,6 @@ func extractResend(r *http.Request) (string) { return resend_params } - func routeGetNext(w http.ResponseWriter, r *http.Request) { processRequest(w, r, "next", extractResend(r), true) } diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go index fbba23069..19d70f940 100644 --- a/broker/src/asapo_broker/server/post_op_image_test.go +++ b/broker/src/asapo_broker/server/post_op_image_test.go @@ -34,7 +34,7 @@ func TestImageOpTestSuite(t *testing.T) { func (suite *ImageOpTestSuite) TestAckImageOpOK() { query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "ackimage", query_str).Return([]byte(""), nil) + suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId: expectedGroupID, Op: "ackimage", ExtraParam: query_str}).Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackimage"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) suite.Equal(http.StatusOK, w.Code, "ackimage OK") diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index 51ed9c45f..49010054b 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -34,7 +34,8 @@ func TestQueryTestSuite(t *testing.T) { func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, "", "queryimages", query_str).Return([]byte("{}"), nil) + + suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream,Op: "queryimages", ExtraParam: query_str}).Return([]byte("{}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages"))) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/0/queryimages"+correctTokenSuffix, "POST", query_str) diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index bb2f2b2a2..d35f116a1 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -33,7 +33,9 @@ func TestResetCounterTestSuite(t *testing.T) { } func (suite *ResetCounterTestSuite) TestResetCounterOK() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil) + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"} + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedSubstream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST") diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index d65dce774..5b4bf2f3c 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -24,8 +24,6 @@ func extractRequestParameters(r *http.Request, needGroupID bool) (string, string return db_name, stream, substream, group_id, ok1 && ok2 && ok3 && ok4 } -var Sink bool - func IsLetterOrNumbers(s string) bool { for _, r := range s { if (r < 'a' || r > 'z') && (r < 'A' || r > 'Z') && (r<'0' || r>'9') { @@ -69,11 +67,18 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par return } - if datasetRequested(r) { - op = op + "_dataset" + request := database.Request{} + request.DbName = db_name+"_"+stream + request.Op = op + request.ExtraParam = extra_param + request.DbCollectionName = substream + request.GroupId = group_id + if yes, minSize := datasetRequested(r); yes { + request.DatasetOp = true + request.MinDatasetSize = minSize } - answer, code := processRequestInDb(db_name+"_"+stream, substream, group_id, op, extra_param) + answer, code := processRequestInDb(request) w.WriteHeader(code) w.Write(answer) } @@ -110,10 +115,10 @@ func reconnectIfNeeded(db_error error) { } } -func processRequestInDb(db_name string, data_collection_name string, group_id string, op string, extra_param string) (answer []byte, code int) { +func processRequestInDb(request database.Request) (answer []byte, code int) { statistics.IncreaseCounter() - answer, err := db.ProcessRequest(db_name, data_collection_name, group_id, op, extra_param) - log_str := "processing request " + op + " in " + db_name + " at " + settings.GetDatabaseServer() + answer, err := db.ProcessRequest(request) + log_str := "processing request " + request.Op + " in " + request.DbName + " at " + settings.GetDatabaseServer() if err != nil { go reconnectIfNeeded(err) return returnError(err, log_str) diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 1757decba..5aa7b28fc 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -123,7 +123,10 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"} + + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next"))) @@ -134,7 +137,10 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() } func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"} + + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -147,7 +153,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte(""), errors.New("")) + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"} + + + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) @@ -159,7 +169,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next", "").Return([]byte("Hello"), nil) + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, Op: "next"} + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) + + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -173,8 +187,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { } func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedSubstream, expectedGroupID, "next_dataset", "").Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedDBName))) + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedSubstream, GroupId:expectedGroupID, DatasetOp:true, Op: "next"} + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) + + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) doRequest("/database/" + expectedBeamtimeId + "/" + expectedStream + "/" + expectedSubstream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") } diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index b83147010..28f2bc38d 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -4,6 +4,7 @@ import ( "asapo_common/logger" "errors" "net/http" + "strconv" ) func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, err string) { @@ -13,7 +14,7 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, w.Write([]byte(err)) } -func ValueTrue(r *http.Request, key string) bool { +func valueTrue(r *http.Request, key string) bool { val := r.URL.Query().Get(key) if len(val) == 0 { return false @@ -25,8 +26,21 @@ func ValueTrue(r *http.Request, key string) bool { return false } -func datasetRequested(r *http.Request) bool { - return ValueTrue(r, "dataset") +func valueInt(r *http.Request, key string) int { + val := r.URL.Query().Get(key) + if len(val) == 0 { + return 0 + } + + i, err := strconv.Atoi(val) + if err != nil { + return 0 + } + return i +} + +func datasetRequested(r *http.Request) (bool,int) { + return valueTrue(r, "dataset"),valueInt(r,"minsize") } func testAuth(r *http.Request, beamtime_id string) error { diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index 51c5e1694..e8c9a826e 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -132,7 +132,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err bool isFirstFile = true; while (true) { if (params.datasets) { - auto dataset = broker->GetNextDataset(group_id, &err); + auto dataset = broker->GetNextDataset(group_id, 0, &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index dc1ecce0f..fdad4330e 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -207,7 +207,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st asapo::FileInfo fi; asapo::Error err; - auto dataset = broker->GetNextDataset(group_id, &err); + auto dataset = broker->GetNextDataset(group_id, 0, &err); if (err) { std::cout << err->Explain() << std::endl; } @@ -223,18 +223,18 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size)); - dataset = broker->GetLastDataset(group_id, &err); + dataset = broker->GetLastDataset(group_id, 0, &err); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); - dataset = broker->GetNextDataset(group_id, &err); + dataset = broker->GetNextDataset(group_id, 0, &err); M_AssertTrue(err != nullptr, "GetNextDataset2 error"); - dataset = broker->GetLastDataset(group_id, &err); + dataset = broker->GetLastDataset(group_id,0, &err); M_AssertTrue(err == nullptr, "GetLastDataset2 no error"); - dataset = broker->GetDatasetById(8, group_id, &err); + dataset = broker->GetDatasetById(8, group_id, 0, &err); M_AssertTrue(err == nullptr, "GetDatasetById error"); M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename"); diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 658af3435..c3a285c54 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -81,7 +81,7 @@ std::vector<std::thread> StartThreads(const Args& params, while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < params.timeout_ms) { if (params.datasets) { - auto dataset = broker->GetLastDataset(group_id, &err); + auto dataset = broker->GetLastDataset(group_id, 0, &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; -- GitLab