diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index aef76f2d169a853a9eb7d860fbddbfc99cbc86f5..e0796f61dc56e8a79bbcd6439b1c12d2d8f6f1d4 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -645,13 +645,11 @@ Error SystemIO::RemoveFile(const std::string& fname) const { std::string SystemIO::GetHostName(Error* err) const noexcept { char host[1024]; - gethostname(host, sizeof(host)); - *err = GetLastError(); - if (*err) { + if (gethostname(host, sizeof(host))!=0) { + *err = GetLastError(); return ""; - } else { - return host; } + return host; } Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const { diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index d7d5905f73b548c479eca9b9a9be1039757dce4a..c2b9c63bde0c7fcaf1be97c6eddf09f5b6004de4 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -51,7 +51,7 @@ void AddDataServers(const asapo::ReceiverConfig* config, const asapo::SharedCach asapo::SharedReceiverMonitoringClient StartMonitoringClient(const asapo::ReceiverConfig* config, asapo::SharedCache cache, asapo::Error* error) { bool useNoopImpl = !config->monitor_performance; auto monitoring = asapo::SharedReceiverMonitoringClient(asapo::GenerateDefaultReceiverMonitoringClient(cache, useNoopImpl)); - + monitoring->StartMonitoring(); *error = nullptr; return monitoring; } diff --git a/receiver/src/monitoring/receiver_monitoring_client.h b/receiver/src/monitoring/receiver_monitoring_client.h index 857f011fca0fb24e023ee804cea19e18e0ec484b..cfdfd0d96f883ce6c8813d679940f94814895a23 100644 --- a/receiver/src/monitoring/receiver_monitoring_client.h +++ b/receiver/src/monitoring/receiver_monitoring_client.h @@ -17,7 +17,7 @@ public: ReceiverMonitoringClient(const ReceiverMonitoringClient&) = delete; ReceiverMonitoringClient& operator=(const ReceiverMonitoringClient&) = delete; - + virtual void StartMonitoring() = 0; virtual void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, diff --git a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp index 05ac3c32133a5b74703e15aacb6bfbcb0296b0b6..ce47f6f72b8f5159beaa59d0bf44ea4d816e2358 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp +++ b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp @@ -26,9 +26,15 @@ asapo::ReceiverMonitoringClientImpl::ReceiverMonitoringClientImpl(asapo::SharedC } receiverName_ = "receiver_" + hostname + "_" + std::to_string(io__->GetCurrentPid()); + +} + +void ReceiverMonitoringClientImpl::StartMonitoring() { ReinitializeClient(); + StartSendingThread(); } + void asapo::ReceiverMonitoringClientImpl::StartSendingThread() { if (sendingThreadRunning__) { return; @@ -47,10 +53,16 @@ void asapo::ReceiverMonitoringClientImpl::StartSendingThread() { } void asapo::ReceiverMonitoringClientImpl::StopSendingThread() { + if (!sendingThreadRunning__) { + log__->Info("sending thread already stopped"); + return; + } + log__->Info("Stopping sending thread"); sendingThreadRunning__ = false; if (sendingThread_.get() && sendingThread_->joinable()) { sendingThread_->join(); + sendingThread_=nullptr; } log__->Info("Stopped sending thread"); } @@ -228,9 +240,6 @@ asapo::Error asapo::ReceiverMonitoringClientImpl::ReinitializeClient() { newClient.reset(); // delete client first newChannel.reset(); } - - StartSendingThread(); - return nullptr; } @@ -345,4 +354,9 @@ uint64_t asapo::ReceiverMonitoringClientImpl::WaitTimeMsUntilNextInterval(std::c return result; } +ReceiverMonitoringClientImpl::~ReceiverMonitoringClientImpl() { + StopSendingThread(); +} + + diff --git a/receiver/src/monitoring/receiver_monitoring_client_impl.h b/receiver/src/monitoring/receiver_monitoring_client_impl.h index 8f28670d43a1484e7048ea849bc022eeb74e7b28..04fc5b7f458b73c08db9a403d7c89c38d39391b4 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_impl.h +++ b/receiver/src/monitoring/receiver_monitoring_client_impl.h @@ -43,10 +43,10 @@ public: explicit ReceiverMonitoringClientImpl(SharedCache cache); ReceiverMonitoringClientImpl(const ReceiverMonitoringClientImpl&) = delete; ReceiverMonitoringClientImpl& operator=(const ReceiverMonitoringClientImpl&) = delete; - + ~ReceiverMonitoringClientImpl(); void StartSendingThread(); void StopSendingThread(); - + void StartMonitoring() override; void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, @@ -85,7 +85,7 @@ public: //std::mutex mutex; ReceiverDataPointContainer container; ASAPO_VIRTUAL ~ToBeSendData()=default; - ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer( + ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer( const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, diff --git a/receiver/src/monitoring/receiver_monitoring_client_noop.cpp b/receiver/src/monitoring/receiver_monitoring_client_noop.cpp index 1679476c1c56342fdfeae394c9f333ba082403b9..1208c444e50fae15e24f780b8eacdb6ef62c8a4f 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_noop.cpp +++ b/receiver/src/monitoring/receiver_monitoring_client_noop.cpp @@ -52,3 +52,7 @@ void asapo::ReceiverMonitoringClientNoop::SendReceiverRequestDataPoint(const std void asapo::ReceiverMonitoringClientNoop::FillMemoryStats() { } + +void asapo::ReceiverMonitoringClientNoop::StartMonitoring() { + +} diff --git a/receiver/src/monitoring/receiver_monitoring_client_noop.h b/receiver/src/monitoring/receiver_monitoring_client_noop.h index 7950fc11b800f0fe787f5afc39d32c12f66718c3..a95dca0d6a0db6808df168bc8f3e5eebe1b2ee1d 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_noop.h +++ b/receiver/src/monitoring/receiver_monitoring_client_noop.h @@ -6,6 +6,7 @@ namespace asapo { class ReceiverMonitoringClientNoop : public ReceiverMonitoringClient { public: + void StartMonitoring() override; void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, const std::string& source, const std::string& stream, diff --git a/receiver/unittests/monitoring/receiver_monitoring_mocking.h b/receiver/unittests/monitoring/receiver_monitoring_mocking.h index 506bd360b05bacfe9090dccfda35f4750a969933..53094572dd846944b709b3d6e9186ae2d8f7962b 100644 --- a/receiver/unittests/monitoring/receiver_monitoring_mocking.h +++ b/receiver/unittests/monitoring/receiver_monitoring_mocking.h @@ -10,6 +10,7 @@ namespace asapo { class MockReceiverMonitoringClient : public asapo::ReceiverMonitoringClient { public: + void StartMonitoring() override {}; MOCK_METHOD9(SendProducerToReceiverTransferDataPoint, void(const std::string& pipelineStepId, const std::string& producerInstanceId, const std::string& beamtime, const std::string& source, const std::string& stream, uint64_t fileSize, diff --git a/receiver/unittests/monitoring/test_monitoring_client.cpp b/receiver/unittests/monitoring/test_monitoring_client.cpp index 8f43fb46e598d9d11dd27182eea31af02b942978..0df6ef17c89e895aa10cfa672de78af5646d3974 100644 --- a/receiver/unittests/monitoring/test_monitoring_client.cpp +++ b/receiver/unittests/monitoring/test_monitoring_client.cpp @@ -58,27 +58,23 @@ namespace { class MonitoringClientTest : public Test { public: - std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache; - std::unique_ptr<StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>> mock_toBeSend; + std::shared_ptr<NiceMock<asapo::MockDataCache>> mock_cache; + NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>* mock_toBeSend; std::unique_ptr<asapo::ReceiverMonitoringClientImpl> monitoring; void SetUp() override { - mock_cache.reset(new StrictMock<asapo::MockDataCache>); + mock_cache.reset(new NiceMock<asapo::MockDataCache>); monitoring.reset(new asapo::ReceiverMonitoringClientImpl{mock_cache}); - monitoring->sendingThreadRunning__ = true; - mock_toBeSend.reset(new StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>); - monitoring->toBeSendData__.reset(mock_toBeSend.get()); + mock_toBeSend = new NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>; + monitoring->toBeSendData__.reset(mock_toBeSend); } void TearDown() override { - monitoring->toBeSendData__.release(); // NOLINT(bugprone-unused-return-value) because it is a local part of this->mock_toBeSend } }; TEST_F(MonitoringClientTest, DefaultGenerator) { std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache; asapo::SharedReceiverMonitoringClient monitoring_l = asapo::GenerateDefaultReceiverMonitoringClient(mock_cache, false); - asapo::ReceiverMonitoringClientImpl* monitoring_l_impl = dynamic_cast<asapo::ReceiverMonitoringClientImpl*>(monitoring_l.get()); - EXPECT_THAT(monitoring_l_impl, Ne(nullptr)); EXPECT_THAT(monitoring_l_impl->log__, Ne(nullptr)); EXPECT_THAT(monitoring_l_impl->io__, Ne(nullptr)); @@ -114,6 +110,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendProducerToReceiverTransferDataPoint("p1", "i1", "b1", "so1", "st1", 1, 2, 3, 4); EXPECT_THAT(x->totalfilesize(), Eq(101)); @@ -132,6 +129,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendRdsRequestWasMissDataPoint("p1", "i1", "b1", "so1", "st1"); EXPECT_THAT(x->totalfilesize(), Eq(100)); @@ -150,6 +148,7 @@ namespace { "p1", "i1", "b1", "so1", "st1" )).WillOnce(Return(x)); + monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer monitoring->SendReceiverRequestDataPoint("p1", "i1", "b1", "so1", "st1", 2, 3); EXPECT_THAT(x->totalfilesize(), Eq(102));