diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..13f11767de12d79b1a90d1623d5d71d1d0d7dd38 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,42 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + - id: check-ast + - id: check-docstring-first + +- repo: https://github.com/psf/black + rev: 22.1.0 + hooks: + - id: black + args: [--line-length=79] +- repo: https://github.com/PyCQA/flake8 + rev: 4.0.1 + hooks: + - id: flake8 +- repo: https://github.com/asottile/reorder_python_imports + rev: v2.7.1 + hooks: + - id: reorder-python-imports + args: [--application-directories, '.:src', --py36-plus] +- repo: https://github.com/asottile/pyupgrade + rev: v2.31.0 + hooks: + - id: pyupgrade + args: [--py36-plus] +- repo: https://github.com/asottile/add-trailing-comma + rev: v2.2.1 + hooks: + - id: add-trailing-comma + args: [--py36-plus] +- repo: https://gitlab.com/iamlikeme/nbhooks + rev: 1.0.0 + hooks: + - id: nb-ensure-clean + args: [--meta, ExecuteTime] diff --git a/poetry.lock b/poetry.lock index e61faff793a1cef7d3a45b39ef62117dceb70c81..e389e72370c16fe2dfd211af250736cd349e4e60 100644 --- a/poetry.lock +++ b/poetry.lock @@ -46,7 +46,7 @@ python-versions = "*" [[package]] name = "dask" -version = "2022.1.1" +version = "2022.2.0" description = "Parallel PyData with Task Scheduling" category = "main" optional = false @@ -62,10 +62,10 @@ toolz = ">=0.8.2" [package.extras] array = ["numpy (>=1.18)"] -complete = ["bokeh (>=2.1.1)", "distributed (==2022.01.1)", "jinja2", "numpy (>=1.18)", "pandas (>=1.0)"] +complete = ["bokeh (>=2.1.1)", "distributed (==2022.02.0)", "jinja2", "numpy (>=1.18)", "pandas (>=1.0)"] dataframe = ["numpy (>=1.18)", "pandas (>=1.0)"] diagnostics = ["bokeh (>=2.1.1)", "jinja2"] -distributed = ["distributed (==2022.01.1)"] +distributed = ["distributed (==2022.02.0)"] test = ["pytest", "pytest-rerunfailures", "pytest-xdist", "pre-commit"] [[package]] @@ -163,7 +163,7 @@ pyparsing = ">=2.0.2,<3.0.5 || >3.0.5" [[package]] name = "pandas" -version = "1.4.0" +version = "1.4.1" description = "Powerful data structures for data analysis, time series, and statistics" category = "main" optional = false @@ -395,8 +395,8 @@ cramjam = [ {file = "cramjam-2.5.0.tar.gz", hash = "sha256:a92c0c2db4c6a3804eaffa253c7ca49f849e7a893a31c902a8123d7c36b2b487"}, ] dask = [ - {file = "dask-2022.1.1-py3-none-any.whl", hash = "sha256:6caaf633c85ecd1197025de0b8bd244ef88b4b1810f315b09005b3fe5c21ee8e"}, - {file = "dask-2022.1.1.tar.gz", hash = "sha256:3d5e935792d8a5a61d19cb7e63771ee02cdfd6122e36beb15c2dad6257320c58"}, + {file = "dask-2022.2.0-py3-none-any.whl", hash = "sha256:feaf838faa23150faadaeb2483e8612cfb8fed51f62e635a1a6dd55d1d793ba4"}, + {file = "dask-2022.2.0.tar.gz", hash = "sha256:cefb5c63d1e26f6dfa650ddd1eb1a53e0cef623141b838820c6b34e6534ea409"}, ] fastparquet = [ {file = "fastparquet-0.8.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:83eb4e5f9acb9aefa099cc3ff968985a004003686f1b7ee77b4d08d85f7477fe"}, @@ -492,27 +492,27 @@ packaging = [ {file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"}, ] pandas = [ - {file = "pandas-1.4.0-cp310-cp310-macosx_10_9_universal2.whl", hash = 
"sha256:de62cf699122dcef175988f0714678e59c453dc234c5b47b7136bfd7641e3c8c"}, - {file = "pandas-1.4.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:46a18572f3e1cb75db59d9461940e9ba7ee38967fa48dd58f4139197f6e32280"}, - {file = "pandas-1.4.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:73f7da2ccc38cc988b74e5400b430b7905db5f2c413ff215506bea034eaf832d"}, - {file = "pandas-1.4.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5229c95db3a907451dacebc551492db6f7d01743e49bbc862f4a6010c227d187"}, - {file = "pandas-1.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fe454180ad31bbbe1e5d111b44443258730467f035e26b4e354655ab59405871"}, - {file = "pandas-1.4.0-cp310-cp310-win_amd64.whl", hash = "sha256:784cca3f69cfd7f6bd7c7fdb44f2bbab17e6de55725e9ff36d6f382510dfefb5"}, - {file = "pandas-1.4.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:de8f8999864399529e8514a2e6bfe00fd161f0a667903655552ed12e583ae3cb"}, - {file = "pandas-1.4.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0f19504f2783526fb5b4de675ea69d68974e21c1624f4b92295d057a31d5ec5f"}, - {file = "pandas-1.4.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f045bb5c6bfaba536089573bf97d6b8ccc7159d951fe63904c395a5e486fbe14"}, - {file = "pandas-1.4.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5280d057ddae06fe4a3cd6aa79040b8c205cd6dd21743004cf8635f39ed01712"}, - {file = "pandas-1.4.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f3b74335390dda49f5d5089fab71958812bf56f42aa27663ee4c16d19f4f1c5"}, - {file = "pandas-1.4.0-cp38-cp38-win32.whl", hash = "sha256:51e5da3802aaee1aa4254108ffaf1129a15fb3810b7ce8da1ec217c655b418f5"}, - {file = "pandas-1.4.0-cp38-cp38-win_amd64.whl", hash = "sha256:f103a5cdcd66cb18882ccdc18a130c31c3cfe3529732e7f10a8ab3559164819c"}, - {file = "pandas-1.4.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:4a8d5a200f8685e7ea562b2f022c77ab7cb82c1ca5b240e6965faa6f84e5c1e9"}, - {file = "pandas-1.4.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b5af258c7b090cca7b742cf2bd67ad1919aa9e4e681007366c9edad2d6a3d42b"}, - {file = "pandas-1.4.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:156aac90dd7b303bf0b91bae96c0503212777f86c731e41929c571125d26c8e9"}, - {file = "pandas-1.4.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dad075089e17a72391de33021ad93720aff258c3c4b68c78e1cafce7e447045"}, - {file = "pandas-1.4.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d59c958d6b8f96fdf850c7821571782168d5acfe75ccf78cd8d1ac15fb921df"}, - {file = "pandas-1.4.0-cp39-cp39-win32.whl", hash = "sha256:55ec0e192eefa26d823fc25a1f213d6c304a3592915f368e360652994cdb8d9a"}, - {file = "pandas-1.4.0-cp39-cp39-win_amd64.whl", hash = "sha256:23c04dab11f3c6359cfa7afa83d3d054a8f8c283d773451184d98119ef54da97"}, - {file = "pandas-1.4.0.tar.gz", hash = "sha256:cdd76254c7f0a1583bd4e4781fb450d0ebf392e10d3f12e92c95575942e37df5"}, + {file = "pandas-1.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3dfb32ed50122fe8c5e7f2b8d97387edd742cc78f9ec36f007ee126cd3720907"}, + {file = "pandas-1.4.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0259cd11e7e6125aaea3af823b80444f3adad6149ff4c97fef760093598b3e34"}, + {file = "pandas-1.4.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:96e9ece5759f9b47ae43794b6359bbc54805d76e573b161ae770c1ea59393106"}, + {file = "pandas-1.4.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:508c99debccd15790d526ce6b1624b97a5e1e4ca5b871319fb0ebfd46b8f4dad"}, + {file = "pandas-1.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e6a7bbbb7950063bfc942f8794bc3e31697c020a14f1cd8905fc1d28ec674a01"}, + {file = "pandas-1.4.1-cp310-cp310-win_amd64.whl", hash = "sha256:c614001129b2a5add5e3677c3a213a9e6fd376204cb8d17c04e84ff7dfc02a73"}, + {file = "pandas-1.4.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:4e1176f45981c8ccc8161bc036916c004ca51037a7ed73f2d2a9857e6dbe654f"}, + {file = "pandas-1.4.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bbb15ad79050e8b8d39ec40dd96a30cd09b886a2ae8848d0df1abba4d5502a67"}, + {file = "pandas-1.4.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6d6ad1da00c7cc7d8dd1559a6ba59ba3973be6b15722d49738b2be0977eb8a0c"}, + {file = "pandas-1.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:358b0bc98a5ff067132d23bf7a2242ee95db9ea5b7bbc401cf79205f11502fd3"}, + {file = "pandas-1.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6105af6533f8b63a43ea9f08a2ede04e8f43e49daef0209ab0d30352bcf08bee"}, + {file = "pandas-1.4.1-cp38-cp38-win32.whl", hash = "sha256:04dd15d9db538470900c851498e532ef28d4e56bfe72c9523acb32042de43dfb"}, + {file = "pandas-1.4.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b384516dbb4e6aae30e3464c2e77c563da5980440fbdfbd0968e3942f8f9d70"}, + {file = "pandas-1.4.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:f02e85e6d832be37d7f16cf6ac8bb26b519ace3e5f3235564a91c7f658ab2a43"}, + {file = "pandas-1.4.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0b1a13f647e4209ed7dbb5da3497891d0045da9785327530ab696417ef478f84"}, + {file = "pandas-1.4.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:19f7c632436b1b4f84615c3b127bbd7bc603db95e3d4332ed259dc815c9aaa26"}, + {file = "pandas-1.4.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7ea47ba1d6f359680130bd29af497333be6110de8f4c35b9211eec5a5a9630fa"}, + {file = "pandas-1.4.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2e5a7a1e0ecaac652326af627a3eca84886da9e667d68286866d4e33f6547caf"}, + {file = "pandas-1.4.1-cp39-cp39-win32.whl", hash = "sha256:1d85d5f6be66dfd6d1d8d13b9535e342a2214260f1852654b19fa4d7b8d1218b"}, + {file = "pandas-1.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:3129a35d9dad1d80c234dd78f8f03141b914395d23f97cf92a366dcd19f8f8bf"}, + {file = "pandas-1.4.1.tar.gz", hash = "sha256:8db93ec98ac7cb5f8ac1420c10f5e3c43533153f253fe7fb6d891cf5aa2b80d2"}, ] partd = [ {file = "partd-1.2.0-py3-none-any.whl", hash = "sha256:5c3a5d70da89485c27916328dc1e26232d0e270771bd4caef4a5124b6a457288"}, diff --git a/sed/binning.py b/sed/binning.py index b4309c2d48ad72de956f4b4fc8a50fe2d8ae9f0c..4bb0fb476b6a0ccba77cc047834ad675260c65d5 100644 --- a/sed/binning.py +++ b/sed/binning.py @@ -1,22 +1,21 @@ # All functions in this file are adapted from https://github.com/mpes-kit/mpes - -import os -from typing import List, Set, Dict, Tuple, Optional, Union, Sequence, Iterable -from threadpoolctl import threadpool_limits from functools import reduce -import psutil - +from typing import Sequence +from typing import Tuple +from typing import Union +import dask +import numba import numpy as np -import xarray as xr import pandas as pd -import numba -import dask -import dask.dataframe as ddf +import psutil +import xarray as xr +from threadpoolctl import threadpool_limits from tqdm.auto import tqdm N_CPU = psutil.cpu_count() + def _arraysum(array_a, array_b): """ Calculate 
the sum of two arrays. @@ -24,44 +23,55 @@ def _arraysum(array_a, array_b): return array_a + array_b + def bin_partition( - part: Union[dask.dataframe.core.DataFrame,pd.DataFrame], - binDict: dict = None, - binAxes: Union[str, Sequence[str]] = None, - binRanges: Sequence[Tuple[float,float]] = None, - nBins: Union[int,Sequence[int]] = 100, - hist_mode: str = 'numba', - jitterParams: dict = None, - return_edges: bool=False) -> np.ndarray: - """ Compute the n-dimensional histogram of a single dataframe partition. + part: Union[dask.dataframe.core.DataFrame, pd.DataFrame], + binDict: dict = None, + binAxes: Union[str, Sequence[str]] = None, + binRanges: Sequence[Tuple[float, float]] = None, + nBins: Union[int, Sequence[int]] = 100, + hist_mode: str = "numba", + jitterParams: dict = None, + return_edges: bool = False, +) -> np.ndarray: + """Compute the n-dimensional histogram of a single dataframe partition. Args: part (ddf.DataFrame): dataframe on which to perform the histogram. Usually a partition of a dask DataFrame. - binDict (dict, optional): TODO: implement passing binning parameters as dictionary or other methdods - binAxes (list): List of names of the axes (columns) on which to calculate the histogram. + binDict (dict, optional): TODO: implement passing binning parameters as + dictionary or other methods + binAxes (list): List of names of the axes (columns) on which to + calculate the histogram. The order will be the order of the dimensions in the resulting array. - nBins (int or list, optional): List of number of points along the different axes. Defaults to None. - binranges (tuple, optional): list of tuples containing the start and end point of the binning range. Defaults to None. - hist_mode (str, optional): Histogram calculation method. Choose between "numpy" which uses numpy.histogramdd, and "numba" - which uses a numba powered similar method. Defaults to 'numba'. + nBins (int or list, optional): List of number of points along the + different axes. Defaults to 100. + binRanges (tuple, optional): list of tuples containing the start and + end point of the binning range. Defaults to None. + hist_mode (str, optional): Histogram calculation method. Choose between + "numpy" which uses numpy.histogramdd, and "numba" which uses a + numba powered similar method. Defaults to 'numba'. jitterParams (dict, optional): Not yet Implemented. Defaults to None. - return_edges: (bool, optional): If true, returns a list of D arrays describing the bin edges for each dimension, similar to - the behaviour of np.histogramdd. Defaults to False + return_edges (bool, optional): If True, returns a list of D arrays + describing the bin edges for each dimension, similar to the + behaviour of np.histogramdd. Defaults to False. Raises: - Warning: Warns if there are unimplemented features the user is trying to use. + Warning: Warns if there are unimplemented features the user is trying + to use. ValueError: When the method requested is not available. - KeyError: when the columns along which to compute the histogram are not present in the dataframe + KeyError: When the columns along which to compute the histogram are not + present in the dataframe Returns: hist (np.array) : The result of the n-dimensional binning - edges (list,optional) : A list of D arrays describing the bin edges for each dimension. + edges (list,optional) : A list of D arrays describing the bin edges for + each dimension. This is returned only when return_edges is True.
""" if jitterParams is not None: - raise Warning('Jittering is not yet implemented.') + raise Warning("Jittering is not yet implemented.") cols = part.columns # Locate columns for binning operation @@ -69,98 +79,136 @@ def bin_partition( vals = part.values[:, binColumns] - if hist_mode == 'numba': - hist_partition, edges = numba_histogramdd(vals, bins=nBins, ranges=binRanges) - elif hist_mode == 'numpy': - hist_partition, edges = np.histogramdd(vals, bins=nBins, range=binRanges) + if hist_mode == "numba": + hist_partition, edges = numba_histogramdd( + vals, + bins=nBins, + ranges=binRanges, + ) + elif hist_mode == "numpy": + hist_partition, edges = np.histogramdd( + vals, + bins=nBins, + range=binRanges, + ) else: - raise ValueError(f'No binning method {hist_mode} available. Please choose between numba and numpy.') - + raise ValueError( + f"No binning method {hist_mode} available. Please choose between " + f"numba and numpy.", + ) + if return_edges: - return hist_partition,edges + return hist_partition, edges else: return hist_partition + def bin_dataframe( - df : dask.dataframe.DataFrame, + df: dask.dataframe.DataFrame, binDict: dict = None, binAxes: Union[str, Sequence[str]] = None, - binRanges: Sequence[Tuple[float,float]] = None, - nBins: Union[int,Sequence[int]] = 100, - hist_mode: str = 'numba', - mode: str = 'fast', + binRanges: Sequence[Tuple[float, float]] = None, + nBins: Union[int, Sequence[int]] = 100, + hist_mode: str = "numba", + mode: str = "fast", jitterParams: dict = None, pbar: bool = True, - nCores: int = N_CPU-1, + nCores: int = N_CPU - 1, nThreadsPerWorker: int = 4, - threadpoolAPI: str = 'blas', - **kwds) -> xr.DataArray: - """ Computes the n-dimensional histogram on columns of a dataframe, parallelized. - - + threadpoolAPI: str = "blas", + **kwds, +) -> xr.DataArray: + """Computes the n-dimensional histogram on columns of a dataframe, + parallelized. Args: df (dask.dataframe.DataFrame): _description_ - binDict (dict, optional): TODO: implement passing binning parameters as dictionary or other methdods - binAxes (list): List of names of the axes (columns) on which to calculate the histogram. - The order will be the order of the dimensions in the resulting array. - nBins (int or list, optional): List of number of points along the different axes. Defaults to None. - binranges (tuple, optional): list of tuples containing the start and end point of the binning range. Defaults to None. - hist_mode (str, optional): Histogram calculation method. Choose between "numpy" which uses numpy.histogramdd, and "numba" - which uses a numba powered similar method. Defaults to 'numba'. - mode (str, optional): Defines how the results from each partition are combined. + binDict (dict, optional): TODO: implement passing binning parameters as + dictionary or other methods + binAxes (list): List of names of the axes (columns) on which to + calculate the histogram. + The order will be the order of the dimensions in the resulting + array. + nBins (int or list, optional): List of number of points along the + different axes. Defaults to None. + binranges (tuple, optional): list of tuples containing the start and + end point of the binning range. Defaults to None. + hist_mode (str, optional): Histogram calculation method. Choose between + "numpy" which uses numpy.histogramdd, and "numba" which uses a + numba powered similar method. Defaults to 'numba'. + mode (str, optional): Defines how the results from each partition are + combined. Available modes are 'fast', 'lean' and 'legacy'. 
Defaults to 'fast'. jitterParams (dict, optional): Not yet Implemented. Defaults to None. - pbar (bool, optional): Allows to deactivate the tqdm progress bar. Defaults to True. - nCores (int, optional): Number of CPU cores to use for parallelization. Defaults to N_CPU-1. - nThreadsPerWorker (int, optional): Limit the number of threads that multiprocessing can spawn. Defaults to 4. - threadpoolAPI (str, optional): The API to use for multiprocessing. Defaults to 'blas'. + pbar (bool, optional): Allows deactivating the tqdm progress bar. + Defaults to True. + nCores (int, optional): Number of CPU cores to use for parallelization. + Defaults to N_CPU-1. + nThreadsPerWorker (int, optional): Limit the number of threads that + multiprocessing can spawn. Defaults to 4. + threadpoolAPI (str, optional): The API to use for multiprocessing. + Defaults to 'blas'. Raises: - Warning: Warns if there are unimplemented features the user is trying to use. - ValueError: Rises when there is a mismatch in dimensions between the binning parameters + Warning: Warns if there are unimplemented features the user is trying + to use. + ValueError: Raised when there is a mismatch in dimensions between the + binning parameters Returns: - xr.DataArray: The result of the n-dimensional binning represented in an xarray object, combining - the data with the axes. - """ + xr.DataArray: The result of the n-dimensional binning represented in an + xarray object, combining the data with the axes. + """ if jitterParams is not None: - raise Warning('Jittering is not yet implemented.') + raise Warning("Jittering is not yet implemented.") if binDict is not None: - raise Warning('Usage of binDict is not yet implemented.') + raise Warning("Usage of binDict is not yet implemented.") - if isinstance(binAxes,str): + if isinstance(binAxes, str): binAxes = [binAxes] - elif len(binAxes) != len(binRanges): - raise ValueError('Must define ranges for all axes') - elif isinstance(nBins,int): - nBins = [nBins]*len(binAxes) + elif len(binAxes) != len(binRanges): + raise ValueError("Must define ranges for all axes") + elif isinstance(nBins, int): + nBins = [nBins] * len(binAxes) elif len(nBins) != len(binAxes): - raise ValueError('nBins must be integer or a list of integers for each dimension in axes.') - + raise ValueError( + "nBins must be integer or a list of integers for each dimension " + "in axes.", + ) + fullResult = np.zeros(tuple(nBins)) - partitionResults = [] # Partition-level results + partitionResults = [] # Partition-level results # limit multithreading in worker threads with threadpool_limits(limits=nThreadsPerWorker, user_api=threadpoolAPI): # Main loop for binning - for i in tqdm(range(0, df.npartitions, nCores), disable=not(pbar)): + for i in tqdm(range(0, df.npartitions, nCores), disable=not (pbar)): - coreTasks = [] # Core-level jobs + coreTasks = [] # Core-level jobs for j in range(0, nCores): ij = i + j if ij >= df.npartitions: break - dfPartition = df.get_partition(ij) # Obtain dataframe partition - coreTasks.append(dask.delayed(bin_partition)(dfPartition, binAxes, nBins, binRanges, hist_mode)) + dfPartition = df.get_partition( + ij, + ) # Obtain dataframe partition + # bin_partition's second positional parameter is binDict, + # so the binning arguments must be passed by keyword to + # land on the intended parameters. + coreTasks.append( + dask.delayed(bin_partition)( + dfPartition, + binAxes=binAxes, + binRanges=binRanges, + nBins=nBins, + hist_mode=hist_mode, + ), + ) if len(coreTasks) > 0: coreResults = dask.compute(*coreTasks, **kwds) - if mode == 'legacy': + if mode == "legacy": # Combine all core results for a dataframe partition partitionResult = np.zeros_like(coreResults[0]) for coreResult in
coreResults: @@ -169,180 +217,236 @@ def bin_dataframe( partitionResults.append(partitionResult) # del partitionResult - elif mode == 'lean': + elif mode == "lean": # Combine all core results for a dataframe partition partitionResult = reduce(_arraysum, coreResults) fullResult += partitionResult del partitionResult del coreResults - elif mode == 'fast': + elif mode == "fast": combineTasks = [] for j in range(0, nCores): combineParts = [] - # split results along the first dimension among worker threads + # split results along the first dimension among worker + # threads for r in coreResults: - combineParts.append(r[int(j*nBins[0]/nCores):int((j+1)*nBins[0]/nCores),...]) - combineTasks.append(dask.delayed(reduce)(_arraysum, combineParts)) + combineParts.append( + r[ + int(j * nBins[0] / nCores) : int( + (j + 1) * nBins[0] / nCores, + ), + ..., + ], + ) + combineTasks.append( + dask.delayed(reduce)(_arraysum, combineParts), + ) combineResults = dask.compute(*combineTasks, **kwds) - # Directly fill into target array. This is much faster than the (not so parallel) reduce/concatenation used before, and uses less memory. - + # Directly fill into target array. This is much faster than + # the (not so parallel) reduce/concatenation used before, + # and uses less memory. + for j in range(0, nCores): - fullResult[int(j*nBins[0]/nCores):int((j+1)*nBins[0]/nCores),...] += combineResults[j] + fullResult[ + int(j * nBins[0] / nCores) : int( + (j + 1) * nBins[0] / nCores, + ), + ..., + ] += combineResults[j] del combineParts del combineTasks del combineResults del coreResults else: - raise ValueError(f'Could not interpret mode {mode}') - + raise ValueError(f"Could not interpret mode {mode}") del coreTasks - if mode == 'legacy': + if mode == "legacy": # still need to combine all partition results fullResult = np.zeros_like(partitionResults[0]) for pr in partitionResults: fullResult += np.nan_to_num(pr) - da = xr.DataArray( - data = fullResult.astype('float32'), - coords = {ax:np.linspace(r[0],r[1],n) for ax,r,n in zip(binAxes,binRanges,nBins)}, - dims=list(binAxes)) + data=fullResult.astype("float32"), + coords={ + ax: np.linspace(r[0], r[1], n) + for ax, r, n in zip(binAxes, binRanges, nBins) + }, + dims=list(binAxes), + ) return da - -def applyJitter(df: Union[dask.dataframe.core.DataFrame,pd.DataFrame], amp:float, col:str, mode:str='uniform'): - """ Add jittering to the column of a dataframe + +def applyJitter( + df: Union[dask.dataframe.core.DataFrame, pd.DataFrame], + amp: float, + col: str, + mode: str = "uniform", +): + """Add jittering to the column of a dataframe Args: df (pd.DataFrame): Dataframe to add noise/jittering to. amp (float): Amplitude scaling for the jittering noise. col (str): Name of the column to add jittering to. - mode (str, optional): Choose between 'uniform' for uniformly distributed noise, - or 'normal' for noise with normal distribution. For columns with digital values, - one should choose 'uniform' as well as amplitude (amp) equal to half the step size. + mode (str, optional): Choose between 'uniform' for uniformly + distributed noise, or 'normal' for noise with normal distribution. + For columns with digital values, one should choose 'uniform' as + well as amplitude (amp) equal to half the step size. Defaults to 'uniform'. 
""" colsize = df[col].size - if (mode == 'uniform'): + if mode == "uniform": # Uniform Jitter distribution - df[col] += amp*np.random.uniform(low=-1, high=1, size=colsize) - elif (mode == 'normal'): - # Normal Jitter distribution works better for non-linear transformations and jitter sizes that don't match the original bin sizes - df[col] += amp*np.random.standard_normal(size=colsize) + df[col] += amp * np.random.uniform(low=-1, high=1, size=colsize) + elif mode == "normal": + # Normal Jitter distribution works better for non-linear + # transformations and jitter sizes that don't match the original bin + # sizes + df[col] += amp * np.random.standard_normal(size=colsize) + @numba.jit(nogil=True, parallel=False) def _hist1d_numba_seq(sample, bins, ranges): """ 1D Binning function, pre-compiled by Numba for performance. - Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 bit integers + Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 + bit integers """ H = np.zeros((bins[0]), dtype=np.uint32) - delta = 1/((ranges[:,1] - ranges[:,0]) / bins) + delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins) - if (sample.shape[1] != 1): - raise ValueError( - 'The dimension of bins must be equal to the dimension of the sample x.') + if sample.shape[1] != 1: + raise ValueError( + "The dimension of bins must be equal to the dimension of the " + "sample x.", + ) for t in range(sample.shape[0]): - i = (sample[t,0] - ranges[0,0]) * delta[0] + i = (sample[t, 0] - ranges[0, 0]) * delta[0] if 0 <= i < bins[0]: H[int(i)] += 1 return H + @numba.jit(nogil=True, parallel=False) def _hist2d_numba_seq(sample, bins, ranges): """ 2D Binning function, pre-compiled by Numba for performance. - Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 bit integers + Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 + bit integers """ H = np.zeros((bins[0], bins[1]), dtype=np.uint32) - delta = 1/((ranges[:,1] - ranges[:,0]) / bins) + delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins) - if (sample.shape[1] != 2): - raise ValueError( - 'The dimension of bins must be equal to the dimension of the sample x.') + if sample.shape[1] != 2: + raise ValueError( + "The dimension of bins must be equal to the dimension of the " + "sample x.", + ) for t in range(sample.shape[0]): - i = (sample[t,0] - ranges[0,0]) * delta[0] - j = (sample[t,1] - ranges[1,0]) * delta[1] + i = (sample[t, 0] - ranges[0, 0]) * delta[0] + j = (sample[t, 1] - ranges[1, 0]) * delta[1] if 0 <= i < bins[0] and 0 <= j < bins[1]: - H[int(i),int(j)] += 1 + H[int(i), int(j)] += 1 return H + @numba.jit(nogil=True, parallel=False) def _hist3d_numba_seq(sample, bins, ranges): """ 3D Binning function, pre-compiled by Numba for performance. 
- Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 bit integers + Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 + bit integers """ H = np.zeros((bins[0], bins[1], bins[2]), dtype=np.uint32) - delta = 1/((ranges[:,1] - ranges[:,0]) / bins) + delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins) - if (sample.shape[1] != 3): - raise ValueError( - 'The dimension of bins must be equal to the dimension of the sample x.') + if sample.shape[1] != 3: + raise ValueError( + "The dimension of bins must be equal to the dimension of the " + "sample x.", + ) for t in range(sample.shape[0]): - i = (sample[t,0] - ranges[0,0]) * delta[0] - j = (sample[t,1] - ranges[1,0]) * delta[1] - k = (sample[t,2] - ranges[2,0]) * delta[2] + i = (sample[t, 0] - ranges[0, 0]) * delta[0] + j = (sample[t, 1] - ranges[1, 0]) * delta[1] + k = (sample[t, 2] - ranges[2, 0]) * delta[2] if 0 <= i < bins[0] and 0 <= j < bins[1] and 0 <= k < bins[2]: - H[int(i),int(j), int(k)] += 1 + H[int(i), int(j), int(k)] += 1 return H + @numba.jit(nogil=True, parallel=False) def _hist4d_numba_seq(sample, bins, ranges): """ 4D Binning function, pre-compiled by Numba for performance. - Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 bit integers + Behaves much like numpy.histogramdd, but calculates and returns unsigned 32 + bit integers """ H = np.zeros((bins[0], bins[1], bins[2], bins[3]), dtype=np.uint32) - delta = 1/((ranges[:,1] - ranges[:,0]) / bins) + delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins) - if (sample.shape[1] != 4): - raise ValueError( - 'The dimension of bins must be equal to the dimension of the sample x.') + if sample.shape[1] != 4: + raise ValueError( + "The dimension of bins must be equal to the dimension of the " + "sample x.", + ) for t in range(sample.shape[0]): - i = (sample[t,0] - ranges[0,0]) * delta[0] - j = (sample[t,1] - ranges[1,0]) * delta[1] - k = (sample[t,2] - ranges[2,0]) * delta[2] - l = (sample[t,3] - ranges[3,0]) * delta[3] - if 0 <= i < bins[0] and 0 <= j < bins[1] and 0 <= k < bins[2] and 0 <= l < bins[3]: - H[int(i),int(j),int(k),int(l)] += 1 + dim_1 = (sample[t, 0] - ranges[0, 0]) * delta[0] + dim_2 = (sample[t, 1] - ranges[1, 0]) * delta[1] + dim_3 = (sample[t, 2] - ranges[2, 0]) * delta[2] + dim_4 = (sample[t, 3] - ranges[3, 0]) * delta[3] + if ( + 0 <= dim_1 < bins[0] + and 0 <= dim_2 < bins[1] + and 0 <= dim_3 < bins[2] + and 0 <= dim_4 < bins[3] + ): + H[int(dim_1), int(dim_2), int(dim_3), int(dim_4)] += 1 return H -def numba_histogramdd(sample:np.array, bins:Sequence, ranges:Sequence) -> Tuple[np.array,np.array]: - """ Wrapper for the Number pre-compiled binning functions. + +def numba_histogramdd( + sample: np.array, + bins: Sequence, + ranges: Sequence, +) -> Tuple[np.array, np.array]: + """Wrapper for the Numba pre-compiled binning functions. Behaves in total much like numpy.histogramdd. Returns uint32 arrays. - This was chosen because it has a significant performance improvement over uint64 - for large binning volumes. Be aware that this can cause overflows for very large - sample sets exceeding 3E9 counts in a single bin. This should never happen in a - realistic photoemission experiment with useful bin sizes. + This was chosen because it has a significant performance improvement over + uint64 for large binning volumes. Be aware that this can cause overflows + for very large sample sets exceeding 3E9 counts in a single bin.
This + should never happen in a realistic photoemission experiment with useful bin + sizes. Args: sample (np.array): The data to be histogrammed with shape N,D bins (Sequence): the number of bins for each dimension D - ranges (Sequence): the + ranges (Sequence): the Raises: ValueError: In case of dimension mismatch. - NotImplementedError: When attempting binning in too high number of dimensions (>4) + NotImplementedError: When attempting binning in too high number of + dimensions (>4) RuntimeError: Internal shape error after binning Returns: hist (np.array): The computed histogram - edges (np.array): A list of D arrays describing the bin edges for each dimension. - """ + edges (np.array): A list of D arrays describing the bin edges for each + dimension. + """ try: # Sample is an ND-array. @@ -356,45 +460,45 @@ def numba_histogramdd(sample:np.array, bins:Sequence, ranges:Sequence) -> Tuple[ M = len(bins) if M != D: raise ValueError( - 'The dimension of bins must be equal to the dimension of the ' - ' sample x.') + "The dimension of bins must be equal to the dimension of the " + " sample x.", + ) except TypeError: # bins is an integer - bins = D*[bins] + bins = D * [bins] nbin = np.empty(D, int) - edges = D*[None] - dedges = D*[None] + edges = D * [None] # normalize the ranges argument if ranges is None: ranges = (None,) * D elif len(ranges) != D: - raise ValueError('range argument must have one entry per dimension') + raise ValueError("range argument must have one entry per dimension") ranges = np.asarray(ranges) bins = np.asarray(bins) # Create edge arrays for i in range(D): - edges[i] = np.linspace(*ranges[i,:], bins[i]+1) + edges[i] = np.linspace(*ranges[i, :], bins[i] + 1) nbin[i] = len(edges[i]) + 1 # includes an outlier on each end - if (D == 1): - hist = _hist1d_numba_seq(sample, bins , ranges) - elif (D == 2): - hist = _hist2d_numba_seq(sample, bins , ranges) - elif (D == 3): - hist = _hist3d_numba_seq(sample, bins , ranges) - elif (D == 4): - hist = _hist4d_numba_seq(sample, bins , ranges) + if D == 1: + hist = _hist1d_numba_seq(sample, bins, ranges) + elif D == 2: + hist = _hist2d_numba_seq(sample, bins, ranges) + elif D == 3: + hist = _hist3d_numba_seq(sample, bins, ranges) + elif D == 4: + hist = _hist4d_numba_seq(sample, bins, ranges) else: - raise NotImplementedError('Only implemented for up to 4 dimensions currently.') + raise NotImplementedError( + "Only implemented for up to 4 dimensions currently.", + ) if (hist.shape != nbin - 2).any(): - raise RuntimeError( - "Internal Shape Error") + raise RuntimeError("Internal Shape Error") return hist, edges - diff --git a/sed/core.py b/sed/core.py index f8bfff887604a14e8b1031dcd38c28b3736c14cf..1e0605e432fd7762488c21c040e6820328d3498f 100644 --- a/sed/core.py +++ b/sed/core.py @@ -1,41 +1,37 @@ - -from typing import Any, Dict, Sequence -import psutil +from typing import Any import pandas as pd -import numpy as np +import psutil import xarray as xr - from .metadata import MetaHandler -from .binning import bin_dataframe#binDataframe, binDataframe_fast,binDataframe_lean,binDataframe_numba N_CPU = psutil.cpu_count() + class SedProcessor: - """[summary] - """ + """[summary]""" def __init__(self): self._dataframe = None - + self._dimensions = [] self._coordinates = {} self._attributes = MetaHandler() def __repr__(self): if self._dataframe is None: - df_str = 'Data Frame: No Data loaded' + df_str = "Data Frame: No Data loaded" else: df_str = self._dataframe.__repr__() - coordinates_str = f'Coordinates: {self._coordinates}' - dimensions_str 
= f'Dimensions: {self._dimensions}' - s = df_str + '\n' + coordinates_str + '\n' + dimensions_str + coordinates_str = f"Coordinates: {self._coordinates}" + dimensions_str = f"Dimensions: {self._dimensions}" + s = df_str + "\n" + coordinates_str + "\n" + dimensions_str return s - def __getitem__(self,val: Any) -> pd.DataFrame: - """ Accessor to the underlying data structure. + def __getitem__(self, val: Any) -> pd.DataFrame: + """Accessor to the underlying data structure. Args: val (Any): [description] @@ -53,8 +49,8 @@ class SedProcessor: return self._dimensions @dimensions.setter - def dimensions(self,dims): - assert isinstance(dims,list) + def dimensions(self, dims): + assert isinstance(dims, list) self._dimensions = dims @property @@ -62,17 +58,17 @@ class SedProcessor: return self._coordinates @coordinates.setter - def coordinates(self,coords): - assert isinstance(coords,dict) + def coordinates(self, coords): + assert isinstance(coords, dict) self._coordinates = {} - for k,v in coords.items(): + for k, v in coords.items(): self._coordinates[k] = xr.DataArray(v) def load(self, data: pd.DataFrame) -> None: - """ Load tabular data of Single Events + """Load tabular data of Single Events Args: - data (TabularType): data in tabular format. Accepts anything which + data (TabularType): data in tabular format. Accepts anything which can be interpreted by pd.DataFrame as an input Returns: @@ -80,22 +76,25 @@ class SedProcessor: """ self._dataframe = pd.DataFrame(data) - def compute(self, - mode: str='numba', - binDict: dict=None, - axes: list=None, - nbins: int=None, - ranges: list=None, - pbar: bool=True, - jittered: bool=True, - ncores: int=N_CPU, - pbenv: str='classic', - **kwds) -> xr.DataArray: - """ Compute the histogram along the given dimensions. + def compute( + self, + mode: str = "numba", + binDict: dict = None, + axes: list = None, + nbins: int = None, + ranges: list = None, + pbar: bool = True, + jittered: bool = True, + ncores: int = N_CPU, + pbenv: str = "classic", + **kwds, + ) -> xr.DataArray: + """Compute the histogram along the given dimensions. Args: - mode (str, optional): Binning method, choose between numba, - fast, lean and legacy (Y. Acremann's method). Defaults to 'numba'. + mode (str, optional): Binning method, choose between numba, + fast, lean and legacy (Y. Acremann's method). Defaults to + 'numba'. ncores (int, optional): [description]. Defaults to N_CPU. axes ([type], optional): [description]. Defaults to None. nbins (int, optional): [description]. Defaults to None. 
@@ -110,8 +109,8 @@ class SedProcessor: """ pass - def add_dimension(self,name,range): + def add_dimension(self, name, range): if name in self._coordinates: - raise ValueError(f'Axis {name} already exists') + raise ValueError(f"Axis {name} already exists") else: self.axis[name] = self.make_axis(range) diff --git a/sed/metadata.py b/sed/metadata.py index 909b7f59d42abace03591514093f5efdf17876df..035595840346030702b608b1541585b2fc4c5bc5 100644 --- a/sed/metadata.py +++ b/sed/metadata.py @@ -1,90 +1,84 @@ -from datetime import datetime -from typing import Any, Dict -# import pandas as pd -import pprint +from typing import Any +from typing import Dict -class MetaHandler(): - def __init__(self,meta: Dict=None) -> None: +class MetaHandler: + def __init__(self, meta: Dict = None) -> None: self._m = meta if meta is not None else {} - def __getitem__(self,val: Any) -> None: + def __getitem__(self, val: Any) -> Any: return self._m[val] def __repr__(self) -> str: # TODO: #35 add pretty print, possibly to HTML return str(self._m) - def add(self,v: Dict, duplicate: bool='raise') -> None: - """ Add an entry to the metadata container + def add(self, v: Dict, duplicate: str = "raise") -> None: + """Add an entry to the metadata container Args: - v (Dict): dictionary containing the metadata to add. + v (Dict): dictionary containing the metadata to add. Must contain a 'name' key. - overwrite (str, optional): Control behaviour in case the 'name' key is - already present in the metadata dictionary. If raise, rieses a DuplicateEntryError. - If 'overwrite' it overwrites the previous data with the new one. + duplicate (str, optional): Control behaviour in case the 'name' key + is already present in the metadata dictionary. If 'raise', raises + a DuplicateEntryError. + If 'overwrite' it overwrites the previous data with the new + one. If 'append' it adds a trailing number, keeping both entries. Defaults to 'raise'. Raises: DuplicateEntryError: [description] - """ - d = { - 'timestamp':datetime.now(tz=None), - # 'timezone':datetime.timetz(), - } - - if v['name'] not in self._m.keys() or duplicate == 'overwrite': - self._m[v['name']] = v - elif duplicate == 'raise': - raise DuplicateEntryError(f"an entry {v['name']} already exists in metadata") - elif duplicate == 'append': - n=0 + """ + if v["name"] not in self._m.keys() or duplicate == "overwrite": + self._m[v["name"]] = v + elif duplicate == "raise": + raise DuplicateEntryError( + f"an entry {v['name']} already exists in metadata", + ) + elif duplicate == "append": + n = 0 while True: n += 1 - newname = f'name_{n}' + newname = f"name_{n}" if newname not in self._m.keys(): break self._m[newname] = v - else: - raise ValueError(f'could not interpret duplication handling method {duplicate}.
Please choose between overwrite,append or rise.') - - def addProcessing(self,method:str, **kwds: Any) -> None: + raise ValueError( + f"could not interpret duplication handling method {duplicate}. " + f"Please choose between 'overwrite', 'append' or 'raise'.", + ) + + def addProcessing(self, method: str, **kwds: Any) -> None: # TODO: #36 Add processing metadata validation tests - self._m['processing'][method] = kwds + self._m["processing"][method] = kwds - def from_nexus(self,val: Any) -> None: - raise NotImplementedError() - def to_nexus(self,val: Any) -> None: + def from_nexus(self, val: Any) -> None: raise NotImplementedError() - def from_json(self,val: Any) -> None: + def to_nexus(self, val: Any) -> None: raise NotImplementedError() - def to_json(self,val: Any) -> None: + def from_json(self, val: Any) -> None: raise NotImplementedError() - def from_dict(self,val: Any) -> None: + def to_json(self, val: Any) -> None: raise NotImplementedError() - def to_dict(self,val: Any) -> None: + def from_dict(self, val: Any) -> None: raise NotImplementedError() + def to_dict(self, val: Any) -> None: + raise NotImplementedError() - class DuplicateEntryError(Exception): pass -if __name__ == '__main__': + +if __name__ == "__main__": m = MetaHandler() - m.add({ - 'name':'test', - 'start':0, - 'stop':1 - }) + m.add({"name": "test", "start": 0, "stop": 1}) print(m) -
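Note on the new .pre-commit-config.yaml: it pins the standard pre-commit-hooks checks (trailing whitespace, end-of-file, YAML, large files, AST, docstring-first), black at --line-length=79, flake8, reorder-python-imports, pyupgrade, add-trailing-comma, and the nbhooks nb-ensure-clean check for notebooks. After this patch lands, the hooks are activated per clone with `pre-commit install`, and `pre-commit run --all-files` replays them over the whole tree, which matches the formatting-only churn in the sed/ modules above.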
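For reference, a minimal usage sketch of the binning API reformatted above, assuming the patched package is importable as sed; the column names, ranges and bin counts are illustrative assumptions, not taken from this patch:

import dask.dataframe as ddf
import numpy as np
import pandas as pd

from sed.binning import bin_dataframe

# Synthetic single-event table; "kx"/"ky" are made-up column names.
rng = np.random.default_rng(42)
events = pd.DataFrame(
    {
        "kx": rng.uniform(-2.0, 2.0, size=100_000),
        "ky": rng.uniform(-2.0, 2.0, size=100_000),
    },
)
df = ddf.from_pandas(events, npartitions=4)

# 2D histogram over both columns; the result is an xarray.DataArray whose
# coordinates are built with np.linspace from the requested ranges.
result = bin_dataframe(
    df,
    binAxes=["kx", "ky"],
    binRanges=[(-2.0, 2.0), (-2.0, 2.0)],
    nBins=[128, 128],
    hist_mode="numba",  # or "numpy" to use np.histogramdd per partition
    mode="fast",  # partition-combination strategy: "fast", "lean" or "legacy"
)
print(result.dims, result.shape)  # ('kx', 'ky') (128, 128)

With mode="fast", the per-partition histograms are split along the first axis and reduced in parallel before being summed into the output array, which is the memory-saving path the comments above describe.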