Skip to content
Snippets Groups Projects
Commit c08f7ff1 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update producer errors

parent 71e628f4
No related branches found
No related tags found
No related merge requests found
#include "producer/producer.h"
#include "producer_impl.h"
#include "producer/producer_error.h"
std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads,
asapo::RequestHandlerType type, SourceCredentials source_cred, Error* err) {
if (n_processing_threads > kMaxProcessingThreads || n_processing_threads == 0) {
*err = TextError("Set number of processing threads > 0 and <= " + std::to_string(kMaxProcessingThreads));
*err = ProducerErrorTemplates::kWrongInput.Generate("Set number of processing threads > 0 and <= " + std::to_string(
kMaxProcessingThreads));
return nullptr;
}
......
......@@ -46,7 +46,7 @@ TEST(CreateProducer, TooManyThreads) {
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1,
asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
ASSERT_THAT(producer, Eq(nullptr));
ASSERT_THAT(err, Ne(nullptr));
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
}
......@@ -55,7 +55,7 @@ TEST(CreateProducer, ZeroThreads) {
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0,
asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
ASSERT_THAT(producer, Eq(nullptr));
ASSERT_THAT(err, Ne(nullptr));
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
}
......
......@@ -18,10 +18,9 @@ cdef extern from "asapo_producer.h" namespace "asapo":
cdef bool operator==(Error lhs, ErrorTemplateInterface rhs)
cdef extern from "asapo_producer.h" namespace "asapo":
ErrorTemplateInterface kInternalServerError "asapo::ProducerErrorTemplates::kInternalServerError"
ErrorTemplateInterface kCannotSendDataToReceivers "asapo::ProducerErrorTemplates::kCannotSendDataToReceivers"
ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull"
ErrorTemplateInterface kTimeout "asapo::ProducerErrorTemplates::kTimeout"
ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput"
ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError"
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass FileData:
......
......@@ -41,15 +41,25 @@ class AsapoProducerError(Exception):
class AsapoWrongInputError(AsapoProducerError):
pass
#todo: more types
cdef python_exception_from_error(Error& err):
return AsapoProducerError(err.get().Explain())
class AsapoLocalIOError(AsapoProducerError):
pass
class AsapoTimeOutError(AsapoProducerError):
pass
cdef python_exception_from_error(Error& err):
if err == kTimeout:
return AsapoTimeOutError(err.get().Explain())
elif err == kWrongInput:
return AsapoWrongInputError(err.get().Explain())
elif err == kLocalIOError:
return AsapoLocalIOError(err.get().Explain())
else:
return AsapoProducerError(err.get().Explain())
cdef throw_exception(Error& err):
raise python_exception_from_error(err)
cdef void* data_pointer_nparray(data):
if data is None:
return <void*>NULL
......@@ -143,8 +153,9 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises: AsapoProducerError
:rtype: string
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
......@@ -170,8 +181,10 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises: AsapoProducerError
:rtype: string
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoLocalIOError: problems reading file to send
AsapoProducerError: actually should not happen
"""
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
......@@ -187,7 +200,8 @@ cdef class PyProducer:
"""
:param timeout_ms: timeout in milliseconds
:type timeout_ms: int
:raises: AsapoProducerError
:raises:
AsapoTimeoutError: requests not finished for a given timeout
"""
cdef Error err
cdef uint64_t timeout = timeout_ms
......@@ -239,6 +253,21 @@ cdef class PyProducer:
return pyProd
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
"""
:param endpoint: server endpoint (url:port)
:type endpoint: string
:param beamtime_id: beamtime id
:type beamtime_id: string
:param stream: stream to producer data to
:type stream: string
:param token: authorization token
:type token: string
:param nthreads: ingest mode flag
:type nthreads: int
:raises:
AsapoWrongInputError: wrong input (number of threads, ,,,)
AsapoProducerError: actually should not happen
"""
return PyProducer.__create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
......
......@@ -38,4 +38,5 @@ sleep 1
$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out
cat out
cat out | grep "successfuly sent" | wc -l | grep 7
\ No newline at end of file
cat out | grep "successfuly sent" | wc -l | grep 7
cat out | grep "local i/o error"
\ No newline at end of file
......@@ -54,6 +54,9 @@ producer.send_data(5, stream+"/"+"file6",b"hello",
producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
#send single file/wrong filename
producer.send_file(1, local_path = "./file2", exposed_path = stream+"/"+"file1", callback = callback)
producer.wait_requests_finished(50000)
n = producer.get_requests_queue_size()
if n!=0:
......@@ -64,7 +67,7 @@ if n!=0:
# create with error
try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0)
except Exception as e:
except asapo_producer.AsapoWrongInputError as e:
print(e)
else:
print("should be error")
......
test
......@@ -64,7 +64,7 @@ if n!=0:
# create with error
try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0)
except Exception as e:
except Exception as Asapo:
print(e)
else:
print("should be error")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment