diff --git a/.gitignore b/.gitignore index 32dbba481b82ad0ce03caea4c4fc4e99270a13aa..1e78577332eab730f22156280545fc4f0484928f 100644 --- a/.gitignore +++ b/.gitignore @@ -126,3 +126,9 @@ discovery/pkg common/go/pkg authorizer/pkg asapo_tools/pkg + + + +# +*.rpm + diff --git a/CMakeLists.txt b/CMakeLists.txt index 31dc962d885da0880e1b54fdfaabf7655074bfbc..7c32a77ca020ad1bc7d84c1469cc24aee94ec227 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,7 @@ IF(WIN32) ELSEIF(CMAKE_C_COMPILER_ID STREQUAL "GNU") SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") + SET(BUILD_SHARED_LIBS OFF) ENDIF(WIN32) #TODO: Better way then GLOBAL PROPERTY @@ -72,6 +73,8 @@ add_subdirectory(authorizer) add_subdirectory(asapo_tools) +add_subdirectory(deploy) + if(BUILD_INTEGRATION_TESTS) add_subdirectory(tests) diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 86fbf0e39fbdd4633972097af471d582ee05a95b..17681e5bd0645f497429addf73575c4728c531d6 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -23,3 +23,20 @@ function(prepare_asapo) endfunction() +macro(configure_files srcDir destDir) + message(STATUS "Configuring directory ${destDir}") + make_directory(${destDir}) + + file(GLOB templateFiles RELATIVE ${srcDir} ${srcDir}/*) + foreach(templateFile ${templateFiles}) + set(srcTemplatePath ${srcDir}/${templateFile}) + string(REGEX REPLACE "\\.in$" "" File ${templateFile}) + if(NOT IS_DIRECTORY ${srcTemplatePath}) + message(STATUS "Configuring file ${templateFile}") + configure_file( + ${srcTemplatePath} + ${destDir}/${File} + @ONLY) + endif(NOT IS_DIRECTORY ${srcTemplatePath}) + endforeach(templateFile) +endmacro(configure_files) diff --git a/asapo_tools/CMakeLists.txt b/asapo_tools/CMakeLists.txt index 2a517df18b3ffb1417721dd797003b0fabb53fbf..de647a4f819390df275048a7adcec7a0e3ecbc53 100644 --- a/asapo_tools/CMakeLists.txt +++ b/asapo_tools/CMakeLists.txt @@ -16,13 +16,14 @@ IF(WIN32) ELSE() set (gopath ${GOPATH}:${CMAKE_CURRENT_SOURCE_DIR}:${CMAKE_SOURCE_DIR}/common/go) set (exe_name "${TARGET_NAME}") +# set (GO_OPTS "GOOS=linux;CGO_ENABLED=0") ENDIF() include(testing_go) add_custom_target(asapo ALL COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath} - go build ${GO_OPTS} -o ${exe_name} asapo_tools/main + ${GO_OPTS} go build -o ${exe_name} asapo_tools/main VERBATIM) define_property(TARGET PROPERTY EXENAME BRIEF_DOCS <executable name> diff --git a/authorizer/CMakeLists.txt b/authorizer/CMakeLists.txt index 1b4b8165a9541757d1087ae9a41b2f4ea205d2b8..a6cd2b3c79b20c30b09bf6af01b206e5999d2276 100644 --- a/authorizer/CMakeLists.txt +++ b/authorizer/CMakeLists.txt @@ -20,6 +20,8 @@ ENDIF() include(testing_go) +configure_file(docker/Dockerfile . COPYONLY) + add_custom_target(asapo-authorizer ALL COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath} go build ${GO_OPTS} -o ${exe_name} asapo_authorizer/main diff --git a/authorizer/docker/Dockerfile b/authorizer/docker/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..6b182a10254e736e71c7d8d07cda92800b469ba3 --- /dev/null +++ b/authorizer/docker/Dockerfile @@ -0,0 +1,3 @@ +FROM busybox:glibc +ADD asapo-authorizer / +CMD ["/asapo-authorizer","-config","/var/lib/authorizer/config.json"] diff --git a/authorizer/src/asapo_authorizer/main/authorizer.go b/authorizer/src/asapo_authorizer/main/authorizer.go index 1a0f39f42406a0412735911b9a717d01fba70bd6..0916ddbed565ec670066af8df0720a69e413a414 100644 --- a/authorizer/src/asapo_authorizer/main/authorizer.go +++ b/authorizer/src/asapo_authorizer/main/authorizer.go @@ -16,6 +16,8 @@ func PrintUsage() { func main() { var fname = flag.String("config", "", "config file path") + log.SetSoucre("authorizer") + flag.Parse() if *fname == "" { PrintUsage() diff --git a/broker/CMakeLists.txt b/broker/CMakeLists.txt index 49014ff5d141594888f0e713310d1b6ece1942fe..c010ae09f84a4e794ebe0524c29b56e3b20b730a 100644 --- a/broker/CMakeLists.txt +++ b/broker/CMakeLists.txt @@ -20,6 +20,8 @@ ENDIF() include(testing_go) +configure_file(docker/Dockerfile . COPYONLY) + add_custom_target(asapo-broker ALL COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath} go build ${GO_OPTS} -o ${exe_name} asapo_broker/main diff --git a/broker/docker/Dockerfile b/broker/docker/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..9efde1c7775936b4f74e9ba858895a6369558188 --- /dev/null +++ b/broker/docker/Dockerfile @@ -0,0 +1,3 @@ +FROM busybox:glibc +ADD asapo-broker / +CMD ["/asapo-broker","-config","/var/lib/broker/config.json"] diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index de41e4f90a062beec6aea811e6fd436c41a7beb2..b020d2a3a307f72778098a72beed10c1a3c8a273 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -3,11 +3,13 @@ package database import ( + "asapo_common/logger" "asapo_common/utils" "encoding/json" "errors" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + "strconv" "sync" "time" ) @@ -81,6 +83,8 @@ func (db *Mongodb) Connect(address string) (err error) { return err } + // db.session.SetSafe(&mgo.Safe{J: true}) + if err := db.updateDatabaseList(); err != nil { return err } @@ -156,17 +160,18 @@ func (db *Mongodb) GetRecordByID(dbname string, id int) ([]byte, error) { q := bson.M{"_id": id} c := db.session.DB(dbname).C(data_collection_name) err := c.Find(q).One(&res) - if err == mgo.ErrNotFound { + if err != nil { var r = struct { Id int `json:"id""` }{id} res, _ := json.Marshal(&r) + log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() + logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, string(res)} } - if err != nil { - return nil, err - } + log_str := "got record id " + strconv.Itoa(id) + " for " + dbname + logger.Debug(log_str) return utils.MapToJson(&res) } @@ -237,8 +242,12 @@ func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) { curPointer, err := db.getCurrentPointer(db_name) if err != nil { + log_str := "error getting next pointer for " + db_name + ":" + err.Error() + logger.Debug(log_str) return nil, err } + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + logger.Debug(log_str) return db.GetRecordByID(db_name, curPointer.Value) } diff --git a/broker/src/asapo_broker/main/broker.go b/broker/src/asapo_broker/main/broker.go index 30a017728a8aaeede6519cdd6b525e5de47e1f7f..e4b9c6e742c5d1f865c8b3c204aedc438926ed85 100644 --- a/broker/src/asapo_broker/main/broker.go +++ b/broker/src/asapo_broker/main/broker.go @@ -21,6 +21,7 @@ func PrintUsage() { func main() { var fname = flag.String("config", "", "config file path") + log.SetSoucre("broker") flag.Parse() if *fname == "" { PrintUsage() diff --git a/broker/src/asapo_broker/server/get_next_test.go b/broker/src/asapo_broker/server/get_next_test.go index 268390973765572c78bcc5dd1de4c2b7a7b87f36..72693797db667131bbd22b823e38b6a4b56d4e3c 100644 --- a/broker/src/asapo_broker/server/get_next_test.go +++ b/broker/src/asapo_broker/server/get_next_test.go @@ -35,8 +35,15 @@ type request struct { message string } -func containsMatcher(substr string) func(str string) bool { - return func(str string) bool { return strings.Contains(str, substr) } +func containsMatcher(substrings ...string) func(str string) bool { + return func(str string) bool { + for _, substr := range substrings { + if !strings.Contains(str, substr) { + return false + } + } + return true + } } func doRequest(path string) *httptest.ResponseRecorder { diff --git a/common/cpp/src/database/CMakeLists.txt b/common/cpp/src/database/CMakeLists.txt index 5467be48c86fca1899439167f56620cc824d487c..30ebedb96509db3002438899291c0a8a77fbc71e 100644 --- a/common/cpp/src/database/CMakeLists.txt +++ b/common/cpp/src/database/CMakeLists.txt @@ -11,7 +11,6 @@ message ("-- mongoc found version \"${MONGOC_STATIC_VERSION}\"") message ("-- mongoc include path \"${MONGOC_STATIC_INCLUDE_DIRS}\"") message ("-- mongoc libraries \"${MONGOC_STATIC_LIBRARIES}\"") - add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:system_io>) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} PUBLIC "${MONGOC_STATIC_INCLUDE_DIRS}") diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 6acdca6d9ec8df9d46f87ecc919fd214344e7efb..6bd22a3f7501f2c9340e40f09787007e8444db63 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name, const string& collection_name) { collection_ = mongoc_client_get_collection (client_, database_name.c_str(), collection_name.c_str()); + + write_concern_ = mongoc_write_concern_new (); + mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT); + mongoc_write_concern_set_journal (write_concern_, true); + mongoc_collection_set_write_concern (collection_, write_concern_); } Error MongoDBClient::TryConnectDatabase() { @@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const { } void MongoDBClient::CleanUp() { + mongoc_write_concern_destroy(write_concern_); mongoc_collection_destroy (collection_); mongoc_client_destroy (client_); } @@ -105,7 +111,7 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID); } - return TextError(DBError::kInsertError); + return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message); } return nullptr; diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 7d9d38c08bbd10e84d4b5b6c1649fb95bfb4f017..8ce6ec965cdc63e03c47cc4b14f594ee905e981d 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -43,6 +43,7 @@ class MongoDBClient final : public Database { private: mongoc_client_t* client_{nullptr}; mongoc_collection_t* collection_{nullptr}; + mongoc_write_concern_t* write_concern_; bool connected_{false}; void CleanUp(); std::string DBAddress(const std::string& address) const; diff --git a/common/cpp/src/logger/spd_logger.cpp b/common/cpp/src/logger/spd_logger.cpp index 4b66a759cb36d2dc56f156464ee49cfb4b83ecb9..9228b8f6104db2154d7c17718f2c903209c23b9e 100644 --- a/common/cpp/src/logger/spd_logger.cpp +++ b/common/cpp/src/logger/spd_logger.cpp @@ -26,7 +26,7 @@ void SpdLogger::SetLogLevel(LogLevel level) { } } std::string EncloseMsg(std::string msg) { - if (msg.find(":") == std::string::npos) { + if (msg.find("\"") != 0) { return std::string(R"("message":")") + msg + "\""; } else { return msg; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index f8dc19c3148cdb603fa584dd4d41e5bd62ae8266..630685b381299c54e99fe4ac2fc776ced2ba9e1e 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -124,7 +124,7 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const { Error err; - auto fd = Open(fname, IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, &err); + auto fd = Open(fname, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); if (err) { return err; } diff --git a/common/go/src/asapo_common/logger/logger.go b/common/go/src/asapo_common/logger/logger.go index e198cd23d555819c34ddb9da020909e987653bc0..0f026aa18b9e777a1b72089ca32d3685b97cf6f0 100644 --- a/common/go/src/asapo_common/logger/logger.go +++ b/common/go/src/asapo_common/logger/logger.go @@ -23,6 +23,7 @@ type Logger interface { Warning(args ...interface{}) Error(args ...interface{}) SetLevel(level Level) + SetSource(source string) } var my_logger Logger = &logRusLogger{} @@ -51,6 +52,10 @@ func SetLevel(level Level) { my_logger.SetLevel(level) } +func SetSoucre(source string ){ + my_logger.SetSource(source) +} + func LevelFromString(str string) (Level, error) { switch strings.ToLower(str) { case "debug": diff --git a/common/go/src/asapo_common/logger/logrus_logger.go b/common/go/src/asapo_common/logger/logrus_logger.go index 750c37e9c134850ad80caf324283854c3ca0af86..4625f27492f47efe5d14ed1a1032b4810c418572 100644 --- a/common/go/src/asapo_common/logger/logrus_logger.go +++ b/common/go/src/asapo_common/logger/logrus_logger.go @@ -6,6 +6,11 @@ import ( type logRusLogger struct { logger_entry *log.Entry + source string +} + +func (l *logRusLogger) SetSource(source string) { + l.source = source } func (l *logRusLogger) entry() *log.Entry { @@ -23,7 +28,7 @@ func (l *logRusLogger) entry() *log.Entry { log.SetFormatter(formatter) l.logger_entry = log.WithFields(log.Fields{ - "source": "discovery", + "source": l.source, }) return l.logger_entry diff --git a/common/go/src/asapo_common/logger/mock_logger.go b/common/go/src/asapo_common/logger/mock_logger.go index 0e597978bcb3be1739f6beb7eb9620e3d636b9a5..484b86cb0175db0e801cc3e11cf42592ecef123a 100644 --- a/common/go/src/asapo_common/logger/mock_logger.go +++ b/common/go/src/asapo_common/logger/mock_logger.go @@ -20,6 +20,11 @@ func UnsetMockLog() { my_logger = &logRusLogger{} } +func (l *MockLogger) SetSource(source string) { + l.Called(source) + return +} + func (l *MockLogger) Info(args ...interface{}) { l.Called(args...) return diff --git a/config/nomad/broker.nmd.in b/config/nomad/broker.nmd.in index 211c71e21a640256e1b4aea1f334eed4bed3fa45..c0032fca94faf877999ea092142d98eed5d08e99 100644 --- a/config/nomad/broker.nmd.in +++ b/config/nomad/broker.nmd.in @@ -25,7 +25,7 @@ job "broker" { } service { - name = "broker" + name = "asapo-broker" port = "broker" check { name = "alive" diff --git a/config/nomad/receiver.nmd.in b/config/nomad/receiver.nmd.in index 1a9a6e893d1c6559a95ec3150e09509357c85e66..9642abb2314c77eb557a070c5e37ea1ef4dc30cd 100644 --- a/config/nomad/receiver.nmd.in +++ b/config/nomad/receiver.nmd.in @@ -23,7 +23,7 @@ job "receiver" { } service { - name = "receiver" + name = "asapo-receiver" port = "recv" check { name = "alive" diff --git a/deploy/CMakeLists.txt b/deploy/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6eb356f10b81ddcd2d53c343fcceeb240765cf7b --- /dev/null +++ b/deploy/CMakeLists.txt @@ -0,0 +1,11 @@ +if(DEFINED ENV{INSTALLPATH_NOMAD_JOBS}) + SET (NOMAD_INSTALL $ENV{INSTALLPATH_NOMAD_JOBS}/nomad_jobs) +else() + SET (NOMAD_INSTALL ${CMAKE_INSTALL_PREFIX}/nomad_jobs) +endif() + +configure_files(${CMAKE_CURRENT_SOURCE_DIR}/nomad_jobs ${CMAKE_CURRENT_BINARY_DIR}/nomad_jobs) + + +install(DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/nomad_jobs/ DESTINATION ${NOMAD_INSTALL}) + diff --git a/deploy/nomad_consul/CMakeLists.txt b/deploy/nomad_consul/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..3ae6016eabce64b43cd65a03a17360f297db7109 --- /dev/null +++ b/deploy/nomad_consul/CMakeLists.txt @@ -0,0 +1,42 @@ +cmake_minimum_required(VERSION 3.5) + +PROJECT(ASAPO_HA) + +add_subdirectory (Packages) + +INCLUDE(ExternalProject) +ExternalProject_Add( + consul + URL https://releases.hashicorp.com/consul/1.2.0/consul_1.2.0_linux_amd64.zip + PATCH_COMMAND "" + CONFIGURE_COMMAND "" + CMAKE_COMMAND "" + BUILD_IN_SOURCE 1 + BUILD_COMMAND "" + INSTALL_COMMAND "" +) + +ExternalProject_Add( + nomad + URL https://releases.hashicorp.com/nomad/0.8.4/nomad_0.8.4_linux_amd64.zip + PATCH_COMMAND "" + CONFIGURE_COMMAND "" + CMAKE_COMMAND "" + BUILD_IN_SOURCE 1 + BUILD_COMMAND "" + INSTALL_COMMAND "" +) + + +ExternalProject_Get_Property(consul SOURCE_DIR) +set (CONSUL_SOURCE_DIR ${SOURCE_DIR}) +ExternalProject_Get_Property(nomad SOURCE_DIR) +set (NOMAD_SOURCE_DIR ${SOURCE_DIR}) + +install(PROGRAMS ${CONSUL_SOURCE_DIR}/consul DESTINATION bin) +install(PROGRAMS ${NOMAD_SOURCE_DIR}/nomad DESTINATION bin) + +install(PROGRAMS ${NOMAD_SOURCE_DIR}/nomad DESTINATION bin) + +install(DIRECTORY ${CMAKE_SOURCE_DIR}/usr/lib/systemd/system/ + DESTINATION /usr/lib/systemd/system) \ No newline at end of file diff --git a/deploy/nomad_consul/Packages/CMakeLists.txt b/deploy/nomad_consul/Packages/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..fe7829db5fd63c8ec27893ead4f27320515d8e16 --- /dev/null +++ b/deploy/nomad_consul/Packages/CMakeLists.txt @@ -0,0 +1,29 @@ +# create debian package + +SET(CPACK_BINARY_DEB ON) +SET(CPACK_BINARY_RPM ON) + +#SET (CPACK_DEBIAN_PACKAGE_MAINTAINER "Sergey Yakubov (sergey.yakubov@desy.de)") +#SET(CPACK_DEBIAN_PACKAGE_ARCHITECTURE "amd64") + +SET (CPACK_PACKAGE_CONTACT "Sergey Yakubov (sergey.yakubov@desy.de)") +SET(CPACK_PACKAGE_ARCHITECTURE "amd64") + + +SET(CPACK_PACKAGE_NAME "asapo-ha") +SET(CPACK_PACKAGE_VENDOR "DESY") +SET(CPACK_PACKAGE_VERSION_MAJOR "1") +SET(CPACK_PACKAGE_VERSION_MINOR "2") +SET(CPACK_PACKAGE_VERSION_PATCH "3") + +SET(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Nomad and Consul for Asapo Debian package") + +SET(CPACK_BINARY_STGZ OFF) +SET(CPACK_BINARY_TBZ2 OFF) +SET(CPACK_BINARY_TGZ OFF) +SET(CPACK_BINARY_TZ OFF) +SET(CPACK_RPM_PRE_INSTALL_SCRIPT_FILE "${CMAKE_CURRENT_SOURCE_DIR}/postinst") +set(CPACK_DEBIAN_PACKAGE_CONTROL_EXTRA "${CMAKE_CURRENT_SOURCE_DIR}/postinst") + +INCLUDE(CPack) + diff --git a/deploy/nomad_consul/Packages/postinst b/deploy/nomad_consul/Packages/postinst new file mode 100755 index 0000000000000000000000000000000000000000..54d8d8ac79b08cc80d60e25b421eac467919f318 --- /dev/null +++ b/deploy/nomad_consul/Packages/postinst @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +useradd -r -s /bin/false asapo || true diff --git a/deploy/nomad_consul/usr/lib/systemd/system/consul.service b/deploy/nomad_consul/usr/lib/systemd/system/consul.service new file mode 100644 index 0000000000000000000000000000000000000000..4c0b1e24d773eb4d0354656b2149afd346d37705 --- /dev/null +++ b/deploy/nomad_consul/usr/lib/systemd/system/consul.service @@ -0,0 +1,19 @@ +[Unit] +Description=consul agent +Requires=network-online.target +After=network-online.target +Documentation=https://consul.io/docs/ + +[Service] +User=asapo +Group=asapo +Environment=GOMAXPROCS=4 +Restart=on-failure +ExecStart=/usr/bin/consul agent -config-dir=/etc/consul.d +ExecReload=/bin/kill -HUP $MAINPID +KillSignal=SIGINT +AmbientCapabilities=CAP_NET_BIND_SERVICE + +[Install] +WantedBy=multi-user.target + diff --git a/deploy/nomad_consul/usr/lib/systemd/system/nomad.service b/deploy/nomad_consul/usr/lib/systemd/system/nomad.service new file mode 100644 index 0000000000000000000000000000000000000000..624f441526389071c2690888486ee31b731a9287 --- /dev/null +++ b/deploy/nomad_consul/usr/lib/systemd/system/nomad.service @@ -0,0 +1,19 @@ +[Unit] +Description=nomad agent +Requires=network-online.target +After=network-online.target +Documentation=https://nomadproject.io/docs/ +After=consul.service + +[Service] +User=asapo +Group=asapo +Environment=GOMAXPROCS=4 +Restart=on-failure +ExecStart=/usr/bin/nomad agent -config=/etc/nomad.d +ExecReload=/bin/kill -HUP $MAINPID +KillMode=process +KillSignal=SIGINT + +[Install] +WantedBy=multi-user.target diff --git a/deploy/nomad_jobs/asapo-brokers.nmd.in b/deploy/nomad_jobs/asapo-brokers.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..e601184374f7d8a1085bfdc7800cddfc2a1c3d0d --- /dev/null +++ b/deploy/nomad_jobs/asapo-brokers.nmd.in @@ -0,0 +1,76 @@ +job "asapo-brokers" { + datacenters = ["dc1"] + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "brokers" { + count = 1 + + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "fail" + } + + task "brokers" { + driver = "docker" + + config { + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "yakser/asapo-broker:feature_ha" + force_pull = true + volumes = ["local/config.json:/var/lib/broker/config.json"] + logging { + type = "fluentd" + config { + fluentd-address = "localhost:9881" + fluentd-async-connect = true + tag = "asapo.docker" + } + } + } + + resources { + network { + port "broker" {} + } + } + + service { + port = "broker" + name = "asapo-broker" + check { + name = "asapo-broker-alive" + type = "http" + path = "/health" + interval = "10s" + timeout = "2s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + + template { + source = "@NOMAD_INSTALL@/broker.json.tpl" + destination = "local/config.json" + change_mode = "restart" + } + + template { + source = "@NOMAD_INSTALL@/broker_secret.key" + destination = "secrets/secret.key" + change_mode = "restart" + } + } #task brokers + } +} diff --git a/deploy/nomad_jobs/asapo-logging.nmd.in b/deploy/nomad_jobs/asapo-logging.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..e7c9cdfc11f030338b7b39e5a29d4fb9f3062025 --- /dev/null +++ b/deploy/nomad_jobs/asapo-logging.nmd.in @@ -0,0 +1,179 @@ +job "asapo-logging" { + datacenters = ["dc1"] + +# update { +# max_parallel = 1 +# min_healthy_time = "10s" +# healthy_deadline = "3m" +# auto_revert = false +# } + + group "fluentd" { + count = 1 + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "fluentd" { + driver = "docker" + + meta { + change_me_to_restart = 1 + } + + config { + dns_servers = ["127.0.0.1"] + network_mode = "host" + image = "yakser/fluentd_elastic" + volumes = ["local/fluentd.conf:/fluentd/etc/fluent.conf", + "/${meta.shared_storage}/fluentd:/shared"] + } + + resources { + cpu = 500 + memory = 256 + network { + mbits = 10 + port "fluentd" { + static = 9880 + } + } + } + + service { + port = "fluentd" + name = "fluentd" + check { + name = "alive" + type = "script" + command = "/bin/pidof" + args = ["ruby2.3"] + timeout = "2s" + interval = "10s" + } + check_restart { + limit = 2 + grace = "15s" + ignore_warnings = false + } + } + template { + source = "@NOMAD_INSTALL@/fluentd.conf" + destination = "local/fluentd.conf" + change_mode = "restart" + } + } + } +#elasticsearch + group "elk" { + count = 1 + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "elasticsearch" { + driver = "docker" + + env { + bootstrap.memory_lock = "true" + cluster.name = "asapo-logging" + ES_JAVA_OPTS = "-Xms512m -Xmx512m" + } + + config { + ulimit { + memlock = "-1:-1" + nofile = "65536:65536" + nproc = "8192" + } + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "docker.elastic.co/elasticsearch/elasticsearch:6.3.0" + volumes = ["/${meta.shared_storage}/esdatadir:/usr/share/elasticsearch/data"] + } + + resources { + #MHz + cpu = 4000 + #MB + memory = 2048 + network { + mbits = 10 + port "elasticsearch" { + static = 9200 + } + } + } + + service { + port = "elasticsearch" + name = "elasticsearch" + check { + name = "alive" + type = "http" + path = "/_cluster/health" + interval = "10s" + timeout = "1s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + } +#kibana + task "kibana" { + driver = "docker" + + config { + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "docker.elastic.co/kibana/kibana:6.3.0" + volumes = ["local/kibana.yml:/usr/share/kibana/config/kibana.yml"] + } + + template { + source = "@NOMAD_INSTALL@/kibana.yml" + destination = "local/kibana.yml" + change_mode = "restart" + } + + resources { + cpu = 256 + memory = 1024 + network { + mbits = 10 + port "kibana" { + static = 5601 + } + } + } + + service { + port = "kibana" + name = "kibana" + check { + name = "alive" + type = "http" + path = "/logsview" + interval = "10s" + timeout = "1s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + } + + } + +} diff --git a/deploy/nomad_jobs/asapo-mongo.nmd.in b/deploy/nomad_jobs/asapo-mongo.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..be05168e3d880c9d15c607985853409f78cb1e11 --- /dev/null +++ b/deploy/nomad_jobs/asapo-mongo.nmd.in @@ -0,0 +1,59 @@ +job "asapo-mongo" { + datacenters = ["dc1"] + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "mongo" { + count = 1 + + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "mongo" { + driver = "docker" + + config { + network_mode = "host" + image = "mongo:4.0.0" + volumes = ["/${meta.shared_storage}/mongodb:/data/db"] + } + + resources { + cpu = 1500 + memory = 12560 + network { + port "mongo" { + static = 27017 + } + } + } + + service { + port = "mongo" + name = "mongo" + check { + type = "script" + name = "alive" + command = "mongo" + args = ["--eval","db.version()"] + interval = "10s" + timeout = "5s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + } + } +} diff --git a/deploy/nomad_jobs/asapo-nginx.nmd.in b/deploy/nomad_jobs/asapo-nginx.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..b9455b414b9eca5a741dd2ff528b5eefb7a3ee18 --- /dev/null +++ b/deploy/nomad_jobs/asapo-nginx.nmd.in @@ -0,0 +1,70 @@ +job "asapo-nginx" { + datacenters = ["dc1"] + + type = "system" + +# update { +# max_parallel = 1 +# min_healthy_time = "10s" +# healthy_deadline = "3m" +# auto_revert = false +# } + + group "nginx" { + count = 1 + + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "nginx" { + driver = "docker" + + config { + network_mode = "host" + image = "nginx:1.14" + volumes = ["local/nginx.conf:/etc/nginx/nginx.conf"] + } + + resources { + cpu = 500 + memory = 256 + network { + mbits = 10 + port "nginx" { + static = 8400 + } + } + } + + service { + port = "nginx" + name = "nginx" + check { + name = "alive" + type = "http" + path = "/nginx-health" + timeout = "2s" + interval = "10s" + } + + check_restart { + limit = 2 + grace = "15s" + ignore_warnings = false + } + } + + template { + source = "@NOMAD_INSTALL@/nginx.conf.tpl" + destination = "local/nginx.conf" + change_mode = "restart" + } + + + } + } +} diff --git a/deploy/nomad_jobs/asapo-perfmetrics.nmd.in b/deploy/nomad_jobs/asapo-perfmetrics.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..bb8db1b2ba4aa1b8254915d17de74f5124de1e91 --- /dev/null +++ b/deploy/nomad_jobs/asapo-perfmetrics.nmd.in @@ -0,0 +1,108 @@ +job "asapo-perfmetrics" { + datacenters = ["dc1"] + +# update { +# max_parallel = 1 +# min_healthy_time = "10s" +# healthy_deadline = "3m" +# auto_revert = false +# } + + group "perfmetrics" { + count = 1 + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "influxdb" { + driver = "docker" + + config { + dns_servers = ["127.0.0.1"] + network_mode = "host" + image = "influxdb" + volumes = ["/${meta.shared_storage}/influxdb:/var/lib/influxdb"] + } + + resources { + cpu = 1500 + memory = 32000 + network { + mbits = 10 + port "influxdb" { + static = 8086 + } + } + } + + service { + port = "influxdb" + name = "influxdb" + check { + name = "alive" + type = "http" + path = "/ping" + interval = "10s" + timeout = "1s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + + } #influxdb + + + task "grafana" { + driver = "docker" + + env { + GF_SERVER_DOMAIN = "${attr.unique.hostname}" + GF_SERVER_ROOT_URL = "%(protocol)s://%(domain)s/performance/" + } + + config { + dns_servers = ["127.0.0.1"] + network_mode = "host" + image = "grafana/grafana" + volumes = ["/${meta.shared_storage}/grafana:/var/lib/grafana"] + } + + resources { + cpu = 1500 + memory = 2560 + network { + mbits = 10 + port "grafana" { + static = 3000 + } + } + } + + service { + port = "grafana" + name = "grafana" + check { + name = "alive" + type = "http" + path = "/api/health" + interval = "10s" + timeout = "1s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + + } #grafana + + + } +} diff --git a/deploy/nomad_jobs/asapo-receivers.nmd.in b/deploy/nomad_jobs/asapo-receivers.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..8d58581771321d3fe839dcc85f417ec0800e0378 --- /dev/null +++ b/deploy/nomad_jobs/asapo-receivers.nmd.in @@ -0,0 +1,72 @@ +job "asapo-receivers" { + datacenters = ["dc1"] + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "receivers" { + count = 1 + + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "fail" + } + + task "receivers" { + driver = "docker" + + config { + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "yakser/asapo-receiver:feature_ha" + force_pull = true + volumes = ["local/config.json:/var/lib/receiver/config.json", + "/bldocuments/support/asapo/data:/var/lib/receiver/data"] + logging { + type = "fluentd" + config { + fluentd-address = "localhost:9881" + tag = "asapo.docker" + } + } + } + + resources { + network { + port "recv" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "asapo-receiver-alive" + type = "script" + command = "/bin/ps" + args = ["-fC","receiver"] + interval = "10s" + timeout = "2s" + } + check_restart { + limit = 2 + grace = "15s" + ignore_warnings = false + } + } + + template { + source = "@NOMAD_INSTALL@/receiver.json.tpl" + destination = "local/config.json" + change_mode = "restart" + } + } #task receivers + } +} + diff --git a/deploy/nomad_jobs/asapo-services.nmd.in b/deploy/nomad_jobs/asapo-services.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..3e31d425cfd06563a69c4f73a6e64805dbef9f52 --- /dev/null +++ b/deploy/nomad_jobs/asapo-services.nmd.in @@ -0,0 +1,123 @@ +job "asapo-services" { + datacenters = ["dc1"] + + type = "service" + + group "asapo-authorizer" { + count = 1 + + task "asapo-authorizer" { + driver = "docker" + + config { + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "yakser/asapo-authorizer:feature_ha" + force_pull = true + volumes = ["local/config.json:/var/lib/authorizer/config.json", + "/bldocuments/support/asapo/beamtime_beamline_mapping.txt:/var/lib/authorizer/beamtime_beamline_mapping.txt", + "/bldocuments/support/asapo/ip_beamtime_mapping:/var/lib/authorizer/ip_beamtime_mapping"] + logging { + type = "fluentd" + config { + fluentd-address = "localhost:9881" + fluentd-async-connect = true + tag = "asapo.docker" + } + } + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "authorizer" { + static = "5007" + } + } + } + + service { + name = "asapo-authorizer" + port = "authorizer" + check { + name = "alive" + type = "http" + path = "/health-check" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + check_restart { + limit = 2 + grace = "15s" + ignore_warnings = false + } + } + + template { + source = "@NOMAD_INSTALL@/authorizer.json.tpl" + destination = "local/config.json" + change_mode = "restart" + } + } + } #authorizer + group "asapo-discovery" { + count = 1 + + task "asapo-discovery" { + driver = "docker" + + config { + network_mode = "host" + dns_servers = ["127.0.0.1"] + image = "yakser/asapo-discovery:feature_ha" + force_pull = true + volumes = ["local/config.json:/var/lib/discovery/config.json"] + logging { + type = "fluentd" + config { + fluentd-address = "localhost:9881" + fluentd-async-connect = true + tag = "asapo.docker" + } + } + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "discovery" { + static = "5006" + } + } + } + + service { + name = "asapo-discovery" + port = "discovery" + check { + name = "alive" + type = "http" + path = "/receivers" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + check_restart { + limit = 2 + grace = "15s" + ignore_warnings = false + } + + } + + template { + source = "@NOMAD_INSTALL@/discovery.json.tpl" + destination = "local/config.json" + change_mode = "restart" + } + } + } +} diff --git a/deploy/nomad_jobs/asapo-test.nmd b/deploy/nomad_jobs/asapo-test.nmd new file mode 100644 index 0000000000000000000000000000000000000000..db71a705a63bb40407526841e77f00b9141ed3b9 --- /dev/null +++ b/deploy/nomad_jobs/asapo-test.nmd @@ -0,0 +1,177 @@ +job "asapo-test" { + datacenters = ["dc1"] + + type = "batch" + + group "windows-test" { + + constraint { + attribute = "${attr.kernel.name}" + value = "windows" + } + + count = 0 + task "producer" { + driver = "raw_exec" + + config { + command = "local/dummy-data-producer.exe" + args = [ + "psana002:8400", + "asapo_test1", + "100", + "1000000", + "8", + "0", + "1000"] + } + + artifact { + source = "http://nims.desy.de/extra/asapo/dummy-data-producer.exe" + mode = "file" + destination = "local/dummy-data-producer.exe" + + } + + # resources { + # cpu = 5000 + # memory = 128 + # network { + # mbits = 10000 + # } + } + + } #windows + + group "worker-linux1" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + +# constraint { +# attribute = "${meta.location}" +# operator = "!=" +# value = "petra3" +# } + + count = 1 + + task "worker-linux" { + driver = "raw_exec" + + config { + command = "local/getnext_broker" + args = [ + "psana002:8400", + "asapo_test1", + "16", + "oTsKsj8i6WcW_gVzeIFvZCtSfMErjDELJEyAI23n7Ik=", + "30000"] + } + + resources { + cpu = 5000 + memory = 128 + network { + mbits = 10000 + } + } + + artifact { + source = "http://nims.desy.de/extra/asapo/getnext_broker" + } + } + + } # worker-linux1 + + +group "worker-linux2" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + +# constraint { +# attribute = "${meta.location}" +# operator = "!=" +# value = "petra3" +# } + + count = 1 + + task "worker-linux" { + driver = "raw_exec" + + config { + command = "local/getnext_broker" + args = [ + "psana002:8400", + "asapo_test2", + "16", + "yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", + "30000"] + } + resources { + cpu = 5000 + memory = 128 + network { + mbits = 10000 + } + } + + artifact { + source = "http://nims.desy.de/extra/asapo/getnext_broker" + } + } + + } # worker-linux2 + + + group "linux-test" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + constraint { + attribute = "${meta.location}" + value = "petra3" + } + + count = 1 + + task "producer" { + driver = "raw_exec" + + config { + command = "local/dummy-data-producer" + args = [ + "psana002:8400", + "asapo_test2", + "100", + "1000000", + "8", + "0", + "1000"] + } + + artifact { + source = "http://nims.desy.de/extra/asapo/dummy-data-producer" + } + + # resources { + # cpu = 5000 + # memory = 128 + # network { + # mbits = 10000 + # } + # } + + } + + } #linux +} diff --git a/deploy/nomad_jobs/authorizer.json.tpl b/deploy/nomad_jobs/authorizer.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..9dc4efbfa0d7b4c890afd29404789f0d1d2b1ae5 --- /dev/null +++ b/deploy/nomad_jobs/authorizer.json.tpl @@ -0,0 +1,11 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"}, + {"BeamtimeId":"asapo_test1","Beamline":"test1"}, + {"BeamtimeId":"asapo_test2","Beamline":"test2"}], + "BeamtimeBeamlineMappingFile":"//var//lib//authorizer//beamtime_beamline_mapping.txt", + "IpBeamlineMappingFolder":"//var//lib//authorizer//ip_beamtime_mapping" +} + + diff --git a/deploy/nomad_jobs/broker.json.tpl b/deploy/nomad_jobs/broker.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..fe73cff797cad90fa2908979fa11327e26dc0e38 --- /dev/null +++ b/deploy/nomad_jobs/broker.json.tpl @@ -0,0 +1,8 @@ +{ + "BrokerDbAddress":"mongo.service.asapo:27017", + "MonitorDbAddress":"influxdb.service.asapo:8086", + "MonitorDbName": "asapo_brokers", + "port":{{ env "NOMAD_PORT_broker" }}, + "LogLevel":"info", + "SecretFile":"/secrets/secret.key" +} \ No newline at end of file diff --git a/deploy/nomad_jobs/discovery.json.tpl b/deploy/nomad_jobs/discovery.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..fdb2773288e50f17354d2ac36fff8bbcd923d999 --- /dev/null +++ b/deploy/nomad_jobs/discovery.json.tpl @@ -0,0 +1,10 @@ +{ + "Mode": "consul", + "Receiver": { + "MaxConnections": 32 + }, + "Port": {{ env "NOMAD_PORT_discovery" }}, + "LogLevel": "{{ keyOrDefault "log_level" "info" }}" +} + + diff --git a/deploy/nomad_jobs/fluentd.conf b/deploy/nomad_jobs/fluentd.conf new file mode 100644 index 0000000000000000000000000000000000000000..3c7d54708e94b1dc168d7dac3922e78447ae85b7 --- /dev/null +++ b/deploy/nomad_jobs/fluentd.conf @@ -0,0 +1,55 @@ +<source> + @type forward + port 24224 + source_hostname_key source_addr + bind 0.0.0.0 +</source> + +<source> + @type http + port 9880 + bind 0.0.0.0 + add_remote_addr true + format json + time_format %Y-%m-%d %H:%M:%S.%N +</source> + +<filter asapo.docker> + @type parser + key_name log + format json + time_format %Y-%m-%d %H:%M:%S.%N + reserve_data true +</filter> + +<filter asapo.docker> + @type record_transformer + enable_ruby + remove_keys ["log","container_id","container_name"] + <record> + source_addr ${record["source_addr"].split('.')[0]} + </record> +</filter> + +<match asapo.**> +@type copy +<store> + @type elasticsearch + host elasticsearch.service.asapo + port 9200 + flush_interval 5s + logstash_format true + time_key_format %Y-%m-%dT%H:%M:%S.%N + time_key time + time_key_exclude_timestamp true + buffer_type memory + flush_interval 1s + </store> + <store> + @type file + flush_interval 1s + buffer_type memory + path /shared/asapo-logs + </store> +</match> + diff --git a/deploy/nomad_jobs/init_influx.sh b/deploy/nomad_jobs/init_influx.sh new file mode 100644 index 0000000000000000000000000000000000000000..0319c4c4f38a2aa9f47e0f1e885600d665859c9a --- /dev/null +++ b/deploy/nomad_jobs/init_influx.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +influx=`dig +short @127.0.0.1 influxdb.service.asapo | head -1` + +databases="asapo_receivers asapo_brokers" + +for database in $databases +do + curl -i -XPOST http://${influx}:8086/query --data-urlencode "q=CREATE DATABASE $database" +done \ No newline at end of file diff --git a/deploy/nomad_jobs/kibana.yml b/deploy/nomad_jobs/kibana.yml new file mode 100644 index 0000000000000000000000000000000000000000..4b8e272b46ab6d6abc68c4519a7f16d83e4c517e --- /dev/null +++ b/deploy/nomad_jobs/kibana.yml @@ -0,0 +1,6 @@ +elasticsearch: + url: "http://elasticsearch.service.asapo:9200" +server: + basePath: "/logsview" + rewriteBasePath: true + host: "0.0.0.0" diff --git a/deploy/nomad_jobs/nginx.conf.tpl b/deploy/nomad_jobs/nginx.conf.tpl new file mode 100644 index 0000000000000000000000000000000000000000..854cd7a18980c411e73c46d7aa9228f718692a7c --- /dev/null +++ b/deploy/nomad_jobs/nginx.conf.tpl @@ -0,0 +1,73 @@ +worker_processes 1; + +events { + worker_connections 1024; +} + +http { +# include mime.types; +# default_type application/octet-stream; + +# sendfile on; +# tcp_nopush on; + +# keepalive_timeout 0; +# keepalive_timeout 65; + + resolver 127.0.0.1:53 valid=1s; + server { + listen {{ env "NOMAD_PORT_nginx" }}; + set $discovery_endpoint asapo-discovery.service.asapo; +# set $authorizer_endpoint asapo-authorizer.service.asapo; + set $fluentd_endpoint fluentd.service.asapo; + set $kibana_endpoint kibana.service.asapo; + set $grafana_endpoint grafana.service.asapo; + + location /discovery/ { + rewrite ^/discovery(/.*) $1 break; + proxy_pass http://$discovery_endpoint:5006$uri$is_args$args; + } + + location /logs/ { + rewrite ^/logs(/.*) $1 break; + proxy_pass http://$fluentd_endpoint:9880$uri$is_args$args; + } + + location /logsview/ { + proxy_pass http://$kibana_endpoint:5601$uri$is_args$args; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $http_host; + } + + location /performance/ { + rewrite ^/performance(/.*) $1 break; + proxy_pass http://$grafana_endpoint:3000$uri$is_args$args; + } + +# location /authorizer/ { +# rewrite ^/authorizer(/.*) $1 break; +# proxy_pass http://$authorizer_endpoint:5007$uri$is_args$args; +# } + + location /nginx-health { + return 200 "healthy\n"; + } + } +} + +stream { + resolver 127.0.0.1:53 valid=1s; + + map $remote_addr $upstream { + default fluentd.service.asapo; + } + + + server { + listen 9881; + proxy_pass $upstream:24224; + } +} + + diff --git a/deploy/nomad_jobs/receiver.json.tpl b/deploy/nomad_jobs/receiver.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..b4cabe70803d549891023281c5586a021720c455 --- /dev/null +++ b/deploy/nomad_jobs/receiver.json.tpl @@ -0,0 +1,13 @@ +{ + "MonitorDbAddress":"influxdb.service.asapo:8086", + "MonitorDbName": "asapo_receivers", + "BrokerDbAddress":"mongo.service.asapo:27017", + "AuthorizationServer": "asapo-authorizer.service.asapo:5007", + "AuthorizationInterval": 10000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "WriteToDisk":true, + "WriteToDb":true, + "LogLevel" : "info", + "RootFolder" : "/var/lib/receiver/data" +} diff --git a/deploy/nomad_jobs/start_asapo.sh b/deploy/nomad_jobs/start_asapo.sh new file mode 100644 index 0000000000000000000000000000000000000000..59a12f2b1de077d04ca1e115970565dc2d3c23db --- /dev/null +++ b/deploy/nomad_jobs/start_asapo.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +nomad run asapo-nginx.nmd +nomad run asapo-logging.nmd +nomad run asapo-mongo.nmd +nomad run asapo-brokers.nmd +nomad run asapo-services.nmd +nomad run asapo-perfmetrics.nmd +nomad run asapo-receivers.nmd diff --git a/deploy/nomad_jobs/stop_asapo.sh b/deploy/nomad_jobs/stop_asapo.sh new file mode 100644 index 0000000000000000000000000000000000000000..9ede6dd7d44e97a485f4baf5f257c4f09c194a60 --- /dev/null +++ b/deploy/nomad_jobs/stop_asapo.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +nomad stop asapo-nginx +nomad stop asapo-logging +nomad stop asapo-mongo +nomad stop asapo-services +nomad stop asapo-perfmetrics +nomad stop asapo-receivers +nomad stop asapo-brokers diff --git a/discovery/CMakeLists.txt b/discovery/CMakeLists.txt index 871e6e93faf61f313071790d6c72d206d756cdf6..2c863597936d4ba34e5dd0434f913989e14908a7 100644 --- a/discovery/CMakeLists.txt +++ b/discovery/CMakeLists.txt @@ -20,6 +20,8 @@ ENDIF() include(testing_go) +configure_file(docker/Dockerfile . COPYONLY) + add_custom_target(${TARGET_NAME} ALL COMMAND ${CMAKE_COMMAND} -E env GOPATH=${gopath} go build ${GO_OPTS} -o ${exe_name} asapo_discovery/main diff --git a/discovery/docker/Dockerfile b/discovery/docker/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..e2e745708cad2b5c28cda18f98230e209aaf306f --- /dev/null +++ b/discovery/docker/Dockerfile @@ -0,0 +1,3 @@ +FROM busybox:glibc +ADD asapo-discovery / +CMD ["/asapo-discovery","-config","/var/lib/discovery/config.json"] diff --git a/discovery/src/asapo_discovery/main/discovery.go b/discovery/src/asapo_discovery/main/discovery.go index 683b80472cd936661dee73b890952db860be7bcb..d78d64898b6203fe7d4bab24e8303eb2a384187a 100644 --- a/discovery/src/asapo_discovery/main/discovery.go +++ b/discovery/src/asapo_discovery/main/discovery.go @@ -29,6 +29,7 @@ func PrintUsage() { func main() { var fname = flag.String("config", "", "config file path") + log.SetSoucre("discovery") flag.Parse() if *fname == "" { PrintUsage() diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go index 12f3dc13c98f64a0255a2ab05dd2d3194a5affe1..9c212504522a2ab0ba2d55a641f4c61293adba18 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go @@ -53,7 +53,7 @@ func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) { } var response Responce var err error - response.Uris, err = rh.GetServices("receiver") + response.Uris, err = rh.GetServices("asapo-receiver") if err != nil { return nil, err } @@ -65,7 +65,7 @@ func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) { if (rh.client == nil) { return nil, errors.New("consul client not connected") } - response, err := rh.GetServices("broker") + response, err := rh.GetServices("asapo-broker") if err != nil { return nil, err } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go index 716d545310d28be660ba634aa5b9a6a5680675be..002afc6e0f56fdda9ef764f084cae042cf09f530 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go @@ -49,16 +49,16 @@ func (suite *ConsulHandlerTestSuite) SetupTest() { panic(err) } - suite.registerAgents("receiver") - suite.registerAgents("broker") + suite.registerAgents("asapo-receiver") + suite.registerAgents("asapo-broker") } func (suite *ConsulHandlerTestSuite) TearDownTest() { - suite.client.Agent().ServiceDeregister("receiver1234") - suite.client.Agent().ServiceDeregister("receiver1235") - suite.client.Agent().ServiceDeregister("broker1234") - suite.client.Agent().ServiceDeregister("broker1235") + suite.client.Agent().ServiceDeregister("asapo-receiver1234") + suite.client.Agent().ServiceDeregister("asapo-receiver1235") + suite.client.Agent().ServiceDeregister("asapo-broker1234") + suite.client.Agent().ServiceDeregister("asapo-broker1235") } func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() { @@ -127,8 +127,8 @@ func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() { func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() { - suite.client.Agent().ServiceDeregister("broker1234") - suite.client.Agent().ServiceDeregister("broker1235") + suite.client.Agent().ServiceDeregister("asapo-broker1234") + suite.client.Agent().ServiceDeregister("asapo-broker1235") suite.handler.Init(consul_settings) res, err := suite.handler.GetBroker() diff --git a/examples/producer/dummy-data-producer/check_linux.sh b/examples/producer/dummy-data-producer/check_linux.sh index c9601daa177e35f58479f84f2f9cc4ba3c7dc35e..f6a290300eaad68a91929ae1782f90710113234f 100644 --- a/examples/producer/dummy-data-producer/check_linux.sh +++ b/examples/producer/dummy-data-producer/check_linux.sh @@ -14,8 +14,7 @@ mkdir files $@ files beamtime_id 11 4 4 1 10 2>&1 | grep Rate - -ls -ln files/0.bin | awk '{ print $5 }'| grep 11264 -ls -ln files/1.bin | awk '{ print $5 }'| grep 11264 -ls -ln files/2.bin | awk '{ print $5 }'| grep 11264 -ls -ln files/3.bin | awk '{ print $5 }'| grep 11264 +ls -ln files/0.bin | awk '{ print $5 }'| grep 11000 +ls -ln files/1.bin | awk '{ print $5 }'| grep 11000 +ls -ln files/2.bin | awk '{ print $5 }'| grep 11000 +ls -ln files/3.bin | awk '{ print $5 }'| grep 11000 diff --git a/examples/producer/dummy-data-producer/check_windows.bat b/examples/producer/dummy-data-producer/check_windows.bat index 6270913bc8fc6118a3c7cc144ebf1467643dcb46..016a343469ca429802746830ecc92d168c570c4d 100644 --- a/examples/producer/dummy-data-producer/check_windows.bat +++ b/examples/producer/dummy-data-producer/check_windows.bat @@ -5,16 +5,16 @@ mkdir %folder% "%1" %folder% beamtime_id 11 4 4 1 10 2>&1 | findstr "Rate" || goto :error FOR /F "usebackq" %%A IN ('%folder%\0.bin') DO set size=%%~zA -if %size% NEQ 11264 goto :error +if %size% NEQ 11000 goto :error FOR /F "usebackq" %%A IN ('%folder%\1.bin') DO set size=%%~zA -if %size% NEQ 11264 goto :error +if %size% NEQ 11000 goto :error FOR /F "usebackq" %%A IN ('%folder%\2.bin') DO set size=%%~zA -if %size% NEQ 11264 goto :error +if %size% NEQ 11000 goto :error FOR /F "usebackq" %%A IN ('%folder%\3.bin') DO set size=%%~zA -if %size% NEQ 11264 goto :error +if %size% NEQ 11000 goto :error goto :clean diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 49769c2f73483957366cae0691991216effa94fc..87ca3582c44018c2cd66a7838acfbbac195a301c 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -25,7 +25,7 @@ struct Args { void PrintCommandArguments(const Args& args) { std::cout << "receiver_address: " << args.receiver_address << std::endl << "beamtime_id: " << args.beamtime_id << std::endl - << "Package size: " << args.number_of_bytes / 1024 << "k" << std::endl + << "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl << "iterations: " << args.iterations << std::endl << "nthreads: " << args.nthreads << std::endl << "mode: " << args.mode << std::endl @@ -46,7 +46,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { try { args->receiver_address = argv[1]; args->beamtime_id = argv[2]; - args->number_of_bytes = std::stoull(argv[3]) * 1024; + args->number_of_bytes = std::stoull(argv[3]) * 1000; args->iterations = std::stoull(argv[4]); args->nthreads = std::stoull(argv[5]); args->mode = std::stoull(argv[6]); @@ -120,7 +120,7 @@ void WaitThreadsFinished(const Args& args) { void PrintOutput(const Args& args, const high_resolution_clock::time_point& start) { high_resolution_clock::time_point t2 = high_resolution_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0; - double size_gb = double(args.number_of_bytes) * args.iterations / 1024.0 / 1024.0 / 1024.0 * 8.0; + double size_gb = double(args.number_of_bytes) * args.iterations / 1000.0 / 1000.0 / 1000.0 * 8.0; double rate = args.iterations / duration_sec; std::cout << "Rate: " << rate << " Hz" << std::endl; std::cout << "Bandwidth " << size_gb / duration_sec << " Gbit/s" << std::endl; diff --git a/examples/worker/getnext_broker/check_linux.sh b/examples/worker/getnext_broker/check_linux.sh index 8455966d010beea4c21e85df5d25753e8f8ff15a..ed90a1acba61ea67c9c6034145cf38b10fee0ca5 100644 --- a/examples/worker/getnext_broker/check_linux.sh +++ b/examples/worker/getnext_broker/check_linux.sh @@ -24,7 +24,7 @@ do echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name} done -$@ 127.0.0.1:8400 $database_name 2 $token_test_run | grep "Processed 3 file(s)" +$@ 127.0.0.1:8400 $database_name 2 $token_test_run 1000 | grep "Processed 3 file(s)" diff --git a/examples/worker/getnext_broker/check_windows.bat b/examples/worker/getnext_broker/check_windows.bat index d96b652d8e7993e7bff84a30842a492ca3a5760c..1e678d49985463eca3cc2da2d15988f514084beb 100644 --- a/examples/worker/getnext_broker/check_windows.bat +++ b/examples/worker/getnext_broker/check_windows.bat @@ -11,7 +11,7 @@ ping 1.0.0.0 -n 10 -w 100 > nul for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error -"%1" 127.0.0.1:8400 %database_name% 1 %token_test_run% | findstr /c:"Processed 3 file" || goto :error +"%1" 127.0.0.1:8400 %database_name% 1 %token_test_run% 1000 | findstr /c:"Processed 3 file" || goto :error goto :clean :error diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index 46648a2db695c8f0d77ae76f51c859561a3b91b0..51484cf7f6b8cff4b4516c21f2d912423fc263bd 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -16,6 +16,7 @@ struct Params { std::string server; std::string beamtime_id; std::string token; + int timeout_ms; int nthreads; }; @@ -40,7 +41,7 @@ std::vector<std::thread> StartThreads(const Params& params, std::vector<int>* nf asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.beamtime_id, params.token, &err); - broker->SetTimeout(10000); + broker->SetTimeout(params.timeout_ms); while ((err = broker->GetNext(&fi, nullptr)) == nullptr) { (*nfiles)[i] ++; } @@ -78,8 +79,8 @@ int ReadAllData(const Params& params, uint64_t* duration_ms) { } int main(int argc, char* argv[]) { - if (argc != 5) { - std::cout << "Usage: " + std::string{argv[0]} +" <server> <run_name> <nthreads> <token>" << std::endl; + if (argc != 6) { + std::cout << "Usage: " + std::string{argv[0]} +" <server> <run_name> <nthreads> <token> <timeout ms>" << std::endl; exit(EXIT_FAILURE); } Params params; @@ -87,6 +88,7 @@ int main(int argc, char* argv[]) { params.beamtime_id = std::string{argv[2]}; params.nthreads = atoi(argv[3]); params.token = std::string{argv[4]}; + params.timeout_ms = atoi(argv[5]); uint64_t duration_ms; auto nfiles = ReadAllData(params, &duration_ms); diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 043b1881a1cc1d5b4b1b4ad741d4d00c11da3dd0..8aad81b27323386c831fd74d1e816f904147c5ef 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -33,7 +33,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); return err; } - log__->Info("connected to receiver at " + receiver_address); + log__->Debug("connected to receiver at " + receiver_address); connected_receiver_uri_ = receiver_address; err = Authorize(beamtime_id); @@ -43,7 +43,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const return err; } - log__->Debug("authorized at " + receiver_address); + log__->Info("authorized connection to receiver at " + receiver_address); return nullptr; } @@ -143,7 +143,7 @@ bool RequestHandlerTcp::NeedRebalance() { void RequestHandlerTcp::CloseConnectionToPeformRebalance() { io__->CloseSocket(sd_, nullptr); - log__->Info("rebalancing"); + log__->Debug("rebalancing"); sd_ = kDisconnectedSocketDescriptor; } @@ -171,7 +171,8 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { auto err = TrySendToReceiver(request); if (ServerError(err)) { Disconnect(); - log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain()); + log__->Debug("cannot send data id " + std::to_string(request->header.data_id) + " to " + receiver_uri + ": " + + err->Explain()); continue; } diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 9a9d404abbba2df9de9ccdd99c9b242e5fe29355..c335f7ae07e50ce8d6e5734678cf1ec96209e990 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -116,8 +116,8 @@ ACTION_P(A_WriteSendDataResponse, error_code) { MATCHER_P4(M_CheckSendDataRequest, op_code, file_id, file_size, message, "Checks if a valid GenericRequestHeader was Send") { return ((asapo::GenericRequestHeader*)arg)->op_code == op_code - && ((asapo::GenericRequestHeader*)arg)->data_id == file_id - && ((asapo::GenericRequestHeader*)arg)->data_size == file_size + && ((asapo::GenericRequestHeader*)arg)->data_id == uint64_t(file_id) + && ((asapo::GenericRequestHeader*)arg)->data_size == uint64_t(file_size) && strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0; } @@ -162,6 +162,12 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { testing::ReturnArg<2>() )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) + ) + )); + EXPECT_CALL(mock_logger, Error(AllOf( HasSubstr("authorization"), HasSubstr(expected_auth_message), @@ -193,11 +199,11 @@ void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { A_WriteSendDataResponse(asapo::kNetErrorNoError), testing::ReturnArg<2>() )); - EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("authorized"), - HasSubstr(receivers_list[i]) - ) - )); + EXPECT_CALL(mock_logger, Info(AllOf( + HasSubstr("authorized"), + HasSubstr(receivers_list[i]) + ) + )); if (only_once) break; i++; } @@ -335,11 +341,11 @@ void RequestHandlerTcpTests::ExpectOKConnect(bool only_once) { testing::SetArgPointee<1>(nullptr), Return(expected_sds[i]) )); - EXPECT_CALL(mock_logger, Info(AllOf( - HasSubstr("connected"), - HasSubstr(expected_address) - ) - )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("connected to"), + HasSubstr(expected_address) + ) + )); if (only_once) break; i++; } diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index c4b327d95e07c6c1eccdda2ed298abfe85f9b001..3390a85d7cab61dfcd39cd450408f9aae2d38e19 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -18,6 +18,7 @@ set(SOURCE_FILES ################################ # Library ################################ +#SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static") add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> @@ -35,6 +36,9 @@ set_target_properties(${TARGET_NAME}-bin PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> ) +configure_file(docker/Dockerfile . COPYONLY) + + ################################ # Testing ################################ diff --git a/receiver/docker/Dockerfile b/receiver/docker/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..ad8b5c2634f7f6d9dae2f72e33587457cee68773 --- /dev/null +++ b/receiver/docker/Dockerfile @@ -0,0 +1,3 @@ +FROM ubuntu:18.04 +ADD receiver / +CMD ["/receiver","/var/lib/receiver/config.json"] diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp index 7da85f40d12bd794abe320df6e4b52a7738fbf50..96d6b48555f811dc47f3ceb9487b10ba5fb77ec6 100644 --- a/receiver/src/request_handler_db_write.cpp +++ b/receiver/src/request_handler_db_write.cpp @@ -18,9 +18,11 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const { file_info.name = request->GetFileName(); file_info.size = request->GetDataSize(); file_info.id = request->GetDataID(); - auto err = db_client__->Insert(file_info, false); + // todo: create flag ignore dups, allow dups for attempts to resend data + auto err = db_client__->Insert(file_info, true); if (!err) { - log__->Debug(std::string{"insert record to "} + kDBCollectionName + " in " + db_name_ + + log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " + + db_name_ + " at " + GetReceiverConfig()->broker_db_uri); } return err; diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp index ddd2ad76b415b40b65622e4c4fca241fb33798c3..7ddfdcce38db120ecf6300ac59d09425e164bc36 100644 --- a/receiver/src/requests_dispatcher.cpp +++ b/receiver/src/requests_dispatcher.cpp @@ -28,7 +28,7 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) { } Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { - log__->Debug("processing request from " + producer_uri_ ); + log__->Debug("processing request id " + std::to_string(request->GetDataID()) + " from " + producer_uri_ ); Error handle_err; handle_err = request->Handle(statistics__); GenericNetworkResponse generic_response; @@ -70,100 +70,4 @@ std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const no return request; } - - -/* - #include <cstring> -#include <assert.h> -#include "connection.h" -#include "receiver_error.h" -#include "io/io_factory.h" - -#include "receiver_logger.h" - -namespace asapo { - -size_t Connection::kRequestHandlerMaxBufferSize; -std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); - -Connection::Connection(SocketDescriptor socket_fd, const std::string& address, - std::string receiver_tag) : request_factory__{new RequestFactory}, - io__{GenerateDefaultIO()}, - statistics__{new Statistics}, - log__{GetDefaultReceiverLogger()}, - authorizer__{new ConnectionAuthorizer}, - requests_dispatcher__{new RequestsDispatcher}{ - socket_fd_ = socket_fd; - connection_id_ = kNetworkProducerPeerImplGlobalCounter++; - address_ = address; - statistics__->AddTag("connection_from", address); - statistics__->AddTag("receiver_tag", std::move(receiver_tag)); -} - -uint64_t Connection::GetId() const noexcept { - return connection_id_; -} - - -Error Connection::ReadAuthorizationHeaderIfNeeded() const { - if (auth_header_was_read_) return nullptr; - - Error err; - GenericRequestHeader generic_request_header; - io__->Receive(socket_fd_, &generic_request_header, sizeof(GenericRequestHeader), &err); - if (err) { - log__->Error("error receive authorization header from " + address_ + " - " + err->Explain()); - return err; - } - - if (generic_request_header.op_code != kOpcodeAuthorize) { - std::string msg= "wrong code in authorization header from " + address_; - log__->Error(msg); - return TextError(msg); - } - - beamtime_id_=std::string{generic_request_header.message}; - return nullptr; -} - -Error Connection::SendAuthorizationResponseIfNeeded(const Error& auth_err) const { - if (auth_header_was_read_) return nullptr; - - GenericNetworkResponse generic_response; - if (auth_err == nullptr) { - generic_response.error_code = kNetErrorNoError; - } else { - generic_response.error_code = kNetAuthorizationError; - strcpy(generic_response.message, auth_err->Explain().c_str()); - } - - Error send_err; - io__->Send(socket_fd_, &generic_response, sizeof(GenericNetworkResponse), &send_err); - if (send_err) { - log__->Error("error sending authorization response to " + address_ + " - " + send_err->Explain()); - return send_err; - } - auth_header_was_read_ = true; - return nullptr; -} - -Error Connection::AuthorizeIfNeeded() const { - Error err = ReadAuthorizationHeaderIfNeeded(); - if (err == nullptr) { - err = authorizer__->Authorize(beamtime_id_,address_); - } - Error err_send = SendAuthorizationResponseIfNeeded(err); - - return err == nullptr ? std::move(err_send) : std::move(err); -} - - - -} - - - - - */ - } diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index 561bed160e101ac87224256c608a0574fb489efe..ab08460aed608245475260810f0505b610c406d2 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -149,7 +149,7 @@ class RequestsDispatcherTests : public Test { } void MockHandleRequest(bool error, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) { - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request from"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); EXPECT_CALL(mock_request, Handle_t()).WillOnce( Return(error ? err.release() : nullptr) diff --git a/tests/automatic/broker/CMakeLists.txt b/tests/automatic/broker/CMakeLists.txt index 6f1685c7d12bf036a9fa6f126b357fd962d2b954..24ec3cef4137fb098dffcf31f475da4355ae1e01 100644 --- a/tests/automatic/broker/CMakeLists.txt +++ b/tests/automatic/broker/CMakeLists.txt @@ -3,4 +3,5 @@ add_subdirectory(read_config) if (UNIX) add_subdirectory(check_monitoring) +add_subdirectory(check_mongo_restart) endif() \ No newline at end of file diff --git a/tests/automatic/broker/check_mongo_restart/CMakeLists.txt b/tests/automatic/broker/check_mongo_restart/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..a3d2838ce60d8687f25707e867889cdf7a232b03 --- /dev/null +++ b/tests/automatic/broker/check_mongo_restart/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME check_broker_mongo_restart) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem) diff --git a/tests/automatic/broker/check_mongo_restart/check_linux.sh b/tests/automatic/broker/check_mongo_restart/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..a8285563df4885bc92d17547bb673e9c3a4c1994 --- /dev/null +++ b/tests/automatic/broker/check_mongo_restart/check_linux.sh @@ -0,0 +1,98 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +beamtime_id=asapo_test +token=`$3 token -secret broker_secret.key $beamtime_id` + +monitor_database_name=db_test +proxy_address=127.0.0.1:8400 + +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +function wait_mongo { +NEXT_WAIT_TIME=0 +until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do + echo "Wait for mongo" + NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ )) + sleep 1 +done +if (( NEXT_WAIT_TIME == 30 )); then + echo "Timeout" + exit -1 +fi +} + + +function kill_mongo { + kill -9 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'` +} + + +function start_mongo { + mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork +} + + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} + nomad stop nginx + nomad stop receiver + nomad stop discovery + nomad stop broker + nomad stop authorizer +# kill $producerid + echo "db.dropDatabase()" | mongo --port 27016 ${beamtime_id} + influx -execute "drop database ${monitor_database_name}" + kill_mongo +} + +influx -execute "create database ${monitor_database_name}" + +sed -i 's/27017/27016/g' receiver.json.tpl +sed -i 's/"WriteToDisk":true/"WriteToDisk":false/g' receiver.json.tpl +sed -i 's/27017/27016/g' broker.json.tpl +sed -i 's/info/debug/g' broker.json.tpl + +start_mongo +wait_mongo + + +nomad run nginx.nmd +nomad run authorizer.nmd +nomad run receiver.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 + +echo "db.${beamtime_id}.insert({dummy:1})" | mongo --port 27016 ${beamtime_id} + + + +#producer +mkdir -p ${receiver_folder} +$1 localhost:8400 ${beamtime_id} 100 1000 4 0 100 & +#producerid=`echo $!` + +wait + +$2 ${proxy_address} ${beamtime_id} 2 $token 10000 &> output.txt & + +sleep 2 + +kill_mongo +sleep 3 +start_mongo + +wait + +cat output.txt +nfiles=`cat output.txt | grep "Processed" | awk '{print $2;}'` +test $nfiles -ge 1000 +rm output.txt \ No newline at end of file diff --git a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt index 8d8de34465fe9fb3ecb05e4e42e05d7023f8154e..d88fa1fe801bb73514b28b13f66061de38624c75 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt +++ b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt @@ -19,9 +19,9 @@ target_link_libraries(${TARGET_NAME} test_common asapo-worker) ################################ #add_test_setup_cleanup(${TARGET_NAME}) -add_integration_test(${TARGET_NAME} get_httpbin "GET httpbin.org body 200") +#add_integration_test(${TARGET_NAME} get_httpbin "GET http://httpbin.org body 200") add_integration_test(${TARGET_NAME} get_badaddress "GET google.com/badaddress found 404") add_integration_test(${TARGET_NAME} get_badaddress2 "GET 111 clienterror 404") -add_integration_test(${TARGET_NAME} post "POST httpbin.org/post testdata 200") +#add_integration_test(${TARGET_NAME} post "POST http://httpbin.org/post data 200") add_integration_test(${TARGET_NAME} post_badaddress "POST google.com/badaddress found 404") add_integration_test(${TARGET_NAME} post_badaddress2 "POST 111 clienterror 404") \ No newline at end of file diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh index 12b1d68ae529465c22d9903dbdef602bc55f3c58..a19a6d5f9bb6ab64edeba9575eb9db267c56c531 100644 --- a/tests/automatic/full_chain/simple_chain/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -44,4 +44,4 @@ $1 localhost:8400 ${beamtime_id} 100 1000 4 0 100 & #producerid=`echo $!` -$2 ${proxy_address} ${beamtime_id} 2 $token | grep "Processed 1000 file(s)" +$2 ${proxy_address} ${beamtime_id} 2 $token 1000 | grep "Processed 1000 file(s)" diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat index f26def490a092b7281adf7e2e11bfd4dfcdf3156..6a1448da32c5135cbd11f8536a5b8726f772e454 100644 --- a/tests/automatic/full_chain/simple_chain/check_windows.bat +++ b/tests/automatic/full_chain/simple_chain/check_windows.bat @@ -26,7 +26,7 @@ start /B "" "%1" %proxy_address% %beamtime_id% 100 1000 4 0 100 ping 1.0.0.0 -n 1 -w 100 > nul REM worker -"%2" %proxy_address% %beamtime_id% 2 %token% | findstr /c:"Processed 1000 file(s)" || goto :error +"%2" %proxy_address% %beamtime_id% 2 %token% 1000 | findstr /c:"Processed 1000 file(s)" || goto :error goto :clean diff --git a/tests/automatic/full_chain/two_beamlines/check_linux.sh b/tests/automatic/full_chain/two_beamlines/check_linux.sh index d089d59f3c0bbee48ded9ee8bba173edbd305783..5e18fa175b19617ce375a8c7e9f9882143ca83c2 100644 --- a/tests/automatic/full_chain/two_beamlines/check_linux.sh +++ b/tests/automatic/full_chain/two_beamlines/check_linux.sh @@ -53,5 +53,5 @@ $1 localhost:8400 ${beamtime_id2} 100 900 4 0 100 & #producerid=`echo $!` -$2 ${proxy_address} ${beamtime_id1} 2 $token1 | grep "Processed 1000 file(s)" -$2 ${proxy_address} ${beamtime_id2} 2 $token2 | grep "Processed 900 file(s)" +$2 ${proxy_address} ${beamtime_id1} 2 $token1 1000 | grep "Processed 1000 file(s)" +$2 ${proxy_address} ${beamtime_id2} 2 $token2 1000 | grep "Processed 900 file(s)" diff --git a/tests/automatic/full_chain/two_beamlines/check_windows.bat b/tests/automatic/full_chain/two_beamlines/check_windows.bat index cb8ef418fe03a370cad97f805accb2d514dafb92..2b96bb574460d4d099b8be7158497b733ee00b98 100644 --- a/tests/automatic/full_chain/two_beamlines/check_windows.bat +++ b/tests/automatic/full_chain/two_beamlines/check_windows.bat @@ -35,8 +35,8 @@ start /B "" "%1" %proxy_address% %beamtime_id2% 100 900 4 0 100 ping 1.0.0.0 -n 1 -w 100 > nul REM worker -"%2" %proxy_address% %beamtime_id1% 2 %token1% | findstr /c:"Processed 1000 file(s)" || goto :error -"%2" %proxy_address% %beamtime_id2% 2 %token2% | findstr /c:"Processed 900 file(s)" || goto :error +"%2" %proxy_address% %beamtime_id1% 2 %token1% 1000 | findstr /c:"Processed 1000 file(s)" || goto :error +"%2" %proxy_address% %beamtime_id2% 2 %token2% 1000 | findstr /c:"Processed 900 file(s)" || goto :error goto :clean diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index ad36394912da0e110b2707f73e0270452030d41f..70f089ea89556bb9f0c722f58a3fa0276018e836 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(transfer_single_file) if (UNIX) add_subdirectory(check_monitoring) + add_subdirectory(check_mongo_restart) endif() \ No newline at end of file diff --git a/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e76f25ff44af36e70e25e5440bf17658446fb218 --- /dev/null +++ b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME check-mongo-restart) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..e7a5dc5e57a84e402d2e62ca07c0847c3c066c48 --- /dev/null +++ b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +function wait_mongo { +NEXT_WAIT_TIME=0 +until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do + echo "Wait for mongo" + NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ )) + sleep 1 +done +if (( NEXT_WAIT_TIME == 30 )); then + echo "Timeout" + exit -1 +fi +} + + +function kill_mongo { + kill -9 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'` +} + + +function start_mongo { + mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork +} + + +database_name=db_test +beamtime_id=asapo_test +beamline=test + +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} + nomad stop receiver + nomad stop discovery + nomad stop authorizer + nomad stop nginx + echo "db.dropDatabase()" | mongo --port 27016 ${beamtime_id} + kill_mongo +} + +start_mongo +wait_mongo + + +# create db before worker starts reading it. todo: git rid of it +echo "db.${beamtime_id}.insert({dummy:1})" | mongo --port 27016 ${beamtime_id} + +sed -i 's/27017/27016/g' receiver.json.tpl +#sed -i 's/"WriteToDisk":true/"WriteToDisk":false/g' receiver.json.tpl + + +nomad run authorizer.nmd +nomad run nginx.nmd +nomad run receiver.nmd +nomad run discovery.nmd + +mkdir -p ${receiver_folder} + + +sleep 1 + +nfiles=1000 + +$1 localhost:8400 ${beamtime_id} 100 $nfiles 1 0 200 & + +sleep 0.5 + +kill_mongo +sleep 3 +start_mongo + +wait + +echo "db.data.validate(true)" | mongo --port 27016 ${beamtime_id} + +echo processed files: +echo "db.data.count()" | mongo --port 27016 ${beamtime_id} + + +echo "db.data.count()" | mongo --port 27016 ${beamtime_id} | grep $nfiles + + diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index 972324483c226905c385cb6d51031366f4b52555..433d26bbe0e12a70bdb6cb9ebb733682b0ff2535 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -34,7 +34,7 @@ mkdir -p ${receiver_folder} $1 localhost:8400 ${beamtime_id} 100 1 1 0 30 -ls -ln ${receiver_folder}/1.bin | awk '{ print $5 }'| grep 102400 +ls -ln ${receiver_folder}/1.bin | awk '{ print $5 }'| grep 100000 -$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>1 | grep "authorization failed" \ No newline at end of file +$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>1 | grep "authorization failed" diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat index 7a453b7b1663cb1aa81dc8c261079215cb836202..c17b785ea084fb75120e4e89f5d64db811d50cfe 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat +++ b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat @@ -22,7 +22,7 @@ mkdir %receiver_folder% ping 1.0.0.0 -n 1 -w 100 > nul FOR /F "usebackq" %%A IN ('%receiver_folder%\1.bin') DO set size=%%~zA -if %size% NEQ 102400 goto :error +if %size% NEQ 100000 goto :error "%1" localhost:8400 wrong_id 100 1 1 0 2 2>1 | findstr /c:"authorization failed" || goto :error diff --git a/tests/automatic/spd_logger/console/check_linux.sh b/tests/automatic/spd_logger/console/check_linux.sh index bc934a732cde989660de74baa5737b84e07ed5a9..efc8f639ecf1566cd58168f3b54c4b7ff10302bd 100644 --- a/tests/automatic/spd_logger/console/check_linux.sh +++ b/tests/automatic/spd_logger/console/check_linux.sh @@ -5,6 +5,7 @@ set -e res=`$@` echo $res | grep '"level":"info","message":"test info"' +echo $res | grep '"level":"info","message":"test : info"' echo $res | grep '"test_int":2,"test_double":1.0}' echo $res | grep '"level":"error","message":"test error"' echo $res | grep '"level":"debug","message":"test debug"' diff --git a/tests/automatic/spd_logger/console/check_windows.bat b/tests/automatic/spd_logger/console/check_windows.bat index 78ecaf0a0ffb0006370b2c51093a62108cf727c6..e136f913210612b81331b515f08981d730c442c6 100644 --- a/tests/automatic/spd_logger/console/check_windows.bat +++ b/tests/automatic/spd_logger/console/check_windows.bat @@ -1,6 +1,7 @@ "%1" > output findstr /I /L /C:"\"level\":\"info\",\"message\":\"test info\"" output || goto :error +findstr /I /L /C:"\"level\":\"info\",\"message\":\"test : info\"" output || goto :error findstr /I /L /C:"\"level\":\"error\",\"message\":\"test error\"" output || goto :error findstr /I /L /C:"\"level\":\"debug\",\"message\":\"test debug\"" output || goto :error findstr /I /L /C:"\"level\":\"warning\",\"message\":\"test warning\"" output || goto :error diff --git a/tests/automatic/spd_logger/console/spd_logger_console.cpp b/tests/automatic/spd_logger/console/spd_logger_console.cpp index 130e163f82edd8442cc03923436367fa37718e2e..e14e4bf856262d56e292a9cea107f40729566b43 100644 --- a/tests/automatic/spd_logger/console/spd_logger_console.cpp +++ b/tests/automatic/spd_logger/console/spd_logger_console.cpp @@ -32,6 +32,7 @@ int main(int argc, char* argv[]) { logger->Warning("test warning"); logger->Debug("test debug"); + logger->Info("test : info"); logger->SetLogLevel(LogLevel::Error); logger->Info("test info_errorlev"); diff --git a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt index fd034a4d2385b6b8069962b68ffc0485dd16afaa..8a576706d71ad65f73eeb8257400307c4f1bdf43 100644 --- a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt +++ b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt @@ -15,6 +15,6 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) ################################ add_test_setup_cleanup(${TARGET_NAME}) add_integration_test(${TARGET_NAME} writeok "test_file ok dummy" nomem) -add_integration_test(${TARGET_NAME} writetwice "test_file error kFileAlreadyExists:test" nomem) +add_integration_test(${TARGET_NAME} writetwice "test_file ok dummy" nomem) add_integration_test(${TARGET_NAME} dirnoaccess "test_noaccess/test_file error Permissiondenied:test_noaccess/test_file" nomem)