Skip to content
Snippets Groups Projects
Commit a8cbc4ad authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

add python calls

parent 19838f4b
No related branches found
No related tags found
No related merge requests found
......@@ -21,8 +21,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput"
ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError"
ErrorTemplateInterface kServerWarning "asapo::ProducerErrorTemplates::kServerWarning"
ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull"
cdef extern from "asapo/asapo_producer.h" namespace "asapo":
cppclass MessageData:
......@@ -100,6 +99,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
void StopThreads__()
void SetLogLevel(LogLevel level)
uint64_t GetRequestsQueueSize()
uint64_t GetRequestsQueueVolumeMb()
void SetRequestsQueueLimits(uint64_t size, uint64_t volume)
Error WaitRequestsFinished(uint64_t timeout_ms)
Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback)
StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err)
......
......@@ -51,6 +51,8 @@ class AsapoTimeOutError(AsapoProducerError):
class AsapoServerWarning(AsapoProducerError):
pass
class AsapoRequestsPoolIsFull(AsapoProducerError):
pass
cdef python_exception_from_error(Error& err):
error_string = _str(err.get().Explain())
......@@ -62,6 +64,8 @@ cdef python_exception_from_error(Error& err):
return AsapoLocalIOError(error_string)
elif err == kServerWarning:
return AsapoServerWarning(error_string)
elif err == kRequestPoolIsFull:
return AsapoRequestsPoolIsFull(error_string)
else:
return AsapoProducerError(error_string)
......@@ -271,6 +275,10 @@ cdef class PyProducer:
return
def get_requests_queue_size(self):
return self.c_producer.get().GetRequestsQueueSize()
def get_requests_queue_volume_mb(self):
return self.c_producer.get().GetRequestsQueueVolumeMb()
def set_requests_queue_limits(self,uint64_t size = 0, uint64_t volume_mb = 0):
return self.c_producer.get().SetRequestsQueueLimits(size,volume_mb)
def wait_requests_finished(self,timeout_ms):
"""
:param timeout_ms: timeout in milliseconds
......
......@@ -128,6 +128,7 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
producer.wait_requests_finished(50000)
n = producer.get_requests_queue_size()
assert_eq(n, 0, "requests in queue")
assert_eq(n, 0, "requests in queue")
# send to another data to stream stream
producer.send(2, "processed/" + data_source + "/" + "file10", None,
......@@ -137,9 +138,19 @@ producer.wait_requests_finished(50000)
n = producer.get_requests_queue_size()
assert_eq(n, 0, "requests in queue")
#stream infos
# pool limits (checking volume only)
data = np.arange(1000000, dtype=np.float64)
producer.set_requests_queue_limits(0,1)
try:
producer.send(11, "processed/bla", data)
except asapo_producer.AsapoRequestsPoolIsFull as e:
print(e)
else:
print("should be AsapoRequestsPoolIsFull error ")
sys.exit(1)
#stream infos
info = producer.stream_info()
assert_eq(info['lastId'], 10, "stream_info last id")
assert_eq(info['name'], "default", "stream_info name")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment