Skip to content
Snippets Groups Projects
Commit c5ac8b9a authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #26 in ASAPO/asapo from feature_ha to develop

* commit '14361e5e': (23 commits)
  change receiver health-check to ps
  change some of producer and reciever messages from info to debug loglevel
  allow override files
  remove journal write concern from broker
  use newer mgo package
  getnext returns on error record number if succedded to update it
  journal write concernt a mongo restart test for broker
  use mongo without nomad for restart test
  journal write concern for receiver's mongo client, test to recover after mongo restart
  add workers to nomad job
  revert to single mongo instance
  sharded mongo
  started mongo sharding
  more nomad jobs, start logging
  dockerimages for go services
  always pull receiver image
  add receivers, Dockerfile for receiver
  fix curl
  fix curl
  more nomad jobs,more static libs, Dockerfile for receiver
  ...
parents 6b60ce68 14361e5e
No related branches found
No related tags found
2 merge requests!26Feature ha,!93Release 20.03.01
Showing
with 91 additions and 14 deletions
......@@ -126,3 +126,9 @@ discovery/pkg
common/go/pkg
authorizer/pkg
asapo_tools/pkg
#
*.rpm
......@@ -7,6 +7,7 @@ IF(WIN32)
ELSEIF(CMAKE_C_COMPILER_ID STREQUAL "GNU")
SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
SET(BUILD_SHARED_LIBS OFF)
ENDIF(WIN32)
#TODO: Better way then GLOBAL PROPERTY
......@@ -72,6 +73,8 @@ add_subdirectory(authorizer)
add_subdirectory(asapo_tools)
add_subdirectory(deploy)
if(BUILD_INTEGRATION_TESTS)
add_subdirectory(tests)
......
......@@ -23,3 +23,20 @@ function(prepare_asapo)
endfunction()
macro(configure_files srcDir destDir)
message(STATUS "Configuring directory ${destDir}")
make_directory(${destDir})
file(GLOB templateFiles RELATIVE ${srcDir} ${srcDir}/*)
foreach(templateFile ${templateFiles})
set(srcTemplatePath ${srcDir}/${templateFile})
string(REGEX REPLACE "\\.in$" "" File ${templateFile})
if(NOT IS_DIRECTORY ${srcTemplatePath})
message(STATUS "Configuring file ${templateFile}")
configure_file(
${srcTemplatePath}
${destDir}/${File}
@ONLY)
endif(NOT IS_DIRECTORY ${srcTemplatePath})
endforeach(templateFile)
endmacro(configure_files)
......@@ -16,13 +16,14 @@ IF(WIN32)
ELSE()
set (gopath ${GOPATH}:${CMAKE_CURRENT_SOURCE_DIR}:${CMAKE_SOURCE_DIR}/common/go)
set (exe_name "${TARGET_NAME}")
# set (GO_OPTS "GOOS=linux;CGO_ENABLED=0")
ENDIF()
include(testing_go)
add_custom_target(asapo ALL
COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath}
go build ${GO_OPTS} -o ${exe_name} asapo_tools/main
${GO_OPTS} go build -o ${exe_name} asapo_tools/main
VERBATIM)
define_property(TARGET PROPERTY EXENAME
BRIEF_DOCS <executable name>
......
......@@ -20,6 +20,8 @@ ENDIF()
include(testing_go)
configure_file(docker/Dockerfile . COPYONLY)
add_custom_target(asapo-authorizer ALL
COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath}
go build ${GO_OPTS} -o ${exe_name} asapo_authorizer/main
......
FROM busybox:glibc
ADD asapo-authorizer /
CMD ["/asapo-authorizer","-config","/var/lib/authorizer/config.json"]
......@@ -16,6 +16,8 @@ func PrintUsage() {
func main() {
var fname = flag.String("config", "", "config file path")
log.SetSoucre("authorizer")
flag.Parse()
if *fname == "" {
PrintUsage()
......
......@@ -20,6 +20,8 @@ ENDIF()
include(testing_go)
configure_file(docker/Dockerfile . COPYONLY)
add_custom_target(asapo-broker ALL
COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath}
go build ${GO_OPTS} -o ${exe_name} asapo_broker/main
......
FROM busybox:glibc
ADD asapo-broker /
CMD ["/asapo-broker","-config","/var/lib/broker/config.json"]
......@@ -3,11 +3,13 @@
package database
import (
"asapo_common/logger"
"asapo_common/utils"
"encoding/json"
"errors"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"strconv"
"sync"
"time"
)
......@@ -81,6 +83,8 @@ func (db *Mongodb) Connect(address string) (err error) {
return err
}
// db.session.SetSafe(&mgo.Safe{J: true})
if err := db.updateDatabaseList(); err != nil {
return err
}
......@@ -156,17 +160,18 @@ func (db *Mongodb) GetRecordByID(dbname string, id int) ([]byte, error) {
q := bson.M{"_id": id}
c := db.session.DB(dbname).C(data_collection_name)
err := c.Find(q).One(&res)
if err == mgo.ErrNotFound {
if err != nil {
var r = struct {
Id int `json:"id""`
}{id}
res, _ := json.Marshal(&r)
log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error()
logger.Debug(log_str)
return nil, &DBError{utils.StatusNoData, string(res)}
}
if err != nil {
return nil, err
}
log_str := "got record id " + strconv.Itoa(id) + " for " + dbname
logger.Debug(log_str)
return utils.MapToJson(&res)
}
......@@ -237,8 +242,12 @@ func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) {
curPointer, err := db.getCurrentPointer(db_name)
if err != nil {
log_str := "error getting next pointer for " + db_name + ":" + err.Error()
logger.Debug(log_str)
return nil, err
}
log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name
logger.Debug(log_str)
return db.GetRecordByID(db_name, curPointer.Value)
}
......@@ -21,6 +21,7 @@ func PrintUsage() {
func main() {
var fname = flag.String("config", "", "config file path")
log.SetSoucre("broker")
flag.Parse()
if *fname == "" {
PrintUsage()
......
......@@ -35,8 +35,15 @@ type request struct {
message string
}
func containsMatcher(substr string) func(str string) bool {
return func(str string) bool { return strings.Contains(str, substr) }
func containsMatcher(substrings ...string) func(str string) bool {
return func(str string) bool {
for _, substr := range substrings {
if !strings.Contains(str, substr) {
return false
}
}
return true
}
}
func doRequest(path string) *httptest.ResponseRecorder {
......
......@@ -11,7 +11,6 @@ message ("-- mongoc found version \"${MONGOC_STATIC_VERSION}\"")
message ("-- mongoc include path \"${MONGOC_STATIC_INCLUDE_DIRS}\"")
message ("-- mongoc libraries \"${MONGOC_STATIC_LIBRARIES}\"")
add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:system_io>)
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}
PUBLIC "${MONGOC_STATIC_INCLUDE_DIRS}")
......
......@@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name,
const string& collection_name) {
collection_ = mongoc_client_get_collection (client_, database_name.c_str(),
collection_name.c_str());
write_concern_ = mongoc_write_concern_new ();
mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT);
mongoc_write_concern_set_journal (write_concern_, true);
mongoc_collection_set_write_concern (collection_, write_concern_);
}
Error MongoDBClient::TryConnectDatabase() {
......@@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const {
}
void MongoDBClient::CleanUp() {
mongoc_write_concern_destroy(write_concern_);
mongoc_collection_destroy (collection_);
mongoc_client_destroy (client_);
}
......@@ -105,7 +111,7 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl
if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID);
}
return TextError(DBError::kInsertError);
return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message);
}
return nullptr;
......
......@@ -43,6 +43,7 @@ class MongoDBClient final : public Database {
private:
mongoc_client_t* client_{nullptr};
mongoc_collection_t* collection_{nullptr};
mongoc_write_concern_t* write_concern_;
bool connected_{false};
void CleanUp();
std::string DBAddress(const std::string& address) const;
......
......@@ -26,7 +26,7 @@ void SpdLogger::SetLogLevel(LogLevel level) {
}
}
std::string EncloseMsg(std::string msg) {
if (msg.find(":") == std::string::npos) {
if (msg.find("\"") != 0) {
return std::string(R"("message":")") + msg + "\"";
} else {
return msg;
......
......@@ -124,7 +124,7 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro
Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const {
Error err;
auto fd = Open(fname, IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, &err);
auto fd = Open(fname, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err);
if (err) {
return err;
}
......
......@@ -23,6 +23,7 @@ type Logger interface {
Warning(args ...interface{})
Error(args ...interface{})
SetLevel(level Level)
SetSource(source string)
}
var my_logger Logger = &logRusLogger{}
......@@ -51,6 +52,10 @@ func SetLevel(level Level) {
my_logger.SetLevel(level)
}
func SetSoucre(source string ){
my_logger.SetSource(source)
}
func LevelFromString(str string) (Level, error) {
switch strings.ToLower(str) {
case "debug":
......
......@@ -6,6 +6,11 @@ import (
type logRusLogger struct {
logger_entry *log.Entry
source string
}
func (l *logRusLogger) SetSource(source string) {
l.source = source
}
func (l *logRusLogger) entry() *log.Entry {
......@@ -23,7 +28,7 @@ func (l *logRusLogger) entry() *log.Entry {
log.SetFormatter(formatter)
l.logger_entry = log.WithFields(log.Fields{
"source": "discovery",
"source": l.source,
})
return l.logger_entry
......
......@@ -20,6 +20,11 @@ func UnsetMockLog() {
my_logger = &logRusLogger{}
}
func (l *MockLogger) SetSource(source string) {
l.Called(source)
return
}
func (l *MockLogger) Info(args ...interface{}) {
l.Called(args...)
return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment