Source code for blissoda.utils.trigger

import functools
from typing import Union, Callable, Optional

try:
    from bliss.scanning.scan import Scan
    from bliss.scanning.scan_state import ScanState
    from bliss.scanning.scan_meta import META_TIMING
    from bliss.scanning.scan_meta import get_user_scan_meta
except ImportError:
    from typing import Any as Scan
    from typing import Any as ScanState
    from typing import Any as META_TIMING

[docs] def get_user_scan_meta(): raise ImportError("bliss is required")
TriggerType = Callable[[Scan], Optional[dict]] _SCAN_META_CATEGORY = "workflows"
[docs] def enable_processing_trigger( processing_id: str, trigger: TriggerType, timing: Union[str, META_TIMING], ): """ Execute `trigger` when the scan reaches a certain stage (PREPARED by default). When `trigger` returns a dictionary it will merged with the dictionary of all other trigger functions in the Bliss session and saved in HDF5 under the "/x.y/workflows" group. The dictionary follows the convention accepted by the `dicttonx` method from the silx library. The `processing_id` needs to be unique within a Bliss session. """ if isinstance(timing, str): try: timing = META_TIMING[timing.upper()] except KeyError: raise KeyError( f"timing must be one of {META_TIMING.__members__.values()}" ) from None elif not isinstance(timing, META_TIMING): raise TypeError( f"'timing' must be of type 'str' or 'META_TIMING', got '{type(timing)}'" ) scan_meta_obj = get_user_scan_meta() if _SCAN_META_CATEGORY not in scan_meta_obj.categories_names(): scan_meta_obj.add_categories({_SCAN_META_CATEGORY}) meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY) # Make category available for all timings (filter in the `_trigger_on_timing` wrapper) for t in META_TIMING.__members__.values(): meta_category.timing |= t meta_category.set("@NX_class", {"@NX_class": "NXcollection"}) else: meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY) if meta_category.is_set(processing_id): raise RuntimeError( f"Another processor has already registered the processing id '{processing_id}'" ) trigger_on_state = _trigger_on_timing(timing)(trigger) meta_category.set(processing_id, trigger_on_state)
[docs] def disable_processing_trigger(processing_id: str): scan_meta_obj = get_user_scan_meta() if _SCAN_META_CATEGORY not in scan_meta_obj.categories_names(): return meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY) if meta_category.is_set(processing_id): meta_category.remove(processing_id)
def _trigger_on_timing(timing: META_TIMING): def _decorator(trigger: TriggerType): @functools.wraps(trigger) def _wrapper(scan=None): if scan is None: return current_timing = _get_meta_timing(scan) if timing & current_timing: # The processing was enabled for this timing return trigger(scan) return _wrapper return _decorator def _get_meta_timing(scan: Scan) -> META_TIMING: if scan.state < ScanState.PREPARING: return META_TIMING.START if scan.state == ScanState.PREPARING: return META_TIMING.PREPARED return META_TIMING.END