From 4bd86ffe70a88d560315e8c35ed43ed5a11def8d Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Thu, 27 Jan 2022 11:25:48 +0100 Subject: [PATCH] fix memleaks and race conditions --- common/cpp/src/system_io/system_io.cpp | 8 +++----- receiver/src/main.cpp | 2 +- .../monitoring/receiver_monitoring_client.h | 2 +- .../receiver_monitoring_client_impl.cpp | 20 ++++++++++++++++--- .../receiver_monitoring_client_impl.h | 6 +++--- .../receiver_monitoring_client_noop.cpp | 4 ++++ .../receiver_monitoring_client_noop.h | 1 + .../monitoring/receiver_monitoring_mocking.h | 1 + .../monitoring/test_monitoring_client.cpp | 17 ++++++++-------- 9 files changed, 39 insertions(+), 22 deletions(-) diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index aef76f2d1..e0796f61d 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -645,13 +645,11 @@ Error SystemIO::RemoveFile(const std::string& fname) const { std::string SystemIO::GetHostName(Error* err) const noexcept { char host[1024]; - gethostname(host, sizeof(host)); - *err = GetLastError(); - if (*err) { + if (gethostname(host, sizeof(host))!=0) { + *err = GetLastError(); return ""; - } else { - return host; } + return host; } Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const { diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index d7d5905f7..c2b9c63bd 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -51,7 +51,7 @@ void AddDataServers(const asapo::ReceiverConfig* config, const asapo::SharedCach asapo::SharedReceiverMonitoringClient StartMonitoringClient(const asapo::ReceiverConfig* config, asapo::SharedCache cache, asapo::Error* error) { bool useNoopImpl = !config->monitor_performance; auto monitoring = asapo::SharedReceiverMonitoringClient(asapo::GenerateDefaultReceiverMonitoringClient(cache, useNoopImpl)); - + monitoring->StartMonitoring(); *error = nullptr; return monitoring; } diff --git a/receiver/src/monitoring/receiver_monitoring_client.h b/receiver/src/monitoring/receiver_monitoring_client.h index 857f011fc..cfdfd0d96 100644 --- a/receiver/src/monitoring/receiver_monitoring_client.h +++ b/receiver/src/monitoring/receiver_monitoring_client.h @@ -17,7 +17,7 @@ public: ReceiverMonitoringClient(const ReceiverMonitoringClient&) = delete; ReceiverMonitoringClient& operator=(const ReceiverMonitoringClient&) = delete; - + virtual void StartMonitoring() = 0; virtual void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, diff --git a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp index 05ac3c321..ce47f6f72 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp +++ b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp @@ -26,9 +26,15 @@ asapo::ReceiverMonitoringClientImpl::ReceiverMonitoringClientImpl(asapo::SharedC } receiverName_ = "receiver_" + hostname + "_" + std::to_string(io__->GetCurrentPid()); + +} + +void ReceiverMonitoringClientImpl::StartMonitoring() { ReinitializeClient(); + StartSendingThread(); } + void asapo::ReceiverMonitoringClientImpl::StartSendingThread() { if (sendingThreadRunning__) { return; @@ -47,10 +53,16 @@ void asapo::ReceiverMonitoringClientImpl::StartSendingThread() { } void asapo::ReceiverMonitoringClientImpl::StopSendingThread() { + if (!sendingThreadRunning__) { + log__->Info("sending thread already stopped"); + return; + } + log__->Info("Stopping sending thread"); sendingThreadRunning__ = false; if (sendingThread_.get() && sendingThread_->joinable()) { sendingThread_->join(); + sendingThread_=nullptr; } log__->Info("Stopped sending thread"); } @@ -228,9 +240,6 @@ asapo::Error asapo::ReceiverMonitoringClientImpl::ReinitializeClient() { newClient.reset(); // delete client first newChannel.reset(); } - - StartSendingThread(); - return nullptr; } @@ -345,4 +354,9 @@ uint64_t asapo::ReceiverMonitoringClientImpl::WaitTimeMsUntilNextInterval(std::c return result; } +ReceiverMonitoringClientImpl::~ReceiverMonitoringClientImpl() { + StopSendingThread(); +} + + diff --git a/receiver/src/monitoring/receiver_monitoring_client_impl.h b/receiver/src/monitoring/receiver_monitoring_client_impl.h index 8f28670d4..04fc5b7f4 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_impl.h +++ b/receiver/src/monitoring/receiver_monitoring_client_impl.h @@ -43,10 +43,10 @@ public: explicit ReceiverMonitoringClientImpl(SharedCache cache); ReceiverMonitoringClientImpl(const ReceiverMonitoringClientImpl&) = delete; ReceiverMonitoringClientImpl& operator=(const ReceiverMonitoringClientImpl&) = delete; - + ~ReceiverMonitoringClientImpl(); void StartSendingThread(); void StopSendingThread(); - + void StartMonitoring() override; void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, @@ -85,7 +85,7 @@ public: //std::mutex mutex; ReceiverDataPointContainer container; ASAPO_VIRTUAL ~ToBeSendData()=default; - ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer( + ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer( const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, diff --git a/receiver/src/monitoring/receiver_monitoring_client_noop.cpp b/receiver/src/monitoring/receiver_monitoring_client_noop.cpp index 1679476c1..1208c444e 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_noop.cpp +++ b/receiver/src/monitoring/receiver_monitoring_client_noop.cpp @@ -52,3 +52,7 @@ void asapo::ReceiverMonitoringClientNoop::SendReceiverRequestDataPoint(const std void asapo::ReceiverMonitoringClientNoop::FillMemoryStats() { } + +void asapo::ReceiverMonitoringClientNoop::StartMonitoring() { + +} diff --git a/receiver/src/monitoring/receiver_monitoring_client_noop.h b/receiver/src/monitoring/receiver_monitoring_client_noop.h index 7950fc11b..a95dca0d6 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_noop.h +++ b/receiver/src/monitoring/receiver_monitoring_client_noop.h @@ -6,6 +6,7 @@ namespace asapo { class ReceiverMonitoringClientNoop : public ReceiverMonitoringClient { public: + void StartMonitoring() override; void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, const std::string& source, const std::string& stream, diff --git a/receiver/unittests/monitoring/receiver_monitoring_mocking.h b/receiver/unittests/monitoring/receiver_monitoring_mocking.h index 506bd360b..53094572d 100644 --- a/receiver/unittests/monitoring/receiver_monitoring_mocking.h +++ b/receiver/unittests/monitoring/receiver_monitoring_mocking.h @@ -10,6 +10,7 @@ namespace asapo { class MockReceiverMonitoringClient : public asapo::ReceiverMonitoringClient { public: + void StartMonitoring() override {}; MOCK_METHOD9(SendProducerToReceiverTransferDataPoint, void(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, const std::string& source, const std::string& stream, uint64_t fileSize, diff --git a/receiver/unittests/monitoring/test_monitoring_client.cpp b/receiver/unittests/monitoring/test_monitoring_client.cpp index 8f43fb46e..0df6ef17c 100644 --- a/receiver/unittests/monitoring/test_monitoring_client.cpp +++ b/receiver/unittests/monitoring/test_monitoring_client.cpp @@ -58,27 +58,23 @@ namespace { class MonitoringClientTest : public Test { public: - std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache; - std::unique_ptr<StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>> mock_toBeSend; + std::shared_ptr<NiceMock<asapo::MockDataCache>> mock_cache; + NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>* mock_toBeSend; std::unique_ptr<asapo::ReceiverMonitoringClientImpl> monitoring; void SetUp() override { - mock_cache.reset(new StrictMock<asapo::MockDataCache>); + mock_cache.reset(new NiceMock<asapo::MockDataCache>); monitoring.reset(new asapo::ReceiverMonitoringClientImpl{mock_cache}); - monitoring->sendingThreadRunning__ = true; - mock_toBeSend.reset(new StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>); - monitoring->toBeSendData__.reset(mock_toBeSend.get()); + mock_toBeSend = new NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>; + monitoring->toBeSendData__.reset(mock_toBeSend); } void TearDown() override { - monitoring->toBeSendData__.release(); // NOLINT(bugprone-unused-return-value) because it is a local part of this->mock_toBeSend } }; TEST_F(MonitoringClientTest, DefaultGenerator) { std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache; asapo::SharedReceiverMonitoringClient monitoring_l = asapo::GenerateDefaultReceiverMonitoringClient(mock_cache, false); - asapo::ReceiverMonitoringClientImpl* monitoring_l_impl = dynamic_cast<asapo::ReceiverMonitoringClientImpl*>(monitoring_l.get()); - EXPECT_THAT(monitoring_l_impl, Ne(nullptr)); EXPECT_THAT(monitoring_l_impl->log__, Ne(nullptr)); EXPECT_THAT(monitoring_l_impl->io__, Ne(nullptr)); @@ -114,6 +110,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendProducerToReceiverTransferDataPoint("p1", "i1", "b1", "so1", "st1", 1, 2, 3, 4); EXPECT_THAT(x->totalfilesize(), Eq(101)); @@ -132,6 +129,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendRdsRequestWasMissDataPoint("p1", "i1", "b1", "so1", "st1"); EXPECT_THAT(x->totalfilesize(), Eq(100)); @@ -150,6 +148,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendReceiverRequestDataPoint("p1", "i1", "b1", "so1", "st1", 2, 3); EXPECT_THAT(x->totalfilesize(), Eq(102)); -- GitLab