Source code for blissoda.utils.trigger
import functools
import logging
from typing import Callable
from typing import Optional
from typing import Union
from ..import_utils import unavailable_class
from ..import_utils import unavailable_function
from ..import_utils import unavailable_type
try:
from bliss.scanning.scan_meta import get_user_scan_meta
except ImportError as ex:
get_user_scan_meta = unavailable_function(ex)
try:
from bliss.scanning.scan import Scan as BlissScanType
except ImportError as ex:
BlissScanType = unavailable_type(ex)
try:
from bliss.scanning.scan_meta import META_TIMING
except ImportError as ex:
META_TIMING = unavailable_class(ex)
try:
from bliss.scanning.scan_state import ScanState
except ImportError as ex:
ScanState = unavailable_class(ex)
TriggerType = Callable[[BlissScanType], Optional[dict]]
_SCAN_META_CATEGORY = "workflows"
logger = logging.getLogger(__name__)
[docs]
def enable_processing_trigger(
processing_id: str,
trigger: TriggerType,
timing: Union[str, META_TIMING],
):
"""
Execute `trigger` when the scan reaches a certain stage (PREPARED by default).
When `trigger` returns a dictionary it will merged with the dictionary of
all other trigger functions in the Bliss session and saved in HDF5 under
the "/x.y/workflows" group. The dictionary follows the convention accepted
by the `dicttonx` method from the silx library.
The `processing_id` needs to be unique within a Bliss session.
"""
if isinstance(timing, str):
try:
timing = META_TIMING[timing.upper()]
except KeyError:
raise KeyError(
f"timing must be one of {META_TIMING.__members__.values()}"
) from None
elif not isinstance(timing, META_TIMING):
raise TypeError(
f"'timing' must be of type 'str' or 'META_TIMING', got '{type(timing)}'"
)
scan_meta_obj = get_user_scan_meta()
if _SCAN_META_CATEGORY not in scan_meta_obj.categories_names():
scan_meta_obj.add_categories({_SCAN_META_CATEGORY})
meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY)
# Make category available for all timings (filter in the `_trigger_on_timing` wrapper)
for t in META_TIMING.__members__.values():
meta_category.timing |= t
meta_category.set("@NX_class", {"@NX_class": "NXcollection"})
else:
meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY)
if meta_category.is_set(processing_id):
raise RuntimeError(
f"Another processor has already registered the processing id '{processing_id}'"
)
trigger_on_state = _trigger_on_timing(timing)(trigger)
logger.info("Blissoda: trigger %r on %s enabled", processing_id, timing)
meta_category.set(processing_id, trigger_on_state)
[docs]
def disable_processing_trigger(processing_id: str):
scan_meta_obj = get_user_scan_meta()
if _SCAN_META_CATEGORY not in scan_meta_obj.categories_names():
return
meta_category = getattr(scan_meta_obj, _SCAN_META_CATEGORY)
if meta_category.is_set(processing_id):
meta_category.remove(processing_id)
logger.info("Blissoda: trigger %r disabled", processing_id)
def _trigger_on_timing(timing: META_TIMING):
def _decorator(trigger: TriggerType):
@functools.wraps(trigger)
def _wrapper(scan=None):
if scan is None:
return
current_timing = _get_meta_timing(scan)
if timing & current_timing:
# The processing was enabled for this timing
return trigger(scan)
return _wrapper
return _decorator
def _get_meta_timing(scan: BlissScanType) -> META_TIMING:
if scan.state < ScanState.PREPARING:
return META_TIMING.START
if scan.state == ScanState.PREPARING:
return META_TIMING.PREPARED
return META_TIMING.END