diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index a5987b464c5ce159774d1d8270515d58e0ad5ef0..d3c884afff427085c70f0b9ebdd1e8345f94709d 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -15,7 +15,6 @@ INGEST_MODE_TRANSFER_DATA = kTransferData INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem - cdef extern from "numpy/ndarraytypes.h": void PyArray_ENABLEFLAGS(np.ndarray arr, int flags) @@ -57,6 +56,8 @@ cdef class PyProducer: log_level = LogLevel_Debug elif level == "info" : log_level = LogLevel_Info + elif level == "error" : + log_level = LogLevel_Error elif level == "none" : log_level = LogLevel_None elif level == "warn" : @@ -66,7 +67,7 @@ cdef class PyProducer: return self.c_producer.get().SetLogLevel(log_level) - def send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) event_header.file_id = id if data is None: @@ -99,7 +100,7 @@ cdef class PyProducer: event_header.subset_size = subset[1] return event_header - def send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) event_header.file_size = len(data) err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode, @@ -112,15 +113,53 @@ cdef class PyProducer: Py_XINCREF(<PyObject*>data) return None - def send_data(self,int id, 
exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + """ + :param id: unique data id + :type id: int + :param exposed_path: Path which will be exposed to consumers + :type exposed_path: string + :param data: data to send + :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode + :param user_meta: user metadata, default None + :type user_meta: JSON string + :param subset: a tuple with two int values (subset id, subset size), default None + :type subset: tuple + :param ingest_mode: ingest mode flag + :type ingest_mode: int + :param callback: callback function, default None + :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None + :return: error, None if success. + :rtype: string + """ + if type(data) == np.ndarray or data == None: - return self.send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback) + return self.__send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback) elif type(data) == bytes: - return self.send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback) + return self.__send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback) else: return "wrong data type: " + str(type(data)) - def send_file(self,int id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): + """ + :param id: unique data id + :type id: int + :param local_path: Path to file to send + :type local_path: string + :param exposed_path: Path which will be exposed to consumers + :type exposed_path: string + :param user_meta: user metadata, default None + :type 
user_meta: JSON string + :param subset: a tuple with two int values (subset id, subset size), default None + :type subset: tuple + :param ingest_mode: ingest mode flag + :type ingest_mode: int + :param callback: callback function, default None + :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None + :return: error, None if success. + :rtype: string + """ + cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) event_header.file_size = 0 err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode, @@ -158,7 +197,7 @@ cdef class PyProducer: self.c_callback_python(py_callback,header,GetErrorString(&err)) @staticmethod - def create_producer(endpoint,beamtime_id,stream,token,nthreads): + def __create_producer(endpoint,beamtime_id,stream,token,nthreads): pyProd = PyProducer() cdef Error err cdef SourceCredentials source @@ -173,7 +212,7 @@ cdef class PyProducer: return pyProd,None def create_producer(endpoint,beamtime_id,stream,token,nthreads): - return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads) + return PyProducer.__create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads) __version__ = "@ASAPO_VERSION_PYTHON@" diff --git a/sphinx/source/producer.rst b/sphinx/source/producer.rst index acbbf6a8e3244c2e25dc194cfe6adaeb4f8326d9..b3985141cffca4fd1de08ef40976933fad739c5b 100644 --- a/sphinx/source/producer.rst +++ b/sphinx/source/producer.rst @@ -7,3 +7,23 @@ Producer :members: :undoc-members: :show-inheritance: + +Ingest modes: +------------- +.. data:: INGEST_MODE_TRANSFER_DATA +.. data:: INGEST_MODE_TRANSFER_METADATA_ONLY +.. data:: INGEST_MODE_STORE_IN_FILESYSTEM +.. 
data:: DEFAULT_INGEST_MODE = INGEST_MODE_TRANSFER_DATA | INGEST_MODE_STORE_IN_FILESYSTEM + + +Logger levels: +-------------- +info (default) + +error + +warn + +debug + +none \ No newline at end of file diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index e543c02da61a83e8144b1dcf3116d9def6cd0d0a..cd19e16d5304de8fceef47e88edced7c12041bbb 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -197,14 +197,17 @@ cdef class __PyDataBrokerFactory: else: return broker,None -def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout): +def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout_ms): """ - :param server_name: Handler to this. - :type server_name: String - :param source_path: The network this node belongs to. + :param server_name: Server endpoint (hostname:port) + :type server_name: string + :param source_path: Path to the folder to read data from + :type source_path: string + :return: Broker object and error. (None,err) in case of error, (broker, None) if success + :rtype: Tuple with broker object and error. """ factory = __PyDataBrokerFactory() - return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout) + return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) __version__ = "@ASAPO_VERSION_PYTHON@"