diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index c387fe773f2a4f277bcef12ce96dee4b4fa8dcd6..aaac948f94739b9c7bc57236264d80cc710fca2f 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -21,8 +21,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput" ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError" ErrorTemplateInterface kServerWarning "asapo::ProducerErrorTemplates::kServerWarning" - - + ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull" cdef extern from "asapo/asapo_producer.h" namespace "asapo": cppclass MessageData: @@ -100,6 +99,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: void StopThreads__() void SetLogLevel(LogLevel level) uint64_t GetRequestsQueueSize() + uint64_t GetRequestsQueueVolumeMb() + void SetRequestsQueueLimits(uint64_t size, uint64_t volume) Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 67bb53dc169f1dd38806a6860dd2b8e85feb7ba5..436f1a4ac57129b2a57c89d49d96e836b4bd7e6e 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -51,6 +51,8 @@ class AsapoTimeOutError(AsapoProducerError): class AsapoServerWarning(AsapoProducerError): pass +class AsapoRequestsPoolIsFull(AsapoProducerError): + pass cdef python_exception_from_error(Error& err): error_string = _str(err.get().Explain()) @@ -62,6 +64,8 @@ cdef python_exception_from_error(Error& err): return AsapoLocalIOError(error_string) elif err == kServerWarning: return AsapoServerWarning(error_string) + elif err == kRequestPoolIsFull: + return AsapoRequestsPoolIsFull(error_string) else: return AsapoProducerError(error_string) @@ -271,6 +275,10 @@ cdef class PyProducer: return def get_requests_queue_size(self): return self.c_producer.get().GetRequestsQueueSize() + def get_requests_queue_volume_mb(self): + return self.c_producer.get().GetRequestsQueueVolumeMb() + def set_requests_queue_limits(self,uint64_t size = 0, uint64_t volume_mb = 0): + return self.c_producer.get().SetRequestsQueueLimits(size,volume_mb) def wait_requests_finished(self,timeout_ms): """ :param timeout_ms: timeout in milliseconds diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index eb14bca76ed10a90c94b978bfb501a007332d1d9..11859cc789bf989e00d866f9d45a06f741106a54 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -128,6 +128,7 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") +assert_eq(n, 0, "requests in queue") # send to another data to stream stream producer.send(2, "processed/" + data_source + "/" + "file10", None, @@ -137,9 +138,19 @@ producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") -#stream infos +# pool limits (checking volume only) +data = np.arange(1000000, dtype=np.float64) +producer.set_requests_queue_limits(0,1) +try: + producer.send(11, "processed/bla", data) +except asapo_producer.AsapoRequestsPoolIsFull as e: + print(e) +else: + print("should be AsapoRequestsPoolIsFull error ") + sys.exit(1) +#stream infos info = producer.stream_info() assert_eq(info['lastId'], 10, "stream_info last id") assert_eq(info['name'], "default", "stream_info name")