Skip to content
Snippets Groups Projects
Commit 3c5ece52 authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

switch receiver to use broker functions for streams

parent dfb23ece
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,7 @@ type GetStreamsParams = database.GetStreamsParams
func getBoolKey(vals url.Values, key string) bool {
val_str := vals.Get(key)
if val_str == "" || val_str == "true" {
if val_str == "true" {
return true
}
return false
......
......@@ -9,6 +9,7 @@
#include "producer_request.h"
#include "asapo/common/data_structs.h"
#include "asapo/request/request_pool_error.h"
#include "asapo/json_parser/json_parser.h"
#include "asapo/http_client/http_client.h"
#include "asapo/common/internal/version.h"
#include <asapo/io/io_factory.h>
......@@ -262,10 +263,10 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id,
}
Error ProducerImpl::SetStreamPersistent(std::string stream, uint64_t timeout_ms) {
GenericRequestHeader request_header{kOpcodePersistStream, 0, 0, 0,
stream + ".persist", stream};
RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "", "persist");
ri.post = true;
Error err;
BlockingRequest(std::move(request_header), timeout_ms, &err);
service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms);
return err;
}
......@@ -439,65 +440,38 @@ T GetResultFromCallback(std::future<T>* promiseResult, uint64_t timeout_ms, Erro
return T{};
}
GenericRequestHeader CreateRequestHeaderFromOp(StreamRequestOp op, std::string stream) {
switch (op) {
case StreamRequestOp::kStreamInfo:
return GenericRequestHeader{kOpcodeStreamInfo, 0, 0, 0, "", stream};
case StreamRequestOp::kLastStream:
return GenericRequestHeader{kOpcodeLastStream, 0, 0, 0, "", ""};
}
return GenericRequestHeader{};
}
StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const {
std::string ProducerImpl::BlockingRequest(GenericRequestHeader header, uint64_t timeout_ms, Error* err) const {
std::unique_ptr<std::promise<ReceiverResponse>> promise{new std::promise<ReceiverResponse>};
std::future<ReceiverResponse> promiseResult = promise->get_future();
RequestInfo ri = service_request_->CreateBrokerApiRequest("0", "", "streams");
ri.extra_params = "&detailed=true&from=" + service_request_->UrlEscape(stream);
*err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {
new ProducerRequest{
source_cred_string_, std::move(header),
nullptr, "", "",
unwrap_callback(
ActivatePromiseForReceiverResponse,
std::move(promise)), true,
timeout_ms}
}, true);
if (*err) {
return "";
if (op == StreamRequestOp::kStreamInfo) {
ri.extra_params += "&single=true";
} else {
ri.extra_params += "&last=true";
}
auto res = GetResultFromCallback<ReceiverResponse>(&promiseResult, timeout_ms + 2000,
err); // we give two more sec for request to exit by timeout
auto response = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms);
if (*err) {
return "";
}
if (res.err == nullptr) {
return res.payload;
} else {
(*err).reset(res.err);
return "";
return StreamInfo{};
}
}
StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const {
auto header = CreateRequestHeaderFromOp(op, stream);
auto response = BlockingRequest(std::move(header), timeout_ms, err);
std::vector<std::string> streams_encoded;
Error parse_err;
auto parser = JsonStringParser(std::move(response));
*err = parser.GetArrayRawStrings("streams", &streams_encoded);
if (*err) {
return StreamInfo{};
}
StreamInfo res;
if (!res.SetFromJson(response)) {
*err = ProducerErrorTemplates::kInternalServerError.Generate(
std::string("cannot read JSON string from server response: ") + response);
StreamInfo si;
auto ok = si.SetFromJson(streams_encoded[0]);
if (!ok) {
*err = GeneralErrorTemplates::kSimpleError.Generate("cannot parse " + streams_encoded[0]);
return StreamInfo{};
}
return si;
*err = nullptr;
return res;
}
StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const {
......
......@@ -93,7 +93,6 @@ class ProducerImpl : public Producer {
std::string stream,
RequestCallback callback);
StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const;
std::string BlockingRequest(GenericRequestHeader header, uint64_t timeout_ms, Error* err) const;
Error Send(const MessageHeader& message_header, std::string stream, MessageData data, std::string full_path,
uint64_t ingest_mode,
RequestCallback callback, bool manage_data_memory);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment