Skip to content
Snippets Groups Projects
broker_request.cpp 12.3 KiB
Newer Older
#include "asapo/request/broker_request.h"
#include "asapo/request/consumer_error.h"
#include "asapo/http_client/http_error.h"
#include "asapo/http_client/http_client.h"
#include "asapo/common/internal/version.h"
#include "asapo/json_parser/json_parser.h"

#include <chrono>
#include "asapo/io/io_factory.h"
#include <memory>

namespace asapo
{

    Error GetNoDataResponseFromJson(const std::string &json_string, ConsumerErrorData *data)
    {
        JsonStringParser parser(json_string);
        Error err;
        if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max)) || (err = parser.GetString("next_stream", &data->next_stream)))
        {
            return err;
        }
        return nullptr;
    }

    Error GetPartialDataResponseFromJson(const std::string &json_string, PartialErrorData *data)
    {
        Error err;
        auto parser = JsonStringParser(json_string);
        uint64_t id, size;
        if ((err = parser.GetUInt64("size", &size)) ||
            (err = parser.GetUInt64("_id", &id)))
        {
            return err;
        }
        data->id = id;
        data->expected_size = size;
        return nullptr;
    }

    Error ConsumerErrorFromPartialDataResponse(const std::string &response)
    {
        PartialErrorData data;
        auto parse_error = GetPartialDataResponseFromJson(response, &data);
        if (parse_error)
        {
            auto err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response");
            err->AddDetails("response", response);
            return err;
        }
        auto err = ConsumerErrorTemplates::kPartialData.Generate();
        PartialErrorData *error_data = new PartialErrorData{data};
        err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data});
        return err;
    }

    Error ConsumerErrorFromNoDataResponse(const std::string &response)
    {
        if (response.find("get_record_by_id") != std::string::npos)
        {
            ConsumerErrorData data;
            auto parse_error = GetNoDataResponseFromJson(response, &data);
            if (parse_error)
            {
                return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
            }
            Error err;
            if (data.id >= data.id_max)
            {
                err = data.next_stream.empty() ? ConsumerErrorTemplates::kEndOfStream.Generate() : ConsumerErrorTemplates::kStreamFinished.Generate();
            }
            else
            {
                err = ConsumerErrorTemplates::kNoData.Generate();
            }
            ConsumerErrorData *error_data = new ConsumerErrorData{data};
            err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data});
            return err;
        }
        return ConsumerErrorTemplates::kNoData.Generate();
    }

    Error ConsumerErrorFromHttpCode(const RequestOutput *response, const HttpCode &code)
    {
        switch (code)
        {
        case HttpCode::OK:
            return nullptr;
        case HttpCode::NoContent:
            return nullptr;
        case HttpCode::PartialContent:
            return ConsumerErrorFromPartialDataResponse(response->to_string());
        case HttpCode::BadRequest:
            return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
        case HttpCode::Unauthorized:
            return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
        case HttpCode::InternalServerError:
            return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
        case HttpCode::ServiceUnavailable:
            return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string());
        case HttpCode::NotFound:
            return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string());
        case HttpCode::Conflict:
            return ConsumerErrorFromNoDataResponse(response->to_string());
        case HttpCode::UnsupportedMediaType:
            return ConsumerErrorTemplates::kUnsupportedClient.Generate(response->to_string());
        default:
            return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
        }
    }
    Error ConsumerErrorFromServerError(const Error &server_err)
    {
        if (server_err == HttpErrorTemplates::kTransferError)
        {
            return ConsumerErrorTemplates::kInterruptedTransaction.Generate();
        }
        else
        {
            return ConsumerErrorTemplates::kUnavailableService.Generate();
        }
    }

    Error ProcessRequestResponce(const RequestInfo &request,
                                 Error server_err,
                                 const RequestOutput *response,
                                 const HttpCode &code)
    {
        Error err;
        if (server_err != nullptr)
        {
            err = ConsumerErrorFromServerError(server_err);
            err->SetCause(std::move(server_err));
        }
        else
        {
            err = ConsumerErrorFromHttpCode(response, code);
        }

        if (err != nullptr)
        {
            err->AddDetails("host", request.host)->AddDetails("api", request.api);
        }
        return err;
    }

    ServiceRequest::ServiceRequest(std::string server_uri,
                                   SourceCredentials source) : httpclient__{DefaultHttpClient()},
                                                               endpoint_{std::move(server_uri)},
                                                               source_credentials_(std::move(source))
    {
        data_source_encoded_ = httpclient__->UrlEscape(source_credentials_.data_source);

    }

    std::string ServiceRequest::RequestWithToken(std::string uri)
    {
        return std::move(uri) + "?token=" + source_credentials_.user_token;
    }

    Error ServiceRequest::ProcessPostRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code)
    {
        Error err;
        switch (request.output_mode)
        {
        case OutputDataMode::string:
            response->string_output =
                httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params,
                                   request.cookie,
                                   request.body,
                                   code,
                                   &err);
            break;
        case OutputDataMode::array:
            err =
                httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie,
                                   request.body, &response->data_output, response->data_output_size, code);
            break;
        default:
            break;
        }
        return err;
    }

    Error ServiceRequest::ProcessGetRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code)
    {
        Error err;
        response->string_output =
            httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err);
        return err;
    }

    Error ServiceRequest::ProcessRequest(RequestOutput *response, const RequestInfo &request, std::string *service_uri)
    {
        Error err;
        HttpCode code;
        if (request.post)
        {
            err = ProcessPostRequest(request, response, &code);
        }
        else
        {
            err = ProcessGetRequest(request, response, &code);
        }
        if (err && service_uri)
        {
            service_uri->clear();
        }
        return ProcessRequestResponce(request, std::move(err), response, code);
    }

    RequestInfo ServiceRequest::GetDiscoveryRequest(const std::string &service_name) const
    {
        RequestInfo ri;
        ri.host = endpoint_;
        ri.api = "/asapo-discovery/" + kConsumerProtocol.GetDiscoveryVersion() + "/" + service_name;
        ri.extra_params = "&protocol=" + kConsumerProtocol.GetVersion();
        return ri;
    }

    Error ServiceRequest::ProcessDiscoverServiceResult(Error err, std::string *uri_to_set)
    {
        if (err != nullptr || uri_to_set->empty())
        {
            uri_to_set->clear();
            if (err == ConsumerErrorTemplates::kUnsupportedClient)
            {
                return err;
            }
            auto ret_err = ConsumerErrorTemplates::kUnavailableService.Generate(std::move(err));
            ret_err->AddDetails("destination", endpoint_);
            return ret_err;
        }
        return nullptr;
    }

    std::string ServiceRequest::BrokerRequestWithTimeout(RequestInfo request, Error *err, uint64_t timeout_ms,
                                                    std::atomic<bool> &interrupt_flag)
        *err = ServiceRequestWithTimeout(ServiceName::kBroker, &current_broker_uri_, request, &response, timeout_ms, interrupt_flag);
    std::string ServiceRequest::BrokerRequestWithTimeout(RequestInfo request, Error *err, uint64_t timeout_ms)
        std::atomic<bool> interrupt_flag{false};
        return BrokerRequestWithTimeout(request, err, timeout_ms, interrupt_flag);
Mikhail Karnevskiy's avatar
Mikhail Karnevskiy committed

    Error ServiceRequest::ServiceRequestWithoutTimeout(const std::string &service_name,
                                                    std::string *service_uri,
                                                    RequestInfo request,
                                                    RequestOutput *response)
    {
        auto err = DiscoverService(service_name, service_uri);
        if (err == nullptr)
        {
            request.host = *service_uri;
            return ProcessRequest(response, request, service_uri);
        }
        return err;
    }

    Error ServiceRequest::ServiceRequestWithTimeout(const std::string &service_name,
                                                    std::string *service_uri,
                                                    RequestInfo request,
                                                    RequestOutput *response,
                                                    uint64_t timeout_ms,
                                                    std::atomic<bool> &interrupt_flag)
    {
        uint64_t elapsed_ms = 0;

        Error err;
        while (elapsed_ms <= timeout_ms)
        {
            if (interrupt_flag)
            {
                err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
                break;
            }
            auto start = std::chrono::steady_clock::now();
Mikhail Karnevskiy's avatar
Mikhail Karnevskiy committed
            err = ServiceRequestWithoutTimeout(service_name, service_uri, request, response);
            if (err == nullptr || err == ConsumerErrorTemplates::kWrongInput)
Mikhail Karnevskiy's avatar
Mikhail Karnevskiy committed
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count());
        }
        return err;
    }

    Error ServiceRequest::DiscoverService(const std::string &service_name, std::string *uri_to_set)
    {
        if (!uri_to_set->empty())
        {
            return nullptr;
        }
        auto ri = GetDiscoveryRequest(service_name);
        RequestOutput output;
        auto err = ProcessRequest(&output, ri, nullptr);
        *uri_to_set = std::move(output.string_output);
        return ProcessDiscoverServiceResult(std::move(err), uri_to_set);
    }

    RequestInfo ServiceRequest::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const
    {
        auto stream_encoded = httpclient__->UrlEscape(std::move(stream));
        auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : "";
        auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/" + data_source_encoded_ + "/" + stream_encoded;
        if (group_encoded.size() > 0)
        {
            uri = uri + "/" + group_encoded;
        }
        if (suffix.size() > 0)
        {
            uri = uri + "/" + suffix;
        }

        RequestInfo ri;
        ri.api = uri;
Mikhail Karnevskiy's avatar
Mikhail Karnevskiy committed
        ri.extra_params += "&instanceid=" + httpclient__->UrlEscape(source_credentials_.instance_id);
        ri.extra_params += "&pipelinestep=" + httpclient__->UrlEscape(source_credentials_.pipeline_step);