Skip to content
Snippets Groups Projects
Commit e169bdb0 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

implement delete stream in broker

parent ef74ef57
No related branches found
No related tags found
No related merge requests found
...@@ -791,6 +791,94 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) { ...@@ -791,6 +791,94 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) {
return utils.MapToJson(&res) return utils.MapToJson(&res)
} }
func (db *Mongodb) deleteCollection(request Request, name string) error {
return db.client.Database(request.DbName).Collection(name).Drop(context.Background())
}
func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name})
if err != nil {
return false, err
}
if len(result) == 1 {
return true, nil
}
return false, nil
}
func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) error {
dataCol := data_collection_name_prefix + request.DbCollectionName
if errorOnNotexist {
exist, err := db.collectionExist(request, dataCol)
if err != nil || !exist {
return err
}
}
return db.deleteCollection(request, dataCol)
}
func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error {
filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}}
_, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter)
return err
}
func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error {
cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
{"$regex", primitive.Regex{Pattern: "^" + prefix, Options: "i"}}}})
if err != nil {
return err
}
for _, col := range cols {
err := db.deleteCollection(request, col)
if err != nil {
return err
}
}
return nil
}
func (db *Mongodb) deleteServiceMeta(request Request) (error) {
err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.DbCollectionName)
if err != nil {
return err
}
err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.DbCollectionName)
if err != nil {
return err
}
return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+request.DbCollectionName+"$")
}
func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
params := struct {
ErrorOnNotExist *bool
DeleteMeta *bool
}{}
err := json.Unmarshal([]byte(request.ExtraParam), &params)
if err != nil {
return nil, err
}
if params.DeleteMeta == nil || params.ErrorOnNotExist == nil {
return nil, errors.New("wrong params: " + request.ExtraParam)
}
if !*params.DeleteMeta {
logger.Debug("skipping delete stream meta for " + request.DbCollectionName + " in " + request.DbName)
return nil, nil
}
err = db.deleteDataCollection(*params.ErrorOnNotExist, request)
if err != nil {
return nil, err
}
err = db.deleteServiceMeta(request)
return nil, err
}
func (db *Mongodb) lastAck(request Request) ([]byte, error) { func (db *Mongodb) lastAck(request Request) ([]byte, error) {
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
...@@ -909,6 +997,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { ...@@ -909,6 +997,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
return db.nacks(request) return db.nacks(request)
case "lastack": case "lastack":
return db.lastAck(request) return db.lastAck(request)
case "delete_stream":
return db.deleteStream(request)
} }
return nil, errors.New("Wrong db operation: " + request.Op) return nil, errors.New("Wrong db operation: " + request.Op)
......
...@@ -1154,3 +1154,32 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { ...@@ -1154,3 +1154,32 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) {
assert.Equal(t, string(rec1_expect), string(res2)) assert.Equal(t, string(rec1_expect), string(res2))
assert.Equal(t, string(rec1_expect), string(res3)) assert.Equal(t, string(rec1_expect), string(res3))
} }
var testsDeleteStream = []struct {
stream string
params string
ok bool
message string
}{
{"test", "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}", true, "delete stream"},
}
func TestDeleteStreams(t *testing.T) {
for _, test := range testsDeleteStream {
db.Connect(dbaddress)
db.insertRecord(dbname, test.stream, &rec_finished11)
_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params})
if test.ok {
rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""})
acks_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},acks_collection_name_prefix+test.stream)
inprocess_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},inprocess_collection_name_prefix+test.stream)
assert.Equal(t,0,len(rec.Streams),test.message)
assert.Equal(t,false,acks_exist,test.message)
assert.Equal(t,false,inprocess_exist,test.message)
assert.Nil(t, err, test.message)
} else {
assert.NotNil(t, err, test.message)
}
}
}
...@@ -55,6 +55,10 @@ func checkBrokerApiVersion(w http.ResponseWriter, r *http.Request) bool { ...@@ -55,6 +55,10 @@ func checkBrokerApiVersion(w http.ResponseWriter, r *http.Request) bool {
return ok return ok
} }
func needWriteAccess(op string) bool {
return op=="delete_stream";
}
func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) { func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_param string, needGroupID bool) {
if ok := checkBrokerApiVersion(w, r); !ok { if ok := checkBrokerApiVersion(w, r); !ok {
return return
...@@ -68,7 +72,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par ...@@ -68,7 +72,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par
return return
} }
if err := authorize(r, db_name); err != nil { if err := authorize(r, db_name, needWriteAccess(op)); err != nil {
writeAuthAnswer(w, "get "+op, db_name, err) writeAuthAnswer(w, "get "+op, db_name, err)
return return
} }
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
"time" "time"
) )
var correctTokenSuffix, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string var correctTokenSuffix, correctTokenSuffixWrite, wrongTokenSuffix, suffixWithWrongToken, expectedBeamtimeId, expectedDBName string
const expectedGroupID = "bid2a5auidddp1vl71d0" const expectedGroupID = "bid2a5auidddp1vl71d0"
const wrongGroupID = "_bid2a5auidddp1vl71" const wrongGroupID = "_bid2a5auidddp1vl71"
...@@ -27,19 +27,26 @@ const expectedStream = "stream" ...@@ -27,19 +27,26 @@ const expectedStream = "stream"
type MockAuthServer struct { type MockAuthServer struct {
} }
func (a * MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) { func (a *MockAuthServer) AuthorizeToken(tokenJWT string) (token Token, err error) {
if tokenJWT =="ok" { if tokenJWT == "ok" {
return Token{ return Token{
structs.IntrospectTokenResponse{ structs.IntrospectTokenResponse{
Sub: "bt_"+expectedBeamtimeId, Sub: "bt_" + expectedBeamtimeId,
AccessTypes: []string{"read"}, AccessTypes: []string{"read"},
}, },
},nil }, nil
} else { }
return Token{},errors.New("wrong JWT token") if tokenJWT == "ok_write" {
return Token{
structs.IntrospectTokenResponse{
Sub: "bt_" + expectedBeamtimeId,
AccessTypes: []string{"read", "write"},
},
}, nil
} }
}
return Token{}, errors.New("wrong JWT token")
}
func prepareTestAuth() { func prepareTestAuth() {
expectedBeamtimeId = "beamtime_id" expectedBeamtimeId = "beamtime_id"
...@@ -47,6 +54,7 @@ func prepareTestAuth() { ...@@ -47,6 +54,7 @@ func prepareTestAuth() {
auth = &MockAuthServer{} auth = &MockAuthServer{}
correctTokenSuffix = "?token=ok" correctTokenSuffix = "?token=ok"
correctTokenSuffixWrite = "?token=ok_write"
wrongTokenSuffix = "?blablabla=aa" wrongTokenSuffix = "?blablabla=aa"
suffixWithWrongToken = "?token=wrong" suffixWithWrongToken = "?token=wrong"
} }
...@@ -143,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { ...@@ -143,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() {
func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() {
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
&database.DBError{utils.StatusNoData, ""}) &database.DBError{utils.StatusNoData, ""})
...@@ -157,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() ...@@ -157,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName()
func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""),
&database.DBError{utils.StatusServiceUnavailable, ""}) &database.DBError{utils.StatusServiceUnavailable, ""})
...@@ -173,8 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { ...@@ -173,8 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() {
func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New(""))
logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next")))
...@@ -189,10 +196,9 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { ...@@ -189,10 +196,9 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() {
func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() {
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, Op: "next"} expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "next"}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix)
...@@ -207,7 +213,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { ...@@ -207,7 +213,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() {
func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId:expectedGroupID, DatasetOp:true, Op: "next"} expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName)))
...@@ -215,8 +221,25 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { ...@@ -215,8 +221,25 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() {
doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true")
} }
func (suite *ProcessRequestTestSuite) TestProcessRequestErrorOnWrongProtocol() { func (suite *ProcessRequestTestSuite) TestProcessRequestErrorOnWrongProtocol() {
w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix,"GET","","/v1.2") w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/"+expectedGroupID+"/next"+correctTokenSuffix, "GET", "", "/v1.2")
suite.Equal(http.StatusUnsupportedMediaType, w.Code, "wrong protocol") suite.Equal(http.StatusUnsupportedMediaType, w.Code, "wrong protocol")
} }
func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() {
query_str := "query_string"
logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token access")))
w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffix, "POST", query_str)
suite.Equal(http.StatusUnauthorized, w.Code, "wrong token type")
}
func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() {
query_str := "query_string"
expectedRequest := database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str}
suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName)))
doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffixWrite, "POST", query_str)
}
...@@ -50,7 +50,7 @@ func datasetRequested(r *http.Request) (bool, int) { ...@@ -50,7 +50,7 @@ func datasetRequested(r *http.Request) (bool, int) {
return valueTrue(r, "dataset"), valueInt(r, "minsize") return valueTrue(r, "dataset"), valueInt(r, "minsize")
} }
func authorize(r *http.Request, beamtime_id string) error { func authorize(r *http.Request, beamtime_id string, needWriteAccess bool) error {
tokenJWT := r.URL.Query().Get("token") tokenJWT := r.URL.Query().Get("token")
if len(tokenJWT) == 0 { if len(tokenJWT) == 0 {
...@@ -67,7 +67,7 @@ func authorize(r *http.Request, beamtime_id string) error { ...@@ -67,7 +67,7 @@ func authorize(r *http.Request, beamtime_id string) error {
return err return err
} }
return checkAccessType(token.AccessTypes) return checkAccessType(token.AccessTypes,needWriteAccess)
} }
func checkSubject(subject string, beamtime_id string) error { func checkSubject(subject string, beamtime_id string) error {
...@@ -77,7 +77,11 @@ func checkSubject(subject string, beamtime_id string) error { ...@@ -77,7 +77,11 @@ func checkSubject(subject string, beamtime_id string) error {
return nil return nil
} }
func checkAccessType(accessTypes []string) error { func checkAccessType(accessTypes []string, needWriteAccess bool) error {
if needWriteAccess && !utils.StringInSlice("write",accessTypes) {
return errors.New("wrong token access type")
}
if !utils.StringInSlice("read",accessTypes) { if !utils.StringInSlice("read",accessTypes) {
return errors.New("wrong token access type") return errors.New("wrong token access type")
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment