Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
A
asapo
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Joao Alvim Oliveira Dias De Almeida
asapo
Commits
6cb83777
Commit
6cb83777
authored
5 years ago
by
Sergey Yakubov
Browse files
Options
Downloads
Patches
Plain Diff
update Python docs
parent
4a034008
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
producer/api/python/asapo_producer.pyx.in
+48
-9
48 additions, 9 deletions
producer/api/python/asapo_producer.pyx.in
sphinx/source/producer.rst
+20
-0
20 additions, 0 deletions
sphinx/source/producer.rst
worker/api/python/asapo_worker.pyx.in
+8
-5
8 additions, 5 deletions
worker/api/python/asapo_worker.pyx.in
with
76 additions
and
14 deletions
producer/api/python/asapo_producer.pyx.in
+
48
−
9
View file @
6cb83777
...
...
@@ -15,7 +15,6 @@ INGEST_MODE_TRANSFER_DATA = kTransferData
INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly
INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem
cdef extern from "numpy/ndarraytypes.h":
void PyArray_ENABLEFLAGS(np.ndarray arr, int flags)
...
...
@@ -57,6 +56,8 @@ cdef class PyProducer:
log_level = LogLevel_Debug
elif level == "info" :
log_level = LogLevel_Info
elif level == "error" :
log_level = LogLevel_Error
elif level == "none" :
log_level = LogLevel_None
elif level == "warn" :
...
...
@@ -66,7 +67,7 @@ cdef class PyProducer:
return
self.c_producer.get().SetLogLevel(log_level)
def send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def
__
send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_id = id
if data is None:
...
...
@@ -99,7 +100,7 @@ cdef class PyProducer:
event_header.subset_size = subset[1]
return event_header
def send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def
__
send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_size = len(data)
err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode,
...
...
@@ -112,15 +113,53 @@ cdef class PyProducer:
Py_XINCREF(<PyObject*>data)
return None
def send_data(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def send_data(self, id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
"""
:param id: unique data id
:type id: int
:param exposed_path: Path which will be exposed to consumers
:type exposed_path: string
:param data: data to send
:type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
:param user_meta: user metadata, default None
:type user_meta: JSON string
:param subset: a tuple with two int values (subset id, subset size), default None
:type subset: tuple
:param ingest_mode: ingest mode flag
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:return: error, None if success.
:rtype: string
"""
if type(data) == np.ndarray or data == None:
return self.send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
return self.
__
send_np_array(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
elif type(data) == bytes:
return self.send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
return self.
__
send_bytes(id,exposed_path,data,user_meta,subset,ingest_mode,callback)
else:
return "wrong data type: " + str(type(data))
def send_file(self,int id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
def send_file(self, id, local_path, exposed_path,user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
"""
:param id: unique data id
:type id: int
:param local_path: Path to file to send
:type local_path: string
:param exposed_path: Path which will be exposed to consumers
:type exposed_path: string
:param user_meta: user metadata, default None
:type user_meta: JSON string
:param subset: a tuple with two int values (subset id, subset size), default None
:type subset: tuple
:param ingest_mode: ingest mode flag
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:return: error, None if success.
:rtype: string
"""
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_size = 0
err = self.c_producer.get().SendFile(event_header, _bytes(local_path), ingest_mode,
...
...
@@ -158,7 +197,7 @@ cdef class PyProducer:
self.c_callback_python(py_callback,header,GetErrorString(&err))
@staticmethod
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
def
__
create_producer(endpoint,beamtime_id,stream,token,nthreads):
pyProd = PyProducer()
cdef Error err
cdef SourceCredentials source
...
...
@@ -173,7 +212,7 @@ cdef class PyProducer:
return pyProd,None
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
return PyProducer.
__
create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
__version__ = "@ASAPO_VERSION_PYTHON@"
This diff is collapsed.
Click to expand it.
sphinx/source/producer.rst
+
20
−
0
View file @
6cb83777
...
...
@@ -7,3 +7,23 @@ Producer
:members:
:undoc-members:
:show-inheritance:
Injest modes:
-------------
.. data:: INGEST_MODE_TRANSFER_DATA
.. data:: INGEST_MODE_TRANSFER_METADATA_ONLY
.. data:: INGEST_MODE_STORE_IN_FILESYSTEM
.. data:: DEFAULT_INGEST_MODE = INGEST_MODE_TRANSFER_DATA | INGEST_MODE_STORE_IN_FILESYSTEM
Logger levels:
--------------
info (default)
error
warn
debug
none
\ No newline at end of file
This diff is collapsed.
Click to expand it.
worker/api/python/asapo_worker.pyx.in
+
8
−
5
View file @
6cb83777
...
...
@@ -197,14 +197,17 @@ cdef class __PyDataBrokerFactory:
else:
return broker,None
def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout):
def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout
_ms
):
"""
:param server_name: Handler to this.
:type server_name: String
:param source_path: The network this node belongs to.
:param server_name: Server endpoint (hostname:port)
:type server_name: string
:param source_path: Path to the folder to read data from
:type source_path: string
:return: Broker object and error. (None,err) if case of error, (broker, None) if success
:rtype: Tuple with broker object and error.
"""
factory = __PyDataBrokerFactory()
return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout)
return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout
_ms
)
__version__ = "@ASAPO_VERSION_PYTHON@"
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment