From a8cbc4adb77d7f54c5ff3f9cdc3c2672ef3860e6 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 13 Jan 2021 13:32:40 +0100
Subject: [PATCH] add python calls

---
 producer/api/python/asapo_producer.pxd              |  5 +++--
 producer/api/python/asapo_producer.pyx.in           |  8 ++++++++
 tests/automatic/producer/python_api/producer_api.py | 13 ++++++++++++-
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index c387fe773..aaac948f9 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -21,8 +21,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
   ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput"
   ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError"
   ErrorTemplateInterface kServerWarning "asapo::ProducerErrorTemplates::kServerWarning"
-
-
+  ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull"
 
 cdef extern from "asapo/asapo_producer.h" namespace "asapo":
   cppclass MessageData:
@@ -100,6 +99,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
         void StopThreads__()
         void SetLogLevel(LogLevel level)
         uint64_t  GetRequestsQueueSize()
+        uint64_t  GetRequestsQueueVolumeMb()
+        void SetRequestsQueueLimits(uint64_t size, uint64_t volume)
         Error WaitRequestsFinished(uint64_t timeout_ms)
         Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback)
         StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err)
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index 67bb53dc1..436f1a4ac 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -51,6 +51,8 @@ class AsapoTimeOutError(AsapoProducerError):
 class AsapoServerWarning(AsapoProducerError):
   pass
 
+class AsapoRequestsPoolIsFull(AsapoProducerError):
+  pass
 
 cdef python_exception_from_error(Error& err):
     error_string =  _str(err.get().Explain())
@@ -62,6 +64,8 @@ cdef python_exception_from_error(Error& err):
             return AsapoLocalIOError(error_string)
     elif err == kServerWarning:
             return AsapoServerWarning(error_string)
+    elif err == kRequestPoolIsFull:
+            return AsapoRequestsPoolIsFull(error_string)
     else:
         return AsapoProducerError(error_string)
 
@@ -271,6 +275,10 @@ cdef class PyProducer:
         return
     def get_requests_queue_size(self):
         return self.c_producer.get().GetRequestsQueueSize()
+    def get_requests_queue_volume_mb(self):
+        return self.c_producer.get().GetRequestsQueueVolumeMb()
+    def set_requests_queue_limits(self,uint64_t size = 0, uint64_t volume_mb = 0):
+        return self.c_producer.get().SetRequestsQueueLimits(size,volume_mb)
     def wait_requests_finished(self,timeout_ms):
         """
          :param timeout_ms: timeout in milliseconds
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index eb14bca76..11859cc78 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -128,6 +128,7 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
 producer.wait_requests_finished(50000)
 n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
+assert_eq(n, 0, "requests in queue")
 
 # send to another data to stream stream
 producer.send(2, "processed/" + data_source + "/" + "file10", None,
@@ -137,9 +138,19 @@ producer.wait_requests_finished(50000)
 n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
 
-#stream infos
+# pool limits (checking volume only)
+data = np.arange(1000000, dtype=np.float64)
+producer.set_requests_queue_limits(0,1)
+try:
+    producer.send(11, "processed/bla", data)
+except asapo_producer.AsapoRequestsPoolIsFull as e:
+    print(e)
+else:
+    print("should be AsapoRequestsPoolIsFull error ")
+    sys.exit(1)
 
 
+#stream infos
 info = producer.stream_info()
 assert_eq(info['lastId'], 10, "stream_info last id")
 assert_eq(info['name'], "default", "stream_info name")
-- 
GitLab