Source code for blissoda.processor

from __future__ import annotations

import logging
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Optional

from .automation import BlissAutomationObject
from .flint.plotter import BasePlotter
from .import_utils import unavailable_type
from .persistent.parameters import ParameterInfo
from .persistent.parameters import autocomplete_property
from .utils import trigger

# BLISS is an optional dependency: when it cannot be imported, a placeholder
# built from the import error is bound to the name instead (presumably so that
# annotations referring to BlissScanType keep resolving -- TODO confirm against
# import_utils.unavailable_type).
try:
    from bliss.scanning.scan import Scan as BlissScanType
except ImportError as ex:
    BlissScanType = unavailable_type(ex)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class BaseProcessor(
    BlissAutomationObject,
    parameters=[
        ParameterInfo("_enabled"),
        ParameterInfo("trigger_at", category="workflows"),
    ],
):
    """Switch workflow triggering on new scans on and off.

    Subclasses implement :meth:`_trigger_workflow_on_new_scan`, which is the
    callback handed to the ``trigger`` module when processing is enabled.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Dict[str, Any],
    ) -> None:
        """Merge the defaults and bring the trigger registration in sync.

        :param config: forwarded unchanged to the parent constructor.
        :param defaults: default parameter values; ``_enabled`` and
            ``trigger_at`` are filled in when missing.
        :param deprecated_defaults: defaults given as keyword arguments,
            merged with ``defaults`` by ``_merge_defaults``.
        """
        merged = self._merge_defaults(deprecated_defaults, defaults)
        # "trigger_at" is one of: START, PREPARED, END
        for name, fallback in (("_enabled", False), ("trigger_at", "PREPARED")):
            merged.setdefault(name, fallback)
        super().__init__(config=config, defaults=merged)

        if not self._HAS_BLISS:
            return
        # Make the actual registration match the stored "_enabled" flag.
        if self._enabled:
            self._register_workflow_trigger()
        else:
            self._unregister_workflow_trigger()

    def __del__(self) -> None:
        # Best-effort cleanup: errors raised while unregistering are ignored.
        try:
            if not self._HAS_BLISS:
                return
            self._unregister_workflow_trigger()
        except Exception:
            pass

    def __repr__(self) -> str:
        """Return a representation showing the Redis key and enabled state."""
        text = (
            f"<{self.__class__.__module__}.{self.__class__.__name__} "
            f"object at {hex(id(self))} REDIS KEY={self._redis_key!r} ENABLED={self._enabled}>"
        )
        return text

    def _info_categories(self) -> Dict[str, dict]:
        """Extend the parent's info categories with the enabled state."""
        info = super()._info_categories()
        status_section = info.setdefault("status", {})
        status_section["Enabled"] = self._enabled
        return info

    @autocomplete_property
    def trigger_at(self) -> Optional[str]:
        """Stored value of the ``trigger_at`` parameter."""
        return self._get_parameter("trigger_at")

    @trigger_at.setter
    def trigger_at(self, value: str):
        self._set_parameter("trigger_at", value)
        if not self._enabled:
            return
        # Re-register so that the active trigger uses the new value.
        self.disable()
        self.enable()

    def enable(self):
        """Register the workflow trigger unless it is already enabled."""
        if not self._enabled:
            self._register_workflow_trigger()
            self._enabled = True

    def disable(self):
        """Unregister the workflow trigger unless it is already disabled."""
        if self._enabled:
            self._unregister_workflow_trigger()
            self._enabled = False

    def _register_workflow_trigger(self):
        """Hand the scan callback to the ``trigger`` module."""
        processing_id = self._processing_id()
        callback = self._trigger_workflow_on_new_scan
        moment = self.trigger_at
        trigger.enable_processing_trigger(processing_id, callback, moment)

    def _unregister_workflow_trigger(self):
        """Remove this processor's callback from the ``trigger`` module."""
        processing_id = self._processing_id()
        trigger.disable_processing_trigger(processing_id)

    def _processing_id(self) -> str:
        """Identifier used for (un)registering the trigger: the Redis key."""
        return self._redis_key

    @abstractmethod
    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> Optional[dict]:
        """Callback registered as the processing trigger; implemented by subclasses."""
