diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f78199c0473b36c85ce3857a83e11ddaf71ec2a5..734b87c4d6e5265283cd8217d3489c072061f869 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -129,7 +129,7 @@ test-services-linux-debug: - bash $CI_PROJECT_DIR/deploy/build_env/services-linux/run_asapo.sh - supervisorctl status - cd $CI_PROJECT_DIR/build - - ctest --no-compress-output -T Test -E "full_chain_monitoring|noaccess|restart|logger_fluentd" --output-on-failure --output-junit testResult.xml + - ctest --no-compress-output -T Test -E "full_chain_monitoring|noaccess|restart|logger_fluentd|coverage" --output-on-failure --output-junit testResult.xml - pip3 install pytest tags: - kubernetes-executor diff --git a/CMakeModules/testing_cpp.cmake b/CMakeModules/testing_cpp.cmake index afef4dfa7f1b63cecad2ecf1b1ad188a9724d87f..7f9a0448f267b7f99da9feb33c895277efdd4a87 100644 --- a/CMakeModules/testing_cpp.cmake +++ b/CMakeModules/testing_cpp.cmake @@ -109,7 +109,7 @@ function(gtest target test_source_files linktarget) LIST(GET ARGN 0 NOCOV) endif() if (CMAKE_COMPILER_IS_GNUCXX AND NOT 1${NOCOV} STREQUAL "1nocov") - set(COVERAGE_EXCLUDES "*/unittests/*" "*/3d_party/*" "*/python/*") + set(COVERAGE_EXCLUDES "*/unittests/*" "*/3d_party/*" "*/python/*" "*/service_request*") if (ARGN) set(COVERAGE_EXCLUDES ${COVERAGE_EXCLUDES} ${ARGN}) endif () diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index f8d17a58755f47cab78fb641fab72d1976a81711..bc4f151cdb2b7e9af6778b5cbe346070ac46fe43 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -10,6 +10,8 @@ add_subdirectory(src/version) add_subdirectory(src/http_client) +add_subdirectory(src/http_request) + add_subdirectory(src/logger) add_subdirectory(src/request) diff --git a/common/cpp/include/asapo/http_request/consumer_error.h b/common/cpp/include/asapo/http_request/consumer_error.h new file mode 100644 index 0000000000000000000000000000000000000000..ac39f49fba7f324968657488329d44dcc1e63cb8 --- /dev/null +++ b/common/cpp/include/asapo/http_request/consumer_error.h @@ -0,0 +1,87 @@ +#ifndef ASAPO_CONSUMER_ERROR_H +#define ASAPO_CONSUMER_ERROR_H + +#include "asapo/common/error.h" +#include "asapo/common/io_error.h" + +namespace asapo { + +enum class ConsumerErrorType { + kNoData = 0, + kEndOfStream, + kStreamFinished, + kUnavailableService, + kInterruptedTransaction, + kLocalIOError, + kWrongInput, + kPartialData, + kUnsupportedClient, + kDataNotInCache, +}; + +using ConsumerErrorTemplate = ServiceErrorTemplate<ConsumerErrorType>; + + +class PartialErrorData : public CustomErrorData { + public: + uint64_t id; + uint64_t expected_size; +}; + +class ConsumerErrorData : public CustomErrorData { + public: + uint64_t id; + uint64_t id_max; + std::string next_stream; +}; + +// This contains errors that are used in both Consumer and Producer. +// This is needed to keep back compatibility, while before refactoring it was used for consumer only. +namespace ConsumerErrorTemplates { + + +auto const kPartialData = ConsumerErrorTemplate { + "partial data", ConsumerErrorType::kPartialData +}; + +auto const kLocalIOError = ConsumerErrorTemplate { + "local i/o error", ConsumerErrorType::kLocalIOError +}; + +auto const kDataNotInCache = ConsumerErrorTemplate { + "data not in cache", ConsumerErrorType::kDataNotInCache +}; + +auto const kStreamFinished = ConsumerErrorTemplate { + "stream finished", ConsumerErrorType::kStreamFinished +}; + +auto const kEndOfStream = ConsumerErrorTemplate { + "no data - end of stream", ConsumerErrorType::kEndOfStream +}; + +auto const kNoData = ConsumerErrorTemplate { + "no data", ConsumerErrorType::kNoData +}; + +auto const kWrongInput = ConsumerErrorTemplate { + "wrong input", ConsumerErrorType::kWrongInput +}; + +auto const kUnsupportedClient = ConsumerErrorTemplate { + "unsupported client version", ConsumerErrorType::kUnsupportedClient +}; + +auto const kInterruptedTransaction = ConsumerErrorTemplate { + "server error", ConsumerErrorType::kInterruptedTransaction +}; + +auto const kUnavailableService = ConsumerErrorTemplate { + "service unavailable", ConsumerErrorType::kUnavailableService +}; + +} +} + +#endif //ASAPO_CONSUMER_ERROR_H + diff --git a/common/cpp/include/asapo/http_request/service_request.h b/common/cpp/include/asapo/http_request/service_request.h new file mode 100644 index 0000000000000000000000000000000000000000..0b24bc66ea0aaeade2b39623596cb63f282f3622 --- /dev/null +++ b/common/cpp/include/asapo/http_request/service_request.h @@ -0,0 +1,103 @@ +#ifndef ASAPO_GENERIC_SERVICE_REQUEST_H +#define ASAPO_GENERIC_SERVICE_REQUEST_H + +#include <chrono> +#include "asapo/common/networking.h" +#include "asapo/common/data_structs.h" +#include "asapo/io/io.h" +#include "asapo/common/networking.h" +#include "asapo/http_client/http_client.h" + +#include <mutex> +#include <atomic> + +namespace asapo +{ + + namespace ServiceName { + const std::string kBroker = "asapo-broker"; + const std::string kFileTransfer = "asapo-file-transfer"; + } + + enum class OutputDataMode + { + string, + array, + file + }; + + struct RequestInfo + { + std::string host; + std::string api; + std::string extra_params; + std::string body; + std::string cookie; + OutputDataMode output_mode = OutputDataMode::string; + bool post = false; + }; + + struct RequestOutput + { + std::string string_output; + MessageData data_output; + uint64_t data_output_size; + const char *to_string() const + { + if (!data_output) + { + return string_output.c_str(); + } + else + { + return reinterpret_cast<char const *>(data_output.get()); + } + } + }; + + Error ProcessRequestResponce(const RequestInfo& request, Error server_err, const RequestOutput* response, + const HttpCode& code); + Error ConsumerErrorFromNoDataResponse(const std::string& response); + Error ConsumerErrorFromPartialDataResponse(const std::string& response); + DataSet DecodeDatasetFromResponse(std::string response, Error* err); + + + class ServiceRequest + { + public: + std::string UrlEscape(std::string url); + ServiceRequest(std::string server_uri, SourceCredentials source); + + std::string BrokerRequestWithTimeout(RequestInfo request, Error *err, uint64_t timeout_ms); + RequestInfo CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const; + Error ServiceRequestWithoutTimeout(const std::string &service_name, std::string *service_uri, + RequestInfo request, RequestOutput *response); + Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, + RequestOutput* response, uint64_t timeout_ms); + + Error ProcessRequest(RequestOutput *response, const RequestInfo &request, std::string *service_uri); + + Error GetServerVersionInfo(std::string client_type, std::string client_protocol_version, + std::string* server_info, bool* supported); + + std::unique_ptr<HttpClient> httpclient__; + void InterruptCurrentOperation(); + private: + std::string RequestWithToken(std::string uri); + + Error ProcessPostRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code); + Error ProcessGetRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code); + + Error DiscoverService(const std::string& service_name, std::string* uri_to_set); + RequestInfo GetDiscoveryRequest(const std::string& service_name) const; + Error ProcessDiscoverServiceResult(Error err, std::string *uri_to_set); + + std::string endpoint_; + SourceCredentials source_credentials_; + std::string data_source_encoded_; + std::string current_broker_uri_; + std::atomic<bool> interrupt_flag_{false}; + + }; +} +#endif // ASAPO_GENERIC_SERVICE_REQUEST_H \ No newline at end of file diff --git a/common/cpp/src/http_request/CMakeLists.txt b/common/cpp/src/http_request/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..a1ce645d27d78e4f80e7a5e7d23c22f983de8aab --- /dev/null +++ b/common/cpp/src/http_request/CMakeLists.txt @@ -0,0 +1,12 @@ +set(TARGET_NAME http_request) +set(SOURCE_FILES + service_request.cpp) + +################################ +# Library +################################ + +add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) + +target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR}) + diff --git a/common/cpp/src/http_request/service_request.cpp b/common/cpp/src/http_request/service_request.cpp new file mode 100644 index 0000000000000000000000000000000000000000..db5360ee3c40ac5134dc8d4151c45dd523af9706 --- /dev/null +++ b/common/cpp/src/http_request/service_request.cpp @@ -0,0 +1,344 @@ +#include "asapo/http_request/service_request.h" +#include "asapo/http_request/consumer_error.h" +#include "asapo/http_client/http_error.h" +#include "asapo/http_client/http_client.h" +#include "asapo/common/internal/version.h" +#include "asapo/json_parser/json_parser.h" + +#include <chrono> +#include "asapo/io/io_factory.h" +#include <memory> + +namespace asapo +{ + + Error GetNoDataResponseFromJson(const std::string &json_string, ConsumerErrorData *data) + { + JsonStringParser parser(json_string); + Error err; + if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max)) || (err = parser.GetString("next_stream", &data->next_stream))) + { + return err; + } + return nullptr; + } + + Error GetPartialDataResponseFromJson(const std::string &json_string, PartialErrorData *data) + { + Error err; + auto parser = JsonStringParser(json_string); + uint64_t id, size; + if ((err = parser.GetUInt64("size", &size)) || + (err = parser.GetUInt64("_id", &id))) + { + return err; + } + data->id = id; + data->expected_size = size; + return nullptr; + } + + Error ConsumerErrorFromPartialDataResponse(const std::string &response) + { + PartialErrorData data; + auto parse_error = GetPartialDataResponseFromJson(response, &data); + if (parse_error) + { + auto err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response"); + err->AddDetails("response", response); + return err; + } + auto err = ConsumerErrorTemplates::kPartialData.Generate(); + PartialErrorData *error_data = new PartialErrorData{data}; + err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data}); + return err; + } + + Error ConsumerErrorFromNoDataResponse(const std::string &response) + { + if (response.find("get_record_by_id") != std::string::npos) + { + ConsumerErrorData data; + auto parse_error = GetNoDataResponseFromJson(response, &data); + if (parse_error) + { + return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); + } + Error err; + if (data.id >= data.id_max) + { + err = data.next_stream.empty() ? ConsumerErrorTemplates::kEndOfStream.Generate() : ConsumerErrorTemplates::kStreamFinished.Generate(); + } + else + { + err = ConsumerErrorTemplates::kNoData.Generate(); + } + ConsumerErrorData *error_data = new ConsumerErrorData{data}; + err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data}); + return err; + } + return ConsumerErrorTemplates::kNoData.Generate(); + } + + Error ConsumerErrorFromHttpCode(const RequestOutput *response, const HttpCode &code) + { + switch (code) + { + case HttpCode::OK: + return nullptr; + case HttpCode::NoContent: + return nullptr; + case HttpCode::PartialContent: + return ConsumerErrorFromPartialDataResponse(response->to_string()); + case HttpCode::BadRequest: + return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); + case HttpCode::Unauthorized: + return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); + case HttpCode::InternalServerError: + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); + case HttpCode::ServiceUnavailable: + return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); + case HttpCode::NotFound: + return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); + case HttpCode::Conflict: + return ConsumerErrorFromNoDataResponse(response->to_string()); + case HttpCode::UnsupportedMediaType: + return ConsumerErrorTemplates::kUnsupportedClient.Generate(response->to_string()); + default: + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); + } + } + Error ConsumerErrorFromServerError(const Error &server_err) + { + if (server_err == HttpErrorTemplates::kTransferError) + { + return ConsumerErrorTemplates::kInterruptedTransaction.Generate(); + } + else + { + return ConsumerErrorTemplates::kUnavailableService.Generate(); + } + } + + Error ProcessRequestResponce(const RequestInfo &request, + Error server_err, + const RequestOutput *response, + const HttpCode &code) + { + Error err; + if (server_err != nullptr) + { + err = ConsumerErrorFromServerError(server_err); + err->SetCause(std::move(server_err)); + } + else + { + err = ConsumerErrorFromHttpCode(response, code); + } + + if (err != nullptr) + { + err->AddDetails("host", request.host)->AddDetails("api", request.api); + } + return err; + } + + std::string ServiceRequest::UrlEscape(std::string url) + { + return httpclient__->UrlEscape(url); + } + + ServiceRequest::ServiceRequest(std::string server_uri, + SourceCredentials source) : httpclient__{DefaultHttpClient()}, + endpoint_{std::move(server_uri)}, + source_credentials_(std::move(source)) + { + data_source_encoded_ = httpclient__->UrlEscape(source_credentials_.data_source); + + } + + std::string ServiceRequest::RequestWithToken(std::string uri) + { + return std::move(uri) + "?token=" + source_credentials_.user_token; + } + + Error ServiceRequest::ProcessPostRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code) + { + Error err; + switch (request.output_mode) + { + case OutputDataMode::string: + response->string_output = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, + request.cookie, + request.body, + code, + &err); + break; + case OutputDataMode::array: + err = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, + request.body, &response->data_output, response->data_output_size, code); + break; + default: + break; + } + return err; + } + + Error ServiceRequest::ProcessGetRequest(const RequestInfo &request, RequestOutput *response, HttpCode *code) + { + Error err; + response->string_output = + httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); + return err; + } + + Error ServiceRequest::ProcessRequest(RequestOutput *response, const RequestInfo &request, std::string *service_uri) + { + Error err; + HttpCode code; + if (request.post) + { + err = ProcessPostRequest(request, response, &code); + } + else + { + err = ProcessGetRequest(request, response, &code); + } + if (err && service_uri) + { + service_uri->clear(); + } + return ProcessRequestResponce(request, std::move(err), response, code); + } + + RequestInfo ServiceRequest::GetDiscoveryRequest(const std::string &service_name) const + { + RequestInfo ri; + ri.host = endpoint_; + ri.api = "/asapo-discovery/" + kConsumerProtocol.GetDiscoveryVersion() + "/" + service_name; + ri.extra_params = "&protocol=" + kConsumerProtocol.GetVersion(); + return ri; + } + + Error ServiceRequest::ProcessDiscoverServiceResult(Error err, std::string *uri_to_set) + { + if (err != nullptr || uri_to_set->empty()) + { + uri_to_set->clear(); + if (err == ConsumerErrorTemplates::kUnsupportedClient) + { + return err; + } + auto ret_err = ConsumerErrorTemplates::kUnavailableService.Generate(std::move(err)); + ret_err->AddDetails("destination", endpoint_); + return ret_err; + } + return nullptr; + } + + std::string ServiceRequest::BrokerRequestWithTimeout(RequestInfo request, Error *err, uint64_t timeout_ms) + { + RequestOutput response; + *err = ServiceRequestWithTimeout(ServiceName::kBroker, ¤t_broker_uri_, request, &response, timeout_ms); + return std::move(response.string_output); + } + + Error ServiceRequest::ServiceRequestWithoutTimeout(const std::string &service_name, + std::string *service_uri, + RequestInfo request, + RequestOutput *response) + { + auto err = DiscoverService(service_name, service_uri); + if (err == nullptr) + { + request.host = *service_uri; + return ProcessRequest(response, request, service_uri); + } + return err; + } + + Error ServiceRequest::ServiceRequestWithTimeout(const std::string &service_name, + std::string *service_uri, + RequestInfo request, + RequestOutput *response, + uint64_t timeout_ms) + { + interrupt_flag_ = false; + uint64_t elapsed_ms = 0; + Error err; + while (elapsed_ms <= timeout_ms) + { + if (interrupt_flag_) + { + err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); + break; + } + auto start = std::chrono::steady_clock::now(); + err = ServiceRequestWithoutTimeout(service_name, service_uri, request, response); + if (err == nullptr || err == ConsumerErrorTemplates::kWrongInput) + { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count()); + } + return err; + } + + Error ServiceRequest::DiscoverService(const std::string &service_name, std::string *uri_to_set) + { + if (!uri_to_set->empty()) + { + return nullptr; + } + auto ri = GetDiscoveryRequest(service_name); + RequestOutput output; + auto err = ProcessRequest(&output, ri, nullptr); + *uri_to_set = std::move(output.string_output); + return ProcessDiscoverServiceResult(std::move(err), uri_to_set); + } + + RequestInfo ServiceRequest::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const + { + auto stream_encoded = httpclient__->UrlEscape(std::move(stream)); + auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : ""; + auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/" + data_source_encoded_ + "/" + stream_encoded; + if (group_encoded.size() > 0) + { + uri = uri + "/" + group_encoded; + } + if (suffix.size() > 0) + { + uri = uri + "/" + suffix; + } + + RequestInfo ri; + ri.api = uri; + ri.extra_params += "&instanceid=" + httpclient__->UrlEscape(source_credentials_.instance_id); + ri.extra_params += "&pipelinestep=" + httpclient__->UrlEscape(source_credentials_.pipeline_step); + + return ri; + } + + void ServiceRequest::InterruptCurrentOperation() { + interrupt_flag_ = true; + } + + Error ServiceRequest::GetServerVersionInfo(std::string client_type, std::string client_protocol_version, + std::string* server_info, bool* supported) { + RequestInfo ri; + ri.host = endpoint_; + ri.api = "/asapo-discovery/" + kProducerProtocol.GetDiscoveryVersion() + "/version"; + ri.extra_params = "&client="+client_type+"&protocol=" + client_protocol_version; + + RequestOutput response; + auto err = ProcessRequest(&response, ri, nullptr); + if (err) { + return err; + } + return ExtractVersionFromResponse(response.string_output, client_type, server_info, supported); + } + +} \ No newline at end of file diff --git a/consumer/api/cpp/CMakeLists.txt b/consumer/api/cpp/CMakeLists.txt index 1009f2347afa4dcb39021c12d5015f41a01bc4e3..07ee2651325f3ee239a251cff1edb357f291f4d2 100644 --- a/consumer/api/cpp/CMakeLists.txt +++ b/consumer/api/cpp/CMakeLists.txt @@ -25,7 +25,7 @@ target_include_directories(consumer_lib_objects SYSTEM PRIVATE ${LIBFABRIC_INCLU if (BUILD_STATIC_CLIENT_LIBS) add_library(${TARGET_NAME} STATIC $<TARGET_OBJECTS:consumer_lib_objects> $<TARGET_OBJECTS:asapo_fabric_objects> $<TARGET_OBJECTS:system_io> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:common>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:http_request> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME} SYSTEM PUBLIC $<BUILD_INTERFACE:${LIBFABRIC_INCLUDE_DIR}>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../c/include>) diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 71841270aa524455b7d7d01eee7fe30c250adc01..d23eabb88c7ae9187794a859a97438d8c8361787 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -11,130 +11,16 @@ #include "asapo/asapo_consumer.h" #include "fabric_consumer_client.h" #include "rds_response_error.h" - +#include "asapo/common/networking.h" #include "asapo/common/internal/version.h" namespace asapo { -const std::string ConsumerImpl::kBrokerServiceName = "asapo-broker"; -const std::string ConsumerImpl::kFileTransferServiceName = "asapo-file-transfer"; - -Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorData* data) { - JsonStringParser parser(json_string); - Error err; - if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max)) - || (err = parser.GetString("next_stream", &data->next_stream))) { - return err; - } - return nullptr; -} - -Error GetPartialDataResponseFromJson(const std::string& json_string, PartialErrorData* data) { - Error err; - auto parser = JsonStringParser(json_string); - uint64_t id, size; - if ((err = parser.GetUInt64("size", &size)) || - (err = parser.GetUInt64("_id", &id))) { - return err; - } - data->id = id; - data->expected_size = size; - return nullptr; -} - -Error ConsumerErrorFromPartialDataResponse(const std::string& response) { - PartialErrorData data; - auto parse_error = GetPartialDataResponseFromJson(response, &data); - if (parse_error) { - auto err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response" ); - err->AddDetails("response",response); - return err; - } - auto err = ConsumerErrorTemplates::kPartialData.Generate(); - PartialErrorData* error_data = new PartialErrorData{data}; - err->SetCustomData(std::unique_ptr<CustomErrorData> {error_data}); - return err; -} - -Error ConsumerErrorFromNoDataResponse(const std::string& response) { - if (response.find("get_record_by_id") != std::string::npos) { - ConsumerErrorData data; - auto parse_error = GetNoDataResponseFromJson(response, &data); - if (parse_error) { - return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response); - } - Error err; - if (data.id >= data.id_max) { - err = data.next_stream.empty() ? ConsumerErrorTemplates::kEndOfStream.Generate() : - ConsumerErrorTemplates::kStreamFinished.Generate(); - } else { - err = ConsumerErrorTemplates::kNoData.Generate(); - } - ConsumerErrorData* error_data = new ConsumerErrorData{data}; - err->SetCustomData(std::unique_ptr<CustomErrorData> {error_data}); - return err; - } - return ConsumerErrorTemplates::kNoData.Generate(); -} - -Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& code) { - switch (code) { - case HttpCode::OK: - return nullptr; - case HttpCode::NoContent: - return nullptr; - case HttpCode::PartialContent: - return ConsumerErrorFromPartialDataResponse(response->to_string()); - case HttpCode::BadRequest: - return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); - case HttpCode::Unauthorized: - return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string()); - case HttpCode::InternalServerError: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); - case HttpCode::ServiceUnavailable: - return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); - case HttpCode::NotFound: - return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string()); - case HttpCode::Conflict: - return ConsumerErrorFromNoDataResponse(response->to_string()); - case HttpCode::UnsupportedMediaType: - return ConsumerErrorTemplates::kUnsupportedClient.Generate(response->to_string()); - default: - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string()); - } -} -Error ConsumerErrorFromServerError(const Error& server_err) { - if (server_err == HttpErrorTemplates::kTransferError) { - return ConsumerErrorTemplates::kInterruptedTransaction.Generate(); - } else { - return ConsumerErrorTemplates::kUnavailableService.Generate(); - } -} - -Error ProcessRequestResponce(const RequestInfo& request, - Error server_err, - const RequestOutput* response, - const HttpCode& code) { - Error err; - if (server_err != nullptr) { - err = ConsumerErrorFromServerError(server_err); - err->SetCause(std::move(server_err)); - } else { - err = ConsumerErrorFromHttpCode(response, code); - } - - if (err != nullptr) { - err->AddDetails("host", request.host)->AddDetails("api", request.api); - } - return err; - -} - ConsumerImpl::ConsumerImpl(std::string server_uri, std::string source_path, bool has_filesystem, SourceCredentials source) : - io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, + io__{GenerateDefaultIO()}, endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, source_credentials_(std::move(source)) { @@ -164,7 +50,7 @@ ConsumerImpl::ConsumerImpl(std::string server_uri, source_credentials_.pipeline_step = "DefaultStep"; } - data_source_encoded_ = httpclient__->UrlEscape(source_credentials_.data_source); + service_request_ = std::unique_ptr<ServiceRequest> {new ServiceRequest(endpoint_, source_credentials_)}; request_sender_details_prefix_ = source_credentials_.instance_id + "§" + @@ -191,85 +77,6 @@ NetworkConnectionType ConsumerImpl::CurrentConnectionType() const { return current_connection_type_; } -std::string ConsumerImpl::RequestWithToken(std::string uri) { - return std::move(uri) + "?token=" + source_credentials_.user_token; -} - -Error ConsumerImpl::ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { - Error err; - switch (request.output_mode) { - case OutputDataMode::string: - response->string_output = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, - request.cookie, - request.body, - code, - &err); - break; - case OutputDataMode::array: - err = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, - request.body, &response->data_output, response->data_output_size, code); - break; - default: - break; - } - return err; -} - -Error ConsumerImpl::ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { - Error err; - response->string_output = - httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); - return err; -} - -Error ConsumerImpl::ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri) { - Error err; - HttpCode code; - if (request.post) { - err = ProcessPostRequest(request, response, &code); - } else { - err = ProcessGetRequest(request, response, &code); - } - if (err && service_uri) { - service_uri->clear(); - } - return ProcessRequestResponce(request, std::move(err), response, code); -} - -RequestInfo ConsumerImpl::GetDiscoveryRequest(const std::string& service_name) const { - RequestInfo ri; - ri.host = endpoint_; - ri.api = "/asapo-discovery/" + kConsumerProtocol.GetDiscoveryVersion() + "/" + service_name; - ri.extra_params = "&protocol=" + kConsumerProtocol.GetVersion(); - return ri; -} - -Error ConsumerImpl::ProcessDiscoverServiceResult(Error err, std::string* uri_to_set) { - if (err != nullptr || uri_to_set->empty()) { - uri_to_set->clear(); - if (err == ConsumerErrorTemplates::kUnsupportedClient) { - return err; - } - auto ret_err = ConsumerErrorTemplates::kUnavailableService.Generate(std::move(err)); - ret_err->AddDetails("destination",endpoint_); - return ret_err; - } - return nullptr; -} - -Error ConsumerImpl::DiscoverService(const std::string& service_name, std::string* uri_to_set) { - if (!uri_to_set->empty()) { - return nullptr; - } - auto ri = GetDiscoveryRequest(service_name); - RequestOutput output; - auto err = ProcessRequest(&output, ri, nullptr); - *uri_to_set = std::move(output.string_output); - return ProcessDiscoverServiceResult(std::move(err), uri_to_set); -} - bool ConsumerImpl::SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* group_id, @@ -305,17 +112,6 @@ bool ConsumerImpl::SwitchToGetByIdIfNoData(Error* err, return false; } -RequestInfo ConsumerImpl::PrepareRequestInfo(std::string api_url, bool dataset, uint64_t min_size) { - RequestInfo ri; - ri.host = current_broker_uri_; - ri.api = std::move(api_url); - if (dataset) { - ri.extra_params = "&dataset=true"; - ri.extra_params += "&minsize=" + std::to_string(min_size); - } - return ri; -} - Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group_id, std::string stream, GetMessageServerOperation op, bool dataset, uint64_t min_size) { @@ -326,9 +122,6 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group interrupt_flag_ = false; std::string request_suffix = OpToUriCmd(op); - std::string request_group = OpToUriCmd(op); - - auto baseRequestInfo = CreateBrokerApiRequest(std::move(stream), "", ""); Error no_data_error; auto start = std::chrono::steady_clock::now(); bool timeout_triggered = false; @@ -337,26 +130,25 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group return ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); } - auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); - if (err == nullptr) { - auto ri = PrepareRequestInfo(baseRequestInfo.api + "/" + httpclient__->UrlEscape(group_id) + "/" + request_suffix, dataset, - min_size); - ri.extra_params += baseRequestInfo.extra_params; - if (request_suffix == "next" && resend_) { + auto ri = service_request_->CreateBrokerApiRequest(stream, group_id, request_suffix); + if (dataset) { + ri.extra_params += "&dataset=true"; + ri.extra_params += "&minsize=" + std::to_string(min_size); + } + if (request_suffix == "next" && resend_) { ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_ms=" + std::to_string(delay_ms_) + "&resend_attempts=" + std::to_string(resend_attempts_); - } - if (op == GetMessageServerOperation::GetNextAvailable) { - ri.extra_params += "&id_key=time_id"; - } else { - ri.extra_params += "&id_key=_id"; - } - RequestOutput output; - err = ProcessRequest(&output, ri, ¤t_broker_uri_); - *response = std::move(output.string_output); - if (err == nullptr) { - break; - } + } + if (op == GetMessageServerOperation::GetNextAvailable) { + ri.extra_params += "&id_key=time_id"; + } else { + ri.extra_params += "&id_key=_id"; + } + RequestOutput output; + auto err = service_request_->ServiceRequestWithoutTimeout(ServiceName::kBroker, ¤t_broker_uri_, ri, &output); + *response = std::move(output.string_output); + if (err == nullptr) { + break; } if (err == ConsumerErrorTemplates::kStreamFinished) { @@ -590,35 +382,7 @@ std::string ConsumerImpl::GenerateNewGroupId(Error* err) { RequestInfo ri; ri.api = "/" + kConsumerProtocol.GetBrokerVersion() + "/creategroup"; ri.post = true; - return BrokerRequestWithTimeout(ri, err); -} - -Error ConsumerImpl::ServiceRequestWithTimeout(const std::string& service_name, - std::string* service_uri, - RequestInfo request, - RequestOutput* response) { - interrupt_flag_ = false; - uint64_t elapsed_ms = 0; - Error err; - while (elapsed_ms <= timeout_ms_) { - if (interrupt_flag_) { - err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request"); - break; - } - auto start = std::chrono::steady_clock::now(); - err = DiscoverService(service_name, service_uri); - if (err == nullptr) { - request.host = *service_uri; - err = ProcessRequest(response, request, service_uri); - if (err == nullptr || err == ConsumerErrorTemplates::kWrongInput) { - break; - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds> - (std::chrono::steady_clock::now() - start).count()); - } - return err; + return service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); } Error ConsumerImpl::FtsSizeRequestWithTimeout(MessageMeta* info) { @@ -626,7 +390,7 @@ Error ConsumerImpl::FtsSizeRequestWithTimeout(MessageMeta* info) { ri.extra_params += "&sizeonly=true"; ri.output_mode = OutputDataMode::string; RequestOutput response; - auto err = ServiceRequestWithTimeout(kFileTransferServiceName, ¤t_fts_uri_, ri, &response); + auto err = service_request_->ServiceRequestWithTimeout(ServiceName::kFileTransfer, ¤t_fts_uri_, ri, &response, timeout_ms_); if (err) { return err; } @@ -639,17 +403,17 @@ Error ConsumerImpl::FtsSizeRequestWithTimeout(MessageMeta* info) { Error ConsumerImpl::FtsRequestWithTimeout(MessageMeta* info, MessageData* data) { RequestInfo ri = CreateFileTransferRequest(info); if (use_new_api_format_) { - ri.extra_params += "&instanceid=" + httpclient__->UrlEscape(source_credentials_.instance_id); - ri.extra_params += "&pipelinestep=" + httpclient__->UrlEscape(source_credentials_.pipeline_step); - ri.extra_params += "&beamtime=" + httpclient__->UrlEscape(source_credentials_.beamtime_id); - ri.extra_params += "&stream=" + httpclient__->UrlEscape(info->stream); - ri.extra_params += "&source=" + httpclient__->UrlEscape(info->source); + ri.extra_params += "&instanceid=" + service_request_->UrlEscape(source_credentials_.instance_id); + ri.extra_params += "&pipelinestep=" + service_request_->UrlEscape(source_credentials_.pipeline_step); + ri.extra_params += "&beamtime=" + service_request_->UrlEscape(source_credentials_.beamtime_id); + ri.extra_params += "&stream=" + service_request_->UrlEscape(info->stream); + ri.extra_params += "&source=" + service_request_->UrlEscape(info->source); } RequestOutput response; response.data_output_size = info->size; - auto err = ServiceRequestWithTimeout(kFileTransferServiceName, ¤t_fts_uri_, ri, &response); + auto err = service_request_->ServiceRequestWithTimeout(ServiceName::kFileTransfer, ¤t_fts_uri_, ri, &response, timeout_ms_); if (err) { return err; } @@ -667,24 +431,18 @@ RequestInfo ConsumerImpl::CreateFileTransferRequest(const MessageMeta* info) con return ri; } -std::string ConsumerImpl::BrokerRequestWithTimeout(RequestInfo request, Error* err) { - RequestOutput response; - *err = ServiceRequestWithTimeout(kBrokerServiceName, ¤t_broker_uri_, request, &response); - return std::move(response.string_output); -} - Error ConsumerImpl::ResetLastReadMarker(std::string group_id, std::string stream) { return SetLastReadMarker(group_id, 0, stream); } Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) { - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), "resetcounter"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), "resetcounter"); ri.extra_params = "&value=" + std::to_string(value); ri.post = true; Error err; - BrokerRequestWithTimeout(ri, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } @@ -707,7 +465,7 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); if (dataset) { @@ -716,20 +474,20 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, } Error err; - *response = BrokerRequestWithTimeout(ri, &err); + *response = service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } std::string ConsumerImpl::GetBeamtimeMeta(Error* err) { - RequestInfo ri = CreateBrokerApiRequest("default", "0", "meta/0"); + RequestInfo ri = service_request_->CreateBrokerApiRequest("default", "0", "meta/0"); - return BrokerRequestWithTimeout(ri, err); + return service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); } std::string ConsumerImpl::GetStreamMeta(const std::string& stream, Error* err) { - RequestInfo ri = CreateBrokerApiRequest(stream, "0", "meta/1"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(stream, "0", "meta/1"); - return BrokerRequestWithTimeout(ri, err); + return service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); } @@ -750,12 +508,12 @@ MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, return {}; } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "0", "querymessages"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "0", "querymessages"); ri.post = true; ri.body = std::move(query); - auto response = BrokerRequestWithTimeout(ri, err); + auto response = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); if (*err) { return MessageMetas{}; } @@ -867,7 +625,7 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, E StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) { RequestInfo ri = GetStreamListRequest(from, filter, detailed); - auto response = BrokerRequestWithTimeout(ri, err); + auto response = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); if (*err) { return StreamInfos{}; } @@ -875,10 +633,10 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, b } SourceList ConsumerImpl::GetSourceList(Error* err) { - RequestInfo ri = CreateBrokerApiRequest("0", "", "sources"); + RequestInfo ri = service_request_->CreateBrokerApiRequest("0", "", "sources"); ri.post = false; - auto json_string = BrokerRequestWithTimeout(ri, err); + auto json_string = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); if (*err) { return SourceList{}; } @@ -891,10 +649,10 @@ SourceList ConsumerImpl::GetSourceList(Error* err) { } RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const { - RequestInfo ri = CreateBrokerApiRequest("0", "", "streams"); + RequestInfo ri = service_request_->CreateBrokerApiRequest("0", "", "streams"); ri.post = false; if (!from.empty()) { - ri.extra_params = "&from=" + httpclient__->UrlEscape(from); + ri.extra_params = "&from=" + service_request_->UrlEscape(from); } ri.extra_params += "&filter=" + filterToString(filter); std::string detailed_str = detailed ? "true" : "false"; @@ -910,7 +668,7 @@ Error ConsumerImpl::UpdateFolderTokenIfNeeded(bool ignore_existing) { RequestOutput output; RequestInfo ri = CreateFolderTokenRequest(); - auto err = ProcessRequest(&output, ri, nullptr); + auto err = service_request_->ProcessRequest(&output, ri, nullptr); if (err) { return err; } @@ -960,12 +718,12 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s if (stream.empty()) { return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); ri.post = true; ri.body = "{\"Op\":\"ackmessage\"}"; Error err; - BrokerRequestWithTimeout(ri, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } @@ -978,10 +736,10 @@ IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, *error = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); return {}; } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), "nacks"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), "nacks"); ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id); - auto json_string = BrokerRequestWithTimeout(ri, error); + auto json_string = service_request_->BrokerRequestWithTimeout(ri, error, timeout_ms_); if (*error) { return IdList{}; } @@ -1000,9 +758,9 @@ uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::str *error = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); return 0; } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), "lastack"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), "lastack"); - auto json_string = BrokerRequestWithTimeout(ri, error); + auto json_string = service_request_->BrokerRequestWithTimeout(ri, error, timeout_ms_); if (*error) { return 0; } @@ -1032,16 +790,17 @@ Error ConsumerImpl::NegativeAcknowledge(std::string group_id, if (stream.empty()) { return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); } - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), std::move(group_id), std::to_string(id)); ri.post = true; ri.body = R"({"Op":"negackmessage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; Error err; - BrokerRequestWithTimeout(ri, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } void ConsumerImpl::InterruptCurrentOperation() { interrupt_flag_ = true; + service_request_->InterruptCurrentOperation(); } uint64_t ConsumerImpl::GetCurrentDatasetCount(std::string stream, bool include_incomplete, Error* err) { @@ -1056,7 +815,7 @@ RequestInfo ConsumerImpl::GetSizeRequestForDatasetStream(std::string& stream, bo } uint64_t ConsumerImpl::GetCurrentCount(const RequestInfo& ri, Error* err) { - auto responce = BrokerRequestWithTimeout(ri, err); + auto responce = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms_); if (*err) { return 0; } @@ -1073,28 +832,10 @@ uint64_t ConsumerImpl::ParseGetCurrentCountResponce(Error* err, const std::strin } RequestInfo ConsumerImpl::GetSizeRequestForSingleMessagesStream(std::string& stream) const { - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "size"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "", "size"); return ri; } -RequestInfo ConsumerImpl::GetVersionRequest() const { - RequestInfo ri; - ri.host = endpoint_; - ri.api = "/asapo-discovery/" + kConsumerProtocol.GetDiscoveryVersion() + "/version"; - ri.extra_params = "&client=consumer&protocol=" + kConsumerProtocol.GetVersion(); - return ri; -} - -Error ConsumerImpl::GetServerVersionInfo(std::string* server_info, bool* supported) { - auto ri = GetVersionRequest(); - RequestOutput output; - auto err = ProcessRequest(&output, ri, nullptr); - if (err) { - return err; - } - return ExtractVersionFromResponse(output.string_output, "consumer", server_info, supported); -} - Error ConsumerImpl::GetVersionInfo(std::string* client_info, std::string* server_info, bool* supported) { if (client_info == nullptr && server_info == nullptr && supported == nullptr) { return ConsumerErrorTemplates::kWrongInput.Generate("missing parameters"); @@ -1105,14 +846,14 @@ Error ConsumerImpl::GetVersionInfo(std::string* client_info, std::string* server } if (server_info != nullptr || supported != nullptr) { - return GetServerVersionInfo(server_info, supported); + return service_request_->GetServerVersionInfo("consumer", kConsumerProtocol.GetVersion(), server_info, supported); } return nullptr; } RequestInfo ConsumerImpl::GetDeleteStreamRequest(std::string stream, DeleteStreamOptions options) const { - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "delete"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "", "delete"); ri.post = true; ri.body = options.Json(); return ri; @@ -1121,41 +862,16 @@ RequestInfo ConsumerImpl::GetDeleteStreamRequest(std::string stream, DeleteStrea Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options) { auto ri = GetDeleteStreamRequest(std::move(stream), options); Error err; - BrokerRequestWithTimeout(ri, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } Error ConsumerImpl::SetStreamPersistent(std::string stream) { - RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "persist"); + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "", "persist"); ri.post = true; Error err; - BrokerRequestWithTimeout(ri, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms_); return err; } - -RequestInfo ConsumerImpl::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const { - auto stream_encoded = httpclient__->UrlEscape(std::move(stream)); - auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : ""; - auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/" - + data_source_encoded_ + "/" + stream_encoded; - if (group_encoded.size() > 0) { - uri = uri + "/" + group_encoded; - } - if (suffix.size() > 0) { - uri = uri + "/" + suffix; - } - - RequestInfo ri; - ri.api = uri; - - if (use_new_api_format_) { - ri.extra_params += "&instanceid=" + httpclient__->UrlEscape(source_credentials_.instance_id); - ri.extra_params += "&pipelinestep=" + httpclient__->UrlEscape(source_credentials_.pipeline_step); - } - - return ri; - -} - } diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 63bc6c9e091541ea6437ab0d8b3a9efd688aefd5..75c87f5b0f078241e61959fc173af5b1eec3feda 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -7,6 +7,7 @@ #include "asapo/consumer/consumer.h" #include "asapo/io/io.h" #include "asapo/http_client/http_client.h" +#include "asapo/http_request/service_request.h" #include "net_client.h" namespace asapo { @@ -19,39 +20,6 @@ enum class GetMessageServerOperation { GetLastInGroup, }; -enum class OutputDataMode { - string, - array, - file -}; - -struct RequestInfo { - std::string host; - std::string api; - std::string extra_params; - std::string body; - std::string cookie; - OutputDataMode output_mode = OutputDataMode::string; - bool post = false; -}; - -struct RequestOutput { - std::string string_output; - MessageData data_output; - uint64_t data_output_size; - const char* to_string() const { - if (!data_output) { - return string_output.c_str(); - } else { - return reinterpret_cast<char const*>(data_output.get()) ; - } - } -}; - -Error ProcessRequestResponce(const RequestInfo& request, Error server_err, const RequestOutput* response, - const HttpCode& code); -Error ConsumerErrorFromNoDataResponse(const std::string& response); -Error ConsumerErrorFromPartialDataResponse(const std::string& response); DataSet DecodeDatasetFromResponse(std::string response, Error* err); class ConsumerImpl final : public asapo::Consumer { public: @@ -120,16 +88,14 @@ class ConsumerImpl final : public asapo::Consumer { std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch - std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; + std::unique_ptr<ServiceRequest> service_request_; std::mutex net_client_mutex__; // Required for the lazy initialization of net_client private: - Error ProcessDiscoverServiceResult(Error err, std::string* uri_to_set); Error GetDataFromFileTransferService(MessageMeta* info, MessageData* data, bool retry_with_new_token); Error GetDataFromFile(MessageMeta* info, MessageData* data); static const std::string kBrokerServiceName; static const std::string kFileTransferServiceName; - std::string RequestWithToken(std::string uri); Error GetRecordFromServer(std::string* info, std::string group_id, std::string stream, GetMessageServerOperation op, bool dataset = false, uint64_t min_size = 0); Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string stream, @@ -139,7 +105,6 @@ class ConsumerImpl final : public asapo::Consumer { bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* group_id, std::string* redirect_uri); bool SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* group_id, std::string* redirect_uri); - Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri); Error GetMessageFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, MessageMeta* info, MessageData* data); DataSet GetDatasetFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, @@ -147,21 +112,13 @@ class ConsumerImpl final : public asapo::Consumer { bool DataCanBeInBuffer(const MessageMeta* info); Error TryGetDataFromBuffer(const MessageMeta* info, MessageData* data); Error CreateNetClientAndTryToGetFile(const MessageMeta* info, const std::string& request_sender_details, MessageData* data); - Error ServiceRequestWithTimeout(const std::string& service_name, std::string* service_uri, RequestInfo request, - RequestOutput* response); - std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); Error FtsRequestWithTimeout(MessageMeta* info, MessageData* data); Error FtsSizeRequestWithTimeout(MessageMeta* info); - Error ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); - Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); - RequestInfo PrepareRequestInfo(std::string api_url, bool dataset, uint64_t min_size); std::string OpToUriCmd(GetMessageServerOperation op); Error UpdateFolderTokenIfNeeded(bool ignore_existing); uint64_t GetCurrentCount(const RequestInfo& ri, Error* err); RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const; - Error GetServerVersionInfo(std::string* server_info, bool* supported) ; - RequestInfo CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const; std::string endpoint_; std::string current_broker_uri_; @@ -169,7 +126,6 @@ class ConsumerImpl final : public asapo::Consumer { std::string source_path_; bool has_filesystem_; SourceCredentials source_credentials_; - std::string data_source_encoded_; bool use_new_api_format_ = true; std::string request_sender_details_prefix_; uint64_t timeout_ms_ = 0; @@ -186,8 +142,6 @@ class ConsumerImpl final : public asapo::Consumer { RequestInfo GetSizeRequestForSingleMessagesStream(std::string& stream) const; RequestInfo GetSizeRequestForDatasetStream(std::string& stream, bool include_incomplete) const; uint64_t ParseGetCurrentCountResponce(Error* err, const std::string& responce) const; - RequestInfo GetDiscoveryRequest(const std::string& service_name) const; - RequestInfo GetVersionRequest() const; RequestInfo GetDeleteStreamRequest(std::string stream, DeleteStreamOptions options) const; }; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 3922d85a58feb664f477756e4afea2d937a3a2b4..73d32f76e69b694bbce0a5ac47a2fb0fd719ed58 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -51,7 +51,6 @@ TEST(FolderDataBroker, Constructor) { "instance", "step", "beamtime_id", "", "", "token"}) }; ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(consumer->io__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(consumer->httpclient__.get()), Ne(nullptr)); ASSERT_THAT(consumer->net_client__.get(), Eq(nullptr)); } @@ -131,11 +130,11 @@ class ConsumerImplTests : public Test { expected_data_source, expected_token}) }; consumer->io__ = std::unique_ptr<IO> {&mock_io}; - consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + consumer->service_request_->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; fts_consumer->io__ = std::unique_ptr<IO> {&mock_io}; - fts_consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; fts_consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + fts_consumer->service_request_->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; { ON_CALL(mock_http_client, UrlEscape_t(expected_instance_id)).WillByDefault(Return(expected_instance_id_encoded)); @@ -156,12 +155,12 @@ class ConsumerImplTests : public Test { } void TearDown() override { consumer->io__.release(); - consumer->httpclient__.release(); consumer->net_client__.release(); + consumer->service_request_.release(); fts_consumer->io__.release(); - fts_consumer->httpclient__.release(); fts_consumer->net_client__.release(); + fts_consumer->service_request_.release(); Mock::VerifyAndClear(&mock_io); Mock::VerifyAndClear(&mock_http_client); @@ -234,8 +233,8 @@ class ConsumerImplTests : public Test { void CheckDefaultingOfCredentials(asapo::SourceCredentials credentials, std::string expectedUrlPath) { consumer->io__.release(); - consumer->httpclient__.release(); consumer->net_client__.release(); + consumer->service_request_.release(); consumer = std::unique_ptr<ConsumerImpl> { new ConsumerImpl(expected_server_uri, expected_path, @@ -243,8 +242,8 @@ class ConsumerImplTests : public Test { std::move(credentials)) }; consumer->io__ = std::unique_ptr<IO> {&mock_io}; - consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + consumer->service_request_->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; MockGetBrokerUri(); EXPECT_CALL(mock_http_client, @@ -1067,9 +1066,9 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/stream/" + - expected_group_id_encoded + "/next?token=" - + expected_token + "&dataset=true&minsize=0" + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&dataset=true&minsize=0" + "&id_key=_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1205,10 +1204,9 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + - expected_stream_encoded + "/0/last?token=" - + expected_token + "&dataset=true&minsize=1" - + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded - + "&id_key=_id", _, + expected_stream_encoded + "/0/last?token=" + expected_token + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded + + "&dataset=true&minsize=1" + "&id_key=_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -1222,11 +1220,10 @@ TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token=" - + expected_token + "&dataset=true&minsize=1" + + expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token=" + + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded - + "&id_key=_id", _, + + "&dataset=true&minsize=1" + "&id_key=_id", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), diff --git a/producer/api/cpp/CMakeLists.txt b/producer/api/cpp/CMakeLists.txt index 7701657c2785e9a31bc8ec7b07512027adaf2ac5..abfda740922c51d7c8f0e8521cce76616ac1b232 100644 --- a/producer/api/cpp/CMakeLists.txt +++ b/producer/api/cpp/CMakeLists.txt @@ -23,7 +23,7 @@ target_include_directories(producer_lib_objects SYSTEM PRIVATE ${LIBFABRIC_INCLU if (BUILD_STATIC_CLIENT_LIBS) add_library(${TARGET_NAME} STATIC $<TARGET_OBJECTS:producer_lib_objects> $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> - $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) + $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:http_request> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../c/include>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${ASAPO_CXX_COMMON_INCLUDE_DIR}>) @@ -49,7 +49,7 @@ endif() if (BUILD_SHARED_CLIENT_LIBS) add_library(${TARGET_NAME}_shared SHARED $<TARGET_OBJECTS:producer_lib_objects> $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> - $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) + $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:http_request> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME}_shared PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME}_shared CURL::libcurl ${CMAKE_THREAD_LIBS_INIT}) diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 8da19019f32e1b782c692c119b505d90a3bb7a02..2ebe214ad183e18ff539c04363ed152e96fd2357 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -12,14 +12,35 @@ #include "asapo/http_client/http_client.h" #include "asapo/common/internal/version.h" #include <asapo/io/io_factory.h> +#include "asapo/http_request/consumer_error.h" namespace asapo { const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s +void TranslateProducerError(Error* consumer_error){ + + if (*consumer_error && (*consumer_error) == ConsumerErrorTemplates::kWrongInput) { + *consumer_error = ProducerErrorTemplates::kWrongInput.Generate((*consumer_error)->Explain()); + } else if (*consumer_error && *consumer_error == ConsumerErrorTemplates::kUnsupportedClient) { + *consumer_error = ProducerErrorTemplates::kUnsupportedClient.Generate((*consumer_error)->Explain()); + } else if (*consumer_error && *consumer_error == ConsumerErrorTemplates::kUnavailableService) { + *consumer_error = ProducerErrorTemplates::kTimeout.Generate((*consumer_error)->Explain()); + } else if // ToDo This error may be confusing in case if user cancelling the request via InterruptCurrentOperation + (*consumer_error && *consumer_error == ConsumerErrorTemplates::kInterruptedTransaction) { + *consumer_error = ProducerErrorTemplates::kInternalServerError.Generate((*consumer_error)->Explain()); + } else if (*consumer_error && *consumer_error == ConsumerErrorTemplates::kNoData) { + *consumer_error = ProducerErrorTemplates::kTimeout.Generate((*consumer_error)->Explain()); + } else if (*consumer_error) { + *consumer_error = ProducerErrorTemplates::kInternalServerError.Generate((*consumer_error)->Explain()); + } +} + + + ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms, asapo::RequestHandlerType type) : - log__{GetDefaultProducerLogger()}, io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, timeout_ms_{timeout_ms}, endpoint_{endpoint} { + log__{GetDefaultProducerLogger()}, io__{GenerateDefaultIO()}, timeout_ms_{timeout_ms}, endpoint_{endpoint} { switch (type) { case RequestHandlerType::kTcp: discovery_service_.reset(new ReceiverDiscoveryService{endpoint, @@ -270,6 +291,7 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { if (!err) { last_creds_.reset(new SourceCredentials{source_cred}); } + service_request_ = std::unique_ptr<ServiceRequest> {new ServiceRequest(endpoint_, source_cred)}; return err; } @@ -502,36 +524,36 @@ Error ProducerImpl::GetVersionInfo(std::string* client_info, std::string* server if (client_info == nullptr && server_info == nullptr && supported == nullptr) { return ProducerErrorTemplates::kWrongInput.Generate("missing parameters"); } + if (client_info != nullptr) { *client_info = - "software version: " + std::string(kVersion) + ", producer protocol: " + kProducerProtocol.GetVersion(); + "software version: " + std::string(kVersion) + ", producer protocol: " + kProducerProtocol.GetVersion() + + ", consumer protocol: " + kConsumerProtocol.GetVersion(); } if (server_info != nullptr || supported != nullptr) { - return GetServerVersionInfo(server_info, supported); - } - return nullptr; -} + std::string producer_server_info, consumer_server_info ; + bool producer_supported, consumer_supported; -Error ProducerImpl::GetServerVersionInfo(std::string* server_info, - bool* supported) const { - auto endpoint = endpoint_ + "/asapo-discovery/" + kProducerProtocol.GetDiscoveryVersion() + - "/version?client=producer&protocol=" + kProducerProtocol.GetVersion(); - HttpCode code; - Error err; - auto response = httpclient__->Get(endpoint, &code, &err); - if (err) { - return err; + service_request_->GetServerVersionInfo("producer", kProducerProtocol.GetVersion(), &producer_server_info, &producer_supported); + service_request_->GetServerVersionInfo("consumer", kConsumerProtocol.GetVersion(), &consumer_server_info, &consumer_supported); + + *server_info = producer_server_info + consumer_server_info; + + if (supported) { + *supported = (producer_supported && consumer_supported); + } } - return ExtractVersionFromResponse(response, "producer", server_info, supported); + return nullptr; } Error ProducerImpl::DeleteStream(std::string stream, uint64_t timeout_ms, DeleteStreamOptions options) const { - GenericRequestHeader header{kOpcodeDeleteStream, 0, 0, 0, "", stream}; - header.custom_data[0] = options.Encode(); - + RequestInfo ri = service_request_->CreateBrokerApiRequest(std::move(stream), "", "delete"); + ri.post = true; + ri.body = options.Json(); Error err; - BlockingRequest(std::move(header), timeout_ms, &err); + service_request_->BrokerRequestWithTimeout(ri, &err, timeout_ms); + TranslateProducerError(&err); return err; } @@ -570,21 +592,17 @@ Error ProducerImpl::SendMeta(const std::string& metadata, } std::string ProducerImpl::GetStreamMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const { - return GetMeta(stream, timeout_ms, err); + RequestInfo ri = service_request_->CreateBrokerApiRequest(stream, "0", "meta/1"); + auto result = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms); + TranslateProducerError(err); + return result; } std::string ProducerImpl::GetBeamtimeMeta(uint64_t timeout_ms, Error* err) const { - return GetMeta("", timeout_ms, err); -} - -std::string ProducerImpl::GetMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const { - GenericRequestHeader header{kOpcodeGetMeta, 0, 0, 0, "", stream}; - auto response = BlockingRequest(std::move(header), timeout_ms, err); - if (*err) { - return ""; - } - *err = nullptr; - return response; + RequestInfo ri = service_request_->CreateBrokerApiRequest("default", "0", "meta/0"); + auto result = service_request_->BrokerRequestWithTimeout(ri, err, timeout_ms); + TranslateProducerError(err); + return result; } } diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index b9182a9833d9b4f708c1ea0d0b48862791ed0961..1e4cb0f696bb7d34b126da089b176f85122eede3 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -7,6 +7,7 @@ #include "asapo/producer/producer.h" #include "asapo/logger/logger.h" #include "asapo/request/request_pool.h" +#include "asapo/http_request/service_request.h" #include "producer_request_handler_factory.h" #include "receiver_discovery_service.h" @@ -63,8 +64,8 @@ class ProducerImpl : public Producer { AbstractLogger* log__; std::unique_ptr<IO> io__; - std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<RequestPool> request_pool__; + std::unique_ptr<ServiceRequest> service_request_; Error SetCredentials(SourceCredentials source_cred) override; @@ -81,7 +82,7 @@ class ProducerImpl : public Producer { void SetRequestsQueueLimits(uint64_t size, uint64_t volume) override; std::string GetStreamMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const override; std::string GetBeamtimeMeta(uint64_t timeout_ms, Error* err) const override; - + private: Error SendMeta(const std::string& metadata, MetaIngestMode mode, @@ -98,9 +99,6 @@ class ProducerImpl : public Producer { std::string source_cred_string_; uint64_t timeout_ms_; std::string endpoint_; - Error GetServerVersionInfo(std::string* server_info, - bool* supported) const; - std::string GetMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const; Error RefreshSourceCredentialString(SourceCredentials source_cred); }; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index f7edb0eaf0e59b594a8bb38f2c01288d357b5825..31ad0e3be5fa5c0edfce345e143502448cb9bc61 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -10,8 +10,8 @@ #include "../src/request_handler_tcp.h" #include "asapo/request/request_pool_error.h" +#include "asapo/http_request/consumer_error.h" #include "asapo/unittests/MockHttpClient.h" - #include "mocking.h" namespace { @@ -24,6 +24,7 @@ using ::testing::Gt; using ::testing::Eq; using ::testing::Ne; using ::testing::Mock; +using ::testing::NiceMock; using ::testing::InSequence; using ::testing::HasSubstr; using testing::SetArgPointee; @@ -32,6 +33,7 @@ using testing::SetArgPointee; using asapo::RequestPool; using asapo::ProducerRequest; using asapo::MockHttpClient; +using asapo::HttpCode; MATCHER_P10(M_CheckSendRequest, op_code, source_credentials, metadata, file_id, file_size, message, stream, @@ -65,7 +67,6 @@ TEST(ProducerImpl, Constructor) { asapo::ProducerImpl producer{"", 4, 3600000, asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::HttpClient*>(producer.httpclient__.get()), Ne(nullptr)); } class ProducerImplTests : public testing::Test { @@ -74,7 +75,7 @@ class ProducerImplTests : public testing::Test { asapo::ProducerRequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPool> mock_pull{&factory, &mock_logger}; - std::string expected_server_uri = "127.0.0.1:9400"; + std::string expected_server_uri = "test:8400"; asapo::ProducerImpl producer{expected_server_uri, 1, 3600000, asapo::RequestHandlerType::kTcp}; uint64_t expected_size = 100; uint64_t expected_id = 10; @@ -82,6 +83,23 @@ class ProducerImplTests : public testing::Test { uint64_t expected_dataset_size = 4; uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + std::string expected_token = "token"; + std::string expected_broker_uri = "asapo-broker:5005"; + std::string expected_consumer_protocol = asapo::kConsumerProtocol.GetVersion(); + std::string expected_broker_protocol = asapo::kConsumerProtocol.GetBrokerVersion(); + std::string expected_broker_api = expected_broker_uri + "/" + expected_broker_protocol; + std::string expected_data_source_encoded = "source%2F%24.%3F"; + std::string expected_stream_encoded = "str%20%24%20eam%24"; + + std::string expected_group_id = "groupid$"; + std::string expected_group_id_encoded = "groupid%24"; + std::string expected_data_source = "source/$.?"; + std::string expected_instance_id = "some instance"; + std::string expected_pipeline_step = "a new step"; + std::string expected_instance_id_encoded = "some%20instance"; + std::string expected_pipeline_step_encoded = "a%20new%20step"; + + char expected_name[asapo::kMaxMessageSize] = "test_name"; char expected_stream[asapo::kMaxMessageSize] = "test_stream"; std::string expected_next_stream = "next_stream"; @@ -97,18 +115,52 @@ class ProducerImplTests : public testing::Test { bool expected_managed_memory = true; bool expected_unmanaged_memory = false; - MockHttpClient* mock_http_client; + NiceMock<MockHttpClient> mock_http_client{}; void SetUp() override { producer.log__ = &mock_logger; producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; - mock_http_client = new MockHttpClient; - producer.httpclient__.reset(mock_http_client); + producer.SetCredentials(expected_credentials); + producer.service_request_->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; + + { + ON_CALL(mock_http_client, UrlEscape_t(expected_instance_id)).WillByDefault(Return(expected_instance_id_encoded)); + ON_CALL(mock_http_client, UrlEscape_t(expected_pipeline_step)).WillByDefault(Return(expected_pipeline_step_encoded)); + ON_CALL(mock_http_client, UrlEscape_t(expected_stream)).WillByDefault(Return(expected_stream_encoded)); + ON_CALL(mock_http_client, UrlEscape_t(expected_group_id)).WillByDefault(Return(expected_group_id_encoded)); + ON_CALL(mock_http_client, UrlEscape_t(expected_data_source)).WillByDefault(Return(expected_data_source_encoded)); + ON_CALL(mock_http_client, UrlEscape_t("0")).WillByDefault(Return("0")); + ON_CALL(mock_http_client, UrlEscape_t("")).WillByDefault(Return("")); + ON_CALL(mock_http_client, UrlEscape_t("default")).WillByDefault(Return("default")); + ON_CALL(mock_http_client, UrlEscape_t("stream")).WillByDefault(Return("stream")); + ON_CALL(mock_http_client, UrlEscape_t("instance")).WillByDefault(Return("instance")); + ON_CALL(mock_http_client, UrlEscape_t("step")).WillByDefault(Return("step")); + ON_CALL(mock_http_client, UrlEscape_t("DefaultStep")).WillByDefault(Return("DefaultStep")); + ON_CALL(mock_http_client, UrlEscape_t("a")).WillByDefault(Return("b")); + ON_CALL(mock_http_client, UrlEscape_t("b")).WillByDefault(Return("b")); + } } void TearDown() override { producer.request_pool__.release(); + producer.service_request_.release(); + + Mock::VerifyAndClear(&mock_http_client); + } + + void MockGetServiceUri(std::string service, std::string result) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/" + service + "?token=" + + expected_token + "&protocol=" + expected_consumer_protocol), _, _) + ).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(result))); + } + + void MockGetBrokerUri() { + MockGetServiceUri("asapo-broker", expected_broker_uri); } + }; TEST_F(ProducerImplTests, SendReturnsError) { @@ -470,7 +522,6 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithStream) { TEST_F(ProducerImplTests, ErrorSettingSecondTime) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("already"))); - producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw, "instance", "step", "1", "", "2", "3"}); auto err = producer.SetCredentials(asapo::SourceCredentials{asapo::SourceType::kRaw, "instance", "step", "4", "", "5", "6"}); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -611,20 +662,31 @@ TEST_F(ProducerImplTests, ReturnDataIfCanotAddToQueue) { TEST_F(ProducerImplTests, GetVersionInfoWithServer) { - std::string result = + std::string result_producer = R"({"softwareVersion":"21.06.0, build 7a9294ad","clientSupported":"no", "clientProtocol":{"versionInfo":"v0.4"}})"; - EXPECT_CALL(*mock_http_client, Get_t(HasSubstr(expected_server_uri + - "/asapo-discovery/v0.1/version?client=producer&protocol=v0.6"), _, _)).WillOnce(DoAll( + std::string result_consumer = + R"({"softwareVersion":"21.06.0, build 7a9294ad","clientSupported":"no", "clientProtocol":{"versionInfo":"v0.5"}})"; + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + + "/asapo-discovery/v0.1/version?token=token&client=producer&protocol=v0.6"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(asapo::HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(result_producer))); + + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + + "/asapo-discovery/v0.1/version?token=token&client=consumer&protocol=v0.6"), _, _)).WillOnce(DoAll( SetArgPointee<1>(asapo::HttpCode::OK), SetArgPointee<2>(nullptr), - Return(result))); + Return(result_consumer))); std::string client_info, server_info; auto err = producer.GetVersionInfo(&client_info, &server_info, nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(server_info, HasSubstr("21.06.0")); ASSERT_THAT(server_info, HasSubstr("v0.4")); + ASSERT_THAT(server_info, HasSubstr("v0.5")); } MATCHER_P4(M_CheckDeleteStreamRequest, op_code, source_credentials, stream, flag, @@ -637,45 +699,55 @@ MATCHER_P4(M_CheckDeleteStreamRequest, op_code, source_credentials, stream, flag } TEST_F(ProducerImplTests, DeleteStreamMakesCorrectRequest) { - producer.SetCredentials(expected_credentials); - asapo::DeleteStreamOptions expected_options{}; - expected_options.delete_meta = true; - expected_options.error_on_not_exist = true; - auto flag = 3; - - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckDeleteStreamRequest(asapo::kOpcodeDeleteStream, - expected_credentials_str, - expected_stream, flag), true)).WillOnce( - Return(nullptr)); + MockGetBrokerUri(); + + std::string expected_delete_stream_query_string = "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}"; + EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/subname" + "/" + + expected_stream_encoded + "/delete" + + "?token=token&instanceid=instance&pipelinestep=step", _, + expected_delete_stream_query_string, _, _)).WillOnce(DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return("") + )); + + asapo::DeleteStreamOptions opt; + opt.delete_meta = true; + opt.error_on_not_exist = true; + auto err = producer.DeleteStream(expected_stream, 1000, opt); + ASSERT_THAT(err, Eq(nullptr)); - asapo::DeleteStreamOptions options{}; - auto err = producer.DeleteStream(expected_stream, 1000, options); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } TEST_F(ProducerImplTests, GetStreamMetaMakesCorrectRequest) { - producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta, - expected_credentials_str, - expected_stream), true)).WillOnce( - Return(nullptr)); + MockGetBrokerUri(); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/subname" + + "/" + expected_stream_encoded + "/0/meta/1?token=token&instanceid=instance&pipelinestep=step", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(expected_metadata))); asapo::Error err; - producer.GetStreamMeta(expected_stream, 1000, &err); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); + auto res = producer.GetStreamMeta(expected_stream, 1000, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(res, Eq(expected_metadata)); } - TEST_F(ProducerImplTests, GetBeamtimeMetaMakesCorrectRequest) { - producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta, - expected_credentials_str, - ""), true)).WillOnce( - Return(nullptr)); + std::string expected_metadata = "{\"meta\":1}"; + MockGetBrokerUri(); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/subname" + + "/default/0/meta/0?token=token&instanceid=instance&pipelinestep=step", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(expected_metadata))); asapo::Error err; - producer.GetBeamtimeMeta(1000, &err); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); + auto res = producer.GetBeamtimeMeta(1000, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(res, Eq(expected_metadata)); } } diff --git a/producer/api/python/dist_linux/CMakeLists.txt b/producer/api/python/dist_linux/CMakeLists.txt index 0180f47dbf568844211a643cf7195de4f38e432b..c69d186161b4890fdde7c844be7f92efe10faf2d 100644 --- a/producer/api/python/dist_linux/CMakeLists.txt +++ b/producer/api/python/dist_linux/CMakeLists.txt @@ -53,6 +53,7 @@ ADD_CUSTOM_TARGET(copy_python_dist-producer ALL COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_BINARY_DIR}/../asapo_producer.cpp ${CMAKE_CURRENT_BINARY_DIR}/. COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_SOURCE_DIR}/producer/api/cpp/include ${CMAKE_CURRENT_BINARY_DIR}/include COMMAND ${CMAKE_COMMAND} -E copy_directory ${ASAPO_CXX_COMMON_INCLUDE_DIR}/asapo/common ${CMAKE_CURRENT_BINARY_DIR}/include/asapo/common + COMMAND ${CMAKE_COMMAND} -E copy_directory ${ASAPO_CXX_COMMON_INCLUDE_DIR}/asapo/http_request ${CMAKE_CURRENT_BINARY_DIR}/include/asapo/http_request COMMAND ${CMAKE_COMMAND} -E copy_directory ${ASAPO_CXX_COMMON_INCLUDE_DIR}/asapo/preprocessor ${CMAKE_CURRENT_BINARY_DIR}/include/asapo/preprocessor COMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_CURRENT_BINARY_DIR}/include/common/version.h.in COMMAND ${CMAKE_COMMAND} -E copy_directory ${ASAPO_CXX_COMMON_INCLUDE_DIR}/asapo/logger ${CMAKE_CURRENT_BINARY_DIR}/include/asapo/logger diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index 65634120cb0beaeed8c2dab3b39413e4020700fd..defe8753e0ad084804efb7a80ddfa1c9eff6bf32 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -45,7 +45,7 @@ int main(int argc, char* argv[]) { asapo::HttpCode code; std::string response; std::string input_data; - auto folder_token = consumer_impl->httpclient__->Post(args.uri_authorizer + "/v0.1/folder", "", authorize_request, + auto folder_token = consumer_impl->service_request_->httpclient__->Post(args.uri_authorizer + "/v0.1/folder", "", authorize_request, &code, &err); if (err) { @@ -54,21 +54,21 @@ int main(int argc, char* argv[]) { M_AssertTrue(err == nullptr); M_AssertTrue(code == asapo::HttpCode::OK); - consumer_impl->httpclient__->Post(args.uri_authorizer + "/v0.1/folder", "", "", &code, &err); + consumer_impl->service_request_->httpclient__->Post(args.uri_authorizer + "/v0.1/folder", "", "", &code, &err); M_AssertTrue(code == asapo::HttpCode::BadRequest); - consumer_impl->httpclient__->Post(args.uri_authorizer + "/bla", "", "", &code, &err); + consumer_impl->service_request_->httpclient__->Post(args.uri_authorizer + "/bla", "", "", &code, &err); M_AssertTrue(code == asapo::HttpCode::NotFound); // check post with data std::string transfer = "{\"Folder\":\"" + args.folder + "\",\"FileName\":\"aaa\"}"; std::string cookie = "Authorization=Bearer " + folder_token + ";"; - auto content = consumer_impl->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &code, &err); + auto content = consumer_impl->service_request_->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &code, &err); M_AssertEq("hello", content); M_AssertTrue(code == asapo::HttpCode::OK); // with array asapo::MessageData data; - err = consumer_impl->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &data, 5, &code); + err = consumer_impl->service_request_->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &data, 5, &code); M_AssertEq( "hello", reinterpret_cast<char const*>(data.get())); M_AssertTrue(code == asapo::HttpCode::OK); @@ -78,7 +78,7 @@ int main(int argc, char* argv[]) { uint64_t size = 0; auto expected_data = io->GetDataFromFile(fname, &size, &err); M_AssertEq(nullptr, err); - err = consumer_impl->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &data, size, &code); + err = consumer_impl->service_request_->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, &data, size, &code); M_AssertTrue(code == asapo::HttpCode::OK); for (uint64_t i = 0; i < size; i++) { if (expected_data[i] != data[i]) { @@ -88,11 +88,11 @@ int main(int argc, char* argv[]) { // with file transfer = "{\"Folder\":\"" + args.folder + "\",\"FileName\":\"aaa\"}"; - err = consumer_impl->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, "bbb", &code); + err = consumer_impl->service_request_->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, "bbb", &code); M_AssertTrue(code == asapo::HttpCode::OK); transfer = "{\"Folder\":\"" + args.folder + "\",\"FileName\":\"random\"}"; - err = consumer_impl->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, "random", &code); + err = consumer_impl->service_request_->httpclient__->Post(args.uri_fts + "/v0.1/transfer", cookie, transfer, "random", &code); M_AssertTrue(code == asapo::HttpCode::OK); return EXIT_SUCCESS; diff --git a/tests/automatic/full_chain/simple_chain_filegen/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen/check_linux.sh index bf8182359a91355beb1805d308394289a218b904..8fd3fe549d73986dbf27389834cf95565e7d6ec1 100644 --- a/tests/automatic/full_chain/simple_chain_filegen/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain_filegen/check_linux.sh @@ -36,12 +36,12 @@ mkdir -p ${receiver_folder} $producer_bin test.json & producerid=`echo $!` -sleep 1 +sleep 2 mkdir /tmp/asapo/test_in/processed/test1 mkdir /tmp/asapo/test_in/processed/test2 -sleep 1 +sleep 2 echo hello > /tmp/asapo/test_in/processed/test1/file1 echo hello > /tmp/asapo/test_in/processed/test1/file2 diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh index 0e735ad721f0228281a7967242ad006b5932a6ad..60a321bf5c466425e26210e5180e86244cc3a9a6 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh @@ -78,11 +78,11 @@ mkdir -p ${receiver_folder} $producer_bin test.json & producerid=`echo $!` -sleep 1 +sleep 2 mkdir /tmp/asapo/test_in/processed/test1 mkdir /tmp/asapo/test_in/processed/test2 -sleep 1 +sleep 2 echo -n hello1 > /tmp/asapo/test_in/processed/test1/file1 echo -n hello2 > /tmp/asapo/test_in/processed/test1/file2 diff --git a/tests/automatic/producer/c_api/producer_api.c b/tests/automatic/producer/c_api/producer_api.c index 84826a1048dee1b10d77c48463e088358c11c18f..465af350239c8f23a8c3325d59fbb3c2105e7b96 100644 --- a/tests/automatic/producer/c_api/producer_api.c +++ b/tests/automatic/producer/c_api/producer_api.c @@ -104,7 +104,7 @@ int main(int argc, char* argv[]) { AsapoErrorHandle err = asapo_new_handle(); AsapoSourceCredentialsHandle cred = asapo_create_source_credentials(kProcessed, "auto", "auto", - beamtime, "", source, ""); + beamtime, "", source, "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I"); AsapoProducerHandle producer = asapo_create_producer(endpoint,3,kTcp, cred,60000,&err); EXIT_IF_ERROR("create producer", err); diff --git a/tests/automatic/producer/cpp_api/producer_api.cpp b/tests/automatic/producer/cpp_api/producer_api.cpp index dbe9cdf2411a7c675e34bc405c54207746367d90..505083816538c6c59553c6658081974163c0e15e 100644 --- a/tests/automatic/producer/cpp_api/producer_api.cpp +++ b/tests/automatic/producer/cpp_api/producer_api.cpp @@ -68,7 +68,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { asapo::RequestHandlerType::kTcp, asapo::SourceCredentials{asapo::SourceType::kProcessed, "auto", "auto", args.beamtime, - "", args.source, ""}, 60000, &err); + "", args.source, "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I"}, 60000, &err); if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 6bef80f4d18633847bd24c22fc7e19ab922df4ff..27cf05166befebea813e1c4cda0d055483e1eed3 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -30,7 +30,7 @@ cat out echo count successfully send, expect 19 cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 19 echo count wrong input, expect 11 -cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 11 +cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 10 echo count wrong json, expect 2 cat out | grep "JSON parse error" | wc -l | tee /dev/stderr | grep 2 diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index bcd5f08de7296816ba6c34bbd8289d7be068c2e9..59e8bcb3e95007e1b4f313ab173a6e18bbacefe0 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -14,7 +14,7 @@ data_source = sys.argv[1] beamtime = sys.argv[2] endpoint = sys.argv[3] -token = "" +token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.EGBSmV8YIJtwstHsoidhZt4ZXo18nJOnUQGJkCpoH4I" nthreads = 8 @@ -196,8 +196,11 @@ bt_meta = producer.get_beamtime_meta() stream_meta = producer.get_stream_meta(stream = 'stream') assert_eq(bt_meta['data'], 'bt_meta', "beamtime meta") assert_eq(stream_meta['data'], 'st_meta', "stream meta") -no_meta = producer.get_stream_meta(stream = 'notexist') -assert_eq(no_meta, None, "no meta") + +try: + no_meta = producer.get_stream_meta(stream = 'notexist') +except asapo_producer.AsapoTimeOutError as e: + print("Expected error on get stream meta: ", e) #stream infos info = producer.stream_info() @@ -215,7 +218,7 @@ assert_eq(info['lastId'], 3, "last id from different stream") assert_eq(info['finished'], True, "stream finished") info = producer.stream_info('dataset_stream') -print(info) +print("Stream info: ", info) assert_eq(info['lastId'], 2, "last id from stream with datasets") info = producer.stream_info('not_exist') @@ -233,11 +236,11 @@ producer.stream_info('stream/test $') assert_eq(info['lastId'], 0, "last id from non deleted stream") -producer.delete_stream('unknown_stream',error_on_not_exist = False) +producer.delete_stream('unknown_stream', error_on_not_exist = False) try: producer.delete_stream('unknown_stream',error_on_not_exist = True) except asapo_producer.AsapoWrongInputError as e: - print("Expected error: ", e) + print("Expected error on delete stream: ", e) else: print("should be error on delete unknown stream with flag") sys.exit(1)