Skip to content
Snippets Groups Projects
Commit bfac1fa5 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix

parent e3774afa
No related branches found
No related tags found
No related merge requests found
......@@ -121,47 +121,58 @@ Error ServerDataBroker::GetBrokerUri() {
return nullptr;
}
void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) {
bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri) {
if (*err == ConsumerErrorTemplates::kNoData) {
auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData());
if (error_data == nullptr) {
*err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
return;
return false;
}
*op = std::to_string(error_data->id);
*redirect_uri = std::to_string(error_data->id);
return true;
}
return;
return false;
}
RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool dataset) {
RequestInfo ri;
ri.host = current_broker_uri_;
ri.api = std::move(api_url);
if (dataset) {
ri.extra_params = "&dataset=true";
}
return ri;
}
Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,
bool dataset) {
std::string request_suffix = OpToUriCmd(op);
std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" +
std::move(group_id) + "/";
uint64_t elapsed_ms = 0;
Error no_data_error;
while (true) {
auto err = GetBrokerUri();
if (err == nullptr) {
RequestInfo ri;
ri.host = current_broker_uri_;
ri.api = request_api + request_suffix;
if (dataset) {
ri.extra_params = "&dataset=true";
}
auto ri = PrepareRequestInfo(request_api + request_suffix, dataset);
err = ProcessRequest(response, ri);
if (err == nullptr) {
break;
}
}
ProcessServerError(&err, *response, &request_suffix);
if (err == ConsumerErrorTemplates::kInterruptedTransaction && request_suffix == "next") {
return err;
if (request_suffix == "next") {
auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix);
if (err == ConsumerErrorTemplates::kInterruptedTransaction) {
return err;
}
if (save_error) {
no_data_error = std::move(err);
}
}
if (elapsed_ms >= timeout_ms_) {
return err;
return no_data_error ? std::move(no_data_error) : std::move(err);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
......
......@@ -53,7 +53,7 @@ class ServerDataBroker final : public asapo::DataBroker {
Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, bool dataset = false);
Error GetDataIfNeeded(FileInfo* info, FileData* data);
Error GetBrokerUri();
void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri);
Error ProcessRequest(std::string* response, const RequestInfo& request);
Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data);
DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err);
......@@ -62,7 +62,7 @@ class ServerDataBroker final : public asapo::DataBroker {
std::string BrokerRequestWithTimeout(RequestInfo request, Error* err);
std::string AppendUri(std::string request_string);
DataSet DecodeDatasetFromResponse(std::string response, Error* err);
RequestInfo PrepareRequestInfo(std::string api_url, bool dataset);
std::string OpToUriCmd(GetImageServerOperation op);
std::string server_uri_;
std::string current_broker_uri_;
......
......@@ -309,6 +309,30 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout
ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}
TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) {
MockGetBrokerUri();
data_broker->SetTimeout(300);
EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::Conflict),
SetArgPointee<2>(nullptr),
Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2}")));
EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+ expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
SetArgPointee<1>(HttpCode::NotFound),
SetArgPointee<2>(nullptr),
Return("")));
data_broker->SetTimeout(300);
auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
}
TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) {
MockGetBrokerUri();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment