diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index ad70079dbc7b041275d328747139728433bb424d..e00e8a0a645dd93e17a6c65831d5fab8883712c8 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -791,6 +791,94 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) { return utils.MapToJson(&res) } +func (db *Mongodb) deleteCollection(request Request, name string) error { + return db.client.Database(request.DbName).Collection(name).Drop(context.Background()) +} + +func (db *Mongodb) collectionExist(request Request, name string) (bool, error) { + result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name}) + if err != nil { + return false, err + } + if len(result) == 1 { + return true, nil + } + return false, nil +} + +func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) error { + dataCol := data_collection_name_prefix + request.DbCollectionName + if errorOnNotexist { + exist, err := db.collectionExist(request, dataCol) + if err != nil || !exist { + return err + } + } + return db.deleteCollection(request, dataCol) +} + +func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error { + filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}} + _, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter) + return err +} + +func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error { + cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{ + {"$regex", primitive.Regex{Pattern: "^" + prefix, Options: "i"}}}}) + if err != nil { + return err + } + + for _, col := range cols { + err := db.deleteCollection(request, col) + if err != nil { + return err + } + } + + return nil +} + +func (db *Mongodb) deleteServiceMeta(request Request) (error) { + err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName) + if err != nil { + return err + } + err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName) + if err != nil { + return err + } + return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+request.DbCollectionName+"$") +} + +func (db *Mongodb) deleteStream(request Request) ([]byte, error) { + params := struct { + ErrorOnNotExist *bool + DeleteMeta *bool + }{} + err := json.Unmarshal([]byte(request.ExtraParam), ¶ms) + if err != nil { + return nil, err + } + + if params.DeleteMeta == nil || params.ErrorOnNotExist == nil { + return nil, errors.New("wrong params: " + request.ExtraParam) + } + if !*params.DeleteMeta { + logger.Debug("skipping delete stream meta for " + request.DbCollectionName + " in " + request.DbName) + return nil, nil + } + + err = db.deleteDataCollection(*params.ErrorOnNotExist, request) + if err != nil { + return nil, err + } + + err = db.deleteServiceMeta(request) + return nil, err +} + func (db *Mongodb) lastAck(request Request) ([]byte, error) { c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) @@ -909,6 +997,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.nacks(request) case "lastack": return db.lastAck(request) + case "delete_stream": + return db.deleteStream(request) } return nil, errors.New("Wrong db operation: " + request.Op) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index b0f2e97a38901e168cb1964a44beacb2de73593a..abb99ded14e8410b453ccaed44e7db8c75fb0a6b 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -1154,3 +1154,32 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { assert.Equal(t, string(rec1_expect), string(res2)) assert.Equal(t, string(rec1_expect), string(res3)) } + +var testsDeleteStream = []struct { + stream string + params string + ok bool + message string +}{ + {"test", "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}", true, "delete stream"}, +} + +func TestDeleteStreams(t *testing.T) { + for _, test := range testsDeleteStream { + db.Connect(dbaddress) + db.insertRecord(dbname, test.stream, &rec_finished11) + + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) + if test.ok { + rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + acks_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},acks_collection_name_prefix+test.stream) + inprocess_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},inprocess_collection_name_prefix+test.stream) + assert.Equal(t,0,len(rec.Streams),test.message) + assert.Equal(t,false,acks_exist,test.message) + assert.Equal(t,false,inprocess_exist,test.message) + assert.Nil(t, err, test.message) + } else { + assert.NotNil(t, err, test.message) + } + } +} diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 87b6a5075c842d859a9d40b47666f2b81f96cd8f..27350fed5c270b2555d511eb70dcc0cb5ef64453 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -55,6 +55,10 @@ func checkBrokerApiVersion(w http.ResponseWriter, r *http.Request) bool { return ok } +func needWriteAccess(op string) bool { + return op=="delete_stream"; +} + func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) { if ok := checkBrokerApiVersion(w, r); !ok { return @@ -68,7 +72,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par return } - if err := authorize(r, db_name); err != nil { + if err := authorize(r, db_name, needWriteAccess(op)); err != nil { writeAuthAnswer(w, "get "+op, db_name, err) return } diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index f29db642a59e41a09672092e9fef1e03fe225e3c..97248769155270c0bd33a7b194b25c2721169576 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -17,7 +17,7 @@ import ( "time" ) -var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string +var correctTokenSuffix, correctTokenSuffixWrite, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string const expectedGroupID = "bid2a5auidddp1vl71d0" const wrongGroupID = "_bid2a5auidddp1vl71" @@ -27,19 +27,26 @@ const expectedStream = "stream" type MockAuthServer struct { } -func (a * MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) { - if tokenJWT =="ok" { +func (a *MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) { + if tokenJWT == "ok" { return Token{ structs.IntrospectTokenResponse{ - Sub: "bt_"+expectedBeamtimeId, - AccessTypes: []string{"read"}, + Sub: "bt_" + expectedBeamtimeId, + AccessTypes: []string{"read"}, }, - },nil - } else { - return Token{},errors.New("wrong JWT token") + }, nil + } + if tokenJWT == "ok_write" { + return Token{ + structs.IntrospectTokenResponse{ + Sub: "bt_" + expectedBeamtimeId, + AccessTypes: []string{"read", "write"}, + }, + }, nil } -} + return Token{}, errors.New("wrong JWT token") +} func prepareTestAuth() { expectedBeamtimeId = "beamtime_id" @@ -47,6 +54,7 @@ func prepareTestAuth() { auth = &MockAuthServer{} correctTokenSuffix = "?token=ok" + correctTokenSuffixWrite = "?token=ok_write" wrongTokenSuffix = "?blablabla=aa" suffixWithWrongToken = "?token=wrong" } @@ -143,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) @@ -157,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) @@ -173,8 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} - + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -189,10 +196,9 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -207,7 +213,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, DatasetOp:true, Op: "next"} + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) @@ -215,8 +221,25 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") } - func (suite *ProcessRequestTestSuite) TestProcessRequestErrorOnWrongProtocol() { - w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix,"GET","","/v1.2") + w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/"+expectedGroupID+"/next"+correctTokenSuffix, "GET", "", "/v1.2") suite.Equal(http.StatusUnsupportedMediaType, w.Code, "wrong protocol") } + +func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() { + query_str := "query_string" + logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token access"))) + w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffix, "POST", query_str) + suite.Equal(http.StatusUnauthorized, w.Code, "wrong token type") + +} + +func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() { + query_str := "query_string" + + expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} + suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) + + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName))) + doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffixWrite, "POST", query_str) +} diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index 9476a5a7c0ced30369aa3f05be93360188c727a7..49b65f86308c8ddb05e4b09f2f44cf1b206c84ef 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -50,7 +50,7 @@ func datasetRequested(r *http.Request) (bool, int) { return valueTrue(r, "dataset"), valueInt(r, "minsize") } -func authorize(r *http.Request, beamtime_id string) error { +func authorize(r *http.Request, beamtime_id string, needWriteAccess bool) error { tokenJWT := r.URL.Query().Get("token") if len(tokenJWT) == 0 { @@ -67,7 +67,7 @@ func authorize(r *http.Request, beamtime_id string) error { return err } - return checkAccessType(token.AccessTypes) + return checkAccessType(token.AccessTypes,needWriteAccess) } func checkSubject(subject string, beamtime_id string) error { @@ -77,7 +77,11 @@ func checkSubject(subject string, beamtime_id string) error { return nil } -func checkAccessType(accessTypes []string) error { +func checkAccessType(accessTypes []string, needWriteAccess bool) error { + if needWriteAccess && !utils.StringInSlice("write",accessTypes) { + return errors.New("wrong token access type") + } + if !utils.StringInSlice("read",accessTypes) { return errors.New("wrong token access type") }