diff --git a/3d_party/rapidjson/include/rapidjson/reader.h b/3d_party/rapidjson/include/rapidjson/reader.h index 206b261417b9bc15594fec165685339123b639c0..9b28b268530ca29101b4e351fd4228899a223e4e 100644 --- a/3d_party/rapidjson/include/rapidjson/reader.h +++ b/3d_party/rapidjson/include/rapidjson/reader.h @@ -1223,8 +1223,7 @@ class GenericReader { } i = i * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i >= 429496729)) { // 2^32 - 1 = 4294967295 if (RAPIDJSON_LIKELY(i != 429496729 || s.Peek() > '5')) { @@ -1265,8 +1264,7 @@ class GenericReader { } i64 = i64 * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i64 >= RAPIDJSON_UINT64_C2(0x19999999, 0x99999999))) // 2^64 - 1 = 18446744073709551615 if (RAPIDJSON_LIKELY(i64 != RAPIDJSON_UINT64_C2(0x19999999, 0x99999999) || s.Peek() > '5')) { diff --git a/common/cpp/include/common/error.h b/common/cpp/include/common/error.h index e4602afd5a948194d8d7eb2214a6c40d69945c15..71c32b6e80c816d207bb6639952419c70d67e7de 100644 --- a/common/cpp/include/common/error.h +++ b/common/cpp/include/common/error.h @@ -146,11 +146,11 @@ inline Error TextErrorWithType(const std::string& error, ErrorType error_type) { namespace ErrorTemplates { auto const kMemoryAllocationError = SimpleErrorTemplate { - "kMemoryAllocationError", ErrorType::kMemoryAllocationError -}; + "kMemoryAllocationError", ErrorType::kMemoryAllocationError + }; auto const kEndOfFile = SimpleErrorTemplate { - "End of file", ErrorType::kEndOfFile -}; + "End of file", ErrorType::kEndOfFile + }; } diff --git a/common/cpp/include/common/io_error.h b/common/cpp/include/common/io_error.h index 40c9d4dd7e2c62ad73531d0a4aa18ebbbdeff2d0..db3606b3b0a8ea5276a1489fc1b703168f96b379 100644 --- a/common/cpp/include/common/io_error.h +++ b/common/cpp/include/common/io_error.h @@ -72,68 +72,68 @@ static inline std::ostream& operator<<(std::ostream& os, const IOErrorTemplate& namespace IOErrorTemplates { auto const kUnknownIOError = IOErrorTemplate { - "Unknown Error", IOErrorType::kUnknownIOError -}; + "Unknown Error", IOErrorType::kUnknownIOError + }; auto const kFileNotFound = IOErrorTemplate { - "No such file or directory", IOErrorType::kFileNotFound -}; + "No such file or directory", IOErrorType::kFileNotFound + }; auto const kReadError = IOErrorTemplate { - "Read error", IOErrorType::kReadError -}; + "Read error", IOErrorType::kReadError + }; auto const kBadFileNumber = IOErrorTemplate { - "Bad file number", IOErrorType::kBadFileNumber -}; + "Bad file number", IOErrorType::kBadFileNumber + }; auto const kResourceTemporarilyUnavailable = IOErrorTemplate { - "Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable -}; + "Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable + }; auto const kPermissionDenied = IOErrorTemplate { - "Permission denied", IOErrorType::kPermissionDenied -}; + "Permission denied", IOErrorType::kPermissionDenied + }; auto const kUnsupportedAddressFamily = IOErrorTemplate { - "Unsupported address family", IOErrorType::kUnsupportedAddressFamily -}; + "Unsupported address family", IOErrorType::kUnsupportedAddressFamily + }; auto const kInvalidAddressFormat = IOErrorTemplate { - "Invalid address format", IOErrorType::kInvalidAddressFormat -}; + "Invalid address format", IOErrorType::kInvalidAddressFormat + }; auto const kAddressAlreadyInUse = IOErrorTemplate { - "Address already in use", IOErrorType::kAddressAlreadyInUse -}; + "Address already in use", IOErrorType::kAddressAlreadyInUse + }; auto const kConnectionRefused = IOErrorTemplate { - "Connection refused", IOErrorType::kConnectionRefused -}; + "Connection refused", IOErrorType::kConnectionRefused + }; auto const kConnectionResetByPeer = IOErrorTemplate { - "kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer -}; + "kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer + }; auto const kTimeout = IOErrorTemplate { - "kTimeout", IOErrorType::kTimeout -}; + "kTimeout", IOErrorType::kTimeout + }; auto const kFileAlreadyExists = IOErrorTemplate { - "kFileAlreadyExists", IOErrorType::kFileAlreadyExists -}; + "kFileAlreadyExists", IOErrorType::kFileAlreadyExists + }; auto const kNoSpaceLeft = IOErrorTemplate { - "kNoSpaceLeft", IOErrorType::kNoSpaceLeft -}; + "kNoSpaceLeft", IOErrorType::kNoSpaceLeft + }; auto const kSocketOperationOnNonSocket = IOErrorTemplate { - "kSocketOperationOnNonSocket", IOErrorType::kSocketOperationOnNonSocket -}; + "kSocketOperationOnNonSocket", IOErrorType::kSocketOperationOnNonSocket + }; auto const kInvalidMemoryAddress = IOErrorTemplate { - "kInvalidMemoryAddress", IOErrorType::kInvalidMemoryAddress -}; + "kInvalidMemoryAddress", IOErrorType::kInvalidMemoryAddress + }; auto const kUnableToResolveHostname = IOErrorTemplate { - "kUnableToResolveHostname", IOErrorType::kUnableToResolveHostname -}; + "kUnableToResolveHostname", IOErrorType::kUnableToResolveHostname + }; auto const kSocketOperationUnknownAtLevel = IOErrorTemplate { - "kSocketOperationUnknownAtLevel", IOErrorType::kSocketOperationUnknownAtLevel -}; + "kSocketOperationUnknownAtLevel", IOErrorType::kSocketOperationUnknownAtLevel + }; auto const kSocketOperationValueOutOfBound = IOErrorTemplate { - "kSocketOperationValueOutOfBound", IOErrorType::kSocketOperationValueOutOfBound -}; + "kSocketOperationValueOutOfBound", IOErrorType::kSocketOperationValueOutOfBound + }; auto const kAddressNotValid = IOErrorTemplate { - "Address not valid", IOErrorType::kAddressNotValid -}; + "Address not valid", IOErrorType::kAddressNotValid + }; } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index a9476d52422276874a68b692957d4d034c19191b..658ce992e758ee664852e7a3046a6ae3fb0b5d96 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -45,12 +45,12 @@ bool SendDummyData(hidra2::Producer* producer, size_t number_of_byte, uint64_t i int main (int argc, char* argv[]) { std::string receiver_address; - size_t number_of_byte; + size_t number_of_kbytes; uint64_t iterations; - std::tie(receiver_address, number_of_byte, iterations) = ProcessCommandArguments(argc, argv); + std::tie(receiver_address, number_of_kbytes, iterations) = ProcessCommandArguments(argc, argv); std::cout << "receiver_address: " << receiver_address << std::endl - << "number_of_byte: " << number_of_byte << std::endl + << "number_of_kbytes: " << number_of_kbytes << std::endl << "iterations: " << iterations << std::endl << std::endl; @@ -62,9 +62,16 @@ int main (int argc, char* argv[]) { } std::cout << "Successfully connected" << std::endl; - if(!SendDummyData(producer.get(), number_of_byte, iterations)) { + high_resolution_clock::time_point t1 = high_resolution_clock::now(); + if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) { return EXIT_FAILURE; } + high_resolution_clock::time_point t2 = high_resolution_clock::now(); + double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count() / 1000.0; + double size_gb = double(number_of_kbytes) * iterations / 1024.0 / 1024.0; + double rate = iterations / duration_sec; + std::cout << "Rate: " << rate << " Hz" << std::endl; + std::cout << "Bandwidth " << size_gb / duration_sec << " GB/s" << std::endl; return EXIT_SUCCESS; } diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index cdcf69a742ad54dd6d248f654e56953ce6faf701..decf910834fe976b84b787e61e476bb22e17c8e1 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -53,23 +53,23 @@ class ProducerErrorTemplate : public SimpleErrorTemplate { namespace ProducerErrorTemplates { auto const kAlreadyConnected = ProducerErrorTemplate { - "Already connected", ProducerErrorType::kAlreadyConnected -}; + "Already connected", ProducerErrorType::kAlreadyConnected + }; auto const kConnectionNotReady = ProducerErrorTemplate { - "Connection not ready", ProducerErrorType::kConnectionNotReady -}; + "Connection not ready", ProducerErrorType::kConnectionNotReady + }; auto const kFileTooLarge = ProducerErrorTemplate { - "File too large", ProducerErrorType::kFileTooLarge -}; + "File too large", ProducerErrorType::kFileTooLarge + }; auto const kFileIdAlreadyInUse = ProducerErrorTemplate { - "File already in use", ProducerErrorType::kFileIdAlreadyInUse -}; + "File already in use", ProducerErrorType::kFileIdAlreadyInUse + }; auto const kUnknownServerError = ProducerErrorTemplate { - "Unknown server error", ProducerErrorType::kUnknownServerError -}; + "Unknown server error", ProducerErrorType::kUnknownServerError + }; }; diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index 8cd2f4a1595832e172e1e01f0df236e8898b9b08..1f3794491fae420280a1c2442c0faec06b500ca0 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -69,7 +69,8 @@ void Connection::Listen() const noexcept { ProcessStatisticsAfterRequest(request); } io__->CloseSocket(socket_fd_, nullptr); - std::cout << "[" << GetId() << "] Disconnected." << std::endl; + statistics__->Send(); + std::cerr << "[" << GetId() << "] Disconnected." << std::endl; } diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h index 42eb039d8e5720e23a98dfded74973b788233235..63da34f6fe47518ca7d55278a47e82b0f302d7ed 100644 --- a/receiver/src/receiver_error.h +++ b/receiver/src/receiver_error.h @@ -50,11 +50,11 @@ class ReceiverErrorTemplate : public SimpleErrorTemplate { namespace ReceiverErrorTemplates { auto const kInvalidOpCode = ReceiverErrorTemplate { - "Invalid Opcode", ReceiverErrorType::kInvalidOpCode -}; + "Invalid Opcode", ReceiverErrorType::kInvalidOpCode + }; auto const kBadRequest = ReceiverErrorTemplate { - "Bad request", ReceiverErrorType::kBadRequest -}; + "Bad request", ReceiverErrorType::kBadRequest + }; }; } diff --git a/receiver/src/statistics.cpp b/receiver/src/statistics.cpp index 49a032cf68744d6fdb7d0b7d5744aa734aa471e7..fa806eb46bb702d55c95b4ff690411a8c6b248b8 100644 --- a/receiver/src/statistics.cpp +++ b/receiver/src/statistics.cpp @@ -1,23 +1,28 @@ #include "statistics.h" #include "statistics_sender_influx_db.h" +#include <algorithm> using std::chrono::high_resolution_clock; namespace hidra2 { void Statistics::SendIfNeeded() noexcept { - auto elapsed_ms = GetTotalElapsedMs(); - if (elapsed_ms > write_interval_) { - statistics_sender__->SendStatistics(PrepareStatisticsToSend()); - ResetStatistics(); + if (GetTotalElapsedMs() > write_interval_) { + Send(); } } +void Statistics::Send() noexcept { + statistics_sender__->SendStatistics(PrepareStatisticsToSend()); + ResetStatistics(); +} + + StatisticsToSend Statistics::PrepareStatisticsToSend() const noexcept { StatisticsToSend stat; stat.n_requests = nrequests_; stat.data_volume = volume_counter_; - stat.elapsed_ms = GetTotalElapsedMs(); + stat.elapsed_ms = std::max(uint64_t{1}, GetTotalElapsedMs()); for (auto i = 0; i < kNStatisticEntities; i++) { stat.entity_shares[i] = double(GetElapsedMs(StatisticEntity(i))) / stat.elapsed_ms; } diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h index cd3cb2986a134df08460f7b9e3860356c664b125..de374ed981a2dc9236cbeec7ae792770b32b6ffb 100644 --- a/receiver/src/statistics.h +++ b/receiver/src/statistics.h @@ -26,6 +26,7 @@ class Statistics { public: // virtual needed for unittests, could be replaced with #define VIRTUAL ... in case of performance issues virtual void SendIfNeeded() noexcept; + virtual void Send() noexcept; explicit Statistics(unsigned int write_interval = kDefaultStatisticWriteIntervalMs); virtual void IncreaseRequestCounter() noexcept; virtual void StartTimer(const StatisticEntity& entity) noexcept; diff --git a/receiver/src/statistics_sender_influx_db.cpp b/receiver/src/statistics_sender_influx_db.cpp index beed7f232d5340ae56c2dd5992c8688f01062936..f1ba02590bcf60e321b74e12a0e93a325b6d5f4b 100644 --- a/receiver/src/statistics_sender_influx_db.cpp +++ b/receiver/src/statistics_sender_influx_db.cpp @@ -20,7 +20,7 @@ void StatisticsSenderInfluxDb::SendStatistics(const StatisticsToSend& statistic) HttpCode code; Error err; //TODO influxdb uri from config - auto responce = httpclient__->Post("http://zitpcx27016.desy.de:8086/write?db=db_test", StatisticsToString(statistic), + auto responce = httpclient__->Post("localhost:8086/write?db=db_test", StatisticsToString(statistic), &code, &err); if (err) { diff --git a/receiver/unittests/mock_statistics.h b/receiver/unittests/mock_statistics.h index 6c34db0e55a48e331b99ddd635a6cfd291b14c1f..dc4077a72ff9f683a106df5368c1de03da4c4f9c 100644 --- a/receiver/unittests/mock_statistics.h +++ b/receiver/unittests/mock_statistics.h @@ -13,6 +13,11 @@ class MockStatistics : public hidra2::Statistics { void SendIfNeeded() noexcept override { SendIfNeeded_t(); } + + void Send() noexcept override { + Send_t(); + } + void IncreaseRequestCounter() noexcept override { IncreaseRequestCounter_t(); } @@ -27,6 +32,7 @@ class MockStatistics : public hidra2::Statistics { } MOCK_METHOD0(SendIfNeeded_t, void()); + MOCK_METHOD0(Send_t, void()); MOCK_METHOD0(IncreaseRequestCounter_t, void()); MOCK_METHOD0(StopTimer_t, void()); MOCK_METHOD1(IncreaseRequestDataVolume_t, void (uint64_t diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index ccdfb640a9d65756166a439662380ac8550c051f..d09e26507ecb8e02de64b065bd1673799f5ac736 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -91,6 +91,7 @@ class ConnectionTests : public Test { WillByDefault(DoAll(testing::SetArgPointee<4>(nullptr), testing::Return(0))); EXPECT_CALL(mock_io, CloseSocket_t(_, _)); + EXPECT_CALL(mock_statictics, Send_t()); } void TearDown() override { diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 3a98aaadbbbde36b8c7b8a2e71805d6bd9a1fa85..8b4d19aa81bc3fcdea7086476aaa98d5393eace4 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -90,7 +90,7 @@ class RequestTests : public Test { uint64_t data_id_{15}; std::unique_ptr<Request> request; NiceMock<MockIO> mock_io; - MockStatistics mock_statistics; + NiceMock<MockStatistics> mock_statistics; std::unique_ptr<hidra2::Statistics> stat; void SetUp() override { stat = std::unique_ptr<hidra2::Statistics> {&mock_statistics}; diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp index 4ba29133e28e21aab9e7d3a5b52a6a1c2ff7dc9f..13115b7b7eb2b9d47f1ab9b5524453ebf18f42d3 100644 --- a/receiver/unittests/test_statistics.cpp +++ b/receiver/unittests/test_statistics.cpp @@ -189,4 +189,25 @@ TEST_F(StatisticTests, SendStaticsDoesCallsSender) { } +TEST_F(StatisticTests, StatisticsSend) { + statistics.IncreaseRequestCounter(); + + StatisticsToSend stat; + stat.elapsed_ms = 0; + stat.n_requests = 0; + stat.data_volume = 0; + for (int i = 0; i < hidra2::kNStatisticEntities; i++) { + stat.entity_shares[i] = 0.0; + } + + EXPECT_CALL(mock_statistics_sender, SendStatistics_t(_)). + WillOnce(SaveArg1ToSendStat(&stat)); + + statistics.Send(); + std::cout << stat.elapsed_ms << std::endl; + + ASSERT_THAT(stat.elapsed_ms, Ge(1)); +} + + } diff --git a/receiver/unittests/test_statistics_sender_influx_db.cpp b/receiver/unittests/test_statistics_sender_influx_db.cpp index 7c2b5a5f4eb24166f2553d99ceb5c45e6dcb4527..81f91db1b679117f6a7edd90801025c3ea55e8f9 100644 --- a/receiver/unittests/test_statistics_sender_influx_db.cpp +++ b/receiver/unittests/test_statistics_sender_influx_db.cpp @@ -59,7 +59,7 @@ TEST_F(SenderInfluxDbTests, SendStatisticsCallsPost) { statistics.data_volume = 1000; std::string expect_string = "statistics,receiver=1,connection=1 elapsed_ms=100,data_volume=1000," "n_requests=4,db_share=0.1000,network_share=0.3000,disk_share=0.6000"; - EXPECT_CALL(mock_http_client, Post_t("http://zitpcx27016.desy.de:8086/write?db=db_test", expect_string, _, _)). + EXPECT_CALL(mock_http_client, Post_t("localhost:8086/write?db=db_test", expect_string, _, _)). WillOnce( DoAll(SetArgPointee<3>(new hidra2::IOError("Test Read Error", hidra2::IOErrorType::kReadError)), Return("") diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index c4da20d000bc7375e799fcd9b063a640d75c506a..ad36394912da0e110b2707f73e0270452030d41f 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1 +1,4 @@ add_subdirectory(transfer_single_file) +if (UNIX) + add_subdirectory(check_monitoring) +endif() \ No newline at end of file diff --git a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d783ebe25d5bf941cd3c94d7b112a24c5ed75e11 --- /dev/null +++ b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME receiver) + +################################ +# Testing +################################ +add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin>" nomem + ) diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..7f7dbb65dc12d17f74aac4856bf2ab4f4a534f44 --- /dev/null +++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +database_name=db_test + +set -e + +trap Cleanup EXIT + +Cleanup() { + echo cleanup + influx -execute "drop database ${database_name}" + kill $receiverid + rm -rf files + +} + +influx -execute "create database ${database_name}" + +nohup $2 &>/dev/null & +sleep 0.3 +receiverid=`echo $!` + +mkdir files + +$1 localhost:4200 100 112 + +sleep 1 + +influx -execute "select sum(n_requests) from statistics" -database=${database_name} -format=json | jq .results[0].series[0].values[0][1] | grep 112 diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index 163d4ab08b8144ea133ca0f6f48899b304234e63..f03eaad1067fb4a3056f9ac1504d07139c1aa8df 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -18,4 +18,4 @@ mkdir files $1 localhost:4200 100 1 -ls -ln files/0.bin | awk '{ print $5 }'| grep 100 +ls -ln files/0.bin | awk '{ print $5 }'| grep 102400 diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat index 9413a82a228200e5a50ed87135e10450ab471714..c1f3a66fda640e07332fa0b755f54c0b34d23b4a 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat +++ b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat @@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul FOR /F "usebackq" %%A IN ('files\0.bin') DO set size=%%~zA -if %size% NEQ 100 goto :error +if %size% NEQ 102400 goto :error goto :clean