Skip to content
Snippets Groups Projects
Commit 68f811c6 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

python producer api arrors as exceptions

parent 9cffd47d
No related branches found
No related tags found
No related merge requests found
......@@ -29,13 +29,6 @@ def wait_send(n_files, timeout_s):
break
time.sleep(1)
def assert_err(err):
if err is not None:
print(err)
sys.stdout.flush()
sys.exit(1)
source, path, beamtime,stream_in, stream_out, token, timeout_s,nthreads, transfer_data = sys.argv[1:]
timeout_s=int(timeout_s)
nthreads=int(nthreads)
......@@ -43,8 +36,7 @@ transfer_data=int(transfer_data)>0
broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000)
producer, err = asapo_producer.create_producer(source,beamtime, stream_out, token, nthreads)
assert_err(err)
producer = asapo_producer.create_producer(source,beamtime, stream_out, token, nthreads)
group_id = broker.generate_group_id()
......@@ -60,11 +52,12 @@ while True:
data, meta = broker.get_next(group_id, meta_only=not transfer_data)
print ("received: ",meta)
n_recv = n_recv + 1
err = producer.send_data(meta['_id'],meta['name']+"_"+stream_out ,data,
producer.send_data(meta['_id'],meta['name']+"_"+stream_out ,data,
ingest_mode = ingest_mode, callback = callback)
assert_err(err)
except asapo_consumer.AsapoEndOfStreamError:
break
except asapo_producer.AsapoProducerError:
break
wait_send(n_recv,timeout_s)
......
......@@ -5,6 +5,7 @@
#include "common/version.h"
#include "producer/producer.h"
#include "producer/producer_error.h"
#endif //ASAPO_ASAPO_PRODUCER_H
......@@ -6,8 +6,6 @@
namespace asapo {
enum class ProducerErrorType {
kAlreadyConnected,
kConnectionNotReady,
kFileTooLarge,
kFileNameTooLong,
kEmptyFileName,
......@@ -28,12 +26,7 @@ enum class ProducerErrorType {
using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>;
namespace ProducerErrorTemplates {
auto const kAlreadyConnected = ProducerErrorTemplate {
"Already connected", ProducerErrorType::kAlreadyConnected
};
auto const kConnectionNotReady = ProducerErrorTemplate {
"Connection not ready", ProducerErrorType::kConnectionNotReady
};
auto const kWrongIngestMode = ProducerErrorTemplate {
"wrong ingest mode", ProducerErrorType::kWrongIngestMode
......
......@@ -6,14 +6,33 @@ from libcpp cimport bool
ctypedef unsigned char uint8_t
ctypedef unsigned long uint64_t
ctypedef unique_ptr[ErrorInterface] Error
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass Error:
cppclass CustomErrorData:
pass
cppclass ErrorInterface:
string Explain()
cppclass ErrorTemplateInterface:
pass
cdef bool operator==(Error lhs, ErrorTemplateInterface rhs)
cdef extern from "asapo_wrappers.h" namespace "asapo":
string GetErrorString(Error* err)
cdef extern from "asapo_producer.h" namespace "asapo":
ErrorTemplateInterface kFileTooLarge "asapo::ProducerErrorTemplates::kFileTooLarge"
ErrorTemplateInterface kFileNameTooLong "asapo::ProducerErrorTemplates::kFileNameTooLong"
ErrorTemplateInterface kEmptyFileName "asapo::ProducerErrorTemplates::kEmptyFileName"
ErrorTemplateInterface kNoData "asapo::ProducerErrorTemplates::kNoData"
ErrorTemplateInterface kZeroDataSize "asapo::ProducerErrorTemplates::kZeroDataSize"
ErrorTemplateInterface kBeamtimeIdTooLong "asapo::ProducerErrorTemplates::kBeamtimeIdTooLong"
ErrorTemplateInterface kBeamtimeAlreadySet "asapo::ProducerErrorTemplates::kBeamtimeAlreadySet"
ErrorTemplateInterface kFileIdAlreadyInUse "asapo::ProducerErrorTemplates::kFileIdAlreadyInUse"
ErrorTemplateInterface kErrorInMetadata "asapo::ProducerErrorTemplates::kErrorInMetadata"
ErrorTemplateInterface kErrorSubsetSize "asapo::ProducerErrorTemplates::kErrorSubsetSize"
ErrorTemplateInterface kAuthorizationFailed "asapo::ProducerErrorTemplates::kAuthorizationFailed"
ErrorTemplateInterface kInternalServerError "asapo::ProducerErrorTemplates::kInternalServerError"
ErrorTemplateInterface kCannotSendDataToReceivers "asapo::ProducerErrorTemplates::kCannotSendDataToReceivers"
ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull"
ErrorTemplateInterface kWrongIngestMode "asapo::ProducerErrorTemplates::kWrongIngestMode"
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass FileData:
......
......@@ -34,6 +34,21 @@ cdef bytes _bytes(s):
raise TypeError("Could not convert to unicode.")
class AsapoProducerError(Exception):
pass
class AsapoWrongInputError(AsapoProducerError):
pass
#todo: more types
cdef python_exception_from_error(Error& err):
return AsapoProducerError(err.get().Explain())
cdef throw_exception(Error& err):
raise python_exception_from_error(err)
cdef void* data_pointer_nparray(data):
if data is None:
return <void*>NULL
......@@ -77,16 +92,14 @@ cdef class PyProducer:
err = self.c_producer.get().SendData_(event_header, data_pointer_nparray(data), ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
<void*>self,<void*>callback if callback != None else NULL, <void*>data))
cdef err_str = GetErrorString(&err)
if err_str.strip():
return err_str
else:
if data is not None:
if data.base is not None:
Py_XINCREF(<PyObject*>data.base)
else:
Py_XINCREF(<PyObject*>data)
return None
if err:
throw_exception(err)
if data is not None:
if data.base is not None:
Py_XINCREF(<PyObject*>data.base)
else:
Py_XINCREF(<PyObject*>data)
return
cdef EventHeader create_event_header(self,int id, exposed_path,user_meta,subset,ingest_mode):
cdef EventHeader event_header
event_header.file_id = id
......@@ -106,12 +119,10 @@ cdef class PyProducer:
err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
<void*>self,<void*>callback if callback != None else NULL, <void*>data))
cdef err_str = GetErrorString(&err)
if err_str.strip():
return err_str
else:
Py_XINCREF(<PyObject*>data)
return None
if err:
throw_exception(err)
Py_XINCREF(<PyObject*>data)
return
def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
"""
......@@ -129,16 +140,16 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:return: error, None if success.
:raises: AsapoProducerError
:rtype: string
"""
if type(data) == np.ndarray or data == None:
return self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
elif type(data) == bytes:
return self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
else:
return "wrong data type: " + str(type(data))
raise(AsapoProducerError("wrong data type: " + str(type(data))))
def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
"""
......@@ -156,7 +167,7 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:return: error, None if success.
:raises: AsapoProducerError
:rtype: string
"""
......@@ -164,24 +175,21 @@ cdef class PyProducer:
event_header.file_size = 0
err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode,
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
cdef err_str = GetErrorString(&err)
if err_str.strip():
return err_str
else:
return None
cdef void c_callback_python(self,py_callback, GenericRequestHeader header, string err_str):
if err:
throw_exception(err)
return
cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err):
if py_callback != None:
info_str = _str(header.Json())
info = json.loads(info_str)
if err_str.strip():
py_err = err_str
if err:
py_err = python_exception_from_error(err)
else:
py_err = None
py_err = None
py_callback(info,py_err)
cdef void c_callback(self,py_callback, GenericRequestHeader header, Error err) with gil:
cdef err_str = GetErrorString(&err)
self.c_callback_python(py_callback,header,"")
self.c_callback_python(py_callback,header,err)
cdef void c_callback_ndarr(self,py_callback,nd_array,GenericRequestHeader header, Error err) with gil:
if nd_array is not None:
......@@ -189,12 +197,12 @@ cdef class PyProducer:
Py_XDECREF(<PyObject*>nd_array.base)
else:
Py_XDECREF(<PyObject*>nd_array)
self.c_callback_python(py_callback,header,GetErrorString(&err))
self.c_callback_python(py_callback,header,err)
cdef void c_callback_bytesaddr(self,py_callback,bytes_array,GenericRequestHeader header, Error err) with gil:
if bytes_array is not None:
Py_XDECREF(<PyObject*>bytes_array)
self.c_callback_python(py_callback,header,GetErrorString(&err))
self.c_callback_python(py_callback,header,err)
@staticmethod
def __create_producer(endpoint,beamtime_id,stream,token,nthreads):
......@@ -205,11 +213,9 @@ cdef class PyProducer:
source.user_token = token
source.stream = stream
pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,&err)
cdef err_str = GetErrorString(&err)
if err_str.strip():
return None,err_str
else:
return pyProd,None
if err:
throw_exception(err)
return pyProd
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
return PyProducer.__create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
......
......@@ -13,21 +13,21 @@ receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id}
Cleanup() {
echo cleanup
rm -rf ${receiver_root_folder}
nomad stop receiver
nomad stop discovery
nomad stop authorizer
nomad stop nginx
echo "db.dropDatabase()" | mongo ${beamtime_id}_${stream}
nomad stop receiver >/dev/null
nomad stop discovery >/dev/null
nomad stop authorizer >/dev/null
nomad stop nginx >/dev/null
echo "db.dropDatabase()" | mongo ${beamtime_id}_${stream} >/dev/null
}
export PYTHONPATH=$2:${PYTHONPATH}
echo "db.${beamtime_id}_${stream}.insert({dummy:1})" | mongo ${beamtime_id}_${stream}
echo "db.${beamtime_id}_${stream}.insert({dummy:1})" | mongo ${beamtime_id}_${stream} >/dev/null
nomad run authorizer.nmd
nomad run nginx.nmd
nomad run receiver.nmd
nomad run discovery.nmd
nomad run authorizer.nmd >/dev/null
nomad run nginx.nmd >/dev/null
nomad run receiver.nmd >/dev/null
nomad run discovery.nmd >/dev/null
mkdir -p ${receiver_folder}
......
......@@ -25,45 +25,47 @@ def callback(header,err):
print ("successfuly sent: ",header)
lock.release()
def assert_err(err):
if err is not None:
print(err)
sys.exit(1)
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
assert_err(err)
producer.set_log_level("info")
#send single file
err = producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
assert_err(err)
producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
#send subsets
producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
#send meta only
err = producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
assert_err(err)
data = np.arange(10,dtype=np.float64)
#send data from array
err = producer.send_data(4, stream+"/"+"file5",data,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
assert_err(err)
#send data from string
err = producer.send_data(5, stream+"/"+"file6",b"hello",
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
assert_err(err)
#send metadata only
err = producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
assert_err(err)
# create with error
try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0)
except Exception as e:
print(e)
else:
print("should be error")
sys.exit(1)
time.sleep(5)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment