Skip to content
Snippets Groups Projects
Commit 4bd86ffe authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix memleaks and race conditions

parent e8e6f0c4
No related branches found
No related tags found
No related merge requests found
......@@ -645,13 +645,11 @@ Error SystemIO::RemoveFile(const std::string& fname) const {
std::string SystemIO::GetHostName(Error* err) const noexcept {
char host[1024];
gethostname(host, sizeof(host));
*err = GetLastError();
if (*err) {
if (gethostname(host, sizeof(host))!=0) {
*err = GetLastError();
return "";
} else {
return host;
}
return host;
}
Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const {
......
......@@ -51,7 +51,7 @@ void AddDataServers(const asapo::ReceiverConfig* config, const asapo::SharedCach
asapo::SharedReceiverMonitoringClient StartMonitoringClient(const asapo::ReceiverConfig* config, asapo::SharedCache cache, asapo::Error* error) {
bool useNoopImpl = !config->monitor_performance;
auto monitoring = asapo::SharedReceiverMonitoringClient(asapo::GenerateDefaultReceiverMonitoringClient(cache, useNoopImpl));
monitoring->StartMonitoring();
*error = nullptr;
return monitoring;
}
......
......@@ -17,7 +17,7 @@ public:
ReceiverMonitoringClient(const ReceiverMonitoringClient&) = delete;
ReceiverMonitoringClient& operator=(const ReceiverMonitoringClient&) = delete;
virtual void StartMonitoring() = 0;
virtual void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId,
const std::string& producerInstanceId,
const std::string& beamtime,
......
......@@ -26,9 +26,15 @@ asapo::ReceiverMonitoringClientImpl::ReceiverMonitoringClientImpl(asapo::SharedC
}
receiverName_ = "receiver_" + hostname + "_" + std::to_string(io__->GetCurrentPid());
}
void ReceiverMonitoringClientImpl::StartMonitoring() {
ReinitializeClient();
StartSendingThread();
}
void asapo::ReceiverMonitoringClientImpl::StartSendingThread() {
if (sendingThreadRunning__) {
return;
......@@ -47,10 +53,16 @@ void asapo::ReceiverMonitoringClientImpl::StartSendingThread() {
}
void asapo::ReceiverMonitoringClientImpl::StopSendingThread() {
if (!sendingThreadRunning__) {
log__->Info("sending thread already stopped");
return;
}
log__->Info("Stopping sending thread");
sendingThreadRunning__ = false;
if (sendingThread_.get() && sendingThread_->joinable()) {
sendingThread_->join();
sendingThread_=nullptr;
}
log__->Info("Stopped sending thread");
}
......@@ -228,9 +240,6 @@ asapo::Error asapo::ReceiverMonitoringClientImpl::ReinitializeClient() {
newClient.reset(); // delete client first
newChannel.reset();
}
StartSendingThread();
return nullptr;
}
......@@ -345,4 +354,9 @@ uint64_t asapo::ReceiverMonitoringClientImpl::WaitTimeMsUntilNextInterval(std::c
return result;
}
ReceiverMonitoringClientImpl::~ReceiverMonitoringClientImpl() {
StopSendingThread();
}
......@@ -43,10 +43,10 @@ public:
explicit ReceiverMonitoringClientImpl(SharedCache cache);
ReceiverMonitoringClientImpl(const ReceiverMonitoringClientImpl&) = delete;
ReceiverMonitoringClientImpl& operator=(const ReceiverMonitoringClientImpl&) = delete;
~ReceiverMonitoringClientImpl();
void StartSendingThread();
void StopSendingThread();
void StartMonitoring() override;
void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId,
const std::string& producerInstanceId,
const std::string& beamtime,
......@@ -85,7 +85,7 @@ public:
//std::mutex mutex;
ReceiverDataPointContainer container;
ASAPO_VIRTUAL ~ToBeSendData()=default;
ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer(
ASAPO_VIRTUAL ProducerToReceiverTransferDataPoint* GetProducerToReceiverTransfer(
const std::string& pipelineStepId,
const std::string& producerInstanceId,
const std::string& beamtime,
......
......@@ -52,3 +52,7 @@ void asapo::ReceiverMonitoringClientNoop::SendReceiverRequestDataPoint(const std
void asapo::ReceiverMonitoringClientNoop::FillMemoryStats() {
}
void asapo::ReceiverMonitoringClientNoop::StartMonitoring() {
}
......@@ -6,6 +6,7 @@
namespace asapo {
class ReceiverMonitoringClientNoop : public ReceiverMonitoringClient {
public:
void StartMonitoring() override;
void SendProducerToReceiverTransferDataPoint(const std::string& pipelineStepId,
const std::string& producerInstanceId, const std::string& beamtime,
const std::string& source, const std::string& stream,
......
......@@ -10,6 +10,7 @@ namespace asapo {
class MockReceiverMonitoringClient : public asapo::ReceiverMonitoringClient {
public:
void StartMonitoring() override {};
MOCK_METHOD9(SendProducerToReceiverTransferDataPoint, void(const std::string& pipelineStepId, const std::string& producerInstanceId,
const std::string& beamtime, const std::string& source,
const std::string& stream, uint64_t fileSize,
......
......@@ -58,27 +58,23 @@ namespace {
class MonitoringClientTest : public Test {
public:
std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache;
std::unique_ptr<StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>> mock_toBeSend;
std::shared_ptr<NiceMock<asapo::MockDataCache>> mock_cache;
NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>* mock_toBeSend;
std::unique_ptr<asapo::ReceiverMonitoringClientImpl> monitoring;
void SetUp() override {
mock_cache.reset(new StrictMock<asapo::MockDataCache>);
mock_cache.reset(new NiceMock<asapo::MockDataCache>);
monitoring.reset(new asapo::ReceiverMonitoringClientImpl{mock_cache});
monitoring->sendingThreadRunning__ = true;
mock_toBeSend.reset(new StrictMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>);
monitoring->toBeSendData__.reset(mock_toBeSend.get());
mock_toBeSend = new NiceMock<asapo::MockReceiverMonitoringClientImpl_ToBeSendData>;
monitoring->toBeSendData__.reset(mock_toBeSend);
}
void TearDown() override {
monitoring->toBeSendData__.release(); // NOLINT(bugprone-unused-return-value) because it is a local part of this->mock_toBeSend
}
};
TEST_F(MonitoringClientTest, DefaultGenerator) {
std::shared_ptr<StrictMock<asapo::MockDataCache>> mock_cache;
asapo::SharedReceiverMonitoringClient monitoring_l = asapo::GenerateDefaultReceiverMonitoringClient(mock_cache, false);
asapo::ReceiverMonitoringClientImpl* monitoring_l_impl = dynamic_cast<asapo::ReceiverMonitoringClientImpl*>(monitoring_l.get());
EXPECT_THAT(monitoring_l_impl, Ne(nullptr));
EXPECT_THAT(monitoring_l_impl->log__, Ne(nullptr));
EXPECT_THAT(monitoring_l_impl->io__, Ne(nullptr));
......@@ -114,6 +110,7 @@ namespace {
"p1", "i1", "b1", "so1", "st1"
)).WillOnce(Return(x));
monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer
monitoring->SendProducerToReceiverTransferDataPoint("p1", "i1", "b1", "so1", "st1", 1, 2, 3, 4);
EXPECT_THAT(x->totalfilesize(), Eq(101));
......@@ -132,6 +129,7 @@ namespace {
"p1", "i1", "b1", "so1", "st1"
)).WillOnce(Return(x));
monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer
monitoring->SendRdsRequestWasMissDataPoint("p1", "i1", "b1", "so1", "st1");
EXPECT_THAT(x->totalfilesize(), Eq(100));
......@@ -150,6 +148,7 @@ namespace {
"p1", "i1", "b1", "so1", "st1"
)).WillOnce(Return(x));
monitoring->sendingThreadRunning__ = true; // to trick client and initiata data transfer
monitoring->SendReceiverRequestDataPoint("p1", "i1", "b1", "so1", "st1", 2, 3);
EXPECT_THAT(x->totalfilesize(), Eq(102));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment