diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index fda7b50803b09037f2997c1ed49e5d94fa51a6fa..96809340e1e5714340fefc596c3f7fdb740499b6 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -121,47 +121,58 @@ Error ServerDataBroker::GetBrokerUri() { return nullptr; } -void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) { +bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri) { if (*err == ConsumerErrorTemplates::kNoData) { auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData()); if (error_data == nullptr) { *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); - return; + return false; } - *op = std::to_string(error_data->id); + *redirect_uri = std::to_string(error_data->id); + return true; } - return; + return false; } +RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool dataset) { + RequestInfo ri; + ri.host = current_broker_uri_; + ri.api = std::move(api_url); + if (dataset) { + ri.extra_params = "&dataset=true"; + } + return ri; +} + + Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op, bool dataset) { std::string request_suffix = OpToUriCmd(op); std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move(group_id) + "/"; uint64_t elapsed_ms = 0; + Error no_data_error; while (true) { auto err = GetBrokerUri(); if (err == nullptr) { - RequestInfo ri; - ri.host = current_broker_uri_; - ri.api = request_api + request_suffix; - if (dataset) { - ri.extra_params = "&dataset=true"; - } + auto ri = PrepareRequestInfo(request_api + request_suffix, dataset); err = ProcessRequest(response, ri); if (err == nullptr) { break; } } - ProcessServerError(&err, *response, &request_suffix); - - if (err == ConsumerErrorTemplates::kInterruptedTransaction && request_suffix == "next") { - return err; + if (request_suffix == "next") { + auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix); + if (err == ConsumerErrorTemplates::kInterruptedTransaction) { + return err; + } + if (save_error) { + no_data_error = std::move(err); + } } - if (elapsed_ms >= timeout_ms_) { - return err; + return no_data_error ? std::move(no_data_error) : std::move(err); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); elapsed_ms += 100; diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 287a398a9d623b9d076d0d0eb7b4d2581c0210c8..53a394a426abce2496d3012870f5f63c6e25813d 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -53,7 +53,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, bool dataset = false); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); - void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); + bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri); Error ProcessRequest(std::string* response, const RequestInfo& request); Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data); DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err); @@ -62,7 +62,7 @@ class ServerDataBroker final : public asapo::DataBroker { std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); std::string AppendUri(std::string request_string); DataSet DecodeDatasetFromResponse(std::string response, Error* err); - + RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); std::string OpToUriCmd(GetImageServerOperation op); std::string server_uri_; std::string current_broker_uri_; diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 64315cac025a0da4b3d2d5759cf4aa9e5e17ddb2..988cb859a73fd4a3a8d769489c3e05973ec456ac 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -309,6 +309,30 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } +TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) { + MockGetBrokerUri(); + data_broker->SetTimeout(300); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2}"))); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<2>(nullptr), + Return(""))); + + + data_broker->SetTimeout(300); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); + + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); +} + + TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) { MockGetBrokerUri(); diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index b2c80dc012ca3ef4a8c8585747152ff6de1e775d..c65000b9d007295ce2c1c32746172f8cca4e59b2 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -67,6 +67,6 @@ else: -time.sleep(10) +time.sleep(25)