diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 7f623e67b2f9cf6dd09470f127770a6e78eeaa44..5290519433b67b6d17d94eced88c1c6f51ad44f7 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -14,8 +14,6 @@ #include "asapo/common/internal/version.h" -using std::chrono::system_clock; - namespace asapo { const std::string ConsumerImpl::kBrokerServiceName = "asapo-broker"; @@ -298,14 +296,14 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group std::string request_group = OpToUriCmd(op); std::string request_api = BrokerApiUri(std::move(stream), "", ""); - uint64_t elapsed_ms = 0; Error no_data_error; + auto start = std::chrono::steady_clock::now(); + bool timeout_triggered = false; while (true) { if (interrupt_flag_) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); } - auto start = system_clock::now(); auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); if (err == nullptr) { auto ri = PrepareRequestInfo(request_api + "/" + httpclient__->UrlEscape(group_id) + "/" + request_suffix, dataset, @@ -336,12 +334,15 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group no_data_error = std::move(err); } } + auto elapsed_ms = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds> + (std::chrono::steady_clock::now() - start).count()); if (elapsed_ms >= timeout_ms_) { - return no_data_error ? std::move(no_data_error) : std::move(err); + if (timeout_triggered || timeout_ms_ == 0) { + return no_data_error ? std::move(no_data_error) : std::move(err); + } + timeout_triggered = true; // to give a chance make another one request if the previous one took too long } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - start).count()); } return nullptr; } @@ -421,14 +422,14 @@ Error ConsumerImpl::GetDataFromFile(MessageMeta* info, MessageData* data) { err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); break; } - auto start = system_clock::now(); + auto start = std::chrono::steady_clock::now(); *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &err); if (err == nullptr) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - start).count()); + (std::chrono::steady_clock::now() - start).count()); } if (err != nullptr) { return ConsumerErrorTemplates::kLocalIOError.Generate(std::move(err)); @@ -531,7 +532,7 @@ Error ConsumerImpl::ServiceRequestWithTimeout(const std::string& service_name, err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); break; } - auto start = system_clock::now(); + auto start = std::chrono::steady_clock::now(); err = DiscoverService(service_name, service_uri); if (err == nullptr) { request.host = *service_uri; @@ -542,7 +543,7 @@ Error ConsumerImpl::ServiceRequestWithTimeout(const std::string& service_name, } std::this_thread::sleep_for(std::chrono::milliseconds(100)); elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - start).count()); + (std::chrono::steady_clock::now() - start).count()); } return err; }