diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8c67a30f2fffc6fa733ca5550d28ee7d3ddf9a3..513b2886d8379590dc38fb75f26e96c779108b8b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ IMPROVEMENTS
 * receiver use ASAP3 directory structure to save files to
 * API documentation is available at [asapo-docs.desy.de](asapo-docs.desy.de)
 * switch to using cmake 3.7+
+* error messages in Python as Python strings, not byte objects
 
 BUG FIXES
 * consumer operation timout - take duration of the operation into account
\ No newline at end of file
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 91544e7391e9be7f452a24c8555a5b29bcd0aa1c..1475ad22f0e0ba37df4ba88ee97aa8dc7e3f6fa4 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -72,6 +72,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
 cdef extern from "asapo_consumer.h" namespace "asapo":
   ErrorTemplateInterface kNoData "asapo::ConsumerErrorTemplates::kNoData"
   ErrorTemplateInterface kEndOfStream "asapo::ConsumerErrorTemplates::kEndOfStream"
+  ErrorTemplateInterface kStreamFinished "asapo::ConsumerErrorTemplates::kStreamFinished"
   ErrorTemplateInterface kUnavailableService "asapo::ConsumerErrorTemplates::kUnavailableService"
   ErrorTemplateInterface kInterruptedTransaction "asapo::ConsumerErrorTemplates::kInterruptedTransaction"
   ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError"
@@ -79,4 +80,5 @@ cdef extern from "asapo_consumer.h" namespace "asapo":
   cdef cppclass ConsumerErrorData:
     uint64_t id
     uint64_t id_max
+    string next_substream
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index 791c754566eb8a22aac1e24447282eb06fbadd78..ad31fd10332652e785bc61d6ea31274262efe2d9 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -37,12 +37,19 @@ class AsapoWrongInputError(AsapoConsumerError):
     pass
 
 class AsapoInterruptedTransactionError(AsapoConsumerError):
     pass
 
 class AsapoUnavailableServiceError(AsapoConsumerError):
     pass
 
 class AsapoLocalIOError(AsapoConsumerError):
     pass
 
+class AsapoStreamFinishedError(AsapoConsumerError):
+    def __init__(self,message,id_max=None,next_substream=None):
+        AsapoConsumerError.__init__(self,message)
+        self.id_max = id_max
+        self.next_substream = _str(next_substream) if next_substream is not None else None
+
+
 class AsapoEndOfStreamError(AsapoConsumerError):
     def __init__(self,message,id_max=None):
         AsapoConsumerError.__init__(self,message)
         self.id_max = id_max
@@ -63,6 +70,12 @@ cdef throw_exception(Error& err):
             raise AsapoEndOfStreamError(error_string,data.id_max)
         else:
             raise AsapoEndOfStreamError(error_string)
+    elif err == kStreamFinished:
+        data=<ConsumerErrorData*>err.get().GetCustomData()
+        if data != NULL:
+            raise AsapoStreamFinishedError(error_string,data.id_max,data.next_substream)
+        else:
+            raise AsapoStreamFinishedError(error_string)
     elif err == kNoData:
         data=<ConsumerErrorData*>err.get().GetCustomData()
         if data != NULL:
@@ -81,7 +94,6 @@ cdef throw_exception(Error& err):
         raise AsapoConsumerError(error_string)
 
 cdef class PyDataBroker:
-    _default_stream = "default"
     cdef DataBroker* c_broker
     def _op(self, op, group_id, substream, meta_only, uint64_t id):
         cdef FileInfo info
@@ -111,11 +123,11 @@ cdef class PyDataBroker:
             arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
             PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
         return arr,meta
-    def get_next(self, group_id, substream = _default_stream, meta_only = True):
+    def get_next(self, group_id, substream = "default", meta_only = True):
         return self._op("next",group_id,substream,meta_only,0)
-    def get_last(self, group_id, substream = _default_stream, meta_only = True):
+    def get_last(self, group_id, substream = "default", meta_only = True):
         return self._op("last",group_id,substream,meta_only,0)
-    def get_by_id(self,id,group_id, substream = _default_stream,meta_only = True):
+    def get_by_id(self,id,group_id, substream = "default",meta_only = True):
         return self._op("id",group_id,substream,meta_only,id)
     def retrieve_data(self,meta):
         json_str = json.dumps(meta)
@@ -134,7 +146,7 @@ cdef class PyDataBroker:
             arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr)
             PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA)
         return arr
-    def get_current_size(self, substream = _default_stream):
+    def get_current_size(self, substream = "default"):
         cdef Error err
         cdef uint64_t size
         cdef string b_substream = _bytes(substream)
@@ -146,7 +158,7 @@ cdef class PyDataBroker:
         return size
     def set_timeout(self,timeout):
         self.c_broker.SetTimeout(timeout)
-    def set_lastread_marker(self,value,group_id, substream = _default_stream):
+    def set_lastread_marker(self,value,group_id, substream = "default"):
         cdef string b_group_id = _bytes(group_id)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -156,7 +168,7 @@ cdef class PyDataBroker:
         if err:
             throw_exception(err)
         return
-    def reset_lastread_marker(self,group_id, substream = _default_stream):
+    def reset_lastread_marker(self,group_id, substream = "default"):
         cdef string b_group_id = _bytes(group_id)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -185,7 +197,7 @@ cdef class PyDataBroker:
             list.append(_str(substream))
         return list
 
-    def query_images(self,query, substream = _default_stream):
+    def query_images(self,query, substream = "default"):
         cdef string b_query = _bytes(query)
         cdef string b_substream = _bytes(substream)
         cdef Error err
@@ -219,11 +231,11 @@ cdef class PyDataBroker:
         for fi in dataset.content:
             json_list.append(json.loads(_str(fi.Json())))
         return dataset.id, json_list
-    def get_next_dataset(self, group_id, substream = _default_stream):
+    def get_next_dataset(self, group_id, substream = "default"):
         return self._op_dataset("next",group_id,substream,0)
-    def get_last_dataset(self, group_id, substream = _default_stream):
+    def get_last_dataset(self, group_id, substream = "default"):
         return self._op_dataset("last",group_id,substream,0)
-    def get_dataset_by_id(self, id, group_id, substream = _default_stream):
+    def get_dataset_by_id(self, id, group_id, substream = "default"):
         return self._op_dataset("id",group_id,substream,id)
     def get_beamtime_meta(self):
         cdef Error err
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index ce6912c336152fd8a8a6d75979f4da16b68359df..4ce8fda3ba334b5e41bbfd5796205fb06c01240d 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -89,12 +89,13 @@ cdef extern from "asapo_producer.h" namespace "asapo" nogil:
     cppclass Producer:
         @staticmethod
        unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
-        Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback)
-        Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
+        Error SendFile(const EventHeader& event_header, string substream, string full_path, uint64_t ingest_mode,RequestCallback callback)
+        Error SendData__(const EventHeader& event_header, string substream, void* data, uint64_t ingest_mode,RequestCallback callback)
         void StopThreads__()
         void SetLogLevel(LogLevel level)
         uint64_t GetRequestsQueueSize()
         Error WaitRequestsFinished(uint64_t timeout_ms)
+        Error SendSubstreamFinishedFlag(string substream, uint64_t last_id, string next_substream, RequestCallback callback)
 
 cdef extern from "asapo_producer.h" namespace "asapo":
   uint64_t kDefaultIngestMode
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index e74837c9c25f10df990476969e3c425db00bb4f2..3af967b8ddfa8775ea9ccc079001d5fbde89348f 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -48,14 +48,15 @@ class AsapoTimeOutError(AsapoProducerError):
     pass
 
 cdef python_exception_from_error(Error& err):
+    error_string = _str(err.get().Explain())
     if err == kTimeout:
-        return AsapoTimeOutError(err.get().Explain())
+        return AsapoTimeOutError(error_string)
     elif err == kWrongInput:
-        return AsapoWrongInputError(err.get().Explain())
+        return AsapoWrongInputError(error_string)
     elif err == kLocalIOError:
-        return AsapoLocalIOError(err.get().Explain())
+        return AsapoLocalIOError(error_string)
     else:
-        return AsapoProducerError(err.get().Explain())
+        return AsapoProducerError(error_string)
 
 cdef throw_exception(Error& err):
     raise python_exception_from_error(err)
@@ -99,14 +100,14 @@ cdef class PyProducer:
             return
         self.c_producer.get().SetLogLevel(log_level)
 
-    def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,substream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_id = id
         if data is None:
            event_header.file_size = 0
         else:
            event_header.file_size = data.nbytes
-        err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode,
+        err = self.c_producer.get().SendData__(event_header, _bytes(substream), data_pointer_nparray(data),ingest_mode,
                  unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
                  <void*>self,<void*>callback, <void*>data))
         if err:
@@ -133,10 +134,10 @@ cdef class PyProducer:
             event_header.subset_size = subset[1]
         return event_header
 
-    def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None, substream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_size = len(data)
-        err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode,
+        err = self.c_producer.get().SendData__(event_header,_bytes(substream), data_pointer_bytes(data), ingest_mode,
                  unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
                  <void*>self,<void*>callback, <void*>data))
         if err:
@@ -146,7 +147,7 @@ cdef class PyProducer:
             Py_XINCREF(<PyObject*>callback)
         return
 
-    def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def send_data(self, id, exposed_path, data, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
         :param id: unique data id
         :type id: int
@@ -167,13 +168,31 @@ cdef class PyProducer:
             AsapoProducerError: actually should not happen
         """
         if type(data) == np.ndarray or data == None:
-            self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
+            self.__send_np_array(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
         elif type(data) == bytes:
-            self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
+            self.__send_bytes(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback)
         else:
             raise(AsapoProducerError("wrong data type: " + str(type(data))))
 
+    def send_substream_finished_flag(self, substream, last_id, next_substream = None, callback = None):
+        """
+        :param substream: substream name
+        :type substream: string
+        :param last_id: id of the last record
+        :param next_substream: name of the next substream or None
+        :type next_substream: string
+        :param callback: callback function, default None
+        :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
+        :raises:
+            AsapoWrongInputError: wrong input (authorization, meta, ...)
+            AsapoProducerError: actually should not happen
+        """
+        err = self.c_producer.get().SendSubstreamFinishedFlag(_bytes(substream), last_id, _bytes(next_substream) if next_substream != None else "",
+            unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
+        if err:
+            throw_exception(err)
+
-    def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+    def send_file(self, id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
         :param id: unique data id
         :type id: int
@@ -197,7 +216,7 @@ cdef class PyProducer:
 
         cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
         event_header.file_size = 0
-        err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode,
+        err = self.c_producer.get().SendFile(event_header, _bytes(substream), _bytes(local_path), ingest_mode,
             unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
         if err:
             throw_exception(err)
diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt
index c177a9f4a17ef05167f054ace379ada74f268945..ecf39ccedb412397637b6ba5778b7180fee60162 100644
--- a/tests/automatic/full_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/CMakeLists.txt
@@ -11,4 +11,5 @@ add_subdirectory(simple_chain_filegen_multisource)
 add_subdirectory(simple_chain_filegen_readdata_cache)
 add_subdirectory(simple_chain_filegen_readdata_file)
 add_subdirectory(simple_chain_dataset)
-add_subdirectory(send_recv_substreams)
\ No newline at end of file
+add_subdirectory(send_recv_substreams)
+add_subdirectory(send_recv_substreams_python)
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt b/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..4312c3ca1e80f8d6b9e7530323d2ffaea2fb99cd
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/CMakeLists.txt
@@ -0,0 +1,14 @@
+set(TARGET_NAME send_recv_substreams_python)
+prepare_asapo()
+
+if (UNIX)
+    get_target_property(PYTHON_LIBS_CONSUMER python-lib BINARY_DIR)
+    get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR)
+else()
+    get_target_property(PYTHON_LIBS_CONSUMER asapo_consumer BINARY_DIR)
+    get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR)
+endif()
+
+file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/send_recv_substreams.py TEST_SCRIPT )
+
+add_script_test("${TARGET_NAME}" "${Python_EXECUTABLE} ${PYTHON_LIBS_CONSUMER} ${PYTHON_LIBS_PRODUCER} ${TEST_SCRIPT} " nomem)
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh b/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh
new file mode 100644
index 0000000000000000000000000000000000000000..fed341d8cbddf42ba90d0a2949e6a42aeba0e327
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/check_linux.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+
+source_path=.
+beamtime_id=asapo_test
+stream_in=detector
+
+indatabase_name=${beamtime_id}_${stream_in}
+token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+beamline=test
+
+set -e
+
+trap Cleanup EXIT
+
+Cleanup() {
+    set +e
+    nomad stop nginx
+    nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill
+    nomad stop discovery
+    nomad stop broker
+    nomad stop receiver
+    nomad stop authorizer
+    echo "db.dropDatabase()" | mongo ${indatabase_name}
+}
+
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+nomad run receiver.nmd
+nomad run authorizer.nmd
+
+
+export PYTHONPATH=$2:$3:${PYTHONPATH}
+
+
+$1 $4 127.0.0.1:8400 $beamtime_id $token
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat b/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat
new file mode 100644
index 0000000000000000000000000000000000000000..aa79cf59d7dff753f5127ebbe37b6fc81fd09af5
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/check_windows.bat
@@ -0,0 +1,36 @@
+SET source_path=.
+SET beamtime_id=asapo_test
+SET stream_in=detector
+
+SET indatabase_name=%beamtime_id%_%stream_in%
+
+SET token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+SET beamline=test
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
+
+
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+
+SET PYTHONPATH=%2;%3
+"%1" "%4" 127.0.0.1:8400 %beamtime_id% %token% || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
+c:\opt\consul\nomad run nginx_kill.nmd && c:\opt\consul\nomad stop -yes -purge nginx_kill
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop authorizer
+
+echo db.dropDatabase() | %mongo_exe% %indatabase_name%
diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py
new file mode 100644
index 0000000000000000000000000000000000000000..a8081ed28c697db817aad91f6a82c7869b3093ff
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py
@@ -0,0 +1,60 @@
+from __future__ import print_function
+
+import asapo_consumer
+import asapo_producer
+import sys
+import os
+
+
+import threading
+lock = threading.Lock()
+
+timeout = 10 * 1000
+
+def assert_eq(val,expected,name):
+    if val != expected:
+        print ("error at "+name)
+        print ('val: ', val,' expected: ',expected)
+        sys.exit(1)
+
+def callback(header,err):
+    lock.acquire() # to print
+    if err is not None:
+        print("could not send: ",header,err)
+    else:
+        print ("successfully sent: ",header)
+    lock.release()
+
+source, beamtime, token = sys.argv[1:]
+
+broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout)
+producer = asapo_producer.create_producer(source,beamtime, "", token, 1)
+producer.set_log_level("debug")
+
+group_id = broker.generate_group_id()
+
+n_send = 10
+
+for i in range(n_send):
+    producer.send_data(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,substream = "substream", callback = callback)
+
+producer.send_substream_finished_flag("substream", 10, next_substream = "next_substream", callback = callback)
+producer.wait_requests_finished(timeout)
+
+n_recv = 0
+substream_finished=False
+while True:
+    try:
+        data, meta = broker.get_next(group_id,substream = "substream", meta_only=True)
+        print ("received: ",meta)
+        n_recv = n_recv + 1
+    except asapo_consumer.AsapoStreamFinishedError as finished_substream:
+        substream_finished = True
+        assert_eq(finished_substream.id_max, 11, "last id")
+        assert_eq(finished_substream.next_substream, "next_substream", "next substream")
+        break
+
+assert_eq(n_recv, n_send, "send=recv")
+assert_eq(substream_finished, True, "substream finished")
+
+
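
Usage note: the test above simply breaks once the finished flag arrives; a long-running consumer would instead follow next_substream to the successor substream. A minimal sketch of that pattern, assuming only the asapo_consumer API exercised by this change set (the endpoint, beamtime and token are placeholders, and the starting substream name is arbitrary):

    import asapo_consumer

    # Placeholders - substitute a real endpoint, beamtime id and token.
    broker = asapo_consumer.create_server_broker("localhost:8400", ".", "asapo_test", "", "<token>", 10 * 1000)
    group_id = broker.generate_group_id()

    substream = "substream"
    while substream:
        try:
            data, meta = broker.get_next(group_id, substream=substream, meta_only=True)
            print("received:", meta)
        except asapo_consumer.AsapoStreamFinishedError as err:
            # Hop to the successor; next_substream is None/empty when the
            # producer announced no successor, which ends the loop.
            substream = err.next_substream
        except asapo_consumer.AsapoEndOfStreamError:
            break  # no data yet and no finished flag; stop here (or sleep and retry)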