From e169bdb0a9773188126bc9cc4a849cd030fb4ecf Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 5 May 2021 17:40:17 +0200
Subject: [PATCH] implement delete stream in broker

---
 broker/src/asapo_broker/database/mongodb.go   | 90 +++++++++++++++++++
 .../src/asapo_broker/database/mongodb_test.go | 29 ++++++
 .../asapo_broker/server/process_request.go    |  6 +-
 .../server/process_request_test.go            | 59 ++++++++----
 .../src/asapo_broker/server/request_common.go | 10 ++-
 5 files changed, 172 insertions(+), 22 deletions(-)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index ad70079db..e00e8a0a6 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -791,6 +791,94 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) {
 	return utils.MapToJson(&res)
 }
 
+func (db *Mongodb) deleteCollection(request Request, name string) error {
+	return db.client.Database(request.DbName).Collection(name).Drop(context.Background())
+}
+
+func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
+	result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name})
+	if err != nil {
+		return false, err
+	}
+	if len(result) == 1 {
+		return true, nil
+	}
+	return false, nil
+}
+
+func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) error {
+	dataCol := data_collection_name_prefix + request.DbCollectionName
+	if errorOnNotexist {
+		exist, err := db.collectionExist(request, dataCol)
+		if err != nil || !exist {
+			return err
+		}
+	}
+	return db.deleteCollection(request, dataCol)
+}
+
+func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error {
+	filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}}
+	_, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter)
+	return err
+}
+
+func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error {
+	cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
+		{"$regex", primitive.Regex{Pattern: "^" + prefix, Options: "i"}}}})
+	if err != nil {
+		return err
+	}
+
+	for _, col := range cols {
+		err := db.deleteCollection(request, col)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (db *Mongodb) deleteServiceMeta(request Request) (error) {
+	err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName)
+	if err != nil {
+		return  err
+	}
+	err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName)
+	if err != nil {
+		return  err
+	}
+	return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+request.DbCollectionName+"$")
+}
+
+func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
+	params := struct {
+		ErrorOnNotExist *bool
+		DeleteMeta      *bool
+	}{}
+	err := json.Unmarshal([]byte(request.ExtraParam), &params)
+	if err != nil {
+		return nil, err
+	}
+
+	if params.DeleteMeta == nil || params.ErrorOnNotExist == nil {
+		return nil, errors.New("wrong params: " + request.ExtraParam)
+	}
+	if !*params.DeleteMeta {
+		logger.Debug("skipping delete stream meta for " + request.DbCollectionName + " in " + request.DbName)
+		return nil, nil
+	}
+
+	err = db.deleteDataCollection(*params.ErrorOnNotExist, request)
+	if err != nil {
+		return nil, err
+	}
+
+	err = db.deleteServiceMeta(request)
+	return nil, err
+}
+
 func (db *Mongodb) lastAck(request Request) ([]byte, error) {
 	c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
 	opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
@@ -909,6 +997,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
 		return db.nacks(request)
 	case "lastack":
 		return db.lastAck(request)
+	case "delete_stream":
+		return db.deleteStream(request)
 	}
 
 	return nil, errors.New("Wrong db operation: " + request.Op)
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index b0f2e97a3..abb99ded1 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -1154,3 +1154,32 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
 	assert.Equal(t, string(rec1_expect), string(res2))
 	assert.Equal(t, string(rec1_expect), string(res3))
 }
+
+var testsDeleteStream = []struct {
+	stream  string
+	params  string
+	ok      bool
+	message string
+}{
+	{"test", "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}", true, "delete stream"},
+}
+
+func TestDeleteStreams(t *testing.T) {
+	for _, test := range testsDeleteStream {
+		db.Connect(dbaddress)
+		db.insertRecord(dbname, test.stream, &rec_finished11)
+
+		_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
+		if test.ok {
+			rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""})
+			acks_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},acks_collection_name_prefix+test.stream)
+			inprocess_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},inprocess_collection_name_prefix+test.stream)
+			assert.Equal(t,0,len(rec.Streams),test.message)
+			assert.Equal(t,false,acks_exist,test.message)
+			assert.Equal(t,false,inprocess_exist,test.message)
+			assert.Nil(t, err, test.message)
+		} else {
+			assert.NotNil(t, err, test.message)
+		}
+	}
+}
diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go
index 87b6a5075..27350fed5 100644
--- a/broker/src/asapo_broker/server/process_request.go
+++ b/broker/src/asapo_broker/server/process_request.go
@@ -55,6 +55,10 @@ func checkBrokerApiVersion(w http.ResponseWriter, r *http.Request) bool {
 	return ok
 }
 
+func needWriteAccess(op string) bool {
+	return op=="delete_stream";
+}
+
 func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) {
 	if ok := checkBrokerApiVersion(w, r); !ok {
 		return
@@ -68,7 +72,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
 		return
 	}
 
-	if err := authorize(r, db_name); err != nil {
+	if err := authorize(r, db_name, needWriteAccess(op)); err != nil {
 		writeAuthAnswer(w, "get "+op, db_name, err)
 		return
 	}
diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go
index f29db642a..972487691 100644
--- a/broker/src/asapo_broker/server/process_request_test.go
+++ b/broker/src/asapo_broker/server/process_request_test.go
@@ -17,7 +17,7 @@ import (
 	"time"
 )
 
-var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string
+var correctTokenSuffix, correctTokenSuffixWrite, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string
 
 const expectedGroupID = "bid2a5auidddp1vl71d0"
 const wrongGroupID = "_bid2a5auidddp1vl71"
@@ -27,19 +27,26 @@ const expectedStream = "stream"
 type MockAuthServer struct {
 }
 
-func (a * MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) {
-	if tokenJWT =="ok" {
+func (a *MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) {
+	if tokenJWT == "ok" {
 		return Token{
 			structs.IntrospectTokenResponse{
-			Sub:        "bt_"+expectedBeamtimeId,
-			AccessTypes: []string{"read"},
+				Sub:         "bt_" + expectedBeamtimeId,
+				AccessTypes: []string{"read"},
 			},
-		},nil
-	} else {
-		return Token{},errors.New("wrong JWT token")
+		}, nil
+	}
+	if tokenJWT == "ok_write" {
+		return Token{
+			structs.IntrospectTokenResponse{
+				Sub:         "bt_" + expectedBeamtimeId,
+				AccessTypes: []string{"read", "write"},
+			},
+		}, nil
 	}
-}
 
+	return Token{}, errors.New("wrong JWT token")
+}
 
 func prepareTestAuth() {
 	expectedBeamtimeId = "beamtime_id"
@@ -47,6 +54,7 @@ func prepareTestAuth() {
 
 	auth = &MockAuthServer{}
 	correctTokenSuffix = "?token=ok"
+	correctTokenSuffixWrite = "?token=ok_write"
 	wrongTokenSuffix = "?blablabla=aa"
 	suffixWithWrongToken = "?token=wrong"
 }
@@ -143,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusNoData, ""})
@@ -157,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
 		&database.DBError{utils.StatusServiceUnavailable, ""})
@@ -173,8 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"}
-
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
 
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New(""))
 	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
@@ -189,10 +196,9 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
-
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
 
 	doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
@@ -207,7 +213,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() {
 
 func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
 
-	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, DatasetOp:true, Op: "next"}
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"}
 	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
 
 	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
@@ -215,8 +221,25 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
 	doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
 }
 
-
 func (suite *ProcessRequestTestSuite) TestProcessRequestErrorOnWrongProtocol() {
-	w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix,"GET","","/v1.2")
+	w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/"+expectedGroupID+"/next"+correctTokenSuffix, "GET", "", "/v1.2")
 	suite.Equal(http.StatusUnsupportedMediaType, w.Code, "wrong protocol")
 }
+
+func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() {
+	query_str := "query_string"
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token access")))
+	w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffix, "POST", query_str)
+	suite.Equal(http.StatusUnauthorized, w.Code, "wrong token type")
+
+}
+
+func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() {
+	query_str := "query_string"
+
+	expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
+	suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
+
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName)))
+	doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffixWrite, "POST", query_str)
+}
diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go
index 9476a5a7c..49b65f863 100644
--- a/broker/src/asapo_broker/server/request_common.go
+++ b/broker/src/asapo_broker/server/request_common.go
@@ -50,7 +50,7 @@ func datasetRequested(r *http.Request) (bool, int) {
 	return valueTrue(r, "dataset"), valueInt(r, "minsize")
 }
 
-func authorize(r *http.Request, beamtime_id string) error {
+func authorize(r *http.Request, beamtime_id string, needWriteAccess bool) error {
 	tokenJWT := r.URL.Query().Get("token")
 
 	if len(tokenJWT) == 0 {
@@ -67,7 +67,7 @@ func authorize(r *http.Request, beamtime_id string) error {
 		return err
 	}
 
-	return checkAccessType(token.AccessTypes)
+	return checkAccessType(token.AccessTypes,needWriteAccess)
 }
 
 func checkSubject(subject string, beamtime_id string) error {
@@ -77,7 +77,11 @@ func checkSubject(subject string, beamtime_id string) error {
 	return nil
 }
 
-func checkAccessType(accessTypes []string) error {
+func checkAccessType(accessTypes []string, needWriteAccess bool) error {
+	if needWriteAccess && !utils.StringInSlice("write",accessTypes) {
+		return errors.New("wrong token access type")
+	}
+
 	if !utils.StringInSlice("read",accessTypes) {
 		return errors.New("wrong token access type")
 	}
-- 
GitLab