Source code for blissoda.xrpd.plot_data

import datetime
import logging
import os
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

import numpy
from silx.io import h5py_utils
from silx.utils.retry import RetryError
from silx.utils.retry import RetryTimeoutError

from ..import_utils import unavailable_class
from .models import XrpdFieldName
from .models import XrpdPlotInfo
from .utils import get_axis_data
from .utils import nfs_cache_refresh

try:
    from gevent.lock import RLock
except ImportError as ex:
    RLock = unavailable_class(ex)

logger = logging.getLogger(__name__)


[docs] def create_plot_info( scan_name: str, lima_name: str, radial_label: str, azim_label: Optional[str], color: Optional[str], plot_data: Dict[XrpdFieldName, numpy.ndarray], hdf5_url: Optional[str] = None, timestamp: Union[str, datetime.datetime, None] = None, is_sum: bool = False, ) -> XrpdPlotInfo: with RLock(): plot_info = XrpdPlotInfo( scan_name=scan_name, lima_name=lima_name, radial_label=radial_label, azim_label=azim_label, hdf5_url=hdf5_url, timestamp=timestamp, color=color, field_names=list(plot_data.keys()), is_sum=is_sum, ) plot_info.save() for p in get_plots(): if p.lima_name != lima_name: continue if p.pk == plot_info.pk: continue if p.timestamp < plot_info.timestamp: p.is_last_scan = False p.save() else: plot_info.is_last_scan = False p.save() logger.debug("XrpdPlotInfo CREATE %r", plot_info.legend) plot_info.delete_data_arrays() for key, data in plot_data.items(): persistent_array = plot_info.get_data_array(key) persistent_array.extend(data) return plot_info
[docs] def get_plots_to_remove(max_len: int) -> List[XrpdPlotInfo]: all_plots = get_plots() if max_len <= 0: return all_plots remove: List[XrpdPlotInfo] = list() keep_scans: Set[str] = set() for plot_info in all_plots[::-1]: scan_name = plot_info.scan_name if len(keep_scans) == max_len and scan_name not in keep_scans: remove.append(plot_info) else: keep_scans.add(scan_name) return remove
[docs] def delete_plot_info(plot_info: XrpdPlotInfo) -> None: plot_info.delete_data_arrays() XrpdPlotInfo.delete(plot_info.pk) logger.debug("XrpdPlotInfo DEL %r", plot_info.legend)
[docs] def get_curve_data( plot_key: str, **retry_options ) -> Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray], XrpdPlotInfo]: """Get the data from the results of the last processed point of the scan.""" plot_info = XrpdPlotInfo.get(plot_key) try: x = plot_info.get_data_array("radial")[()] except IndexError: return None, None, plot_info y = _get_integrated_intensity(plot_info, idx=-1, **retry_options) return x, y, plot_info
[docs] def append_image_data( plot_key: str, current_data: Optional[numpy.ndarray] = None, **retry_options, ) -> Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray], XrpdPlotInfo]: """Add new data (if any) to the current data (if any)""" plot_info = XrpdPlotInfo.get(plot_key) try: x = plot_info.get_data_array("radial")[()] except IndexError: return None, None, plot_info if plot_info.hdf5_url: if current_data is None: idx = tuple() else: idx = slice(len(current_data), None) try: y = _get_data_from_file(plot_info.hdf5_url, idx=idx, **retry_options) except RetryTimeoutError: y = None else: if current_data is not None: y = numpy.vstack([current_data, y]) else: y = plot_info.get_data_array("intensity")[()] return x, y, plot_info
def _get_integrated_intensity(plot_info: XrpdPlotInfo, idx=tuple(), **retry_options): if plot_info.hdf5_url: try: return _get_data_from_file(plot_info.hdf5_url, idx=idx, **retry_options) except RetryTimeoutError: return None try: return plot_info.get_data_array("intensity")[idx] except IndexError: return None
[docs] def get_2d_integration_data(plot_key: str, idx=tuple(), **retry_options) -> Tuple[ Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], XrpdPlotInfo, ]: plot_info = XrpdPlotInfo.get(plot_key) try: x = plot_info.get_data_array("radial")[()] y = plot_info.get_data_array("azimuthal")[()] except IndexError: return None, None, None, plot_info intensity = _get_integrated_intensity(plot_info, idx=idx, **retry_options) return x, y, intensity, plot_info
@h5py_utils.retry() def _get_data_from_file(hdf5_url: str, idx=tuple()): filename, dsetname = hdf5_url.split("::") nfs_cache_refresh(os.path.dirname(filename)) with h5py_utils.File(filename) as root: try: return root[dsetname][idx] except KeyError as e: raise RetryError(str(e))
[docs] def get_plots() -> List[XrpdPlotInfo]: with RLock(): return sorted( [XrpdPlotInfo.get(pk) for pk in XrpdPlotInfo.all_pks()], key=lambda p: p.timestamp, )
[docs] def delete_old_entries(current_session) -> int: """Delete Redis entries from an older blissoda version""" db = XrpdPlotInfo._meta.database old_keys = db.keys(f"blissoda:{current_session.name}:Plotter*") for key in old_keys: XrpdPlotInfo._meta.database.delete(key) return len(old_keys)
[docs] class XrpdNXDataInfo(NamedTuple): radial_label: str azim_label: Optional[str] plot_data: Dict[XrpdFieldName, numpy.ndarray] intensity_url: str
[docs] @h5py_utils.retry() def get_xrpd_nxdata_info(nxdata_url: str) -> XrpdNXDataInfo: """Returns NXdata radial axis info and HDF5 URL of the intensity dataset""" plot_data: Dict[XrpdFieldName, numpy.ndarray] = dict() filename, nxdata_name = nxdata_url.split("::") nfs_cache_refresh(os.path.dirname(filename)) with h5py_utils.File(filename, mode="r") as f: try: nxdata = f[nxdata_name] radial_name = nxdata.attrs["axes"][-1] radial_label, radial_data = get_axis_data(nxdata, radial_name) plot_data["radial"] = radial_data signal = nxdata[nxdata.attrs["signal"]] if signal.ndim > 2: azim_name = nxdata.attrs["axes"][-2] azim_label, azim_data = get_axis_data(nxdata, azim_name) plot_data["azimuthal"] = azim_data else: azim_label = None except KeyError as e: raise RetryError(str(e)) return XrpdNXDataInfo( radial_label=radial_label, azim_label=azim_label, plot_data=plot_data, intensity_url=f"{filename}::{signal.name}", )