#include "asapo/request/broker_request.h"

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <utility>

#include "asapo/request/consumer_error.h"
#include "asapo/http_client/http_error.h"
#include "asapo/http_client/http_client.h"
#include "asapo/common/internal/version.h"
#include "asapo/json_parser/json_parser.h"
#include "asapo/io/io_factory.h"

namespace asapo {

/// Parses the "no data" payload: reads the fields "id", "id_max" and
/// "next_stream" from json_string into *data.
/// @return nullptr on success, otherwise the first parser error. On failure
///         *data may already hold the fields that were parsed before the error.
Error GetNoDataResponseFromJson(const std::string &json_string, ConsumerErrorData *data) {
    JsonStringParser parser(json_string);
    Error err;
    if ((err = parser.GetUInt64("id", &data->id)) ||
            (err = parser.GetUInt64("id_max", &data->id_max)) ||
            (err = parser.GetString("next_stream", &data->next_stream))) {
        return err;
    }
    return nullptr;
}

/// Parses the "partial data" payload: reads the fields "size" and "_id" from
/// json_string. *data is written only when both fields were parsed.
/// @return nullptr on success, otherwise the first parser error.
Error GetPartialDataResponseFromJson(const std::string &json_string, PartialErrorData *data) {
    Error err;
    auto parser = JsonStringParser(json_string);
    uint64_t id = 0;
    uint64_t size = 0;
    if ((err = parser.GetUInt64("size", &size)) ||
            (err = parser.GetUInt64("_id", &id))) {
        return err;
    }
    data->id = id;
    data->expected_size = size;
    return nullptr;
}

/// Builds the consumer error for a partial-content response.
/// A payload that cannot be parsed yields kInterruptedTransaction with the raw
/// response attached as a detail; otherwise kPartialData carrying the parsed
/// PartialErrorData as custom data.
Error ConsumerErrorFromPartialDataResponse(const std::string &response) {
    PartialErrorData data;
    if (GetPartialDataResponseFromJson(response, &data)) {
        auto malformed = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response");
        malformed->AddDetails("response", response);
        return malformed;
    }

    auto err = ConsumerErrorTemplates::kPartialData.Generate();
    // Ownership goes straight into the unique_ptr; no named raw owning pointer.
    err->SetCustomData(std::unique_ptr<CustomErrorData> {new PartialErrorData{data}});
    return err;
}

/// Builds the consumer error for a "conflict" (no data) response.
/// Responses that do not mention "get_record_by_id" map to a plain kNoData.
/// Otherwise the payload is parsed and, depending on id / id_max / next_stream,
/// kEndOfStream, kStreamFinished or kNoData is produced with the parsed
/// ConsumerErrorData attached as custom data.
Error ConsumerErrorFromNoDataResponse(const std::string &response) {
    if (response.find("get_record_by_id") == std::string::npos) {
        return ConsumerErrorTemplates::kNoData.Generate();
    }

    ConsumerErrorData data;
    if (GetNoDataResponseFromJson(response, &data)) {
        return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
    }

    Error err;
    if (data.id >= data.id_max) {
        // Nothing beyond id_max: the stream either simply ended or was
        // explicitly finished (a next stream name is present).
        err = data.next_stream.empty() ?
              ConsumerErrorTemplates::kEndOfStream.Generate() :
              ConsumerErrorTemplates::kStreamFinished.Generate();
    } else {
        err = ConsumerErrorTemplates::kNoData.Generate();
    }
    err->SetCustomData(std::unique_ptr<CustomErrorData> {new ConsumerErrorData{data}});
    return err;
}

/// Maps an HTTP status code (plus the response body) to a consumer error.
/// @return nullptr for OK / NoContent, otherwise the matching consumer error.
Error ConsumerErrorFromHttpCode(const RequestOutput *response, const HttpCode &code) {
    switch (code) {
    case HttpCode::OK:
    case HttpCode::NoContent:
        return nullptr;
    case HttpCode::PartialContent:
        return ConsumerErrorFromPartialDataResponse(response->to_string());
    case HttpCode::BadRequest:
    case HttpCode::Unauthorized:
        return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
    case HttpCode::InternalServerError:
        return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
    case HttpCode::ServiceUnavailable:
    case HttpCode::NotFound:
        return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string());
    case HttpCode::Conflict:
        return ConsumerErrorFromNoDataResponse(response->to_string());
    case HttpCode::UnsupportedMediaType:
        return ConsumerErrorTemplates::kUnsupportedClient.Generate(response->to_string());
    default:
        return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
    }
}

/// Maps a transport-level error to a consumer error: kTransferError becomes
/// kInterruptedTransaction, everything else kUnavailableService.
Error ConsumerErrorFromServerError(const Error &server_err) {
    if (server_err == HttpErrorTemplates::kTransferError) {
        return ConsumerErrorTemplates::kInterruptedTransaction.Generate();
    }
    return ConsumerErrorTemplates::kUnavailableService.Generate();
}

/// Turns the outcome of one HTTP exchange into a consumer error.
/// A transport error takes precedence (and is kept as the cause); otherwise
/// the HTTP status code decides. Any resulting error is annotated with the
/// request's host and api.
/// @return nullptr when the request succeeded.
Error ProcessRequestResponce(const RequestInfo &request,
                             Error server_err,
                             const RequestOutput *response,
                             const HttpCode &code) {
    Error err;
    if (server_err != nullptr) {
        err = ConsumerErrorFromServerError(server_err);
        err->SetCause(std::move(server_err));
    } else {
        err = ConsumerErrorFromHttpCode(response, code);
    }

    if (err != nullptr) {
        err->AddDetails("host", request.host)->AddDetails("api", request.api);
    }
    return err;
}

/// Creates a request helper bound to server_uri and the given credentials and
/// caches the URL-escaped data source name.
ServiceRequest::ServiceRequest(std::string server_uri, SourceCredentials source) :
    httpclient__{DefaultHttpClient()},
    endpoint_{std::move(server_uri)},
    source_credentials_(std::move(source)) {
    data_source_encoded_ = httpclient__->UrlEscape(source_credentials_.data_source);
}

/// Appends the user token as the first query parameter of uri.
std::string ServiceRequest::RequestWithToken(std::string uri) {
    return std::move(uri) + "?token=" + source_credentials_.user_token;
}

/// Issues a POST for request, storing the result in *response according to
/// request.output_mode and the HTTP status in *code.
/// For an output mode other than string/array nothing is sent, no error is
/// returned and *code is left untouched.
Error ServiceRequest::ProcessPostRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code) {
    Error err;
    const std::string url = RequestWithToken(request.host + request.api) + request.extra_params;
    switch (request.output_mode) {
    case OutputDataMode::string:
        response->string_output = httpclient__->Post(url, request.cookie, request.body, code, &err);
        break;
    case OutputDataMode::array:
        err = httpclient__->Post(url, request.cookie, request.body,
                                 &response->data_output, response->data_output_size, code);
        break;
    default:
        break;
    }
    return err;
}

/// Issues a GET for request, storing the body in response->string_output and
/// the HTTP status in *code.
Error ServiceRequest::ProcessGetRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code) {
    Error err;
    response->string_output =
        httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err);
    return err;
}

/// Sends request (POST or GET depending on request.post) and converts the
/// outcome into a consumer error.
/// On a transport error the cached *service_uri (if given) is cleared.
Error ServiceRequest::ProcessRequest(RequestOutput *response, const RequestInfo &request, std::string *service_uri) {
    Error err;
    // Pessimistic default: if a request path returns without an error and
    // without setting the status code, report a failure instead of switching
    // on an indeterminate value.
    HttpCode code{HttpCode::InternalServerError};
    if (request.post) {
        err = ProcessPostRequest(request, response, &code);
    } else {
        err = ProcessGetRequest(request, response, &code);
    }

    if (err && service_uri) {
        service_uri->clear();
    }
    return ProcessRequestResponce(request, std::move(err), response, code);
}

/// Builds the discovery-service request used to look up service_name.
RequestInfo ServiceRequest::GetDiscoveryRequest(const std::string &service_name) const {
    RequestInfo ri;
    ri.host = endpoint_;
    ri.api = "/asapo-discovery/" + kConsumerProtocol.GetDiscoveryVersion() + "/" + service_name;
    ri.extra_params = "&protocol=" + kConsumerProtocol.GetVersion();
    return ri;
}

/// Post-processes a discovery attempt.
/// If discovery failed or produced an empty URI, *uri_to_set is cleared and an
/// error is returned: kUnsupportedClient is passed through unchanged, anything
/// else is wrapped into kUnavailableService with the endpoint as "destination".
/// @return nullptr when a non-empty URI was discovered.
Error ServiceRequest::ProcessDiscoverServiceResult(Error err, std::string *uri_to_set) {
    if (err == nullptr && !uri_to_set->empty()) {
        return nullptr;
    }

    uri_to_set->clear();
    if (err == ConsumerErrorTemplates::kUnsupportedClient) {
        return err;
    }
    auto ret_err = ConsumerErrorTemplates::kUnavailableService.Generate(std::move(err));
    ret_err->AddDetails("destination", endpoint_);
    return ret_err;
}

/// Sends request to the broker, retrying until timeout_ms elapses or
/// interrupt_flag becomes true. The outcome is stored in *err.
/// @return the response body as a string.
std::string ServiceRequest::BrokerRequestWithTimeout(RequestInfo request,
        Error *err,
        uint64_t timeout_ms,
        std::atomic<bool> &interrupt_flag) {
    RequestOutput response;
    *err = ServiceRequestWithTimeout(ServiceName::kBroker, &current_broker_uri_, std::move(request),
                                     &response, timeout_ms, interrupt_flag);
    return std::move(response.string_output);
}

/// Convenience overload without external interruption support.
std::string ServiceRequest::BrokerRequestWithTimeout(RequestInfo request, Error *err, uint64_t timeout_ms) {
    std::atomic<bool> interrupt_flag{false};
    return BrokerRequestWithTimeout(std::move(request), err, timeout_ms, interrupt_flag);
}

/// Performs a single attempt: discovers the service URI if *service_uri is
/// empty, then sends request to it.
Error ServiceRequest::ServiceRequestWithoutTimeout(const std::string &service_name,
        std::string *service_uri,
        RequestInfo request,
        RequestOutput *response) {
    auto err = DiscoverService(service_name, service_uri);
    if (err != nullptr) {
        return err;
    }
    request.host = *service_uri;
    return ProcessRequest(response, request, service_uri);
}

/// Repeats ServiceRequestWithoutTimeout until it succeeds, fails with
/// kWrongInput, interrupt_flag is set, or the accumulated time exceeds
/// timeout_ms. There is a 100 ms pause after each failed attempt.
/// @return the error of the last attempt (nullptr on success); nullptr is also
///         returned if the loop body never ran.
Error ServiceRequest::ServiceRequestWithTimeout(const std::string &service_name,
                                                std::string *service_uri,
                                                RequestInfo request,
                                                RequestOutput *response,
                                                uint64_t timeout_ms,
                                                std::atomic<bool> &interrupt_flag) {
    const auto retry_pause = std::chrono::milliseconds(100);
    uint64_t elapsed_ms = 0;
    Error err;
    while (elapsed_ms <= timeout_ms) {
        if (interrupt_flag) {
            err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
            break;
        }

        const auto start = std::chrono::steady_clock::now();
        // request is copied on purpose: each attempt overwrites its host.
        err = ServiceRequestWithoutTimeout(service_name, service_uri, request, response);
        if (err == nullptr || err == ConsumerErrorTemplates::kWrongInput) {
            // Success, or an error that retrying cannot fix.
            break;
        }

        std::this_thread::sleep_for(retry_pause);
        const auto spent = std::chrono::steady_clock::now() - start;
        elapsed_ms += static_cast<uint64_t>(
                          std::chrono::duration_cast<std::chrono::milliseconds>(spent).count());
    }
    return err;
}

/// Ensures *uri_to_set holds the URI of service_name. A non-empty value is
/// treated as already discovered; otherwise the discovery service is queried.
Error ServiceRequest::DiscoverService(const std::string &service_name, std::string *uri_to_set) {
    if (!uri_to_set->empty()) {
        return nullptr;
    }

    auto ri = GetDiscoveryRequest(service_name);
    RequestOutput output;
    auto err = ProcessRequest(&output, ri, nullptr);
    *uri_to_set = std::move(output.string_output);
    return ProcessDiscoverServiceResult(std::move(err), uri_to_set);
}

/// Builds a broker API request of the form
/// /<broker version>/beamtime/<beamtime id>/<data source>/<stream>[/<group>][/<suffix>]
/// with the instance id and pipeline step appended as extra query parameters.
/// stream, group, instance id and pipeline step are URL-escaped; suffix is
/// appended as given.
RequestInfo ServiceRequest::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const {
    auto stream_encoded = httpclient__->UrlEscape(std::move(stream));
    auto group_encoded = group.empty() ? "" : httpclient__->UrlEscape(std::move(group));

    auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/" +
               data_source_encoded_ + "/" + stream_encoded;
    if (!group_encoded.empty()) {
        uri += "/" + group_encoded;
    }
    if (!suffix.empty()) {
        uri += "/" + suffix;
    }

    RequestInfo ri;
    ri.api = std::move(uri);
    ri.extra_params += "&instanceid=" + httpclient__->UrlEscape(source_credentials_.instance_id);
    ri.extra_params += "&pipelinestep=" + httpclient__->UrlEscape(source_credentials_.pipeline_step);
    return ri;
}

}