From 19838f4bf7983319f44dd41721bd521ff6969cb0 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 13 Jan 2021 12:46:28 +0100 Subject: [PATCH] update c++ part --- CHANGELOG.md | 5 ++ common/cpp/include/asapo/common/error.h | 4 +- common/cpp/include/asapo/request/request.h | 1 - .../asapo/request/request_pool_error.h | 15 +++++ common/cpp/src/request/request_pool.cpp | 5 +- .../unittests/request/test_request_pool.cpp | 6 ++ consumer/api/python/asapo_consumer.pxd | 2 +- .../include/asapo/producer/producer_error.h | 7 +++ producer/api/cpp/src/producer_impl.cpp | 57 ++++++++++++++++--- .../api/cpp/unittests/test_producer_impl.cpp | 44 +++++++++++++- 10 files changed, 131 insertions(+), 15 deletions(-) create mode 100644 common/cpp/include/asapo/request/request_pool_error.h diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a5ec0041..a42a03c08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 20.12.1 (in progress) + + IMPROVEMENTS +* Producer API - queue limits in Python, for C++ return original data in error custom data + ## 20.12.0 FEATURES diff --git a/common/cpp/include/asapo/common/error.h b/common/cpp/include/asapo/common/error.h index c2259b79b..24d78d539 100644 --- a/common/cpp/include/asapo/common/error.h +++ b/common/cpp/include/asapo/common/error.h @@ -39,7 +39,7 @@ class ErrorInterface { virtual std::string Explain() const noexcept = 0; virtual void Append(const std::string& value) noexcept = 0; virtual ErrorType GetErrorType() const noexcept = 0; - virtual const CustomErrorData* GetCustomData() = 0; + virtual CustomErrorData* GetCustomData() = 0; virtual void SetCustomData(std::unique_ptr<CustomErrorData> data) = 0; virtual ~ErrorInterface() = default; // needed for unique_ptr to delete itself }; @@ -96,7 +96,7 @@ class SimpleError: public ErrorInterface { SimpleError(std::string error, ErrorType error_type ): error_{std::move(error)}, error_type_{error_type} { } - const CustomErrorData* GetCustomData() override { + CustomErrorData* GetCustomData() override { if (custom_data_) { return custom_data_.get(); } else { diff --git a/common/cpp/include/asapo/request/request.h b/common/cpp/include/asapo/request/request.h index e67637a1c..594c03c85 100644 --- a/common/cpp/include/asapo/request/request.h +++ b/common/cpp/include/asapo/request/request.h @@ -9,7 +9,6 @@ namespace asapo { - class GenericRequest { public: GenericRequest() = delete; diff --git a/common/cpp/include/asapo/request/request_pool_error.h b/common/cpp/include/asapo/request/request_pool_error.h new file mode 100644 index 000000000..48c8a5653 --- /dev/null +++ b/common/cpp/include/asapo/request/request_pool_error.h @@ -0,0 +1,15 @@ +#ifndef ASAPO_REQUEST_POOL_ERROR_H +#define ASAPO_REQUEST_POOL_ERROR_H + +#include "asapo/common/error.h" + +namespace asapo { + +class OriginalRequest : public CustomErrorData { + public: + GenericRequestPtr request; +}; + +} + +#endif //ASAPO_REQUEST_POOL_ERROR_H diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index e02001ec0..5df251934 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -1,5 +1,5 @@ #include "asapo/request/request_pool.h" - +#include "asapo/request/request_pool_error.h" namespace asapo { RequestPool::RequestPool(uint8_t n_threads, @@ -60,6 +60,9 @@ Error RequestPool::CanAddRequest(const GenericRequestPtr &request, bool top_prio Error RequestPool::AddRequest(GenericRequestPtr request, bool top_priority) { std::unique_lock<std::mutex> lock(mutex_); if (auto err = CanAddRequest(request, top_priority)) { + OriginalRequest* original_request = new OriginalRequest{}; + original_request->request = std::move(request); + err->SetCustomData(std::unique_ptr<CustomErrorData>(original_request)); return err; } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 8c428f9ad..ccda29729 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -6,6 +6,7 @@ #include "asapo/common/error.h" #include "asapo/request/request_pool.h" +#include "asapo/request/request_pool_error.h" #include "asapo/request/request_handler_factory.h" #include "mocking.h" @@ -209,6 +210,8 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitSizeLimitation) { ASSERT_THAT(nreq, Eq(1)); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); + auto err_data = static_cast<asapo::OriginalRequest*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); } TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { @@ -225,6 +228,9 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { ASSERT_THAT(nreq, Eq(1)); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); + auto err_data = static_cast<asapo::OriginalRequest*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + } TEST_F(RequestPoolTests, RefuseAddRequestsIfHitSizeLimitation) { diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index e309bd22d..cca41075d 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -13,7 +13,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": pass cppclass ErrorInterface: string Explain() - const CustomErrorData* GetCustomData() + CustomErrorData* GetCustomData() cppclass ErrorTemplateInterface: pass cdef bool operator==(Error lhs, ErrorTemplateInterface rhs) diff --git a/producer/api/cpp/include/asapo/producer/producer_error.h b/producer/api/cpp/include/asapo/producer/producer_error.h index 2bcc86593..1d1e0904f 100644 --- a/producer/api/cpp/include/asapo/producer/producer_error.h +++ b/producer/api/cpp/include/asapo/producer/producer_error.h @@ -2,6 +2,7 @@ #define ASAPO_PRODUCER_ERROR_H #include "asapo/common/error.h" +#include "asapo/common/data_structs.h" namespace asapo { @@ -17,6 +18,12 @@ enum class ProducerErrorType { using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; +class OriginalData : public CustomErrorData { + public: + MessageData data; +}; + + namespace ProducerErrorTemplates { auto const kServerWarning = ProducerErrorTemplate { diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 0ed76a1b6..3717a1af8 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -10,7 +10,7 @@ #include "producer_request_handler_factory.h" #include "producer_request.h" #include "asapo/common/data_structs.h" - +#include "asapo/request/request_pool_error.h" namespace asapo { @@ -96,6 +96,48 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_ return CheckIngestMode(ingest_mode); } +Error HandleErrorFromPool(Error original_error,bool manage_data_memory) { + if (original_error == nullptr) { + return nullptr; + } + Error producer_error = ProducerErrorTemplates::kRequestPoolIsFull.Generate(original_error->Explain()); + auto err_data = static_cast<OriginalRequest*>(original_error->GetCustomData()); + if (!err_data) { + return producer_error; + } + auto producer_request = static_cast<ProducerRequest*>(err_data->request.get()); + if (!producer_request) { + return producer_error; + } + MessageData original_data = std::move(producer_request->data); + if (original_data == nullptr) { + return producer_error; + } + if (!manage_data_memory) { + original_data.release(); + } else { + OriginalData* original = new asapo::OriginalData{}; + original->data = std::move(original_data); + producer_error->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original}); + } + return producer_error; +} + +Error HandleInputError(Error original_error,MessageData data, bool manage_data_memory) { + if (data == nullptr) { + return original_error; + } + if (!manage_data_memory) { + data.release(); + return original_error; + } + + OriginalData* original = new asapo::OriginalData{}; + original->data = std::move(data); + original_error->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original}); + return original_error; +} + Error ProducerImpl::Send(const MessageHeader& message_header, std::string stream, MessageData data, @@ -105,19 +147,17 @@ Error ProducerImpl::Send(const MessageHeader& message_header, bool manage_data_memory) { auto err = CheckProducerRequest(message_header, ingest_mode, stream); if (err) { - if (!manage_data_memory) { - data.release(); - } log__->Error("error checking request - " + err->Explain()); - return err; + return HandleInputError(std::move(err),std::move(data),manage_data_memory); } auto request_header = GenerateNextSendRequest(message_header, std::move(stream), ingest_mode); - return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), + err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), std::move(message_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_ms_} }); + return HandleErrorFromPool(std::move(err),manage_data_memory); } bool WandTransferData(uint64_t ingest_mode) { @@ -142,7 +182,7 @@ Error ProducerImpl::Send(const MessageHeader &message_header, std::string stream, RequestCallback callback) { if (auto err = CheckData(ingest_mode, message_header, &data)) { - return err; + return HandleInputError(std::move(err),std::move(data),true); } return Send(message_header, std::move(stream), std::move(data), "", ingest_mode, callback, true); @@ -214,9 +254,10 @@ Error ProducerImpl::SendMetadata(const std::string& metadata, RequestCallback ca request_header.custom_data[kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInDatabase; MessageData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); - return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), + auto err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), "", "", callback, true, timeout_ms_} }); + return HandleErrorFromPool(std::move(err), true); } Error ProducerImpl::Send__(const MessageHeader &message_header, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index e9ea117ee..b157ae36b 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -10,6 +10,7 @@ #include "asapo/producer/producer_error.h" #include "../src/request_handler_tcp.h" +#include "asapo/request/request_pool_error.h" #include "mocking.h" @@ -98,17 +99,22 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( - asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); + asapo::IOErrorTemplates::kNoSpaceLeft.Generate().release())); asapo::MessageHeader message_header{1, 1, "test"}; auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { + asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; + data[34]=12; std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, long_string}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); + auto err = producer.Send(message_header, std::move(data), expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + ASSERT_THAT(err_data->data[34], Eq(12)); } TEST_F(ProducerImplTests, ErrorIfStreamEmpty) { @@ -137,6 +143,8 @@ TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { asapo::MessageHeader message_header{1, 0, expected_fullpath}; auto err = producer.Send(message_header, std::move(data), asapo::kDefaultIngestMode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); } TEST_F(ProducerImplTests, ErrorIfNoData) { @@ -472,6 +480,38 @@ TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } + +TEST_F(ProducerImplTests, ReturnDataIfCanotAddToQueue) { + producer.SetCredentials(expected_credentials); + + asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; + data[40] = 10; + asapo::OriginalRequest* original_request = new asapo::OriginalRequest{}; + + auto request = std::unique_ptr<ProducerRequest> {new ProducerRequest{"", asapo::GenericRequestHeader{},std::move(data), "", "", nullptr, true, 0}}; + original_request->request = std::move(request); + auto pool_err = asapo::IOErrorTemplates::kNoSpaceLeft.Generate(); + pool_err->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original_request}); + + + EXPECT_CALL(mock_pull, AddRequest_t(_,_)).WillOnce(Return( + std::move(pool_err).release())); + + asapo::MessageHeader message_header{expected_id, 0, expected_name}; + auto err = producer.Send(message_header, std::move(data), expected_ingest_mode, expected_stream, nullptr); + + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + + asapo::MessageData original_data_in_err = std::move(err_data->data); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); + ASSERT_THAT(original_data_in_err, Ne(nullptr)); + ASSERT_THAT(original_data_in_err[40], Eq(10)); + +} + + + } #pragma clang diagnostic pop \ No newline at end of file -- GitLab