diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a5ec00419269b0849b8be9e0285775a73271113..a42a03c088c318d25b2d2c10053ed72943f94765 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 20.12.1 (in progress) + + IMPROVEMENTS +* Producer API - queue limits in Python, for C++ return original data in error custom data + ## 20.12.0 FEATURES diff --git a/common/cpp/include/asapo/common/error.h b/common/cpp/include/asapo/common/error.h index c2259b79b551f0b3a1449ce30a55443aa75951da..24d78d5399cc7fa64307e44b63692596883dc2d1 100644 --- a/common/cpp/include/asapo/common/error.h +++ b/common/cpp/include/asapo/common/error.h @@ -39,7 +39,7 @@ class ErrorInterface { virtual std::string Explain() const noexcept = 0; virtual void Append(const std::string& value) noexcept = 0; virtual ErrorType GetErrorType() const noexcept = 0; - virtual const CustomErrorData* GetCustomData() = 0; + virtual CustomErrorData* GetCustomData() = 0; virtual void SetCustomData(std::unique_ptr<CustomErrorData> data) = 0; virtual ~ErrorInterface() = default; // needed for unique_ptr to delete itself }; @@ -96,7 +96,7 @@ class SimpleError: public ErrorInterface { SimpleError(std::string error, ErrorType error_type ): error_{std::move(error)}, error_type_{error_type} { } - const CustomErrorData* GetCustomData() override { + CustomErrorData* GetCustomData() override { if (custom_data_) { return custom_data_.get(); } else { diff --git a/common/cpp/include/asapo/request/request.h b/common/cpp/include/asapo/request/request.h index e67637a1ccd171926b687330045c3c83dc9a78d8..594c03c85b002d12a50b76a7bc1921bed341f8e8 100644 --- a/common/cpp/include/asapo/request/request.h +++ b/common/cpp/include/asapo/request/request.h @@ -9,7 +9,6 @@ namespace asapo { - class GenericRequest { public: GenericRequest() = delete; diff --git a/common/cpp/include/asapo/request/request_pool_error.h b/common/cpp/include/asapo/request/request_pool_error.h new file mode 100644 index 0000000000000000000000000000000000000000..48c8a56538b41cf909dcf0716e786e06629ab5b8 --- /dev/null +++ b/common/cpp/include/asapo/request/request_pool_error.h @@ -0,0 +1,15 @@ +#ifndef ASAPO_REQUEST_POOL_ERROR_H +#define ASAPO_REQUEST_POOL_ERROR_H + +#include "asapo/common/error.h" + +namespace asapo { + +class OriginalRequest : public CustomErrorData { + public: + GenericRequestPtr request; +}; + +} + +#endif //ASAPO_REQUEST_POOL_ERROR_H diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index e02001ec0c428ce56f680f95be050baf5899c1c2..5df251934d7b8ca16ced26beda4d7312cfb865f2 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -1,5 +1,5 @@ #include "asapo/request/request_pool.h" - +#include "asapo/request/request_pool_error.h" namespace asapo { RequestPool::RequestPool(uint8_t n_threads, @@ -60,6 +60,9 @@ Error RequestPool::CanAddRequest(const GenericRequestPtr &request, bool top_prio Error RequestPool::AddRequest(GenericRequestPtr request, bool top_priority) { std::unique_lock<std::mutex> lock(mutex_); if (auto err = CanAddRequest(request, top_priority)) { + OriginalRequest* original_request = new OriginalRequest{}; + original_request->request = std::move(request); + err->SetCustomData(std::unique_ptr<CustomErrorData>(original_request)); return err; } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 8c428f9ad80fb02a549016b27024151d3b5569b3..ccda29729425b60a4a914e7d03ca8e060e93deff 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -6,6 +6,7 @@ #include "asapo/common/error.h" #include "asapo/request/request_pool.h" +#include "asapo/request/request_pool_error.h" #include "asapo/request/request_handler_factory.h" #include "mocking.h" @@ -209,6 +210,8 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitSizeLimitation) { ASSERT_THAT(nreq, Eq(1)); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); + auto err_data = static_cast<asapo::OriginalRequest*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); } TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { @@ -225,6 +228,9 @@ TEST_F(RequestPoolTests, RefuseAddRequestIfHitMemoryLimitation) { ASSERT_THAT(nreq, Eq(1)); ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kNoSpaceLeft)); + auto err_data = static_cast<asapo::OriginalRequest*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + } TEST_F(RequestPoolTests, RefuseAddRequestsIfHitSizeLimitation) { diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index e309bd22dfdf54ff4270dc90896971e2a80fd1cb..cca41075d1bd0e1a236304a481a58c06c0990ae6 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -13,7 +13,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": pass cppclass ErrorInterface: string Explain() - const CustomErrorData* GetCustomData() + CustomErrorData* GetCustomData() cppclass ErrorTemplateInterface: pass cdef bool operator==(Error lhs, ErrorTemplateInterface rhs) diff --git a/producer/api/cpp/include/asapo/producer/producer_error.h b/producer/api/cpp/include/asapo/producer/producer_error.h index 2bcc86593fe5cb37f0cab5145a1ddce1a5dc5a66..1d1e0904f91f7be183e8dd55545a0cbca8281ba1 100644 --- a/producer/api/cpp/include/asapo/producer/producer_error.h +++ b/producer/api/cpp/include/asapo/producer/producer_error.h @@ -2,6 +2,7 @@ #define ASAPO_PRODUCER_ERROR_H #include "asapo/common/error.h" +#include "asapo/common/data_structs.h" namespace asapo { @@ -17,6 +18,12 @@ enum class ProducerErrorType { using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; +class OriginalData : public CustomErrorData { + public: + MessageData data; +}; + + namespace ProducerErrorTemplates { auto const kServerWarning = ProducerErrorTemplate { diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 0ed76a1b699e4a3335eba9812ed8019b7306c802..3717a1af89568c1e8c9b701603a143824c840007 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -10,7 +10,7 @@ #include "producer_request_handler_factory.h" #include "producer_request.h" #include "asapo/common/data_structs.h" - +#include "asapo/request/request_pool_error.h" namespace asapo { @@ -96,6 +96,48 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_ return CheckIngestMode(ingest_mode); } +Error HandleErrorFromPool(Error original_error,bool manage_data_memory) { + if (original_error == nullptr) { + return nullptr; + } + Error producer_error = ProducerErrorTemplates::kRequestPoolIsFull.Generate(original_error->Explain()); + auto err_data = static_cast<OriginalRequest*>(original_error->GetCustomData()); + if (!err_data) { + return producer_error; + } + auto producer_request = static_cast<ProducerRequest*>(err_data->request.get()); + if (!producer_request) { + return producer_error; + } + MessageData original_data = std::move(producer_request->data); + if (original_data == nullptr) { + return producer_error; + } + if (!manage_data_memory) { + original_data.release(); + } else { + OriginalData* original = new asapo::OriginalData{}; + original->data = std::move(original_data); + producer_error->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original}); + } + return producer_error; +} + +Error HandleInputError(Error original_error,MessageData data, bool manage_data_memory) { + if (data == nullptr) { + return original_error; + } + if (!manage_data_memory) { + data.release(); + return original_error; + } + + OriginalData* original = new asapo::OriginalData{}; + original->data = std::move(data); + original_error->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original}); + return original_error; +} + Error ProducerImpl::Send(const MessageHeader& message_header, std::string stream, MessageData data, @@ -105,19 +147,17 @@ Error ProducerImpl::Send(const MessageHeader& message_header, bool manage_data_memory) { auto err = CheckProducerRequest(message_header, ingest_mode, stream); if (err) { - if (!manage_data_memory) { - data.release(); - } log__->Error("error checking request - " + err->Explain()); - return err; + return HandleInputError(std::move(err),std::move(data),manage_data_memory); } auto request_header = GenerateNextSendRequest(message_header, std::move(stream), ingest_mode); - return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), + err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), std::move(message_header.user_metadata), std::move(full_path), callback, manage_data_memory, timeout_ms_} }); + return HandleErrorFromPool(std::move(err),manage_data_memory); } bool WandTransferData(uint64_t ingest_mode) { @@ -142,7 +182,7 @@ Error ProducerImpl::Send(const MessageHeader &message_header, std::string stream, RequestCallback callback) { if (auto err = CheckData(ingest_mode, message_header, &data)) { - return err; + return HandleInputError(std::move(err),std::move(data),true); } return Send(message_header, std::move(stream), std::move(data), "", ingest_mode, callback, true); @@ -214,9 +254,10 @@ Error ProducerImpl::SendMetadata(const std::string& metadata, RequestCallback ca request_header.custom_data[kPosIngestMode] = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInDatabase; MessageData data{new uint8_t[metadata.size()]}; strncpy((char*)data.get(), metadata.c_str(), metadata.size()); - return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), + auto err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), std::move(data), "", "", callback, true, timeout_ms_} }); + return HandleErrorFromPool(std::move(err), true); } Error ProducerImpl::Send__(const MessageHeader &message_header, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index e9ea117ee49a9dbadcf1dff210de7655688d2c6d..b157ae36b9eda7fe9cb3922fc16eb85e76d7f24c 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -10,6 +10,7 @@ #include "asapo/producer/producer_error.h" #include "../src/request_handler_tcp.h" +#include "asapo/request/request_pool_error.h" #include "mocking.h" @@ -98,17 +99,22 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( - asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); + asapo::IOErrorTemplates::kNoSpaceLeft.Generate().release())); asapo::MessageHeader message_header{1, 1, "test"}; auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { + asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; + data[34]=12; std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, long_string}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); + auto err = producer.Send(message_header, std::move(data), expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + ASSERT_THAT(err_data->data[34], Eq(12)); } TEST_F(ProducerImplTests, ErrorIfStreamEmpty) { @@ -137,6 +143,8 @@ TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { asapo::MessageHeader message_header{1, 0, expected_fullpath}; auto err = producer.Send(message_header, std::move(data), asapo::kDefaultIngestMode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); } TEST_F(ProducerImplTests, ErrorIfNoData) { @@ -472,6 +480,38 @@ TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } + +TEST_F(ProducerImplTests, ReturnDataIfCanotAddToQueue) { + producer.SetCredentials(expected_credentials); + + asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; + data[40] = 10; + asapo::OriginalRequest* original_request = new asapo::OriginalRequest{}; + + auto request = std::unique_ptr<ProducerRequest> {new ProducerRequest{"", asapo::GenericRequestHeader{},std::move(data), "", "", nullptr, true, 0}}; + original_request->request = std::move(request); + auto pool_err = asapo::IOErrorTemplates::kNoSpaceLeft.Generate(); + pool_err->SetCustomData(std::unique_ptr<asapo::CustomErrorData>{original_request}); + + + EXPECT_CALL(mock_pull, AddRequest_t(_,_)).WillOnce(Return( + std::move(pool_err).release())); + + asapo::MessageHeader message_header{expected_id, 0, expected_name}; + auto err = producer.Send(message_header, std::move(data), expected_ingest_mode, expected_stream, nullptr); + + auto err_data = static_cast<asapo::OriginalData*>(err->GetCustomData()); + ASSERT_THAT(err_data, Ne(nullptr)); + + asapo::MessageData original_data_in_err = std::move(err_data->data); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); + ASSERT_THAT(original_data_in_err, Ne(nullptr)); + ASSERT_THAT(original_data_in_err[40], Eq(10)); + +} + + + } #pragma clang diagnostic pop \ No newline at end of file diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index c387fe773f2a4f277bcef12ce96dee4b4fa8dcd6..aaac948f94739b9c7bc57236264d80cc710fca2f 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -21,8 +21,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput" ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError" ErrorTemplateInterface kServerWarning "asapo::ProducerErrorTemplates::kServerWarning" - - + ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull" cdef extern from "asapo/asapo_producer.h" namespace "asapo": cppclass MessageData: @@ -100,6 +99,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: void StopThreads__() void SetLogLevel(LogLevel level) uint64_t GetRequestsQueueSize() + uint64_t GetRequestsQueueVolumeMb() + void SetRequestsQueueLimits(uint64_t size, uint64_t volume) Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 67bb53dc169f1dd38806a6860dd2b8e85feb7ba5..436f1a4ac57129b2a57c89d49d96e836b4bd7e6e 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -51,6 +51,8 @@ class AsapoTimeOutError(AsapoProducerError): class AsapoServerWarning(AsapoProducerError): pass +class AsapoRequestsPoolIsFull(AsapoProducerError): + pass cdef python_exception_from_error(Error& err): error_string = _str(err.get().Explain()) @@ -62,6 +64,8 @@ cdef python_exception_from_error(Error& err): return AsapoLocalIOError(error_string) elif err == kServerWarning: return AsapoServerWarning(error_string) + elif err == kRequestPoolIsFull: + return AsapoRequestsPoolIsFull(error_string) else: return AsapoProducerError(error_string) @@ -271,6 +275,10 @@ cdef class PyProducer: return def get_requests_queue_size(self): return self.c_producer.get().GetRequestsQueueSize() + def get_requests_queue_volume_mb(self): + return self.c_producer.get().GetRequestsQueueVolumeMb() + def set_requests_queue_limits(self,uint64_t size = 0, uint64_t volume_mb = 0): + return self.c_producer.get().SetRequestsQueueLimits(size,volume_mb) def wait_requests_finished(self,timeout_ms): """ :param timeout_ms: timeout in milliseconds diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index eb14bca76ed10a90c94b978bfb501a007332d1d9..11859cc789bf989e00d866f9d45a06f741106a54 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -128,6 +128,7 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") +assert_eq(n, 0, "requests in queue") # send to another data to stream stream producer.send(2, "processed/" + data_source + "/" + "file10", None, @@ -137,9 +138,19 @@ producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") -#stream infos +# pool limits (checking volume only) +data = np.arange(1000000, dtype=np.float64) +producer.set_requests_queue_limits(0,1) +try: + producer.send(11, "processed/bla", data) +except asapo_producer.AsapoRequestsPoolIsFull as e: + print(e) +else: + print("should be AsapoRequestsPoolIsFull error ") + sys.exit(1) +#stream infos info = producer.stream_info() assert_eq(info['lastId'], 10, "stream_info last id") assert_eq(info['name'], "default", "stream_info name")