class BaseProcessorWithPlotting(
    BaseProcessor,
    parameters=[
        ParameterInfo("_plotting_enabled"),
        ParameterInfo("number_of_scans", category="plotting"),
    ],
):
    """Enable and disable workflow triggering and Flint plotting on new scans."""

    # Plotter type instantiated when plotting is enabled; subclasses may override.
    plotter_class = BasePlotter

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Dict[str, Any],
    ) -> None:
        """Merge the defaults and create the plotter when plotting is enabled.

        :param config: forwarded unchanged to the parent constructor.
        :param defaults: default parameter values; ``_plotting_enabled`` and
            ``number_of_scans`` are filled in when missing.
        :param deprecated_defaults: defaults given as keyword arguments,
            merged with ``defaults`` by ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("_plotting_enabled", True)
        defaults.setdefault("number_of_scans", 4)
        super().__init__(config=config, defaults=defaults)
        if self._plotting_enabled:
            self._plotter = self.plotter_class(self.number_of_scans)
            # Only replot when BLISS is available.
            if self._HAS_BLISS:
                self._plotter.replot()
        else:
            self._plotter = None

    def _info_categories(self) -> Dict[str, dict]:
        """Extend the parent's info categories with the plotting status."""
        categories = super()._info_categories()
        status = categories.setdefault("status", {})
        status["Plots in Flint"] = self._plotter is not None
        if self._plotter:
            status["Plotting tasks"] = self._plotter.purge_tasks()
        return categories

    @property
    def plotter(self) -> Optional[plotter_class]:
        """The current plotter, or ``None`` when plotting is disabled."""
        return self._plotter

    def enable_plotting(self):
        """Create a plotter and replot; does nothing if one already exists."""
        if self._plotter is not None:
            return
        self._plotter = self.plotter_class(self.number_of_scans)
        self._plotter.replot()
        self._plotting_enabled = True

    def disable_plotting(self):
        """Stop the plotting tasks and drop the plotter; no-op when disabled."""
        if self._plotter is None:
            return
        self.stop_plotting_tasks()
        self._plotter = None
        self._plotting_enabled = False

    def clear_plots(self) -> None:
        """Call ``clear()`` on the plotter, or print a notice when disabled."""
        if self._plotter:
            return self._plotter.clear()
        else:
            print("Plotting is disabled")

    def replot(self) -> None:
        """Call ``replot()`` on the plotter, or print a notice when disabled."""
        if self._plotter:
            return self._plotter.replot()
        else:
            print("Plotting is disabled")

    def purge_plotting_tasks(self) -> int:
        """Return the plotter's ``purge_tasks()`` result, or 0 when disabled."""
        if self._plotter:
            return self._plotter.purge_tasks()
        else:
            print("Plotting is disabled")
            return 0

    def stop_plotting_tasks(self) -> int:
        """Return the plotter's ``kill_tasks()`` result, or 0 when disabled."""
        if self._plotter:
            return self._plotter.kill_tasks()
        else:
            print("Plotting is disabled")
            return 0

    @autocomplete_property
    def number_of_scans(self) -> int:
        """Stored value of the ``number_of_scans`` parameter."""
        return self._get_parameter("number_of_scans")

    @number_of_scans.setter
    def number_of_scans(self, value: int):
        # getattr: tolerate the attribute not being assigned yet.
        plotter = getattr(self, "_plotter", None)
        if plotter is None:
            # Plotting is disabled, so there is no plotter to update. Persist
            # the value so that enable_plotting() builds the plotter with it
            # instead of failing with AttributeError on None.
            self._set_parameter("number_of_scans", value)
            return
        plotter.number_of_scans = value
        # Store the value read back from the plotter (presumably the plotter
        # may adjust it -- TODO confirm against BasePlotter).
        self._set_parameter("number_of_scans", plotter.number_of_scans)