diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 939c7997728b770d1e825870356bd07a47239b4e..b59bf4c8bba62a2c0d462cb0c49c79450d6c65b4 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -69,6 +69,7 @@ const data_collection_name_prefix = "data_" const acks_collection_name_prefix = "acks_" const inprocess_collection_name_prefix = "inprocess_" const meta_collection_name = "meta" +const streams_info = "streams" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" const last_message_collection_name = "last_messages" diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index b57f9973ddfa997af4706f75ece08e6d5969fa61..0d012de402f44a767e94cc8361e958a5ccdbbcc0 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -1,18 +1,30 @@ -//+build !test +//go:build !test +// +build !test package database import ( + log "asapo_common/logger" "asapo_common/utils" "context" + "encoding/json" "errors" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "sort" + "strconv" "strings" "sync" "time" ) +type GetStreamsParams struct { + From string `json:"from"` + Filter string `json:"filter"` + Detailed string `json:"detailed"` +} + type StreamInfo struct { LastId int64 `json:"lastId"` Name string `json:"name"` @@ -29,13 +41,12 @@ type StreamsRecord struct { type Streams struct { records map[string]StreamsRecord lastUpdated map[string]time.Time - lastSynced map[string]time.Time } -var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)} +var streams = Streams{lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)} var streamsLock sync.Mutex -func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { +func (ss *Streams) getStreamsInfoFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() >= int64(updatePeriodMs) { return StreamsRecord{}, errors.New("cache expired") } @@ -43,12 +54,14 @@ func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsR if !ok { return StreamsRecord{}, errors.New("no records for " + db_name) } - res :=StreamsRecord{} - utils.DeepCopy(rec,&res) + res := StreamsRecord{} + utils.DeepCopy(rec, &res) return res, nil } -func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) { +// Return StreamsRecord with names of streams filled from DB +// Function query list of collections for given db_name and them filter it by name +func readAllStreamNames(db *Mongodb, db_name string) (StreamsRecord, error) { database := db.client.Database(db_name) result, err := database.ListCollectionNames(context.TODO(), bson.D{}) if err != nil { @@ -57,7 +70,7 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) { var rec = StreamsRecord{[]StreamInfo{}} for _, coll := range result { if strings.HasPrefix(coll, data_collection_name_prefix) { - sNameEncoded:= strings.TrimPrefix(coll, data_collection_name_prefix) + sNameEncoded := strings.TrimPrefix(coll, data_collection_name_prefix) si := StreamInfo{Name: decodeString(sNameEncoded)} rec.Streams = append(rec.Streams, si) } @@ -65,20 +78,30 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) { return rec, nil } -func getCurrentStreams(db_name string) []StreamInfo { - ss, dbFound := streams.records[db_name] - currentStreams := []StreamInfo{} - if dbFound { - // sort streams by name - currentStreams = ss.Streams - sort.Slice(currentStreams, func(i, j int) bool { - return currentStreams[i].Name >= currentStreams[j].Name - }) +// Read stream info for all streams from streams_info collection +// If correction is empty or does not exist return empty array +// and create collecton with unique index `name` +func readStreamsInfo(db *Mongodb, db_name string) ([]StreamInfo, error) { + coll := db.client.Database(db_name).Collection(streams_info) + opts := options.Find().SetSort(bson.D{{"name", 1}}) // Order by name to simplify search + cursor, err := coll.Find(context.TODO(), bson.D{}, opts) + if err != nil { + log.Error("Get streams fails", err) + return []StreamInfo{}, err + } + var rec []StreamInfo + if err = cursor.All(context.TODO(), &rec); err != nil { + log.Error("Decoding streams fails", err) + return []StreamInfo{}, err + } + if len(rec) == 0 { + createIndexStreamInfoDB(db, db_name) } - return currentStreams + return rec, nil } func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int, bool) { + // currentStreams is already ordered by Name ind := sort.Search(len(currentStreams), func(i int) bool { return currentStreams[i].Name >= record.Name }) @@ -124,7 +147,9 @@ func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, rec return nil } -func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) { +// If stream name from record is found in currentStreams corresponding +// record get copied to rec. +func updateStreamInfoFromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) { ind, found := findStreamAmongCurrent(currentStreams, record) if found { *rec = currentStreams[ind] @@ -135,20 +160,64 @@ func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, return found, false } -func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync bool) error { - currentStreams := getCurrentStreams(db_name) +// Create unique index `name` for collection of streams info +// If collection does not exist, DB will create it +func createIndexStreamInfoDB(db *Mongodb, db_name string) error { + coll := db.client.Database(db_name).Collection(streams_info) + + _, err := coll.Indexes().CreateOne( + context.Background(), + mongo.IndexModel{ + Keys: bson.D{{Key: "name", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + ) + return err +} + +// Update streams_info collection with current StreamInfo +func updateStreamInfoDB(db *Mongodb, db_name string, rec StreamInfo) error { + coll := db.client.Database(db_name).Collection(streams_info) + + _, err := coll.InsertOne(context.TODO(), rec) + return err +} + +// Fill stream information from different sources to the rec. +// Stream information is taken from collection streams_info +// If stream is not finished, only timestamp is valid +// If stream is not found in streams_info information is +// extracted form data collections +// If detailed is True, information is extracted from data collection +func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord, detailed bool) error { + currentStreams, err := readStreamsInfo(db, db_name) + if err != nil { + return err + } + for i, record := range rec.Streams { - found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i]) - if mayContinue && !forceSync { + found, finished := updateStreamInfoFromCurrent(currentStreams, record, &rec.Streams[i]) + if found && (!detailed || finished) { // Don't need to update if stream is finished continue } - if !found || forceSync { // set timestamp + if !found { // set timestamp if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil { return err } } - if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update last record (timestamp, stream finished flag) - return err + if detailed { // update last record (TimestampLast, stream finished flag) + if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { + return err + } + } + if !found || rec.Streams[i].Finished { + // Inset stream info in DB if is it not yet there or stream is now finished + err := updateStreamInfoDB(db, db_name, rec.Streams[i]) + // Error may happen if two brokers works with the same stream and try to + // populate streams info. + if err != nil { + log.Debug("Update stream info in DB failed", err) + } } } return nil @@ -160,91 +229,93 @@ func sortRecords(rec *StreamsRecord) { }) } -func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, error) { - rec, err := readStreams(db, db_name) +func getStreamsInfoFromDb(db *Mongodb, db_name string, detailed bool) (StreamsRecord, error) { + rec, err := readAllStreamNames(db, db_name) if err != nil { return StreamsRecord{}, err } - forceSync:= false - if time.Now().Sub(ss.lastSynced[db_name]).Seconds() > 5 { - forceSync = true - } - err = updateStreamInfos(db, db_name, &rec,forceSync) + err = updateStreamInfos(db, db_name, &rec, detailed) if err != nil { return StreamsRecord{}, err } - if forceSync { - ss.lastSynced[db_name] = time.Now() - } - sortRecords(&rec) + return rec, nil +} + +// Save StreamsRecord in global variable for caching purpose +func (ss *Streams) cacheStreams(db_name string, rec StreamsRecord) { if len(rec.Streams) > 0 { - res :=StreamsRecord{} - utils.DeepCopy(rec,&res) - ss.records[db_name] = res + result := StreamsRecord{} + utils.DeepCopy(rec, &result) + ss.records[db_name] = result ss.lastUpdated[db_name] = time.Now() } - return rec, nil } -func getFiltersFromString(filterString string) (string, string, error) { - firstStream, streamStatus, err := utils.DecodeTwoStrings(filterString) - if err!=nil { - return "", "", errors.New("wrong format: " + filterString) +// Convert json string filterString to extra requests parameters +func getFiltersFromString(filterString string) (string, string, bool, error) { + + var params GetStreamsParams + err := json.Unmarshal([]byte(filterString), ¶ms) + + if err != nil { + return "", "", true, errors.New("wrong format: " + filterString) } - if streamStatus == "" { - streamStatus = stream_filter_all + if params.Filter == "" { + params.Filter = stream_filter_all } - return firstStream, streamStatus, nil + detailed, _ := strconv.ParseBool(params.Detailed) + return params.From, params.Filter, detailed, nil } -func getStreamsParamsFromRequest(request Request) (string, string, error) { +// Return extra request parameters from request +func getStreamsParamsFromRequest(request Request) (string, string, bool, error) { if request.ExtraParam == "" { - return "", stream_filter_all, nil + return "", stream_filter_all, true, nil } - firstStream, streamStatus, err := getFiltersFromString(request.ExtraParam) + firstStream, streamStatus, detailed, err := getFiltersFromString(request.ExtraParam) if err != nil { - return "", "", err + return "", "", false, err } - err = checkStreamstreamStatus(streamStatus) + err = checkStreamStatus(streamStatus) if err != nil { - return "", "", err + return "", "", false, err } - return firstStream, streamStatus, nil + return firstStream, streamStatus, detailed, nil } -func checkStreamstreamStatus(streamStatus string) error { +func checkStreamStatus(streamStatus string) error { if !utils.StringInSlice(streamStatus, []string{stream_filter_all, stream_filter_finished, stream_filter_unfinished}) { return errors.New("getStreamsParamsFromRequest: wrong streamStatus " + streamStatus) } return nil } -func keepStream(rec StreamInfo, streamStatus string) bool { +func filterStreamsByStatus(rec StreamInfo, streamStatus string) bool { return (rec.Finished && streamStatus == stream_filter_finished) || (!rec.Finished && streamStatus == stream_filter_unfinished) } func filterStreams(rec StreamsRecord, firstStream string, streamStatus string) []StreamInfo { - limitedStreams := limitStreams(rec, firstStream) + limitedStreams := filterStreamsByName(rec, firstStream) if streamStatus == stream_filter_all { return limitedStreams } nextStreams := limitedStreams[:0] for _, rec := range limitedStreams { - if keepStream(rec, streamStatus) { + if filterStreamsByStatus(rec, streamStatus) { nextStreams = append(nextStreams, rec) } } return nextStreams } -func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo { +func filterStreamsByName(rec StreamsRecord, firstStream string) []StreamInfo { if firstStream != "" { ind := len(rec.Streams) for i, rec := range rec.Streams { @@ -259,22 +330,25 @@ func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo { } func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, error) { - firstStream, streamStatus, err := getStreamsParamsFromRequest(request) + firstStream, streamStatus, detailed, err := getStreamsParamsFromRequest(request) if err != nil { return StreamsRecord{}, err } streamsLock.Lock() - rec, err := ss.tryGetFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs) + rec, err := ss.getStreamsInfoFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs) + streamsLock.Unlock() if err != nil { - rec, err = ss.updateFromDb(db, request.DbName()) + rec, err = getStreamsInfoFromDb(db, request.DbName(), detailed) } - streamsLock.Unlock() if err != nil { return StreamsRecord{}, err } - rec.Streams = filterStreams(rec, firstStream, streamStatus) + streamsLock.Lock() + ss.cacheStreams(request.DbName(), rec) + streamsLock.Unlock() + rec.Streams = filterStreams(rec, firstStream, streamStatus) return rec, nil } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index d7f38ad4745aac84d3a3da6734a171fb63c2b646..e9c2fca30140b26fd6af4e81f269a22914e28264 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -1,3 +1,4 @@ +//go:build integration_tests // +build integration_tests package database @@ -20,8 +21,8 @@ type TestRecord struct { } type TestRecordStreamBtMeta struct { - ID string `bson:"_id" json:"_id"` - Meta string `bson:"meta" json:"meta"` + ID string `bson:"_id" json:"_id"` + Meta string `bson:"meta" json:"meta"` } var recbt = TestRecordStreamBtMeta{"bt", "meta_bt"} @@ -48,7 +49,7 @@ const metaID = "bt" const badSymbolsDb = `/\."$` const badSymbolsCol = `$` const badSymbolsDbEncoded = "%2F%5C%2E%22%24" -const badSymbolsColEncoded ="%24" +const badSymbolsColEncoded = "%24" var empty_next = map[string]string{"next_stream": ""} @@ -85,7 +86,6 @@ func cleanupWithName(name string) { db.Close() } - // these are the integration tests. They assume mongo db is runnig on 127.0.0.1:27027 // test names should contain MongoDB*** so that go test could find them: // go_integration_test(${TARGET_NAME}-connectdb "./..." "MongoDBConnect") @@ -102,17 +102,17 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "0"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "meta", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBQueryMessagesErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: "0"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "querymessages", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -126,7 +126,7 @@ func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "bla", GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "bla", GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -134,7 +134,7 @@ func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "bla", GroupId: groupId, Op: "last"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "bla", GroupId: groupId, Op: "last"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -142,7 +142,7 @@ func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) { func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) @@ -152,7 +152,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec2) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error()) } @@ -161,7 +161,7 @@ func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -172,8 +172,8 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -185,23 +185,21 @@ func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) } - - func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -213,7 +211,7 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last"}) fmt.Println(string(res)) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -223,8 +221,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message) @@ -235,8 +233,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec1) - res1, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - res2, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res1, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res2, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -273,7 +271,7 @@ func getRecords(n int, resend bool) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param}) + res_bin, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param}) if err != nil { fmt.Println("error at read ", i) } @@ -318,13 +316,13 @@ func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -333,7 +331,7 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(200) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) n := 100 @@ -346,10 +344,10 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -359,7 +357,7 @@ func TestMongoDBgetRecordByID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -368,7 +366,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_stream\":\"\"}", err.Error()) } @@ -377,7 +375,7 @@ func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -388,8 +386,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection2, &rec_dataset1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) - res_string, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res_string, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true}) var res_ds TestDataset json.Unmarshal(res_string, &res_ds) @@ -405,7 +403,7 @@ func TestMongoDBGetRecordID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -414,7 +412,7 @@ func TestMongoDBWrongOp(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "bla"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "bla"}) assert.NotNil(t, err) } @@ -424,7 +422,7 @@ func TestMongoDBGetRecordLast(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -435,13 +433,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) db.insertRecord(dbname, collection, &rec3) - res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) @@ -451,30 +449,30 @@ func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) -// first record - ok, then error - res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + // first record - ok, then error + res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message) } -// second record - ok, then error + // second record - ok, then error db.insertRecord(dbname, collection, &rec2) - res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) - res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) -// stream finished - immediately error + // stream finished - immediately error db.insertRecord(dbname, collection, &rec_finished3) - res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -489,9 +487,9 @@ func TestMongoDBGetGetLastInGroupImmediateErrorOnFinishStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec_finished3) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) - _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + _, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -499,8 +497,6 @@ func TestMongoDBGetGetLastInGroupImmediateErrorOnFinishStream(t *testing.T) { } } - - func TestMongoDBGetSize(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -508,7 +504,7 @@ func TestMongoDBGetSize(t *testing.T) { db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec3) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs1_expect), string(res)) } @@ -519,7 +515,7 @@ func TestMongoDBGetSizeWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) var rec_expect, _ = json.Marshal(&SizeRecord{1}) assert.Equal(t, string(rec_expect), string(res)) @@ -530,10 +526,10 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "false"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size", ExtraParam: "false"}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) - _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) + _, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size", ExtraParam: "true"}) assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code) } @@ -543,7 +539,7 @@ func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - res, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) + res, _ := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size", ExtraParam: "true"}) var rec_expect, _ = json.Marshal(&SizeRecord{1}) assert.Equal(t, string(rec_expect), string(res)) @@ -558,7 +554,7 @@ func TestMongoDBGetSizeDataset(t *testing.T) { size2_expect, _ := json.Marshal(SizeRecord{2}) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size", ExtraParam: "true"}) assert.Nil(t, err) assert.Equal(t, string(size2_expect), string(res)) } @@ -567,7 +563,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { db.Connect(dbaddress) defer cleanup() - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs2_expect), string(res)) } @@ -585,7 +581,7 @@ func TestMongoPingNotConected(t *testing.T) { } func TestMongoDBgetRecordByIDNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -595,15 +591,15 @@ func TestMongoDBResetCounter(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) - _, err_reset := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"}) + _, err_reset := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"}) assert.Nil(t, err_reset) - res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err2) assert.Equal(t, string(rec2_expect), string(res2)) @@ -615,7 +611,7 @@ func TestMongoDBGetMetaBtOK(t *testing.T) { rec_expect, _ := json.Marshal(recbt.Meta) db.insertMeta(dbname, &recbt) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "whatever", Op: "meta", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "whatever", Op: "meta", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -627,7 +623,7 @@ func TestMongoDBGetMetaStOK(t *testing.T) { rec_expect, _ := json.Marshal(recst.Meta) db.insertMeta(dbname, &recst) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -637,7 +633,7 @@ func TestMongoDBGetMetaErr(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) assert.NotNil(t, err) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -713,7 +709,7 @@ func TestMongoDBQueryMessagesOK(t *testing.T) { // continue // } - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) // fmt.Println(string(res_string)) @@ -732,7 +728,7 @@ func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() for _, test := range tests { - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) assert.Equal(t, 0, len(res)) @@ -758,7 +754,7 @@ func TestMongoDBGetDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Nil(t, err) @@ -774,7 +770,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) var res TestDataset @@ -789,8 +785,8 @@ func TestMongoDBNoDataOnNotCompletedNextDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_dataset2_incomplete) - _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) - _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Equal(t, utils.StatusPartialData, err1.(*DBError).Code) assert.Equal(t, utils.StatusPartialData, err2.(*DBError).Code) @@ -806,7 +802,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -823,7 +819,7 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -841,7 +837,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T) db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2_incomplete3) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -858,7 +854,7 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.NotNil(t, err) @@ -875,7 +871,7 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"}) assert.NotNil(t, err) @@ -892,7 +888,7 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset3) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -907,7 +903,7 @@ func TestMongoDBGetDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Nil(t, err) @@ -923,7 +919,7 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) @@ -939,7 +935,7 @@ func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"}) assert.Nil(t, err) @@ -973,7 +969,6 @@ var testsStreams = []struct { {"", []Stream{{"ss1$", []TestRecord{rec2, rec1}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss1$", Timestamp: 0, LastId: 2, TimestampLast: 1}}}, "one stream encoded", true}, {"ss2$", []Stream{{"ss1$", []TestRecord{rec1, rec2}}, {"ss2$", []TestRecord{rec2, rec3}}}, StreamsRecord{[]StreamInfo{StreamInfo{Name: "ss2$", Timestamp: 1, LastId: 3, TimestampLast: 2}}}, "with from encoded", true}, - } func TestMongoDBListStreams(t *testing.T) { @@ -986,7 +981,7 @@ func TestMongoDBListStreams(t *testing.T) { } var rec_streams_expect, _ = json.Marshal(test.expectedStreams) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "0", Op: "streams", ExtraParam: utils.EncodeTwoStrings(test.from,"")}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "0", Op: "streams", ExtraParam: `{"from":"` + test.from + `","filter":"","detailed":"True"}`}) if test.ok { assert.Nil(t, err, test.test) assert.Equal(t, string(rec_streams_expect), string(res), test.test) @@ -1006,7 +1001,7 @@ func TestMongoDBAckMessage(t *testing.T) { query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - request := Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} + request := Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} res, err := db.ProcessRequest(request) nacks, _ := db.getNacks(request, 0, 0) assert.Nil(t, err) @@ -1043,12 +1038,12 @@ func TestMongoDBNacks(t *testing.T) { db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) if test.ok { assert.Nil(t, err, test.test) assert.Equal(t, test.resString, string(res), test.test) @@ -1078,12 +1073,12 @@ func TestMongoDBLastAcks(t *testing.T) { db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "lastack"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "lastack"}) assert.Nil(t, err, test.test) assert.Equal(t, test.resString, string(res), test.test) cleanup() @@ -1097,8 +1092,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) { err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1111,9 +1106,9 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) { db.Connect(dbaddress) defer cleanup() err := db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + _, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1131,10 +1126,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1150,10 +1145,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec_finished3) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) - res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1168,8 +1163,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res)) @@ -1182,9 +1177,9 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1198,11 +1193,11 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) - _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -1216,12 +1211,11 @@ func TestMongoDBAckTwiceErrors(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) - _,err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + _, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } - func TestMongoDBNegAck(t *testing.T) { db.SetSettings(DBSettings{ReadFromInprocessPeriod: 0}) db.Connect(dbaddress) @@ -1236,14 +1230,14 @@ func TestMongoDBNegAck(t *testing.T) { inputParams.Params.DelayMs = 0 db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) bparam, _ := json.Marshal(&inputParams) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack - _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) - _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack + _, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + _, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) @@ -1262,12 +1256,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"}) - res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res3, err3 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"}) + res2, err2 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res3, err3 := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1283,13 +1277,12 @@ var testsDeleteStream = []struct { stream string params string ok bool - ok2 bool + ok2 bool message string }{ - {"test", "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}", true,false, "delete stream"}, - {"test", "{\"ErrorOnNotExist\":false,\"DeleteMeta\":true}", true, true,"delete stream"}, - {`test$/\ .%&?*#'`, "{\"ErrorOnNotExist\":false,\"DeleteMeta\":true}", true, true,"delete stream"}, - + {"test", "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}", true, false, "delete stream"}, + {"test", "{\"ErrorOnNotExist\":false,\"DeleteMeta\":true}", true, true, "delete stream"}, + {`test$/\ .%&?*#'`, "{\"ErrorOnNotExist\":false,\"DeleteMeta\":true}", true, true, "delete stream"}, } func TestDeleteStreams(t *testing.T) { @@ -1297,24 +1290,24 @@ func TestDeleteStreams(t *testing.T) { for _, test := range testsDeleteStream { db.Connect(dbaddress) db.insertRecord(dbname, encodeStringForColName(test.stream), &rec1) - db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "123", Op: "next"}) + db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: "123", Op: "next"}) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - request := Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} + request := Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} _, err := db.ProcessRequest(request) assert.Nil(t, err, test.message) - _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) + _, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) if test.ok { - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) - acks_exist,_:= db.collectionExist(Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""},acks_collection_name_prefix+test.stream) - inprocess_exist,_:= db.collectionExist(Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""},inprocess_collection_name_prefix+test.stream) - assert.Equal(t,0,len(rec.Streams),test.message) - assert.Equal(t,false,acks_exist,test.message) - assert.Equal(t,false,inprocess_exist,test.message) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) + acks_exist, _ := db.collectionExist(Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}, acks_collection_name_prefix+test.stream) + inprocess_exist, _ := db.collectionExist(Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}, inprocess_collection_name_prefix+test.stream) + assert.Equal(t, 0, len(rec.Streams), test.message) + assert.Equal(t, false, acks_exist, test.message) + assert.Equal(t, false, inprocess_exist, test.message) assert.Nil(t, err, test.message) } else { assert.NotNil(t, err, test.message) } - _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) + _, err = db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) if test.ok2 { assert.Nil(t, err, test.message+" 2") } else { @@ -1323,30 +1316,28 @@ func TestDeleteStreams(t *testing.T) { } } - var testsEncodings = []struct { - beamtime string - datasource string + beamtime string + datasource string collection string - group string - dbname_indb string - collection_indb string - group_indb string - message string + group string + dbname_indb string + collection_indb string + group_indb string + message string ok bool }{ - {"bt","dbname", "col", "group", "bt_dbname","col","group", "no encoding",true}, - {"bt","dbname"+badSymbolsDb, "col", "group", "bt_dbname"+badSymbolsDbEncoded,"col","group", "symbols in db",true}, - {"bt","dbname", "col"+badSymbolsCol, "group"+badSymbolsCol, "bt_dbname","col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col",true}, - {"bt","dbname"+badSymbolsDb, "col"+badSymbolsCol, "group"+badSymbolsCol, "bt_dbname"+badSymbolsDbEncoded,"col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col and db",true}, - + {"bt", "dbname", "col", "group", "bt_dbname", "col", "group", "no encoding", true}, + {"bt", "dbname" + badSymbolsDb, "col", "group", "bt_dbname" + badSymbolsDbEncoded, "col", "group", "symbols in db", true}, + {"bt", "dbname", "col" + badSymbolsCol, "group" + badSymbolsCol, "bt_dbname", "col" + badSymbolsColEncoded, "group" + badSymbolsColEncoded, "symbols in col", true}, + {"bt", "dbname" + badSymbolsDb, "col" + badSymbolsCol, "group" + badSymbolsCol, "bt_dbname" + badSymbolsDbEncoded, "col" + badSymbolsColEncoded, "group" + badSymbolsColEncoded, "symbols in col and db", true}, } func TestMongoDBEncodingOK(t *testing.T) { for _, test := range testsEncodings { db.Connect(dbaddress) db.insertRecord(test.dbname_indb, test.collection_indb, &rec1) - res, err := db.ProcessRequest(Request{Beamtime:test.beamtime,DataSource: test.datasource, Stream: test.collection, GroupId: test.group, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime: test.beamtime, DataSource: test.datasource, Stream: test.collection, GroupId: test.group, Op: "next"}) if test.ok { assert.Nil(t, err, test.message) assert.Equal(t, string(rec1_expect), string(res), test.message) @@ -1355,4 +1346,4 @@ func TestMongoDBEncodingOK(t *testing.T) { } cleanupWithName(test.dbname_indb) } -} \ No newline at end of file +} diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go index 2bb15c0b575fc8d156d5e186ed6d3f1d4e663daf..cfb40bbbaab531ed85be365d36c2e7b6d52a6e3b 100644 --- a/broker/src/asapo_broker/database/streams_test.go +++ b/broker/src/asapo_broker/database/streams_test.go @@ -1,9 +1,9 @@ +//go:build integration_tests // +build integration_tests package database import ( - "asapo_common/utils" "fmt" "github.com/stretchr/testify/suite" "testing" @@ -28,16 +28,16 @@ func TestStreamsTestSuite(t *testing.T) { } func (suite *StreamsTestSuite) TestStreamsEmpty() { - rec, err := streams.getStreams(&db, Request{Beamtime:"test",DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: "test", DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { db.settings.UpdateStreamCachePeriodMs = 1000 - streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(1, len(rec.Streams)) } @@ -45,9 +45,9 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { func (suite *StreamsTestSuite) TestStreamsUsesCache() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(false, rec.Streams[0].Finished) @@ -60,39 +60,37 @@ func (suite *StreamsTestSuite) TestStreamsCacheexpires() { var res1 StreamsRecord go func() { db.insertRecord(dbname, collection, &rec1) - streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec_finished) - res1,_ = streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + res1, _ = streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) }() db.insertRecord(dbname, collection+"1", &rec1_later) - res2,_ := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + res2, _ := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.insertRecord(dbname, collection+"1", &rec_finished) time.Sleep(time.Second) - res3, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + res3, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(true, res3.Streams[0].Finished) - fmt.Println(res1,res2) -// suite.Equal(true, rec.Streams[1].Finished) + fmt.Println(res1, res2) + // suite.Equal(true, rec.Streams[1].Finished) } - func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(0), rec.Streams[0].Timestamp) suite.Equal(true, rec.Streams[0].Finished) suite.Equal("next1", rec.Streams[0].NextStream) } - func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(int64(2), rec.Streams[0].TimestampLast) @@ -106,8 +104,8 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) db.insertRecord(dbname, collection2, &rec_dataset1_incomplete) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/unfinished"}) - rec2, err2 := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/finished"}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"unfinished","detailed":"True"}`}) + rec2, err2 := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"finished","detailed":"True"}`}) suite.Nil(err) suite.Equal(collection2, rec.Streams[0].Name) suite.Equal(1, len(rec.Streams)) @@ -119,10 +117,10 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() { func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { db.settings.UpdateStreamCachePeriodMs = 10 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) time.Sleep(time.Millisecond * 100) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) } @@ -130,31 +128,31 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { func (suite *StreamsTestSuite) TestStreamRemovesDatabase() { db.settings.UpdateStreamCachePeriodMs = 0 db.insertRecord(dbname, collection, &rec1) - streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) db.dropDatabase(dbname) - rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } -var streamFilterTests=[]struct{ +var streamFilterTests = []struct { request Request - error bool + error bool streams []string message string }{ - {request: Request{Beamtime:beamtime,DataSource:datasource,ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/"},error: false,streams: []string{collection,collection2},message: "default 0/ all streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection,"")},error: false,streams: []string{collection,collection2},message: "first parameter only - all streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/all"},error: false,streams: []string{collection,collection2},message: "second parameter only - all streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/finished"},error: false,streams: []string{collection2},message: "second parameter only - finished streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/unfinished"},error: false,streams: []string{collection},message: "second parameter only - unfinished streams"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all")},error: false,streams: []string{collection2},message: "from stream2"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"unfinished")},error: false,streams: []string{},message: "from stream2 and filter"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"bla")},error: true,streams: []string{},message: "wrong filter"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all_aaa")},error: true,streams: []string{},message: "wrong filter2"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings("blabla","")},error: false,streams: []string{},message: "from unknown stream returns nothing"}, - {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"")},error: false,streams: []string{collection2},message: "from stream2, first parameter only"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"","detailed":""}`}, error: false, streams: []string{collection, collection2}, message: "default all streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"","detailed":""}`}, error: false, streams: []string{collection, collection2}, message: "default 0/ all streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection + `","filter":"","detailed":"True"}`}, error: false, streams: []string{collection, collection2}, message: "first parameter only - all streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"all","detailed":"False"}`}, error: false, streams: []string{collection, collection2}, message: "second parameter only - all streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"finished","detailed":"True"}`}, error: false, streams: []string{collection2}, message: "second parameter only - finished streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"unfinished","detailed":"True"}`}, error: false, streams: []string{collection}, message: "second parameter only - unfinished streams"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"all","detailed":"False"}`}, error: false, streams: []string{collection2}, message: "from stream2"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"unfinished","detailed":"True"}`}, error: false, streams: []string{}, message: "from stream2 and filter"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"bla","detailed":"False"}`}, error: true, streams: []string{}, message: "wrong filter"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"all_aaa","detailed":"False"}`}, error: true, streams: []string{}, message: "wrong filter2"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"blabla","filter":"","detailed":"False"}`}, error: false, streams: []string{}, message: "from unknown stream returns nothing"}, + {request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"","detailed":"False"}`}, error: false, streams: []string{collection2}, message: "from stream2, first parameter only"}, } func (suite *StreamsTestSuite) TestStreamFilters() { @@ -164,16 +162,16 @@ func (suite *StreamsTestSuite) TestStreamFilters() { for _, test := range streamFilterTests { rec, err := streams.getStreams(&db, test.request) if test.error { - suite.NotNil(err,test.message) + suite.NotNil(err, test.message) continue } - if err!=nil { + if err != nil { fmt.Println(err.Error()) } - streams:=make([]string,0) - for _,si:=range rec.Streams { - streams=append(streams,si.Name) + streams := make([]string, 0) + for _, si := range rec.Streams { + streams = append(streams, si.Name) } - suite.Equal(test.streams,streams,test.message) + suite.Equal(test.streams, streams, test.message) } -} \ No newline at end of file +} diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 0c5b4a91868570757dc9181e1b93aa25771fa8a2..b0ee5d04fcebda10ac4c128beddd3c05bf0eec46 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -35,63 +35,60 @@ func TestGetCommandsTestSuite(t *testing.T) { } var testsGetCommand = []struct { - command string - source string - stream string - groupid string - reqString string - queryParams string + command string + source string + stream string + groupid string + reqString string + queryParams string externalParam string }{ - {"last", expectedSource,expectedStream, "", expectedStream + "/0/last","","0"}, - {"id", expectedSource,expectedStream, "", expectedStream + "/0/1","","1"}, - {"meta", expectedSource,"default", "", "default/0/meta/0","","0"}, - {"nacks",expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"}, - {"groupedlast", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast","",""}, - {"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""}, - {"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + - expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"}, - {"size", expectedSource,expectedStream, "", expectedStream + "/size","",""}, - {"size",expectedSource, expectedStream, "", expectedStream + "/size","&incomplete=true","true"}, - {"streams",expectedSource, "0", "", "0/streams","","0/"}, - {"lastack", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""}, + {"last", expectedSource, expectedStream, "", expectedStream + "/0/last", "", "0"}, + {"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "", "1"}, + {"meta", expectedSource, "default", "", "default/0/meta/0", "", "0"}, + {"nacks", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks", "", "0_0"}, + {"groupedlast", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast", "", ""}, + {"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", ""}, + {"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + + expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "10000_3"}, + {"size", expectedSource, expectedStream, "", expectedStream + "/size", "", ""}, + {"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"}, + {"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"}, + {"lastack", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack", "", ""}, } - func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { for _, test := range testsGetCommand { suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: test.source, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command))) logger.MockLog.On("Debug", mock.Anything) - - w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString+correctTokenSuffix+test.queryParams) - suite.Equal(http.StatusOK, w.Code, test.command+ " OK") + w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString + correctTokenSuffix + test.queryParams) + suite.Equal(http.StatusOK, w.Code, test.command+" OK") suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data") } } func (suite *GetCommandsTestSuite) TestGetCommandsCorrectlyProcessedEncoding() { - badSymbols:="%$&./\\_$&\"" + badSymbols := "%$&./\\_$&\"" for _, test := range testsGetCommand { - newstream := test.stream+badSymbols - newsource := test.source+badSymbols - newgroup :="" - if test.groupid!="" { - newgroup = test.groupid+badSymbols + newstream := test.stream + badSymbols + newsource := test.source + badSymbols + newgroup := "" + if test.groupid != "" { + newgroup = test.groupid + badSymbols } - encodedStream:=url.PathEscape(newstream) - encodedSource:=url.PathEscape(newsource) - encodedGroup:=url.PathEscape(newgroup) - test.reqString = strings.Replace(test.reqString,test.groupid,encodedGroup,1) - test.reqString = strings.Replace(test.reqString,test.source,encodedSource,1) - test.reqString = strings.Replace(test.reqString,test.stream,encodedStream,1) - suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) + encodedStream := url.PathEscape(newstream) + encodedSource := url.PathEscape(newsource) + encodedGroup := url.PathEscape(newgroup) + test.reqString = strings.Replace(test.reqString, test.groupid, encodedGroup, 1) + test.reqString = strings.Replace(test.reqString, test.source, encodedSource, 1) + test.reqString = strings.Replace(test.reqString, test.stream, encodedStream, 1) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) - w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString+correctTokenSuffix+test.queryParams) - suite.Equal(http.StatusOK, w.Code, test.command+ " OK") + w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString + correctTokenSuffix + test.queryParams) + suite.Equal(http.StatusOK, w.Code, test.command+" OK") suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data") } } - diff --git a/broker/src/asapo_broker/server/get_streams.go b/broker/src/asapo_broker/server/get_streams.go index 01e1e8edc57e920fecee4d2b9560f5087363ca3f..abbf0b4e20b6d9cb3af13f2e5a0473ce95f5c0f0 100644 --- a/broker/src/asapo_broker/server/get_streams.go +++ b/broker/src/asapo_broker/server/get_streams.go @@ -1,15 +1,18 @@ package server import ( - "asapo_common/utils" + "asapo_broker/database" + "encoding/json" "net/http" ) +type GetStreamsParams = database.GetStreamsParams + func routeGetStreams(w http.ResponseWriter, r *http.Request) { keys := r.URL.Query() - from := keys.Get("from") - filter := keys.Get("filter") - utils.EncodeTwoStrings(from,filter) - encoded := utils.EncodeTwoStrings(from,filter) - processRequest(w, r, "streams", encoded, false) + extraParams := GetStreamsParams{From: keys.Get("from"), + Filter: keys.Get("filter"), + Detailed: keys.Get("detailed")} + encoded, _ := json.Marshal(extraParams) + processRequest(w, r, "streams", string(encoded), false) } diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 2715705605dfa8665d7ba98612dce8de528ed073..03b48601e4a9c2f0143697249cff6a6d91a12070 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -92,6 +92,7 @@ class Consumer { //! Get list of streams with filter, set from to "" to get all streams virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; + virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0; //! Delete stream /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index a6cfacf60ff6c718ba8785f48d207b60cea85b0f..8710b5417b87bd7806a1481b93ef6fb2a2c9ae87 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -818,7 +818,11 @@ std::string filterToString(StreamFilter filter) { } StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, Error* err) { - RequestInfo ri = GetStreamListRequest(from, filter); + return GetStreamList(from, filter, true, err); +} + +StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) { + RequestInfo ri = GetStreamListRequest(from, filter, detailed); auto response = BrokerRequestWithTimeout(ri, err); if (*err) { @@ -827,13 +831,15 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, E return ParseStreamsFromResponse(std::move(response), err); } -RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter) const { +RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const { RequestInfo ri = CreateBrokerApiRequest("0", "", "streams"); ri.post = false; if (!from.empty()) { ri.extra_params = "&from=" + httpclient__->UrlEscape(from); } ri.extra_params += "&filter=" + filterToString(filter); + std::string detailed_str = detailed ? "true" : "false"; + ri.extra_params += "&detailed=" + detailed_str; return ri; } diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 8ac94d281f337bbc1ef910ecaaa35b9810e1a5d6..d1672ea1edcecede0470d4ffb93b443d4efa9b2b 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -107,7 +107,8 @@ class ConsumerImpl final : public asapo::Consumer { Error RetrieveData(MessageMeta* info, MessageData* data) override; - StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) override; + StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) override; + StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err); void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override; virtual void InterruptCurrentOperation() override; @@ -153,7 +154,7 @@ class ConsumerImpl final : public asapo::Consumer { Error UpdateFolderTokenIfNeeded(bool ignore_existing); uint64_t GetCurrentCount(const RequestInfo& ri, Error* err); - RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter) const; + RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const; Error GetServerVersionInfo(std::string* server_info, bool* supported) ; RequestInfo CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index b8afdab41eed763659166da1d46acb4f6359dd95..15c27317a0b2f28e9fedf245c611ee4330b6ef3c 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1276,7 +1276,8 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"}]})"; EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams" - + "?token=" + expected_token + "&from=" + expected_stream_encoded + "&filter=all", _, + + "?token=" + expected_token + "&from=" + expected_stream_encoded + "&filter=all" + + "&detailed=true", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1299,7 +1300,7 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) { Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams" + "?token=" + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded - + "&filter=finished", _, + + "&filter=finished&detailed=true", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 1715bf9930b84cccaf8f337541ad0ffca68f5644..05e32cf8beeaa82f58002979a657c0b257391938 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -93,7 +93,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err) DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) - vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err) + vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, bool detailed, Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() Error GetVersionInfo(string* client_info,string* server_info, bool* supported) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index cee15bdbdce9d141c8b88696646ae459ac822690..3ba383a617c7f7d80158cc26e09f34a41527315f 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -264,13 +264,14 @@ cdef class PyConsumer: if err: throw_exception(err) return _str(group_id) - def get_stream_list(self,from_stream = "",filter="all"): + def get_stream_list(self,from_stream = "",filter="all", detailed=True): cdef Error err cdef vector[StreamInfo] streams cdef string b_from_stream = _bytes(from_stream) cdef StreamFilter stream_filter = self._filter_to_cfilter(filter) + cdef bool b_detailed = detailed with nogil: - streams = self.c_consumer.get().GetStreamList(b_from_stream,stream_filter,&err) + streams = self.c_consumer.get().GetStreamList(b_from_stream,stream_filter,b_detailed,&err) if err: throw_exception(err) list = [] diff --git a/deploy/build_env/services-linux/Dockerfile b/deploy/build_env/services-linux/Dockerfile index 0f98bfa803e7a69be6c2d56aaa60dbd19af8ffee..c9b14cf668a2a64a4dad4e2987780f22344643a7 100644 --- a/deploy/build_env/services-linux/Dockerfile +++ b/deploy/build_env/services-linux/Dockerfile @@ -13,14 +13,8 @@ RUN echo "deb [signed-by=/usr/share/keyrings/influxdb.gpg] https://repos.influxd RUN set -eux; \ apt-get update; \ - DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ - apt-utils - -RUN set -eux; \ - apt-get update; \ - DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ + apt-get install -y --no-install-recommends \ cmake \ - supervisor \ libicu-dev \ libfabric-dev \ librdkafka-dev \ diff --git a/deploy/build_env/services-linux/supervisord.conf b/deploy/build_env/services-linux/supervisord.conf index f8c9cffeed961bfea64eb3b8aa1bea99974d8936..e99e0687d5fd1933c458d4a46fbf8988ce372077 100644 --- a/deploy/build_env/services-linux/supervisord.conf +++ b/deploy/build_env/services-linux/supervisord.conf @@ -76,4 +76,10 @@ command=/usr/bin/consul agent -config-dir=/etc/consul.d/ stderr_logfile=/tmp/consul_err.log stderr_logfile_maxbytes = 0 stdout_logfile=/tmp/consul_out.log + +[program:envoyproxy] +command=envoy -c /etc/envoy/envoy.yaml +stderr_logfile=/tmp/envoy_err.log +stderr_logfile_maxbytes = 0 +stdout_logfile=/tmp/envoy_out.log stdout_logfile_maxbytes = 0