fs-ec / bluesky_blissdata / Commits

Commit 2bf4f09b
Authored 10 months ago by Udai Singh
fixed issues
Parent: 40180a49
No related branches, tags, or merge requests contain this commit.
Changes: 4 changed files
  README.rst                            +6   −17
  setup.cfg                             +5   −9
  src/bluesky_blissdata/dispacher.py    +274 −0    (new file)
  src/bluesky_blissdata/run.py          +135 −0    (renamed from skeleton.py)

420 additions and 26 deletions in total.
README.rst  +6 −17

@@ -23,27 +23,16 @@
:alt: Twitter
:target: https://twitter.com/bluesky-blissdata
.. image:: https://img.shields.io/badge/-PyScaffold-005CA0?logo=pyscaffold
:alt: Project generated with PyScaffold
:target: https://pyscaffold.org/
..
.. image:: https://img.shields.io/badge/-PyScaffold-005CA0?logo=pyscaffold
..
:alt: Project generated with PyScaffold
..
:target: https://pyscaffold.org/
|
.. image:: https://img.shields.io/pypi/v/bluesky-blissdata.svg
:alt: PyPI-Server
:target: https://pypi.org/project/bluesky-blissdata/
=================
bluesky-blissdata
=================
Add a short description here!
A longer description of your project goes here...
.. _pyscaffold-notes:
Note
====
This project has been set up using PyScaffold 4.5. For details and usage
information on PyScaffold see https://pyscaffold.org/.
setup.cfg  +5 −9
@@ -49,6 +49,8 @@ package_dir =
# For more information, check out https://semver.org/.
install_requires =
    importlib-metadata; python_version<"3.8"
    bluesky-queueserver
    blissdata

[options.packages.find]

@@ -69,8 +71,8 @@ testing =
[options.entry_points]
# Add here console scripts like:
# console_scripts =
#     script_name = bluesky_blissdata.module:function
console_scripts =
    bluesky_blissdata = bluesky_blissdata.run:main
# For example:
# console_scripts =
#     fibonacci = bluesky_blissdata.skeleton:run

@@ -114,10 +116,4 @@ exclude =
    build
    dist
    .eggs
    docs/conf.py
[pyscaffold]
# PyScaffold's parameters when the project was created.
# This will be used when updating. Do not change!
version = 4.5
package = bluesky_blissdata
docs/conf.py
\ No newline at end of file

(In this last hunk the [pyscaffold] bookkeeping block is dropped; the exclude list now ends at docs/conf.py with no trailing newline.)
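With the new entry point, installing the package (e.g. pip install .) should expose a bluesky_blissdata console command that simply calls bluesky_blissdata.run:main; judging from the argument parser added in run.py below, an invocation would look like `bluesky_blissdata --redis-host localhost --redis-port 6379 --zmq-host localhost --zmq-port 5578 -v`, with all four options defaulting to exactly those values.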
src/bluesky_blissdata/dispacher.py  (new file, mode 100644)  +274 −0
#!/usr/bin/env python
import numpy as np
from blissdata.redis_engine.store import DataStore
import os
from blissdata.redis_engine.scan import ScanState
from blissdata.redis_engine.encoding.numeric import NumericStreamEncoder
from blissdata.redis_engine.encoding.json import JsonStreamEncoder
from blissdata.schemas.scan_info import ScanInfoDict, DeviceDict, ChainDict, ChannelDict

# Configure blissdata for a Redis database
# The identity model we use is ESRFIdentityModel in blissdata.redis_engine.models
# It defines the json fields indexed by RediSearch

# ------------------------------------------------------------------ #
# create scan in the database
# ------------------------------------------------------------------ #


class blissdata_dispacher:
    def __init__(self, host="localhost", port=6380):
        try:
            self._data_store = DataStore("redis://" + host + ":" + str(port))
        except OSError as e:
            raise ConnectionError(self._error_message(e))

    def __call__(self, name, doc):
        self.scan_id = {
            "name": "my_scan",
            "number": 1,
            "data_policy": "no_policy",
            "session": "sim_session",
            "proposal": "blc00001",
        }
        # "collection": "sample1",
        # "dataset": "sample1_scan59"}
        if name == "start":
            self.prepare_scan(doc)
        if name == "descriptor":
            self.config_datastream(doc)
        if name == "event":
            self.push_datastream(doc)
        if name == "stop":
            self.stop_datastream(doc)

    def prepare_scan(self, doc):
        self.scan_id['name'] = doc.get('plan_name', self.scan_id['name'])
        self.scan_id['number'] = doc.get('scan_id', self.scan_id['number'])
        self.uid = doc.get('uid')
        self.scan = self._data_store.create_scan(
            self.scan_id, info={"name": doc['plan_name'], "uid": self.uid})
        self.dets = doc.get('detectors')
        self.motors = doc.get('motors')
        self.start_time = doc['time']
        self.npoints = doc['num_points']
        self.count_time = 1
        self.start = []
        self.stop = []
        if self.motors is not None:
            j = 0
            for i in range(len(self.motors)):
                self.start.append(doc.get('plan_args')['args'][j + 1])
                self.stop.append(doc.get('plan_args')['args'][j + 2])
                j += 3
        self.devices: dict[str, DeviceDict] = {}
        self.devices["timer"] = DeviceDict(name="timer", channels=[], metadata={})
        self.devices["counters"] = DeviceDict(name="counters", channels=[], metadata={})
        self.devices["axis"] = DeviceDict(name="axis", channels=[], metadata={})

    # declare some streams in the scan
    def config_datastream(self, doc):
        ddesc_dict = {}
        self.stream_list = {}
        self.acq_chain: dict[str, ChainDict] = {}
        self.channels: dict[str, ChannelDict] = {}
        elem = {'name': None, "label": None, 'dtype': None, "shape": None, "unit": None}
        for dev in doc.get("data_keys").keys():
            elem['name'] = doc.get("data_keys")[dev]['object_name']
            elem['label'] = dev
            elem['dtype'] = np.float64
            elem['shape'] = doc.get("data_keys")[dev]['shape']
            elem['precision'] = doc.get("data_keys")[dev]['precision']
            unit = ""
            if elem['name'] in self.motors:
                device_type = "axis"
            if elem['name'] in self.dets:
                device_type = "counters"
            self.devices[device_type]['channels'].append(dev)
            self.channels[elem['label']] = ChannelDict(device=device_type,
                                                       dim=len(elem['shape']),
                                                       display_name=dev)
            encoder = NumericStreamEncoder(dtype=np.float64, shape=elem['shape'])
            scalar_stream = self.scan.create_stream(elem['label'], encoder, info={"unit": unit})
            ddesc_dict[elem['label']] = elem
            self.stream_list[elem['label']] = scalar_stream

        elem['name'] = "timer"
        elem['label'] = "time"
        elem['dtype'] = np.float64
        elem['shape'] = []
        elem['precision'] = 4
        unit = "s"
        device_type = 'timer'
        self.devices[device_type]['channels'].append(elem['label'])
        self.channels[elem['label']] = ChannelDict(device=device_type,
                                                   dim=len(elem['shape']),
                                                   display_name=elem['name'])
        encoder = NumericStreamEncoder(dtype=np.float64, shape=elem['shape'])
        scalar_stream = self.scan.create_stream(elem['label'], encoder, info={"unit": unit})
        ddesc_dict[elem['label']] = elem
        self.stream_list[elem['label']] = scalar_stream

        # self.scalar_stream = self.scan.create_stream("scalars", encoder, info={"unit": "mV", "embed the info": "you want"})
        # encoder = NumericStreamEncoder(dtype=np.int32, shape=(4096, ))
        # self.vector_stream = scan.create_stream("vectors", encoder, info={})
        # encoder = NumericStreamEncoder(dtype=np.uint16, shape=(1024, 100, ))
        # self.array_stream = scan.create_stream("arrays", encoder, info={})
        # encoder = JsonStreamEncoder()
        # self.json_stream = scan.create_stream("jsons", encoder, info={})

        self.acq_chain["axis"] = ChainDict(
            top_master="timer",
            devices=list(self.devices.keys()),
            scalars=[],
            spectra=[],
            images=[],
            master={})

        # gather some metadata before running the scan
        scan_info = self.scan_info(ddesc_dict)
        self.scan.info.update(scan_info)

        # ------------------------------------------------------------------ #
        self.scan.prepare()  # upload initial metadata and stream declarations
        # ------------------------------------------------------------------ #
        self.scan.start()
        # Scan is ready to start, eventually wait for other processes.
        # Sharing stream keys to external publishers can be done there.

    def push_datastream(self, doc):
        # ------------------------------------------------------------------ #
        # ------------------------------------------------------------------ #
        print(doc.get('data'))
        data = doc.get('data')
        for k in data.keys():
            try:
                ch_stream = self.stream_list[k]
            except KeyError:
                self.warning("Stream for {} not found".format(k))
                continue
            ch_stream.send(data[k])
        try:
            ch_stream = self.stream_list['time']
        except KeyError:
            self.warning("Stream for {} not found".format(k))
        ch_stream.send(doc['time'])

    # close streams
    # scalar_stream.seal()
    # vector_stream.seal()
    # array_stream.seal()
    # json_stream.seal()
    def stop_datastream(self, doc):
        for stream in self.stream_list.values():
            try:
                stream.seal()
            except Exception as e:
                print(e)
                self.warning("Error sealing stream {}".format(stream.name))
                continue
        self.scan.stop()
        self.scan.info['end_time'] = doc['time']
        self.scan.info['exit_status'] = doc['exit_status']
        self.scan.info['reason'] = doc['reason']
        self.scan.info['num_events'] = doc['num_events']
        self.scan.close()

    # ------------------------------------------------------------------ #
    # ---------------------------------------------------------------- #
    def scan_info(self, ddesc_dict):
        # filename, masterfiles, images_path = self.file_info(
        #     singleFile=self.nx_save_single_file)
        scan_info = {
            ##################################
            # Scan metadata
            ##################################
            "name": self.scan.name,
            "scan_nb": self.scan.number,
            "session_name": self.scan.session,
            "data_policy": self.scan.data_policy,
            "start_time": self.start_time,
            "type": self.scan.name,
            "npoints": self.npoints,
            "count_time": self.count_time,
            ##################################
            # Device information
            ##################################
            "acquisition_chain": self.acq_chain,
            "devices": self.devices,
            "channels": self.channels,
            ##################################
            # Plot metadata
            ##################################
            "display_extra": {"plotselect": []},
            "plots": [],
            "start": self.start,
            "stop": self.stop,
            ##################################
            # NeXus writer metadata
            ##################################
            # "save": self.nexus_save,
            # "filename": filename,
            # "images_path": images_path,
            # "publisher": "test",
            # "publisher_version": "1.0",
            # "data_writer": "nexus",
            # "writer_options": {"chunk_options": {}, "separate_scan_files": False},
            # "scan_meta_categories": [
            #     "positioners",
            #     "nexuswriter",
            #     "instrument",
            #     "technique",
            #     "snapshot",
            #     "datadesc",
            # ],
            # "nexuswriter": {
            #     "devices": {},
            #     "instrument_info": {"name": "alba-"+self.scan.beamline, "name@short_name": self.scan.beamline},
            #     "masterfiles": masterfiles,
            #     "technique": {},
            # },
            # "positioners": {},  # TODO decide how to fill this (from snapshot?)
            # # TODO decide how to fill this (from instruments? env var?)
            # "instrument": {},
            # "datadesc": ddesc_dict,
            ##################################
            # Mandatory by the schema
            ##################################
            "user_name": os.getlogin(),
            # tangosys?
        }
        scan_info["plots"].append({"kind": "curve-plot"})
        # Add curves selected in measurement group for plotting
        for elem in ddesc_dict.items():
            try:
                plot_type = elem[1].get("plot_type", 0)
                plot_axes = elem[1].get("plot_axes", [])
                name = elem[1].get("label", "")
                axes = []
                if plot_type == 1:
                    for axis in plot_axes:
                        if "<idx>" in axis:
                            axis = "#Pt No"
                        axes.append({"kind": "curve", "x": axis, "y": name})
                    scan_info["plots"].append(
                        {"kind": "curve-plot", "name": name, "items": axes})
                elif plot_type == 2:
                    self.info("Image plot not implemented yet")
            except IndexError:
                continue
        return scan_info
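For context, blissdata_dispacher is a plain bluesky callback: it is invoked with each (name, doc) pair and routes start/descriptor/event/stop documents to prepare_scan, config_datastream, push_datastream and stop_datastream. Besides the ZMQ route wired up in run.py below, it can therefore be exercised directly against a local RunEngine. A minimal sketch, not part of the commit, assuming a blissdata-configured Redis server is reachable on localhost:6380 and the simulated ophyd devices are installed:

from bluesky import RunEngine
from bluesky.plans import scan
from ophyd.sim import det, motor

from bluesky_blissdata.dispacher import blissdata_dispacher

# host/port must point at a Redis server set up for blissdata
RE = RunEngine({})
RE.subscribe(blissdata_dispacher(host="localhost", port=6380))

# Each emitted document goes through blissdata_dispacher.__call__:
# start -> prepare_scan, descriptor -> config_datastream,
# event -> push_datastream, stop -> stop_datastream
RE(scan([det], motor, -1, 1, 10))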
src/bluesky_blissdata/skeleton.py → src/bluesky_blissdata/run.py  (renamed)  +135 −0
@@ -23,9 +23,10 @@ References:
import argparse
import logging
import sys

from bluesky.callbacks.zmq import RemoteDispatcher
from bluesky_blissdata.dispacher import blissdata_dispacher
from bluesky_blissdata import __version__
from docopt import docopt, DocoptExit

__author__ = "Udai Singh"
__copyright__ = "Udai Singh"
__license__ = "MIT"

@@ -39,63 +40,72 @@ _logger = logging.getLogger(__name__)
# `from bluesky_blissdata.skeleton import fib`,
# when using this Python module as a library.


def fib(n):
    """Fibonacci example function

    Args:
        n (int): integer

    Returns:
        int: n-th Fibonacci number
    """
    assert n > 0
    a, b = 1, 1
    for _i in range(n - 1):
        a, b = b, a + b
    return a


# ---- CLI ----
# The functions defined in this section are wrappers around the main Python
# API allowing them to be called directly from the terminal as a CLI
# executable/script.


def parse_args(args):
    """Parse command line parameters

    Args:
        args (List[str]): command line parameters as list of strings
            (for example ``["--help"]``).

    Returns:
        :obj:`argparse.Namespace`: command line parameters namespace
    """
    # New argument parser added by this commit:
    parser = argparse.ArgumentParser(description="Blusesky blissdata interface")
    parser.add_argument(
        "--version",
        action="version",
        version=f"bluesky-blissdata {__version__}",
    )
    parser.add_argument(
        "--redis-host", "--redis_host",
        dest="redis_host",
        default="localhost",
        help="redis host for bliss data")
    parser.add_argument(
        "--redis-port", "--redis_port",
        dest="redis_port",
        default=6379,
        type=int,
        help="redis connection port",
    )
    parser.add_argument(
        "--zmq-host", "--zmq_host",
        dest="zmq_host",
        default="localhost",
        help="zmq host for bluesky RemoteDispacher")
    parser.add_argument(
        "--zmq-port", "--zmq_port",
        dest="zmq_port",
        default=5578,
        type=int,
        help="zmq port for bluesky RemoteDispachert",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        dest="loglevel",
        help="set loglevel to INFO",
        action="store_const",
        const=logging.INFO,
    )
    parser.add_argument(
        "-vv",
        "--very-verbose",
        dest="loglevel",
        help="set loglevel to DEBUG",
        action="store_const",
        const=logging.DEBUG,
    )
    return parser.parse_args(args)

    # Old PyScaffold skeleton version replaced by this commit (same docstring):
    parser = argparse.ArgumentParser(description="Just a Fibonacci demonstration")
    parser.add_argument(
        "--version",
        action="version",
        version=f"bluesky-blissdata {__version__}",
    )
    parser.add_argument(dest="n", help="n-th Fibonacci number", type=int, metavar="INT")
    parser.add_argument(
        "-v",
        "--verbose",
        dest="loglevel",
        help="set loglevel to INFO",
        action="store_const",
        const=logging.INFO,
    )
    parser.add_argument(
        "-vv",
        "--very-verbose",
        dest="loglevel",
        help="set loglevel to DEBUG",
        action="store_const",
        const=logging.DEBUG,
    )
    return parser.parse_args(args)


def setup_logging(loglevel):
    ...

@@ -109,41 +119,17 @@ def setup_logging(loglevel):
        level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S"
    )


# Old skeleton main() removed by this commit:
def main(args):
    """Wrapper allowing :func:`fib` to be called with string arguments in a CLI fashion

    Instead of returning the value from :func:`fib`, it prints the result to the
    ``stdout`` in a nicely formatted message.

    Args:
        args (List[str]): command line parameters as list of strings
            (for example ``["--verbose", "42"]``).
    """
    args = parse_args(args)


# New main() added by this commit:
def main(argv=None) -> int:
    args = parse_args(sys.argv[1:])
    setup_logging(args.loglevel)

    # Old skeleton body removed:
    _logger.debug("Starting crazy calculations...")
    print(f"The {args.n}-th Fibonacci number is {fib(args.n)}")
    _logger.info("Script ends here")


def run():
    """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv`

    This function can be used as entry point to create console scripts with setuptools.
    """
    main(sys.argv[1:])

    # New body added by this commit:
    _logger.debug("Starting bluesky_blissdata...")
    d = RemoteDispatcher((args.zmq_host, args.zmq_port))
    post_document = blissdata_dispacher(args.redis_host, args.redis_port)
    d.subscribe(post_document)
    d.start()
    _logger.info("stoping bluesky_blissdata")


if __name__ == "__main__":
    # ^  This is a guard statement that will prevent the following code from
    #    being executed in the case someone imports this file instead of
    #    executing it as a script.
    # https://docs.python.org/3/library/__main__.html

    # After installing your project with pip, users can also run your Python
    # modules as scripts via the ``-m`` flag, as defined in PEP 338::
    #
    #     python -m bluesky_blissdata.skeleton 42
    #
    run()             # old entry point (removed)
    sys.exit(main())  # new entry point (added)
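run.py only consumes documents; something still has to publish them over 0MQ for the RemoteDispatcher to receive. A minimal sketch of the publishing side, not part of the commit and assuming a 0MQ proxy (for example bluesky's proxy) is running with its in/out ports on 5577/5578; only the 5578 default appears in this commit, the 5577 in-port is an assumption:

from bluesky import RunEngine
from bluesky.callbacks.zmq import Publisher

# Publish every document to the proxy's "in" port; bluesky_blissdata's
# RemoteDispatcher (default port 5578) subscribes to the proxy's "out" port.
RE = RunEngine({})
RE.subscribe(Publisher("localhost:5577"))

With this in place, running the bluesky_blissdata command in another terminal forwards every scan produced by that RunEngine into the blissdata Redis store.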