Skip to content
Snippets Groups Projects
Commit 3fae2eef authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge branch 'develop' into feature_ASAPO-152-kafka-notifications

parents 7cdda5c6 1e52da7d
No related branches found
No related tags found
No related merge requests found
Showing
with 221 additions and 286 deletions
......@@ -54,9 +54,7 @@
.idea/**/libraries
# CMake
cmake-build-debug/
cmake-build-release/
cmake-build-relwithdebinfo/
cmake-build-*/
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
......
## 21.12.0 (in progress)
## 21.12.1 (in progress)
IMPROVEMENTS
* renamed and hid C++ macros from client code
BUG FIXES
* Producer API: fixed bug segfault in Python code when sending data object which memory is from some other object
## 21.12.0
FEATURES
* Consumer API: Get last within consumer group returns message only once
* Producer API: An option to write raw data to core filesystem directly
* Consumer/Producer API - packages for Debian 11.1, wheel for Python 3.9
* Consumer/Producer API - dropped Python 2 support for wheels and packages for new Debian/CentOS versions
INTERNAL
* Improved logging - tags for beamline, beamtime, ...
* Updated orchestration tools to latest version
## 21.09.0
......
function(cleanup varname)
string (REPLACE "-" "_" out ${${varname}})
SET( ${varname} ${out} PARENT_SCOPE)
string(REPLACE "-" "_" out ${${varname}})
SET(${varname} ${out} PARENT_SCOPE)
endfunction()
execute_process(COMMAND git describe --tags --abbrev=0
OUTPUT_VARIABLE ASAPO_TAG
WORKING_DIRECTORY ..)
execute_process(COMMAND git describe --tags --abbrev=0
OUTPUT_VARIABLE ASAPO_TAG
WORKING_DIRECTORY ..)
string(STRIP ${ASAPO_TAG} ASAPO_TAG)
execute_process(COMMAND git rev-parse --abbrev-ref HEAD
OUTPUT_VARIABLE BRANCH
WORKING_DIRECTORY ..)
OUTPUT_VARIABLE BRANCH
WORKING_DIRECTORY ..)
string(STRIP ${BRANCH} BRANCH)
cleanup(BRANCH)
......@@ -20,22 +20,37 @@ execute_process(COMMAND git rev-parse --short=10 HEAD
string(STRIP ${ASAPO_VERSION_COMMIT} ASAPO_VERSION_COMMIT)
if (${BRANCH} STREQUAL "master")
SET (ASAPO_VERSION ${ASAPO_TAG})
SET (ASAPO_VERSION_COMMIT "")
SET (ASAPO_VERSION_DOCKER_SUFFIX "")
SET (PYTHON_ASAPO_VERSION ${ASAPO_VERSION})
SET(ASAPO_VERSION ${ASAPO_TAG})
SET(ASAPO_VERSION_COMMIT "")
SET(ASAPO_VERSION_DOCKER_SUFFIX "")
SET(PYTHON_ASAPO_VERSION ${ASAPO_VERSION})
string(REGEX REPLACE "\\.0([0-9]+)\\."
".\\1." ASAPO_WHEEL_VERSION
${ASAPO_VERSION})
else()
SET (ASAPO_VERSION ${BRANCH})
SET (ASAPO_VERSION_COMMIT ", build ${ASAPO_VERSION_COMMIT}")
SET (ASAPO_VERSION_DOCKER_SUFFIX "-dev")
else ()
SET(ASAPO_VERSION ${BRANCH})
SET(ASAPO_VERSION_COMMIT ", build ${ASAPO_VERSION_COMMIT}")
SET(ASAPO_VERSION_DOCKER_SUFFIX "-dev")
string(REPLACE "_" "-" ASAPO_VERSION ${ASAPO_VERSION})
SET (ASAPO_VERSION 100.0.${ASAPO_VERSION})
SET (PYTHON_ASAPO_VERSION ${ASAPO_VERSION})
SET (ASAPO_WHEEL_VERSION ${ASAPO_VERSION})
endif()
SET(ASAPO_VERSION 100.0.${ASAPO_VERSION})
if (${BRANCH} STREQUAL "develop")
SET(PYTHON_ASAPO_VERSION 100.0.dev0)
else ()
string(FIND ${BRANCH} feature_ASAPO pos)
if( ${pos} EQUAL 0)
string(SUBSTRING ${ASAPO_VERSION} 20 -1 TMP)
string(REGEX MATCH "^([0-9]+)|.+$" ISSUE_NUM "${TMP}")
if (ISSUE_NUM STREQUAL "")
SET(PYTHON_ASAPO_VERSION 100.0.dev1)
else ()
SET(PYTHON_ASAPO_VERSION 100.0.dev${ISSUE_NUM})
endif ()
else ()
SET(PYTHON_ASAPO_VERSION 100.0.dev1)
endif ()
endif ()
SET(ASAPO_WHEEL_VERSION ${ASAPO_VERSION})
endif ()
message("Asapo Version: " ${ASAPO_VERSION})
message("Python Asapo Version: " ${PYTHON_ASAPO_VERSION})
......
......@@ -22,6 +22,8 @@ set(ConfigPackageLocation lib/cmake/Asapo)
# options
option(BUILD_PYTHON "Build python libs" ON)
option(BUILD_PYTHON2_PACKAGES "Build python2 packages" OFF)
option(BUILD_CLIENTS_ONLY "Build clients only" OFF)
option(BUILD_CONSUMER_TOOLS "Build consumer tools" OFF)
......
### Producer Protocol
| Release | used by client | Supported by server | Status |
| ------------ | ------------------- | -------------------- | ---------------- |
| v0.5 | | | In development |
| v0.4 | 21.09.0 - 21.09.0 | 21.09.0 - 21.09.0 | Current version |
| v0.3 | 21.06.0 - 21.06.0 | 21.06.0 - 21.09.0 | Deprecates from 01.09.2022 |
| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.09.0 | Deprecates from 01.07.2022 |
| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.09.0 | Deprecates from 01.06.2022 |
| v0.5 | 21.12.0 - 21.12.0 | 21.12.0 - 21.12.0 | Current version |
| v0.4 | 21.09.0 - 21.09.0 | 21.09.0 - 21.12.0 | Deprecates from 01.12.2022 |
| v0.3 | 21.06.0 - 21.06.0 | 21.06.0 - 21.12.0 | Deprecates from 01.09.2022 |
| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.12.0 | Deprecates from 01.07.2022 |
| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.12.0 | Deprecates from 01.06.2022 |
### Consumer Protocol
| Release | used by client | Supported by server | Status |
| ------------ | ------------------- | -------------------- | ---------------- |
| v0.5 | | | In development |
| v0.4 | 21.06.0 - 21.09.0 | 21.06.0 - 21.09.0 | Current version |
| v0.3 | 21.03.3 - 21.03.3 | 21.03.3 - 21.09.0 | Deprecates from 01.07.2022 |
| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.09.0 | Deprecates from 01.06.2022 |
| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.09.0 | Deprecates from 01.06.2022 |
| v0.5 | 21.12.0 - 21.12.0 | 21.12.0 - 21.12.0 | Current version |
| v0.4 | 21.06.0 - 21.09.0 | 21.06.0 - 21.12.0 | Deprecates from 01.12.2022 |
| v0.3 | 21.03.3 - 21.03.3 | 21.03.3 - 21.12.0 | Deprecates from 01.07.2022 |
| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.12.0 | Deprecates from 01.06.2022 |
| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.12.0 | Deprecates from 01.06.2022 |
......@@ -2,25 +2,25 @@
| Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment|
| ------------ | ----------- | -------- | ------------------------- | --------------------- | ------- |
| 21.12.0 | No | v0.5 | 21.12.0/21.12.0 | in development | |
| 21.09.0 | No | v0.4 | 21.09.0/21.09.0 | current version |beamline token for raw |
| 21.06.0 | Yes | v0.3 | 21.06.0/21.09.0 | deprecates 01.09.2022 |arbitrary characters|
| 21.03.3 | No | v0.2 | 21.03.2/21.09.0 | deprecates 01.07.2022 |bugfix in server|
| 21.03.2 | Yes | v0.2 | 21.03.2/21.09.0 | deprecates 01.07.2022 |bugfixes, add delete_stream|
| 21.03.1 | No | v0.1 | 21.03.0/21.09.0 | deprecates 01.06.2022 |bugfix in server|
| 21.03.0 | Yes | v0.1 | 21.03.0/21.09.0 | | |
| 21.12.0 | Yes | v0.5 | 21.12.0/21.12.0 | current version | |
| 21.09.0 | Yes | v0.4 | 21.09.0/21.12.0 | deprecates 01.12.2022 |beamline token for raw |
| 21.06.0 | Yes | v0.3 | 21.06.0/21.12.0 | deprecates 01.09.2022 |arbitrary characters|
| 21.03.3 | No | v0.2 | 21.03.2/21.12.0 | deprecates 01.07.2022 |bugfix in server|
| 21.03.2 | Yes | v0.2 | 21.03.2/21.12.0 | deprecates 01.07.2022 |bugfixes, add delete_stream|
| 21.03.1 | No | v0.1 | 21.03.0/21.12.0 | deprecates 01.06.2022 |bugfix in server|
| 21.03.0 | Yes | v0.1 | 21.03.0/21.12.0 | deprecates 01.06.2022 | |
### Consumer API
| Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment|
| ------------ | ----------- | --------- | ------------------------- | ---------------- | ------- |
| 21.12.0 | Yes | v0.5 | 21.12.0/21.12.0 | in development | |
| 21.09.0 | No | v0.4 | 21.06.0/21.09.0 | current version | |
| 21.06.0 | Yes | v0.4 | 21.06.0/21.09.0 | |arbitrary characters, bugfixes |
| 21.03.3 | Yes | v0.3 | 21.03.3/21.09.0 | deprecates 01.06.2022 |bugfix in server, error type for dublicated ack|
| 21.03.2 | Yes | v0.2 | 21.03.2/21.09.0 | deprecates 01.06.2022 |bugfixes, add delete_stream|
| 21.03.1 | No | v0.1 | 21.03.0/21.09.0 | deprecates 01.06.2022 |bugfix in server|
| 21.03.0 | Yes | v0.1 | 21.03.0/21.09.0 | | |
| 21.12.0 | Yes | v0.5 | 21.12.0/21.12.0 | current version | |
| 21.09.0 | No | v0.4 | 21.06.0/21.12.0 | deprecates 01.12.2022 | |
| 21.06.0 | Yes | v0.4 | 21.06.0/21.12.0 | deprecates 01.09.2022 |arbitrary characters, bugfixes |
| 21.03.3 | Yes | v0.3 | 21.03.3/21.12.0 | deprecates 01.06.2022 |bugfix in server, error type for dublicated ack|
| 21.03.2 | Yes | v0.2 | 21.03.2/21.12.0 | deprecates 01.06.2022 |bugfixes, add delete_stream|
| 21.03.1 | No | v0.1 | 21.03.0/21.12.0 | deprecates 01.06.2022 |bugfix in server|
| 21.03.0 | Yes | v0.1 | 21.03.0/21.12.0 | deprecates 01.06.2022 | |
\* insignificant changes/bugfixes (e.g. in return type, etc), normally do not require client code changes, but formally might break the client
......
......@@ -294,7 +294,7 @@ func authorizeMeta(meta common.BeamtimeMeta, request authorizationRequest, creds
if creds.Beamline != "auto" && meta.Beamline != creds.Beamline {
err_string := "given beamline (" + creds.Beamline + ") does not match the found one (" + meta.Beamline + ")"
log.Debug(err_string)
log.Error(err_string)
return nil, errors.New(err_string)
}
......@@ -330,8 +330,14 @@ func authorize(request authorizationRequest, creds SourceCredentials) (common.Be
}
meta.AccessTypes = accessTypes
log.Debug("authorized creds bl/bt: ", creds.Beamline+"/"+creds.BeamtimeId+", beamtime "+meta.BeamtimeId+" for "+request.OriginHost+" in "+
meta.Beamline+", type "+meta.Type, "online path "+meta.OnlinePath+", offline path "+meta.OfflinePath)
log.WithFields(map[string]interface{}{
"beamline":creds.Beamline,
"beamtime":creds.BeamtimeId,
"origin":request.OriginHost,
"type":meta.Type,
"onlinePath":meta.OnlinePath,
"offlinePath":meta.OfflinePath,
}).Debug("authorized credentials")
return meta, nil
}
......@@ -372,7 +378,7 @@ func routeAuthorize(w http.ResponseWriter, r *http.Request) {
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(res))
w.Write(res)
}
func checkRole(w http.ResponseWriter, r *http.Request, role string) error {
......
......@@ -387,3 +387,21 @@ func TestGetBeamtimeInfo(t *testing.T) {
}
}
func TestExpiredToken(t *testing.T) {
Auth = authorization.NewAuth(utils.NewJWTAuth("secret_user"), utils.NewJWTAuth("secret_admin"), utils.NewJWTAuth("secret"))
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2MzU3NTMxMDksImp0aSI6ImMyOTR0NWFodHY1am9vZHVoaGNnIiwic3ViIjoiYnRfMTEwMTIxNzEiLCJFeHRyYUNsYWltcyI6eyJBY2Nlc3NUeXBlcyI6WyJyZWFkIiwid3JpdGUiXX19.kITePbv_dXY2ACxpAQ-PeQJPQtnR02bMoFrXq0Pbcm0"
request := authorizationRequest{"asapo_test%%"+token, "host"}
creds, _ := getSourceCredentials(request)
creds.Token = token
creds.DataSource = "test"
creds.BeamtimeId = "11012171"
creds.Beamline = "p21.2"
_, err := authorizeByToken(creds)
assert.Error(t, err, "")
if (err!=nil) {
assert.Contains(t, err.Error(), "expired")
}
}
......@@ -125,7 +125,10 @@ func routeFolderToken(w http.ResponseWriter, r *http.Request) {
return
}
log.Debug("generated folder token for beamtime " + request.BeamtimeId + ", folder " + request.Folder)
log.WithFields(map[string]interface{}{
"folder":request.Folder,
"beamtime":request.BeamtimeId,
}).Debug("issued folder token")
answer := folderTokenResponce(token)
w.WriteHeader(http.StatusOK)
......
......@@ -46,7 +46,10 @@ func routeIntrospect(w http.ResponseWriter, r *http.Request) {
return
}
log.Debug("verified user token for "+response.Sub)
log.WithFields(map[string]interface{}{
"subject":response.Sub,
}).Debug("verified user token")
answer,_ := json.Marshal(&response)
w.WriteHeader(http.StatusOK)
......
......@@ -30,8 +30,8 @@ func extractUserTokenrequest(r *http.Request) (request structs.IssueTokenRequest
}
for _, ar := range request.AccessTypes {
if ar != "read" && ar != "write" && !(ar== "writeraw" && request.Subject["beamline"]!="") {
return request, errors.New("wrong requested access rights: "+ar)
if ar != "read" && ar != "write" && !(ar == "writeraw" && request.Subject["beamline"] != "") {
return request, errors.New("wrong requested access rights: " + ar)
}
}
......@@ -72,8 +72,12 @@ func issueUserToken(w http.ResponseWriter, r *http.Request) {
return
}
log.Debug("generated user token ")
log.WithFields(map[string]interface{}{
"id": claims.Id,
"subject": claims.Subject,
"validDays": request.DaysValid,
"accessTypes": request.AccessTypes,
}).Info("issued user token")
answer := authorization.UserTokenResponce(request, token)
w.WriteHeader(http.StatusOK)
......
......@@ -32,7 +32,12 @@ func revokeToken(w http.ResponseWriter, r *http.Request) {
return
}
log.Debug("revoked token " + rec.Token)
log.WithFields(map[string]interface{}{
"id": rec.Id,
"subject": rec.Subject,
"token": rec.Token,
}).Info("revoked token")
answer, _ := json.Marshal(&rec)
w.WriteHeader(http.StatusOK)
w.Write(answer)
......
package server
/*
import (
"asapo_authorizer/database"
"asapo_common/discovery"
"asapo_common/logger"
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"net/http"
"net/http/httptest"
"testing"
)
func setup() *database.MockedDatabase {
mock_db := new(database.MockedDatabase)
mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil)
return mock_db
}
func setup_and_init(t *testing.T) *database.MockedDatabase {
mock_db := new(database.MockedDatabase)
mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil)
InitDB(mock_db)
assertExpectations(t, mock_db)
return mock_db
}
func assertExpectations(t *testing.T, mock_db *database.MockedDatabase) {
mock_db.AssertExpectations(t)
mock_db.ExpectedCalls = nil
logger.MockLog.AssertExpectations(t)
logger.MockLog.ExpectedCalls = nil
}
var initDBTests = []struct {
address string
answer error
message string
}{
{"bad address", errors.New(""), "error on get bad address"},
{"good address", nil, "no error on good address"},
}
func TestInitDBWithWrongAddress(t *testing.T) {
mock_db := setup()
mock_db.ExpectedCalls = nil
settings.DatabaseServer = "0.0.0.0:0000"
for _, test := range initDBTests {
mock_db.On("Connect", "0.0.0.0:0000").Return(test.answer)
err := InitDB(mock_db)
assert.Equal(t, test.answer, err, test.message)
assertExpectations(t, mock_db)
}
db = nil
}
func TestInitDBWithAutoAddress(t *testing.T) {
mongo_address := "0.0.0.0:0000"
mock_db := setup()
mock_db.ExpectedCalls = nil
settings.DatabaseServer = "auto"
mock_server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/asapo-mongodb", "request string")
rw.Write([]byte(mongo_address))
}))
defer mock_server.Close()
discoveryService = discovery.CreateDiscoveryService(mock_server.Client(), mock_server.URL)
mock_db.On("Connect", "0.0.0.0:0000").Return(nil)
err := InitDB(mock_db)
assert.Equal(t, nil, err, "auto connect ok")
assertExpectations(t, mock_db)
db = nil
}
func TestReconnectDB(t *testing.T) {
mongo_address := "0.0.0.0:0000"
mock_server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/asapo-mongodb", "request string")
rw.Write([]byte(mongo_address))
}))
discoveryService = discovery.CreateDiscoveryService(mock_server.Client(), mock_server.URL)
defer mock_server.Close()
settings.DatabaseServer = "auto"
mock_db := setup_and_init(t)
mock_db.ExpectedCalls = nil
mongo_address = "1.0.0.0:0000"
mock_db.On("Close").Return()
mock_db.On("Connect", "1.0.0.0:0000").Return(nil)
err := ReconnectDb()
assert.Equal(t, nil, err, "auto connect ok")
assertExpectations(t, mock_db)
db = nil
}
func TestErrorWhenReconnectNotConnectedDB(t *testing.T) {
err := ReconnectDb()
assert.NotNil(t, err, "error reconnect")
db = nil
}
func TestCleanupDBWithoutInit(t *testing.T) {
mock_db := setup()
mock_db.AssertNotCalled(t, "Close")
CleanupDB()
}
func TestCleanupDBInit(t *testing.T) {
settings.DatabaseServer = "0.0.0.0"
mock_db := setup_and_init(t)
mock_db.On("Close").Return()
CleanupDB()
assertExpectations(t, mock_db)
}
*/
\ No newline at end of file
......@@ -51,7 +51,7 @@ func (store *TokenStore) initDB() (dbaddress string, err error) {
if dbaddress == "" {
return "", errors.New("no token_store servers found")
}
log.Debug("Got mongodb server: " + dbaddress)
log.WithFields(map[string]interface{}{"address": dbaddress}).Debug("found mongodb server")
}
return dbaddress, store.db.Connect(dbaddress)
......@@ -66,7 +66,7 @@ func (store *TokenStore) reconnectIfNeeded(db_error error) {
if dbaddress, err := store.reconnectDb(); err != nil {
log.Error("cannot reconnect to database: " + err.Error())
} else {
log.Debug("reconnected to database at" + dbaddress)
log.WithFields(map[string]interface{}{"address":dbaddress}).Debug("reconnected to database")
}
}
......@@ -196,7 +196,7 @@ func (store *TokenStore) loopGetRevokedTokens() {
next_update = 1
log.Error("cannot get revoked tokens: " + err.Error())
} else {
log.Debug("received revoked tokens list")
//log.Debug("received revoked tokens list")
next_update = common.Settings.UpdateRevokedTokensIntervalSec
tokens := make([]string, len(res))
for i, token := range res {
......
......@@ -60,6 +60,7 @@ func (suite *TokenStoreTestSuite) TestProcessRequestWithConnectionError() {
ExpectReconnect(suite.mock_db)
suite.mock_db.On("ProcessRequest", mock.Anything, mock.Anything).Return([]byte(""),
&DBError{utils.StatusServiceUnavailable, ""})
logger.MockLog.On("WithFields", mock.Anything)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
err := suite.store.AddToken(TokenRecord{})
......@@ -138,8 +139,6 @@ func (suite *TokenStoreTestSuite) TestProcessRequestCheckRevokedToken() {
Op: "read_records",
}
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("list")))
time.Sleep(time.Second*1)
res,err := suite.store.IsTokenRevoked("123")
suite.Equal(err, nil, "ok")
......
package database
import "asapo_common/utils"
import (
"asapo_common/logger"
"asapo_common/utils"
)
type Request struct {
DbName string
Beamtime string
DataSource string
Stream string
GroupId string
Op string
......@@ -12,6 +16,20 @@ type Request struct {
ExtraParam string
}
func (request *Request) Logger() logger.Logger {
return logger.WithFields(map[string]interface{}{
"beamtime": request.Beamtime,
"dataSource": decodeString(request.DataSource),
"stream": decodeString(request.Stream),
"groupId": decodeString(request.GroupId),
"operation": request.Op,
})
}
func (request *Request) DbName() string {
return request.Beamtime + "_" + request.DataSource
}
type Agent interface {
ProcessRequest(request Request) ([]byte, error)
Ping() error
......@@ -21,7 +39,7 @@ type Agent interface {
}
type DBSettings struct {
ReadFromInprocessPeriod int
ReadFromInprocessPeriod int
UpdateStreamCachePeriodMs int
}
......@@ -42,4 +60,3 @@ func GetStatusCodeFromError(err error) int {
return utils.StatusServiceUnavailable
}
}
......@@ -80,8 +80,9 @@ func encodeStringForColName(original string) (result string) {
}
func encodeRequest(request *Request) error {
request.DbName = encodeStringForDbName(request.DbName)
if len(request.DbName)> max_encoded_source_size {
request.DataSource = encodeStringForDbName(request.DataSource)
request.Beamtime = encodeStringForDbName(request.Beamtime)
if len(request.DbName())> max_encoded_source_size {
return &DBError{utils.StatusWrongInput, "source name is too long"}
}
......
......@@ -18,7 +18,8 @@ func TestEncoding(t *testing.T) {
assert.Equal(t, sourceDecoded, source)
r := Request{
DbName: source,
Beamtime: "bt",
DataSource: source,
Stream: stream,
GroupId: stream,
Op: "",
......@@ -29,7 +30,7 @@ func TestEncoding(t *testing.T) {
err := encodeRequest(&r)
assert.Equal(t, r.Stream, streamEncoded)
assert.Equal(t, r.GroupId, streamEncoded)
assert.Equal(t, r.DbName, sourceEncoded)
assert.Equal(t, r.DataSource, sourceEncoded)
assert.Nil(t, err)
}
......@@ -61,9 +62,10 @@ func TestEncodingTooLong(t *testing.T) {
for _, test := range encodeTests {
stream := RandomString(test.streamSize)
group := RandomString(test.groupSize)
source := RandomString(test.sourceSize)
source := RandomString(test.sourceSize-3)
r := Request{
DbName: source,
Beamtime: "bt",
DataSource: source,
Stream: stream,
GroupId: group,
Op: "",
......
//+build !test
//go:build !test
// +build !test
package database
import (
"asapo_common/logger"
log "asapo_common/logger"
"asapo_common/utils"
"context"
"encoding/json"
......@@ -84,10 +85,10 @@ const (
type fieldChangeRequest struct {
collectionName string
fieldName string
op int
max_ind int
val int
fieldName string
op int
max_ind int
val int
}
var dbSessionLock sync.Mutex
......@@ -199,7 +200,7 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
}
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) {
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
q := maxIndexQuery(request, returnIncompete)
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
......@@ -227,7 +228,7 @@ func duplicateError(err error) bool {
func (db *Mongodb) setCounter(request Request, ind int) (err error) {
update := bson.M{"$set": bson.M{pointer_field_name: ind}}
opts := options.Update().SetUpsert(true)
c := db.client.Database(request.DbName).Collection(pointer_collection_name)
c := db.client.Database(request.DbName()).Collection(pointer_collection_name)
q := bson.M{"_id": request.GroupId + "_" + request.Stream}
_, err = c.UpdateOne(context.TODO(), q, update, opts)
return
......@@ -252,7 +253,7 @@ func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res i
opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
q := bson.M{"_id": request.GroupId + "_" + request.Stream, change.fieldName: bson.M{"$lt": change.max_ind}}
c := db.client.Database(request.DbName).Collection(change.collectionName)
c := db.client.Database(request.DbName()).Collection(change.collectionName)
err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
if err != nil {
......@@ -306,12 +307,11 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool
func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) {
q := bson.M{"_id": id}
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
if err != nil {
answer := encodeAnswer(id, id_max, "")
log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id, "cause": err.Error()}).Debug("error getting record")
return res, &DBError{utils.StatusNoData, answer}
}
return res, err
......@@ -327,8 +327,7 @@ func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, er
return nil, err
}
log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id}).Debug("got record from db")
record, err := utils.MapToJson(&res)
if err != nil {
......@@ -392,7 +391,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
err = db.InsertRecordToInprocess(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
return []byte(""), err
}
......@@ -402,7 +401,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
if err != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err = c.InsertOne(context.Background(), &record)
if err != nil {
if duplicateError(err) {
......@@ -411,7 +410,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
return nil, err
}
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
c = db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
......@@ -425,7 +424,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
if len(request.DbName) == 0 || len(request.Stream) == 0 {
if len(request.DbName()) <= 1 || len(request.Stream) == 0 {
return &DBError{utils.StatusWrongInput, "beamtime_id ans stream must be set"}
}
......@@ -445,9 +444,9 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err
var curPointer LocationPointer
err = db.changeField(request, fieldChangeRequest{
collectionName: pointer_collection_name,
fieldName: pointer_field_name,
op: field_op_inc,
max_ind: max_ind}, &curPointer)
fieldName: pointer_field_name,
op: field_op_inc,
max_ind: max_ind}, &curPointer)
if err != nil {
return LocationPointer{}, 0, err
}
......@@ -455,7 +454,7 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err
return curPointer, max_ind, nil
}
func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int) (int, error) {
func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int, rlog log.Logger) (int, error) {
var res InProcessingRecord
opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
tNow := time.Now().UnixNano()
......@@ -476,8 +475,7 @@ func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delay
return 0, err
}
log_str := "got unprocessed id " + strconv.Itoa(res.ID) + " for " + dbname
logger.Debug(log_str)
rlog.WithFields(map[string]interface{}{"id": res.ID}).Debug("got unprocessed message")
return res.ID, nil
}
......@@ -527,10 +525,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId]
dbSessionLock.Unlock()
if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts)
record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts,
request.Logger())
if err != nil {
log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting unprocessed message")
return 0, 0, err
}
}
......@@ -552,12 +550,10 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) {
curPointer, max_ind, err := db.getCurrentPointer(request)
if err != nil {
log_str := "error getting next pointer for " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"cause": err.Error()}).Debug("error getting next pointer")
return 0, 0, err
}
log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName + ", groupid: " + request.GroupId
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": curPointer.Value}).Debug("got next pointer")
return curPointer.Value, max_ind, nil
}
......@@ -622,8 +618,7 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf
if !ok || !r.FinishedStream {
return nil
}
log_str := "reached end of stream " + request.Stream + " , next_stream: " + r.NextStream
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"nextStream": r.NextStream}).Debug("reached end of stream")
answer := encodeAnswer(r.ID-1, r.ID-1, r.NextStream)
return &DBError{utils.StatusNoData, answer}
......@@ -641,7 +636,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
}
if err == nil {
err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam)
err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam)
if err_update != nil {
return nil, err_update
}
......@@ -666,10 +661,10 @@ func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) {
var res map[string]interface{}
err = db.changeField(request, fieldChangeRequest{
collectionName: last_message_collection_name,
fieldName: last_message_field_name,
op: field_op_set,
max_ind: max_ind,
val: max_ind,
fieldName: last_message_field_name,
op: field_op_set,
max_ind: max_ind,
val: max_ind,
}, &res)
if err != nil {
return nil, err
......@@ -689,7 +684,7 @@ func getSizeFilter(request Request) bson.M {
}
func (db *Mongodb) getSize(request Request) ([]byte, error) {
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
filter := getSizeFilter(request)
size, err := c.CountDocuments(context.TODO(), filter, options.Count())
......@@ -716,7 +711,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
return []byte(""), err
}
c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
c := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err_del.Error()}
......@@ -743,40 +738,35 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) {
}
q := bson.M{"_id": id}
var res map[string]interface{}
c := db.client.Database(request.DbName).Collection(meta_collection_name)
c := db.client.Database(request.DbName()).Collection(meta_collection_name)
err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
if err != nil {
log_str := "error getting meta for " + id + " in " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id, "cause": err.Error()}).Debug("error getting meta")
return nil, &DBError{utils.StatusNoData, err.Error()}
}
userMeta, ok := res["meta"]
if !ok {
log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response"
logger.Error(log_str)
return nil, errors.New(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id, "cause": "cannot parse database response"}).Debug("error getting meta")
return nil, errors.New("cannot get metadata")
}
log_str := "got metadata for " + id + " in " + request.DbName
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id}).Error("got metadata")
return utils.MapToJson(&userMeta)
}
func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, error) {
log_str := "error processing query: " + query + " for " + dbname + " : " + err.Error()
logger.Debug(log_str)
func (db *Mongodb) processQueryError(query, dbname string, err error, rlog log.Logger) ([]byte, error) {
rlog.WithFields(map[string]interface{}{"query": query, "cause": err.Error()}).Debug("error processing query")
return nil, &DBError{utils.StatusNoData, err.Error()}
}
func (db *Mongodb) queryMessages(request Request) ([]byte, error) {
var res []map[string]interface{}
q, sort, err := db.BSONFromSQL(request.DbName, request.ExtraParam)
q, sort, err := db.BSONFromSQL(request.DbName(), request.ExtraParam)
if err != nil {
log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam, "cause": err.Error()}).Debug("error parsing query")
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream)
opts := options.Find()
if len(sort) > 0 {
......@@ -786,15 +776,15 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) {
cursor, err := c.Find(context.TODO(), q, opts)
if err != nil {
return db.processQueryError(request.ExtraParam, request.DbName, err)
return db.processQueryError(request.ExtraParam, request.DbName(), err, request.Logger())
}
err = cursor.All(context.TODO(), &res)
if err != nil {
return db.processQueryError(request.ExtraParam, request.DbName, err)
return db.processQueryError(request.ExtraParam, request.DbName(), err, request.Logger())
}
log_str := "processed query " + request.ExtraParam + " for " + request.DbName + " ,found" + strconv.Itoa(len(res)) + " records"
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam, "recordsFound": len(res)}).Debug("processed query")
if res != nil {
return utils.MapToJson(&res)
} else {
......@@ -880,11 +870,11 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) {
}
func (db *Mongodb) deleteCollection(request Request, name string) error {
return db.client.Database(request.DbName).Collection(name).Drop(context.Background())
return db.client.Database(request.DbName()).Collection(name).Drop(context.Background())
}
func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name})
result, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": name})
if err != nil {
return false, err
}
......@@ -910,7 +900,7 @@ func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) e
func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error {
filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}}
_, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter)
_, err := db.client.Database(request.DbName()).Collection(collection).DeleteMany(context.TODO(), filter)
return err
}
......@@ -923,7 +913,7 @@ func escapeQuery(query string) (res string) {
}
func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error {
cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
cols, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
{"$regex", primitive.Regex{Pattern: "^" + escapeQuery(prefix), Options: "i"}}}})
if err != nil {
return err
......@@ -966,7 +956,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, "wrong params: " + request.ExtraParam}
}
if !*params.DeleteMeta {
logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName)
request.Logger().Debug("skipping delete stream meta")
return nil, nil
}
......@@ -980,7 +970,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
}
func (db *Mongodb) lastAck(request Request) ([]byte, error) {
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
result := LastAck{0}
var q bson.M = nil
......@@ -1047,7 +1037,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) {
}
func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) {
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok {
return res, err
......@@ -1062,7 +1052,7 @@ func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, e
func (db *Mongodb) getStreams(request Request) ([]byte, error) {
rec, err := streams.getStreams(db, request)
if err != nil {
return db.processQueryError("get streams", request.DbName, err)
return db.processQueryError("get streams", request.DbName(), err, request.Logger())
}
return json.Marshal(&rec)
}
......
......@@ -36,7 +36,7 @@ var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: mak
var streamsLock sync.Mutex
func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() > int64(updatePeriodMs) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() >= int64(updatePeriodMs) {
return StreamsRecord{}, errors.New("cache expired")
}
rec, ok := ss.records[db_name]
......@@ -265,9 +265,9 @@ func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, erro
}
streamsLock.Lock()
rec, err := ss.tryGetFromCache(request.DbName, db.settings.UpdateStreamCachePeriodMs)
rec, err := ss.tryGetFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs)
if err != nil {
rec, err = ss.updateFromDb(db, request.DbName)
rec, err = ss.updateFromDb(db, request.DbName())
}
streamsLock.Unlock()
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment