From c500aa4d96ec5d89006b5a749a037b6734d68bb5 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 4 Oct 2021 12:31:12 +0200
Subject: [PATCH] allow _  in stream name

---
 .../asapo_broker/database/mongodb_streams.go  | 12 ++-----
 .../src/asapo_broker/database/mongodb_test.go |  2 +-
 .../src/asapo_broker/database/streams_test.go | 27 +++++++--------
 .../asapo_broker/server/get_commands_test.go  |  2 +-
 broker/src/asapo_broker/server/get_streams.go |  5 ++-
 common/go/src/asapo_common/utils/helpers.go   | 20 ++++++++++-
 .../go/src/asapo_common/utils/helpers_test.go | 33 +++++++++++++++++++
 7 files changed, 74 insertions(+), 27 deletions(-)
 create mode 100644 common/go/src/asapo_common/utils/helpers_test.go

diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go
index 243df816d..a182f5080 100644
--- a/broker/src/asapo_broker/database/mongodb_streams.go
+++ b/broker/src/asapo_broker/database/mongodb_streams.go
@@ -190,16 +190,8 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err
 }
 
 func getFiltersFromString(filterString string) (string, string, error) {
-	firstStream := ""
-	streamStatus := ""
-	s := strings.Split(filterString, "_")
-	switch len(s) {
-	case 1:
-		firstStream = s[0]
-	case 2:
-		firstStream = s[0]
-		streamStatus = s[1]
-	default:
+	firstStream, streamStatus, err := utils.DecodeTwoStrings(filterString)
+	if err!=nil {
 		return "", "", errors.New("wrong format: " + filterString)
 	}
 	if streamStatus == "" {
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index e4c8f458d..c81813611 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -930,7 +930,7 @@ func TestMongoDBListStreams(t *testing.T) {
 		}
 		var rec_streams_expect, _ = json.Marshal(test.expectedStreams)
 
-		res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "0", Op: "streams", ExtraParam: test.from})
+		res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "0", Op: "streams", ExtraParam: utils.EncodeTwoStrings(test.from,"")})
 		if test.ok {
 			assert.Nil(t, err, test.test)
 			assert.Equal(t, string(rec_streams_expect), string(res), test.test)
diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go
index fef6ff3af..4ba11e0b3 100644
--- a/broker/src/asapo_broker/database/streams_test.go
+++ b/broker/src/asapo_broker/database/streams_test.go
@@ -3,6 +3,7 @@
 package database
 
 import (
+	"asapo_common/utils"
 	"fmt"
 	"github.com/stretchr/testify/suite"
 	"testing"
@@ -105,8 +106,8 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() {
 	db.insertRecord(dbname, collection, &rec_dataset1_incomplete)
 	db.insertRecord(dbname, collection, &rec_finished)
 	db.insertRecord(dbname, collection2, &rec_dataset1_incomplete)
-	rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "_unfinished"})
-	rec2, err2 := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "_finished"})
+	rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "0/unfinished"})
+	rec2, err2 := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "0/finished"})
 	suite.Nil(err)
 	suite.Equal(collection2, rec.Streams[0].Name)
 	suite.Equal(1, len(rec.Streams))
@@ -143,17 +144,17 @@ var streamFilterTests=[]struct{
 	message string
 }{
 	{request: Request{DbName:dbname, ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"},
-	{request: Request{DbName:dbname, ExtraParam:"_"},error: false,streams: []string{collection,collection2},message: "default _ all streams"},
-	{request: Request{DbName:dbname, ExtraParam:collection},error: false,streams: []string{collection,collection2},message: "first parameter only -  all streams"},
-	{request: Request{DbName:dbname, ExtraParam:"_all"},error: false,streams: []string{collection,collection2},message: "second parameter only -  all streams"},
-	{request: Request{DbName:dbname, ExtraParam:"_finished"},error: false,streams: []string{collection2},message: "second parameter only -  finished streams"},
-	{request: Request{DbName:dbname, ExtraParam:"_unfinished"},error: false,streams: []string{collection},message: "second parameter only -  unfinished streams"},
-	{request: Request{DbName:dbname, ExtraParam:collection2+"_all"},error: false,streams: []string{collection2},message: "from stream2"},
-	{request: Request{DbName:dbname, ExtraParam:collection2+"_unfinished"},error: false,streams: []string{},message: "from stream2 and filter"},
-	{request: Request{DbName:dbname, ExtraParam:collection2+"_bla"},error: true,streams: []string{},message: "wrong filter"},
-	{request: Request{DbName:dbname, ExtraParam:collection2+"_all_aaa"},error: true,streams: []string{},message: "wrong filter2"},
-	{request: Request{DbName:dbname, ExtraParam:"blabla"},error: false,streams: []string{},message: "from unknown stream returns nothing"},
-	{request: Request{DbName:dbname, ExtraParam:collection2+"_"},error: false,streams: []string{collection2},message: "from stream2, first parameter only"},
+	{request: Request{DbName:dbname, ExtraParam:"0/"},error: false,streams: []string{collection,collection2},message: "default 0/ all streams"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection,"")},error: false,streams: []string{collection,collection2},message: "first parameter only -  all streams"},
+	{request: Request{DbName:dbname, ExtraParam:"0/all"},error: false,streams: []string{collection,collection2},message: "second parameter only -  all streams"},
+	{request: Request{DbName:dbname, ExtraParam:"0/finished"},error: false,streams: []string{collection2},message: "second parameter only -  finished streams"},
+	{request: Request{DbName:dbname, ExtraParam:"0/unfinished"},error: false,streams: []string{collection},message: "second parameter only -  unfinished streams"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"all")},error: false,streams: []string{collection2},message: "from stream2"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"unfinished")},error: false,streams: []string{},message: "from stream2 and filter"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"bla")},error: true,streams: []string{},message: "wrong filter"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"all_aaa")},error: true,streams: []string{},message: "wrong filter2"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings("blabla","")},error: false,streams: []string{},message: "from unknown stream returns nothing"},
+	{request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"")},error: false,streams: []string{collection2},message: "from stream2, first parameter only"},
 }
 
 func (suite *StreamsTestSuite) TestStreamFilters() {
diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go
index 980946e49..3f34750b2 100644
--- a/broker/src/asapo_broker/server/get_commands_test.go
+++ b/broker/src/asapo_broker/server/get_commands_test.go
@@ -52,7 +52,7 @@ var testsGetCommand = []struct {
 		expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"},
 	{"size", expectedSource,expectedStream, "", expectedStream  + "/size","",""},
 	{"size",expectedSource, expectedStream, "", expectedStream  + "/size","&incomplete=true","true"},
-	{"streams",expectedSource, "0", "", "0/streams","","_"},
+	{"streams",expectedSource, "0", "", "0/streams","","0/"},
 	{"lastack", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack","",""},
 }
 
diff --git a/broker/src/asapo_broker/server/get_streams.go b/broker/src/asapo_broker/server/get_streams.go
index a22274553..01e1e8edc 100644
--- a/broker/src/asapo_broker/server/get_streams.go
+++ b/broker/src/asapo_broker/server/get_streams.go
@@ -1,6 +1,7 @@
 package server
 
 import (
+	"asapo_common/utils"
 	"net/http"
 )
 
@@ -8,5 +9,7 @@ func routeGetStreams(w http.ResponseWriter, r *http.Request) {
 	keys := r.URL.Query()
 	from := keys.Get("from")
 	filter := keys.Get("filter")
-	processRequest(w, r, "streams", from+"_"+filter, false)
+	utils.EncodeTwoStrings(from,filter)
+	encoded := utils.EncodeTwoStrings(from,filter)
+	processRequest(w, r, "streams", encoded, false)
 }
diff --git a/common/go/src/asapo_common/utils/helpers.go b/common/go/src/asapo_common/utils/helpers.go
index 714cebf61..ed19b875d 100644
--- a/common/go/src/asapo_common/utils/helpers.go
+++ b/common/go/src/asapo_common/utils/helpers.go
@@ -2,9 +2,10 @@ package utils
 
 import (
 	json "encoding/json"
+	"errors"
 	"io/ioutil"
+	"strconv"
 	"strings"
-	"errors"
 )
 
 func StringInSlice(a string, list []string) bool {
@@ -102,3 +103,20 @@ func MapToStruct(m map[string]interface{}, val interface{}) error {
 	}
 	return nil
 }
+
+func EncodeTwoStrings(first,second string) string {
+	return strconv.Itoa(len(first))+"/"+first + second
+}
+
+func DecodeTwoStrings(encoded string) (string,string,error) {
+	temp := strings.SplitN(encoded,"/",2);
+	if len(temp)!=2 {
+		return "","",errors.New("wrong input: "+encoded)
+	}
+	length,err := strconv.Atoi(temp[0]);
+	if err!=nil {
+		return "","",errors.New("wrong input: "+encoded)
+	}
+	return temp[1][:length],temp[1][length:],err
+
+}
\ No newline at end of file
diff --git a/common/go/src/asapo_common/utils/helpers_test.go b/common/go/src/asapo_common/utils/helpers_test.go
new file mode 100644
index 000000000..6c817f837
--- /dev/null
+++ b/common/go/src/asapo_common/utils/helpers_test.go
@@ -0,0 +1,33 @@
+package utils
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+
+
+var encodeTests = []struct {
+	input       string
+	str1        string
+	str2       string
+	message    string
+}{
+	{"5/hellohello1","hello", "hello1","ok"},
+	{"0/","", "","ok"},
+	{"5/hello","hello", "","ok"},
+	{"10/hello_testall","hello_test", "all","ok"},
+
+}
+
+func TestEncodeDecode(t *testing.T) {
+	for _, test := range encodeTests {
+		encoded:=EncodeTwoStrings(test.str1,test.str2)
+		assert.Equal(t, test.input,encoded,test.message)
+		s1,s2,err:=DecodeTwoStrings(test.input)
+		assert.Equal(t, test.str1,s1,test.message)
+		assert.Equal(t, test.str2,s2,test.message)
+		assert.NoError(t, err,test.message)
+	}
+}
+
-- 
GitLab