diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index ff49cef31feff7e194c7c9e60ecd7f8e1b7c98ec..814d1a321e701f4e1e91dc5dc958f319a9ee4966 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -1,10 +1,13 @@ package database -import "asapo_common/utils" +import ( + "asapo_common/logger" + "asapo_common/utils" +) type Request struct { - Beamtime string - DataSource string + Beamtime string + DataSource string Stream string GroupId string Op string @@ -13,8 +16,18 @@ type Request struct { ExtraParam string } -func (request * Request ) DbName() string { - return request.Beamtime+"_"+request.DataSource +func (request *Request) Logger() logger.Logger { + return logger.WithFields(map[string]interface{}{ + "beamtime": request.Beamtime, + "dataSource": decodeString(request.DataSource), + "stream": decodeString(request.Stream), + "groupId": decodeString(request.GroupId), + "operation": request.Op, + }) +} + +func (request *Request) DbName() string { + return request.Beamtime + "_" + request.DataSource } type Agent interface { @@ -26,7 +39,7 @@ type Agent interface { } type DBSettings struct { - ReadFromInprocessPeriod int + ReadFromInprocessPeriod int UpdateStreamCachePeriodMs int } @@ -47,4 +60,3 @@ func GetStatusCodeFromError(err error) int { return utils.StatusServiceUnavailable } } - diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 812491c78b3e005c75e8eb956a4077bfdee0ba8e..7a9d2c13ddd66cab74161cca616a3f4dc77c0946 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -1,9 +1,10 @@ -//+build !test +//go:build !test +// +build !test package database import ( - "asapo_common/logger" + log "asapo_common/logger" "asapo_common/utils" "context" "encoding/json" @@ -84,10 +85,10 @@ const ( type fieldChangeRequest struct { collectionName string - fieldName string - op int - max_ind int - val int + fieldName string + op int + max_ind int + val int } var dbSessionLock sync.Mutex @@ -310,8 +311,7 @@ func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[str err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max, "") - log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName() + " : " + err.Error() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"id": id, "cause": err.Error()}).Debug("error getting record") return res, &DBError{utils.StatusNoData, answer} } return res, err @@ -327,8 +327,7 @@ func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, er return nil, err } - log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db") record, err := utils.MapToJson(&res) if err != nil { @@ -445,9 +444,9 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err var curPointer LocationPointer err = db.changeField(request, fieldChangeRequest{ collectionName: pointer_collection_name, - fieldName: pointer_field_name, - op: field_op_inc, - max_ind: max_ind}, &curPointer) + fieldName: pointer_field_name, + op: field_op_inc, + max_ind: max_ind}, &curPointer) if err != nil { return LocationPointer{}, 0, err } @@ -455,7 +454,7 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err return curPointer, max_ind, nil } -func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int) (int, error) { +func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int, rlog log.Logger) (int, error) { var res InProcessingRecord opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) tNow := time.Now().UnixNano() @@ -476,8 +475,7 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay return 0, err } - log_str := "got unprocessed id " + strconv.Itoa(res.ID) + " for " + dbname - logger.Debug(log_str) + rlog.WithFields(map[string]interface{}{"id": res.ID}).Debug("got unprocessed message") return res.ID, nil } @@ -527,10 +525,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] dbSessionLock.Unlock() if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts, + request.Logger()) if err != nil { - log_str := "error getting unprocessed id " + request.DbName() + ", groupid: " + request.GroupId + ":" + err.Error() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting unprocessed message") return 0, 0, err } } @@ -552,12 +550,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) { curPointer, max_ind, err := db.getCurrentPointer(request) if err != nil { - log_str := "error getting next pointer for " + request.DbName() + ", groupid: " + request.GroupId + ":" + err.Error() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting next pointer") return 0, 0, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName() + ", groupid: " + request.GroupId - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"id": curPointer.Value}).Debug("got next pointer") return curPointer.Value, max_ind, nil } @@ -622,8 +618,7 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf if !ok || !r.FinishedStream { return nil } - log_str := "reached end of stream " + request.Stream + " , next_stream: " + r.NextStream - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"nextStream": r.NextStream}).Debug("reached end of stream") answer := encodeAnswer(r.ID-1, r.ID-1, r.NextStream) return &DBError{utils.StatusNoData, answer} @@ -666,10 +661,10 @@ func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { var res map[string]interface{} err = db.changeField(request, fieldChangeRequest{ collectionName: last_message_collection_name, - fieldName: last_message_field_name, - op: field_op_set, - max_ind: max_ind, - val: max_ind, + fieldName: last_message_field_name, + op: field_op_set, + max_ind: max_ind, + val: max_ind, }, &res) if err != nil { return nil, err @@ -746,24 +741,20 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) { c := db.client.Database(request.DbName()).Collection(meta_collection_name) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { - log_str := "error getting meta for " + id + " in " + request.DbName() + " : " + err.Error() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"id": id, "cause": err.Error()}).Debug("error getting meta") return nil, &DBError{utils.StatusNoData, err.Error()} } userMeta, ok := res["meta"] if !ok { - log_str := "error getting meta for " + id + " in " + request.DbName() + " : cannot parse database response" - logger.Error(log_str) - return nil, errors.New(log_str) + request.Logger().WithFields(map[string]interface{}{"id": id, "cause": "cannot parse database response"}).Debug("error getting meta") + return nil, errors.New("cannot get metadata") } - log_str := "got metadata for " + id + " in " + request.DbName() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"id": id}).Error("got metadata") return utils.MapToJson(&userMeta) } -func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, error) { - log_str := "error processing query: " + query + " for " + dbname + " : " + err.Error() - logger.Debug(log_str) +func (db *Mongodb) processQueryError(query, dbname string, err error, rlog log.Logger) ([]byte, error) { + rlog.WithFields(map[string]interface{}{"query": query, "cause": err.Error()}).Debug("error processing query") return nil, &DBError{utils.StatusNoData, err.Error()} } @@ -771,8 +762,7 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) { var res []map[string]interface{} q, sort, err := db.BSONFromSQL(request.DbName(), request.ExtraParam) if err != nil { - log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName() + " : " + err.Error() - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam, "cause": err.Error()}).Debug("error parsing query") return nil, &DBError{utils.StatusWrongInput, err.Error()} } @@ -786,15 +776,15 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) { cursor, err := c.Find(context.TODO(), q, opts) if err != nil { - return db.processQueryError(request.ExtraParam, request.DbName(), err) + return db.processQueryError(request.ExtraParam, request.DbName(), err, request.Logger()) } err = cursor.All(context.TODO(), &res) if err != nil { - return db.processQueryError(request.ExtraParam, request.DbName(), err) + return db.processQueryError(request.ExtraParam, request.DbName(), err, request.Logger()) } - log_str := "processed query " + request.ExtraParam + " for " + request.DbName() + " ,found" + strconv.Itoa(len(res)) + " records" - logger.Debug(log_str) + request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam, "recordsFound": len(res)}).Debug("processed query") + if res != nil { return utils.MapToJson(&res) } else { @@ -966,7 +956,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, "wrong params: " + request.ExtraParam} } if !*params.DeleteMeta { - logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName()) + request.Logger().Debug("skipping delete stream meta") return nil, nil } @@ -1062,7 +1052,7 @@ func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, e func (db *Mongodb) getStreams(request Request) ([]byte, error) { rec, err := streams.getStreams(db, request) if err != nil { - return db.processQueryError("get streams", request.DbName(), err) + return db.processQueryError("get streams", request.DbName(), err, request.Logger()) } return json.Marshal(&rec) } diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index d3974e9c73358ca1569081d87869d324637b474d..b57f9973ddfa997af4706f75ece08e6d5969fa61 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -36,7 +36,7 @@ var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: mak var streamsLock sync.Mutex func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { - if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() > int64(updatePeriodMs) { + if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() >= int64(updatePeriodMs) { return StreamsRecord{}, errors.New("cache expired") } rec, ok := ss.records[db_name] diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index d9e5fffc45071d40229c260dade1ffc9f6c221da..0c5b4a91868570757dc9181e1b93aa25771fa8a2 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -61,7 +61,10 @@ var testsGetCommand = []struct { func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { for _, test := range testsGetCommand { suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: test.source, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command))) + logger.MockLog.On("Debug", mock.Anything) + + w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data") @@ -84,7 +87,8 @@ func (suite *GetCommandsTestSuite) TestGetCommandsCorrectlyProcessedEncoding() { test.reqString = strings.Replace(test.reqString,test.source,encodedSource,1) test.reqString = strings.Replace(test.reqString,test.stream,encodedStream,1) suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap(test.command))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") suite.Equal("Hello", string(w.Body.Bytes()), test.command+" sends data") diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go index 43ac602b23e7d38113adeab3804a744c61ebd0b8..75367d998ca893f0533fe8f57732606b7ef3750b 100644 --- a/broker/src/asapo_broker/server/get_meta_test.go +++ b/broker/src/asapo_broker/server/get_meta_test.go @@ -34,7 +34,9 @@ func TestGetMetaTestSuite(t *testing.T) { func (suite *GetMetaTestSuite) TestGetMetaOK() { suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, Op: "meta", ExtraParam: "0"}).Return([]byte(""), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("meta"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/0/meta" + "/0" + correctTokenSuffix,"GET") suite.Equal(http.StatusOK, w.Code, "meta OK") } diff --git a/broker/src/asapo_broker/server/post_create_group.go b/broker/src/asapo_broker/server/post_create_group.go index 008e72f14d4bf36022a094c923ab301d7ed2bf36..ba1ae49c478b885c8bdd8339b0707010f5fecd46 100644 --- a/broker/src/asapo_broker/server/post_create_group.go +++ b/broker/src/asapo_broker/server/post_create_group.go @@ -14,6 +14,6 @@ func routeCreateGroupID(w http.ResponseWriter, r *http.Request) { guid := xid.New() w.Write([]byte(guid.String())) - logger.Debug("generated new group: " + guid.String()) + logger.WithFields(map[string]interface{}{"guid":guid.String()}).Debug("generated new group") statistics.IncreaseCounter() } diff --git a/broker/src/asapo_broker/server/post_create_group_test.go b/broker/src/asapo_broker/server/post_create_group_test.go index dcef0d009e109426d8cb95e9fc5dabd31a0b7692..3189bb46f0cb815f4644bcc28cb31252f02d1a73 100644 --- a/broker/src/asapo_broker/server/post_create_group_test.go +++ b/broker/src/asapo_broker/server/post_create_group_test.go @@ -18,7 +18,9 @@ func GetObjectID(t *testing.T) (xid.ID, error) { func TestGetNewGroup(t *testing.T) { statistics.Reset() logger.SetMockLog() - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("generated new group"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("guid"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("generated new group"))) + id1, err := GetObjectID(t) assert.Nil(t, err, "first is ObjectID") diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go index b11944376876844335c204b4651f3dc5b4126aba..facf1922c6dec06c2231de978e96d98da5ed162e 100644 --- a/broker/src/asapo_broker/server/post_op_image_test.go +++ b/broker/src/asapo_broker/server/post_op_image_test.go @@ -35,7 +35,8 @@ func TestMessageOpTestSuite(t *testing.T) { func (suite *MessageOpTestSuite) TestAckMessageOpOK() { query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackmessage"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("ackmessage"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) suite.Equal(http.StatusOK, w.Code, "ackmessage OK") } diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index 13071ed7413ea87e11fe2746743c91febe0fdb35..28809d4aa9eee1837890ba49b4117a4cc322f505 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -36,7 +36,9 @@ func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request querymessages"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("querymessages"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/0/querymessages"+correctTokenSuffix, "POST", query_str) suite.Equal(http.StatusOK, w.Code, "Query OK") diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index 25a9128b2ea064548af2dedf89b6dff65b3bc8c3..84ace072d9cac0872b8f262dbccb72c918af2280 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -36,7 +36,8 @@ func (suite *ResetCounterTestSuite) TestResetCounterOK() { expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("resetcounter"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST") suite.Equal(http.StatusOK, w.Code, "ResetCounter OK") diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index bbb84f92222e1e64fe2bf690ffdb2722879ac113..8a0065fc2806ada24caa414e825e3e35b4c97778 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -65,6 +65,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par w.Header().Set("Access-Control-Allow-Origin", "*") beamtime, datasource, stream, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { + log.WithFields(map[string]interface{}{"request":r.RequestURI}).Error("cannot extract request parameters") w.WriteHeader(http.StatusBadRequest) return } @@ -86,17 +87,19 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par request.MinDatasetSize = minSize } - answer, code := processRequestInDb(request) + rlog:=request.Logger() + rlog.Debug("got request") + answer, code := processRequestInDb(request,rlog) w.WriteHeader(code) w.Write(answer) } -func returnError(err error, log_str string) (answer []byte, code int) { +func returnError(err error, rlog logger.Logger) (answer []byte, code int) { code = database.GetStatusCodeFromError(err) if code != utils.StatusNoData && code != utils.StatusPartialData{ - logger.Error(log_str + " - " + err.Error()) + rlog.WithFields(map[string]interface{}{"cause":err.Error()}).Error("cannot process request") } else { - logger.Debug(log_str + " - " + err.Error()) + rlog.WithFields(map[string]interface{}{"cause":err.Error()}).Debug("no data or partial data") } return []byte(err.Error()), code } @@ -108,22 +111,20 @@ func reconnectIfNeeded(db_error error) { } if err := ReconnectDb(); err != nil { - log.Error("cannot reconnect to database at : " + settings.GetDatabaseServer() + " " + err.Error()) + log.WithFields(map[string]interface{}{"address":settings.GetDatabaseServer(),"cause": err.Error()}).Error("cannot reconnect to database") } else { - log.Debug("reconnected to database" + settings.GetDatabaseServer()) + log.WithFields(map[string]interface{}{"address":settings.GetDatabaseServer()}).Debug("reconnected to database") } } -//func dbRequestLogger(request database.Request) -func processRequestInDb(request database.Request) (answer []byte, code int) { + +func processRequestInDb(request database.Request,rlog logger.Logger) (answer []byte, code int) { statistics.IncreaseCounter() answer, err := db.ProcessRequest(request) - log_str := "processing request " + request.Op + " in " + request.DbName() + " at " + settings.GetDatabaseServer() if err != nil { go reconnectIfNeeded(err) - return returnError(err, log_str) + return returnError(err, rlog) } - logger.Debug(log_str) return answer, utils.StatusOK } diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 1edaf7780f5a8e5556882f719ba48d6cfa64ae86..f62d12814f0e933aa0ec6a549e008d774628d2c0 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -66,7 +66,19 @@ type request struct { message string } -func containsMatcher(substrings ...string) func(str string) bool { +func containsMatcherMap(substrings ...string) func(map[string]interface{}) bool { + return func(vals map[string]interface{}) bool { + res,_:=utils.MapToJson(vals) + for _, substr := range substrings { + if !strings.Contains(string(res), substr) { + return false + } + } + return true + } +} + +func containsMatcherStr(substrings ...string) func(str string) bool { return func(str string) bool { for _, substr := range substrings { if !strings.Contains(str, substr) { @@ -77,6 +89,7 @@ func containsMatcher(substrings ...string) func(str string) bool { } } + func doRequest(path string, extra_params ...string) *httptest.ResponseRecorder { m := "GET" if len(extra_params) > 0 { @@ -134,7 +147,9 @@ func TestProcessRequestTestSuite(t *testing.T) { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongToken() { - logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong JWT token"))) + + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("wrong JWT token"))) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot authorize request"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + suffixWithWrongToken) @@ -142,7 +157,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongToken() { } func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { - logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("cannot extract"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("cannot extract"))) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot authorize request"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + wrongTokenSuffix) @@ -156,7 +172,10 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next"))) + logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("no data or partial data"))) + w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -170,9 +189,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) - logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) + logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot process request"))) ExpectReconnect(suite.mock_db) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("reconnected"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) time.Sleep(time.Second) @@ -184,8 +205,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) - logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected"))) + + logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot process request"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("reconnected"))) ExpectReconnect(suite.mock_db) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) @@ -199,7 +223,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) + logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix) suite.Equal(1, statistics.GetCounter(), "ProcessRequest increases counter") @@ -210,7 +235,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) + logger.MockLog.On("WithFields", mock.Anything) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") } @@ -222,7 +248,9 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestErrorOnWrongProtocol() { func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() { query_str := "query_string" - logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("wrong token access"))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("wrong token access"))) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcherStr("cannot authorize request"))) + w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffix, "POST", query_str) suite.Equal(http.StatusUnauthorized, w.Code, "wrong token type") @@ -234,6 +262,8 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName))) + logger.MockLog.On("WithFields", mock.MatchedBy(containsMatcherMap("delete_stream"))) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcherStr("got request"))) + doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/delete"+correctTokenSuffixWrite, "POST", query_str) } diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index 1a0d5e875034ed369f85128929ef3baae1c17c1f..42c34e4a08b9bc06fe788615047725b8deee7795 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -8,9 +8,8 @@ import ( "strconv" ) -func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, err error) { - log_str := "processing " + requestName + " request in " + db_name + " at " + settings.GetDatabaseServer() - logger.Error(log_str + " - " + err.Error()) +func writeAuthAnswer(w http.ResponseWriter, requestOp string, db_name string, err error) { + logger.WithFields(map[string]interface{}{"operation": requestOp, "cause": err.Error()}).Error("cannot authorize request") switch er := err.(type) { case AuthorizationError: @@ -54,7 +53,7 @@ func authorize(r *http.Request, beamtime_id string, needWriteAccess bool) error tokenJWT := r.URL.Query().Get("token") if len(tokenJWT) == 0 { - return AuthorizationError{errors.New("cannot extract token from request"),http.StatusBadRequest} + return AuthorizationError{errors.New("cannot extract token from request"), http.StatusBadRequest} } token, err := auth.AuthorizeToken(tokenJWT) @@ -67,23 +66,23 @@ func authorize(r *http.Request, beamtime_id string, needWriteAccess bool) error return err } - return checkAccessType(token.AccessTypes,needWriteAccess) + return checkAccessType(token.AccessTypes, needWriteAccess) } func checkSubject(subject string, beamtime_id string) error { if subject != utils.SubjectFromBeamtime(beamtime_id) { - return AuthorizationError{errors.New("wrong token subject"),http.StatusUnauthorized} + return AuthorizationError{errors.New("wrong token subject"), http.StatusUnauthorized} } return nil } func checkAccessType(accessTypes []string, needWriteAccess bool) error { - if needWriteAccess && !utils.StringInSlice("write",accessTypes) { - return AuthorizationError{errors.New("wrong token access type"),http.StatusUnauthorized} + if needWriteAccess && !utils.StringInSlice("write", accessTypes) { + return AuthorizationError{errors.New("wrong token access type"), http.StatusUnauthorized} } - if !utils.StringInSlice("read",accessTypes) { - return AuthorizationError{errors.New("wrong token access type"),http.StatusUnauthorized} + if !utils.StringInSlice("read", accessTypes) { + return AuthorizationError{errors.New("wrong token access type"), http.StatusUnauthorized} } return nil }