diff --git a/README.rst b/README.rst index baef8c2887be4bf42263dcc8ec12ec29c6eb059f..5f2c485ad1c2229b2713d642d9b7d9fb838b8563 100644 --- a/README.rst +++ b/README.rst @@ -23,27 +23,16 @@ :alt: Twitter :target: https://twitter.com/bluesky-blissdata -.. image:: https://img.shields.io/badge/-PyScaffold-005CA0?logo=pyscaffold - :alt: Project generated with PyScaffold - :target: https://pyscaffold.org/ +.. .. image:: https://img.shields.io/badge/-PyScaffold-005CA0?logo=pyscaffold +.. :alt: Project generated with PyScaffold +.. :target: https://pyscaffold.org/ -| +.. image:: https://img.shields.io/pypi/v/bluesky-blissdata.svg + :alt: PyPI-Server + :target: https://pypi.org/project/bluesky-blissdata/ ================= bluesky-blissdata ================= - Add a short description here! - - -A longer description of your project goes here... - - -.. _pyscaffold-notes: - -Note -==== - -This project has been set up using PyScaffold 4.5. For details and usage -information on PyScaffold see https://pyscaffold.org/. diff --git a/setup.cfg b/setup.cfg index 98073ba088cc701ec4e0c240b719589ebadf3f52..054f25ade61edf8c7cb8cb219084d9ae548451b1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,6 +49,8 @@ package_dir = # For more information, check out https://semver.org/. install_requires = importlib-metadata; python_version<"3.8" + bluesky-queueserver + blissdata [options.packages.find] @@ -69,8 +71,8 @@ testing = [options.entry_points] # Add here console scripts like: -# console_scripts = -# script_name = bluesky_blissdata.module:function +console_scripts = + bluesky_blissdata = bluesky_blissdata.run:main # For example: # console_scripts = # fibonacci = bluesky_blissdata.skeleton:run @@ -114,10 +116,4 @@ exclude = build dist .eggs - docs/conf.py - -[pyscaffold] -# PyScaffold's parameters when the project was created. -# This will be used when updating. Do not change! -version = 4.5 -package = bluesky_blissdata + docs/conf.py \ No newline at end of file diff --git a/src/bluesky_blissdata/dispacher.py b/src/bluesky_blissdata/dispacher.py new file mode 100644 index 0000000000000000000000000000000000000000..108a522aa16a36df3a5aa8421ee226cba446a9b8 --- /dev/null +++ b/src/bluesky_blissdata/dispacher.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python +import numpy as np + +from blissdata.redis_engine.store import DataStore +import os +from blissdata.redis_engine.scan import ScanState +from blissdata.redis_engine.encoding.numeric import NumericStreamEncoder +from blissdata.redis_engine.encoding.json import JsonStreamEncoder +from blissdata.schemas.scan_info import ScanInfoDict, DeviceDict, ChainDict, ChannelDict + +# Configure blissdata for a Redis database + + +# The identity model we use is ESRFIdentityModel in blissdata.redis_engine.models +# It defines the json fields indexed by RediSearch + + +# ------------------------------------------------------------------ # +# create scan in the database + +# ------------------------------------------------------------------ # + +class blissdata_dispacher: + def __init__(self,host="localhost",port=6380) : + try: + self._data_store = DataStore("redis://"+host+":"+str(port)) + except OSError as e: + raise ConnectionError(self._error_message(e)) + + + + def __call__(self,name,doc): + self.scan_id = { + "name": "my_scan", + "number": 1, + "data_policy": "no_policy", + "session": "sim_session", + "proposal": "blc00001",} + #"collection": "sample1", + #"dataset": "sample1_scan59"} + + if (name=="start"): + self.prepare_scan(doc) + if (name=="descriptor"): + self.config_datastream(doc) + if (name=="event"): + self.push_datastream(doc) + if (name=="stop"): + self.stop_datastream(doc) + def prepare_scan(self,doc): + self.scan_id['name']=doc.get('plan_name',self.scan_id['name']) + self.scan_id['number']=doc.get('scan_id',self.scan_id['number']) + self.uid=doc.get('uid') + self.scan = self._data_store.create_scan( + self.scan_id, info={"name": doc['plan_name'],"uid":self.uid}) + self.dets=doc.get('detectors') + self.motors=doc.get('motors') + self.start_time=doc['time'] + self.npoints=doc['num_points'] + self.count_time=1 + self.start=[] + self.stop=[] + if self.motors is not None: + j=0 + for i in range(len(self.motors)): + self.start.append(doc.get('plan_args')['args'][j+1]) + self.stop.append(doc.get('plan_args')['args'][j+2]) + j+=3 + self.devices: dict[str, DeviceDict] = {} + self.devices["timer"] = DeviceDict( + name="timer", channels=[], metadata={}) + self.devices["counters"] = DeviceDict( + name="counters", channels=[], metadata={}) + self.devices["axis"] = DeviceDict( + name="axis", channels=[], metadata={}) + # declare some streams in the scan + def config_datastream(self,doc): + ddesc_dict = {} + self.stream_list={} + self.acq_chain: dict[str, ChainDict] = {} + self.channels: dict[str, ChannelDict] = {} + elem={'name':None,"label":None,'dtype':None,"shape":None,"unit":None} + for dev in doc.get("data_keys").keys(): + elem['name']=doc.get("data_keys")[dev]['object_name'] + elem['label']=dev + elem['dtype']=np.float64 + elem['shape']=doc.get("data_keys")[dev]['shape'] + elem['precision']=doc.get("data_keys")[dev]['precision'] + unit="" + if elem['name'] in self.motors: + device_type="axis" + if elem['name'] in self.dets: + device_type = "counters" + self.devices[device_type]['channels'].append(dev) + self.channels[elem['label']] = ChannelDict(device=device_type, + dim=len(elem['shape']), + display_name=dev) + + encoder = NumericStreamEncoder(dtype=np.float64,shape=elem['shape']) + scalar_stream = self.scan.create_stream(elem['label'], encoder, info={"unit": unit}) + ddesc_dict[elem['label']] =elem + self.stream_list[elem['label']] = scalar_stream + elem['name']="timer" + elem['label']="time" + elem['dtype']=np.float64 + elem['shape']=[] + elem['precision']=4 + unit="s" + device_type='timer' + self.devices[device_type]['channels'].append(elem['label']) + self.channels[elem['label']] = ChannelDict(device=device_type, + dim=len(elem['shape']), + display_name=elem['name']) + + encoder = NumericStreamEncoder(dtype=np.float64,shape=elem['shape']) + scalar_stream = self.scan.create_stream( elem['label'], encoder, info={"unit": unit}) + ddesc_dict[elem['label']] =elem + self.stream_list[elem['label']] = scalar_stream + # self.scalar_stream = self.scan.create_stream("scalars", encoder, info={"unit": "mV", "embed the info": "you want"}) + + # encoder = NumericStreamEncoder(dtype=np.int32, shape=(4096, )) + # self.vector_stream = scan.create_stream("vectors", encoder, info={}) + + # encoder = NumericStreamEncoder(dtype=np.uint16, shape=(1024, 100, )) + # self.array_stream = scan.create_stream("arrays", encoder, info={}) + + # encoder = JsonStreamEncoder() + # self.json_stream = scan.create_stream("jsons", encoder, info={}) + self.acq_chain["axis"] = ChainDict( + top_master="timer", + devices=list(self.devices.keys()), + scalars=[], + spectra=[], + images=[], + master={}) + # gather some metadata before running the scan + scan_info = self.scan_info(ddesc_dict) + self.scan.info.update(scan_info) + # ------------------------------------------------------------------ # + self.scan.prepare() # upload initial metadata and stream declarations + # ------------------------------------------------------------------ # + self.scan.start() + # Scan is ready to start, eventually wait for other processes. + # Sharing stream keys to external publishers can be done there. + def push_datastream(self, doc): + # ------------------------------------------------------------------ # + # ------------------------------------------------------------------ # + print(doc.get('data')) + data=doc.get('data') + for k in data.keys(): + + try: + ch_stream = self.stream_list[k] + except KeyError: + self.warning("Stream for {} not found".format(k)) + continue + ch_stream.send(data[k]) + + try: + ch_stream = self.stream_list['time'] + except KeyError: + self.warning("Stream for {} not found".format(k)) + ch_stream.send(doc['time']) + + # close streams + # scalar_stream.seal() + # vector_stream.seal() + # array_stream.seal() + # json_stream.seal() + def stop_datastream(self,doc): + for stream in self.stream_list.values(): + try: + stream.seal() + except Exception as e: + print(e) + self.warning( + "Error sealing stream {}".format(stream.name)) + continue + + self.scan.stop() + self.scan.info['end_time'] = doc['time'] + self.scan.info['exit_status'] = doc['exit_status'] + self.scan.info['reason']=doc['reason'] + self.scan.info['num_events']=doc['num_events'] + self.scan.close() + # ------------------------------------------------------------------ # + + # ---------------------------------------------------------------- # + def scan_info(self, ddesc_dict): + # filename, masterfiles, images_path = self.file_info( + # singleFile=self.nx_save_single_file) + scan_info = { + ################################## + # Scan metadata + ################################## + "name": self.scan.name, + "scan_nb": self.scan.number, + "session_name": self.scan.session, + "data_policy": self.scan.data_policy, + "start_time": self.start_time, + "type": self.scan.name, + "npoints": self.npoints, + "count_time": self.count_time, + + ################################## + # Device information + ################################## + "acquisition_chain": self.acq_chain, + "devices": self.devices, + "channels": self.channels, + ################################## + # Plot metadata + ################################## + "display_extra": {"plotselect": []}, + "plots": [], + "start": self.start, + "stop": self.stop, + ################################## + # NeXus writer metadata + ################################## + # "save": self.nexus_save, + # "filename": filename, + # "images_path": images_path, + # "publisher": "test", + # "publisher_version": "1.0", + # "data_writer": "nexus", + # "writer_options": {"chunk_options": {}, "separate_scan_files": False}, + # "scan_meta_categories": [ + # "positioners", + # "nexuswriter", + # "instrument", + # "technique", + # "snapshot", + # "datadesc", + # ], + # "nexuswriter": { + # "devices": {}, + # "instrument_info": {"name": "alba-"+self.scan.beamline, "name@short_name": self.scan.beamline}, + # "masterfiles": masterfiles, + # "technique": {}, + # }, + # "positioners": {}, # TODO decide how to fill this (from snapshot?) + # # TODO decide how to fill this (from instruments? env var?) + # "instrument": {}, + # "datadesc": ddesc_dict, + ################################## + # Mandatory by the schema + ################################## + "user_name": os.getlogin(), # tangosys? + } + + scan_info["plots"].append({"kind": "curve-plot"}) + + # Add curves selected in measurement group for plotting + for elem in ddesc_dict.items(): + try: + plot_type = elem[1].get("plot_type", 0) + plot_axes = elem[1].get("plot_axes", []) + name = elem[1].get("label", "") + axes = [] + if plot_type == 1: + for axis in plot_axes: + if "<idx>" in axis: + axis = "#Pt No" + axes.append({"kind": "curve", "x": axis, "y": name}) + scan_info["plots"].append( + {"kind": "curve-plot", "name": name, "items": axes}) + elif plot_type == 2: + self.info("Image plot not implemented yet") + except IndexError: + continue + + return scan_info + diff --git a/src/bluesky_blissdata/run.py b/src/bluesky_blissdata/run.py new file mode 100644 index 0000000000000000000000000000000000000000..6de83bd8ec93997a09bb266209ac269307d96a85 --- /dev/null +++ b/src/bluesky_blissdata/run.py @@ -0,0 +1,135 @@ +""" +This is a skeleton file that can serve as a starting point for a Python +console script. To run this script uncomment the following lines in the +``[options.entry_points]`` section in ``setup.cfg``:: + + console_scripts = + fibonacci = bluesky_blissdata.skeleton:run + +Then run ``pip install .`` (or ``pip install -e .`` for editable mode) +which will install the command ``fibonacci`` inside your current environment. + +Besides console scripts, the header (i.e. until ``_logger``...) of this file can +also be used as template for Python modules. + +Note: + This file can be renamed depending on your needs or safely removed if not needed. + +References: + - https://setuptools.pypa.io/en/latest/userguide/entry_point.html + - https://pip.pypa.io/en/stable/reference/pip_install +""" + +import argparse +import logging +import sys +from bluesky.callbacks.zmq import RemoteDispatcher +from bluesky_blissdata.dispacher import blissdata_dispacher +from bluesky_blissdata import __version__ +from docopt import docopt, DocoptExit +__author__ = "Udai Singh" +__copyright__ = "Udai Singh" +__license__ = "MIT" + +_logger = logging.getLogger(__name__) + + +# ---- Python API ---- +# The functions defined in this section can be imported by users in their +# Python scripts/interactive interpreter, e.g. via +# `from bluesky_blissdata.skeleton import fib`, +# when using this Python module as a library. + +# ---- CLI ---- +# The functions defined in this section are wrappers around the main Python +# API allowing them to be called directly from the terminal as a CLI +# executable/script. +def parse_args(args): + """ Parse command line parameters + + Args: + args (List[str]): command line parameters as list of strings + (for example ``["--help"]``). + + Returns: + :obj:`argparse.Namespace`: command line parameters namespace + """ + parser = argparse.ArgumentParser(description="Blusesky blissdata interface") + parser.add_argument( + "--version", + action="version", + version=f"bluesky-blissdata {__version__}", + ) + parser.add_argument( + "--redis-host", + "--redis_host", + dest="redis_host", + default="localhost", + help="redis host for bliss data") + parser.add_argument( + "--redis-port", + "--redis_port", + dest="redis_port", + default=6379, + type=int, + help="redis connection port", + ) + parser.add_argument( + "--zmq-host", + "--zmq_host", + dest="zmq_host", + default="localhost", + help="zmq host for bluesky RemoteDispacher") + parser.add_argument( + "--zmq-port", + "--zmq_port", + dest="zmq_port", + default=5578, + type=int, + help="zmq port for bluesky RemoteDispachert", + ) + parser.add_argument( + "-v", + "--verbose", + dest="loglevel", + help="set loglevel to INFO", + action="store_const", + const=logging.INFO, + ) + parser.add_argument( + "-vv", + "--very-verbose", + dest="loglevel", + help="set loglevel to DEBUG", + action="store_const", + const=logging.DEBUG, + ) + return parser.parse_args(args) + + + +def setup_logging(loglevel): + """Setup basic logging + + Args: + loglevel (int): minimum loglevel for emitting messages + """ + logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" + logging.basicConfig( + level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" + ) + +def main(argv=None) -> int: + args = parse_args(sys.argv[1:]) + setup_logging(args.loglevel) + _logger.debug("Starting bluesky_blissdata...") + d = RemoteDispatcher((args.zmq_host, args.zmq_port)) + + post_document=blissdata_dispacher(args.redis_host,args.redis_port) + d.subscribe(post_document) + d.start() + _logger.info("stoping bluesky_blissdata") + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/src/bluesky_blissdata/skeleton.py b/src/bluesky_blissdata/skeleton.py deleted file mode 100644 index cd4293ed04d565281ac0a49f6e9a4166cffd87bd..0000000000000000000000000000000000000000 --- a/src/bluesky_blissdata/skeleton.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -This is a skeleton file that can serve as a starting point for a Python -console script. To run this script uncomment the following lines in the -``[options.entry_points]`` section in ``setup.cfg``:: - - console_scripts = - fibonacci = bluesky_blissdata.skeleton:run - -Then run ``pip install .`` (or ``pip install -e .`` for editable mode) -which will install the command ``fibonacci`` inside your current environment. - -Besides console scripts, the header (i.e. until ``_logger``...) of this file can -also be used as template for Python modules. - -Note: - This file can be renamed depending on your needs or safely removed if not needed. - -References: - - https://setuptools.pypa.io/en/latest/userguide/entry_point.html - - https://pip.pypa.io/en/stable/reference/pip_install -""" - -import argparse -import logging -import sys - -from bluesky_blissdata import __version__ - -__author__ = "Udai Singh" -__copyright__ = "Udai Singh" -__license__ = "MIT" - -_logger = logging.getLogger(__name__) - - -# ---- Python API ---- -# The functions defined in this section can be imported by users in their -# Python scripts/interactive interpreter, e.g. via -# `from bluesky_blissdata.skeleton import fib`, -# when using this Python module as a library. - - -def fib(n): - """Fibonacci example function - - Args: - n (int): integer - - Returns: - int: n-th Fibonacci number - """ - assert n > 0 - a, b = 1, 1 - for _i in range(n - 1): - a, b = b, a + b - return a - - -# ---- CLI ---- -# The functions defined in this section are wrappers around the main Python -# API allowing them to be called directly from the terminal as a CLI -# executable/script. - - -def parse_args(args): - """Parse command line parameters - - Args: - args (List[str]): command line parameters as list of strings - (for example ``["--help"]``). - - Returns: - :obj:`argparse.Namespace`: command line parameters namespace - """ - parser = argparse.ArgumentParser(description="Just a Fibonacci demonstration") - parser.add_argument( - "--version", - action="version", - version=f"bluesky-blissdata {__version__}", - ) - parser.add_argument(dest="n", help="n-th Fibonacci number", type=int, metavar="INT") - parser.add_argument( - "-v", - "--verbose", - dest="loglevel", - help="set loglevel to INFO", - action="store_const", - const=logging.INFO, - ) - parser.add_argument( - "-vv", - "--very-verbose", - dest="loglevel", - help="set loglevel to DEBUG", - action="store_const", - const=logging.DEBUG, - ) - return parser.parse_args(args) - - -def setup_logging(loglevel): - """Setup basic logging - - Args: - loglevel (int): minimum loglevel for emitting messages - """ - logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" - logging.basicConfig( - level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" - ) - - -def main(args): - """Wrapper allowing :func:`fib` to be called with string arguments in a CLI fashion - - Instead of returning the value from :func:`fib`, it prints the result to the - ``stdout`` in a nicely formatted message. - - Args: - args (List[str]): command line parameters as list of strings - (for example ``["--verbose", "42"]``). - """ - args = parse_args(args) - setup_logging(args.loglevel) - _logger.debug("Starting crazy calculations...") - print(f"The {args.n}-th Fibonacci number is {fib(args.n)}") - _logger.info("Script ends here") - - -def run(): - """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv` - - This function can be used as entry point to create console scripts with setuptools. - """ - main(sys.argv[1:]) - - -if __name__ == "__main__": - # ^ This is a guard statement that will prevent the following code from - # being executed in the case someone imports this file instead of - # executing it as a script. - # https://docs.python.org/3/library/__main__.html - - # After installing your project with pip, users can also run your Python - # modules as scripts via the ``-m`` flag, as defined in PEP 338:: - # - # python -m bluesky_blissdata.skeleton 42 - # - run()