Commit 340a8db9 authored by Mikhail Karnevskiy

Merge branch 'speedup_get_streams' into 'develop'

Speedup get_next

See merge request asapo/asapo!203
parents 4fe89582 7067a7c1
Showing 399 additions and 325 deletions
......@@ -69,6 +69,7 @@ const data_collection_name_prefix = "data_"
const acks_collection_name_prefix = "acks_"
const inprocess_collection_name_prefix = "inprocess_"
const meta_collection_name = "meta"
const streams_info = "streams"
const pointer_collection_name = "current_location"
const pointer_field_name = "current_pointer"
const last_message_collection_name = "last_messages"
......
//+build !test
//go:build !test
// +build !test
package database
import (
log "asapo_common/logger"
"asapo_common/utils"
"context"
"encoding/json"
"errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"sort"
"strconv"
"strings"
"sync"
"time"
)
type GetStreamsParams struct {
From string `json:"from"`
Filter string `json:"filter"`
Detailed string `json:"detailed"`
}
type StreamInfo struct {
LastId int64 `json:"lastId"`
Name string `json:"name"`
......@@ -29,13 +41,12 @@ type StreamsRecord struct {
type Streams struct {
records map[string]StreamsRecord
lastUpdated map[string]time.Time
lastSynced map[string]time.Time
}
var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)}
var streams = Streams{lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)}
var streamsLock sync.Mutex
func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
func (ss *Streams) getStreamsInfoFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() >= int64(updatePeriodMs) {
return StreamsRecord{}, errors.New("cache expired")
}
......@@ -43,12 +54,14 @@ func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsR
if !ok {
return StreamsRecord{}, errors.New("no records for " + db_name)
}
res :=StreamsRecord{}
utils.DeepCopy(rec,&res)
res := StreamsRecord{}
utils.DeepCopy(rec, &res)
return res, nil
}
func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) {
// Return a StreamsRecord with the names of streams filled from the DB.
// The function queries the list of collections for the given db_name and then filters them by name.
func readAllStreamNames(db *Mongodb, db_name string) (StreamsRecord, error) {
database := db.client.Database(db_name)
result, err := database.ListCollectionNames(context.TODO(), bson.D{})
if err != nil {
......@@ -57,7 +70,7 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) {
var rec = StreamsRecord{[]StreamInfo{}}
for _, coll := range result {
if strings.HasPrefix(coll, data_collection_name_prefix) {
sNameEncoded:= strings.TrimPrefix(coll, data_collection_name_prefix)
sNameEncoded := strings.TrimPrefix(coll, data_collection_name_prefix)
si := StreamInfo{Name: decodeString(sNameEncoded)}
rec.Streams = append(rec.Streams, si)
}
......@@ -65,20 +78,30 @@ func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) {
return rec, nil
}
func getCurrentStreams(db_name string) []StreamInfo {
ss, dbFound := streams.records[db_name]
currentStreams := []StreamInfo{}
if dbFound {
// sort streams by name
currentStreams = ss.Streams
sort.Slice(currentStreams, func(i, j int) bool {
return currentStreams[i].Name >= currentStreams[j].Name
})
// Read stream info for all streams from the streams_info collection.
// If the collection is empty or does not exist, return an empty array
// and create the collection with a unique index on `name`.
func readStreamsInfo(db *Mongodb, db_name string) ([]StreamInfo, error) {
coll := db.client.Database(db_name).Collection(streams_info)
opts := options.Find().SetSort(bson.D{{"name", 1}}) // Order by name to simplify search
cursor, err := coll.Find(context.TODO(), bson.D{}, opts)
if err != nil {
log.Error("Get streams fails", err)
return []StreamInfo{}, err
}
var rec []StreamInfo
if err = cursor.All(context.TODO(), &rec); err != nil {
log.Error("Decoding streams fails", err)
return []StreamInfo{}, err
}
if len(rec) == 0 {
createIndexStreamInfoDB(db, db_name)
}
return currentStreams
return rec, nil
}
func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int, bool) {
// currentStreams is already ordered by Name
ind := sort.Search(len(currentStreams), func(i int) bool {
return currentStreams[i].Name >= record.Name
})
......@@ -124,7 +147,9 @@ func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, rec
return nil
}
func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) {
// If the stream name from record is found in currentStreams, the corresponding
// record is copied to rec.
func updateStreamInfoFromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) {
ind, found := findStreamAmongCurrent(currentStreams, record)
if found {
*rec = currentStreams[ind]
......@@ -135,20 +160,64 @@ func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo,
return found, false
}
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync bool) error {
currentStreams := getCurrentStreams(db_name)
// Create a unique index on `name` for the collection of streams info.
// If the collection does not exist, the DB will create it.
func createIndexStreamInfoDB(db *Mongodb, db_name string) error {
coll := db.client.Database(db_name).Collection(streams_info)
_, err := coll.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bson.D{{Key: "name", Value: 1}},
Options: options.Index().SetUnique(true),
},
)
return err
}
// Insert the current StreamInfo into the streams_info collection.
func updateStreamInfoDB(db *Mongodb, db_name string, rec StreamInfo) error {
coll := db.client.Database(db_name).Collection(streams_info)
_, err := coll.InsertOne(context.TODO(), rec)
return err
}
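Because streams_info carries a unique index on name (created above), a second InsertOne for the same stream fails with a duplicate-key error, which updateStreamInfos below simply logs. A minimal alternative sketch, not the implementation used here, would be an idempotent upsert keyed on the stream name (this assumes the stream name is stored under a field called name):

// Hypothetical alternative to updateStreamInfoDB: an upsert keyed on the stream
// name, so that concurrent brokers never trigger a duplicate-key error.
func upsertStreamInfoDB(db *Mongodb, db_name string, rec StreamInfo) error {
	coll := db.client.Database(db_name).Collection(streams_info)
	opts := options.Replace().SetUpsert(true)
	// Replace the existing document for this stream, or insert it if it is missing.
	_, err := coll.ReplaceOne(context.TODO(), bson.M{"name": rec.Name}, rec, opts)
	return err
}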
// Fill stream information into rec from different sources.
// Stream information is taken from the streams_info collection.
// If a stream is not finished, only the timestamp is valid.
// If a stream is not found in streams_info, the information is
// extracted from the data collections.
// If detailed is true, the information is extracted from the data collection.
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord, detailed bool) error {
currentStreams, err := readStreamsInfo(db, db_name)
if err != nil {
return err
}
for i, record := range rec.Streams {
found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i])
if mayContinue && !forceSync {
found, finished := updateStreamInfoFromCurrent(currentStreams, record, &rec.Streams[i])
if found && (!detailed || finished) { // Don't need to update if stream is finished
continue
}
if !found || forceSync { // set timestamp
if !found { // set timestamp
if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil {
return err
}
}
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update last record (timestamp, stream finished flag)
return err
if detailed { // update last record (TimestampLast, stream finished flag)
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil {
return err
}
}
if !found || rec.Streams[i].Finished {
// Insert stream info into the DB if it is not yet there or the stream is now finished
err := updateStreamInfoDB(db, db_name, rec.Streams[i])
// An error may happen if two brokers work with the same stream and try to
// populate the streams info.
if err != nil {
log.Debug("Update stream info in DB failed", err)
}
}
}
return nil
......@@ -160,91 +229,93 @@ func sortRecords(rec *StreamsRecord) {
})
}
func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, error) {
rec, err := readStreams(db, db_name)
func getStreamsInfoFromDb(db *Mongodb, db_name string, detailed bool) (StreamsRecord, error) {
rec, err := readAllStreamNames(db, db_name)
if err != nil {
return StreamsRecord{}, err
}
forceSync:= false
if time.Now().Sub(ss.lastSynced[db_name]).Seconds() > 5 {
forceSync = true
}
err = updateStreamInfos(db, db_name, &rec,forceSync)
err = updateStreamInfos(db, db_name, &rec, detailed)
if err != nil {
return StreamsRecord{}, err
}
if forceSync {
ss.lastSynced[db_name] = time.Now()
}
sortRecords(&rec)
return rec, nil
}
// Save the StreamsRecord in a global variable for caching purposes
func (ss *Streams) cacheStreams(db_name string, rec StreamsRecord) {
if len(rec.Streams) > 0 {
res :=StreamsRecord{}
utils.DeepCopy(rec,&res)
ss.records[db_name] = res
result := StreamsRecord{}
utils.DeepCopy(rec, &result)
ss.records[db_name] = result
ss.lastUpdated[db_name] = time.Now()
}
return rec, nil
}
func getFiltersFromString(filterString string) (string, string, error) {
firstStream, streamStatus, err := utils.DecodeTwoStrings(filterString)
if err!=nil {
return "", "", errors.New("wrong format: " + filterString)
// Convert the JSON string filterString to extra request parameters
func getFiltersFromString(filterString string) (string, string, bool, error) {
var params GetStreamsParams
err := json.Unmarshal([]byte(filterString), &params)
if err != nil {
return "", "", true, errors.New("wrong format: " + filterString)
}
if streamStatus == "" {
streamStatus = stream_filter_all
if params.Filter == "" {
params.Filter = stream_filter_all
}
return firstStream, streamStatus, nil
detailed, _ := strconv.ParseBool(params.Detailed)
return params.From, params.Filter, detailed, nil
}
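For illustration only, a standalone sketch of the ExtraParam format parsed above (the stream name is made up). Note that strconv.ParseBool fails on an empty detailed field and the error is discarded, so detailed falls back to false:

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Mirror of GetStreamsParams from the broker (same json tags).
type GetStreamsParams struct {
	From     string `json:"from"`
	Filter   string `json:"filter"`
	Detailed string `json:"detailed"`
}

func main() {
	var p GetStreamsParams
	_ = json.Unmarshal([]byte(`{"from":"scan_1","filter":"finished","detailed":"True"}`), &p)
	detailed, _ := strconv.ParseBool(p.Detailed) // "" or an invalid value parses as false
	fmt.Println(p.From, p.Filter, detailed)      // scan_1 finished true
}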
func getStreamsParamsFromRequest(request Request) (string, string, error) {
// Return extra request parameters from request
func getStreamsParamsFromRequest(request Request) (string, string, bool, error) {
if request.ExtraParam == "" {
return "", stream_filter_all, nil
return "", stream_filter_all, true, nil
}
firstStream, streamStatus, err := getFiltersFromString(request.ExtraParam)
firstStream, streamStatus, detailed, err := getFiltersFromString(request.ExtraParam)
if err != nil {
return "", "", err
return "", "", false, err
}
err = checkStreamstreamStatus(streamStatus)
err = checkStreamStatus(streamStatus)
if err != nil {
return "", "", err
return "", "", false, err
}
return firstStream, streamStatus, nil
return firstStream, streamStatus, detailed, nil
}
func checkStreamstreamStatus(streamStatus string) error {
func checkStreamStatus(streamStatus string) error {
if !utils.StringInSlice(streamStatus, []string{stream_filter_all, stream_filter_finished, stream_filter_unfinished}) {
return errors.New("getStreamsParamsFromRequest: wrong streamStatus " + streamStatus)
}
return nil
}
func keepStream(rec StreamInfo, streamStatus string) bool {
func filterStreamsByStatus(rec StreamInfo, streamStatus string) bool {
return (rec.Finished && streamStatus == stream_filter_finished) || (!rec.Finished && streamStatus == stream_filter_unfinished)
}
func filterStreams(rec StreamsRecord, firstStream string, streamStatus string) []StreamInfo {
limitedStreams := limitStreams(rec, firstStream)
limitedStreams := filterStreamsByName(rec, firstStream)
if streamStatus == stream_filter_all {
return limitedStreams
}
nextStreams := limitedStreams[:0]
for _, rec := range limitedStreams {
if keepStream(rec, streamStatus) {
if filterStreamsByStatus(rec, streamStatus) {
nextStreams = append(nextStreams, rec)
}
}
return nextStreams
}
func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo {
func filterStreamsByName(rec StreamsRecord, firstStream string) []StreamInfo {
if firstStream != "" {
ind := len(rec.Streams)
for i, rec := range rec.Streams {
......@@ -259,22 +330,25 @@ func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo {
}
func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, error) {
firstStream, streamStatus, err := getStreamsParamsFromRequest(request)
firstStream, streamStatus, detailed, err := getStreamsParamsFromRequest(request)
if err != nil {
return StreamsRecord{}, err
}
streamsLock.Lock()
rec, err := ss.tryGetFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs)
rec, err := ss.getStreamsInfoFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs)
streamsLock.Unlock()
if err != nil {
rec, err = ss.updateFromDb(db, request.DbName())
rec, err = getStreamsInfoFromDb(db, request.DbName(), detailed)
}
streamsLock.Unlock()
if err != nil {
return StreamsRecord{}, err
}
rec.Streams = filterStreams(rec, firstStream, streamStatus)
streamsLock.Lock()
ss.cacheStreams(request.DbName(), rec)
streamsLock.Unlock()
rec.Streams = filterStreams(rec, firstStream, streamStatus)
return rec, nil
}
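A hedged sketch of the in-memory filtering done at the end of getStreams, with made-up stream names. It would have to live inside package database (for example in a test file importing "testing"), since filterStreams and the filter constants are unexported:

// Hypothetical illustration: two streams, only one of them finished.
func TestFilterStreamsSketch(t *testing.T) {
	rec := StreamsRecord{Streams: []StreamInfo{
		{Name: "scan_1", Finished: true},
		{Name: "scan_2", Finished: false},
	}}
	finished := filterStreams(rec, "", stream_filter_finished)    // keep finished streams only
	fromSecond := filterStreams(rec, "scan_2", stream_filter_all) // list from "scan_2" onwards
	if len(finished) != 1 || finished[0].Name != "scan_1" {
		t.Error("expected only the finished stream scan_1")
	}
	if len(fromSecond) != 1 || fromSecond[0].Name != "scan_2" {
		t.Error("expected listing to start at scan_2")
	}
}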
(The diff of one file is collapsed and not shown here.)
//go:build integration_tests
// +build integration_tests
package database
import (
"asapo_common/utils"
"fmt"
"github.com/stretchr/testify/suite"
"testing"
......@@ -28,16 +28,16 @@ func TestStreamsTestSuite(t *testing.T) {
}
func (suite *StreamsTestSuite) TestStreamsEmpty() {
rec, err := streams.getStreams(&db, Request{Beamtime:"test",DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: "test", DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Empty(rec.Streams, 0)
}
func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() {
db.settings.UpdateStreamCachePeriodMs = 1000
streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.insertRecord(dbname, collection, &rec1)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(1, len(rec.Streams))
}
......@@ -45,9 +45,9 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() {
func (suite *StreamsTestSuite) TestStreamsUsesCache() {
db.settings.UpdateStreamCachePeriodMs = 1000
db.insertRecord(dbname, collection, &rec2)
streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.insertRecord(dbname, collection, &rec1)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(int64(1), rec.Streams[0].Timestamp)
suite.Equal(false, rec.Streams[0].Finished)
......@@ -60,39 +60,37 @@ func (suite *StreamsTestSuite) TestStreamsCacheexpires() {
var res1 StreamsRecord
go func() {
db.insertRecord(dbname, collection, &rec1)
streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.insertRecord(dbname, collection, &rec_finished)
res1,_ = streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
res1, _ = streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
}()
db.insertRecord(dbname, collection+"1", &rec1_later)
res2,_ := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
res2, _ := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.insertRecord(dbname, collection+"1", &rec_finished)
time.Sleep(time.Second)
res3, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
res3, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(true, res3.Streams[0].Finished)
fmt.Println(res1,res2)
// suite.Equal(true, rec.Streams[1].Finished)
fmt.Println(res1, res2)
// suite.Equal(true, rec.Streams[1].Finished)
}
func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() {
db.settings.UpdateStreamCachePeriodMs = 1000
db.insertRecord(dbname, collection, &rec1)
db.insertRecord(dbname, collection, &rec_finished)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(int64(0), rec.Streams[0].Timestamp)
suite.Equal(true, rec.Streams[0].Finished)
suite.Equal("next1", rec.Streams[0].NextStream)
}
func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() {
db.settings.UpdateStreamCachePeriodMs = 1000
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
db.insertRecord(dbname, collection, &rec_finished)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(int64(1), rec.Streams[0].Timestamp)
suite.Equal(int64(2), rec.Streams[0].TimestampLast)
......@@ -106,8 +104,8 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() {
db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
db.insertRecord(dbname, collection, &rec_finished)
db.insertRecord(dbname, collection2, &rec_dataset1_incomplete)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/unfinished"})
rec2, err2 := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/finished"})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"unfinished","detailed":"True"}`})
rec2, err2 := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"finished","detailed":"True"}`})
suite.Nil(err)
suite.Equal(collection2, rec.Streams[0].Name)
suite.Equal(1, len(rec.Streams))
......@@ -119,10 +117,10 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() {
func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() {
db.settings.UpdateStreamCachePeriodMs = 10
db.insertRecord(dbname, collection, &rec2)
streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""})
streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.insertRecord(dbname, collection, &rec1)
time.Sleep(time.Millisecond * 100)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Equal(int64(1), rec.Streams[0].Timestamp)
}
......@@ -130,31 +128,31 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() {
func (suite *StreamsTestSuite) TestStreamRemovesDatabase() {
db.settings.UpdateStreamCachePeriodMs = 0
db.insertRecord(dbname, collection, &rec1)
streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""})
streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
db.dropDatabase(dbname)
rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""})
rec, err := streams.getStreams(&db, Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: ""})
suite.Nil(err)
suite.Empty(rec.Streams, 0)
}
var streamFilterTests=[]struct{
var streamFilterTests = []struct {
request Request
error bool
error bool
streams []string
message string
}{
{request: Request{Beamtime:beamtime,DataSource:datasource,ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/"},error: false,streams: []string{collection,collection2},message: "default 0/ all streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection,"")},error: false,streams: []string{collection,collection2},message: "first parameter only - all streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/all"},error: false,streams: []string{collection,collection2},message: "second parameter only - all streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/finished"},error: false,streams: []string{collection2},message: "second parameter only - finished streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/unfinished"},error: false,streams: []string{collection},message: "second parameter only - unfinished streams"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all")},error: false,streams: []string{collection2},message: "from stream2"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"unfinished")},error: false,streams: []string{},message: "from stream2 and filter"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"bla")},error: true,streams: []string{},message: "wrong filter"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all_aaa")},error: true,streams: []string{},message: "wrong filter2"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings("blabla","")},error: false,streams: []string{},message: "from unknown stream returns nothing"},
{request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"")},error: false,streams: []string{collection2},message: "from stream2, first parameter only"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"","detailed":""}`}, error: false, streams: []string{collection, collection2}, message: "default all streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"","detailed":""}`}, error: false, streams: []string{collection, collection2}, message: "default 0/ all streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection + `","filter":"","detailed":"True"}`}, error: false, streams: []string{collection, collection2}, message: "first parameter only - all streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"all","detailed":"False"}`}, error: false, streams: []string{collection, collection2}, message: "second parameter only - all streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"finished","detailed":"True"}`}, error: false, streams: []string{collection2}, message: "second parameter only - finished streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"","filter":"unfinished","detailed":"True"}`}, error: false, streams: []string{collection}, message: "second parameter only - unfinished streams"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"all","detailed":"False"}`}, error: false, streams: []string{collection2}, message: "from stream2"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"unfinished","detailed":"True"}`}, error: false, streams: []string{}, message: "from stream2 and filter"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"bla","detailed":"False"}`}, error: true, streams: []string{}, message: "wrong filter"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"all_aaa","detailed":"False"}`}, error: true, streams: []string{}, message: "wrong filter2"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"blabla","filter":"","detailed":"False"}`}, error: false, streams: []string{}, message: "from unknown stream returns nothing"},
{request: Request{Beamtime: beamtime, DataSource: datasource, ExtraParam: `{"from":"` + collection2 + `","filter":"","detailed":"False"}`}, error: false, streams: []string{collection2}, message: "from stream2, first parameter only"},
}
func (suite *StreamsTestSuite) TestStreamFilters() {
......@@ -164,16 +162,16 @@ func (suite *StreamsTestSuite) TestStreamFilters() {
for _, test := range streamFilterTests {
rec, err := streams.getStreams(&db, test.request)
if test.error {
suite.NotNil(err,test.message)
suite.NotNil(err, test.message)
continue
}
if err!=nil {
if err != nil {
fmt.Println(err.Error())
}
streams:=make([]string,0)
for _,si:=range rec.Streams {
streams=append(streams,si.Name)
streams := make([]string, 0)
for _, si := range rec.Streams {
streams = append(streams, si.Name)
}
suite.Equal(test.streams,streams,test.message)
suite.Equal(test.streams, streams, test.message)
}
}
\ No newline at end of file
}
......@@ -35,63 +35,60 @@ func TestGetCommandsTestSuite(t *testing.T) {
}
var testsGetCommand = []struct {
command string
source string
stream string
groupid string
reqString string
queryParams string
command string
source string
stream string
groupid string
reqString string
queryParams string
externalParam string
}{
{"last", expectedSource,expectedStream, "", expectedStream + "/0/last","","0"},
{"id", expectedSource,expectedStream, "", expectedStream + "/0/1","","1"},
{"meta", expectedSource,"default", "", "default/0/meta/0","","0"},
{"nacks",expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"},
{"groupedlast", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast","",""},
{"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""},
{"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" +
expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"},
{"size", expectedSource,expectedStream, "", expectedStream + "/size","",""},
{"size",expectedSource, expectedStream, "", expectedStream + "/size","&incomplete=true","true"},
{"streams",expectedSource, "0", "", "0/streams","","0/"},
{"lastack", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""},
{"last", expectedSource, expectedStream, "", expectedStream + "/0/last", "", "0"},
{"id", expectedSource, expectedStream, "", expectedStream + "/0/1", "", "1"},
{"meta", expectedSource, "default", "", "default/0/meta/0", "", "0"},
{"nacks", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks", "", "0_0"},
{"groupedlast", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast", "", ""},
{"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next", "", ""},
{"next", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" +
expectedGroupID + "/next", "&resend_nacks=true&delay_ms=10000&resend_attempts=3", "10000_3"},
{"size", expectedSource, expectedStream, "", expectedStream + "/size", "", ""},
{"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"},
{"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"},
{"lastack", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack", "", ""},
}
func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() {
for _, test := range testsGetCommand {
suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: test.source, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command)))
logger.MockLog.On("Debug", mock.Anything)
w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString+correctTokenSuffix+test.queryParams)
suite.Equal(http.StatusOK, w.Code, test.command+ " OK")
w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString + correctTokenSuffix + test.queryParams)
suite.Equal(http.StatusOK, w.Code, test.command+" OK")
suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data")
}
}
func (suite *GetCommandsTestSuite) TestGetCommandsCorrectlyProcessedEncoding() {
badSymbols:="%$&./\\_$&\""
badSymbols := "%$&./\\_$&\""
for _, test := range testsGetCommand {
newstream := test.stream+badSymbols
newsource := test.source+badSymbols
newgroup :=""
if test.groupid!="" {
newgroup = test.groupid+badSymbols
newstream := test.stream + badSymbols
newsource := test.source + badSymbols
newgroup := ""
if test.groupid != "" {
newgroup = test.groupid + badSymbols
}
encodedStream:=url.PathEscape(newstream)
encodedSource:=url.PathEscape(newsource)
encodedGroup:=url.PathEscape(newgroup)
test.reqString = strings.Replace(test.reqString,test.groupid,encodedGroup,1)
test.reqString = strings.Replace(test.reqString,test.source,encodedSource,1)
test.reqString = strings.Replace(test.reqString,test.stream,encodedStream,1)
suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
encodedStream := url.PathEscape(newstream)
encodedSource := url.PathEscape(newsource)
encodedGroup := url.PathEscape(newgroup)
test.reqString = strings.Replace(test.reqString, test.groupid, encodedGroup, 1)
test.reqString = strings.Replace(test.reqString, test.source, encodedSource, 1)
test.reqString = strings.Replace(test.reqString, test.stream, encodedStream, 1)
suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil)
logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command)))
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request")))
w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString+correctTokenSuffix+test.queryParams)
suite.Equal(http.StatusOK, w.Code, test.command+ " OK")
w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString + correctTokenSuffix + test.queryParams)
suite.Equal(http.StatusOK, w.Code, test.command+" OK")
suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data")
}
}
package server
import (
"asapo_common/utils"
"asapo_broker/database"
"encoding/json"
"net/http"
)
type GetStreamsParams = database.GetStreamsParams
func routeGetStreams(w http.ResponseWriter, r *http.Request) {
keys := r.URL.Query()
from := keys.Get("from")
filter := keys.Get("filter")
utils.EncodeTwoStrings(from,filter)
encoded := utils.EncodeTwoStrings(from,filter)
processRequest(w, r, "streams", encoded, false)
extraParams := GetStreamsParams{From: keys.Get("from"),
Filter: keys.Get("filter"),
Detailed: keys.Get("detailed")}
encoded, _ := json.Marshal(extraParams)
processRequest(w, r, "streams", string(encoded), false)
}
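As a reference for the wire format, a hedged standalone sketch of what the handler above sends to the broker for GET .../streams?from=scan_1&filter=finished&detailed=true, and for a request with no query parameters at all (values are illustrative); the broker decodes this JSON in getFiltersFromString:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of database.GetStreamsParams (same json tags).
type GetStreamsParams struct {
	From     string `json:"from"`
	Filter   string `json:"filter"`
	Detailed string `json:"detailed"`
}

func main() {
	withParams, _ := json.Marshal(GetStreamsParams{From: "scan_1", Filter: "finished", Detailed: "true"})
	empty, _ := json.Marshal(GetStreamsParams{})
	fmt.Println(string(withParams)) // {"from":"scan_1","filter":"finished","detailed":"true"}
	fmt.Println(string(empty))      // {"from":"","filter":"","detailed":""}
}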
......@@ -92,6 +92,7 @@ class Consumer {
//! Get list of streams with filter, set from to "" to get all streams
virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0;
virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0;
//! Delete stream
/*!
......
......@@ -818,7 +818,11 @@ std::string filterToString(StreamFilter filter) {
}
StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, Error* err) {
RequestInfo ri = GetStreamListRequest(from, filter);
return GetStreamList(from, filter, true, err);
}
StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) {
RequestInfo ri = GetStreamListRequest(from, filter, detailed);
auto response = BrokerRequestWithTimeout(ri, err);
if (*err) {
......@@ -827,13 +831,15 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, E
return ParseStreamsFromResponse(std::move(response), err);
}
RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter) const {
RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const {
RequestInfo ri = CreateBrokerApiRequest("0", "", "streams");
ri.post = false;
if (!from.empty()) {
ri.extra_params = "&from=" + httpclient__->UrlEscape(from);
}
ri.extra_params += "&filter=" + filterToString(filter);
std::string detailed_str = detailed ? "true" : "false";
ri.extra_params += "&detailed=" + detailed_str;
return ri;
}
......
......@@ -107,7 +107,8 @@ class ConsumerImpl final : public asapo::Consumer {
Error RetrieveData(MessageMeta* info, MessageData* data) override;
StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) override;
StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) override;
StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err);
void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override;
virtual void InterruptCurrentOperation() override;
......@@ -153,7 +154,7 @@ class ConsumerImpl final : public asapo::Consumer {
Error UpdateFolderTokenIfNeeded(bool ignore_existing);
uint64_t GetCurrentCount(const RequestInfo& ri, Error* err);
RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter) const;
RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const;
Error GetServerVersionInfo(std::string* server_info, bool* supported) ;
RequestInfo CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const;
......
......@@ -1276,7 +1276,8 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) {
R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"}]})";
EXPECT_CALL(mock_http_client,
Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams"
+ "?token=" + expected_token + "&from=" + expected_stream_encoded + "&filter=all", _,
+ "?token=" + expected_token + "&from=" + expected_stream_encoded + "&filter=all"
+ "&detailed=true", _,
_)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::OK),
SetArgPointee<2>(nullptr),
......@@ -1299,7 +1300,7 @@ TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) {
Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams"
+ "?token=" + expected_token
+ "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
+ "&filter=finished", _,
+ "&filter=finished&detailed=true", _,
_)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::OK),
SetArgPointee<2>(nullptr),
......
......@@ -93,7 +93,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err)
DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err)
Error RetrieveData(MessageMeta* info, MessageData* data)
vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err)
vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, bool detailed, Error* err)
void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts)
void InterruptCurrentOperation()
Error GetVersionInfo(string* client_info,string* server_info, bool* supported)
......
......@@ -264,13 +264,14 @@ cdef class PyConsumer:
if err:
throw_exception(err)
return _str(group_id)
def get_stream_list(self,from_stream = "",filter="all"):
def get_stream_list(self,from_stream = "",filter="all", detailed=True):
cdef Error err
cdef vector[StreamInfo] streams
cdef string b_from_stream = _bytes(from_stream)
cdef StreamFilter stream_filter = self._filter_to_cfilter(filter)
cdef bool b_detailed = detailed
with nogil:
streams = self.c_consumer.get().GetStreamList(b_from_stream,stream_filter,&err)
streams = self.c_consumer.get().GetStreamList(b_from_stream,stream_filter,b_detailed,&err)
if err:
throw_exception(err)
list = []
......
......@@ -13,14 +13,8 @@ RUN echo "deb [signed-by=/usr/share/keyrings/influxdb.gpg] https://repos.influxd
RUN set -eux; \
apt-get update; \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
apt-utils
RUN set -eux; \
apt-get update; \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
apt-get install -y --no-install-recommends \
cmake \
supervisor \
libicu-dev \
libfabric-dev \
librdkafka-dev \
......
......@@ -85,6 +85,12 @@ command=/usr/bin/consul agent -config-dir=/etc/consul.d/
stderr_logfile=/tmp/consul_err.log
stderr_logfile_maxbytes = 0
stdout_logfile=/tmp/consul_out.log
[program:envoyproxy]
command=envoy -c /etc/envoy/envoy.yaml
stderr_logfile=/tmp/envoy_err.log
stderr_logfile_maxbytes = 0
stdout_logfile=/tmp/envoy_out.log
stdout_logfile_maxbytes = 0
[program:envoyproxy]
......