diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 53580a80b0f90d9b3c3546efab2ec5a460216922..26bedee49dad2b3f7a138f5860db32fab6cdf101 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -14,5 +14,8 @@ function(prepare_asapo) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json.tpl discovery.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json.tpl broker.json.tpl COPYONLY) + configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/nginx.conf.tpl nginx.conf.tpl COPYONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx.nmd.in nginx.nmd @ONLY) + endfunction() diff --git a/broker/src/asapo_broker/utils/status_codes.go b/broker/src/asapo_broker/utils/status_codes.go index 70a2193004b0e34bd07f5f0804b50375fd8ba910..58fef4da3eba0fb3d6962487a88b0c2fc8a4d393 100644 --- a/broker/src/asapo_broker/utils/status_codes.go +++ b/broker/src/asapo_broker/utils/status_codes.go @@ -10,5 +10,5 @@ const ( //error codes StatusError = http.StatusInternalServerError StatusWrongInput = http.StatusBadRequest - StatusNoData = http.StatusNotFound + StatusNoData = http.StatusConflict ) diff --git a/common/cpp/include/logger/logger.h b/common/cpp/include/logger/logger.h index 4657c9874e262a7e0a2ce7bb09ae63726a5e6151..feaba694551091f0bd989f9b5d988ffb02ca384d 100644 --- a/common/cpp/include/logger/logger.h +++ b/common/cpp/include/logger/logger.h @@ -16,6 +16,20 @@ enum class LogLevel { Warning }; +class LogMessageWithFields { + public: + LogMessageWithFields(std::string key, uint64_t val); + LogMessageWithFields(std::string key, double val, int precision); + LogMessageWithFields(std::string key, std::string val); + LogMessageWithFields& Append(std::string key, uint64_t val); + LogMessageWithFields& Append(std::string key, double val, int precision); + LogMessageWithFields& Append(std::string key, std::string val); + std::string LogString() const; + private: + std::string log_string_; +}; + + class AbstractLogger { public: virtual void SetLogLevel(LogLevel level) = 0; @@ -23,6 +37,10 @@ class AbstractLogger { virtual void Error(const std::string& text) const = 0; virtual void Debug(const std::string& text) const = 0; virtual void Warning(const std::string& text) const = 0; + virtual void Info(const LogMessageWithFields& msg) const = 0; + virtual void Error(const LogMessageWithFields& msg) const = 0; + virtual void Debug(const LogMessageWithFields& msg) const = 0; + virtual void Warning(const LogMessageWithFields& msg) const = 0; virtual void EnableLocalLog(bool enable) = 0; virtual void EnableRemoteLog(bool enable) = 0; virtual ~AbstractLogger() = default; @@ -36,6 +54,7 @@ Logger CreateDefaultLoggerApi(const std::string& name, const std::string& endpoi LogLevel StringToLogLevel(const std::string& name, Error* err); + } #endif //ASAPO_LOGGER_H diff --git a/common/cpp/include/unittests/MockLogger.h b/common/cpp/include/unittests/MockLogger.h index aa8f900dd8a217da2bc53db5bf6b71d8ed081232..8ffc2e6bcf2fb0ff5e88249602240b9b593c57a7 100644 --- a/common/cpp/include/unittests/MockLogger.h +++ b/common/cpp/include/unittests/MockLogger.h @@ -14,6 +14,19 @@ class MockLogger : public AbstractLogger { MOCK_CONST_METHOD1(Error, void(const std::string& )); MOCK_CONST_METHOD1(Debug, void(const std::string& )); MOCK_CONST_METHOD1(Warning, void(const std::string& )); + void Info(const LogMessageWithFields& msg) const override { + Info(msg.LogString()); + }; + void Error(const LogMessageWithFields& msg) const override { + Error(msg.LogString()); + }; + void Debug(const LogMessageWithFields& msg) const override { + Debug(msg.LogString()); + }; + void Warning(const LogMessageWithFields& msg) const override { + Warning(msg.LogString()); + }; + MOCK_METHOD1(SetLogLevel, void(LogLevel)); MOCK_METHOD1(EnableLocalLog, void(bool)); MOCK_METHOD1(EnableRemoteLog, void(bool)); diff --git a/common/cpp/src/logger/CMakeLists.txt b/common/cpp/src/logger/CMakeLists.txt index 4db51b27f4eca2b48154bb31ba54ece84fec2292..600a1b66693026ba87d7db2e5769f2c3419aea7d 100644 --- a/common/cpp/src/logger/CMakeLists.txt +++ b/common/cpp/src/logger/CMakeLists.txt @@ -1,6 +1,6 @@ set(TARGET_NAME logger) set(SOURCE_FILES - spd_logger.cpp spd_logger.h logger_factory.cpp fluentd_sink.cpp fluentd_sink.h) + spd_logger.cpp spd_logger.h logger.cpp fluentd_sink.cpp fluentd_sink.h) ################################ # Library diff --git a/common/cpp/src/logger/fluentd_sink.cpp b/common/cpp/src/logger/fluentd_sink.cpp index 419a9d4a9566b41660fb955928c99f3f4c997878..540841f0a7ba650c3d1832a74a37381c85ec45c9 100644 --- a/common/cpp/src/logger/fluentd_sink.cpp +++ b/common/cpp/src/logger/fluentd_sink.cpp @@ -8,9 +8,8 @@ void FluentdSink::_sink_it(const spdlog::details::log_msg& msg) { std::string log_str = msg.formatted.str(); HttpCode code; log_str.erase(log_str.find_last_not_of("\n\r\t") + 1); - std::string string_to_send = "json={\"message\":\"" + log_str + "\"}"; Error err; - httpclient__->Post(endpoint_uri_, string_to_send, &code, &err); + httpclient__->Post(endpoint_uri_, log_str, &code, &err); if (err) { std::cerr << "cannot send logs - " + err->Explain() << std::endl; } diff --git a/common/cpp/src/logger/logger.cpp b/common/cpp/src/logger/logger.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8ab8acbf9b8ec2e0bd0cd956d4f5c8d1ee3fdd88 --- /dev/null +++ b/common/cpp/src/logger/logger.cpp @@ -0,0 +1,82 @@ +#include "spd_logger.h" + +namespace asapo { + +Logger CreateLogger(std::string name, bool console, bool centralized_log, const std::string& endpoint_uri) { + auto logger = new SpdLogger{name, endpoint_uri}; + logger->SetLogLevel(LogLevel::Info); + if (console) { + logger->EnableLocalLog(true); + } + if (centralized_log) { + logger->EnableRemoteLog(true); + } + + return Logger{logger}; +} + + +Logger CreateDefaultLoggerBin(const std::string& name) { + return CreateLogger(name, true, false, ""); +} + +Logger CreateDefaultLoggerApi(const std::string& name, const std::string& endpoint_uri) { + return CreateLogger(name, false, true, endpoint_uri); +} + +LogLevel StringToLogLevel(const std::string& name, Error* err) { + *err = nullptr; + if (name == "debug") return LogLevel::Debug; + if (name == "info") return LogLevel::Info; + if (name == "warning") return LogLevel::Warning; + if (name == "none") return LogLevel::None; + if (name == "error") return LogLevel::Error; + + *err = TextError("wrong log level: " + name); + return LogLevel::None; +} + +template<typename ... Args> +std::string string_format( const std::string& format, Args ... args ) { + size_t size = snprintf( nullptr, 0, format.c_str(), args ... ) + 1; + std::unique_ptr<char[]> buf( new char[ size ] ); + snprintf( buf.get(), size, format.c_str(), args ... ); + return std::string( buf.get(), buf.get() + size - 1 ); +} + + +std::string EncloseQuotes(std::string str) { + return "\"" + std::move(str) + "\""; +} + +LogMessageWithFields::LogMessageWithFields(std::string key, uint64_t val) { + log_string_ = EncloseQuotes(key) + ":" + std::to_string(val); +} + +LogMessageWithFields::LogMessageWithFields(std::string key, double val, int precision) { + log_string_ = EncloseQuotes(key) + ":" + string_format("%." + std::to_string(precision) + "f", val); +} + +LogMessageWithFields::LogMessageWithFields(std::string key, std::string val) { + log_string_ = EncloseQuotes(key) + ":" + EncloseQuotes(val); +} + +LogMessageWithFields& LogMessageWithFields::Append(std::string key, uint64_t val) { + log_string_ += "," + EncloseQuotes(key) + ":" + std::to_string(val); + return *this; +} + +LogMessageWithFields& LogMessageWithFields::Append(std::string key, double val, int precision) { + log_string_ += "," + EncloseQuotes(key) + ":" + string_format("%." + std::to_string(precision) + "f", val); + return *this; +} + +LogMessageWithFields& LogMessageWithFields::Append(std::string key, std::string val) { + log_string_ += "," + EncloseQuotes(key) + ":" + EncloseQuotes(val); + return *this; +} + +std::string LogMessageWithFields::LogString() const { + return log_string_; +} +}; \ No newline at end of file diff --git a/common/cpp/src/logger/logger_factory.cpp b/common/cpp/src/logger/logger_factory.cpp deleted file mode 100644 index d5fbf0f1724f1a15e837e436b289985956465629..0000000000000000000000000000000000000000 --- a/common/cpp/src/logger/logger_factory.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include "spd_logger.h" - -namespace asapo { - -Logger CreateLogger(std::string name, bool console, bool centralized_log, const std::string& endpoint_uri) { - auto logger = new SpdLogger{name, endpoint_uri}; - logger->SetLogLevel(LogLevel::Info); - if (console) { - logger->EnableLocalLog(true); - } - if (centralized_log) { - logger->EnableRemoteLog(true); - } - - return Logger{logger}; -} - - -Logger CreateDefaultLoggerBin(const std::string& name) { - return CreateLogger(name, true, false, ""); -} - -Logger CreateDefaultLoggerApi(const std::string& name, const std::string& endpoint_uri) { - return CreateLogger(name, false, true, endpoint_uri); -} - -LogLevel StringToLogLevel(const std::string& name, Error* err) { - *err = nullptr; - if (name == "debug") return LogLevel::Debug; - if (name == "info") return LogLevel::Info; - if (name == "warning") return LogLevel::Warning; - if (name == "none") return LogLevel::None; - if (name == "error") return LogLevel::Error; - - *err = TextError("wrong log level: " + name); - return LogLevel::None; -} - -}; \ No newline at end of file diff --git a/common/cpp/src/logger/spd_logger.cpp b/common/cpp/src/logger/spd_logger.cpp index 51b6053a9a578d376000b83d578e884e5f9ebc1c..4b66a759cb36d2dc56f156464ee49cfb4b83ecb9 100644 --- a/common/cpp/src/logger/spd_logger.cpp +++ b/common/cpp/src/logger/spd_logger.cpp @@ -25,10 +25,18 @@ void SpdLogger::SetLogLevel(LogLevel level) { } } } +std::string EncloseMsg(std::string msg) { + if (msg.find(":") == std::string::npos) { + return std::string(R"("message":")") + msg + "\""; + } else { + return msg; + } + +} void SpdLogger::Info(const std::string& text)const { if (log__) { - log__->info(text); + log__->info(EncloseMsg(text)); } } @@ -47,6 +55,7 @@ void SpdLogger::UpdateLoggerSinks() { sinks_.push_back(std::shared_ptr<FluentdSink> {new FluentdSink(endpoint_uri_)}); } log__ = std::unique_ptr<spdlog::logger> {new spdlog::async_logger(name_, std::begin(sinks_), std::end(sinks_), 1024)}; + log__->set_pattern(R"({"time":"%Y-%m-%d %H:%M:%S.%F","source":"%n","level":"%l",%v})"); } SpdLogger::SpdLogger(const std::string& name, const std::string& endpoint_uri): name_{name}, endpoint_uri_{endpoint_uri} { @@ -54,20 +63,20 @@ SpdLogger::SpdLogger(const std::string& name, const std::string& endpoint_uri): } void SpdLogger::Error(const std::string& text)const { if (log__) { - log__->error(text); + log__->error(EncloseMsg(text)); } } void SpdLogger::Debug(const std::string& text) const { if (log__) { - log__->debug(text); + log__->debug(EncloseMsg(text)); } } void SpdLogger::Warning(const std::string& text)const { if (log__) { - log__->warn(text); + log__->warn(EncloseMsg(text)); } } @@ -75,5 +84,19 @@ void SpdLogger::EnableRemoteLog(bool enable) { centralized_log_ = enable; UpdateLoggerSinks(); } +void SpdLogger::Info(const LogMessageWithFields& msg) const { + Info(msg.LogString()); +} +void SpdLogger::Error(const LogMessageWithFields& msg) const { + Error(msg.LogString()); + +} +void SpdLogger::Debug(const LogMessageWithFields& msg) const { + Debug(msg.LogString()); -} \ No newline at end of file +} +void SpdLogger::Warning(const LogMessageWithFields& msg) const { + Warning(msg.LogString()); +} + +} diff --git a/common/cpp/src/logger/spd_logger.h b/common/cpp/src/logger/spd_logger.h index 8309bc4911a9aa44f365446b47ed6c2d365907aa..904dbfc26c0cdff440d677dc1906691c89a46f19 100644 --- a/common/cpp/src/logger/spd_logger.h +++ b/common/cpp/src/logger/spd_logger.h @@ -14,6 +14,11 @@ class SpdLogger : public AbstractLogger { void Error(const std::string& text) const override; void Debug(const std::string& text) const override; void Warning(const std::string& text) const override; + void Info(const LogMessageWithFields& msg) const override; + void Error(const LogMessageWithFields& msg) const override; + void Debug(const LogMessageWithFields& msg) const override; + void Warning(const LogMessageWithFields& msg) const override; + void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; ~SpdLogger() = default; @@ -26,6 +31,12 @@ class SpdLogger : public AbstractLogger { bool centralized_log_ = false; void UpdateLoggerSinks(); }; + +std::string EncloseMsg(std::string msg); + } + + + #endif //ASAPO_SPDLOGGER_H diff --git a/common/cpp/unittests/logger/test_logger.cpp b/common/cpp/unittests/logger/test_logger.cpp index 7db8dcbe002dfc8f6bbb9a36baa533b7f3c4d281..bead7006ce0a668f07929f35d8f1ef93978f121f 100644 --- a/common/cpp/unittests/logger/test_logger.cpp +++ b/common/cpp/unittests/logger/test_logger.cpp @@ -16,6 +16,7 @@ using ::testing::HasSubstr; using ::testing::ElementsAre; using asapo::LogLevel; +using asapo::LogMessageWithFields; namespace { @@ -68,9 +69,14 @@ class LoggerTests : public Test { std::unique_ptr<spdlog::logger> log; asapo::SpdLogger logger{"test", "test_uri"}; spdlog::details::log_msg msg; + spdlog::details::log_msg msg_json; + std::string test_string{"Hello"}; + std::string test_string_json{R"("Hello":"test","int":1,"double":123.234)"}; + void SetUp() override { - msg.raw << test_string; + msg.raw << R"("message":"Hello")"; + msg_json.raw << R"("Hello":"test","int":1,"double":123.234)"; log.reset(new spdlog::logger("mylogger", mock_sink)); logger.log__ = std::move(log); } @@ -79,14 +85,18 @@ class LoggerTests : public Test { }; MATCHER_P(CompareMsg, msg, "") { + *result_listener << "Comparing " << "|" << arg.raw.str() << "|" << " and " << "|" << (*msg).raw.str() << "|" << + "Level:" << arg.level << " and " << (*msg).level; if (arg.level != (*msg).level) return false; - if (arg.raw.str() != (*msg).raw.c_str()) return false; + + if (arg.raw.str() != (*msg).raw.str()) return false; return true; } TEST_F(LoggerTests, Info) { msg.level = spdlog::level::info; + logger.SetLogLevel(LogLevel::Info); EXPECT_CALL(*mock_sink, _sink_it(CompareMsg(&msg))); @@ -94,6 +104,29 @@ TEST_F(LoggerTests, Info) { logger.Info(test_string); } +TEST_F(LoggerTests, InfoJson) { + msg_json.level = spdlog::level::info; + + logger.SetLogLevel(LogLevel::Info); + + EXPECT_CALL(*mock_sink, _sink_it(CompareMsg(&msg_json))); + + logger.Info(test_string_json); +} + +TEST_F(LoggerTests, InfoMessage) { + msg_json.level = spdlog::level::info; + + asapo::LogMessageWithFields msg{"Hello", "test"}; + logger.SetLogLevel(LogLevel::Info); + + EXPECT_CALL(*mock_sink, _sink_it(CompareMsg(&msg_json))); + + logger.Info(msg.Append("int", 1).Append("double", 123.234, 3)); +} + + + TEST_F(LoggerTests, Debug) { msg.level = spdlog::level::debug; logger.SetLogLevel(LogLevel::Debug); @@ -146,5 +179,65 @@ TEST_F(LoggerTests, NoDebugOnNoneLevel) { logger.Info(test_string); } +TEST(Message, ConstructorString) { + asapo::LogMessageWithFields msg{"Hello", "test"}; + + auto message = msg.LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":"test")")); +} + + +TEST(Message, ConstructorInt) { + asapo::LogMessageWithFields msg{"Hello", 123}; + + auto message = msg.LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":123)")); +} + + +TEST(Message, ConstructorDouble) { + asapo::LogMessageWithFields msg{"Hello", 123.0, 1}; + + auto message = msg.LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":123.0)")); + +} + +TEST(Message, AddString) { + auto message = asapo::LogMessageWithFields{"Hello", "test"} .Append("test", "test").LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":"test","test":"test")")); +} + +TEST(Message, AddInt) { + asapo::LogMessageWithFields msg{"Hello", "test"}; + msg = msg.Append("test", 123); + + auto message = msg.LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":"test","test":123)")); +} + +TEST(Message, AddDouble) { + asapo::LogMessageWithFields msg{"Hello", "test"}; + + auto message = msg.Append("test", 123.2, 2).LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":"test","test":123.20)")); +} + +TEST(Message, Multi) { + asapo::LogMessageWithFields msg{"Hello", "test"}; + msg.Append("test", 123).Append("test", "test").Append("test", 123.2, 2); + + auto message = msg.LogString(); + + ASSERT_THAT(message, Eq(R"("Hello":"test","test":123,"test":"test","test":123.20)")); +} + + } diff --git a/config/fluentd/fluentd.conf b/config/fluentd/fluentd.conf index faa87694e1e518eea5cc4abcef9e73672a34029a..6da854300d9708cf40f9d22fb30172f8207b0e53 100644 --- a/config/fluentd/fluentd.conf +++ b/config/fluentd/fluentd.conf @@ -3,6 +3,8 @@ port 9880 bind 0.0.0.0 add_remote_addr true + format json + time_format %Y-%m-%d %H:%M:%S.%N </source> <source> @@ -14,18 +16,6 @@ time_format %Y-%m-%d %H:%M:%S.%N </source> -<filter asapo> -@type parser -key_name message -reserve_data true -<parse> - @type regexp - expression /\[(?<time>[^\]]*)\] \[(?<source>[^ ]*)\] \[(?<level>[^ ]*)\] (?<message>[^\n]*)/ - time_key time - time_format %Y-%m-%d %H:%M:%S.%N - </parse> -</filter> - <match asapo.**> @type copy <store> @@ -47,5 +37,3 @@ reserve_data true path /tmp/fluentd/asapo </store> </match> - - diff --git a/config/nomad/nginx.nmd.in b/config/nomad/nginx.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..da6826764f1c7f2ce9979246c2440aee482d7891 --- /dev/null +++ b/config/nomad/nginx.nmd.in @@ -0,0 +1,63 @@ +job "nginx" { + datacenters = ["dc1"] + + type = "service" + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + auto_revert = false + } + + group "nginx" { + count = 1 + + restart { + attempts = 2 + interval = "30m" + delay = "15s" + mode = "fail" + } + + task "nginx" { + driver = "raw_exec" + + config { + command = "nginx", + args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + port "nginx" { + static = 8400 + } + } + } + + service { + port = "nginx" + name = "nginx" + check { + name = "alive" + type = "http" + path = "/nginx_health" + timeout = "2s" + interval = "10s" + } + } + + template { + source = "@WORK_DIR@/nginx.conf.tpl" + destination = "local/nginx.conf" + change_mode = "restart" + } + + + } + } +} diff --git a/discovery/src/asapo_discovery/request_handler/request_handler.go b/discovery/src/asapo_discovery/request_handler/request_handler.go index e2cac458fd6e68d543dae36d56354ddef531b244..b2ce9b5601af3c2df091c926e6ef54e6acc5ab11 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler.go @@ -1,7 +1,10 @@ package request_handler +import "asapo_discovery/utils" + type Agent interface { GetReceivers() ([]byte, error) - Init(int,[]string) error + GetBroker() ([]byte, error) + Init(settings utils.Settings) error } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go index c414158e5865d9d85c9100311c732ab16de0e51d..c4a948a4b26aaea7608e7b29188966c376b9550e 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul.go @@ -5,6 +5,8 @@ import ( "github.com/hashicorp/consul/api" "strconv" "errors" + "sort" + "sync" ) type ConsulRequestHandler struct { @@ -17,20 +19,63 @@ type Responce struct { Uris []string } +type SafeCounter struct { + counter int + mux sync.Mutex +} + +func (c *SafeCounter) Next(size int) int { + c.mux.Lock() + defer c.mux.Unlock() + val := c.counter % size + c.counter++ + return val +} + +var counter SafeCounter + +func (rh *ConsulRequestHandler) GetServices(name string) ([]string, error) { + var result = make([]string, 0) + services, _, err := rh.client.Health().Service(name, "", true, nil) + if err != nil { + return nil, err + } + for _, service := range (services) { + result = append(result, service.Node.Address+":"+strconv.Itoa(service.Service.Port)) + } + sort.Strings(result) + return result, nil +} + func (rh *ConsulRequestHandler) GetReceivers() ([]byte, error) { - if (rh.client == nil){ - return nil,errors.New("consul client not connected") + if (rh.client == nil) { + return nil, errors.New("consul client not connected") + } + var response Responce + var err error + response.Uris, err = rh.GetServices("receiver") + if err != nil { + return nil, err } - var responce Responce - services,_,err := rh.client.Health().Service("receiver","",true,nil) - if err!=nil { - return nil,err + response.MaxConnections = rh.MaxConnections + return utils.MapToJson(&response) +} + +func (rh *ConsulRequestHandler) GetBroker() ([]byte, error) { + if (rh.client == nil) { + return nil, errors.New("consul client not connected") + } + response, err := rh.GetServices("broker") + if err != nil { + return nil, err } - for _,service := range (services) { - responce.Uris = append(responce.Uris,service.Node.Address+":"+strconv.Itoa(service.Service.Port)) + size := len(response) + if size ==0 { + return []byte(""),nil + }else { + return []byte(response[counter.Next(size)]),nil } - responce.MaxConnections = rh.MaxConnections - return utils.MapToJson(&responce) + return nil, nil } func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, err error) { @@ -48,13 +93,13 @@ func (rh *ConsulRequestHandler) connectClient(uri string) (client *api.Client, e return } -func (rh *ConsulRequestHandler) Init(maxCons int, uris []string) (err error) { - rh.MaxConnections = maxCons - if len(uris) == 0 { +func (rh *ConsulRequestHandler) Init(settings utils.Settings) (err error) { + rh.MaxConnections = settings.Receiver.MaxConnections + if len(settings.ConsulEndpoints) == 0 { rh.client, err = rh.connectClient("") return err } - for _, uri := range (uris) { + for _, uri := range (settings.ConsulEndpoints) { rh.client, err = rh.connectClient(uri) if err == nil { return nil diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go index 5e9d18f00fc376b4326d4341e880043128ce98bf..e3e5b78c9378a0f4d0bb1271386db745b24fcd3a 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_consul_test.go @@ -3,13 +3,14 @@ package request_handler import ( "github.com/stretchr/testify/suite" "testing" - "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api" "strconv" + "asapo_discovery/utils" ) type ConsulHandlerTestSuite struct { suite.Suite - client *api.Client + client *api.Client handler ConsulRequestHandler } @@ -17,68 +18,122 @@ func TestConsulHandlerTestSuite(t *testing.T) { suite.Run(t, new(ConsulHandlerTestSuite)) } +var consul_settings utils.Settings + +func (suite *ConsulHandlerTestSuite) registerAgents(name string) { + for i := 1234; i < 1236; i++ { + reg := &api.AgentServiceRegistration{ + ID: name + strconv.Itoa(i), + Name: name, + Port: i, + Check: &api.AgentServiceCheck{ + Interval: "10m", + Status: "passing", + HTTP: "http://localhost:5000/health", + }, + } + err := suite.client.Agent().ServiceRegister(reg) + if err != nil { + panic(err) + } + } + +} + func (suite *ConsulHandlerTestSuite) SetupTest() { var err error + consul_settings = utils.Settings{Receiver: utils.ReceiverInfo{MaxConnections: 10, StaticEndpoints: []string{}}} + suite.client, err = api.NewClient(api.DefaultConfig()) if err != nil { panic(err) } - for i:=1234;i<1236;i++ { - reg := &api.AgentServiceRegistration{ - ID: "receiver"+strconv.Itoa(i), - Name: "receiver", - Port: i, - Check: &api.AgentServiceCheck{ - Interval:"10m", - Status:"passing", - HTTP: "http://localhost:5000/health", - }, - } - err = suite.client.Agent().ServiceRegister(reg) - if err != nil { - panic(err) - } - } + + suite.registerAgents("receiver") + suite.registerAgents("broker") + } func (suite *ConsulHandlerTestSuite) TearDownTest() { suite.client.Agent().ServiceDeregister("receiver1234") suite.client.Agent().ServiceDeregister("receiver1235") + suite.client.Agent().ServiceDeregister("broker1234") + suite.client.Agent().ServiceDeregister("broker1235") } - func (suite *ConsulHandlerTestSuite) TestInitDefaultUri() { - err := suite.handler.Init(10,[]string{}) - suite.NoError(err, "empty list") + err := suite.handler.Init(consul_settings) + suite.NoError(err, "empty list") } func (suite *ConsulHandlerTestSuite) TestInitWrongUri() { - err := suite.handler.Init(10,[]string{"blabla"}) - suite.Error(err, "wrong consul uri") - suite.Nil(suite.handler.client, "client nli after error") + consul_settings.ConsulEndpoints = []string{"blabla"} + err := suite.handler.Init(consul_settings) + suite.Error(err, "wrong consul uri") + suite.Nil(suite.handler.client, "client nli after error") } func (suite *ConsulHandlerTestSuite) TestInitOkUriFirst() { - err := suite.handler.Init(10,[]string{"http://127.0.0.1:8500"}) - suite.NoError(err, "") + consul_settings.ConsulEndpoints = []string{"http://127.0.0.1:8500"} + + err := suite.handler.Init(consul_settings) + suite.NoError(err, "") } func (suite *ConsulHandlerTestSuite) TestInitOkUriNotFirst() { - err := suite.handler.Init(10,[]string{"blabla","http://127.0.0.1:8500"}) - suite.NoError(err, "") -} + consul_settings.ConsulEndpoints = []string{"blabla", "http://127.0.0.1:8500"} + err := suite.handler.Init(consul_settings) + suite.NoError(err, "") +} func (suite *ConsulHandlerTestSuite) TestGetReceivers() { - suite.handler.Init(10,[]string{}) - res,err := suite.handler.GetReceivers() - suite.NoError(err, "") - suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}",string(res),"uris") + suite.handler.Init(consul_settings) + res, err := suite.handler.GetReceivers() + suite.NoError(err, "") + suite.Equal("{\"MaxConnections\":10,\"Uris\":[\"127.0.0.1:1234\",\"127.0.0.1:1235\"]}", string(res), "uris") } func (suite *ConsulHandlerTestSuite) TestGetReceiversWhenNotConnected() { - suite.handler.Init(10,[]string{"blabla"}) - _,err := suite.handler.GetReceivers() - suite.Error(err, "") + consul_settings.ConsulEndpoints = []string{"blabla"} + suite.handler.Init(consul_settings) + _, err := suite.handler.GetReceivers() + suite.Error(err, "") +} + +func (suite *ConsulHandlerTestSuite) TestGetBrokerWhenNotConnected() { + consul_settings.ConsulEndpoints = []string{"blabla"} + suite.handler.Init(consul_settings) + _, err := suite.handler.GetBroker() + suite.Error(err, "") } + +func (suite *ConsulHandlerTestSuite) TestGetBrokerRoundRobin() { + suite.handler.Init(consul_settings) + res, err := suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1234", string(res), "uris") + + res, err = suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1235", string(res), "uris") + + res, err = suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("127.0.0.1:1234", string(res), "uris") + +} + + +func (suite *ConsulHandlerTestSuite) TestGetBrokerEmpty() { + suite.client.Agent().ServiceDeregister("broker1234") + suite.client.Agent().ServiceDeregister("broker1235") + + suite.handler.Init(consul_settings) + res, err := suite.handler.GetBroker() + suite.NoError(err, "") + suite.Equal("", string(res), "uris") +} + + diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static.go b/discovery/src/asapo_discovery/request_handler/request_handler_static.go index d52cb205d5b0c00315c871055e8faf96634e269c..1fbe2b4a3edf2a7ef907f97ebe167ce27261983b 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static.go @@ -5,15 +5,23 @@ import ( ) type StaticRequestHandler struct { - Responce - } + receiverResponce Responce + broker string +} + func (rh *StaticRequestHandler) GetReceivers() ([]byte, error) { - return utils.MapToJson(&rh) + return utils.MapToJson(&rh.receiverResponce) } -func (rh *StaticRequestHandler) Init(maxCons int,uris []string) error { - rh.MaxConnections = maxCons - rh.Uris = uris +func (rh *StaticRequestHandler) GetBroker() ([]byte, error) { + return []byte(rh.broker),nil +} + + +func (rh *StaticRequestHandler) Init(settings utils.Settings) error { + rh.receiverResponce.MaxConnections = settings.Receiver.MaxConnections + rh.receiverResponce.Uris = settings.Receiver.StaticEndpoints + rh.broker = settings.Broker.StaticEndpoint return nil } diff --git a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go index 01e769e4359ac2b05cd2b162907c6d9b2c0a7e5b..af9bf7d80efb0c2d3c23bea6c21d65d847194b14 100644 --- a/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go +++ b/discovery/src/asapo_discovery/request_handler/request_handler_static_test.go @@ -3,23 +3,35 @@ package request_handler import ( "github.com/stretchr/testify/assert" "testing" + "asapo_discovery/utils" ) var uris = []string{"ip1","ip2"} const max_conn = 1 +var static_settings utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:max_conn,StaticEndpoints:uris},Broker:utils.BrokerInfo{ + StaticEndpoint:"ip_broker"}} + + + var rh StaticRequestHandler; func TestStaticHandlerInitOK(t *testing.T) { - err := rh.Init(max_conn,uris) + err := rh.Init(static_settings) assert.Nil(t, err) } -func TestStaticHandlerGetOK(t *testing.T) { - rh.Init(max_conn,uris) +func TestStaticHandlerGetReceviersOK(t *testing.T) { + rh.Init(static_settings) res,err := rh.GetReceivers() assert.Equal(t,string(res), "{\"MaxConnections\":1,\"Uris\":[\"ip1\",\"ip2\"]}") assert.Nil(t, err) +} +func TestStaticHandlerGetBrokerOK(t *testing.T) { + rh.Init(static_settings) + res,err := rh.GetBroker() + assert.Equal(t,string(res), "ip_broker") + assert.Nil(t, err) } diff --git a/discovery/src/asapo_discovery/server/get_receivers.go b/discovery/src/asapo_discovery/server/get_receivers.go index 41e0cfa68399036a38f31c68bf490f225e9aa025..70cb2249cb4a4c0270f78ba59bb9a48c96d91c3c 100644 --- a/discovery/src/asapo_discovery/server/get_receivers.go +++ b/discovery/src/asapo_discovery/server/get_receivers.go @@ -3,11 +3,23 @@ package server import ( "net/http" "asapo_discovery/logger" + "errors" ) -func getReceivers() (answer []byte, code int) { - answer, err := requestHandler.GetReceivers() - log_str := "processing get receivers " +func getService(service string) (answer []byte, code int) { + var err error + switch service { + case "receivers": + answer, err = requestHandler.GetReceivers() + break + case "broker": + answer, err = requestHandler.GetBroker() + break + default: + err = errors.New("wrong request: "+service) + } + + log_str := "processing get "+service if err != nil { logger.Error(log_str + " - " + err.Error()) return []byte(err.Error()),http.StatusInternalServerError @@ -19,8 +31,14 @@ func getReceivers() (answer []byte, code int) { func routeGetReceivers(w http.ResponseWriter, r *http.Request) { r.Header.Set("Content-type", "application/json") - answer,code := getReceivers() + answer,code := getService("receivers") w.WriteHeader(code) w.Write(answer) } +func routeGetBroker(w http.ResponseWriter, r *http.Request) { + r.Header.Set("Content-type", "application/json") + answer,code := getService("broker") + w.WriteHeader(code) + w.Write(answer) +} diff --git a/discovery/src/asapo_discovery/server/listroutes.go b/discovery/src/asapo_discovery/server/listroutes.go index 5e87f88da6acbb38b2d4ff5b487dfe7dfc3425f9..ed068c3430ed7c61249515db7e2e05f26a785ac8 100644 --- a/discovery/src/asapo_discovery/server/listroutes.go +++ b/discovery/src/asapo_discovery/server/listroutes.go @@ -11,4 +11,11 @@ var listRoutes = utils.Routes{ "/receivers", routeGetReceivers, }, + utils.Route{ + "GetBroker", + "Get", + "/broker", + routeGetBroker, + }, + } diff --git a/discovery/src/asapo_discovery/server/get_receivers_test.go b/discovery/src/asapo_discovery/server/routes_test.go similarity index 75% rename from discovery/src/asapo_discovery/server/get_receivers_test.go rename to discovery/src/asapo_discovery/server/routes_test.go index 59c8ce226fe4be3af87f34f900db129fbce6e175..4d35c2c090bfd6c597ec105a7937e5038cff92a5 100644 --- a/discovery/src/asapo_discovery/server/get_receivers_test.go +++ b/discovery/src/asapo_discovery/server/routes_test.go @@ -30,7 +30,10 @@ type GetReceiversTestSuite struct { func (suite *GetReceiversTestSuite) SetupTest() { requestHandler = new(request_handler.StaticRequestHandler) - requestHandler.Init(10,[]string{"ip1","ip2"}) + var s utils.Settings= utils.Settings{Receiver:utils.ReceiverInfo{MaxConnections:10,StaticEndpoints:[]string{"ip1","ip2"}}, + Broker:utils.BrokerInfo{StaticEndpoint:"ip_broker"}} + + requestHandler.Init(s) logger.SetMockLog() } @@ -64,3 +67,14 @@ func (suite *GetReceiversTestSuite) TestGetReceivers() { } +func (suite *GetReceiversTestSuite) TestGetBroker() { + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing get broker"))) + + w := doRequest("/broker") + + suite.Equal(http.StatusOK, w.Code, "code ok") + suite.Equal(w.Body.String(), "ip_broker", "result") + assertExpectations(suite.T()) +} + + diff --git a/discovery/src/asapo_discovery/server/server.go b/discovery/src/asapo_discovery/server/server.go index 36979843ddf4ab258ed47f7761cc93c3e453074a..fb1bb5d5c629a18c517c3193bd56730ac18e9c8a 100644 --- a/discovery/src/asapo_discovery/server/server.go +++ b/discovery/src/asapo_discovery/server/server.go @@ -2,50 +2,20 @@ package server import ( "asapo_discovery/request_handler" - "errors" + "asapo_discovery/utils" ) var requestHandler request_handler.Agent -type serverSettings struct { - Endpoints []string - Mode string - Port int - MaxConnections int - LogLevel string -} -var settings serverSettings +var settings utils.Settings func SetHandler(rh request_handler.Agent) error { requestHandler = rh - err := requestHandler.Init(settings.MaxConnections,settings.Endpoints) + err := requestHandler.Init(settings) return err } -func (settings *serverSettings) Validate() error { - if len(settings.Endpoints) == 0 && settings.Mode != "consul"{ - return errors.New("Endpoints not set") - } - - if settings.MaxConnections == 0 { - return errors.New("Max connections not set") - } - - if settings.Port == 0 { - return errors.New("Server port not set") - } - - if settings.Mode == "" { - return errors.New("Mode not set") - } - - if settings.Mode != "static" && settings.Mode != "consul" { - return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)") - } - - return nil -} func GetHandlerMode()string { return settings.Mode diff --git a/discovery/src/asapo_discovery/server/server_nottested.go b/discovery/src/asapo_discovery/server/server_nottested.go index 9037866606296617a6e0cbd6c509944e1a3af0a3..d193d866331875e5ec8801758b3a5745d51ba69d 100644 --- a/discovery/src/asapo_discovery/server/server_nottested.go +++ b/discovery/src/asapo_discovery/server/server_nottested.go @@ -23,7 +23,5 @@ func ReadConfig(fname string) (log.Level, error) { if err := settings.Validate(); err != nil { return log.FatalLevel,err } - return log.LevelFromString(settings.LogLevel) - } diff --git a/discovery/src/asapo_discovery/server/settings_test.go b/discovery/src/asapo_discovery/server/settings_test.go index 00e12fdd7367e33a65cca193d06831c43d178147..2d85beb1da0aa200187f88616d44a15be0035506 100644 --- a/discovery/src/asapo_discovery/server/settings_test.go +++ b/discovery/src/asapo_discovery/server/settings_test.go @@ -3,15 +3,18 @@ package server import ( "github.com/stretchr/testify/assert" "testing" + "asapo_discovery/utils" ) -func fillSettings(mode string)serverSettings { - var settings serverSettings +func fillSettings(mode string) utils.Settings { + var settings utils.Settings settings.Port = 1 settings.Mode = mode - settings.MaxConnections = 10 + settings.Receiver.MaxConnections = 10 settings.LogLevel = "info" - settings.Endpoints=[]string{"ip1","ip2"} + settings.Receiver.StaticEndpoints=[]string{"ip1","ip2"} + settings.Broker.StaticEndpoint="ip_b" + settings.ConsulEndpoints=[]string{"ipc1","ipc2"} return settings } @@ -27,16 +30,25 @@ func TestSettingsWrongMode(t *testing.T) { assert.NotNil(t, err) } -func TestSettingsStaticModeNoEndpoints(t *testing.T) { +func TestSettingsStaticModeNoReceiverEndpoints(t *testing.T) { settings := fillSettings("static") - settings.Endpoints=[]string{} + settings.Receiver.StaticEndpoints=[]string{} err := settings.Validate() assert.NotNil(t, err) } +func TestSettingsStaticModeNoBrokerEndpoints(t *testing.T) { + settings := fillSettings("static") + settings.Broker.StaticEndpoint="" + err := settings.Validate() + assert.NotNil(t, err) +} + + + func TestSettingsConsulModeNoEndpoints(t *testing.T) { settings := fillSettings("consul") - settings.Endpoints=[]string{} + settings.ConsulEndpoints=[]string{} err := settings.Validate() assert.Nil(t, err) } diff --git a/discovery/src/asapo_discovery/utils/stucts.go b/discovery/src/asapo_discovery/utils/stucts.go new file mode 100644 index 0000000000000000000000000000000000000000..f9bf6c316d657841c564b184dd9ea9fcc114f505 --- /dev/null +++ b/discovery/src/asapo_discovery/utils/stucts.go @@ -0,0 +1,48 @@ +package utils + +import "errors" + +type ReceiverInfo struct { + StaticEndpoints []string + MaxConnections int +} + +type BrokerInfo struct { + StaticEndpoint string +} + + +type Settings struct { + Receiver ReceiverInfo + Broker BrokerInfo + ConsulEndpoints []string + Mode string + Port int + LogLevel string +} + +func (settings *Settings) Validate() error { + if settings.Mode != "consul"{ + if len(settings.Receiver.StaticEndpoints) == 0 || len(settings.Broker.StaticEndpoint) == 0 { + return errors.New("receiver or broker endpoints not set") + } + } + + if settings.Receiver.MaxConnections == 0 { + return errors.New("Max connections not set") + } + + if settings.Port == 0 { + return errors.New("Server port not set") + } + + if settings.Mode == "" { + return errors.New("Mode not set") + } + + if settings.Mode != "static" && settings.Mode != "consul" { + return errors.New("wrong mode: " + settings.Mode+ ", (allowed static|consul)") + } + + return nil +} diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 3b4a76e0800068a081f8ad355a7b44d0420b0db5..1d55b247bbeef7e7b6b6c2ed4fb43353bbd73459 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -85,13 +85,13 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { } producer->EnableLocalLog(true); - producer->SetLogLevel(asapo::LogLevel::Debug); + producer->SetLogLevel(asapo::LogLevel::Info); return producer; } void WaitThreadsFinished(const Args& args) { uint64_t elapsed_ms = 0; - uint64_t timeout_sec = 30; + uint64_t timeout_sec = 3000; while (true) { mutex.lock(); if (iterations_remained <= 0) { diff --git a/examples/worker/getnext_broker/CMakeLists.txt b/examples/worker/getnext_broker/CMakeLists.txt index b8be1ad68eb16bd8e6527f84f749ba781364d9b0..19796f976678b2127e7bdcbeb15b2e9c900739ec 100644 --- a/examples/worker/getnext_broker/CMakeLists.txt +++ b/examples/worker/getnext_broker/CMakeLists.txt @@ -12,10 +12,10 @@ set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY) add_dependencies(${TARGET_NAME} asapo-broker) - +prepare_asapo() configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY) -add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME} $<TARGET_PROPERTY:asapo-broker,EXENAME>") +add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}") set (dir examples/worker/${TARGET_NAME}) install(TARGETS ${TARGET_NAME} DESTINATION "${dir}") diff --git a/examples/worker/getnext_broker/check_linux.sh b/examples/worker/getnext_broker/check_linux.sh index e2e13639bd5f8b56199c909d9a583a7c91a798dc..18fe844f51ae3e6f40943fe0678f0cb0e7252d0e 100644 --- a/examples/worker/getnext_broker/check_linux.sh +++ b/examples/worker/getnext_broker/check_linux.sh @@ -7,22 +7,21 @@ set -e trap Cleanup EXIT Cleanup() { -: - kill -9 $brokerid + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} } -args=${@:1:$(($# - 1))} -broker=${@:$#} - -$broker -config settings.json & -brokerid=`echo $!` -sleep 0.3 +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd for i in `seq 1 3`; do echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name} done -$args 127.0.0.1:5005 $database_name 2 | grep "Processed 3 file(s)" +$@ 127.0.0.1:8400 $database_name 2 | grep "Processed 3 file(s)" diff --git a/examples/worker/getnext_broker/check_windows.bat b/examples/worker/getnext_broker/check_windows.bat index cdbb2597763def528899e49f23d2ad9d7a7b514d..891e876adee4f71d609df9bb626e90b339aa1d2f 100644 --- a/examples/worker/getnext_broker/check_windows.bat +++ b/examples/worker/getnext_broker/check_windows.bat @@ -1,20 +1,16 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" -::first argument path to the executable -:: second argument path to the broker +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd -set full_name="%2" -set short_name="%~nx2" - -start /B "" "%full_name%" -config settings.json - -ping 1.0.0.0 -n 1 -w 100 > nul +ping 1.0.0.0 -n 10 -w 100 > nul for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error -"%1" 127.0.0.1:5005 %database_name% 1 | findstr "Processed 3 file" || goto :error +"%1" 127.0.0.1:8400 %database_name% 1 | findstr /c:"Processed 3 file" || goto :error goto :clean :error @@ -22,5 +18,7 @@ call :clean exit /b 1 :clean -Taskkill /IM "%short_name%" /F +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index b8bcd17ed7b9f77704aab9d2fc61526d751df011..8655fdbb454a2f3ed04dbd871bbe5e3baffd5f94 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -85,6 +85,6 @@ int main(int argc, char* argv[]) { std::cout << "Processed " << nfiles << " file(s)" << std::endl; std::cout << "Elapsed : " << duration_ms << "ms" << std::endl; - std::cout << "Rate : " << 1000.0f * nfiles / duration_ms << std::endl; + std::cout << "Rate : " << 1000.0f * nfiles / (duration_ms - 10000) << std::endl; return 0; } diff --git a/examples/worker/process_folder/check_windows.bat b/examples/worker/process_folder/check_windows.bat index f03ab5c7094ed5c5d2e9e9c555ba3b77018bac54..27e5525f9b9f2dc38fd2bd566748da488c08b44c 100644 --- a/examples/worker/process_folder/check_windows.bat +++ b/examples/worker/process_folder/check_windows.bat @@ -1,6 +1,6 @@ mkdir test echo "" > test/1 -.\worker_processfolder test | findstr "Processed 1 file(s)" +.\worker_processfolder test | findstr /c:"Processed 1 file(s)" rmdir /S /Q test diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp index 5ca5f7315bacd63c8e2de397ef4eeb1e453bcc4a..ebf87761f06eaf6a75fc0e319f3305cd33826cb1 100644 --- a/producer/api/src/receiver_discovery_service.cpp +++ b/producer/api/src/receiver_discovery_service.cpp @@ -9,9 +9,11 @@ namespace asapo { +const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/discovery/receivers"; + ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()}, log__{GetDefaultProducerLogger()}, - endpoint_{std::move(endpoint) + "/receivers"}, update_frequency_ms_{update_frequency_ms} { + endpoint_{std::move(endpoint) + kServiceEndpointSuffix}, update_frequency_ms_{update_frequency_ms} { } @@ -107,4 +109,4 @@ uint64_t ReceiverDiscoveryService::UpdateFrequency() { return update_frequency_ms_; } -} \ No newline at end of file +} diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h index c9893423e2fb1d2764f1411b26a49b92ac5b787a..c1eab17ff3a8a73fdf4b231d03b2667a6af493f0 100644 --- a/producer/api/src/receiver_discovery_service.h +++ b/producer/api/src/receiver_discovery_service.h @@ -28,6 +28,7 @@ class ReceiverDiscoveryService { std::unique_ptr<HttpClient> httpclient__; AbstractLogger* log__; private: + static const std::string kServiceEndpointSuffix; void ThreadHandler(); Error UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections); Error ParseResponse(const std::string& responce, ReceiversList* list, uint64_t* max_connections); diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 3b5972d8a965ff2ac024b9632b7946cd9a6e6148..e85d51645444fa3b9e81912e554853a0d356800d 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -70,7 +70,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request) { } log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " + - connected_receiver_uri_); + connected_receiver_uri_); return nullptr; } @@ -94,7 +94,7 @@ bool RequestHandlerTcp::UpdateReceiversList() { bool RequestHandlerTcp::TimeToUpdateReceiverList() { uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - - last_receivers_uri_update_).count(); + last_receivers_uri_update_).count(); return elapsed_ms > discovery_service__->UpdateFrequency(); } diff --git a/producer/api/unittests/test_receiver_discovery_service.cpp b/producer/api/unittests/test_receiver_discovery_service.cpp index c69af5fdd533f2373d8fde6af12682c9e49caef6..c31136152534e2d352c58a46d6642b73352d7ffa 100644 --- a/producer/api/unittests/test_receiver_discovery_service.cpp +++ b/producer/api/unittests/test_receiver_discovery_service.cpp @@ -48,7 +48,7 @@ class ReceiversStatusTests : public Test { NiceMock<asapo::MockLogger> mock_logger; NiceMock<MockHttpClient>* mock_http_client; - std::string expected_endpoint{"endpoint/receivers"}; + std::string expected_endpoint{"endpoint/discovery/receivers"}; ReceiverDiscoveryService status{"endpoint", 20}; void SetUp() override { diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index b21db56d503bb1e3e3c1f2acaeee21256d88eab4..13c0004060ca8f68396d58bed7ab2003f032abc2 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -9,7 +9,7 @@ set(SOURCE_FILES src/statistics_sender_influx_db.cpp src/receiver_config.cpp src/producer_logger.cpp - src/request_handler_db_write.cpp) + src/request_handler_db_write.cpp src/statistics_sender_fluentd.cpp src/statistics_sender_fluentd.h) ################################ @@ -48,6 +48,7 @@ set(TEST_SOURCE_FILES unittests/test_request_handler_file_write.cpp unittests/test_request_handler_db_writer.cpp unittests/test_statistics_sender_influx_db.cpp + unittests/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp ) # diff --git a/receiver/src/statistics.cpp b/receiver/src/statistics.cpp index f2c431e81a4425c99e54f3d766c1d95a04db9af0..fa628baa59bba7fa7284ac7a96ff35ea5d1f4916 100644 --- a/receiver/src/statistics.cpp +++ b/receiver/src/statistics.cpp @@ -1,5 +1,7 @@ #include "statistics.h" #include "statistics_sender_influx_db.h" +#include "statistics_sender_fluentd.h" + #include <algorithm> using std::chrono::high_resolution_clock; @@ -13,7 +15,9 @@ void Statistics::SendIfNeeded() noexcept { } void Statistics::Send() noexcept { - statistics_sender__->SendStatistics(PrepareStatisticsToSend()); + for (auto& sender : statistics_sender_list__) { + sender->SendStatistics(PrepareStatisticsToSend()); + } ResetStatistics(); } @@ -23,7 +27,7 @@ StatisticsToSend Statistics::PrepareStatisticsToSend() const noexcept { stat.n_requests = nrequests_; stat.data_volume = volume_counter_; stat.elapsed_ms = std::max(uint64_t{1}, GetTotalElapsedMs()); - stat.tags = tag_; + stat.tags = tags_; for (auto i = 0; i < kNStatisticEntities; i++) { stat.entity_shares[i] = double(GetElapsedMs(StatisticEntity(i))) / stat.elapsed_ms; } @@ -56,8 +60,11 @@ void Statistics::IncreaseRequestCounter() noexcept { nrequests_++; } -Statistics::Statistics(unsigned int write_frequency) : statistics_sender__{new StatisticsSenderInfluxDb}, -write_interval_{write_frequency} { +Statistics::Statistics(unsigned int write_frequency) : + write_interval_{write_frequency} { + statistics_sender_list__.emplace_back(new StatisticsSenderInfluxDb); +// statistics_sender_list__.emplace_back(new StatisticsSenderFluentd); + ResetStatistics(); } @@ -78,10 +85,7 @@ void Statistics::StopTimer() noexcept { } void Statistics::AddTag(const std::string& name, const std::string& value) noexcept { - if (!tag_.empty()) { - tag_ += ","; - } - tag_ += name + "=" + value; + tags_.push_back(std::make_pair(name, value)); } -} \ No newline at end of file +} diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h index bbe409dff521dbbc167c5185cb222af03df64ebb..be72cbfe07b283f5eea5c03896ea2ed5682ca046 100644 --- a/receiver/src/statistics.h +++ b/receiver/src/statistics.h @@ -4,6 +4,8 @@ #include <chrono> #include <memory> #include <string> +#include <utility> +#include <vector> #include "statistics_sender.h" @@ -23,7 +25,7 @@ struct StatisticsToSend { uint64_t elapsed_ms; uint64_t data_volume; uint64_t n_requests; - std::string tags; + std::vector<std::pair<std::string, std::string>> tags; }; class Statistics { @@ -39,7 +41,7 @@ class Statistics { void SetWriteInterval(uint64_t interval_ms); - std::unique_ptr<StatisticsSender> statistics_sender__; + std::vector<std::unique_ptr<StatisticsSender>> statistics_sender_list__; private: uint64_t GetElapsedMs(StatisticEntity entity) const noexcept; void ResetStatistics() noexcept; @@ -53,8 +55,7 @@ class Statistics { std::chrono::nanoseconds time_counters_[kNStatisticEntities]; uint64_t volume_counter_; unsigned int write_interval_; - std::string tag_; - + std::vector<std::pair<std::string, std::string>> tags_; }; } diff --git a/receiver/src/statistics_sender_fluentd.cpp b/receiver/src/statistics_sender_fluentd.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4a8ed67838ac31541739630a2f52ef0776b00d81 --- /dev/null +++ b/receiver/src/statistics_sender_fluentd.cpp @@ -0,0 +1,31 @@ +#include "statistics_sender_fluentd.h" +#include "statistics.h" + +namespace asapo { + +StatisticsSenderFluentd::StatisticsSenderFluentd() : statistics_log__{asapo::CreateDefaultLoggerApi("receiver_stat", "localhost:8400/logs/")} { + statistics_log__->SetLogLevel(LogLevel::Info); +// statistics_log__->EnableLocalLog(true); +} + +void StatisticsSenderFluentd::SendStatistics(const asapo::StatisticsToSend& statistic) const noexcept { + statistics_log__->Info(StatisticsToString(statistic)); +} + +std::string StatisticsSenderFluentd::StatisticsToString(const asapo::StatisticsToSend& statistic) const noexcept { + LogMessageWithFields msg{"@target_type_key", "stat"}; + for (auto tag : statistic.tags) { + msg.Append(tag.first, tag.second); + } + + msg.Append("elapsed_ms", statistic.elapsed_ms); + msg.Append("data_volume", statistic.data_volume); + msg.Append("n_requests", statistic.n_requests); + msg.Append("db_share", statistic.entity_shares[StatisticEntity::kDatabase], 4); + msg.Append("network_share", statistic.entity_shares[StatisticEntity::kNetwork], 4); + msg.Append("disk_share", statistic.entity_shares[StatisticEntity::kDisk], 4); + + return msg.LogString(); +} + +} diff --git a/receiver/src/statistics_sender_fluentd.h b/receiver/src/statistics_sender_fluentd.h new file mode 100644 index 0000000000000000000000000000000000000000..afd315c713a9f9b7bd3cd83cbe155a9eb7760831 --- /dev/null +++ b/receiver/src/statistics_sender_fluentd.h @@ -0,0 +1,22 @@ + +#ifndef ASAPO_STATISTICS_SENDER_FLUENTD_H +#define ASAPO_STATISTICS_SENDER_FLUENTD_H + +#include "statistics_sender.h" +#include "logger/logger.h" + +namespace asapo { + +class StatisticsSenderFluentd : public StatisticsSender { + public: + StatisticsSenderFluentd(); + virtual void SendStatistics(const StatisticsToSend& statistic) const noexcept override; + Logger statistics_log__; + private: + std::string StatisticsToString(const StatisticsToSend& statistic) const noexcept; + +}; + +} + +#endif //ASAPO_STATISTICS_SENDER_FLUENTD_H diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp index 642f2aea9ae38126d85fe77b8b951330ab9346dc..ffbdfac656a4e7c98617e4711f7840434deccdca 100644 --- a/receiver/src/statistics_sender_influx_db.cpp +++ b/receiver/src/statistics_sender_influx_db.cpp @@ -40,8 +40,11 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) } std::string StatisticsSenderInfluxDb::StatisticsToString(const StatisticsToSend& statistic) const noexcept { - std::string str; - str = "statistics," + statistic.tags + " elapsed_ms=" + string_format("%ld", statistic.elapsed_ms); + std::string str, tags; + for (auto tag : statistic.tags) { + tags += "," + tag.first + "=" + tag.second; + } + str = "statistics" + tags + " elapsed_ms=" + string_format("%ld", statistic.elapsed_ms); str += ",data_volume=" + string_format("%ld", statistic.data_volume); str += ",n_requests=" + string_format("%ld", statistic.n_requests); str += ",db_share=" + string_format("%.4f", statistic.entity_shares[StatisticEntity::kDatabase]); diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp index 59cace0fe901f15abaf4a45681614aa2e6883ca4..1d910ace5c71a7beabeb47154611dd57ddd45492 100644 --- a/receiver/unittests/test_statistics.cpp +++ b/receiver/unittests/test_statistics.cpp @@ -5,6 +5,7 @@ #include "../src/statistics.h" #include "../src/statistics_sender.h" #include "../src/statistics_sender_influx_db.h" +#include "../src/statistics_sender_fluentd.h" using ::testing::Test; using ::testing::Gt; @@ -19,8 +20,9 @@ using asapo::Statistics; using asapo::StatisticEntity; using asapo::StatisticsSender; using asapo::StatisticsSenderInfluxDb; -using asapo::StatisticsToSend; +using asapo::StatisticsSenderFluentd; +using asapo::StatisticsToSend; namespace { @@ -28,7 +30,8 @@ namespace { TEST(StatisticTestsConstructor, Constructor) { Statistics statistics; - ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderInfluxDb*>(statistics.statistics_sender__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderInfluxDb*>(statistics.statistics_sender_list__[0].get()), Ne(nullptr)); +// ASSERT_THAT(dynamic_cast<asapo::StatisticsSenderFluentd*>(statistics.statistics_sender_list__[1].get()), Ne(nullptr)); } @@ -46,10 +49,11 @@ class StatisticTests : public Test { void TestTimer(const StatisticEntity& entity); MockStatisticsSender mock_statistics_sender; void SetUp() override { - statistics.statistics_sender__.reset(&mock_statistics_sender); + statistics.statistics_sender_list__.clear(); + statistics.statistics_sender_list__.emplace_back(&mock_statistics_sender); } void TearDown() override { - statistics.statistics_sender__.release(); + statistics.statistics_sender_list__[0].release(); } StatisticsToSend ExtractStat(); }; @@ -98,7 +102,9 @@ TEST_F(StatisticTests, AddTag) { auto stat = ExtractStat(); - ASSERT_THAT(stat.tags, Eq("name=value")); + ASSERT_THAT(stat.tags[0].first, Eq("name")); + ASSERT_THAT(stat.tags[0].second, Eq("value")); + } TEST_F(StatisticTests, AddTagTwice) { @@ -107,7 +113,11 @@ TEST_F(StatisticTests, AddTagTwice) { auto stat = ExtractStat(); - ASSERT_THAT(stat.tags, Eq("name1=value1,name2=value2")); + ASSERT_THAT(stat.tags[0].first, Eq("name1")); + ASSERT_THAT(stat.tags[0].second, Eq("value1")); + ASSERT_THAT(stat.tags[1].first, Eq("name2")); + ASSERT_THAT(stat.tags[1].second, Eq("value2")); + } diff --git a/receiver/unittests/test_statistics_sender_fluentd.cpp b/receiver/unittests/test_statistics_sender_fluentd.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dc6db3cca3e805a195cba07209af84bafea1a056 --- /dev/null +++ b/receiver/unittests/test_statistics_sender_fluentd.cpp @@ -0,0 +1,90 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockLogger.h" + +#include "../src/statistics_sender_influx_db.h" +#include "../src/statistics_sender.h" +#include "../../common/cpp/src/http_client/curl_http_client.h" +#include "unittests/MockHttpClient.h" +#include "../src/statistics.h" + +#include "../src/receiver_config.h" +#include "mock_receiver_config.h" +#include "../src/statistics_sender_fluentd.h" + +using ::testing::Test; +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::SaveArg; +using ::testing::HasSubstr; +using ::testing::AllOf; +using ::testing::SaveArgPointee; +using ::testing::InSequence; +using ::testing::SetArgPointee; + +using asapo::StatisticsSenderFluentd; +using asapo::MockHttpClient; +using asapo::StatisticsToSend; +using asapo::ReceiverConfig; +using asapo::SetReceiverConfig; + +namespace { + +TEST(SenderFluentd, Constructor) { + StatisticsSenderFluentd sender; + ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(sender.statistics_log__.get()), Ne(nullptr)); +} + + +class SenderFluentdTests : public Test { + public: + StatisticsSenderFluentd sender; + NiceMock<asapo::MockLogger> mock_logger; + StatisticsToSend statistics; + ReceiverConfig config; + + void SetUp() override { + statistics.n_requests = 4; + statistics.entity_shares[asapo::StatisticEntity::kDisk] = 0.6; + statistics.entity_shares[asapo::StatisticEntity::kNetwork] = 0.3; + statistics.entity_shares[asapo::StatisticEntity::kDatabase] = 0.1; + statistics.elapsed_ms = 100; + statistics.data_volume = 1000; + statistics.tags.push_back(std::make_pair("name1", "value1")); + statistics.tags.push_back(std::make_pair("name2", "value2")); + + config.monitor_db_uri = "test_uri"; + config.monitor_db_name = "test_name"; + SetReceiverConfig(config); + + sender.statistics_log__.reset(&mock_logger); + + } + void TearDown() override { + sender.statistics_log__.release(); + } +}; + + +TEST_F(SenderFluentdTests, SendStatisticsCallsPost) { + std::string expect_string = + R"("@target_type_key":"stat","name1":"value1","name2":"value2","elapsed_ms":100,"data_volume":1000)" + R"(,"n_requests":4,"db_share":0.1000,"network_share":0.3000,"disk_share":0.6000)"; + EXPECT_CALL(mock_logger, Info(HasSubstr(expect_string))); + + + sender.SendStatistics(statistics); +} + + + +} diff --git a/receiver/unittests/test_statistics_sender_influx_db.cpp b/receiver/unittests/test_statistics_sender_influx_db.cpp index 48f7f704c54916a69230e4b82dcc1950791b6b8f..ae60de76acb61862b8eb9bba18c7f76aa33a1e7c 100644 --- a/receiver/unittests/test_statistics_sender_influx_db.cpp +++ b/receiver/unittests/test_statistics_sender_influx_db.cpp @@ -61,7 +61,8 @@ class SenderInfluxDbTests : public Test { statistics.entity_shares[asapo::StatisticEntity::kDatabase] = 0.1; statistics.elapsed_ms = 100; statistics.data_volume = 1000; - statistics.tags = "name1=value1,name2=value2"; + statistics.tags.push_back(std::make_pair("name1", "value1")); + statistics.tags.push_back(std::make_pair("name2", "value2")); config.monitor_db_uri = "test_uri"; config.monitor_db_name = "test_name"; @@ -106,7 +107,7 @@ TEST_F(SenderInfluxDbTests, LogErrorWithWrongResponceSendStatistics) { TEST_F(SenderInfluxDbTests, LogDebugSendStatistics) { EXPECT_CALL(mock_http_client, Post_t(_, _, _, _)). WillOnce( - DoAll(SetArgPointee<3>(nullptr), SetArgPointee<2>(asapo::HttpCode::OK), Return("error response") + DoAll(SetArgPointee<3>(nullptr), SetArgPointee<2>(asapo::HttpCode::OK), Return("ok response") )); EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("sending statistics"), diff --git a/tests/automatic/broker/check_monitoring/check_linux.sh b/tests/automatic/broker/check_monitoring/check_linux.sh index 254e8ea577510249c4f945c34e93cacda55127a4..b0a2de9abbec2284cd1e3ff409ba39cc74aa3049 100644 --- a/tests/automatic/broker/check_monitoring/check_linux.sh +++ b/tests/automatic/broker/check_monitoring/check_linux.sh @@ -7,6 +7,7 @@ set -e trap Cleanup EXIT Cleanup() { + set +e echo cleanup influx -execute "drop database ${database_name}" kill -9 $brokerid diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index 8c2da0557ee06fc479e37bb9d5c4498a5212640a..3a3119b1a0df98ed9cbb29a7f6859819e27531ae 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -23,4 +23,4 @@ brokerid=`echo $!` curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1' curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2' -curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found" +curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "not found" diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat index 026563bea3cc4fea8d233d23ca70cf80c7280aca..443c05422d74a9c346e72ca65cfd5c5535a1f964 100644 --- a/tests/automatic/broker/get_next/check_windows.bat +++ b/tests/automatic/broker/get_next/check_windows.bat @@ -11,9 +11,9 @@ start /B "" "%full_name%" -config settings.json ping 1.0.0.0 -n 1 -w 100 > nul -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":1 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":2 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr "Not Found" || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":1 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:\"_id\":2 || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr /c:"not found" || goto :error goto :clean diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh index f384a4b70254a617746823ea3c5f84e69ed619de..11c022e52ccfcf5a5eb5d996b34a1ebdb6363e07 100644 --- a/tests/automatic/full_chain/simple_chain/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -6,13 +6,14 @@ trap Cleanup EXIT broker_database_name=test_run monitor_database_name=db_test -broker_address=127.0.0.1:5005 +proxy_address=127.0.0.1:8400 receiver_folder=/tmp/asapo/receiver/files Cleanup() { echo cleanup rm -rf ${receiver_folder} + nomad stop nginx nomad stop receiver nomad stop discovery nomad stop broker @@ -24,6 +25,7 @@ Cleanup() { influx -execute "create database ${monitor_database_name}" echo "db.${broker_database_name}.insert({dummy:1})" | mongo ${broker_database_name} +nomad run nginx.nmd nomad run receiver.nmd nomad run discovery.nmd nomad run broker.nmd @@ -32,8 +34,8 @@ sleep 1 #producer mkdir -p ${receiver_folder} -$1 localhost:5006 100 1000 4 0 & +$1 localhost:8400 100 1000 4 0 & #producerid=`echo $!` -$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)" +$2 ${proxy_address} ${broker_database_name} 2 | grep "Processed 1000 file(s)" diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat index bd7115b3858e0283c548d108f8143d80e17f7c2c..2c6f5291593fc91718a68b81a4415f191bdad4ea 100644 --- a/tests/automatic/full_chain/simple_chain/check_windows.bat +++ b/tests/automatic/full_chain/simple_chain/check_windows.bat @@ -1,23 +1,24 @@ SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" set broker_database_name=test_run SET receiver_folder="c:\tmp\asapo\receiver\files" +set proxy_address="127.0.0.1:8400" echo db.%broker_database_name%.insert({dummy:1}) | %mongo_exe% %broker_database_name% c:\opt\consul\nomad run receiver.nmd c:\opt\consul\nomad run discovery.nmd c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 10 -w 100 > nul REM producer mkdir %receiver_folder% -start /B "" "%1" localhost:5006 100 1000 4 0 +start /B "" "%1" %proxy_address% 100 1000 4 0 ping 1.0.0.0 -n 1 -w 100 > nul REM worker -set broker_address="127.0.0.1:5005" -"%2" %broker_address% %broker_database_name% 2 | findstr "Processed 1000 file(s)" || goto :error +"%2" %proxy_address% %broker_database_name% 2 | findstr /c:"Processed 1000 file(s)" || goto :error goto :clean @@ -30,6 +31,7 @@ exit /b 1 c:\opt\consul\nomad stop receiver c:\opt\consul\nomad stop discovery c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx rmdir /S /Q %receiver_folder% echo db.dropDatabase() | %mongo_exe% %broker_database_name% diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh index 817e76757865684c21985e90c262eb2a056a2b33..936ca16de475b8e2a744afa28eb84507862ac532 100644 --- a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh +++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh @@ -12,6 +12,7 @@ Cleanup() { influx -execute "drop database ${database_name}" nomad stop receiver nomad stop discovery + nomad stop nginx echo "db.dropDatabase()" | mongo ${mongo_database_name} rm -rf ${receiver_folder} } @@ -22,10 +23,11 @@ influx -execute "create database ${database_name}" nomad run receiver.nmd nomad run discovery.nmd +nomad run nginx.nmd sleep 1 -$1 localhost:5006 100 112 4 0 +$1 localhost:8400 100 112 4 0 sleep 1 diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index d904c84f601fb0dee70caedc51fe94f5b0ae091f..d2b4f5f5ae318c4517fa0d5db643dfaf9d9085d9 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -13,6 +13,7 @@ Cleanup() { rm -rf ${receiver_folder} nomad stop receiver nomad stop discovery + nomad stop nginx echo "db.dropDatabase()" | mongo ${mongo_database_name} influx -execute "drop database ${database_name}" } @@ -20,12 +21,13 @@ Cleanup() { influx -execute "create database ${database_name}" echo "db.${mongo_database_name}.insert({dummy:1})" | mongo ${mongo_database_name} +nomad run nginx.nmd nomad run receiver.nmd nomad run discovery.nmd mkdir -p ${receiver_folder} -$1 localhost:5006 100 1 1 0 +$1 localhost:8400 100 1 1 0 ls -ln ${receiver_folder}/1.bin | awk '{ print $5 }'| grep 102400 diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat index af71a9d81b50613e19968b03b0f19aff90adc46c..cb26780f3fa028d4de4f62daf5f57750aca7571d 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat +++ b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat @@ -7,12 +7,13 @@ echo db.%database_name%.insert({dummy:1})" | %mongo_exe% %database_name% c:\opt\consul\nomad run receiver.nmd c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 1 -w 100 > nul mkdir %receiver_folder% -%1 localhost:5006 100 1 1 0 +%1 localhost:8400 100 1 1 0 ping 1.0.0.0 -n 1 -w 100 > nul @@ -28,6 +29,7 @@ exit /b 1 :clean c:\opt\consul\nomad stop receiver c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop nginx rmdir /S /Q %receiver_folder% echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/automatic/settings/discovery_settings.json.tpl b/tests/automatic/settings/discovery_settings.json.tpl index 62cb9864b6b7cf0c4a738ca8f1a2b254ff4400fc..25cba67824339d7c81a9d39b4a89d8356d4ea9bf 100644 --- a/tests/automatic/settings/discovery_settings.json.tpl +++ b/tests/automatic/settings/discovery_settings.json.tpl @@ -1,6 +1,8 @@ { - "MaxConnections": 32, "Mode": "consul", + "Receiver": { + "MaxConnections": 32 + }, "Port": {{ env "NOMAD_PORT_discovery" }}, "LogLevel":"debug" } diff --git a/tests/automatic/settings/nginx.conf.tpl b/tests/automatic/settings/nginx.conf.tpl new file mode 100644 index 0000000000000000000000000000000000000000..88b81082f43ab60f599b7483a23ae5deb70aa16c --- /dev/null +++ b/tests/automatic/settings/nginx.conf.tpl @@ -0,0 +1,36 @@ +worker_processes 1; +daemon off; + +events { + worker_connections 1024; +} + +http { +# include mime.types; +# default_type application/octet-stream; + +# sendfile on; +# tcp_nopush on; + +# keepalive_timeout 0; +# keepalive_timeout 65; + + resolver 127.0.0.1:8600 valid=1s; + server { + listen {{ env "NOMAD_PORT_nginx" }}; + set $discovery_endpoint discovery.service.asapo; + # set $fluentd_endpoint localhost; + location /discovery/ { + rewrite ^/discovery(/.*) $1 break; + proxy_pass http://$discovery_endpoint:5006$uri; + } + location /logs/ { + # rewrite ^/logs(/.*) $1 break; + proxy_pass http://localhost:9880/asapo; + } + + location /nginx-health { + return 200 "healthy\n"; + } + } +} diff --git a/tests/automatic/spd_logger/console/check_linux.sh b/tests/automatic/spd_logger/console/check_linux.sh index 5c055c579efa32b194027dc05b4fedc77aa83579..bc934a732cde989660de74baa5737b84e07ed5a9 100644 --- a/tests/automatic/spd_logger/console/check_linux.sh +++ b/tests/automatic/spd_logger/console/check_linux.sh @@ -4,10 +4,11 @@ set -e res=`$@` -echo $res | grep "\[info\] test info" -echo $res | grep "\[error\] test error" -echo $res | grep "\[debug\] test debug" -echo $res | grep "\[warning\] test warning" +echo $res | grep '"level":"info","message":"test info"' +echo $res | grep '"test_int":2,"test_double":1.0}' +echo $res | grep '"level":"error","message":"test error"' +echo $res | grep '"level":"debug","message":"test debug"' +echo $res | grep '"level":"warning","message":"test warning"' echo $res | grep "test info_mt_0" echo $res | grep "test info_mt_1" echo $res | grep "test info_mt_2" diff --git a/tests/automatic/spd_logger/console/check_windows.bat b/tests/automatic/spd_logger/console/check_windows.bat index 1bbc2b0bafd05aeb27d1b4eebe4aebfca7776853..78ecaf0a0ffb0006370b2c51093a62108cf727c6 100644 --- a/tests/automatic/spd_logger/console/check_windows.bat +++ b/tests/automatic/spd_logger/console/check_windows.bat @@ -1,9 +1,11 @@ "%1" > output -findstr /I /L /C:"[info] test info" output || goto :error -findstr /I /L /C:"[error] test error" output || goto :error -findstr /I /L /C:"[debug] test debug" output|| goto :error -findstr /I /L /C:"[warning] test warning" output || goto :error +findstr /I /L /C:"\"level\":\"info\",\"message\":\"test info\"" output || goto :error +findstr /I /L /C:"\"level\":\"error\",\"message\":\"test error\"" output || goto :error +findstr /I /L /C:"\"level\":\"debug\",\"message\":\"test debug\"" output || goto :error +findstr /I /L /C:"\"level\":\"warning\",\"message\":\"test warning\"" output || goto :error +findstr /I /L /C:"\"test_int\":2,\"test_double\":1.0" output || goto :error + findstr /I /L /C:"test info_mt_0" output || goto :error findstr /I /L /C:"test info_mt_1" output || goto :error findstr /I /L /C:"test info_mt_2" output || goto :error diff --git a/tests/automatic/spd_logger/console/spd_logger_console.cpp b/tests/automatic/spd_logger/console/spd_logger_console.cpp index 6b557098810ba29dd1945623b212299582da4f67..130e163f82edd8442cc03923436367fa37718e2e 100644 --- a/tests/automatic/spd_logger/console/spd_logger_console.cpp +++ b/tests/automatic/spd_logger/console/spd_logger_console.cpp @@ -24,6 +24,9 @@ int main(int argc, char* argv[]) { thread.join(); } + + logger->Info(LogMessageWithFields{"test_int", 2} .Append("test_double", 1, 1)); + logger->Info("test info"); logger->Error("test error"); logger->Warning("test warning"); diff --git a/tests/automatic/spd_logger/fluentd/check_linux.sh b/tests/automatic/spd_logger/fluentd/check_linux.sh index 2a9b53535bf91c0a5fec4003d15b3abb3a50207c..819c8a3b950ae9171cffa3b4258aa96ead53d8d5 100644 --- a/tests/automatic/spd_logger/fluentd/check_linux.sh +++ b/tests/automatic/spd_logger/fluentd/check_linux.sh @@ -12,7 +12,7 @@ cat /tmp/fluentd/asapo.*.log res=`cat /tmp/fluentd/asapo.*.log` -echo $res | grep "test info" +echo $res | grep '"json_test":"info"' echo $res | grep "test error" echo $res | grep "test debug" echo $res | grep "test warning" diff --git a/tests/automatic/spd_logger/fluentd/check_windows.bat b/tests/automatic/spd_logger/fluentd/check_windows.bat index ce8be6302dab8e139d0545e16f19f0f3dc26e469..3053cca71eced24528919e7e0f76e5a23f371b35 100644 --- a/tests/automatic/spd_logger/fluentd/check_windows.bat +++ b/tests/automatic/spd_logger/fluentd/check_windows.bat @@ -5,7 +5,8 @@ del %folder%\asapo.*.log ping 1.0.0.0 -n 5 > nul -findstr /I /L /C:"test info" %folder%\asapo.*.log || goto :error + +findstr /I /L /C:"\"json_test\":\"info\"" %folder%\asapo.*.log || goto :error findstr /I /L /C:"test error" %folder%\asapo.*.log || goto :error findstr /I /L /C:"test debug" %folder%\asapo.*.log || goto :error findstr /I /L /C:"test warning" %folder%\asapo.*.log || goto :error diff --git a/tests/automatic/spd_logger/fluentd/spd_logger_fluentd.cpp b/tests/automatic/spd_logger/fluentd/spd_logger_fluentd.cpp index 580188bbd4bba3745006dd09a8795b7936eff1bd..30e7f71f4364f090ba7b93bcb47af642e0982fa3 100644 --- a/tests/automatic/spd_logger/fluentd/spd_logger_fluentd.cpp +++ b/tests/automatic/spd_logger/fluentd/spd_logger_fluentd.cpp @@ -11,7 +11,7 @@ int main(int argc, char* argv[]) { logger->SetLogLevel(LogLevel::Debug); - logger->Info("test info"); + logger->Info(LogMessageWithFields{"json_test", "info"}); logger->Error("test error"); logger->Warning("test warning"); logger->Debug("test debug"); diff --git a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt index 28525645e8c1e08f24251df35bbff7c2562b9f91..69750c8f47ce37a2e8e7b4a5adccd022a6373328 100644 --- a/tests/automatic/worker/next_multithread_broker/CMakeLists.txt +++ b/tests/automatic/worker/next_multithread_broker/CMakeLists.txt @@ -11,7 +11,7 @@ target_link_libraries(${TARGET_NAME} test_common asapo-worker) ################################ # Testing ################################ -configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json settings.json COPYONLY) -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}> $<TARGET_PROPERTY:asapo-broker,EXENAME>" +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" ) diff --git a/tests/automatic/worker/next_multithread_broker/check_linux.sh b/tests/automatic/worker/next_multithread_broker/check_linux.sh index 086f98a26573c2815fc36b227b876ac9be313add..c5a52cbb37b40d257309f329049f3b3da5b72470 100644 --- a/tests/automatic/worker/next_multithread_broker/check_linux.sh +++ b/tests/automatic/worker/next_multithread_broker/check_linux.sh @@ -7,23 +7,25 @@ set -e trap Cleanup EXIT Cleanup() { -: - kill -9 $brokerid + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} } -args=${@:1:$(($# - 1))} -broker=${@:$#} -$broker -config settings.json & -brokerid=`echo $!` -sleep 0.3 +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 for i in `seq 1 10`; do echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name} done -$args 127.0.0.1:5005 $database_name 4 10 +$@ 127.0.0.1:8400 $database_name 4 10 diff --git a/tests/automatic/worker/next_multithread_broker/check_windows.bat b/tests/automatic/worker/next_multithread_broker/check_windows.bat index 321f9c5c947274cfffa449f5ada1b8412e17a100..b3762c8a856d534d4f92590f68b5820e09265ab1 100644 --- a/tests/automatic/worker/next_multithread_broker/check_windows.bat +++ b/tests/automatic/worker/next_multithread_broker/check_windows.bat @@ -2,19 +2,17 @@ SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" ::first argument path to the executable -:: second argument path to the broker -set full_name="%2" -set short_name="%~nx2" +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd -start /B "" "%full_name%" -config settings.json - -ping 1.0.0.0 -n 1 -w 100 > nul +ping 1.0.0.0 -n 10 -w 100 > nul for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error -%1 127.0.0.1:5005 %database_name% 4 10 || goto :error +%1 127.0.0.1:8400 %database_name% 4 10 || goto :error goto :clean @@ -23,5 +21,7 @@ call :clean exit /b 1 :clean -Taskkill /IM "%short_name%" /F +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/tests/manual/performance_broker/discovery.json b/tests/manual/performance_broker/discovery.json new file mode 100644 index 0000000000000000000000000000000000000000..a1438446b22a2d31e815c7f93185c44a5c14f066 --- /dev/null +++ b/tests/manual/performance_broker/discovery.json @@ -0,0 +1,14 @@ +{ + "Mode": "static", + "Broker": { + "StaticEndpoint": "localhost:5005" + }, + "Receiver": { + "MaxConnections":1, + "StaticEndpoints": [ + "test" + ] + }, + "Port": 5006, + "LogLevel": "info" +} \ No newline at end of file diff --git a/tests/manual/performance_broker/settings.json b/tests/manual/performance_broker/settings.json index c45d16f2f7b59b7966ad9d2d406ef530da720a2b..a2c1a4a5ab7238e14c26667e5bfc7335e935d96d 100644 --- a/tests/manual/performance_broker/settings.json +++ b/tests/manual/performance_broker/settings.json @@ -2,5 +2,6 @@ "BrokerDbAddress":"localhost:27017", "MonitorDbAddress": "localhost:8086", "MonitorDbName": "db_test", - "port":5005 + "port":5005, + "LogLevel":"info" } \ No newline at end of file diff --git a/tests/manual/performance_broker/test.sh b/tests/manual/performance_broker/test.sh index e0b083c3d7e973cd6b1782ecd107b20eea0abb4e..9c36fce459ccb7f53f587fdaed07cbe13fa36bb1 100755 --- a/tests/manual/performance_broker/test.sh +++ b/tests/manual/performance_broker/test.sh @@ -20,22 +20,25 @@ worker_dir=~/broker_test service_dir=~/broker_test -cat settings.json | - jq "to_entries | - map(if .key == \"MonitorDbAddress\" - then . + {value:\"${monitor_node}:${monitor_port}\"} - else . - end - ) | - from_entries" > settings_tmp.json +cat settings.json | jq ".MonitorDbAddress = \"${monitor_node}:${monitor_port}\"" > settings_tmp.json + +cat discovery.json | jq ".Broker.StaticEndpoint = \"${service_node}:5005\"" > discovery_tmp.json + ssh ${monitor_node} influx -execute \"create database db_test\" ssh ${service_node} docker run -d -p 27017:27017 --name mongo mongo #ssh ${service_node} docker run -d -p 8086 -p 8086 --name influxdb influxdb -ssh ${service_node} mkdir ${service_dir} -ssh ${worker_node} mkdir ${worker_dir} +ssh ${service_node} mkdir -p ${service_dir} +ssh ${worker_node} mkdir -p ${worker_dir} + + +scp ../../../cmake-build-release/discovery/asapo-discovery ${service_node}:${service_dir} +scp discovery_tmp.json ${service_node}:${service_dir}/discovery.json +rm discovery_tmp.json + +ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-discovery -config discovery.json &> ${service_dir}/discovery.log &'" scp settings_tmp.json ${service_node}:${service_dir}/settings.json @@ -49,10 +52,12 @@ ssh ${worker_node} ${worker_dir}/folder2db -n ${nthreads} ${dir} ${run_name} ${s sleep 3 scp ../../../cmake-build-release/examples/worker/getnext_broker/getnext_broker ${worker_node}:${worker_dir} -ssh ${worker_node} ${worker_dir}/getnext_broker ${service_node}:5005 ${run_name} ${nthreads} +ssh ${worker_node} ${worker_dir}/getnext_broker ${service_node}:8400 ${run_name} ${nthreads} ssh ${service_node} killall asapo-broker +ssh ${service_node} killall asapo-discovery + ssh ${service_node} docker rm -f mongo #ssh ${service_node} docker rm -f influxdb diff --git a/tests/manual/performance_full_chain_simple/discovery.json b/tests/manual/performance_full_chain_simple/discovery.json index 476f732bbedad31adc0e4ce4fbbee1ca081cc025..8cdd433a71f05e3d7408b696e0c1e220873b7de4 100644 --- a/tests/manual/performance_full_chain_simple/discovery.json +++ b/tests/manual/performance_full_chain_simple/discovery.json @@ -1,7 +1,15 @@ { - "MaxConnections": 32, "Mode": "static", - "Endpoints":["localhost:4200"], - "Port":5006, - "LogLevel":"info" -} \ No newline at end of file + "Receiver": { + "StaticEndpoints": [ + "localhost:4200" + ], + "MaxConnections": 32 + }, + "Broker": { + "StaticEndpoint": "localhost:5005" + }, + "Port": 5006, + "LogLevel": "info" +} + diff --git a/tests/manual/performance_full_chain_simple/fluentd.conf b/tests/manual/performance_full_chain_simple/fluentd.conf index e24b4f127709f9d137103badcfb7117792c9e03d..145dbce4bd148936c75da0c94b13786359908a25 100644 --- a/tests/manual/performance_full_chain_simple/fluentd.conf +++ b/tests/manual/performance_full_chain_simple/fluentd.conf @@ -6,18 +6,6 @@ add_remote_addr true </source> -<filter asapo> -@type parser -key_name message -reserve_data true -<parse> - @type regexp - expression /\[(?<time>[^\]]*)\] \[(?<source>[^ ]*)\] \[(?<level>[^ ]*)\] (?<message>[^\n]*)/ - time_key time - time_format %Y-%m-%d %H:%M:%S.%N - </parse> -</filter> - <source> @type tail path /logs/*.broker @@ -32,7 +20,8 @@ reserve_data true path /logs/*.receiver pos_file /tmp/asapo.logrus.log.pos2 tag asapo - format none + format json + time_format %Y-%m-%d %H:%M:%S.%N </source> <match asapo.**> diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index 7cf0d85c122d91a081050a4c7fe648618e84f841..adf89f8ee2e32fef2d2ea21ca5e3d271a6071d65 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -7,5 +7,6 @@ "WriteToDisk":true, "WriteToDb":true, "LogLevel":"info", - "Tag": "test_receiver" + "Tag": "test_receiver", + "RootFolder" : "/gpfs/petra3/scratch/yakubov/receiver_tests/files" } diff --git a/tests/manual/performance_full_chain_simple/test.sh b/tests/manual/performance_full_chain_simple/test.sh index 880e038853ebcc12ff5604d03743a150bac3a707..3ddd186fb746e14c1b473bba8973e0127721ac3a 100755 --- a/tests/manual/performance_full_chain_simple/test.sh +++ b/tests/manual/performance_full_chain_simple/test.sh @@ -26,7 +26,7 @@ log_dir=~/fullchain_tests/logs # runs producer with various file sizes from $producer_node and measures performance file_size=10000 -file_num=$((100000000 / $file_size)) +file_num=$((10000000 / $file_size)) echo filesize: ${file_size}K, filenum: $file_num # receiver_setup @@ -62,10 +62,15 @@ cat discovery.json | end ) | from_entries" > discovery_tmp.json + +cat discovery_tmp.json | jq ".Broker.StaticEndpoint = \"${receiver_node}:5005\"" > discovery_tmp1.json + +cat discovery_tmp1.json | jq ".Receiver.StaticEndpoints = [\"${receiver_node}:${receiver_port}\"]" > discovery_tmp2.json + scp ../../../cmake-build-release/discovery/asapo-discovery ${receiver_node}:${receiver_dir} -scp discovery_tmp.json ${receiver_node}:${receiver_dir}/discovery.json +scp discovery_tmp2.json ${receiver_node}:${receiver_dir}/discovery.json discovery_ip=`resolveip -s ${receiver_node}` -rm discovery_tmp.json +rm discovery_tmp*.json #producer_setup producer_node=max-display001 @@ -117,11 +122,13 @@ sleep 0.3 ssh ${broker_node} "bash -c 'cd ${broker_dir}; nohup ./asapo-broker -config broker.json &> ${log_dir}/log.broker &'" sleep 0.3 +sleep 5 + #producer_start -ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${discovery_ip}:${discovery_port} ${file_size} ${file_num} ${producer_nthreads} 0 &> ${log_dir}/producer.log &'" +ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${receiver_node}:8400 ${file_size} ${file_num} ${producer_nthreads} 0 &> ${log_dir}/producer.log &'" sleep 1 #worker_start -ssh ${worker_node} ${worker_dir}/getnext_broker ${broker_node}:5005 test_run ${nthreads} +ssh ${worker_node} ${worker_dir}/getnext_broker ${receiver_node}:8400 test_run ${nthreads} diff --git a/tests/manual/performance_producer_receiver/discovery.json b/tests/manual/performance_producer_receiver/discovery.json index 476f732bbedad31adc0e4ce4fbbee1ca081cc025..e741be430a6c727cd53b1f35753a2c3010f83deb 100644 --- a/tests/manual/performance_producer_receiver/discovery.json +++ b/tests/manual/performance_producer_receiver/discovery.json @@ -1,7 +1,14 @@ { - "MaxConnections": 32, "Mode": "static", - "Endpoints":["localhost:4200"], + "Receiver": { + "StaticEndpoints": [ + "localhost:4200" + ], + "MaxConnections": 32 + }, + "Broker": { + "StaticEndpoint": "localhost:5005" + }, "Port":5006, "LogLevel":"info" } \ No newline at end of file diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index 7cf0d85c122d91a081050a4c7fe648618e84f841..adf89f8ee2e32fef2d2ea21ca5e3d271a6071d65 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -7,5 +7,6 @@ "WriteToDisk":true, "WriteToDb":true, "LogLevel":"info", - "Tag": "test_receiver" + "Tag": "test_receiver", + "RootFolder" : "/gpfs/petra3/scratch/yakubov/receiver_tests/files" } diff --git a/tests/manual/performance_producer_receiver/test.sh b/tests/manual/performance_producer_receiver/test.sh index 83a52730d310e50cd533cb3fe85475cef4a8a391..53f9dfe6f417defc8eab078a206221cea07f4d52 100755 --- a/tests/manual/performance_producer_receiver/test.sh +++ b/tests/manual/performance_producer_receiver/test.sh @@ -4,6 +4,14 @@ set -e trap Cleanup EXIT +Cleanup() { +set +e +ssh ${service_node} rm -f ${service_dir}/files/* +ssh ${service_node} killall receiver +ssh ${service_node} killall asapo-discovery +ssh ${service_node} docker rm -f -v mongo +} + # starts receiver on $service_node # runs producer with various file sizes from $worker_node and measures performance @@ -48,20 +56,24 @@ cat receiver.json | end ) | from_entries" > receiver_tmp.json + cat discovery.json | jq "to_entries | map(if .key == \"Port\" then . + {value:${discovery_port}} - elif .key == \"Endpoints\" - then . + {value:[\"${service_node}:${receiver_port}\"]} else . end ) | from_entries" > discovery_tmp.json -scp discovery_tmp.json ${service_node}:${service_dir}/discovery.json +cat discovery.json | jq ".Port = ${discovery_port}" > discovery_tmp.json + +cat discovery_tmp.json | jq ".Receiver.StaticEndpoints = [\"${service_node}:${receiver_port}\"]" > discovery_tmp1.json + + +scp discovery_tmp1.json ${service_node}:${service_dir}/discovery.json scp receiver_tmp.json ${service_node}:${service_dir}/receiver.json -rm discovery_tmp.json receiver_tmp.json +rm discovery_tmp*.json receiver_tmp.json ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./receiver receiver.json &> ${service_dir}/receiver.log &'" ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-discovery -config discovery.json &> ${service_dir}/discovery.log &'" @@ -70,8 +82,11 @@ for size in 100 1000 10000 do ssh ${service_node} docker run -d -p 27017:27017 --name mongo mongo echo =================================================================== -ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:${discovery_port} ${size} 1000 8 0 -ssh ${service_node} rm -f ${service_dir}/files/* +ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:8400 ${size} 1000 8 0 +if [ "$1" == "true" ] +then + ssh ${service_node} rm -f ${service_dir}/files/* +fi ssh ${service_node} docker rm -f -v mongo done ssh ${service_node} killall receiver diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index ed9e356cad549ed78c8df753d404e91c2a3c5aa7..d916e629dd16daa6056f45763abd7f76cfc8623a 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -17,6 +17,7 @@ auto const kSourceNotFound = "Source Not Found"; auto const kSourceNotConnected = "Source Not Connacted"; auto const kSourceAlreadyConnected = "Source Already Connected"; auto const kErrorReadingSource = "Error Reading Source"; +auto const kNotFound = "Uri not found"; auto const kPermissionDenied = "Permissionn Denied"; auto const kNoData = "No Data"; auto const kWrongInput = "Wrong Input"; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 887c9bfc77d97156c77cdbf1bb07a629eff37233..2859b63e297cce2fefe8704121dabc99b01d631f 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -24,6 +24,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) { message = WorkerErrorMessage::kErrorReadingSource; break; case HttpCode::NotFound: + message = WorkerErrorMessage::kErrorReadingSource; + break; + case HttpCode::Conflict: message = WorkerErrorMessage::kNoData; return TextErrorWithType(message, ErrorType::kEndOfFile); default: @@ -56,24 +59,23 @@ std::string GetIDFromJson(const std::string& json_string, Error* err) { return std::to_string(id); } -void ServerDataBroker::ProcessServerError(Error* err,const std::string& response,std::string* redirect_uri) { - if ((*err)->GetErrorType() != asapo::ErrorType::kEndOfFile) { - (*err)->Append(response); - return; - } else { +void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) { + (*err)->Append(response); + if ((*err)->GetErrorType() == asapo::ErrorType::kEndOfFile) { if (response.find("id") != std::string::npos) { - auto id = GetIDFromJson(response, err); - if (*err) { + Error parse_error; + auto id = GetIDFromJson(response, &parse_error); + if (parse_error) { + (*err)->Append(parse_error->Explain()); return; } - *redirect_uri = server_uri_ + "/database/" + source_name_ + "/" + id; + *op = id; } } - *err=nullptr; return; } -Error ServerDataBroker::ProcessRequest(std::string* response,std::string request_uri) { +Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri) { Error err; HttpCode code; *response = httpclient__->Get(request_uri, &code, &err); @@ -83,23 +85,40 @@ Error ServerDataBroker::ProcessRequest(std::string* response,std::string request return HttpCodeToWorkerError(code); } +Error ServerDataBroker::GetBrokerUri() { + if (!current_broker_uri_.empty()) { + return nullptr; + } + + std::string request_uri = server_uri_ + "/discovery/broker"; + Error err; + err = ProcessRequest(¤t_broker_uri_, request_uri); + if (err != nullptr || current_broker_uri_.empty()) { + current_broker_uri_ = ""; + return TextError("cannot get broker uri from " + server_uri_); + } + return nullptr; +} + + Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) { - std::string request_uri = server_uri_ + "/database/" + source_name_ + "/" + operation; + std::string request_suffix = operation; uint64_t elapsed_ms = 0; std::string response; while (true) { - auto err = ProcessRequest(&response,request_uri); + auto err = GetBrokerUri(); if (err == nullptr) { - break; + std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/"; + err = ProcessRequest(&response, request_api + request_suffix); + if (err == nullptr) { + break; + } } - ProcessServerError(&err,response,&request_uri); - if (err != nullptr) { - return err; - } + ProcessServerError(&err, response, &request_suffix); if (elapsed_ms >= timeout_ms_) { - err = TextErrorWithType("no more data found, exit on timeout", asapo::ErrorType::kTimeOut); + err = TextErrorWithType("exit on timeout, last error: " + err->Explain(), asapo::ErrorType::kTimeOut); return err; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index f17ce25c30bff2362c8c84f25094e63b5c566312..43ccfeb7be55df5a95c8748eada7604a79986f5b 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -20,9 +20,11 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<HttpClient> httpclient__; private: Error GetFileInfoFromServer(FileInfo* info, const std::string& operation); - void ProcessServerError(Error* err,const std::string& response,std::string* redirect_uri); - Error ProcessRequest(std::string* response,std::string request_uri); + Error GetBrokerUri(); + void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); + Error ProcessRequest(std::string* response, std::string request_uri); std::string server_uri_; + std::string current_broker_uri_; std::string source_name_; uint64_t timeout_ms_ = 0; }; diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 28e2caef439d0a720983785b64dba9575e5db300..bf8c4748d341220be3a9a48e5d60d421228d856a 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -33,6 +33,7 @@ using ::testing::NiceMock; using ::testing::Return; using ::testing::SetArgPointee; using ::testing::SetArgReferee; +using testing::AllOf; namespace { @@ -53,9 +54,11 @@ class ServerDataBrokerTests : public Test { NiceMock<MockIO> mock_io; NiceMock<MockHttpClient> mock_http_client; FileInfo info; + std::string expected_server_uri = "test:8400"; + std::string expected_broker_uri = "broker:5005"; void SetUp() override { - data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "dbname")}; + data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker(expected_server_uri, "dbname")}; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; } @@ -64,13 +67,20 @@ class ServerDataBrokerTests : public Test { data_broker->httpclient__.release(); } void MockGet(const std::string& response) { - EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(response) )); } + void MockGetBrokerUri() { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(expected_broker_uri))); + } + }; TEST_F(ServerDataBrokerTests, CanConnect) { @@ -85,7 +95,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsErrorOnWrongInput) { TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { - EXPECT_CALL(mock_http_client, Get_t("test/database/dbname/next", _, _)).WillOnce(DoAll( + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/next", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); @@ -94,19 +106,26 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); auto err = data_broker->GetNext(&info, nullptr); + ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err->Explain(), HasSubstr("timeout")); } TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) { + + MockGetBrokerUri(); + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("id"))); @@ -115,17 +134,63 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) { ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse")); } +TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerAddressNotFound) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, + _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + auto err = data_broker->GetNext(&info, nullptr); + + ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); +} + +TEST_F(ServerDataBrokerTests, GetNextReturnsIfBrokerUriEmpty) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, + _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + auto err = data_broker->GetNext(&info, nullptr); + + ASSERT_THAT(err->Explain(), AllOf(HasSubstr("broker uri"), HasSubstr("cannot"))); +} + + + +TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) { + MockGetBrokerUri(); + MockGet("error_response"); + + data_broker->SetTimeout(100); + data_broker->GetNext(&info, nullptr); + Mock::VerifyAndClearExpectations(&mock_http_client); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).Times(0); + MockGet("error_response"); + data_broker->GetNext(&info, nullptr); +} + + TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"id\":1}"))); - EXPECT_CALL(mock_http_client, Get_t(HasSubstr("1"), _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( - SetArgPointee<1>(HttpCode::NotFound), - SetArgPointee<2>(nullptr), - Return("{\"id\":1}"))); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/dbname/1", _, + _)).Times(AtLeast(1)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"id\":1}"))); data_broker->SetTimeout(100); @@ -146,6 +211,8 @@ FileInfo CreateFI() { } TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) { + MockGetBrokerUri(); + auto to_send = CreateFI(); auto json = to_send.Json(); @@ -163,6 +230,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsFileInfo) { TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) { + MockGetBrokerUri(); MockGet("error_response"); auto err = data_broker->GetNext(&info, nullptr); @@ -171,6 +239,7 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsParseError) { TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) { + MockGetBrokerUri(); MockGet("error_response"); EXPECT_CALL( mock_io, GetDataFromFile_t(_, _, _)).Times(0); @@ -178,9 +247,9 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsIfNoDtataNeeded) { } TEST_F(ServerDataBrokerTests, GetNextCallsReadFromFile) { + MockGetBrokerUri(); auto to_send = CreateFI(); auto json = to_send.Json(); - MockGet(json); EXPECT_CALL(mock_io, GetDataFromFile_t("name", 100, _)).