diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 41bf082a0444c2f1248f540a4db52bd03d4ae3de..5e63509e5a687a2bc95bd77df8ae6a6e13bba017 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -72,6 +72,10 @@ const already_connected_msg = "already connected" const finish_stream_keyword = "asapo_finish_stream" const no_next_stream_keyword = "asapo_no_next" +const stream_filter_all = "all" +const stream_filter_finished = "finished" +const stream_filter_unfinished = "unfinished" + var dbSessionLock sync.Mutex @@ -869,7 +873,7 @@ func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, e } func (db *Mongodb) getStreams(request Request) ([]byte, error) { - rec, err := streams.getStreams(db, request.DbName, request.ExtraParam) + rec, err := streams.getStreams(db, request) if err != nil { return db.processQueryError("get streams", request.DbName, err) } diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index 4c531b1b9f5d978fd5138e79c82aa7eb153ae5ff..9720b404e890d72744c745d08873d58ed8c96135 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -42,7 +42,9 @@ func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsR if !ok { return StreamsRecord{}, errors.New("no records for " + db_name) } - return rec, nil + res :=StreamsRecord{} + utils.DeepCopy(rec,&res) + return res, nil } func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) { @@ -143,7 +145,7 @@ func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord) error { return err } } - if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update from last record (timestamp, stream finished flag) + if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update firstStream last record (timestamp, stream finished flag) return err } } @@ -174,26 +176,100 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err return rec, nil } -func (ss *Streams) getStreams(db *Mongodb, db_name string, from string) (StreamsRecord, error) { - streamsLock.Lock() - rec, err := ss.tryGetFromCache(db_name, db.settings.UpdateStreamCachePeriodMs) +func getFiltersFromString(filterString string) (string, string, error) { + firstStream := "" + streamStatus := "" + s := strings.Split(filterString, "_") + switch len(s) { + case 1: + firstStream = s[0] + case 2: + firstStream = s[0] + streamStatus = s[1] + default: + return "", "", errors.New("wrong format: " + filterString) + } + if streamStatus == "" { + streamStatus = stream_filter_all + } + return firstStream, streamStatus, nil +} + +func getStreamsParamsFromRequest(request Request) (string, string, error) { + if request.ExtraParam == "" { + return "", stream_filter_all, nil + } + + firstStream, streamStatus, err := getFiltersFromString(request.ExtraParam) if err != nil { - rec, err = ss.updateFromDb(db, db_name) + return "", "", err } - streamsLock.Unlock() + + err = checkStreamstreamStatus(streamStatus) if err != nil { - return StreamsRecord{}, err + return "", "", err + } + + return firstStream, streamStatus, nil +} + +func checkStreamstreamStatus(streamStatus string) error { + if !utils.StringInSlice(streamStatus, []string{stream_filter_all, stream_filter_finished, stream_filter_unfinished}) { + return errors.New("getStreamsParamsFromRequest: wrong streamStatus " + streamStatus) + } + return nil +} + +func keepStream(rec StreamInfo, streamStatus string) bool { + return (rec.Finished && streamStatus == stream_filter_finished) || (!rec.Finished && streamStatus == stream_filter_unfinished) +} + +func filterStreams(rec StreamsRecord, firstStream string, streamStatus string) []StreamInfo { + limitedStreams := limitStreams(rec, firstStream) + + if streamStatus == stream_filter_all { + return limitedStreams } + nextStreams := limitedStreams[:0] + for _, rec := range limitedStreams { + if keepStream(rec, streamStatus) { + nextStreams = append(nextStreams, rec) + } + } + return nextStreams +} - if from != "" { +func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo { + if firstStream != "" { ind := len(rec.Streams) for i, rec := range rec.Streams { - if rec.Name == from { + if rec.Name == firstStream { ind = i break } } rec.Streams = rec.Streams[ind:] } + return rec.Streams +} + +func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, error) { + firstStream, streamStatus, err := getStreamsParamsFromRequest(request) + if err != nil { + return StreamsRecord{}, err + } + + streamsLock.Lock() + rec, err := ss.tryGetFromCache(request.DbName, db.settings.UpdateStreamCachePeriodMs) + if err != nil { + rec, err = ss.updateFromDb(db, request.DbName) + } + streamsLock.Unlock() + if err != nil { + return StreamsRecord{}, err + } + + rec.Streams = filterStreams(rec, firstStream, streamStatus) + return rec, nil } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 08569051628ecec2f601cb8bf83a9a2ca1a644d6..7727b1a99e4ea88bc8ca0d27faef1c1ea960e6dd 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -39,6 +39,7 @@ const metaID_str = "0" var empty_next = map[string]string{"next_stream": ""} var rec1 = TestRecord{1, empty_next, "aaa", 0} +var rec1_later = TestRecord{1, empty_next, "aaa", 1} var rec_finished = TestRecord{2, map[string]string{"next_stream": "next1"}, finish_stream_keyword, 2} var rec2 = TestRecord{2, empty_next, "bbb", 1} var rec3 = TestRecord{3, empty_next, "ccc", 2} diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go index 9408445be8c7667c5e25a520dbfeb97eddf4f36f..c972ba789f8d4b77e44f3c9d8b5a26416db770c9 100644 --- a/broker/src/asapo_broker/database/streams_test.go +++ b/broker/src/asapo_broker/database/streams_test.go @@ -3,6 +3,7 @@ package database import ( + "fmt" "github.com/stretchr/testify/suite" "testing" "time" @@ -26,16 +27,16 @@ func TestStreamsTestSuite(t *testing.T) { } func (suite *StreamsTestSuite) TestStreamsEmpty() { - rec, err := streams.getStreams(&db, "test", "") + rec, err := streams.getStreams(&db, Request{DbName: "test", ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { db.settings.UpdateStreamCachePeriodMs = 1000 - streams.getStreams(&db, dbname, "") + streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Equal(1, len(rec.Streams)) } @@ -43,9 +44,9 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { func (suite *StreamsTestSuite) TestStreamsUsesCache() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, dbname, "") + streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(false, rec.Streams[0].Finished) @@ -57,18 +58,19 @@ func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(0), rec.Streams[0].Timestamp) suite.Equal(true, rec.Streams[0].Finished) suite.Equal("next1", rec.Streams[0].NextStream) } + func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(int64(2), rec.Streams[0].TimestampLast) @@ -77,13 +79,28 @@ func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() { suite.Equal(int64(1), rec.Streams[0].LastId) } +func (suite *StreamsTestSuite) TestStreamsMultipleRequests() { + db.settings.UpdateStreamCachePeriodMs = 1000 + db.insertRecord(dbname, collection, &rec_dataset1_incomplete) + db.insertRecord(dbname, collection, &rec_finished) + db.insertRecord(dbname, collection2, &rec_dataset1_incomplete) + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "_unfinished"}) + rec2, err2 := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "_finished"}) + suite.Nil(err) + suite.Equal(collection2, rec.Streams[0].Name) + suite.Equal(1, len(rec.Streams)) + suite.Nil(err2) + suite.Equal(1, len(rec2.Streams)) + suite.Equal(collection, rec2.Streams[0].Name) +} + func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { db.settings.UpdateStreamCachePeriodMs = 10 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, dbname, "") + streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) time.Sleep(time.Millisecond * 100) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) } @@ -91,9 +108,50 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { func (suite *StreamsTestSuite) TestStreamRemovesDatabase() { db.settings.UpdateStreamCachePeriodMs = 0 db.insertRecord(dbname, collection, &rec1) - streams.getStreams(&db, dbname, "") + streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) db.dropDatabase(dbname) - rec, err := streams.getStreams(&db, dbname, "") + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } + +var streamFilterTests=[]struct{ + request Request + error bool + streams []string + message string +}{ + {request: Request{DbName:dbname, ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"}, + {request: Request{DbName:dbname, ExtraParam:"_"},error: false,streams: []string{collection,collection2},message: "default _ all streams"}, + {request: Request{DbName:dbname, ExtraParam:collection},error: false,streams: []string{collection,collection2},message: "first parameter only - all streams"}, + {request: Request{DbName:dbname, ExtraParam:"_all"},error: false,streams: []string{collection,collection2},message: "second parameter only - all streams"}, + {request: Request{DbName:dbname, ExtraParam:"_finished"},error: false,streams: []string{collection2},message: "second parameter only - finished streams"}, + {request: Request{DbName:dbname, ExtraParam:"_unfinished"},error: false,streams: []string{collection},message: "second parameter only - unfinished streams"}, + {request: Request{DbName:dbname, ExtraParam:collection2+"_all"},error: false,streams: []string{collection2},message: "from stream2"}, + {request: Request{DbName:dbname, ExtraParam:collection2+"_unfinished"},error: false,streams: []string{},message: "from stream2 and filter"}, + {request: Request{DbName:dbname, ExtraParam:collection2+"_bla"},error: true,streams: []string{},message: "wrong filter"}, + {request: Request{DbName:dbname, ExtraParam:collection2+"_all_aaa"},error: true,streams: []string{},message: "wrong filter2"}, + {request: Request{DbName:dbname, ExtraParam:"blabla"},error: false,streams: []string{},message: "from unknown stream returns nothing"}, + {request: Request{DbName:dbname, ExtraParam:collection2+"_"},error: false,streams: []string{collection2},message: "from stream2, first parameter only"}, +} + +func (suite *StreamsTestSuite) TestStreamFilters() { + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection2, &rec1_later) + db.insertRecord(dbname, collection2, &rec_finished) + for _, test := range streamFilterTests { + rec, err := streams.getStreams(&db, test.request) + if test.error { + suite.NotNil(err,test.message) + continue + } + if err!=nil { + fmt.Println(err.Error()) + } + streams:=make([]string,0) + for _,si:=range rec.Streams { + streams=append(streams,si.Name) + } + suite.Equal(test.streams,streams,test.message) + } +} \ No newline at end of file diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 9870e6d2d6ee1149391d64ba8a2219fbab5a752b..0fc547da341cf9fdc04312a0480c6f05046929f2 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -49,7 +49,7 @@ var testsGetCommand = []struct { expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"}, {"size", expectedStream, "", expectedStream + "/size","",""}, {"size", expectedStream, "", expectedStream + "/size","&incomplete=true","true"}, - {"streams", "0", "", "0/streams","",""}, + {"streams", "0", "", "0/streams","","_"}, {"lastack", expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""}, } diff --git a/broker/src/asapo_broker/server/get_streams.go b/broker/src/asapo_broker/server/get_streams.go index 335f15a6eff8b6698bdc338632d6d360b7891b5a..a22274553f58663c2bdbd830c246344b48f0dea9 100644 --- a/broker/src/asapo_broker/server/get_streams.go +++ b/broker/src/asapo_broker/server/get_streams.go @@ -7,5 +7,6 @@ import ( func routeGetStreams(w http.ResponseWriter, r *http.Request) { keys := r.URL.Query() from := keys.Get("from") - processRequest(w, r, "streams", from, false) + filter := keys.Get("filter") + processRequest(w, r, "streams", from+"_"+filter, false) } diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go index e809d31705b3505f704f12e8ca6eb31878e16805..714cebf61ad85db215259c5147d06af861c51ad5 100644 --- a/common/go/src/asapo_common/utils/helpers.go +++ b/common/go/src/asapo_common/utils/helpers.go @@ -25,6 +25,12 @@ func MapToJson(res interface{}) ([]byte, error) { } } +func DeepCopy(a, b interface{}) { + byt, _ := json.Marshal(a) + json.Unmarshal(byt, b) +} + + func GetInt64FromMap(s map[string]interface{}, name string) (int64,bool) { val, ok := InterfaceToInt64(s[name]) if ok { diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 19f632c34e508c029acf5d60982f8283609ee0eb..2cd1d71b678dcb22554489f9f9bf4428f970f64e 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -15,7 +15,7 @@ namespace asapo { enum class StreamFilter { kAllStreams, kFinishedStreams, - kUnFinishedStreams + kUnfinishedStreams }; class Consumer { diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 42b3688a1a558e66cdba73dafa0e63bfdd4f8c3b..84a8fe6c9627970a204629e683ffdabbe2790e0a 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -661,7 +661,7 @@ std::string filterToString(StreamFilter filter) { return "all"; case StreamFilter::kFinishedStreams: return "finished"; - case StreamFilter::kUnFinishedStreams: + case StreamFilter::kUnfinishedStreams: return "unfinished"; } } @@ -674,7 +674,6 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from,StreamFilter filter, Er if (*err) { return StreamInfos{}; } - return ParseStreamsFromResponse(std::move(response), err); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 279ae39e4adcf0bff444fa694d584d67b10928b8..f0c9fa99c9dc5885b8bee77d9227d89db15d4724 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -58,7 +58,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": pass StreamFilter StreamFilter_kAllStreams "asapo::StreamFilter::kAllStreams" StreamFilter StreamFilter_kFinishedStreams "asapo::StreamFilter::kFinishedStreams" - StreamFilter StreamFilter_kUnFinishedStreams "asapo::StreamFilter::kUnFinishedStreams" + StreamFilter StreamFilter_kUnfinishedStreams "asapo::StreamFilter::kUnfinishedStreams" cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: cdef cppclass Consumer: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 346941f2885d3a88637c021fae2a44f402f2bab0..88d5be838750d5e531875823885e41a3da7369c9 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -110,7 +110,7 @@ cdef class PyConsumer: elif filter == "finished": return StreamFilter_kFinishedStreams elif filter == "unfinished": - return StreamFilter_kUnFinishedStreams + return StreamFilter_kUnfinishedStreams else: raise AsapoWrongInputError("wrong filter, must be all|finished|unfinished") def _op(self, op, group_id, stream, meta_only, uint64_t id): diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index c253db6e4768426b6b9d83e69453a939f015ff0c..b150e5fb57b5fb14c76e78d1662d6525d43e014b 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -145,6 +145,15 @@ def check_single(consumer, group_id): assert_eq(streams[3]["finished"], True, "streams_finished3") assert_eq(streams[3]["nextStream"], "", "next stream 3") assert_eq(streams[3]["lastId"], 5, "last id stream 3") + + finished_streams = consumer.get_stream_list("","finished") + assert_eq(len(finished_streams), 2, "number of finished streams") + assert_eq(finished_streams[0]["name"], "stream1", "finished streams_name1") + + unfinished_streams = consumer.get_stream_list("","unfinished") + assert_eq(len(unfinished_streams), 2, "number of unfinished streams") + assert_eq(unfinished_streams[0]["name"], "default", "unfinished streams_name1") + # acks try: id = consumer.get_last_acknowledged_message(group_id)