diff --git a/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp b/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp index 3f905caffb36cf82136b15c4d712823c69fde36e..583e589d4b168f8f43642a7fcd773fef5cbcfcef 100644 --- a/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp +++ b/receiver/src/receiver_data_server/net_server/fabric_rds_request.cpp @@ -5,7 +5,7 @@ using namespace asapo; FabricRdsRequest::FabricRdsRequest(const GenericRequestHeader& header, - fabric::FabricAddress sourceId, fabric::FabricMessageId messageId, SharedInstancedStatistics statistics) + fabric::FabricAddress sourceId, fabric::FabricMessageId messageId, RequestStatisticsPtr statistics) : ReceiverDataServerRequest(header, sourceId, std::move(statistics)), message_id{messageId} { } diff --git a/receiver/src/receiver_data_server/net_server/fabric_rds_request.h b/receiver/src/receiver_data_server/net_server/fabric_rds_request.h index c9fb6d0ee3ce9f1dcf2775852fb47fe14bb70860..2925221399b33abd2f32097af88dff94263ea7a0 100644 --- a/receiver/src/receiver_data_server/net_server/fabric_rds_request.h +++ b/receiver/src/receiver_data_server/net_server/fabric_rds_request.h @@ -9,7 +9,7 @@ namespace asapo { class FabricRdsRequest : public ReceiverDataServerRequest { public: explicit FabricRdsRequest(const GenericRequestHeader& header, fabric::FabricAddress source_id, - fabric::FabricMessageId messageId, SharedInstancedStatistics statistics); + fabric::FabricMessageId messageId, RequestStatisticsPtr statistics); fabric::FabricMessageId message_id; const fabric::MemoryRegionDetails* GetMemoryRegion() const; }; diff --git a/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp b/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp index 4e15952d55565d5c698f22e13d86795bfdbdad13..07b37856ab0bd46631ad833e9186ef28adf4e9b7 100644 --- a/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp +++ b/receiver/src/receiver_data_server/net_server/rds_fabric_server.cpp @@ -40,7 +40,7 @@ GenericRequests RdsFabricServer::GetNewRequests(Error* err) { fabric::FabricAddress srcAddress; fabric::FabricMessageId messageId; - SharedInstancedStatistics statistics{new InstancedStatistics}; + RequestStatisticsPtr statistics{new RequestStatistics}; // We cannot measure time here, since it is a blocking call :/ GenericRequestHeader header; @@ -49,7 +49,7 @@ GenericRequests RdsFabricServer::GetNewRequests(Error* err) { return {}; // empty result } statistics->AddIncomingBytes(sizeof(header)); - auto requestPtr = new FabricRdsRequest(header, srcAddress, messageId, statistics); + auto requestPtr = new FabricRdsRequest(header, srcAddress, messageId, std::move(statistics)); GenericRequests genericRequests; genericRequests.emplace_back(GenericRequestPtr(requestPtr)); diff --git a/receiver/src/receiver_data_server/net_server/rds_tcp_server.cpp b/receiver/src/receiver_data_server/net_server/rds_tcp_server.cpp index c819045ebdb2e3bf38c0c1001e8d03ac4975964f..57b75e23ded55c582b1a81f3e5bf12b7fe753019 100644 --- a/receiver/src/receiver_data_server/net_server/rds_tcp_server.cpp +++ b/receiver/src/receiver_data_server/net_server/rds_tcp_server.cpp @@ -52,7 +52,7 @@ ReceiverDataServerRequestPtr RdsTcpServer::ReadRequest(SocketDescriptor socket, Error io_err; *err = nullptr; - SharedInstancedStatistics statistics{new InstancedStatistics}; + RequestStatisticsPtr statistics{new RequestStatistics}; statistics->StartTimer(kNetworkIncoming); uint64_t bytesReceived = io__->Receive(socket, &header, sizeof(GenericRequestHeader), &io_err); statistics->StopTimer(); @@ -67,7 +67,7 @@ ReceiverDataServerRequestPtr RdsTcpServer::ReadRequest(SocketDescriptor socket, (*err)->AddDetails("origin",io__->AddressFromSocket(socket)); return nullptr; } - return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{header, (uint64_t) socket, statistics}}; + return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{header, (uint64_t) socket, std::move(statistics)}}; } GenericRequests RdsTcpServer::ReadRequests(const ListSocketDescriptors& sockets) { diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.cpp b/receiver/src/receiver_data_server/receiver_data_server_request.cpp index c4568761374794488b63295321630250c9064d70..399efd1c86f4529b8c00349de99b625074305d9b 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request.cpp @@ -5,11 +5,11 @@ namespace asapo { -ReceiverDataServerRequest::ReceiverDataServerRequest(const GenericRequestHeader& header, uint64_t source_id, SharedInstancedStatistics statistics) : +ReceiverDataServerRequest::ReceiverDataServerRequest(const GenericRequestHeader& header, uint64_t source_id, RequestStatisticsPtr statistics) : GenericRequest(header, 0), statistics_{std::move(statistics)}, source_id{source_id} { } -SharedInstancedStatistics ReceiverDataServerRequest::GetStatisticsProvider() { - return statistics_; +RequestStatistics* ReceiverDataServerRequest::GetStatistics() { + return statistics_.get(); } } diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.h b/receiver/src/receiver_data_server/receiver_data_server_request.h index 148599c8aa482666f2ac19d519cdb2f12f1ab921..e1f8435d83524126fa0e814109571016322aba0f 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request.h @@ -12,15 +12,14 @@ class RdsNetServer; class ReceiverDataServerRequest : public GenericRequest { private: - - SharedInstancedStatistics statistics_; + RequestStatisticsPtr statistics_; public: - explicit ReceiverDataServerRequest(const GenericRequestHeader& header, uint64_t source_id, SharedInstancedStatistics statistics); + explicit ReceiverDataServerRequest(const GenericRequestHeader& header, uint64_t source_id, RequestStatisticsPtr statistics); ~ReceiverDataServerRequest() override = default; const uint64_t source_id; - SharedInstancedStatistics GetStatisticsProvider(); + RequestStatistics* GetStatistics(); }; using ReceiverDataServerRequestPtr = std::unique_ptr<ReceiverDataServerRequest>; diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 500bd688e9775c51a2c33ca7b6578bae497b7cbe..45f337bdb364a30ec6d0fc6e7ad3b39a433f68b2 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -10,10 +10,10 @@ namespace asapo { Request::Request(const GenericRequestHeader& header, SocketDescriptor socket_fd, std::string origin_uri, DataCache* cache, const RequestHandlerDbCheckRequest* db_check_handler, - SharedInstancedStatistics statistics) : io__{GenerateDefaultIO()}, - cache__{cache}, log__{GetDefaultReceiverLogger()},statistics_{std::move(statistics)}, request_header_(header), - socket_fd_{socket_fd}, origin_uri_{std::move(origin_uri)}, - check_duplicate_request_handler_{db_check_handler} { + RequestStatisticsPtr statistics) : io__{GenerateDefaultIO()}, + cache__{cache}, log__{GetDefaultReceiverLogger()}, statistics_{std::move(statistics)}, request_header_(header), + socket_fd_{socket_fd}, origin_uri_{std::move(origin_uri)}, + check_duplicate_request_handler_{db_check_handler} { origin_host_ = HostFromUri(origin_uri_); } @@ -59,9 +59,13 @@ Error Request::PrepareDataBufferAndLockIfNeeded() { Error Request::Handle() { Error err; for (auto handler : handlers_) { - statistics_->StartTimer(handler->GetStatisticEntity()); + if (statistics_) { + statistics_->StartTimer(handler->GetStatisticEntity()); + } err = handler->ProcessRequest(this); - statistics_->StopTimer(); + if (statistics_) { + statistics_->StopTimer(); + } if (err) { break; } @@ -243,8 +247,12 @@ SourceType Request::GetSourceType() const { return source_type_; } -SharedInstancedStatistics Request::GetInstancedStatistics() { - return statistics_; +RequestStatistics* Request::GetStatistics() { + if (statistics_) { + return statistics_.get(); + } else { + return nullptr; + } } const std::string& Request::GetOriginHost() const { diff --git a/receiver/src/request.h b/receiver/src/request.h index b1b1755f41a568ce432a4ec449ce1c183b5879ed..4d1cf79c19d9538c087431592fbe20c69218135b 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -33,7 +33,7 @@ class Request { Request() = delete; Request(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, DataCache* cache, const RequestHandlerDbCheckRequest* db_check_handler, - SharedInstancedStatistics statistics); + RequestStatisticsPtr statistics); ASAPO_VIRTUAL void AddHandler(const ReceiverRequestHandler*); ASAPO_VIRTUAL const RequestHandlerList& GetListHandlers() const; ASAPO_VIRTUAL uint64_t GetDataSize() const; @@ -83,13 +83,13 @@ class Request { ASAPO_VIRTUAL ResponseMessageType GetResponseMessageType() const; ASAPO_VIRTUAL const std::string& GetResponseMessage() const; ASAPO_VIRTUAL Error CheckForDuplicates(); - ASAPO_VIRTUAL SharedInstancedStatistics GetInstancedStatistics(); + ASAPO_VIRTUAL RequestStatistics* GetStatistics(); const AbstractLogger* log__; private: Error PrepareDataBufferFromMemory(); Error PrepareDataBufferFromCache(); private: - SharedInstancedStatistics statistics_; + RequestStatisticsPtr statistics_; const GenericRequestHeader request_header_; const SocketDescriptor socket_fd_; MessageData data_buffer_; diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 6084f89ced7a43530f44a6baedd0d60192e81c50..b40207a5b030bc104cf2ed28cb0eb8e0c1754e03 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -114,9 +114,9 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, - const SharedInstancedStatistics& statistics, Error* err) const noexcept { + RequestStatisticsPtr statistics, Error* err) const noexcept { auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd, std::move(origin_uri), cache_.get(), - &request_handler_db_check_, statistics} + &request_handler_db_check_, std::move(statistics)} }; *err = AddHandlersToRequest(request, request_header); if (*err) { diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 7be05eaaa575b1d4d082dbb14a4b869754a158b0..75c39aa91c08427175d2154d866feac8380a7291 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -28,7 +28,7 @@ class RequestFactory { explicit RequestFactory (SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafka_client); virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, - const SharedInstancedStatistics& statistics, Error* err) const noexcept; + RequestStatisticsPtr statistics, Error* err) const noexcept; virtual ~RequestFactory() = default; private: Error AddHandlersToRequest(std::unique_ptr<Request>& request, const GenericRequestHeader& request_header) const; diff --git a/receiver/src/request_handler/request_handler_monitoring.cpp b/receiver/src/request_handler/request_handler_monitoring.cpp index 88f2d9960b451d504a69d9be17072cb351567af3..7ba548dd812d40c85b7474f19d040c00dc7de2f8 100644 --- a/receiver/src/request_handler/request_handler_monitoring.cpp +++ b/receiver/src/request_handler/request_handler_monitoring.cpp @@ -13,8 +13,10 @@ StatisticEntity RequestHandlerMonitoring::GetStatisticEntity() const { } Error RequestHandlerMonitoring::ProcessRequest(Request* request) const { - auto stats = request->GetInstancedStatistics(); - + auto stats = request->GetStatistics(); + if (!stats) { + return nullptr; + } monitoring_->SendProducerToReceiverTransferDataPoint( request->GetPipelineStepId(), request->GetProducerInstanceId(), diff --git a/receiver/src/request_handler/request_handler_receive_data.cpp b/receiver/src/request_handler/request_handler_receive_data.cpp index 3103c194f86d1117c3273d0a2368326d2d5d24b3..94e1946265f5a9106d74a085678e6c0d18077677 100644 --- a/receiver/src/request_handler/request_handler_receive_data.cpp +++ b/receiver/src/request_handler/request_handler_receive_data.cpp @@ -20,7 +20,9 @@ Error RequestHandlerReceiveData::ProcessRequest(Request* request) const { } Error io_err; uint64_t byteCount = io__->Receive(request->GetSocket(), request->GetData(), (size_t) request->GetDataSize(), &io_err); - request->GetInstancedStatistics()->AddIncomingBytes(byteCount); + if (request->GetStatistics()) { + request->GetStatistics()->AddIncomingBytes(byteCount); + } if (io_err) { err = ReceiverErrorTemplates::kProcessingError.Generate("cannot receive data",std::move(io_err)); } diff --git a/receiver/src/request_handler/request_handler_receive_metadata.cpp b/receiver/src/request_handler/request_handler_receive_metadata.cpp index 518b45ede6acd7beda4ea8a07b3c4feaa3084e72..df783b61a9db2d5135a8f416a7d2076ba7385f68 100644 --- a/receiver/src/request_handler/request_handler_receive_metadata.cpp +++ b/receiver/src/request_handler/request_handler_receive_metadata.cpp @@ -14,7 +14,9 @@ Error RequestHandlerReceiveMetaData::ProcessRequest(Request* request) const { Error err; auto buf = std::unique_ptr<uint8_t[]> {new uint8_t[meta_size]}; uint64_t byteCount = io__->Receive(request->GetSocket(), (void*) buf.get(), meta_size, &err); - request->GetInstancedStatistics()->AddIncomingBytes(byteCount); + if (request->GetStatistics()) { + request->GetStatistics()->AddIncomingBytes(byteCount); + } if (err) { return ReceiverErrorTemplates::kProcessingError.Generate("cannot receive metadata",std::move(err)); } diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index d7ef7bf1ac1154014b119708b0f73f14eb958581..068e816508f2dad1c4b9653647076c1c5af1351a 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -59,7 +59,9 @@ Error RequestsDispatcher::HandleRequest(const std::unique_ptr<Request>& request) log__->Debug(RequestLog("got new request", request.get())); Error handle_err; handle_err = request->Handle(); - statistics__->ApplyTimeFrom(request->GetInstancedStatistics()); + if (request->GetStatistics()) { + statistics__->ApplyTimeFrom(request->GetStatistics()); + } if (handle_err) { if (handle_err == ReceiverErrorTemplates::kReAuthorizationFailure) { log__->Warning(LogMessageWithFields(handle_err).Append(RequestLog("", request.get()))); @@ -94,7 +96,7 @@ Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const noexcept { //TODO: to be overwritten with MessagePack (or similar) GenericRequestHeader generic_request_header; - SharedInstancedStatistics statistics{new InstancedStatistics}; + RequestStatisticsPtr statistics{new RequestStatistics}; statistics->StartTimer(StatisticEntity::kNetworkIncoming); Error io_err; @@ -107,7 +109,7 @@ std::unique_ptr<Request> RequestsDispatcher::GetNextRequest(Error* err) const no return nullptr; } statistics->StopTimer(); - auto request = request_factory__->GenerateRequest(generic_request_header, socket_fd_, producer_uri_, statistics, err); + auto request = request_factory__->GenerateRequest(generic_request_header, socket_fd_, producer_uri_, std::move(statistics), err); if (*err) { log__->Error(LogMessageWithFields(*err).Append("origin", HostFromUri(producer_uri_))); } diff --git a/receiver/src/statistics/instanced_statistics_provider.cpp b/receiver/src/statistics/instanced_statistics_provider.cpp index 9a42ad26d82cc753b79d94d9b85cebcb2cd61ab2..ae2b74ba176a774ee21e8481e093eda624a70e7c 100644 --- a/receiver/src/statistics/instanced_statistics_provider.cpp +++ b/receiver/src/statistics/instanced_statistics_provider.cpp @@ -3,36 +3,36 @@ using namespace asapo; using namespace std::chrono; -void InstancedStatistics::StartTimer(StatisticEntity entity) { +void RequestStatistics::StartTimer(StatisticEntity entity) { current_statistic_entity_ = entity; current_timer_last_timepoint_ = system_clock::now(); } -void InstancedStatistics::StopTimer() { +void RequestStatistics::StopTimer() { auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(system_clock::now() - current_timer_last_timepoint_); time_counters_[current_statistic_entity_] += elapsed; } -std::chrono::nanoseconds InstancedStatistics::GetElapsed(StatisticEntity entity) const { +std::chrono::nanoseconds RequestStatistics::GetElapsed(StatisticEntity entity) const { return time_counters_[entity]; } -uint64_t InstancedStatistics::GetElapsedMicrosecondsCount(StatisticEntity entity) const { +uint64_t RequestStatistics::GetElapsedMicrosecondsCount(StatisticEntity entity) const { return static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(time_counters_[entity]).count()); } -void InstancedStatistics::AddIncomingBytes(uint64_t incomingByteCount) { +void RequestStatistics::AddIncomingBytes(uint64_t incomingByteCount) { incomingBytes_ += incomingByteCount; } -void InstancedStatistics::AddOutgoingBytes(uint64_t outgoingByteCount) { +void RequestStatistics::AddOutgoingBytes(uint64_t outgoingByteCount) { outgoingBytes_ += outgoingByteCount; } -uint64_t InstancedStatistics::GetIncomingBytes() const { +uint64_t RequestStatistics::GetIncomingBytes() const { return incomingBytes_; } -uint64_t InstancedStatistics::GetOutgoingBytes() const { +uint64_t RequestStatistics::GetOutgoingBytes() const { return outgoingBytes_; } diff --git a/receiver/src/statistics/instanced_statistics_provider.h b/receiver/src/statistics/instanced_statistics_provider.h index d46b84cf6ce339b64be597a3dcd4025f023db5ec..7d2ed9acd08db97630dd7c3f49061028e90179ea 100644 --- a/receiver/src/statistics/instanced_statistics_provider.h +++ b/receiver/src/statistics/instanced_statistics_provider.h @@ -8,7 +8,7 @@ namespace asapo { -class InstancedStatistics { +class RequestStatistics { private: std::chrono::system_clock::time_point current_timer_last_timepoint_; StatisticEntity current_statistic_entity_ = StatisticEntity::kDatabase; @@ -29,11 +29,11 @@ public: ASAPO_VIRTUAL std::chrono::nanoseconds GetElapsed(StatisticEntity entity) const; ASAPO_VIRTUAL uint64_t GetElapsedMicrosecondsCount(StatisticEntity entity) const; - ASAPO_VIRTUAL ~InstancedStatistics() = default; + ASAPO_VIRTUAL ~RequestStatistics() = default; }; -using SharedInstancedStatistics = std::shared_ptr<asapo::InstancedStatistics>; +using RequestStatisticsPtr = std::unique_ptr<RequestStatistics>; } diff --git a/receiver/src/statistics/receiver_statistics.cpp b/receiver/src/statistics/receiver_statistics.cpp index 378231b5aeda58f5e1d97351358e5409e51c4880..bfff6ed68b75e6bc0c78602fa7f2772074d381c5 100644 --- a/receiver/src/statistics/receiver_statistics.cpp +++ b/receiver/src/statistics/receiver_statistics.cpp @@ -29,7 +29,7 @@ void ReceiverStatistics::ResetStatistics() noexcept { } } -void ReceiverStatistics::ApplyTimeFrom(const SharedInstancedStatistics& stats) { +void ReceiverStatistics::ApplyTimeFrom(const RequestStatistics* stats) { for (size_t i = 0; i < kNStatisticEntities; i++) { time_counters_[i] += stats->GetElapsed((StatisticEntity) i); } diff --git a/receiver/src/statistics/receiver_statistics.h b/receiver/src/statistics/receiver_statistics.h index 0d3133d13a5e16b3c3c281294dfd2fedc32126c2..96ee773393ef239783d76192cb27f4494836ea74 100644 --- a/receiver/src/statistics/receiver_statistics.h +++ b/receiver/src/statistics/receiver_statistics.h @@ -18,7 +18,7 @@ static const std::vector<std::string> kStatisticEntityNames = { class ReceiverStatistics : public Statistics { public: ReceiverStatistics(unsigned int write_interval = kDefaultStatisticWriteIntervalMs); - void ApplyTimeFrom(const SharedInstancedStatistics& stats); + void ApplyTimeFrom(const RequestStatistics* stats); private: StatisticsToSend PrepareStatisticsToSend() const noexcept override; void ResetStatistics() noexcept override; diff --git a/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp b/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp index 452f1e69d8e714661fd7af6777ba062e98567177..1818568aa92c723a476687194581c4bed214f7df 100644 --- a/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp +++ b/receiver/unittests/receiver_data_server/net_server/test_rds_fabric_server.cpp @@ -37,7 +37,6 @@ TEST(RdsFabricServer, Constructor) { class RdsFabricServerTests : public Test { public: NiceMock<MockLogger> mock_logger; - std::shared_ptr<NiceMock<MockInstancedStatistics>> mock_instanced_statistics; StrictMock<MockIO> mock_io; StrictMock<fabric::MockFabricFactory> mock_fabric_factory; StrictMock<fabric::MockFabricServer> mock_fabric_server; @@ -45,7 +44,6 @@ class RdsFabricServerTests : public Test { std::unique_ptr<RdsFabricServer> rds_server_ptr; void SetUp() override { - mock_instanced_statistics.reset(new NiceMock<MockInstancedStatistics>); mock_monitoring.reset(new StrictMock<MockReceiverMonitoringClient>); RdsFabricServer XX{expected_address, &mock_logger, mock_monitoring}; @@ -221,7 +219,7 @@ TEST_F(RdsFabricServerTests, SendResponseAndSlotData_Ok) { InitServer(); GenericRequestHeader dummyHeader{}; - FabricRdsRequest request(GenericRequestHeader{}, 41, 87, mock_instanced_statistics); + FabricRdsRequest request(GenericRequestHeader{}, 41, 87, nullptr); GenericNetworkResponse response; CacheMeta cacheSlot; cacheSlot.addr = (void*)0xABC; @@ -239,7 +237,7 @@ TEST_F(RdsFabricServerTests, SendResponseAndSlotData_RdmaWrite_Error) { InitServer(); GenericRequestHeader dummyHeader{}; - FabricRdsRequest request(GenericRequestHeader{}, 41, 87, mock_instanced_statistics); + FabricRdsRequest request(GenericRequestHeader{}, 41, 87, nullptr); GenericNetworkResponse response; CacheMeta cacheSlot; cacheSlot.addr = (void*)0xABC; @@ -258,7 +256,7 @@ TEST_F(RdsFabricServerTests, SendResponseAndSlotData_Send_Error) { InitServer(); GenericRequestHeader dummyHeader{}; - FabricRdsRequest request(GenericRequestHeader{}, 41, 87, mock_instanced_statistics); + FabricRdsRequest request(GenericRequestHeader{}, 41, 87, nullptr); GenericNetworkResponse response; CacheMeta cacheSlot; cacheSlot.addr = (void*)0xABC; diff --git a/receiver/unittests/receiver_data_server/net_server/test_rds_tcp_server.cpp b/receiver/unittests/receiver_data_server/net_server/test_rds_tcp_server.cpp index f5c4dbabea4b92c805c68aecec9c01ea46dbb763..30eae2115101932cf2eef385d1eeee3549643109 100644 --- a/receiver/unittests/receiver_data_server/net_server/test_rds_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/net_server/test_rds_tcp_server.cpp @@ -55,15 +55,11 @@ class RdsTCPServerTests : public Test { ListSocketDescriptors expected_client_sockets{2, 3, 4}; std::vector<std::string> expected_new_connections = {"test1", "test2"}; - std::shared_ptr<NiceMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; - std::shared_ptr<StrictMock<asapo::MockReceiverMonitoringClient>> mock_monitoring; std::unique_ptr<RdsTcpServer> tcp_server_ptr; void SetUp() override { - mock_instanced_statistics.reset(new NiceMock<asapo::MockInstancedStatistics>); - mock_monitoring.reset(new StrictMock<asapo::MockReceiverMonitoringClient>); tcp_server_ptr.reset(new RdsTcpServer{expected_address, &mock_logger, mock_monitoring}); @@ -252,7 +248,7 @@ TEST_F(RdsTCPServerTests, GetNewRequestsReadOk) { TEST_F(RdsTCPServerTests, SendResponse) { asapo::GenericNetworkResponse tmp {}; - asapo::ReceiverDataServerRequest expectedRequest {{}, 30, mock_instanced_statistics}; + asapo::ReceiverDataServerRequest expectedRequest {{}, 30, nullptr}; EXPECT_CALL(mock_io, Send_t(30, &tmp, sizeof(asapo::GenericNetworkResponse), _)) .WillOnce( @@ -270,7 +266,7 @@ TEST_F(RdsTCPServerTests, SendResponseAndSlotData_SendResponseError) { asapo::GenericNetworkResponse tmp {}; - asapo::ReceiverDataServerRequest expectedRequest {{}, 30, mock_instanced_statistics}; + asapo::ReceiverDataServerRequest expectedRequest {{}, 30, nullptr}; asapo::CacheMeta expectedMeta {}; expectedMeta.id = 20; expectedMeta.addr = (void*)0x9234; @@ -290,7 +286,7 @@ TEST_F(RdsTCPServerTests, SendResponseAndSlotData_SendResponseError) { TEST_F(RdsTCPServerTests, SendResponseAndSlotData_SendError) { asapo::GenericNetworkResponse tmp {}; - asapo::ReceiverDataServerRequest expectedRequest {{}, 30, mock_instanced_statistics}; + asapo::ReceiverDataServerRequest expectedRequest {{}, 30, nullptr}; asapo::CacheMeta expectedMeta {}; expectedMeta.id = 20; expectedMeta.addr = (void*)0x9234; @@ -314,7 +310,7 @@ TEST_F(RdsTCPServerTests, SendResponseAndSlotData_SendError) { TEST_F(RdsTCPServerTests, SendResponseAndSlotData_Ok) { asapo::GenericNetworkResponse tmp {}; - asapo::ReceiverDataServerRequest expectedRequest {{}, 30, mock_instanced_statistics}; + asapo::ReceiverDataServerRequest expectedRequest {{}, 30, nullptr}; asapo::CacheMeta expectedMeta {}; expectedMeta.id = 20; expectedMeta.addr = (void*)0x9234; diff --git a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h index e9240830d8b4629820e589bf27a460e5c7927939..99354692fdd469746281c7a07e1279c67501280c 100644 --- a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h +++ b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h @@ -19,17 +19,17 @@ class MockNetServer : public RdsNetServer { GenericRequests GetNewRequests(Error* err) override { ErrorInterface* error = nullptr; - auto reqs = GetNewRequests_t(&error); + auto& reqs = GetNewRequests_t(&error); err->reset(error); GenericRequests res; for (const auto& preq : reqs) { - ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.source_id, SharedInstancedStatistics{ new InstancedStatistics } }}; + ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.source_id, nullptr }}; res.push_back(std::move(ptr)); } return res; } - MOCK_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** + MOCK_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest>& (ErrorInterface** error)); Error SendResponse(const ReceiverDataServerRequest* request, diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index 004915c1f8694fa9150574ddc61a48e4d491d10e..8037ab370588632a13e3b863ed99640dfb2a313f 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -22,6 +22,7 @@ using ::testing::Eq; using ::testing::Ne; using ::testing::Ref; using ::testing::Return; +using ::testing::ReturnRef; using ::testing::_; using ::testing::SetArgPointee; using ::testing::NiceMock; @@ -78,13 +79,14 @@ class ReceiverDataServerTests : public Test { }; TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { + auto reqs = std::vector<ReceiverDataServerRequest> {}; EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kTimeout.Generate().release()), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ); @@ -95,9 +97,10 @@ TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { } TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { + auto reqs = std::vector<ReceiverDataServerRequest> {}; EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ); @@ -107,9 +110,10 @@ TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { } TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { + auto reqs = std::vector<ReceiverDataServerRequest> {}; EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ); @@ -123,13 +127,14 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { } TEST_F(ReceiverDataServerTests, Ok) { + auto reqs = std::vector<ReceiverDataServerRequest> {}; EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(std::vector<ReceiverDataServerRequest> {}) + ReturnRef(reqs) ) ); diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index f08978bf5f03c1b88037a215e85df940701d0de2..f47ddde586e44e6568fbb186f649d563ddfc2be8 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -32,7 +32,7 @@ class MockStatistics : public asapo::ReceiverStatistics { }; -class MockInstancedStatistics : public asapo::InstancedStatistics { +class MockInstancedStatistics : public asapo::RequestStatistics { public: MOCK_METHOD1(StartTimer, void(StatisticEntity entity)); MOCK_METHOD0(StopTimer, void()); @@ -71,7 +71,7 @@ class MockHandlerDbCheckRequest : public asapo::RequestHandlerDbCheckRequest { class MockRequest: public Request { public: MockRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, - const RequestHandlerDbCheckRequest* db_check_handler, SharedInstancedStatistics statistics): + const RequestHandlerDbCheckRequest* db_check_handler, RequestStatisticsPtr statistics): Request(request_header, socket_fd, std::move(origin_uri), nullptr, db_check_handler, std::move(statistics)) {}; // MOCK_METHOD(, ), (const,override), (override)); @@ -96,7 +96,7 @@ class MockRequest: public Request { MOCK_METHOD(const std::string &, GetOnlinePath, (), (const, override)); MOCK_METHOD(const std::string &, GetOfflinePath, (), (const, override)); - MOCK_METHOD0(GetInstancedStatistics, SharedInstancedStatistics()); + MOCK_METHOD0(GetStatistics, RequestStatistics*()); // not nice casting, but mocking GetCustomData directly does not compile on Windows. const CustomRequestData& GetCustomData() const override { diff --git a/receiver/unittests/request_handler/test_request_handler_monitoring.cpp b/receiver/unittests/request_handler/test_request_handler_monitoring.cpp index 2bb8650e843045bd4e20382038dea331dd2a667a..2a7e4fac51110573fd01b3262581850998a7bb41 100644 --- a/receiver/unittests/request_handler/test_request_handler_monitoring.cpp +++ b/receiver/unittests/request_handler/test_request_handler_monitoring.cpp @@ -58,7 +58,7 @@ namespace { class RequestHandlerMonitoringTests : public Test { public: std::shared_ptr<StrictMock<asapo::MockReceiverMonitoringClient>> mock_monitoring; - std::shared_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; + std::unique_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; std::unique_ptr<asapo::RequestHandlerMonitoring> handler; std::unique_ptr<NiceMock<MockRequest>> mock_request; NiceMock<MockDatabase> mock_db; @@ -78,14 +78,14 @@ class RequestHandlerMonitoringTests : public Test { mock_instanced_statistics.reset(new StrictMock<asapo::MockInstancedStatistics>); GenericRequestHeader request_header; - mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr, mock_instanced_statistics}); + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr, nullptr}); ON_CALL(*mock_request, GetPipelineStepId()).WillByDefault(ReturnRef(expected_pipeline_step_id)); ON_CALL(*mock_request, GetProducerInstanceId()).WillByDefault(ReturnRef(expected_producer_instance_id)); ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); ON_CALL(*mock_request, GetDataSource()).WillByDefault(ReturnRef(expected_data_source)); ON_CALL(*mock_request, GetStream()).WillByDefault(Return(expected_stream)); ON_CALL(*mock_request, GetFileName()).WillByDefault(Return(expected_file_name)); - ON_CALL(*mock_request, GetInstancedStatistics()).WillByDefault(Return(mock_instanced_statistics)); + ON_CALL(*mock_request, GetStatistics()).WillByDefault(Return(mock_instanced_statistics.get())); } void TearDown() override { } diff --git a/receiver/unittests/request_handler/test_request_handler_receive_data.cpp b/receiver/unittests/request_handler/test_request_handler_receive_data.cpp index 4eaee78f8c42256ba7bc8593a4e7b6ac77a6eb15..b0b7f922393e0b128e5c6d0e9714d8041a127731 100644 --- a/receiver/unittests/request_handler/test_request_handler_receive_data.cpp +++ b/receiver/unittests/request_handler/test_request_handler_receive_data.cpp @@ -43,17 +43,18 @@ class ReceiveDataHandlerTests : public Test { MockDataCache mock_cache; RequestHandlerReceiveData handler; NiceMock<asapo::MockLogger> mock_logger; - std::shared_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; + StrictMock<asapo::MockInstancedStatistics>* mock_instanced_statistics; void SetUp() override { - mock_instanced_statistics.reset(new StrictMock<asapo::MockInstancedStatistics>); generic_request_header.data_size = data_size_; generic_request_header.data_id = data_id_; generic_request_header.op_code = expected_op_code; generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; strcpy(generic_request_header.message, expected_request_message); strcpy(generic_request_header.stream, expected_stream); - request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr, nullptr, mock_instanced_statistics}); + mock_instanced_statistics = new StrictMock<asapo::MockInstancedStatistics>; + request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr, nullptr, + std::unique_ptr<asapo::MockInstancedStatistics>{mock_instanced_statistics}}); request->SetBeamtimeId(expected_beamtime); request->SetPipelineStepId(expected_pipelinestepid); request->SetDataSource(expected_source); diff --git a/receiver/unittests/request_handler/test_request_handler_receive_metadata.cpp b/receiver/unittests/request_handler/test_request_handler_receive_metadata.cpp index 0fec0ad5f985e09621e0ded97469d9bbb6a3757f..d0f27fd8d272949ced71cc15fde8065b7239e313 100644 --- a/receiver/unittests/request_handler/test_request_handler_receive_metadata.cpp +++ b/receiver/unittests/request_handler/test_request_handler_receive_metadata.cpp @@ -40,16 +40,17 @@ class ReceiveMetaDataHandlerTests : public Test { NiceMock<MockIO> mock_io; RequestHandlerReceiveMetaData handler; NiceMock<asapo::MockLogger> mock_logger; - std::shared_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; + StrictMock<asapo::MockInstancedStatistics>* mock_instanced_statistics; void SetUp() override { - mock_instanced_statistics.reset(new StrictMock<asapo::MockInstancedStatistics>); generic_request_header.data_size = data_size_; generic_request_header.data_id = data_id_; generic_request_header.meta_size = expected_metadata_size; generic_request_header.op_code = expected_op_code; generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; - request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr, nullptr, mock_instanced_statistics}); + mock_instanced_statistics = new StrictMock<asapo::MockInstancedStatistics>; + request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr, nullptr, + std::unique_ptr<asapo::MockInstancedStatistics>{mock_instanced_statistics}}); handler.io__ = std::unique_ptr<asapo::IO> {&mock_io}; handler.log__ = &mock_logger; } diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index 13dd15f3d5ab242afa8cecac1e7084673c68af8b..fd1bd2149d5dd0e676e868b3fd114d358a4fde94 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -40,8 +40,7 @@ class MockRequest: public Request { return Error{Handle_t()}; }; MOCK_CONST_METHOD0(Handle_t, ErrorInterface * ()); - - MOCK_METHOD0(GetInstancedStatistics, asapo::SharedInstancedStatistics()); + MOCK_METHOD0(GetStatistics, asapo::RequestStatistics*()); }; @@ -50,17 +49,17 @@ class MockRequestFactory: public asapo::RequestFactory { MockRequestFactory(): RequestFactory(nullptr, nullptr, nullptr) {}; std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, - const asapo::SharedInstancedStatistics& statistics, + asapo::RequestStatisticsPtr statistics, Error* err) const noexcept override { ErrorInterface* error = nullptr; - auto res = GenerateRequest_t(request_header, socket_fd, origin_uri, statistics, &error); + auto res = GenerateRequest_t(request_header, socket_fd, origin_uri, statistics.get(), &error); err->reset(error); return std::unique_ptr<Request> {res}; } MOCK_CONST_METHOD5(GenerateRequest_t, Request * (const GenericRequestHeader&, SocketDescriptor, std::string, - const asapo::SharedInstancedStatistics& statistics, + const asapo::RequestStatistics* statistics, ErrorInterface**)); }; @@ -87,7 +86,7 @@ class RequestsDispatcherTests : public Test { std::unique_ptr<Request> request{&mock_request}; GenericNetworkResponse response; - std::shared_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; + std::unique_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; void SetUp() override { @@ -141,8 +140,8 @@ class RequestsDispatcherTests : public Test { EXPECT_CALL(mock_logger, Warning(_)); } - EXPECT_CALL(mock_request, GetInstancedStatistics()).WillRepeatedly( - Return(mock_instanced_statistics) + EXPECT_CALL(mock_request, GetStatistics()).WillRepeatedly( + Return(mock_instanced_statistics.get()) ); } void MockSendResponse(GenericNetworkResponse* response, bool error ) { diff --git a/receiver/unittests/statistics/test_receiver_statistics.cpp b/receiver/unittests/statistics/test_receiver_statistics.cpp index 6e5aa6e48e295e7d6cbf00b78df7dbb132bfcdbc..1758379f874fcd368995417b3e7ba68e8459b649 100644 --- a/receiver/unittests/statistics/test_receiver_statistics.cpp +++ b/receiver/unittests/statistics/test_receiver_statistics.cpp @@ -72,13 +72,13 @@ StatisticsToSend ReceiverStatisticTests::ExtractStat() { void ReceiverStatisticTests::TestTimer(const StatisticEntity& entity) { - asapo::SharedInstancedStatistics instancedStatistics{new asapo::InstancedStatistics}; + asapo::RequestStatisticsPtr instancedStatistics{new asapo::RequestStatistics}; instancedStatistics->StartTimer(entity); std::this_thread::sleep_for(std::chrono::milliseconds(10)); instancedStatistics->StopTimer(); - statistics.ApplyTimeFrom(instancedStatistics); + statistics.ApplyTimeFrom(instancedStatistics.get()); auto stat = ExtractStat(); @@ -108,7 +108,7 @@ TEST_F(ReceiverStatisticTests, TimerForMonitoring) { } TEST_F(ReceiverStatisticTests, ByteCounter) { - asapo::SharedInstancedStatistics instancedStatistics{new asapo::InstancedStatistics}; + asapo::RequestStatisticsPtr instancedStatistics{new asapo::RequestStatistics}; instancedStatistics->AddIncomingBytes(53); instancedStatistics->AddIncomingBytes(23); @@ -121,7 +121,7 @@ TEST_F(ReceiverStatisticTests, ByteCounter) { } TEST_F(ReceiverStatisticTests, TimerForAll) { - asapo::SharedInstancedStatistics instancedStatistics{new asapo::InstancedStatistics}; + asapo::RequestStatisticsPtr instancedStatistics{new asapo::RequestStatistics}; // kDatabase instancedStatistics->StartTimer(StatisticEntity::kDatabase); @@ -148,7 +148,7 @@ TEST_F(ReceiverStatisticTests, TimerForAll) { std::this_thread::sleep_for(std::chrono::milliseconds(30)); instancedStatistics->StopTimer(); - statistics.ApplyTimeFrom(instancedStatistics); + statistics.ApplyTimeFrom(instancedStatistics.get()); auto stat = ExtractStat(); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 09df5cc7c7806699fdd427c6dfdaae98a870fe66..b304dd93ff17be5114feb27601d247b1688ab848 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -52,10 +52,9 @@ class RequestTests : public Test { std::string expected_api_version = "v0.2"; std::unique_ptr<Request> request; StrictMock<MockIO> mock_io; - std::shared_ptr<StrictMock<asapo::MockInstancedStatistics>> mock_instanced_statistics; + StrictMock<asapo::MockInstancedStatistics>* mock_instanced_statistics; MockDataCache mock_cache; void SetUp() override { - mock_instanced_statistics.reset(new StrictMock<asapo::MockInstancedStatistics>); generic_request_header.data_size = data_size_; generic_request_header.data_id = data_id_; generic_request_header.meta_size = expected_metadata_size; @@ -63,7 +62,9 @@ class RequestTests : public Test { generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::kDefaultIngestMode; strcpy(generic_request_header.message, expected_request_message); strcpy(generic_request_header.api_version, expected_api_version.c_str()); - request.reset(new Request{generic_request_header, expected_socket_id, expected_origin_uri, nullptr, nullptr, mock_instanced_statistics }); + mock_instanced_statistics = new StrictMock<asapo::MockInstancedStatistics>; + request.reset(new Request{generic_request_header, expected_socket_id, expected_origin_uri, nullptr, nullptr, + std::unique_ptr<asapo::MockInstancedStatistics> {mock_instanced_statistics}}); request->io__ = std::unique_ptr<asapo::IO> {&mock_io}; ON_CALL(mock_io, Receive_t(expected_socket_id, _, data_size_, _)).WillByDefault( DoAll(SetArgPointee<3>(nullptr),