Skip to content
Snippets Groups Projects
Commit 7c74d4ff authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #56 in ASAPO/asapo from bugfix_asapo-consumer-error to develop

* commit '1452aec4':
  increas producer test timeout
  fix
parents e3774afa 1452aec4
No related branches found
No related tags found
No related merge requests found
......@@ -121,47 +121,58 @@ Error ServerDataBroker::GetBrokerUri() {
return nullptr;
}
void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* op) {
bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri) {
if (*err == ConsumerErrorTemplates::kNoData) {
auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData());
if (error_data == nullptr) {
*err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
return;
return false;
}
*op = std::to_string(error_data->id);
*redirect_uri = std::to_string(error_data->id);
return true;
}
return;
return false;
}
RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool dataset) {
RequestInfo ri;
ri.host = current_broker_uri_;
ri.api = std::move(api_url);
if (dataset) {
ri.extra_params = "&dataset=true";
}
return ri;
}
Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,
bool dataset) {
std::string request_suffix = OpToUriCmd(op);
std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" +
std::move(group_id) + "/";
uint64_t elapsed_ms = 0;
Error no_data_error;
while (true) {
auto err = GetBrokerUri();
if (err == nullptr) {
RequestInfo ri;
ri.host = current_broker_uri_;
ri.api = request_api + request_suffix;
if (dataset) {
ri.extra_params = "&dataset=true";
}
auto ri = PrepareRequestInfo(request_api + request_suffix, dataset);
err = ProcessRequest(response, ri);
if (err == nullptr) {
break;
}
}
ProcessServerError(&err, *response, &request_suffix);
if (err == ConsumerErrorTemplates::kInterruptedTransaction && request_suffix == "next") {
return err;
if (request_suffix == "next") {
auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix);
if (err == ConsumerErrorTemplates::kInterruptedTransaction) {
return err;
}
if (save_error) {
no_data_error = std::move(err);
}
}
if (elapsed_ms >= timeout_ms_) {
return err;
return no_data_error ? std::move(no_data_error) : std::move(err);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
......
......@@ -53,7 +53,7 @@ class ServerDataBroker final : public asapo::DataBroker {
Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, bool dataset = false);
Error GetDataIfNeeded(FileInfo* info, FileData* data);
Error GetBrokerUri();
void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri);
Error ProcessRequest(std::string* response, const RequestInfo& request);
Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data);
DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err);
......@@ -62,7 +62,7 @@ class ServerDataBroker final : public asapo::DataBroker {
std::string BrokerRequestWithTimeout(RequestInfo request, Error* err);
std::string AppendUri(std::string request_string);
DataSet DecodeDatasetFromResponse(std::string response, Error* err);
RequestInfo PrepareRequestInfo(std::string api_url, bool dataset);
std::string OpToUriCmd(GetImageServerOperation op);
std::string server_uri_;
std::string current_broker_uri_;
......
......@@ -309,6 +309,30 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout
ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}
TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) {
MockGetBrokerUri();
data_broker->SetTimeout(300);
EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::Conflict),
SetArgPointee<2>(nullptr),
Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2}")));
EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+ expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
SetArgPointee<1>(HttpCode::NotFound),
SetArgPointee<2>(nullptr),
Return("")));
data_broker->SetTimeout(300);
auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
}
TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) {
MockGetBrokerUri();
......
......@@ -67,6 +67,6 @@ else:
time.sleep(10)
time.sleep(25)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment