From 6348ec88c10ab9553c0045d459ab5d1013e46697 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Thu, 24 May 2018 11:26:25 +0200 Subject: [PATCH] refacgtor, add connection tag to monitoring --- receiver/src/connection.cpp | 5 ++++- receiver/src/connection.h | 2 +- receiver/src/receiver.cpp | 3 ++- receiver/src/statistics.cpp | 7 +++++++ receiver/src/statistics.h | 8 ++++++- receiver/src/statistics_sender_influx_db.cpp | 3 +-- receiver/unittests/test_connection.cpp | 6 +++--- receiver/unittests/test_statistics.cpp | 21 +++++++++++++++++++ .../test_statistics_sender_influx_db.cpp | 3 ++- .../full_chain/simple_chain/CMakeLists.txt | 2 +- .../full_chain/simple_chain/check_linux.sh | 2 +- .../check_monitoring/CMakeLists.txt | 2 +- .../check_monitoring/check_linux.sh | 2 +- .../transfer_single_file/CMakeLists.txt | 2 +- .../transfer_single_file/check_linux.sh | 7 +++++-- tests/automatic/settings/receiver.json.tpl | 2 +- 16 files changed, 59 insertions(+), 18 deletions(-) diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index eb4165f6a..678bef893 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -11,11 +11,14 @@ namespace asapo { size_t Connection::kRequestHandlerMaxBufferSize; std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); -Connection::Connection(SocketDescriptor socket_fd, const std::string& address): request_factory__{new RequestFactory}, +Connection::Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag): request_factory__{new RequestFactory}, io__{GenerateDefaultIO()}, statistics__{new Statistics}, log__{GetDefaultReceiverLogger()} { socket_fd_ = socket_fd; connection_id_ = kNetworkProducerPeerImplGlobalCounter++; address_ = address; + statistics__->AddTag("connection_from",address); + statistics__->AddTag("receiver_tag",std::move(receiver_tag)); + } uint64_t Connection::GetId() const noexcept { diff --git a/receiver/src/connection.h b/receiver/src/connection.h index 11fdf1eff..bde002ae6 100644 --- a/receiver/src/connection.h +++ b/receiver/src/connection.h @@ -29,7 +29,7 @@ class Connection { static size_t kRequestHandlerMaxBufferSize; static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter; - Connection(SocketDescriptor socket_fd, const std::string& address); + Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag); ~Connection() = default; void Listen() const noexcept; diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 8d2c1b1d0..568177a93 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -56,7 +56,8 @@ void Receiver::ProcessConnections(Error* err) { void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { log__->Info("new connection from " + address); auto thread = io__->NewThread([connection_socket_fd, address] { - auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address)); + // todo: reveicer tag + auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address,"1")); connection->Listen(); }); diff --git a/receiver/src/statistics.cpp b/receiver/src/statistics.cpp index 3f0723315..4cb2a4c1b 100644 --- a/receiver/src/statistics.cpp +++ b/receiver/src/statistics.cpp @@ -23,6 +23,7 @@ StatisticsToSend Statistics::PrepareStatisticsToSend() const noexcept { stat.n_requests = nrequests_; stat.data_volume = volume_counter_; stat.elapsed_ms = std::max(uint64_t{1}, GetTotalElapsedMs()); + stat.tags = tag_; for (auto i = 0; i < kNStatisticEntities; i++) { stat.entity_shares[i] = double(GetElapsedMs(StatisticEntity(i))) / stat.elapsed_ms; } @@ -76,5 +77,11 @@ void Statistics::StopTimer() noexcept { time_counters_[current_statistic_entity_] += elapsed; } +void Statistics::AddTag(const std::string &name, const std::string &value) noexcept { + if (!tag_.empty()) { + tag_ += ","; + } + tag_ += name + "=" + value; +} } \ No newline at end of file diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h index 35a3ef9b5..c52324c23 100644 --- a/receiver/src/statistics.h +++ b/receiver/src/statistics.h @@ -3,6 +3,8 @@ #include <chrono> #include <memory> +#include <string> + #include "statistics_sender.h" @@ -20,6 +22,7 @@ struct StatisticsToSend { uint64_t elapsed_ms; uint64_t data_volume; uint64_t n_requests; + std::string tags; }; #ifdef UNIT_TESTS @@ -35,8 +38,10 @@ class Statistics { VIRTUAL void StartTimer(const StatisticEntity& entity) noexcept; VIRTUAL void IncreaseRequestDataVolume(uint64_t transferred_data_volume) noexcept; VIRTUAL void StopTimer() noexcept; + VIRTUAL void AddTag(const std::string& name,const std::string& value) noexcept; + - void SetWriteInterval(uint64_t interval_ms); + void SetWriteInterval(uint64_t interval_ms); std::unique_ptr<StatisticsSender> statistics_sender__; private: uint64_t GetElapsedMs(StatisticEntity entity) const noexcept; @@ -51,6 +56,7 @@ class Statistics { std::chrono::nanoseconds time_counters_[kNStatisticEntities]; uint64_t volume_counter_; unsigned int write_interval_; + std::string tag_; }; diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp index 886b29313..642f2aea9 100644 --- a/receiver/src/statistics_sender_influx_db.cpp +++ b/receiver/src/statistics_sender_influx_db.cpp @@ -41,8 +41,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) std::string StatisticsSenderInfluxDb::StatisticsToString(const StatisticsToSend& statistic) const noexcept { std::string str; - std::string tags = "receiver=1,connection=1"; - str = "statistics," + tags + " elapsed_ms=" + string_format("%ld", statistic.elapsed_ms); + str = "statistics," + statistic.tags + " elapsed_ms=" + string_format("%ld", statistic.elapsed_ms); str += ",data_volume=" + string_format("%ld", statistic.data_volume); str += ",n_requests=" + string_format("%ld", statistic.n_requests); str += ",db_share=" + string_format("%.4f", statistic.entity_shares[StatisticEntity::kDatabase]); diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index 45634411d..66dfdf13e 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -46,12 +46,12 @@ using asapo::MockStatistics; namespace { TEST(Connection, Constructor) { - Connection connection{0, "some_address"}; + Connection connection{0, "some_address","some_tag"}; ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(connection.request_factory__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(connection.log__), Ne(nullptr)); - } class MockRequestHandler: public Request { @@ -85,7 +85,7 @@ class MockRequestFactory: public asapo::RequestFactory { class ConnectionTests : public Test { public: std::string connected_uri{"some_address"}; - Connection connection{0, connected_uri}; + Connection connection{0, connected_uri,"some_tag"}; MockIO mock_io; MockRequestFactory mock_factory; NiceMock<MockStatistics> mock_statictics; diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp index 01959339e..9a011cc68 100644 --- a/receiver/unittests/test_statistics.cpp +++ b/receiver/unittests/test_statistics.cpp @@ -60,6 +60,7 @@ ACTION_P(SaveArg1ToSendStat, value) { value->n_requests = resp.n_requests; value->data_volume = resp.data_volume; value->elapsed_ms = resp.elapsed_ms; + value->tags = resp.tags; for (int i = 0; i < asapo::kNStatisticEntities; i++) { value->entity_shares[i] = resp.entity_shares[i]; } @@ -92,6 +93,26 @@ TEST_F(StatisticTests, IncreaseRequestCounter) { ASSERT_THAT(stat.n_requests, Eq(1)); } +TEST_F(StatisticTests, AddTag) { + statistics.AddTag("name","value"); + + auto stat = ExtractStat(); + + ASSERT_THAT(stat.tags, Eq("name=value")); +} + +TEST_F(StatisticTests, AddTagTwice) { + statistics.AddTag("name1","value1"); + statistics.AddTag("name2","value2"); + + auto stat = ExtractStat(); + + ASSERT_THAT(stat.tags, Eq("name1=value1,name2=value2")); +} + + + + TEST_F(StatisticTests, StatisticsResetAfterSend) { statistics.IncreaseRequestCounter(); diff --git a/receiver/unittests/test_statistics_sender_influx_db.cpp b/receiver/unittests/test_statistics_sender_influx_db.cpp index 6408099ce..48f7f704c 100644 --- a/receiver/unittests/test_statistics_sender_influx_db.cpp +++ b/receiver/unittests/test_statistics_sender_influx_db.cpp @@ -61,6 +61,7 @@ class SenderInfluxDbTests : public Test { statistics.entity_shares[asapo::StatisticEntity::kDatabase] = 0.1; statistics.elapsed_ms = 100; statistics.data_volume = 1000; + statistics.tags = "name1=value1,name2=value2"; config.monitor_db_uri = "test_uri"; config.monitor_db_name = "test_name"; @@ -76,7 +77,7 @@ class SenderInfluxDbTests : public Test { TEST_F(SenderInfluxDbTests, SendStatisticsCallsPost) { - std::string expect_string = "statistics,receiver=1,connection=1 elapsed_ms=100,data_volume=1000," + std::string expect_string = "statistics,name1=value1,name2=value2 elapsed_ms=100,data_volume=1000," "n_requests=4,db_share=0.1000,network_share=0.3000,disk_share=0.6000"; EXPECT_CALL(mock_http_client, Post_t("test_uri/write?db=test_name", expect_string, _, _)). WillOnce( diff --git a/tests/automatic/full_chain/simple_chain/CMakeLists.txt b/tests/automatic/full_chain/simple_chain/CMakeLists.txt index 2d74dbc84..a63811d4f 100644 --- a/tests/automatic/full_chain/simple_chain/CMakeLists.txt +++ b/tests/automatic/full_chain/simple_chain/CMakeLists.txt @@ -4,4 +4,4 @@ set(TARGET_NAME full_chain_simple_chain) # Testing ################################ prepare_asapo() -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-broker,EXENAME> $<TARGET_FILE:getnext_broker>" nomem) +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker>" nomem) diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh index dd1893765..8640f1fa5 100644 --- a/tests/automatic/full_chain/simple_chain/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -34,4 +34,4 @@ $1 localhost:5006 100 100 4 0 & #producerrid=`echo $!` sleep 0.1 -$4 ${broker_address} ${broker_database_name} 2 | grep "Processed 100 file(s)" +$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 100 file(s)" diff --git a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt index 45cadecbc..80b82ae8e 100644 --- a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt +++ b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt @@ -5,4 +5,4 @@ set(TARGET_NAME receiver) ################################ prepare_asapo() -add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem) +add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh index 7c7e9eb0a..9c2ea042c 100644 --- a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh +++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh @@ -29,4 +29,4 @@ $1 localhost:5006 100 112 4 0 sleep 1 -influx -execute "select sum(n_requests) from statistics" -database=${database_name} -format=json | jq .results[0].series[0].values[0][1] | grep 112 +influx -execute "select sum(n_requests) from statistics" -database=${database_name} -format=json jq .results[0].series[0].values[0][1] | grep 112 diff --git a/tests/automatic/producer_receiver/transfer_single_file/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file/CMakeLists.txt index c74ddedc8..7b299ca60 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/CMakeLists.txt +++ b/tests/automatic/producer_receiver/transfer_single_file/CMakeLists.txt @@ -4,4 +4,4 @@ set(TARGET_NAME transfer-single-file) # Testing ################################ prepare_asapo() -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin>" nomem) +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index ae8edbf6c..07334882f 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -4,14 +4,17 @@ set -e trap Cleanup EXIT -database_name=test_run +database_name=db_test +mongo_database_name=test_run Cleanup() { echo cleanup rm -rf files nomad stop receiver nomad stop discovery - echo "db.dropDatabase()" | mongo ${database_name} + echo "db.dropDatabase()" | mongo ${mongo_database_name} + influx -execute "drop database ${database_name}" + } nomad run receiver.nmd diff --git a/tests/automatic/settings/receiver.json.tpl b/tests/automatic/settings/receiver.json.tpl index 0d0abd37e..ca59f401b 100644 --- a/tests/automatic/settings/receiver.json.tpl +++ b/tests/automatic/settings/receiver.json.tpl @@ -6,5 +6,5 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "WriteToDisk":true, "WriteToDb":true, - "LogLevel" : "info" + "LogLevel" : "debug" } -- GitLab