Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • asapo/asapo
  • joao.alvim/asapo
  • philipp.middendorf/asapo
  • stefan.dietrich/asapo
4 results
Show changes
Showing
with 1490 additions and 764 deletions
......@@ -7,15 +7,16 @@ import (
"asapo_common/utils"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var IssueTokenTests = [] struct {
var IssueTokenTests = []struct {
requestSubject map[string]string
tokenSubject string
roles []string
......@@ -25,46 +26,45 @@ var IssueTokenTests = [] struct {
status int
message string
}{
{map[string]string{"beamtimeId":"test"},"bt_test",[]string{"read"},180,prepareAdminToken("admin"),"aaa",http.StatusOK,"read for beamtime"},
{map[string]string{"beamtimeId":"test"},"bt_test",[]string{"write"},90,prepareAdminToken("admin"),"aaa",http.StatusOK,"write for beamtime"},
{map[string]string{"beamtimeId":"test"},"bt_test",[]string{"writeraw"},90,prepareAdminToken("admin"),"",http.StatusBadRequest,"wrong role"},
{map[string]string{"beamline":"test"},"bl_test",[]string{"writeraw"},90,prepareAdminToken("admin"),"aaa",http.StatusOK,"raw for beamline"},
{map[string]string{"beamline":"test"},"bl_test",[]string{"read"},180,prepareAdminToken("admin"),"aaa",http.StatusOK,"read for beamline"},
{map[string]string{"blabla":"test"},"",[]string{"read"},180,prepareAdminToken("admin"),"",http.StatusBadRequest,"beamline or beamtime not given"},
{map[string]string{"beamtimeId":"test"},"",[]string{"bla"},180,prepareAdminToken("admin"),"",http.StatusBadRequest,"wrong role"},
{map[string]string{"beamtimeId":"test"},"",[]string{"read"},180,prepareAdminToken("bla"),"",http.StatusUnauthorized,"wrong admin token"},
{map[string]string{"beamtimeId":"test"},"bt_test",[]string{"read"},0,prepareAdminToken("admin"),"aaa",http.StatusBadRequest,"0 valid days"},
{map[string]string{"beamtimeId": "test"}, "bt_test", []string{"read"}, 180, prepareAdminToken("admin"), "aaa", http.StatusOK, "read for beamtime"},
{map[string]string{"beamtimeId": "test"}, "bt_test", []string{"write"}, 90, prepareAdminToken("admin"), "aaa", http.StatusOK, "write for beamtime"},
{map[string]string{"beamtimeId": "test"}, "bt_test", []string{"writeraw"}, 90, prepareAdminToken("admin"), "", http.StatusBadRequest, "wrong role"},
{map[string]string{"beamline": "test"}, "bl_test", []string{"writeraw"}, 90, prepareAdminToken("admin"), "aaa", http.StatusOK, "raw for beamline"},
{map[string]string{"beamline": "test"}, "bl_test", []string{"read"}, 180, prepareAdminToken("admin"), "aaa", http.StatusOK, "read for beamline"},
{map[string]string{"blabla": "test"}, "", []string{"read"}, 180, prepareAdminToken("admin"), "", http.StatusBadRequest, "beamline or beamtime not given"},
{map[string]string{"beamtimeId": "test"}, "", []string{"bla"}, 180, prepareAdminToken("admin"), "", http.StatusBadRequest, "wrong role"},
{map[string]string{"beamtimeId": "test"}, "", []string{"read"}, 180, prepareAdminToken("bla"), "", http.StatusUnauthorized, "wrong admin token"},
{map[string]string{"beamtimeId": "test"}, "bt_test", []string{"read"}, 0, prepareAdminToken("admin"), "aaa", http.StatusBadRequest, "0 valid days"},
}
func TestIssueToken(t *testing.T) {
authJWT := utils.NewJWTAuth("secret")
authAdmin := utils.NewJWTAuth("secret_admin")
authUser := utils.NewJWTAuth("secret_user")
Auth = authorization.NewAuth(authUser,authAdmin,authJWT)
Auth = authorization.NewAuth(authUser, authAdmin, authJWT)
mock_store := new(token_store.MockedStore)
store = mock_store
for _, test := range IssueTokenTests {
request := makeRequest(structs.IssueTokenRequest{test.requestSubject,test.validDays,test.roles})
mock_store.On("IsTokenRevoked", mock.Anything).Return(false,nil)
request := makeRequest(structs.IssueTokenRequest{test.requestSubject, test.validDays, test.roles})
mock_store.On("IsTokenRevoked", mock.Anything).Return(false, nil)
if test.status == http.StatusOK {
mock_store.On("AddToken", mock.Anything).Return(nil)
}
w := doPostRequest("/admin/issue",request,authAdmin.Name()+" "+test.adminToken)
w := doPostRequest("/admin/issue", request, authAdmin.Name()+" "+test.adminToken)
if w.Code == http.StatusOK {
body, _ := ioutil.ReadAll(w.Body)
var token structs.IssueTokenResponse
json.Unmarshal(body,&token)
claims,_ := utils.CheckJWTToken(token.Token,"secret_user")
cclaims,_:= claims.(*utils.CustomClaims)
json.Unmarshal(body, &token)
claims, _ := utils.CheckJWTToken(token.Token, "secret_user")
cclaims, _ := claims.(*utils.CustomClaims)
var extra_claim structs.AccessTokenExtraClaim
utils.MapToStruct(claims.(*utils.CustomClaims).ExtraClaims.(map[string]interface{}), &extra_claim)
assert.Equal(t, cclaims.Subject , test.tokenSubject, test.message)
assert.True(t, cclaims.ExpiresAt-time.Now().Unix()>int64(test.validDays)*24*60*60-10, test.message)
assert.True(t, cclaims.ExpiresAt-time.Now().Unix()<int64(test.validDays)*24*60*60+10, test.message)
assert.Equal(t, cclaims.Subject, test.tokenSubject, test.message)
assert.True(t, cclaims.ExpiresAt-time.Now().Unix() > int64(test.validDays)*24*60*60-10, test.message)
assert.True(t, cclaims.ExpiresAt-time.Now().Unix() < int64(test.validDays)*24*60*60+10, test.message)
assert.Equal(t, extra_claim.AccessTypes, test.roles, test.message)
assert.NotEmpty(t, cclaims.Id , test.message)
assert.NotEmpty(t, cclaims.Id, test.message)
} else {
body, _ := ioutil.ReadAll(w.Body)
fmt.Println(string(body))
......@@ -76,4 +76,3 @@ func TestIssueToken(t *testing.T) {
assert.Equal(t, test.status, w.Code, test.message)
}
}
......@@ -27,12 +27,17 @@ func revokeToken(w http.ResponseWriter, r *http.Request) {
rec, err := store.RevokeToken(token, "")
if err != nil {
log.Error("could not revoke token "+ token+": "+ err.Error())
log.Error("could not revoke token " + token + ": " + err.Error())
utils.WriteServerError(w, err, http.StatusBadRequest)
return
}
log.Debug("revoked token " + rec.Token)
log.WithFields(map[string]interface{}{
"id": rec.Id,
"subject": rec.Subject,
"token": rec.Token,
}).Info("revoked token")
answer, _ := json.Marshal(&rec)
w.WriteHeader(http.StatusOK)
w.Write(answer)
......
//+build !test
//go:build !test
// +build !test
package server
......@@ -23,7 +24,7 @@ func Start() {
log.Info("Starting ASAPO Authorizer, version " + version.GetVersion())
store = new(token_store.TokenStore)
err := store.Init(nil)
err := store.Init(nil)
if err != nil {
log.Error(err.Error())
}
......@@ -34,7 +35,7 @@ func Start() {
log.Fatal(http.ListenAndServe(":"+strconv.Itoa(common.Settings.Port), http.HandlerFunc(mux.ServeHTTP)))
}
func createAuth() (*authorization.Auth,error) {
func createAuth() (*authorization.Auth, error) {
secret, err := utils.ReadFirstStringFromFile(common.Settings.UserSecretFile)
if err != nil {
return nil, err
......@@ -43,7 +44,7 @@ func createAuth() (*authorization.Auth,error) {
if err != nil {
return nil, err
}
return authorization.NewAuth(utils.NewJWTAuth(secret), utils.NewJWTAuth(adminSecret), utils.NewJWTAuth(secret)),nil
return authorization.NewAuth(utils.NewJWTAuth(secret), utils.NewJWTAuth(adminSecret), utils.NewJWTAuth(secret)), nil
}
func ReadConfig(fname string) (log.Level, error) {
......@@ -67,7 +68,7 @@ func ReadConfig(fname string) (log.Level, error) {
return log.FatalLevel, errors.New("Database server not set")
}
if common.Settings.DatabaseServer == "auto" && common.Settings.DiscoveryServer=="" {
if common.Settings.DatabaseServer == "auto" && common.Settings.DiscoveryServer == "" {
return log.FatalLevel, errors.New("Discovery server not set")
}
......
package server
/*
import (
"asapo_authorizer/database"
"asapo_common/discovery"
"asapo_common/logger"
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"net/http"
"net/http/httptest"
"testing"
)
func setup() *database.MockedDatabase {
mock_db := new(database.MockedDatabase)
mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil)
return mock_db
}
func setup_and_init(t *testing.T) *database.MockedDatabase {
mock_db := new(database.MockedDatabase)
mock_db.On("Connect", mock.AnythingOfType("string")).Return(nil)
InitDB(mock_db)
assertExpectations(t, mock_db)
return mock_db
}
func assertExpectations(t *testing.T, mock_db *database.MockedDatabase) {
mock_db.AssertExpectations(t)
mock_db.ExpectedCalls = nil
logger.MockLog.AssertExpectations(t)
logger.MockLog.ExpectedCalls = nil
}
var initDBTests = []struct {
address string
answer error
message string
}{
{"bad address", errors.New(""), "error on get bad address"},
{"good address", nil, "no error on good address"},
}
func TestInitDBWithWrongAddress(t *testing.T) {
mock_db := setup()
mock_db.ExpectedCalls = nil
settings.DatabaseServer = "0.0.0.0:0000"
for _, test := range initDBTests {
mock_db.On("Connect", "0.0.0.0:0000").Return(test.answer)
err := InitDB(mock_db)
assert.Equal(t, test.answer, err, test.message)
assertExpectations(t, mock_db)
}
db = nil
}
func TestInitDBWithAutoAddress(t *testing.T) {
mongo_address := "0.0.0.0:0000"
mock_db := setup()
mock_db.ExpectedCalls = nil
settings.DatabaseServer = "auto"
mock_server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/asapo-mongodb", "request string")
rw.Write([]byte(mongo_address))
}))
defer mock_server.Close()
discoveryService = discovery.CreateDiscoveryService(mock_server.Client(), mock_server.URL)
mock_db.On("Connect", "0.0.0.0:0000").Return(nil)
err := InitDB(mock_db)
assert.Equal(t, nil, err, "auto connect ok")
assertExpectations(t, mock_db)
db = nil
}
func TestReconnectDB(t *testing.T) {
mongo_address := "0.0.0.0:0000"
mock_server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, req.URL.String(), "/asapo-mongodb", "request string")
rw.Write([]byte(mongo_address))
}))
discoveryService = discovery.CreateDiscoveryService(mock_server.Client(), mock_server.URL)
defer mock_server.Close()
settings.DatabaseServer = "auto"
mock_db := setup_and_init(t)
mock_db.ExpectedCalls = nil
mongo_address = "1.0.0.0:0000"
mock_db.On("Close").Return()
mock_db.On("Connect", "1.0.0.0:0000").Return(nil)
err := ReconnectDb()
assert.Equal(t, nil, err, "auto connect ok")
assertExpectations(t, mock_db)
db = nil
}
func TestErrorWhenReconnectNotConnectedDB(t *testing.T) {
err := ReconnectDb()
assert.NotNil(t, err, "error reconnect")
db = nil
}
func TestCleanupDBWithoutInit(t *testing.T) {
mock_db := setup()
mock_db.AssertNotCalled(t, "Close")
CleanupDB()
}
func TestCleanupDBInit(t *testing.T) {
settings.DatabaseServer = "0.0.0.0"
mock_db := setup_and_init(t)
mock_db.On("Close").Return()
CleanupDB()
assertExpectations(t, mock_db)
}
*/
\ No newline at end of file
......@@ -2,21 +2,20 @@ package token_store
import "asapo_common/utils"
const KAdminDb = "asapo_admin"
const KTokens = "tokens"
const KRevokedTokens = "revoked_tokens"
type Request struct {
DbName string
Collection string
Op string
DbName string
Collection string
Op string
}
type TokenRecord struct {
Id string `bson:"_id"`
*utils.CustomClaims
Token string
Token string
Revoked bool
}
......
//go:build !release
// +build !release
package token_store
......@@ -25,11 +26,10 @@ func (db *MockedDatabase) Ping() error {
}
func (db *MockedDatabase) ProcessRequest(request Request, extraParams ...interface{}) (answer []byte, err error) {
args := db.Called(request,extraParams)
args := db.Called(request, extraParams)
return args.Get(0).([]byte), args.Error(1)
}
type FakeDatabase struct {
}
......@@ -46,5 +46,5 @@ func (db *FakeDatabase) Ping() error {
}
func (db *FakeDatabase) ProcessRequest(request Request, extraParams ...interface{}) (answer []byte, err error) {
return nil,nil
return nil, nil
}
//go:build !release
// +build !release
package token_store
......@@ -15,22 +16,22 @@ func (store *MockedStore) Init(db Agent) error {
return args.Error(0)
}
func (store *MockedStore) AddToken(token TokenRecord) error {
func (store *MockedStore) AddToken(token TokenRecord) error {
args := store.Called(token)
return args.Error(0)
}
func (store *MockedStore) RevokeToken(token string,id string) (TokenRecord, error) {
args := store.Called(token,id)
func (store *MockedStore) RevokeToken(token string, id string) (TokenRecord, error) {
args := store.Called(token, id)
return args.Get(0).(TokenRecord), args.Error(1)
}
func (store *MockedStore) GetTokenList() ([]TokenRecord,error) {
func (store *MockedStore) GetTokenList() ([]TokenRecord, error) {
args := store.Called()
return args.Get(0).([]TokenRecord), args.Error(1)
}
func (store *MockedStore) IsTokenRevoked(tokenId string) (bool, error) {
func (store *MockedStore) IsTokenRevoked(tokenId string) (bool, error) {
args := store.Called(tokenId)
return args.Get(0).(bool), args.Error(1)
}
......@@ -39,5 +40,3 @@ func (store *MockedStore) Close() {
store.Called()
return
}
//+build !test
//go:build !test
// +build !test
package token_store
......@@ -6,13 +7,14 @@ import (
"asapo_common/utils"
"context"
"errors"
"strings"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"strings"
"sync"
"time"
)
const no_session_msg = "database client not created"
......@@ -186,7 +188,6 @@ func (db *Mongodb) updateRecord(request Request, extra_params ...interface{}) ([
return nil, err
}
func (db *Mongodb) ProcessRequest(request Request, extraParams ...interface{}) (answer []byte, err error) {
dbClientLock.RLock()
defer dbClientLock.RUnlock()
......
......@@ -51,7 +51,7 @@ func (store *TokenStore) initDB() (dbaddress string, err error) {
if dbaddress == "" {
return "", errors.New("no token_store servers found")
}
log.Debug("Got mongodb server: " + dbaddress)
log.WithFields(map[string]interface{}{"address": dbaddress}).Debug("found mongodb server")
}
return dbaddress, store.db.Connect(dbaddress)
......@@ -66,7 +66,7 @@ func (store *TokenStore) reconnectIfNeeded(db_error error) {
if dbaddress, err := store.reconnectDb(); err != nil {
log.Error("cannot reconnect to database: " + err.Error())
} else {
log.Debug("reconnected to database at" + dbaddress)
log.WithFields(map[string]interface{}{"address": dbaddress}).Debug("reconnected to database")
}
}
......@@ -196,7 +196,7 @@ func (store *TokenStore) loopGetRevokedTokens() {
next_update = 1
log.Error("cannot get revoked tokens: " + err.Error())
} else {
log.Debug("received revoked tokens list")
//log.Debug("received revoked tokens list")
next_update = common.Settings.UpdateRevokedTokensIntervalSec
tokens := make([]string, len(res))
for i, token := range res {
......
......@@ -4,11 +4,12 @@ import (
"asapo_authorizer/common"
"asapo_common/logger"
"asapo_common/utils"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"strings"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
const expectedSource = "datasource"
......@@ -26,18 +27,17 @@ func assertExpectations(t *testing.T, mock_db *MockedDatabase) {
logger.MockLog.ExpectedCalls = nil
}
type TokenStoreTestSuite struct {
suite.Suite
mock_db *MockedDatabase
store Store
store Store
}
func (suite *TokenStoreTestSuite) SetupTest() {
common.Settings.UpdateRevokedTokensIntervalSec = 0
suite.mock_db = new(MockedDatabase)
suite.store = new(TokenStore)
suite.mock_db.On("Connect", mock.Anything).Return( nil)
suite.mock_db.On("Connect", mock.Anything).Return(nil)
suite.store.Init(suite.mock_db)
logger.SetMockLog()
}
......@@ -60,6 +60,7 @@ func (suite *TokenStoreTestSuite) TestProcessRequestWithConnectionError() {
ExpectReconnect(suite.mock_db)
suite.mock_db.On("ProcessRequest", mock.Anything, mock.Anything).Return([]byte(""),
&DBError{utils.StatusServiceUnavailable, ""})
logger.MockLog.On("WithFields", mock.Anything)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("reconnected")))
err := suite.store.AddToken(TokenRecord{})
......@@ -67,7 +68,6 @@ func (suite *TokenStoreTestSuite) TestProcessRequestWithConnectionError() {
suite.Error(err, "need reconnect")
}
func (suite *TokenStoreTestSuite) TestProcessRequestAddToken() {
req := Request{
DbName: "asapo_admin",
......@@ -79,7 +79,6 @@ func (suite *TokenStoreTestSuite) TestProcessRequestAddToken() {
suite.Equal(err, nil, "ok")
}
func (suite *TokenStoreTestSuite) TestProcessRequestGetTokenList() {
req := Request{
DbName: "asapo_admin",
......@@ -87,21 +86,20 @@ func (suite *TokenStoreTestSuite) TestProcessRequestGetTokenList() {
Op: "read_records",
}
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil)
_,err := suite.store.GetTokenList()
_, err := suite.store.GetTokenList()
suite.Equal(err, nil, "ok")
}
func (suite *TokenStoreTestSuite) TestProcessRequestRevokeToken() {
req := Request{
DbName: "asapo_admin",
Collection: "tokens",
Op: "read_record",
}
expectedToken := TokenRecord{Id :"1234", Token:"token",Revoked: false}
expectedToken := TokenRecord{Id: "1234", Token: "token", Revoked: false}
expectedRevokedToken := expectedToken
expectedRevokedToken.Revoked = true
suite.mock_db.On("ProcessRequest", req,mock.Anything).Return([]byte(""), nil).Run(func(args mock.Arguments) {
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil).Run(func(args mock.Arguments) {
rec := args.Get(1).([]interface{})[1].(*TokenRecord)
*rec = expectedToken
})
......@@ -110,7 +108,7 @@ func (suite *TokenStoreTestSuite) TestProcessRequestRevokeToken() {
DbName: KAdminDb,
Collection: KTokens,
Op: "update_record"}
suite.mock_db.On("ProcessRequest", req,mock.Anything).Return([]byte(""), nil).Run(func(args mock.Arguments) {
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil).Run(func(args mock.Arguments) {
rec := args.Get(1).([]interface{})[3].(*TokenRecord)
*rec = expectedRevokedToken
})
......@@ -122,7 +120,7 @@ func (suite *TokenStoreTestSuite) TestProcessRequestRevokeToken() {
}
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil)
token,err := suite.store.RevokeToken(expectedToken.Token,"")
token, err := suite.store.RevokeToken(expectedToken.Token, "")
suite.Equal(err, nil, "ok")
suite.Equal(token, expectedRevokedToken, "ok")
}
......@@ -138,10 +136,8 @@ func (suite *TokenStoreTestSuite) TestProcessRequestCheckRevokedToken() {
Op: "read_records",
}
suite.mock_db.On("ProcessRequest", req, mock.Anything).Return([]byte(""), nil)
logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("list")))
time.Sleep(time.Second*1)
res,err := suite.store.IsTokenRevoked("123")
time.Sleep(time.Second * 1)
res, err := suite.store.IsTokenRevoked("123")
suite.Equal(err, nil, "ok")
suite.Equal(false, res, "ok")
}
\ No newline at end of file
}
......@@ -24,4 +24,5 @@ gotest(${TARGET_NAME} "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./...")
go_integration_test(${TARGET_NAME}-connectdb "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDBConnect")
go_integration_test(${TARGET_NAME}-nextrecord "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDBNext")
go_integration_test(${TARGET_NAME}-nextrecord "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "MongoDB")
go_integration_test(${TARGET_NAME}-server "${CMAKE_CURRENT_SOURCE_DIR}/src/asapo_broker" "./..." "Server")
package database
import "asapo_common/utils"
import (
"asapo_common/logger"
"asapo_common/utils"
)
type Request struct {
DbName string
Beamtime string
DataSource string
Stream string
GroupId string
Op string
DatasetOp bool
MinDatasetSize int
ExtraParam string
Substream int
ExtraParam interface{}
}
type ExtraParamId struct {
Id int `json:"id"`
IdKey string `json:"id_key"`
}
type ExtraParamNacks struct {
From int
To int
}
type ExtraParamSize struct {
IncompleteDataset bool
IsDataset bool
}
type ExtraParamAck struct {
Id int
}
type ExtraParamNegAck struct {
Id int
DelayMs int
}
type ExtraParamDelete struct {
ErrorOnNotExist bool
DeleteMeta bool
}
type ExtraParamNext struct {
IdKey string `json:"id_key"`
Resend bool `json:"resend"`
DelayMs int `json:"delay_ms"`
ResendAttempts int `json:"resend_attempts"`
}
func (request *Request) Logger() logger.Logger {
return logger.WithFields(map[string]interface{}{
"beamtime": request.Beamtime,
"dataSource": decodeString(request.DataSource),
"stream": decodeString(request.Stream),
"groupId": decodeString(request.GroupId),
"operation": request.Op,
"DatasetOp": request.DatasetOp,
"minDatasetSize": request.MinDatasetSize,
"substream": request.Substream,
})
}
func (request *Request) DbName() string {
return request.Beamtime + "_" + request.DataSource
}
type Agent interface {
ProcessRequest(request Request) ([]byte, error)
ProcessRequest(request Request) ([]byte, error, uint64)
Ping() error
Connect(string) error
Close()
DropDatabase(dbname string) error
SetSettings(settings DBSettings)
}
type DBSettings struct {
ReadFromInprocessPeriod int
ReadFromInprocessPeriod int
UpdateStreamCachePeriodMs int
}
......@@ -42,4 +101,3 @@ func GetStatusCodeFromError(err error) int {
return utils.StatusServiceUnavailable
}
}
package database
import (
"github.com/stretchr/testify/mock"
"testing"
"github.com/stretchr/testify/mock"
)
// we run this test just to get 100% coverage for mock_database.go
......
......@@ -69,9 +69,8 @@ func encodeStringForDbName(original string) (result string) {
return escape(original, true)
}
func decodeString(original string) (result string) {
result,_ = url.PathUnescape(original)
result, _ = url.PathUnescape(original)
return result
}
......@@ -80,18 +79,14 @@ func encodeStringForColName(original string) (result string) {
}
func encodeRequest(request *Request) error {
request.DbName = encodeStringForDbName(request.DbName)
if len(request.DbName)> max_encoded_source_size {
request.DataSource = encodeStringForDbName(request.DataSource)
request.Beamtime = encodeStringForDbName(request.Beamtime)
if len(request.DbName()) > max_encoded_source_size {
return &DBError{utils.StatusWrongInput, "source name is too long"}
}
request.Stream = encodeStringForColName(request.Stream)
if len(request.Stream)> max_encoded_stream_size {
return &DBError{utils.StatusWrongInput, "stream name is too long"}
}
request.GroupId = encodeStringForColName(request.GroupId)
if len(request.GroupId)> max_encoded_group_size {
if len(request.GroupId) > max_encoded_group_size {
return &DBError{utils.StatusWrongInput, "group id is too long"}
}
......
......@@ -2,9 +2,10 @@ package database
import (
"asapo_common/utils"
"github.com/stretchr/testify/assert"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
)
func TestEncoding(t *testing.T) {
......@@ -18,7 +19,8 @@ func TestEncoding(t *testing.T) {
assert.Equal(t, sourceDecoded, source)
r := Request{
DbName: source,
Beamtime: "bt",
DataSource: source,
Stream: stream,
GroupId: stream,
Op: "",
......@@ -27,9 +29,8 @@ func TestEncoding(t *testing.T) {
ExtraParam: "",
}
err := encodeRequest(&r)
assert.Equal(t, r.Stream, streamEncoded)
assert.Equal(t, r.GroupId, streamEncoded)
assert.Equal(t, r.DbName, sourceEncoded)
assert.Equal(t, r.DataSource, sourceEncoded)
assert.Nil(t, err)
}
......@@ -42,7 +43,7 @@ var encodeTests = []struct {
message string
}{
{max_encoded_stream_size, max_encoded_group_size, max_encoded_source_size, true, "ok"},
{max_encoded_stream_size + 1, max_encoded_group_size, max_encoded_source_size, false, "stream"},
{max_encoded_stream_size + 1, max_encoded_group_size, max_encoded_source_size, true, "stream"},
{max_encoded_stream_size, max_encoded_group_size + 1, max_encoded_source_size, false, "group"},
{max_encoded_stream_size, max_encoded_group_size, max_encoded_source_size + 1, false, "source"},
}
......@@ -61,9 +62,10 @@ func TestEncodingTooLong(t *testing.T) {
for _, test := range encodeTests {
stream := RandomString(test.streamSize)
group := RandomString(test.groupSize)
source := RandomString(test.sourceSize)
source := RandomString(test.sourceSize - 3)
r := Request{
DbName: source,
Beamtime: "bt",
DataSource: source,
Stream: stream,
GroupId: group,
Op: "",
......@@ -76,7 +78,7 @@ func TestEncodingTooLong(t *testing.T) {
assert.Nil(t, err, test.message)
} else {
assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code)
assert.Contains(t,err.Error(),test.message,test.message)
assert.Contains(t, err.Error(), test.message, test.message)
}
}
}
//go:build !release
// +build !release
package database
......@@ -19,6 +20,10 @@ func (db *MockedDatabase) Close() {
db.Called()
}
func (db *MockedDatabase) DropDatabase(dbname string) error {
return nil
}
func (db *MockedDatabase) Ping() error {
args := db.Called()
return args.Error(0)
......@@ -28,8 +33,7 @@ func (db *MockedDatabase) SetSettings(settings DBSettings) {
db.Called()
}
func (db *MockedDatabase) ProcessRequest(request Request) (answer []byte, err error) {
func (db *MockedDatabase) ProcessRequest(request Request) (answer []byte, err error, idx uint64) {
args := db.Called(request)
return args.Get(0).([]byte), args.Error(1)
return args.Get(0).([]byte), args.Error(1), 0
}
//+build !test
//go:build !test
// +build !test
package database
import (
"asapo_common/logger"
log "asapo_common/logger"
"asapo_common/utils"
"context"
"encoding/json"
"errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math"
"strconv"
"strings"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type ID struct {
ID int `bson:"_id"`
type AckRecord struct {
Uid string `bson:"_id" json:"unique_id"`
Id int `bson:"message_id"`
Streams string `bson:"stream" json:"stream"`
GroupId string `bson:"group_id" json:"group_id"`
}
type MessageRecord struct {
ID int `json:"_id"`
Timestamp int `bson:"timestamp" json:"timestamp"`
Name string `json:"name"`
Meta map[string]interface{} `json:"meta"`
NextStream string
FinishedStream bool
ID int `json:"message_id"`
TimeID int `json:"time_id"`
Timestamp int `bson:"timestamp" json:"timestamp"`
Name string `json:"name"`
Meta map[string]interface{} `json:"meta"`
}
type InProcessingRecord struct {
ID int `bson:"_id" json:"_id"`
MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"`
ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"`
DelayMs int64 `bson:"delayMs" json:"delayMs"`
Uid string `bson:"_id" json:"unique_id"`
Id int `bson:"id" json:"_id"`
Streams string `bson:"stream" json:"stream"`
GroupId string `bson:"group_id" json:"group_id"`
MaxResendAttempts int `bson:"maxResendAttempts" json:"maxResendAttempts"`
ResendAttempts int `bson:"resendAttempts" json:"resendAttempts"`
DelayMs int64 `bson:"delayMs" json:"delayMs"`
IdKey string `bson:"id_key" json:"id_key"`
Internal bool `bson:"internal" json:"internal"`
}
type NegAckParamsRecord struct {
......@@ -50,32 +59,45 @@ type Nacks struct {
Unacknowledged []int `json:"unacknowledged"`
}
type Sources struct {
Datasources []string `json:"sources"`
}
type LastAck struct {
ID int `bson:"_id" json:"lastAckId"`
ID int `bson:"message_id" json:"lastAckId"`
}
type LocationPointer struct {
GroupID string `bson:"_id"`
GroupID string `bson:"_id"` // Contain group_id and stream
Value int `bson:"current_pointer"`
}
type PersistedStreamsList struct {
ID int `json:"_id"`
Streams []string `json:"persisted_streams"`
}
const data_collection_name_prefix = "data_"
const acks_collection_name_prefix = "acks_"
const inprocess_collection_name_prefix = "inprocess_"
const meta_collection_name = "meta"
const streams_info = "streams"
const pointer_collection_name = "current_location"
const pointer_field_name = "current_pointer"
const last_message_collection_name = "last_messages"
const last_message_field_name = "last_message"
const auto_id_counter_name = "auto_id_counters"
const no_session_msg = "database client not created"
const already_connected_msg = "already connected"
const failed_extract_extra_params = "failed to extract extra parameters"
const too_many_persisted_streams = "too many persisted streams"
const finish_stream_keyword = "asapo_finish_stream"
const no_next_stream_keyword = "asapo_no_next"
const stream_filter_all = "all"
const stream_filter_finished = "finished"
const stream_filter_unfinished = "unfinished"
const Stream_filter_all = "all"
const Stream_filter_finished = "finished"
const Stream_filter_unfinished = "unfinished"
const (
field_op_inc int = iota
......@@ -84,10 +106,10 @@ const (
type fieldChangeRequest struct {
collectionName string
fieldName string
op int
max_ind int
val int
fieldName string
op int
max_ind int
val int
}
var dbSessionLock sync.Mutex
......@@ -154,6 +176,11 @@ func (db *Mongodb) Close() {
}
}
func (db *Mongodb) DropDatabase(dbname string) (err error) {
return db.dropDatabase(dbname)
}
// This function is used in tests only
func (db *Mongodb) dropDatabase(dbname string) (err error) {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
......@@ -161,24 +188,47 @@ func (db *Mongodb) dropDatabase(dbname string) (err error) {
return db.client.Database(dbname).Drop(context.TODO())
}
func (db *Mongodb) insertRecord(dbname string, collection_name string, s interface{}) error {
// This function is used in tests only
// One have to take care to create index if needed.
func (db *Mongodb) insertRecord(dbname string, s interface{}) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
c := db.client.Database(dbname).Collection(data_collection_name_prefix)
_, err := c.InsertOne(context.TODO(), s)
return err
}
// This function is used in tests only
func (db *Mongodb) insertStreamFinished(dbname string, s interface{}) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
c := db.client.Database(dbname).Collection(streams_info)
_, err := c.InsertOne(context.TODO(), s)
return err
}
// This function is used in tests only
func (db *Mongodb) replaceRecord(dbname string, id int, s interface{}) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
c := db.client.Database(dbname).Collection(data_collection_name_prefix)
_, err := c.ReplaceOne(context.TODO(), bson.M{"message_id": id}, s)
return err
}
// This function is used in tests only
func (db *Mongodb) insertMeta(dbname string, s interface{}) error {
if db.client == nil {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
c := db.client.Database(dbname).Collection(meta_collection_name)
_, err := c.InsertOne(context.TODO(), s)
return err
}
......@@ -187,31 +237,69 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M {
var q bson.M
if request.DatasetOp && !returnIncompete {
if request.MinDatasetSize > 0 {
q = bson.M{"$expr": bson.M{"$gte": []interface{}{bson.M{"$size": "$messages"}, request.MinDatasetSize}}}
q = bson.M{"$expr": bson.M{"$gte": []interface{}{bson.M{"$size": "$messages"}, request.MinDatasetSize}}, "stream": request.Stream}
} else {
q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$messages"}}}}
q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$dataset_size", bson.M{"$size": "$messages"}}}, "stream": request.Stream}
}
q = bson.M{"$or": []interface{}{bson.M{"name": finish_stream_keyword}, q}}
} else {
q = nil
q = bson.M{"stream": request.Stream}
}
return q
}
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) {
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
q := maxIndexQuery(request, returnIncompete)
// getInt32Val converts a BSON-decoded numeric value (int32 or int64) to int32.
// Any other dynamic type is logged via the request logger and reported as an
// error. Note: an int64 outside the int32 range is truncated, as before.
func getInt32Val(request Request, val interface{}) (int32, error) {
	// Use the variable bound by the type switch directly instead of
	// re-asserting val inside each case.
	switch v := val.(type) {
	case int32:
		return v, nil
	case int64:
		return int32(v), nil
	default:
		request.Logger().Debug("unexpected type %T", v)
		return 0, errors.New("cannot convert value to int")
	}
}
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
var result ID
err = c.FindOne(context.TODO(), q, opts).Decode(&result)
// getIntVal converts a BSON-decoded numeric value (int, int32 or int64) to
// int. Any other dynamic type is logged via the request logger and reported
// as an error.
func getIntVal(request Request, val interface{}) (int, error) {
	// Use the variable bound by the type switch directly; the original
	// re-asserted val in each case and applied a redundant int(val) to an int.
	switch v := val.(type) {
	case int:
		return v, nil
	case int32:
		return int(v), nil
	case int64:
		return int(v), nil
	default:
		request.Logger().Debug("unexpected type %T", v)
		return 0, errors.New("cannot convert value to int")
	}
}
// getMaxIndex returns the highest value of idKey ("message_id" or "time_id")
// among the records selected by maxIndexQuery for the requested stream, or 0
// when the stream has no matching records.
// When request.DatasetOp is set and returnIncompete is false, only complete
// datasets count towards the maximum (see maxIndexQuery).
func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool, idKey string) (int, error) {
	c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
	q := maxIndexQuery(request, returnIncompete)

	// Fetch only the id field of the record with the largest idKey value.
	opts := options.FindOne().SetSort(bson.M{idKey: -1}).SetProjection(bson.D{{idKey, 1}})
	var result map[string]interface{}
	err := c.FindOne(context.TODO(), q, opts).Decode(&result)
	if err == mongo.ErrNoDocuments {
		// An empty stream is not an error; the maximum index is simply 0.
		return 0, nil
	}
	if err != nil {
		return 0, err
	}

	// Fix: the conversion error was previously stored in a variable named
	// "ok", which misleadingly suggested a boolean.
	maxId, convErr := getInt32Val(request, result[idKey])
	if convErr != nil {
		return 0, errors.New("cannot get max index by " + idKey)
	}
	return int(maxId), nil
}
func duplicateError(err error) bool {
command_error, ok := err.(mongo.CommandError)
if !ok {
......@@ -227,22 +315,51 @@ func duplicateError(err error) bool {
func (db *Mongodb) setCounter(request Request, ind int) (err error) {
update := bson.M{"$set": bson.M{pointer_field_name: ind}}
opts := options.Update().SetUpsert(true)
c := db.client.Database(request.DbName).Collection(pointer_collection_name)
c := db.client.Database(request.DbName()).Collection(pointer_collection_name)
q := bson.M{"_id": request.GroupId + "_" + request.Stream}
_, err = c.UpdateOne(context.TODO(), q, update, opts)
return
}
func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int) error {
if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil {
if err2 := checkStreamFinished(request, max_ind, max_ind, res); err2 != nil {
return err2
}
// updateInProcess postpones the redelivery time of an internal in-process
// record (matched by stream, group and id) to now + delayMs milliseconds.
// Returns mongo.ErrNoDocuments when no matching record exists.
func (db *Mongodb) updateInProcess(request Request, delayMs int, id int) error {
	newDeadline := time.Now().UnixNano() + int64(delayMs*1e6)
	filter := bson.M{"stream": request.Stream, "group_id": request.GroupId, "internal": true, "id": id}
	update := bson.M{"$set": bson.M{"delayMs": newDeadline}}
	coll := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
	res, err := coll.UpdateOne(context.TODO(), filter, update, options.Update().SetUpsert(false))
	if res != nil && res.MatchedCount == 0 {
		return mongo.ErrNoDocuments
	}
	return err
}
// errorWhenCannotSetField builds the error returned when the group pointer
// cannot be advanced: either the stream is finished (stream-finished answer)
// or the consumer reached the current end of the stream (no-data answer).
func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int, ordered bool) error {
	errFinish := db.checkStreamFinishedDB(request, max_ind, ordered)
	if errFinish != nil {
		return errFinish
	}
	return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
}
func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res interface{}) (err error) {
// message_fallback rewrites a record decoded from the DB schema into the
// legacy client format: it exposes message_id as _id (for the record and for
// every dataset message) and dataset_size as size.
// Fix: elements of res["messages"] that are not plain maps no longer cause a
// panic (the original blind type assertion produced a nil map and writing to
// it panicked; nested BSON documents typically decode as primitive.M, which
// also fails an exact map[string]interface{} assertion).
func message_fallback(res map[string]interface{}) {
	res["_id"] = res["message_id"]
	messages_bson, ok := res["messages"].(primitive.A)
	if !ok {
		return
	}
	if val, ok := res["dataset_size"]; ok {
		res["size"] = val
	}
	var messages_info primitive.A
	for _, msg_bson := range messages_bson {
		var msg map[string]interface{}
		switch m := msg_bson.(type) {
		case map[string]interface{}:
			msg = m
		case primitive.M:
			msg = m
		default:
			// Unknown element shape: pass it through unchanged.
			messages_info = append(messages_info, msg_bson)
			continue
		}
		msg["_id"] = msg["message_id"]
		messages_info = append(messages_info, msg)
	}
	res["messages"] = messages_info
}
func (db *Mongodb) changeField(request Request, change fieldChangeRequest, idKey string, res interface{}) (err error) {
var update bson.M
if change.op == field_op_inc {
update = bson.M{"$inc": bson.M{change.fieldName: 1}}
......@@ -252,7 +369,7 @@ func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res i
opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
q := bson.M{"_id": request.GroupId + "_" + request.Stream, change.fieldName: bson.M{"$lt": change.max_ind}}
c := db.client.Database(request.DbName).Collection(change.collectionName)
c := db.client.Database(request.DbName()).Collection(change.collectionName)
err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
if err != nil {
......@@ -264,7 +381,7 @@ func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res i
if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil {
return nil
}
return db.errorWhenCannotSetField(request, change.max_ind)
return db.errorWhenCannotSetField(request, change.max_ind, idKey == "message_id")
}
return &DBError{utils.StatusTransactionInterrupted, err.Error()}
}
......@@ -288,136 +405,165 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool
return false
}
name, ok_name := rec["name"].(string)
if ok_name && name == finish_stream_keyword {
return false
}
imgs, ok1 := rec["messages"].(primitive.A)
expectedSize, ok2 := utils.InterfaceToInt64(rec["size"])
expectedSize, ok2 := utils.InterfaceToInt64(rec["dataset_size"])
if !ok1 || !ok2 {
return false
return true
}
nMessages := len(imgs)
if (request.MinDatasetSize == 0 && int64(nMessages) != expectedSize) || (request.MinDatasetSize == 0 && nMessages < request.MinDatasetSize) {
if (request.MinDatasetSize == 0 && int64(nMessages) != expectedSize) || (request.MinDatasetSize > 0 && nMessages < request.MinDatasetSize) {
return true
}
return false
}
func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) {
q := bson.M{"_id": id}
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
// addSubstreamSelectionProjection restricts the returned record to the single
// dataset message belonging to the given substream, while keeping the
// record-level fields (stream, ids, dataset_size, timestamp).
// The options object is modified in place.
func addSubstreamSelectionProjection(opts *options.FindOneOptions, substream int) {
	substreamMatch := bson.M{"$elemMatch": bson.M{"dataset_substream": substream}}
	opts.SetProjection(bson.M{
		"messages":     substreamMatch,
		"stream":       1,
		"message_id":   1,
		"time_id":      1,
		"dataset_size": 1,
		"timestamp":    1,
	})
}
func (db *Mongodb) getRecordFromDb(request Request, id, id_max int, idKey string) (res map[string]interface{}, err error) {
q := bson.M{idKey: id, "stream": request.Stream}
opts := options.FindOne()
if request.Substream > 0 {
addSubstreamSelectionProjection(opts, request.Substream)
}
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
err = c.FindOne(context.TODO(), q, opts).Decode(&res)
if err != nil {
answer := encodeAnswer(id, id_max, "")
log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id, "cause": err.Error(), "idKey": idKey}).Debug("error getting record")
return res, &DBError{utils.StatusNoData, answer}
}
request.Logger().WithFields(map[string]interface{}{"id": id, "idKey": idKey}).Debug("got record from db")
return res, err
}
func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, error) {
res, err := db.getRecordFromDb(request, id, id_max)
func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int, idKey string) ([]byte, uint64, error) {
message_id := 0
// We need this message_id to put message to inProcess
if idKey == "message_id" {
message_id = id
}
res, err := db.getRecordFromDb(request, id, id_max, idKey)
if err != nil {
return nil, err
return nil, uint64(message_id), err
}
if err := checkStreamFinished(request, id, id_max, res); err != nil {
return nil, err
if idKey == "time_id" {
message_id, _ = getIntVal(request, res["message_id"])
}
log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName
logger.Debug(log_str)
message_fallback(res)
record, err := utils.MapToJson(&res)
if err != nil {
return nil, err
return nil, 0, err
}
if recordContainsPartialData(request, res) {
return nil, &DBError{utils.StatusPartialData, string(record)}
return nil, uint64(message_id), &DBError{utils.StatusPartialData, string(record)}
} else {
return record, nil
return record, uint64(message_id), nil
}
}
func (db *Mongodb) getRawRecordWithSort(dbname string, collection_name string, sortField string, sortOrder int) (map[string]interface{}, error) {
var res map[string]interface{}
c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
c := db.client.Database(dbname).Collection(data_collection_name_prefix)
opts := options.FindOne().SetSort(bson.M{sortField: sortOrder})
var q bson.M = nil
var q bson.M = bson.M{"stream": collection_name}
err := c.FindOne(context.TODO(), q, opts).Decode(&res)
if err != nil {
if err == mongo.ErrNoDocuments {
return map[string]interface{}{}, nil
}
return nil, err
}
return res, nil
return res, err
}
func (db *Mongodb) getLastRawRecord(dbname string, collection_name string) (map[string]interface{}, error) {
return db.getRawRecordWithSort(dbname, collection_name, "_id", -1)
return db.getRawRecordWithSort(dbname, collection_name, "message_id", -1)
}
// getEarliestRawRecord returns the record of the stream with the smallest
// timestamp (an empty map when the stream has no records).
func (db *Mongodb) getEarliestRawRecord(dbname string, collection_name string) (map[string]interface{}, error) {
	const sortAscending = 1
	return db.getRawRecordWithSort(dbname, collection_name, "timestamp", sortAscending)
}
func (db *Mongodb) getRecordByID(request Request) ([]byte, error) {
id, err := strconv.Atoi(request.ExtraParam)
if err != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
func (db *Mongodb) getRecordByID(request Request) ([]byte, error, uint64) {
params, ok := request.ExtraParam.(ExtraParamId)
if !ok {
return nil, &DBError{utils.StatusWrongInput, failed_extract_extra_params}, 0
}
max_ind, err := db.getMaxIndex(request, true)
max_ind, err := db.getMaxIndex(request, true, params.IdKey)
if err != nil {
return nil, err
return nil, err, 0
}
return db.getRecordByIDRaw(request, id, max_ind)
data, message_id, err := db.getRecordByIDRaw(request, params.Id, max_ind, params.IdKey)
// If error to get record from DB, may be stream is already finished
if err != nil && params.Id > max_ind {
if errFinish := db.checkStreamFinishedDB(request, max_ind, params.IdKey == "message_id"); errFinish != nil {
return nil, errFinish, 0
}
}
return data, err, message_id
}
func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
input := struct {
Id int
Params struct {
DelayMs int
}
}{}
err := json.Unmarshal([]byte(request.ExtraParam), &input)
if err != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
params, ok := request.ExtraParam.(ExtraParamNegAck)
if !ok {
return nil, &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true)
err := db.updateInProcess(request, params.DelayMs, params.Id)
// This should only happen when resending is disabled, but currently it can also happen if resending
// is enabled and a client nacks the wrong `message_id` or `group_id`
// (maybe can be prevented in a later change to the client).
if errors.Is(err, mongo.ErrNoDocuments) {
err = db.InsertRecordToInprocess(request, params.Id, params.DelayMs,
1, "message_id", false)
}
return []byte(""), err
}
func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
var record ID
err := json.Unmarshal([]byte(request.ExtraParam), &record)
if err != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
params, ok := request.ExtraParam.(ExtraParamAck)
if !ok {
return nil, &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err = c.InsertOne(context.Background(), &record)
request.Logger().WithFields(map[string]interface{}{"id": params.Id}).Debug("acknowledge message")
askRecord := AckRecord{
request.Stream + "_" + request.GroupId + "_" + strconv.Itoa(params.Id), // This ensures, that message is unique even if index is not created
params.Id, request.Stream, request.GroupId,
}
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix)
_, err := c.InsertOne(context.TODO(), &askRecord)
if err != nil {
if duplicateError(err) {
return nil, &DBError{utils.StatusWrongInput, "already acknowledged"}
request.Logger().WithFields(map[string]interface{}{"id": params.Id, "cause": err.Error()}).Warning("acknowledge failed")
// We still want to try to delete in case of duplication
if !duplicateError(err) {
return nil, err
}
return nil, err
}
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
return []byte(""), err
c = db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
filter := bson.M{"id": params.Id, "stream": request.Stream, "group_id": request.GroupId, "id_key": "message_id",
"internal": false}
_, err = c.DeleteOne(context.TODO(), filter)
if err != nil {
// It is unclear what errors DeleteOne can return. Let's hope they can be solved by the client retrying a bit later.
// TODO: Figure out if there are errors that are better represented by DBError{utils.StatusTransactionInterrupted, mongoErr.Error()}
return nil, &DBError{utils.StatusServiceUnavailable, err.Error()}
}
// If no matching document was found then `res.DeletedCount == 0` (where `res` is an value returned by `c.DeleteOne`).
// The document could be missing for multiple reasons:
// * A race condition when two requests try to acknowledge the same id simultaneously
// * A message that arrived late was retrieved by the consumer after internally switching
// from `get_next` with `ordered=True` to `get_by_id`, in which case it was not added to the collection
// * The user acknowledged the wrong ID. It would be nice to return a `WrongInputError` in this case,
// but it currently cannot be distinguished from the above cases
// TODO: Make it harder to acknowledge the wrong ID and/or figure out a way to distinguish a wrong input from a race condition.
return []byte(""), nil
}
func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error {
......@@ -425,15 +571,15 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error {
return &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
if len(request.DbName) == 0 || len(request.Stream) == 0 {
if len(request.DbName()) <= 1 || len(request.Stream) == 0 {
return &DBError{utils.StatusWrongInput, "beamtime_id ans stream must be set"}
}
return nil
}
func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) {
max_ind, err := db.getMaxIndex(request, true)
func (db *Mongodb) getCurrentPointer(request Request, collectionName string, idKey string) (LocationPointer, int, error) {
max_ind, err := db.getMaxIndex(request, true, idKey)
if err != nil {
return LocationPointer{}, 0, err
}
......@@ -444,10 +590,10 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err
var curPointer LocationPointer
err = db.changeField(request, fieldChangeRequest{
collectionName: pointer_collection_name,
fieldName: pointer_field_name,
op: field_op_inc,
max_ind: max_ind}, &curPointer)
collectionName: collectionName,
fieldName: pointer_field_name,
op: field_op_inc,
max_ind: max_ind}, idKey, &curPointer)
if err != nil {
return LocationPointer{}, 0, err
}
......@@ -455,241 +601,387 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err
return curPointer, max_ind, nil
}
func (db *Mongodb) getUnProcessedId(dbname string, collection_name string, delayMs int, nResendAttempts int) (int, error) {
func (db *Mongodb) getUnProcessedId(dbname string, request Request, collection_name string, delayMs int,
maxResendAttempts int, fromInternal bool, rlog log.Logger) (int, string, error) {
var res InProcessingRecord
opts := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
tNow := time.Now().UnixNano()
var update bson.M
if nResendAttempts == 0 {
if maxResendAttempts == 0 {
update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6), "maxResendAttempts": math.MaxInt32}, "$inc": bson.M{"resendAttempts": 1}}
} else {
update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6), "maxResendAttempts": nResendAttempts}, "$inc": bson.M{"resendAttempts": 1}}
update = bson.M{"$set": bson.M{"delayMs": tNow + int64(delayMs*1e6), "maxResendAttempts": maxResendAttempts}, "$inc": bson.M{"resendAttempts": 1}}
}
q := bson.M{"delayMs": bson.M{"$lte": tNow}, "$expr": bson.M{"$lt": []string{"$resendAttempts", "$maxResendAttempts"}}}
q := bson.M{"stream": request.Stream, "group_id": request.GroupId, "internal": fromInternal,
"delayMs": bson.M{"$lte": tNow},
"$expr": bson.M{"$lt": []string{"$resendAttempts", "$maxResendAttempts"}}}
c := db.client.Database(dbname).Collection(collection_name)
err := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(&res)
if err != nil {
if err == mongo.ErrNoDocuments {
return 0, nil
}
return 0, err
return 0, "", err
}
log_str := "got unprocessed id " + strconv.Itoa(res.ID) + " for " + dbname
logger.Debug(log_str)
return res.ID, nil
rlog.WithFields(map[string]interface{}{"id": res.Id, "idKey": res.IdKey, "internal": fromInternal}).Debug("got message id from inProcess")
return res.Id, res.IdKey, nil
}
func (db *Mongodb) InsertRecordToInprocess(db_name string, collection_name string, id int, delayMs int, nResendAttempts int, replaceIfExist bool) error {
func (db *Mongodb) InsertRecordToInprocess(request Request, id int, delayMs int,
nResendAttempts int, idKey string, internal bool) error {
request.Logger().WithFields(map[string]interface{}{"id": id, "idKey": idKey, "delay": delayMs,
"attempts": nResendAttempts, "internal": internal}).Debug("insert to inProcess")
record := InProcessingRecord{
id, nResendAttempts, 0, time.Now().UnixNano() + int64(delayMs*1e6),
request.Stream + "_" + request.GroupId + "_" + utils.BoolToStr(internal) + "_" + strconv.Itoa(id), // This ensures, that message is unique even if index is not created
id, request.Stream, request.GroupId,
nResendAttempts, 0,
time.Now().UnixNano() + int64(delayMs*1e6), idKey, internal,
}
c := db.client.Database(db_name).Collection(collection_name)
c := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
_, err := c.InsertOne(context.TODO(), &record)
if duplicateError(err) {
if !replaceIfExist {
return nil
}
_, err := c.ReplaceOne(context.TODO(), bson.M{"_id": id}, &record)
return err
}
return err
}
// InsertToInprocessIfNeeded parses extra_param into a delay and a number of
// resend attempts and, when extra_param is non-empty, registers the given id
// in the in-process collection. An empty extra_param is a no-op.
// NOTE(review): the exact encoding of extra_param is defined by
// extractsTwoIntsFromString — confirm against the caller.
func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name string, id int, extra_param string) error {
	if extra_param == "" {
		return nil
	}
	delayMs, nResendAttempts, parseErr := extractsTwoIntsFromString(extra_param)
	if parseErr != nil {
		return parseErr
	}
	return db.InsertRecordToInprocess(db_name, collection_name, id, delayMs, nResendAttempts, false)
}
func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) {
var record_ind, max_ind, delayMs, nResendAttempts int
var err error
if len(request.ExtraParam) != 0 {
delayMs, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam)
if err != nil {
return 0, 0, err
}
} else {
nResendAttempts = -1
}
func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request,
delayMs int, maxResendAttempts int, fromInternal bool) (int, int, string, error) {
tNow := time.Now().Unix()
dbSessionLock.Lock()
t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId]
t := db.lastReadFromInprocess[request.DbName()+"_"+request.Stream+"_"+request.GroupId+"_"+strconv.FormatBool(fromInternal)]
dbSessionLock.Unlock()
if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts)
if t <= tNow-int64(db.settings.ReadFromInprocessPeriod) {
record_ind, idKey, err := db.getUnProcessedId(request.DbName(), request, inprocess_collection_name_prefix,
delayMs, maxResendAttempts, fromInternal,
request.Logger())
if err != nil {
log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
logger.Debug(log_str)
return 0, 0, err
request.Logger().WithFields(map[string]interface{}{"cause": err.Error(), "internal": fromInternal}).Debug("error getting message id from inProcess")
// Nothing is inside of inprocess. Update last read to avoid too ofter requests.
if errors.Is(err, mongo.ErrNoDocuments) {
dbSessionLock.Lock()
db.lastReadFromInprocess[request.DbName()+"_"+request.Stream+"_"+request.GroupId+"_"+strconv.FormatBool(fromInternal)] = time.Now().Unix()
dbSessionLock.Unlock()
}
return 0, 0, "", err
}
}
if record_ind != 0 {
max_ind, err = db.getMaxIndex(request, true)
maxInd, err := db.getMaxIndex(request, true, idKey)
if err != nil {
return 0, 0, err
return 0, 0, "", err
}
} else {
dbSessionLock.Lock()
db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] = time.Now().Unix()
dbSessionLock.Unlock()
// Unexpected idKey shows a bug in the broker logic
if (fromInternal && request.DatasetOp && idKey != "message_id") ||
(fromInternal && !request.DatasetOp && idKey != "time_id") ||
(!fromInternal && idKey != "message_id") {
request.Logger().WithFields(map[string]interface{}{"idKey": idKey, "internal": fromInternal}).Error("unexpected idKey from inProcessed")
panic(errors.New("unexpected idKey from inProcessed"))
}
return record_ind, maxInd, idKey, nil
}
return record_ind, max_ind, nil
return 0, 0, "", errors.New("too early to ask for inProcess")
}
func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) {
curPointer, max_ind, err := db.getCurrentPointer(request)
func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request, collectionName, idKey string) (int, int, error) {
curPointer, max_ind, err := db.getCurrentPointer(request, collectionName, idKey)
if err != nil {
log_str := "error getting next pointer for " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"cause": err.Error(), "idKey": idKey}).Debug("error getting next pointer")
return 0, 0, err
}
log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName + ", groupid: " + request.GroupId
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"id": curPointer.Value, "idKey": idKey}).Debug("got next pointer")
return curPointer.Value, max_ind, nil
}
func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) {
nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, false)
func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
var r MessageRecord
err := utils.MapToStruct(data, &r)
if err != nil {
return 0, 0, err
return r, false
}
return r, true
}
if nextInd == 0 {
nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request)
if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData {
var err_inproc error
nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, true)
if err_inproc != nil {
return 0, 0, err_inproc
}
if nextInd == 0 {
return 0, 0, err
}
}
// removeFromInProcess deletes the internal in-process entry with the given
// index for this stream and consumer group, so it will not be redelivered.
func (db *Mongodb) removeFromInProcess(request Request, idx int) error {
	coll := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
	query := bson.M{"id": idx, "stream": request.Stream, "group_id": request.GroupId, "internal": true}
	_, err := coll.DeleteOne(context.TODO(), query)
	return err
}
// Get index of the record from in-process collection and
// return corresponding record
func (db *Mongodb) getRecordFromInprocessed(request Request, delayMs int, maxResendAttempts int, fromInternal bool) ([]byte, uint64, int, error) {
nextInd, maxInd, idKey, err := db.getNextAndMaxIndexesFromInprocessed(request, delayMs, maxResendAttempts, fromInternal)
if err != nil {
return nil, 0, 0, err
}
return nextInd, maxInd, nil
data, messageId, err := db.getRecordByIDRaw(request, nextInd, maxInd, idKey)
return data, messageId, nextInd, err
}
func ExtractMessageRecord(data map[string]interface{}) (MessageRecord, bool) {
var r MessageRecord
err := utils.MapToStruct(data, &r)
func (db *Mongodb) getStreamFinishedFlag(dbname string, stream string) (int, string, error) {
c := db.client.Database(dbname).Collection(streams_info)
q := bson.M{"name": stream, "finished": true}
var info StreamInfo
err := c.FindOne(context.TODO(), q).Decode(&info)
if err != nil {
return r, false
return 0, "", err
}
r.FinishedStream = (r.Name == finish_stream_keyword)
if r.FinishedStream {
var next_stream string
next_stream, ok := r.Meta["next_stream"].(string)
if !ok {
next_stream = no_next_stream_keyword
}
r.NextStream = next_stream
next_stream := info.NextStream
if info.NextStream == "" {
next_stream = no_next_stream_keyword
}
return r, true
return int(info.LastId), next_stream, nil
}
func (db *Mongodb) tryGetRecordFromInprocessed(request Request, originalerror error) ([]byte, error) {
var err_inproc error
nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, true)
if err_inproc != nil {
return nil, err_inproc
}
if nextInd != 0 {
return db.getRecordByIDRaw(request, nextInd, maxInd)
// getNumberOfMessages counts all records stored for the given stream.
func (db *Mongodb) getNumberOfMessages(dbname string, stream string) (int64, error) {
	coll := db.client.Database(dbname).Collection(data_collection_name_prefix)
	return coll.CountDocuments(context.TODO(), bson.M{"stream": stream}, options.Count())
}
func (db *Mongodb) getNumberOfDatasets(dbname string, stream string, minDatasetSize int) (int64, error) {
filter := bson.M{"stream": stream}
if minDatasetSize > 0 {
filter["$expr"] = bson.M{"$gte": []interface{}{bson.M{"$size": "$messages"}, minDatasetSize}}
} else {
return nil, originalerror
filter["$expr"] = bson.M{"$eq": []interface{}{"$dataset_size", bson.M{"$size": "$messages"}}}
}
c := db.client.Database(dbname).Collection(data_collection_name_prefix)
size, err := c.CountDocuments(context.TODO(), filter, options.Count())
return size, err
}
func checkStreamFinished(request Request, id, id_max int, data map[string]interface{}) error {
if id != id_max {
// hasInProcessMessages reports whether the given consumer group still has
// unacknowledged in-process messages for the stream that may be resent
// (resendAttempts < maxResendAttempts).
// On a database error it returns (true, err): it is unknown whether the
// stream is finished, and it is safe to assume it is not because the client
// can just try again.
func (db *Mongodb) hasInProcessMessages(dbname string, groupId string, stream string) (bool, error) {
	c := db.client.Database(dbname).Collection(inprocess_collection_name_prefix)
	filter := bson.M{"stream": stream, "group_id": groupId,
		"$expr": bson.M{"$lt": []string{"$resendAttempts", "$maxResendAttempts"}}}
	var res map[string]interface{}
	err := c.FindOne(context.TODO(), filter, options.FindOne()).Decode(&res)
	// Use errors.Is, consistent with the rest of the file, so a wrapped
	// ErrNoDocuments is also recognized.
	if errors.Is(err, mongo.ErrNoDocuments) {
		return false, nil
	}
	return true, err
}
// A stream is finished when the following conditions apply
// 1. The stream finished flag is set
// 2. The stream contains as many messages (that meet the completeness criteria in case of datasets) as given by the flag
// 3. There are no unacknowledged messages waiting to be resend
func (db *Mongodb) checkStreamFinishedDB(request Request, currentId int, ordered bool) error {
maxId, nextStream, err := db.getStreamFinishedFlag(request.DbName(), request.Stream)
if err != nil {
return nil
}
// Because skipped message ids are never resent, the stream is considered finished when the `currentId` reaches the
// `maxId` of the stream finished flag.
// the stream is considered finished when the `currentId` reaches the `maxId` of the stream finished flag.
// This shortcut does not apply to datasets, because datasets might be still incomplete.
if ordered && !request.DatasetOp {
if currentId == maxId {
answer := encodeAnswer(maxId, maxId, nextStream)
request.Logger().WithFields(map[string]interface{}{"id": maxId, "ordered": ordered}).Debug("stream finished")
return &DBError{utils.StatusNoData, answer}
} else {
return nil
}
}
var size int64
if request.DatasetOp {
size, err = db.getNumberOfDatasets(request.DbName(), request.Stream, request.MinDatasetSize)
} else {
size, err = db.getNumberOfMessages(request.DbName(), request.Stream)
}
// It is unknown if the stream is finished but it safe to assume that it is not because the client can just try again.
if err != nil {
return nil
}
r, ok := ExtractMessageRecord(data)
if !ok || !r.FinishedStream {
if int(size) != maxId {
return nil
}
log_str := "reached end of stream " + request.Stream + " , next_stream: " + r.NextStream
logger.Debug(log_str)
answer := encodeAnswer(r.ID-1, r.ID-1, r.NextStream)
hasInProcess, err := db.hasInProcessMessages(request.DbName(), request.GroupId, request.Stream)
// It is unknown if the stream is finished but it safe to assume that it is not because the client can just try again.
if err != nil {
return nil
}
if hasInProcess {
return nil
}
answer := encodeAnswer(maxId, maxId, nextStream)
request.Logger().WithFields(map[string]interface{}{"id": maxId, "ordered": ordered}).Debug("stream finished")
return &DBError{utils.StatusNoData, answer}
}
func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
nextInd, maxInd, err := db.getNextAndMaxIndexes(request)
func (db *Mongodb) getNextRecord(request Request) ([]byte, uint64, error) {
params, ok := request.ExtraParam.(ExtraParamNext)
if !ok {
return nil, 0, errors.New(failed_extract_extra_params)
}
// In order to avoid multiple return the same message set delay to 30s
// Actual value will be given after we try to get message from DB.
data, messageId, nextInd, err := db.getRecordFromInprocessed(request, 30000, 2,
true)
if err == nil {
if params.Resend {
err = db.InsertRecordToInprocess(request, int(messageId), params.DelayMs,
params.ResendAttempts, "message_id", false)
if err != nil {
if !duplicateError(err) {
// We could return the data here and ignore the error, but then the message
// would be sent again from the internal queue, even though the consumer might have
// tried to acknowledge it in time (which would have silently failed
// because the id is still in the internal and not in the external queue)
return nil, 0, err
}
// If it is duplicated error we still want to remove index from InProcess
// This can only happen if `removeFromInProcess` for the internal queue failed due to a timeout or
// connection error and the id was left in the internal queue in addition to the external queue.
// Try again now to remove it from the internal queue.
}
}
// Remove the time_id (or message_id in case of datasets) from
// the internal queue because it is either already safely stored
// in the external queue of to-be-acknowledged messages or should
// not be redelivered a second time if resending is disabled.
err = db.removeFromInProcess(request, nextInd)
// ToDo: return message here if resending is enabled instead of returning it in case of duplication error above.
if err != nil {
return nil, 0, err
}
return data, messageId, nil
}
if GetStatusCodeFromError(err) == utils.StatusNoData || GetStatusCodeFromError(err) == utils.StatusPartialData {
// If message still not in DB, enable more tries
err = db.updateInProcess(request, 1000, nextInd)
if err != nil {
return nil, 0, err
}
}
// Other error, that can be ignored:
// * mongo.ErrNoDocuments by getting from inProcess, too early to ask for inProcess -
// no id is ready now, try external queue instead. A later request will try again
// * ServerUnavailable by getting next id from inProcess - we don't know, try external queue now. A later request will try again
// * ServerUnavailable by getting message by id - A later request will try again in 30s. TODO: should this update the delay?
data, messageId, _, err = db.getRecordFromInprocessed(request, params.DelayMs, params.ResendAttempts,
false)
if err == nil {
return data, messageId, err
}
if GetStatusCodeFromError(err) == utils.StatusNoData {
request.Logger().WithFields(map[string]interface{}{"id": messageId, "cause": err.Error()}).Error("No data by getting from inProcess")
}
if GetStatusCodeFromError(err) == utils.StatusPartialData {
request.Logger().WithFields(map[string]interface{}{"id": messageId, "cause": err.Error()}).Warning("Partial data by getting from inProcess")
}
// error that can be returned:
// * StatusNoData - should not happen.
// * StatusPartialData - may happen if user change request parameter (minDataSize)
// * mongo.ErrNoDocuments by getting from inProcess, too early to ask for inProcess -
// no id is ready now, try external queue instead. A later request will try again
// * ServerUnavailable by getting next id from inProcess - we don't know, try external queue now. A later request will try again
// * ServerUnavailable by getting message by id - A later request will try again after params.DelayMS.
// In case of error getting record from inProcess, we try to get record from current pointer
// In case of Datasets we iterate across message_id even in case of ordered=False
idKey := params.IdKey
if request.DatasetOp {
idKey = "message_id"
}
nextInd, maxInd, err := db.getNextAndMaxIndexesFromCurPointer(request, pointer_collection_name, idKey)
// The error could be EndOfStream, StreamFinished, ServerUnavailable
if err != nil {
return nil, err
return nil, 0, err
}
data, err := db.getRecordByIDRaw(request, nextInd, maxInd)
data, messageId, err = db.getRecordByIDRaw(request, nextInd, maxInd, idKey)
// The error could be PartialData, NoData, ServerUnavailable
if err != nil {
data, err = db.tryGetRecordFromInprocessed(request, err)
if params.IdKey == "time_id" {
// This means the client used `ordered==False`, therefore retry the id at a later time
err = db.InsertRecordToInprocess(request, nextInd, 100,
1, idKey, true)
if err != nil {
// Inserting failed, id will be skipped for this group_id
// FixMe: Skipping id should not happen is resending is enabled (e.g. with transaction)
return nil, 0, err
}
// Return EndOfStream to let consumer try again (ID and maxID should be equal)
return nil, 0, &DBError{utils.StatusNoData, encodeAnswer(nextInd, nextInd, "")}
}
// In case of other idKey ("message_id") an error, that contain this id is return to the client.
// Client may try again using this id, otherwise record is considered as lost
return data, messageId, err
}
if err == nil {
err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam)
if err_update != nil {
return nil, err_update
if params.Resend {
err = db.InsertRecordToInprocess(request, int(messageId), params.DelayMs,
params.ResendAttempts, "message_id", false)
if err != nil {
request.Logger().WithFields(map[string]interface{}{"id": nextInd, "idKey": idKey,
"internal": true, "cause": err.Error()}).Warning("insert to inProcess failed")
}
// Ignore the error and return the message (better once than never)
}
return data, err
return data, messageId, nil
}
func (db *Mongodb) getLastRecord(request Request) ([]byte, error) {
max_ind, err := db.getMaxIndex(request, false)
func (db *Mongodb) getLastRecord(request Request, idKey string) ([]byte, error, uint64) {
max_ind, err := db.getMaxIndex(request, false, idKey)
if err != nil {
return nil, err
return nil, err, 0
}
// If stream is finished function will return stream finish error instead of the last message
if errFinish := db.checkStreamFinishedDB(request, max_ind, idKey == "message_id"); errFinish != nil {
return nil, errFinish, 0
}
return db.getRecordByIDRaw(request, max_ind, max_ind)
request.Logger().WithFields(map[string]interface{}{"id": max_ind, "idKey": idKey}).Debug("got max index")
data, message_id, err := db.getRecordByIDRaw(request, max_ind, max_ind, idKey)
return data, err, message_id
}
func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) {
max_ind, err := db.getMaxIndex(request, false)
func (db *Mongodb) getLastRecordInGroup(request Request, idKey string) ([]byte, uint64, error) {
max_ind, err := db.getMaxIndex(request, false, idKey)
if err != nil {
return nil, err
return nil, 0, err
}
if errFinish := db.checkStreamFinishedDB(request, max_ind, idKey == "message_id"); errFinish != nil {
return nil, 0, errFinish
}
var res map[string]interface{}
err = db.changeField(request, fieldChangeRequest{
collectionName: last_message_collection_name,
fieldName: last_message_field_name,
op: field_op_set,
max_ind: max_ind,
val: max_ind,
}, &res)
fieldName: last_message_field_name,
op: field_op_set,
max_ind: max_ind,
val: max_ind,
}, idKey, &res)
if err != nil {
return nil, err
return nil, 0, err
}
return db.getRecordByIDRaw(request, max_ind, max_ind)
return db.getRecordByIDRaw(request, max_ind, max_ind, idKey)
}
func getSizeFilter(request Request) bson.M {
filter := bson.M{}
if request.ExtraParam == "false" { // do not return incomplete datasets
filter = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$messages"}}}}
} else if request.ExtraParam == "true" {
filter = bson.M{"$expr": bson.M{"$gt": []interface{}{bson.M{"$size": "$messages"}, 0}}}
filter := bson.M{"stream": request.Stream}
params := request.ExtraParam.(ExtraParamSize)
if params.IsDataset {
if params.IncompleteDataset {
filter["$expr"] = bson.M{"$gt": []interface{}{bson.M{"$size": "$messages"}, 0}}
} else {
filter["$expr"] = bson.M{"$eq": []interface{}{"$dataset_size", bson.M{"$size": "$messages"}}}
}
}
filter = bson.M{"$and": []interface{}{bson.M{"name": bson.M{"$ne": finish_stream_keyword}}, filter}}
return filter
}
// Calculate number of messages for given stream
func (db *Mongodb) getSize(request Request) ([]byte, error) {
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
filter := getSizeFilter(request)
size, err := c.CountDocuments(context.TODO(), filter, options.Count())
......@@ -706,18 +998,20 @@ func (db *Mongodb) getSize(request Request) ([]byte, error) {
}
func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
id, err := strconv.Atoi(request.ExtraParam)
if err != nil {
return nil, err
id, ok := request.ExtraParam.(int)
if !ok {
return nil, &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
err = db.setCounter(request, id)
err := db.setCounter(request, id)
if err != nil {
return []byte(""), err
}
c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId)
_, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}})
c := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
filter := bson.M{"id": bson.M{"$gte": id}, "stream": request.Stream,
"group_id": request.GroupId}
_, err_del := c.DeleteMany(context.TODO(), filter)
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err_del.Error()}
}
......@@ -726,7 +1020,11 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
}
func getMetaId(request Request) (string, error) {
switch request.ExtraParam {
meta_id, ok := request.ExtraParam.(string)
if !ok {
return "", &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
switch meta_id {
case "0":
return "bt", nil
case "1":
......@@ -743,40 +1041,36 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) {
}
q := bson.M{"_id": id}
var res map[string]interface{}
c := db.client.Database(request.DbName).Collection(meta_collection_name)
c := db.client.Database(request.DbName()).Collection(meta_collection_name)
err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
if err != nil {
log_str := "error getting meta for " + id + " in " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"meta_id": id, "cause": err.Error()}).Debug("error getting meta")
return nil, &DBError{utils.StatusNoData, err.Error()}
}
userMeta, ok := res["meta"]
if !ok {
log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response"
logger.Error(log_str)
return nil, errors.New(log_str)
request.Logger().WithFields(map[string]interface{}{"id": id, "cause": "cannot parse database response"}).Debug("error getting meta")
return nil, errors.New("cannot get metadata")
}
log_str := "got metadata for " + id + " in " + request.DbName
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"meta_id": id}).Debug("got metadata")
return utils.MapToJson(&userMeta)
}
func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, error) {
log_str := "error processing query: " + query + " for " + dbname + " : " + err.Error()
logger.Debug(log_str)
func (db *Mongodb) processQueryError(query, dbname string, err error, rlog log.Logger) ([]byte, error) {
rlog.WithFields(map[string]interface{}{"query": query, "cause": err.Error()}).Debug("error processing query")
return nil, &DBError{utils.StatusNoData, err.Error()}
}
func (db *Mongodb) queryMessages(request Request) ([]byte, error) {
var res []map[string]interface{}
q, sort, err := db.BSONFromSQL(request.DbName, request.ExtraParam)
query := "stream = '" + request.Stream + "' AND " + request.ExtraParam.(string)
q, sort, err := db.BSONFromSQL(request.DbName(), query)
if err != nil {
log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName + " : " + err.Error()
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam.(string), "cause": err.Error()}).Debug("error parsing query")
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream)
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
opts := options.Find()
if len(sort) > 0 {
......@@ -786,15 +1080,18 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) {
cursor, err := c.Find(context.TODO(), q, opts)
if err != nil {
return db.processQueryError(request.ExtraParam, request.DbName, err)
return db.processQueryError(query, request.DbName(), err, request.Logger())
}
err = cursor.All(context.TODO(), &res)
if err != nil {
return db.processQueryError(request.ExtraParam, request.DbName, err)
return db.processQueryError(query, request.DbName(), err, request.Logger())
}
log_str := "processed query " + request.ExtraParam + " for " + request.DbName + " ,found" + strconv.Itoa(len(res)) + " records"
logger.Debug(log_str)
request.Logger().WithFields(map[string]interface{}{"query": request.ExtraParam.(string), "recordsFound": len(res)}).Debug("processed query")
for _, msg := range res {
message_fallback(msg)
}
if res != nil {
return utils.MapToJson(&res)
} else {
......@@ -810,50 +1107,28 @@ func makeRange(min, max int) []int {
return a
}
// extractsTwoIntsFromString parses a string of the form "<from>_<to>"
// and returns the two integer components. It fails if the separator
// count is wrong or either side is not a valid integer.
func extractsTwoIntsFromString(from_to string) (int, int, error) {
	parts := strings.Split(from_to, "_")
	if len(parts) != 2 {
		return 0, 0, errors.New("wrong format: " + from_to)
	}
	first, errFirst := strconv.Atoi(parts[0])
	if errFirst != nil {
		return 0, 0, errFirst
	}
	second, errSecond := strconv.Atoi(parts[1])
	if errSecond != nil {
		return 0, 0, errSecond
	}
	return first, second, nil
}
func (db *Mongodb) getNacksLimits(request Request) (int, int, error) {
from, to, err := extractsTwoIntsFromString(request.ExtraParam)
if err != nil {
return 0, 0, err
}
if from == 0 {
from = 1
pars, ok := request.ExtraParam.(ExtraParamNacks)
if !ok {
return 0, 0, &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
if to == 0 {
to, err = db.getMaxLimitWithoutEndOfStream(request, err)
var err error
if pars.To == 0 {
pars.To, err = db.getMaxLimitWithoutEndOfStream(request, "message_id")
if err != nil {
return 0, 0, err
}
}
return from, to, nil
return pars.From, pars.To, nil
}
func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, err error) (int, error) {
maxInd, err := db.getMaxIndex(request, true)
func (db *Mongodb) getMaxLimitWithoutEndOfStream(request Request, idKey string) (int, error) {
maxInd, err := db.getMaxIndex(request, true, idKey)
if err != nil {
return 0, err
}
_, last_err := db.getRecordByIDRaw(request, maxInd, maxInd)
_, _, last_err := db.getRecordByIDRaw(request, maxInd, maxInd, idKey)
if last_err != nil && maxInd > 0 {
maxInd = maxInd - 1
}
......@@ -880,11 +1155,11 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) {
}
func (db *Mongodb) deleteCollection(request Request, name string) error {
return db.client.Database(request.DbName).Collection(name).Drop(context.Background())
return db.client.Database(request.DbName()).Collection(name).Drop(context.TODO())
}
func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name})
result, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": name})
if err != nil {
return false, err
}
......@@ -895,22 +1170,21 @@ func (db *Mongodb) collectionExist(request Request, name string) (bool, error) {
}
func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) error {
dataCol := data_collection_name_prefix + request.Stream
if errorOnNotexist {
exist, err := db.collectionExist(request, dataCol)
if err != nil {
return err
}
if !exist {
return &DBError{utils.StatusWrongInput, "stream " + request.Stream + " does not exist"}
}
c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
filter := bson.M{"stream": request.Stream}
count, err := c.DeleteMany(context.TODO(), filter)
if err != nil {
return &DBError{utils.StatusWrongInput, err.Error()}
}
if count.DeletedCount == 0 && errorOnNotexist {
return &DBError{utils.StatusWrongInput, "No messages in given stream to be deleted"}
}
return db.deleteCollection(request, dataCol)
return nil
}
func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error {
filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}}
_, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter)
_, err := db.client.Database(request.DbName()).Collection(collection).DeleteMany(context.TODO(), filter)
return err
}
......@@ -923,7 +1197,7 @@ func escapeQuery(query string) (res string) {
}
func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error {
cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
cols, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{
{"$regex", primitive.Regex{Pattern: "^" + escapeQuery(prefix), Options: "i"}}}})
if err != nil {
return err
......@@ -940,50 +1214,93 @@ func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) e
}
func (db *Mongodb) deleteServiceMeta(request Request) error {
err := db.deleteCollectionsWithPrefix(request, acks_collection_name_prefix+request.Stream)
// Delete InProcess
c := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix)
filter := bson.M{"stream": request.Stream}
_, err := c.DeleteMany(context.TODO(), filter)
if err != nil {
return err
}
err = db.deleteCollectionsWithPrefix(request, inprocess_collection_name_prefix+request.Stream)
// Delete acks
c = db.client.Database(request.DbName()).Collection(acks_collection_name_prefix)
filter = bson.M{"stream": request.Stream}
_, err = c.DeleteMany(context.TODO(), filter)
if err != nil {
return err
}
return db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+escapeQuery(request.Stream)+"$")
}
func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
params := struct {
ErrorOnNotExist *bool
DeleteMeta *bool
}{}
// Delete current location
err = db.deleteDocumentsInCollection(request, pointer_collection_name, "_id", ".*_"+escapeQuery(request.Stream))
if err != nil {
return err
}
err := json.Unmarshal([]byte(request.ExtraParam), &params)
// Delete meta
err = db.deleteDocumentsInCollection(request, meta_collection_name, "_id", ".*_"+escapeQuery(request.Stream)+"_.*")
if err != nil {
return nil, err
return err
}
if params.DeleteMeta == nil || params.ErrorOnNotExist == nil {
return nil, &DBError{utils.StatusWrongInput, "wrong params: " + request.ExtraParam}
// Delete from stream list
err = db.deleteDocumentsInCollection(request, streams_info, "name", escapeQuery(request.Stream))
if err != nil {
return err
}
if !*params.DeleteMeta {
logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName)
return nil
}
func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
params, ok := request.ExtraParam.(ExtraParamDelete)
if !ok {
return nil, &DBError{utils.StatusWrongInput, failed_extract_extra_params}
}
if !params.DeleteMeta {
request.Logger().Debug("skipping delete stream meta")
return nil, nil
}
err = db.deleteDataCollection(*params.ErrorOnNotExist, request)
err := db.deleteDataCollection(params.ErrorOnNotExist, request)
if err != nil {
return nil, err
}
delete(streams.records, request.DbName())
err = db.deleteServiceMeta(request)
return nil, err
}
func (db *Mongodb) persistStream(request Request) ([]byte, error) {
if db.client == nil {
return nil, &DBError{utils.StatusServiceUnavailable, no_session_msg}
}
maxNum, _ := request.ExtraParam.(int)
c := db.client.Database(request.DbName()).Collection(meta_collection_name)
q := bson.M{"_id": "persist"}
l := PersistedStreamsList{}
err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&l)
if err != nil && err != mongo.ErrNoDocuments {
return nil, err
}
if len(l.Streams) >= maxNum {
return nil, &DBError{utils.StatusWrongInput, too_many_persisted_streams}
}
l.Streams = append(l.Streams, request.Stream)
_, err = c.InsertOne(context.TODO(), l)
return nil, err
}
func (db *Mongodb) lastAck(request Request) ([]byte, error) {
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix)
opts := options.FindOne().SetSort(bson.M{"message_id": -1}).SetReturnKey(true)
result := LastAck{0}
var q bson.M = nil
q := bson.M{"stream": request.Stream, "group_id": request.GroupId}
err := c.FindOne(context.TODO(), q, opts).Decode(&result)
if err == mongo.ErrNoDocuments {
return utils.MapToJson(&result)
......@@ -1016,13 +1333,23 @@ func (db *Mongodb) canAvoidDbRequest(min_index int, max_index int, c *mongo.Coll
return nil, nil, false
}
func getNacksQuery(max_index int, min_index int) []bson.D {
matchStage := bson.D{{"$match", bson.D{{"_id", bson.D{{"$lt", max_index + 1}, {"$gt", min_index - 1}}}}}}
func getNacksQuery(request Request, max_index int, min_index int) []bson.D {
matchStage := bson.D{
{"$match", bson.D{
{"message_id", bson.D{
{"$lt", max_index + 1},
{"$gt", min_index - 1},
}},
{"stream", request.Stream},
{"group_id", request.GroupId},
}},
}
groupStage := bson.D{
{"$group", bson.D{
{"_id", 0},
{"numbers", bson.D{
{"$push", "$_id"},
{"$push", "$message_id"},
}}},
}}
projectStage := bson.D{
......@@ -1039,7 +1366,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) {
resp := []struct {
Numbers []int
}{}
err = cursor.All(context.Background(), &resp)
err = cursor.All(context.TODO(), &resp)
if err != nil || len(resp) != 1 {
return []int{}, err
}
......@@ -1047,68 +1374,112 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) {
}
func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) {
c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix)
if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok {
return res, err
}
query := getNacksQuery(max_index, min_index)
cursor, err := c.Aggregate(context.Background(), query)
query := getNacksQuery(request, max_index, min_index)
cursor, err := c.Aggregate(context.TODO(), query)
if err != nil {
return []int{}, err
}
return extractNacsFromCursor(err, cursor)
}
func (db *Mongodb) getStreams(request Request) ([]byte, error) {
rec, err := streams.getStreams(db, request)
if err != nil {
return db.processQueryError("get streams", request.DbName, err)
return db.processQueryError("get streams", request.DbName(), err, request.Logger())
}
return json.Marshal(&rec)
}
func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
// getDatasources returns (as JSON) the list of data sources belonging to the
// request's beamtime. Database names have the form "<beamtime>_<source>";
// the regex filter selects them and the beamtime prefix is stripped from
// each name before returning.
func (db *Mongodb) getDatasources(request Request) ([]byte, error) {
	filter := bson.M{"name": primitive.Regex{Pattern: "^" + request.Beamtime + "_"}}
	rec, err := db.client.ListDatabaseNames(context.TODO(), filter)
	if err != nil {
		// Fixed copy-paste defect: the query description previously said
		// "get streams", which mislabeled errors of this operation in the logs.
		return db.processQueryError("get datasources", request.DbName(), err, request.Logger())
	}
	// The regex guarantees every returned name starts with "<beamtime>_",
	// so slicing off len(beamtime)+1 bytes is safe.
	for i, source := range rec {
		rec[i] = source[len(request.Beamtime)+1:]
	}
	res := Sources{rec}
	return json.Marshal(&res)
}
// checkDuplicate checks whether the message carried in the request's extra
// parameters duplicates a record already stored in the database, delegating
// the actual comparison to processDuplicatedId.
func (db *Mongodb) checkDuplicate(request Request) ([]byte, error) {
	params, ok := request.ExtraParam.(ExtraParamMessage)
	if !ok {
		return nil, &DBError{utils.StatusWrongInput, "failed to parse extra parameters"}
	}
	// For dataset messages, direct the lookup at the requested substream.
	if substream := params.Message.DatasetSubstream; substream > 0 {
		request.Substream = substream
	}
	return db.processDuplicatedId(request, params.Message)
}
func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error, idx uint64) {
dbClientLock.RLock()
defer dbClientLock.RUnlock()
if err := db.checkDatabaseOperationPrerequisites(request); err != nil {
return nil, err
return nil, err, 0
}
if err := encodeRequest(&request); err != nil {
return nil, err
return nil, err, 0
}
idx = 0
switch request.Op {
case "next":
return db.getNextRecord(request)
answer, idx, err = db.getNextRecord(request)
case "id":
return db.getRecordByID(request)
answer, err, idx = db.getRecordByID(request)
case "last":
return db.getLastRecord(request)
answer, err, idx = db.getLastRecord(request, "message_id")
case "groupedlast":
return db.getLastRecordInGroup(request)
answer, idx, err = db.getLastRecordInGroup(request, "message_id")
case "resetcounter":
return db.resetCounter(request)
answer, err = db.resetCounter(request)
case "size":
return db.getSize(request)
answer, err = db.getSize(request)
case "meta":
return db.getMeta(request)
answer, err = db.getMeta(request)
case "querymessages":
return db.queryMessages(request)
answer, err = db.queryMessages(request)
case "streams":
return db.getStreams(request)
answer, err = db.getStreams(request)
case "datasources":
answer, err = db.getDatasources(request)
case "ackmessage":
return db.ackRecord(request)
answer, err = db.ackRecord(request)
case "negackmessage":
return db.negAckRecord(request)
answer, err = db.negAckRecord(request)
case "nacks":
return db.nacks(request)
answer, err = db.nacks(request)
case "lastack":
return db.lastAck(request)
answer, err = db.lastAck(request)
case "delete_stream":
return db.deleteStream(request)
answer, err = db.deleteStream(request)
case "persist_stream":
answer, err = db.persistStream(request)
case "write_meta":
answer, err = db.writeMeta(request)
case "write_message":
answer, err = db.writeMessage(request)
case "check_duplicate":
answer, err = db.checkDuplicate(request)
default:
answer, err = nil, errors.New("Wrong db operation: "+request.Op)
}
return nil, errors.New("Wrong db operation: " + request.Op)
return answer, err, idx
}
package database
import (
"asapo_common/utils"
"context"
"encoding/json"
"math/rand"
"strconv"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// ExtraParamMessage carries the extra parameters of a message-write request:
// the message id, the dataset size (0 for non-dataset messages — writeMessage
// dispatches to writeAsDataset only when DatasetSize > 0), and the raw
// message record itself.
type ExtraParamMessage struct {
	Id          int64      `json:"id"`
	DatasetSize int        `json:"dataset_size"`
	Message     MessageRaw `json:"message"`
}
// ExtraParamMeta holds the extra parameters of a metadata-write request.
// NOTE(review): presumably decoded for the "write_meta" operation handled by
// db.writeMeta — that handler is not visible in this chunk, so the exact
// semantics of Mode and Upsert should be confirmed against it.
type ExtraParamMeta struct {
	Id      string                 `json:"id"`
	Mode    string                 `json:"mode"`
	Upsert  bool                   `json:"upsert"`
	Message map[string]interface{} `json:"message"`
}
// MessageRaw is the raw message record as stored in MongoDB.
// Note the deliberate tag swap around "_id": in BSON the document key "_id"
// is Uid (set by writeMessage to "<stream>_<message id>" so documents stay
// unique even before the unique index exists), while in JSON "_id" maps to
// the numeric message Id (stored in BSON as "message_id").
type MessageRaw struct {
	// Uid is the MongoDB document key, "<stream>_<id>" (see writeMessage).
	Uid string `bson:"_id" json:"unique_id"`
	// Id is the producer-assigned message id ("message_id" in BSON).
	Id int64 `bson:"message_id" json:"_id"`
	// TimeId is the arrival-order id taken from the auto-id counter
	// (assigned via getNextId in writeMessage).
	TimeId    int64                  `bson:"time_id" json:"time_id"`
	Timestamp int64                  `bson:"timestamp" json:"timestamp"`
	Name      string                 `bson:"name" json:"name"`
	Meta      map[string]interface{} `bson:"meta" json:"meta"`
	Source    string                 `bson:"source" json:"source"`
	IbSource  string                 `bson:"ib_source" json:"ib_source"`
	BufId     int64                  `bson:"buf_id" json:"buf_id"`
	Stream    string                 `bson:"stream" json:"stream"`
	// DatasetSubstream is > 0 for messages belonging to a dataset substream.
	DatasetSubstream int   `bson:"dataset_substream" json:"dataset_substream"`
	IngestMode       int32 `bson:"ingest_mode" json:"ingest_mode"`
	Size             int32 `bson:"size" json:"size"`
}
// getNextId atomically increments the per-"<datasource>_<stream>" counter in
// the auto-id collection (creating it on first use via upsert) and returns
// the incremented value.
func (db *Mongodb) getNextId(request Request) (int64, error) {
	type counterDoc struct {
		DataSource string `bson:"_id" `
		Id         int64  `bson:"value"`
	}
	coll := db.client.Database(request.DbName()).Collection(auto_id_counter_name)
	query := bson.M{"_id": request.DataSource + "_" + request.Stream}
	inc := bson.M{"$inc": bson.M{"value": int64(1)}}
	// ReturnDocument(After) yields the counter value after the increment.
	opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
	var doc counterDoc
	if err := coll.FindOneAndUpdate(context.TODO(), query, inc, opts).Decode(&doc); err != nil {
		return 0, err
	}
	return doc.Id, nil
}
// createIndices creates the unique compound indices required by the data,
// in-process, acks, and stream-info collections. Index creation is
// idempotent on the MongoDB side, so calling this repeatedly is safe.
//
// The original implementation repeated the same create-index stanza four
// times (with one copy-pasted comment wrongly describing the time_id index
// as "message_id and stream"); the specs are now table-driven.
func (db *Mongodb) createIndices(request Request) error {
	specs := []struct {
		collection string
		keys       bson.D
	}{
		// data: unique per (stream, message_id)
		{data_collection_name_prefix, bson.D{{Key: "stream", Value: 1}, {Key: "message_id", Value: 1}}},
		// data: unique per (stream, time_id), descending time_id
		{data_collection_name_prefix, bson.D{{Key: "stream", Value: 1}, {Key: "time_id", Value: -1}}},
		// in-process: unique per (stream, group_id, internal, id)
		{inprocess_collection_name_prefix, bson.D{{Key: "stream", Value: 1}, {Key: "group_id", Value: 1}, {Key: "internal", Value: 1}, {Key: "id", Value: 1}}},
		// acks: unique per (stream, group_id, message_id)
		{acks_collection_name_prefix, bson.D{{Key: "stream", Value: 1}, {Key: "group_id", Value: 1}, {Key: "message_id", Value: 1}}},
		// stream info: unique per name
		{streams_info, bson.D{{Key: "name", Value: 1}}},
	}
	for _, spec := range specs {
		indexModel := mongo.IndexModel{
			Keys:    spec.keys,
			Options: options.Index().SetUnique(true),
		}
		c := db.client.Database(request.DbName()).Collection(spec.collection)
		if _, err := c.Indexes().CreateOne(context.TODO(), indexModel); err != nil {
			return &DBError{utils.StatusServiceUnavailable, "failed to create index: " + err.Error()}
		}
	}
	request.Logger().Debug("Create all indices")
	return nil
}
// needCreateIndex guesses probabilistically whether the DB indices should be
// (re)created, without taking a mutex. Messages may be missing from a stream,
// so index creation is triggered on varying message ids rather than a fixed
// one; creating an index on every message would be too time-consuming.
func (db *Mongodb) needCreateIndex(id int64) bool {
	// ~1% chance for the first 1000 messages, ~0.1% for all later ones.
	if id < 1000 {
		return rand.Intn(100) == 0
	}
	return rand.Intn(1000) == 0
}
// writeMessage stores the message carried in the request's extra parameters.
// A message named with the stream-finish keyword is handled as a finish flag;
// otherwise the message gets an arrival-order time_id and a unique document
// key, and is written either as a plain message or as a dataset entry.
func (db *Mongodb) writeMessage(request Request) ([]byte, error) {
	params, ok := request.ExtraParam.(ExtraParamMessage)
	if !ok {
		return nil, &DBError{utils.StatusWrongInput, "failed to parse extra parameters"}
	}
	// Probabilistically (re)create DB indices — see needCreateIndex.
	if db.needCreateIndex(params.Message.Id) {
		if indexErr := db.createIndices(request); indexErr != nil {
			return nil, indexErr
		}
	}
	if params.Message.Name == finish_stream_keyword {
		return db.writeStreamFinishFlag(request, params.Message)
	}
	timeId, err := db.getNextId(request)
	if err != nil {
		return nil, &DBError{utils.StatusWrongInput, "failed to get next id: " + err.Error()}
	}
	params.Message.TimeId = timeId
	// The Uid makes the document unique even if the unique index has not
	// been created yet.
	params.Message.Uid = params.Message.Stream + "_" + strconv.Itoa(int(params.Message.Id))
	if params.DatasetSize > 0 {
		return db.writeAsDataset(request, params.Message, params.DatasetSize)
	}
	return db.writeAsMessage(request, params.Message)
}
// writeStreamFinishFlag marks a stream as finished in the stream-info
// collection: it records the last message id (finish id - 1), the optional
// "next_stream" name from the message meta, and the stream's creation/last
// timestamps. Missing messages (mongo.ErrNoDocuments from the timestamp
// lookups) are tolerated — the stream is still marked finished.
func (db *Mongodb) writeStreamFinishFlag(request Request, message MessageRaw) ([]byte, error) {
	c := db.client.Database(request.DbName()).Collection(streams_info)
	filter := bson.M{"name": message.Stream}
	nextStream, ok := message.Meta["next_stream"]
	if !ok {
		nextStream = ""
	}
	ts, err := getCreationTime(db, request.DbName(), message.Stream)
	if err != nil {
		if err != mongo.ErrNoDocuments {
			request.Logger().WithFields(map[string]interface{}{"id": message.Id, "cause": err.Error()}).Error("failed to insert stream finish flag")
			return nil, &DBError{utils.StatusTransactionInterrupted, "failed to insert record " + err.Error()}
		}
		request.Logger().WithFields(map[string]interface{}{"id": message.Id}).Warning("stream finished, but no messages found")
	}
	mRec, err := getLastMessageRecord(db, request.DbName(), message.Stream)
	if err != nil {
		if err != mongo.ErrNoDocuments {
			request.Logger().WithFields(map[string]interface{}{"id": message.Id, "cause": err.Error()}).Error("failed to insert stream finish flag")
			return nil, &DBError{utils.StatusTransactionInterrupted, "failed to insert record " + err.Error()}
		}
		// ErrNoDocuments: mRec stays the zero value; timestampLast will be 0.
	}
	update_info := bson.M{"finished": true, "lastId": message.Id - 1, "nextStream": nextStream, "timestampCreated": ts,
		"timestampLast": int64(mRec.Timestamp)}
	update := bson.M{"$set": update_info}
	opts := options.Update().SetUpsert(true)
	_, err = c.UpdateOne(context.TODO(), filter, update, opts)
	if err != nil {
		// Upsert can race and report a duplicate key even though the document
		// now exists; a retry of the same update then succeeds.
		// See https://jira.mongodb.org/browse/SERVER-47212
		// ToDo: Remove this workaround after update to MongoDB >= 5.0.
		if err == mongo.ErrNoDocuments || duplicateError(err) {
			_, err = c.UpdateOne(context.TODO(), filter, update, opts)
		}
		// Bug fix: only report failure if the (possibly retried) update still
		// failed. Previously a successful retry fell through to err.Error()
		// on a nil error (nil-pointer panic) and was reported as a failure.
		if err != nil {
			request.Logger().WithFields(map[string]interface{}{"id": message.Id, "cause": err.Error()}).Error("failed to insert stream finish flag")
			return nil, &DBError{utils.StatusWrongInput, "failed to insert record " + err.Error()}
		}
	}
	request.Logger().WithFields(map[string]interface{}{"id": message.Id, "nextStream": nextStream}).Debug("insert stream finish flag")
	return nil, nil
}
// processDuplicatedId is called after an insert failed with a duplicate-key
// error: it loads the record already stored under message.Id and decides
// whether the incoming message is a benign re-send (identical content), a
// different dataset substream (the same id is then allowed), or a genuine id
// collision. Only a re-sent stream-finish flag is accepted silently (nil, nil);
// every other outcome is reported as a *DBError.
func (db *Mongodb) processDuplicatedId(request Request, message MessageRaw) ([]byte, error) {
	request.Logger().WithFields(map[string]interface{}{"id": message.Id, "substream": message.DatasetSubstream}).Debug("Check for duplications")
	rec, err := db.getRecordFromDb(request, int(message.Id), 1, "message_id")
	if err != nil {
		request.Logger().WithFields(map[string]interface{}{"id": message.Id, "substream": message.DatasetSubstream, "cause": err.Error()}).Warning("Error while checking for duplications")
		return nil, err
	}
	// Re-sending the finish-stream marker is not treated as an error.
	if message.Name == finish_stream_keyword {
		return nil, nil
	}
	// If the message is a dataset, extract the message for the requested substream and
	// treat that as a normal message in the rest of the function
	if _, ok := rec["messages"]; ok {
		messages_bson, ok := rec["messages"].(primitive.A)
		if !ok || len(messages_bson) != 1 {
			request.Logger().WithFields(map[string]interface{}{"id": message.Id, "substream": message.DatasetSubstream, "cause": "failed to decode the message present in the database"}).Error("Error while checking for duplications")
			return nil, &DBError{utils.StatusWrongInput, "Duplicated Id"}
		}
		rec, ok = messages_bson[0].(map[string]interface{})
		if !ok {
			request.Logger().WithFields(map[string]interface{}{"id": message.Id, "substream": message.DatasetSubstream, "cause": "failed to decode the substream of the message present in the database"}).Error("Error while checking for duplications")
			return nil, &DBError{utils.StatusWrongInput, "Duplicated Id"}
		}
	}
	var db_message MessageRaw
	// Best-effort decode; on failure db_message stays zero-valued and the
	// comparison below reports an id collision.
	_ = utils.MapToStruct(rec, &db_message)
	// Meta maps are compared via their JSON encoding.
	db_meta, err1 := json.Marshal(db_message.Meta)
	meta, err2 := json.Marshal(message.Meta)
	if err1 != nil || err2 != nil {
		return nil, &DBError{utils.StatusWrongInput, "Duplicated Id"}
	}
	// These messages are considered to be the same
	if db_message.DatasetSubstream == message.DatasetSubstream && db_message.Name == message.Name && db_message.Size == message.Size && string(db_meta) == string(meta) {
		return nil, &DBError{utils.StatusWrongInput, "Duplicated message"}
	}
	// If messages are different datasetSubstreams they can have the same Id.
	if db_message.DatasetSubstream > 0 && message.DatasetSubstream > 0 && db_message.DatasetSubstream != message.DatasetSubstream {
		return nil, &DBError{utils.StatusNoData, encodeAnswer(int(message.Id), int(message.Id), "")}
	}
	return nil, &DBError{utils.StatusWrongInput, "Duplicated Id"}
}
// writeAsMessage inserts a single (non-dataset) message into the data
// collection; duplicate-key errors are delegated to processDuplicatedId.
func (db *Mongodb) writeAsMessage(request Request, message MessageRaw) ([]byte, error) {
	// AutoId case
	if message.Id == 0 {
		message.Id = message.TimeId
	}
	// Plain messages must not carry a dataset substream.
	if message.DatasetSubstream > 0 {
		return nil, &DBError{utils.StatusWrongInput, "DatasetSubstream > 0"}
	}
	coll := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
	if _, insertErr := coll.InsertOne(context.TODO(), message); insertErr != nil {
		if duplicateError(insertErr) {
			// Id already taken - decide whether this is a re-send or a collision.
			return db.processDuplicatedId(request, message)
		}
		request.Logger().WithFields(map[string]interface{}{"id": message.Id, "cause": insertErr.Error()}).Error("failed to insert record")
		return nil, &DBError{utils.StatusWrongInput, "failed to insert record " + insertErr.Error()}
	}
	request.Logger().WithFields(map[string]interface{}{"id": message.Id}).Debug("insert record into database")
	return nil, nil
}
// writeAsDataset adds one substream message to the dataset document identified
// by (uid, message id, stream, dataset size). The document is created on first
// use via upsert; the "$ne" clause on messages.dataset_substream means an
// already-present substream does not match the filter, so the upsert attempts
// a fresh insert and fails with a duplicate-key error, which is then delegated
// to processDuplicatedId.
func (db *Mongodb) writeAsDataset(request Request, message MessageRaw, size int) ([]byte, error) {
	filter := bson.M{"_id": message.Uid,
		"message_id": message.Id, "stream": message.Stream, "dataset_size": int64(size),
		"messages.dataset_substream": bson.M{"$ne": message.DatasetSubstream}}
	update := bson.M{
		"$set": bson.M{"timestamp": message.Timestamp},
		"$setOnInsert": bson.M{"time_id": message.TimeId},
		"$push": bson.M{"messages": message},
	}
	options := options.Update().SetUpsert(true)
	c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix)
	_, err := c.UpdateOne(context.TODO(), filter, update, options)
	// first update may fail due to multiple threads try to create document at once, the second one should succeed
	// https://jira.mongodb.org/browse/SERVER-14322
	if duplicateError(err) {
		_, err = c.UpdateOne(context.TODO(), filter, update, options)
	}
	if err != nil {
		if duplicateError(err) {
			// The substream is already part of the dataset - check whether it is
			// an identical re-send or a real collision.
			request.Substream = message.DatasetSubstream
			return db.processDuplicatedId(request, message)
		}
		request.Logger().WithFields(map[string]interface{}{"id": message.Id,
			"substream": message.DatasetSubstream, "cause": err.Error()}).Error("failed to insert dataset")
		return nil, &DBError{utils.StatusWrongInput, "failed to insert dataset " + err.Error()}
	}
	request.Logger().WithFields(map[string]interface{}{"id": message.Id, "substream": message.DatasetSubstream}).Debug("insert substream record into database")
	return nil, nil
}
// writeMeta inserts or updates the metadata document identified by params.Id,
// according to params.Mode:
//   "1" - insert a new document (fails when it already exists)
//   "2" - replace the whole meta field (upsert controlled by params.Upsert)
//   "3" - update individual top-level keys of the meta field (upsert controlled by params.Upsert)
// Ingest modes are defined in: common/cpp/include/asapo/common/data_structs.h
func (db *Mongodb) writeMeta(request Request) ([]byte, error) {
	params, ok := request.ExtraParam.(ExtraParamMeta)
	if !ok {
		return nil, &DBError{utils.StatusWrongInput, "failed to parse extra parameters "}
	}
	c := db.client.Database(request.DbName()).Collection(meta_collection_name)
	var err error
	switch params.Mode {
	default:
		// Fixed typo in the error message ("injest" -> "ingest").
		return nil, &DBError{utils.StatusWrongInput, "Unexpected metadata ingest mode " + params.Mode}
	case "1": // Insert
		insert_info := bson.M{"_id": params.Id, "meta": params.Message}
		_, err = c.InsertOne(context.TODO(), insert_info)
	case "2": // Replace
		filter := bson.M{"_id": params.Id}
		update := bson.M{"$set": bson.M{"meta": params.Message}}
		// "opts" avoids shadowing the mongo-driver "options" package.
		opts := options.Update().SetUpsert(params.Upsert)
		_, err = c.UpdateOne(context.TODO(), filter, update, opts)
	case "3": // Update
		filter := bson.M{"_id": params.Id}
		update_info := bson.M{}
		// Prefix every key so only the addressed sub-fields of "meta" change.
		for k, v := range params.Message {
			update_info["meta."+k] = v
		}
		update := bson.M{"$set": update_info}
		opts := options.Update().SetUpsert(params.Upsert)
		_, err = c.UpdateOne(context.TODO(), filter, update, opts)
	}
	if err != nil {
		request.Logger().WithFields(map[string]interface{}{"meta_id": params.Id, "cause": err.Error()}).Error("failed to insert meta")
		return nil, &DBError{utils.StatusWrongInput, "failed to insert meta: " + err.Error()}
	}
	request.Logger().WithFields(map[string]interface{}{"meta_id": params.Id}).Debug("Insert meta into database")
	return nil, nil
}
//+build !test
//go:build !test
// +build !test
package database
import (
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"github.com/blastrain/vitess-sqlparser/sqlparser"
"strconv"
"github.com/blastrain/vitess-sqlparser/sqlparser"
"go.mongodb.org/mongo-driver/bson"
)
func SQLOperatorToMongo(sqlOp string) string {
......
//+build !test
//go:build !test
// +build !test
package database
import (
log "asapo_common/logger"
"asapo_common/utils"
"context"
"errors"
"go.mongodb.org/mongo-driver/bson"
"fmt"
"sort"
"strings"
"sync"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
// GetStreamsParams carries the filter options of a "get streams" request
// (decoded from the request's JSON extra parameters).
type GetStreamsParams struct {
	From     string `json:"from"`     // Filter streams with given name and all later streams. Compatible with Single.
	Filter   string `json:"filter"`   // Filter by status: finished/unfinished/all
	Detailed bool   `json:"detailed"` // Update stream information with the last message in the streams(TimestampLast, LastID, StreamFinishedFlag)
	Last     bool   `json:"last"`     // Filter only last stream from the list of streams. Single flag is ignored.
	Single   bool   `json:"single"`   // Filter only one First stream from the list
}
// StreamInfo describes one stream: its last message id, creation/last
// timestamps, whether it is finished, and - for a finished stream - the name
// of the follow-up stream. The bson tags match the documents stored in the
// streams_info collection, the json tags the broker's wire format.
// (Merge residue removed: the struct contained both the old untagged field set
// and the new bson-tagged one, which does not compile.)
type StreamInfo struct {
	LastId           int64  `bson:"lastId" json:"lastId"`
	Name             string `bson:"name" json:"name"`
	TimestampCreated int64  `bson:"timestampCreated" json:"timestampCreated"`
	TimestampLast    int64  `bson:"timestampLast" json:"timestampLast"`
	Finished         bool   `bson:"finished" json:"finished"`
	NextStream       string `bson:"nextStream" json:"nextStream"`
}
type StreamsRecord struct {
......@@ -29,56 +43,61 @@ type StreamsRecord struct {
type Streams struct {
records map[string]StreamsRecord
lastUpdated map[string]time.Time
lastSynced map[string]time.Time
}
var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)}
var streams = Streams{lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)}
var streamsLock sync.Mutex
func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() > int64(updatePeriodMs) {
func (ss *Streams) getStreamsInfoFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() >= int64(updatePeriodMs) {
return StreamsRecord{}, errors.New("cache expired")
}
rec, ok := ss.records[db_name]
if !ok {
return StreamsRecord{}, errors.New("no records for " + db_name)
}
res :=StreamsRecord{}
utils.DeepCopy(rec,&res)
res := StreamsRecord{}
utils.DeepCopy(rec, &res)
return res, nil
}
func readStreams(db *Mongodb, db_name string) (StreamsRecord, error) {
database := db.client.Database(db_name)
result, err := database.ListCollectionNames(context.TODO(), bson.D{})
// Return StreamsRecord with names of streams filled from DB
// Function query list of collections for given db_name and them filter it by name
func readAllStreamNames(db *Mongodb, db_name string) (StreamsRecord, error) {
collection := db.client.Database(db_name).Collection(data_collection_name_prefix)
result, err := collection.Distinct(context.TODO(), "stream", bson.M{})
if err != nil {
return StreamsRecord{}, err
}
var rec = StreamsRecord{[]StreamInfo{}}
for _, coll := range result {
if strings.HasPrefix(coll, data_collection_name_prefix) {
sNameEncoded:= strings.TrimPrefix(coll, data_collection_name_prefix)
si := StreamInfo{Name: decodeString(sNameEncoded)}
rec.Streams = append(rec.Streams, si)
}
si := StreamInfo{Name: fmt.Sprint(coll)}
rec.Streams = append(rec.Streams, si)
}
return rec, nil
}
func getCurrentStreams(db_name string) []StreamInfo {
ss, dbFound := streams.records[db_name]
currentStreams := []StreamInfo{}
if dbFound {
// sort streams by name
currentStreams = ss.Streams
sort.Slice(currentStreams, func(i, j int) bool {
return currentStreams[i].Name >= currentStreams[j].Name
})
}
return currentStreams
// Read stream info for all streams from streams_info collection
// If correction is empty or does not exist return empty array
// and create collecton with unique index `name`
func readStreamsInfo(db *Mongodb, db_name string) ([]StreamInfo, error) {
coll := db.client.Database(db_name).Collection(streams_info)
opts := options.Find().SetSort(bson.D{{"name", 1}}) // Order by name to simplify search
cursor, err := coll.Find(context.TODO(), bson.D{}, opts)
if err != nil {
log.Error("Get streams fails", err)
return []StreamInfo{}, err
}
var rec []StreamInfo
if err = cursor.All(context.TODO(), &rec); err != nil {
log.Error("Decoding streams fails", err)
return []StreamInfo{}, err
}
return rec, nil
}
func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int, bool) {
// currentStreams is already ordered by Name
ind := sort.Search(len(currentStreams), func(i int) bool {
return currentStreams[i].Name >= record.Name
})
......@@ -88,43 +107,53 @@ func findStreamAmongCurrent(currentStreams []StreamInfo, record StreamInfo) (int
return -1, false
}
func fillInfoFromEarliestRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
res, err := db.getEarliestRawRecord(db_name, encodeStringForColName(record.Name))
func getCreationTime(db *Mongodb, db_name string, stream string) (int64, error) {
res, err := db.getEarliestRawRecord(db_name, stream)
if err != nil {
return err
return 0, err
}
ts, ok := utils.GetInt64FromMap(res, "timestamp")
if ok {
rec.Streams[i].Timestamp = ts
} else {
return errors.New("fillInfoFromEarliestRecord: cannot extact timestamp")
return ts, nil
}
return nil
return 0, errors.New("cannot extract timestamp")
}
func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
res, err := db.getLastRawRecord(db_name, encodeStringForColName(record.Name))
func getLastMessageRecord(db *Mongodb, db_name string, stream string) (MessageRecord, error) {
res, err := db.getLastRawRecord(db_name, stream)
if err != nil {
return err
return MessageRecord{}, err
}
mrec, ok := ExtractMessageRecord(res)
if !ok {
return errors.New("fillInfoFromLastRecord: cannot extract record")
return MessageRecord{}, errors.New("cannot extract record")
}
return mrec, nil
}
// fillInfoFromEarliestRecord sets TimestampCreated of rec.Streams[i] from the
// earliest record of the stream named in record.
func fillInfoFromEarliestRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
	ts, err := getCreationTime(db, db_name, record.Name)
	if err != nil {
		// Typo fixed in the error message ("extact" -> "extract").
		return errors.New("fillInfoFromEarliestRecord: cannot extract timestamp")
	}
	rec.Streams[i].TimestampCreated = ts
	return nil
}
// fillInfoFromLastRecord sets LastId, TimestampLast and the finished flag of
// rec.Streams[i] from the last record of the stream named in record; for a
// finished stream it also fills NextStream when one was announced.
func fillInfoFromLastRecord(db *Mongodb, db_name string, rec *StreamsRecord, record StreamInfo, i int) error {
	mrec, err := getLastMessageRecord(db, db_name, record.Name)
	if err != nil {
		return errors.New("fillInfoFromLastRecord: cannot extract record, " + err.Error())
	}
	info := &rec.Streams[i]
	info.LastId = int64(mrec.ID)
	info.TimestampLast = int64(mrec.Timestamp)
	info.Finished = mrec.FinishedStream
	if mrec.FinishedStream {
		// The finish-flag record occupies the last id itself, so the real last
		// message is the one before it.
		info.LastId--
		if mrec.NextStream != no_next_stream_keyword {
			info.NextStream = mrec.NextStream
		}
	}
	return nil
}
func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) {
// If stream name from record is found in currentStreams corresponding
// record get copied to rec.
func updateStreamInfoFromCurrent(currentStreams []StreamInfo, record StreamInfo, rec *StreamInfo) (found bool, updateFinished bool) {
ind, found := findStreamAmongCurrent(currentStreams, record)
if found {
*rec = currentStreams[ind]
......@@ -135,20 +164,63 @@ func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo,
return found, false
}
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync bool) error {
currentStreams := getCurrentStreams(db_name)
// Update streams_info collection with current StreamInfo
func updateStreamInfoDB(db *Mongodb, db_name string, rec StreamInfo) error {
coll := db.client.Database(db_name).Collection(streams_info)
filter := bson.M{"name": rec.Name}
updateInfo := bson.M{"lastId": rec.LastId, "finished": rec.Finished, "timestampCreated": rec.TimestampCreated,
"timestampLast": rec.TimestampLast}
update := bson.M{"$set": updateInfo}
opts := options.Update().SetUpsert(true)
_, err := coll.UpdateOne(context.TODO(), filter, update, opts)
return err
}
// getStreamInfoDB reads the StreamInfo stored for a single stream (looked up
// by name) from the streams_info collection; the driver's error (e.g. when no
// matching document exists) is returned unchanged.
func getStreamInfoDB(db *Mongodb, db_name string, stream_name string) (StreamInfo, error) {
	q := bson.M{"name": stream_name}
	opts := options.FindOne()
	var res StreamInfo
	coll := db.client.Database(db_name).Collection(streams_info)
	err := coll.FindOne(context.TODO(), q, opts).Decode(&res)
	return res, err
}
// Fill stream information from different sources to the rec.
// Stream information is taken from collection streams_info
// If stream is not finished, only timestamp is valid
// If stream is not found in streams_info information is
// extracted form data collections
// If detailed is True, information is extracted from data collection
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord, detailed bool) error {
currentStreams, err := readStreamsInfo(db, db_name)
if err != nil {
return err
}
for i, record := range rec.Streams {
found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i])
if mayContinue && !forceSync {
found, finished := updateStreamInfoFromCurrent(currentStreams, record, &rec.Streams[i])
if found && (!detailed || finished) { // Don't need to update if stream is finished
continue
}
if !found || forceSync { // set timestamp
if !found { // set timestamp
if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil {
return err
}
}
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update last record (timestamp, stream finished flag)
return err
if detailed { // update last record (TimestampLast, LastId)
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil {
return err
}
}
if !found || rec.Streams[i].Finished {
// Inset stream info in DB if is it not yet there
err := updateStreamInfoDB(db, db_name, rec.Streams[i])
// Error may happen if two brokers works with the same stream and try to
// populate streams info.
if err != nil {
log.Debug("Update stream info in DB failed", err)
}
}
}
return nil
......@@ -156,95 +228,65 @@ func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync
// sortRecords orders the streams by creation time, ascending.
func sortRecords(rec *StreamsRecord) {
	// The redundant full-slice expression rec.Streams[:] was dropped.
	sort.Slice(rec.Streams, func(i, j int) bool {
		return rec.Streams[i].TimestampCreated < rec.Streams[j].TimestampCreated
	})
}
func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, error) {
rec, err := readStreams(db, db_name)
func getStreamsInfoFromDb(db *Mongodb, db_name string, detailed bool) (StreamsRecord, error) {
rec, err := readAllStreamNames(db, db_name)
if err != nil {
return StreamsRecord{}, err
}
forceSync:= false
if time.Now().Sub(ss.lastSynced[db_name]).Seconds() > 5 {
forceSync = true
}
err = updateStreamInfos(db, db_name, &rec,forceSync)
err = updateStreamInfos(db, db_name, &rec, detailed)
if err != nil {
return StreamsRecord{}, err
}
if forceSync {
ss.lastSynced[db_name] = time.Now()
}
sortRecords(&rec)
return rec, nil
}
// cacheStreams stores a deep copy of rec in the global cache and stamps the
// refresh time; empty records are not cached. Callers must hold streamsLock.
// (Merge residue removed: a stray "return rec, nil" tail from the deleted
// updateFromDb method.)
func (ss *Streams) cacheStreams(db_name string, rec StreamsRecord) {
	if len(rec.Streams) == 0 {
		return
	}
	// Deep copy so later mutations of rec by the caller do not leak into the cache.
	result := StreamsRecord{}
	utils.DeepCopy(rec, &result)
	ss.records[db_name] = result
	ss.lastUpdated[db_name] = time.Now()
}
func getFiltersFromString(filterString string) (string, string, error) {
firstStream, streamStatus, err := utils.DecodeTwoStrings(filterString)
if err!=nil {
return "", "", errors.New("wrong format: " + filterString)
}
if streamStatus == "" {
streamStatus = stream_filter_all
}
return firstStream, streamStatus, nil
func filterStreamsByStatus(rec StreamInfo, streamStatus string) bool {
return (rec.Finished && streamStatus == Stream_filter_finished) || (!rec.Finished && streamStatus == Stream_filter_unfinished)
}
func getStreamsParamsFromRequest(request Request) (string, string, error) {
if request.ExtraParam == "" {
return "", stream_filter_all, nil
}
func filterStreams(rec StreamsRecord, pars GetStreamsParams) []StreamInfo {
firstStream, streamStatus, err := getFiltersFromString(request.ExtraParam)
if err != nil {
return "", "", err
if len(rec.Streams) == 0 {
return rec.Streams
}
err = checkStreamstreamStatus(streamStatus)
if err != nil {
return "", "", err
if pars.Last {
return rec.Streams[len(rec.Streams)-1:]
}
return firstStream, streamStatus, nil
}
func checkStreamstreamStatus(streamStatus string) error {
if !utils.StringInSlice(streamStatus, []string{stream_filter_all, stream_filter_finished, stream_filter_unfinished}) {
return errors.New("getStreamsParamsFromRequest: wrong streamStatus " + streamStatus)
limitedStreams := filterStreamsByName(rec, pars.From)
if pars.Single {
return limitedStreams[:1]
}
return nil
}
func keepStream(rec StreamInfo, streamStatus string) bool {
return (rec.Finished && streamStatus == stream_filter_finished) || (!rec.Finished && streamStatus == stream_filter_unfinished)
}
func filterStreams(rec StreamsRecord, firstStream string, streamStatus string) []StreamInfo {
limitedStreams := limitStreams(rec, firstStream)
if streamStatus == stream_filter_all {
if pars.Filter == Stream_filter_all {
return limitedStreams
}
nextStreams := limitedStreams[:0]
for _, rec := range limitedStreams {
if keepStream(rec, streamStatus) {
if filterStreamsByStatus(rec, pars.Filter) {
nextStreams = append(nextStreams, rec)
}
}
return nextStreams
}
func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo {
func filterStreamsByName(rec StreamsRecord, firstStream string) []StreamInfo {
if firstStream != "" {
ind := len(rec.Streams)
for i, rec := range rec.Streams {
......@@ -259,22 +301,63 @@ func limitStreams(rec StreamsRecord, firstStream string) []StreamInfo {
}
func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, error) {
firstStream, streamStatus, err := getStreamsParamsFromRequest(request)
if err != nil {
return StreamsRecord{}, err
params, ok := request.ExtraParam.(GetStreamsParams)
if !ok {
return StreamsRecord{}, errors.New(failed_extract_extra_params)
}
if params.Single {
return ss.getStreamInfo(db, request.DbName(), params.From)
}
streamsLock.Lock()
rec, err := ss.tryGetFromCache(request.DbName, db.settings.UpdateStreamCachePeriodMs)
rec, err := ss.getStreamsInfoFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs)
streamsLock.Unlock()
if err != nil {
rec, err = ss.updateFromDb(db, request.DbName)
rec, err = getStreamsInfoFromDb(db, request.DbName(), params.Detailed)
}
streamsLock.Unlock()
if err != nil {
return StreamsRecord{}, err
}
rec.Streams = filterStreams(rec, firstStream, streamStatus)
streamsLock.Lock()
ss.cacheStreams(request.DbName(), rec)
streamsLock.Unlock()
rec.Streams = filterStreams(rec, params)
return rec, nil
}
// getStreamInfo returns the record for a single stream. For a finished stream
// the stored streams_info entry is authoritative; otherwise the info is built
// from the stream's earliest and last data records. A stream with no data at
// all yields a record containing only the name (deprecated behaviour, kept for
// API compatibility - see ToDo below).
func (ss *Streams) getStreamInfo(db *Mongodb, db_name string, stream_name string) (StreamsRecord, error) {
	var rec = StreamsRecord{[]StreamInfo{}}
	streamInfo, err := getStreamInfoDB(db, db_name, stream_name)
	// If the stream is finished, the stored information is complete - use it as is.
	if err == nil && streamInfo.Finished {
		rec.Streams = append(rec.Streams, streamInfo)
		return rec, nil
	}
	timestampCreated, err := getCreationTime(db, db_name, stream_name)
	if err != nil {
		// ToDo remove this part after deprecation of corresponding API feature
		if errors.Is(err, mongo.ErrNoDocuments) {
			streamInfo := StreamInfo{Name: stream_name}
			rec.Streams = append(rec.Streams, streamInfo)
			return rec, nil
		}
		// Typo fixed in the error message ("extact" -> "extract").
		return rec, errors.New("fillInfoFromEarliestRecord: cannot extract timestamp " + err.Error())
	}
	mrec, err := getLastMessageRecord(db, db_name, stream_name)
	if err != nil {
		return StreamsRecord{}, errors.New("fillInfoFromLastRecord: cannot extract record, " + err.Error())
	}
	// Finished and NextStream stay at zero values here - this branch is only
	// taken for unfinished (or not yet stored) streams.
	streamInfo = StreamInfo{
		LastId:           int64(mrec.ID),
		Name:             stream_name,
		TimestampCreated: timestampCreated,
		TimestampLast:    int64(mrec.Timestamp)}
	rec.Streams = append(rec.Streams, streamInfo)
	return rec, nil
}