# Source code for blissoda.demo.processors.stop_scan_xrpd_id31

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Literal

from ...bliss_globals import current_session  # type: ignore
from ...id31.stop_scan_preset import StopIntegrateSum
from ...utils import directories
from .streamline_id31 import id31_patching
from .utils.pyfai_calib import ensure_pyfai_config

_logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Minimal Ewoks workflow: PyFaiConfig → IntegrateAndSendMsg → WriteSumFrame
# (mirrors the graph built by DemoId31StreamlineScanner.init_auto_stop_workflow)
#
# All three nodes are "class" tasks, and every link forwards each output to
# the input carrying the same name, so the graph is generated from two
# compact tables instead of being spelled out entry by entry.
# ---------------------------------------------------------------------------
_DEMO_WORKFLOW = {
    "graph": {"schema_version": "1.1", "id": "stop_scan_pyfai_demo"},
    "nodes": [
        {"id": node_id, "task_type": "class", "task_identifier": identifier}
        for node_id, identifier in (
            ("config", "ewoksxrpd.tasks.pyfaiconfig.PyFaiConfig"),
            (
                "integrate",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.IntegrateAndSendMsg",
            ),
            (
                "write_sum",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.WriteSumFrame",
            ),
        )
    ],
    "links": [
        {
            "source": source,
            "target": target,
            # Identity mapping: output name == input name.
            "data_mapping": [
                {"source_output": name, "target_input": name}
                for name in forwarded
            ],
        }
        for source, target, forwarded in (
            (
                "config",
                "integrate",
                (
                    "detector_config",
                    "energy",
                    "integration_options",
                    "detector",
                    "mask",
                    "geometry",
                ),
            ),
            (
                "integrate",
                "write_sum",
                (
                    "acc_profile_threshold",
                    "sum_frame_max_threshold",
                    "acc_profile_threshold_reached",
                    "sum_frame_max_threshold_reached",
                    "acc_profile_value_at_stop",
                    "sum_frame_max_value_at_stop",
                ),
            ),
        )
    ],
}


# ---------------------------------------------------------------------------
# DemoStopIntegrateSum
# ---------------------------------------------------------------------------
class DemoStopIntegrateSum(StopIntegrateSum):
    """``StopIntegrateSum`` variant usable from the Bliss demo session.

    ``id31_patching()`` is invoked ahead of ``super().__init__()`` so that:

    * ``setup_globals.atten`` resolves to the mock ``controllers.Attenuator``
      (with ``.bits`` attribute),
    * ``from id31.attenuator import Attenuator, SiO2trans`` resolves to the
      mock implementations.

    The PyFAI config JSON and the Ewoks workflow file are created in the
    processed directory before the parent initialiser runs.

    Everything else (workflow submission, Redis stream feedback,
    ``ScanPointIterator`` / ``_adjust_filter`` logic) is inherited unchanged.
    """

    def __init__(
        self,
        workflow_threshold: float,
        detector_name: str = "difflab6",
        detector_saturation: float = 1e6,
        frame_target_max: float | None = None,
        frame_target_min: float | None = None,
        attenuation_mode: Literal["freeze", "reactive"] = "reactive",
        ghost_threshold_per_frame: float = 3750.0,
        spottiness_threshold: float = 0.1,
        spotty_safe_atten: int = 16,
        spotty_stability_frames: int = 5,
        spotty_stability_tol: float = 0.1,
        spotty_extra_atten: int = 2,
        metric_timeout: float = 2.0,
        sum_frame_max_threshold: float = 3e5,
        canned_metrics: list[dict] | None = None,
    ) -> None:
        """Demo wrapper around :class:`StopIntegrateSum`.

        All parameters except ``canned_metrics`` are forwarded to the parent
        initialiser under the same name.

        :param canned_metrics: If provided, the real Ewoks workflow
            submission is skipped and these events are xadded onto the
            feedback Redis stream when the workflow would be started.
            Each item is a dict with keys ``frame``, ``max_pixel``,
            ``spottiness`` (``type`` is always set to ``"metric"``).
            Useful for deterministic "freeze" attenuation mode tests that
            do not want to depend on the demo workflow producing events.

        Note: the demo defaults to ``attenuation_mode="reactive"`` to keep
        the existing ``test_attenuation_increase`` /
        ``test_attenuation_decrease`` tests passing unchanged. Freeze mode
        tests must opt in explicitly.
        """
        # Patching comes first: it is needed for proper atten integration
        # tests.
        id31_patching(energy=75.05)

        # NOTE(review): the pyFAI config is always ensured for "difflab6",
        # independently of ``detector_name`` -- confirm this is intended.
        config_path = ensure_pyfai_config("difflab6")
        demo_workflow = _ensure_demo_workflow()

        super().__init__(
            # Demo-specific wiring (not exposed to the caller).
            workflow_path=demo_workflow,
            pyfai_config_path=config_path,
            attenuator_name="atten",
            ewoksjob_queue="celery",
            # Caller-supplied settings, passed through untouched.
            workflow_threshold=workflow_threshold,
            detector_name=detector_name,
            detector_saturation=detector_saturation,
            frame_target_max=frame_target_max,
            frame_target_min=frame_target_min,
            attenuation_mode=attenuation_mode,
            ghost_threshold_per_frame=ghost_threshold_per_frame,
            spottiness_threshold=spottiness_threshold,
            spotty_safe_atten=spotty_safe_atten,
            spotty_stability_frames=spotty_stability_frames,
            spotty_stability_tol=spotty_stability_tol,
            spotty_extra_atten=spotty_extra_atten,
            metric_timeout=metric_timeout,
            sum_frame_max_threshold=sum_frame_max_threshold,
        )

        # Private copy so later mutation of the caller's list has no effect.
        self._canned_metrics = list(canned_metrics or [])

    def _start_workflow(self) -> None:
        """Start the workflow, or replay canned metrics instead.

        Without canned metrics this defers entirely to
        ``StopIntegrateSum._start_workflow``. With canned metrics the Celery
        submission is skipped and the canned events are xadded directly onto
        the feedback stream.
        """
        canned = self._canned_metrics
        if canned:
            # Mirror the minimum of super()._start_workflow needed for the
            # iterator loop: clear stream, reset last_id, push canned
            # events, and spawn the event-collector greenlet.
            import gevent

            self._redis.delete(self._stream_name)
            self._last_id = "0-0"
            for metric in canned:
                # Only the known metric fields are forwarded, stringified;
                # fields missing from the canned item are simply omitted.
                event = {
                    "type": "metric",
                    **{
                        key: str(metric[key])
                        for key in ("frame", "max_pixel", "spottiness")
                        if key in metric
                    },
                }
                self._redis.xadd(self._stream_name, event)
            self._greenlet = gevent.spawn(self._background_event_collector)
        else:
            super()._start_workflow()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _ensure_demo_workflow() -> str:
    """Dump ``_DEMO_WORKFLOW`` as JSON under the processed directory.

    The file is rewritten unconditionally so that the on-disk JSON always
    reflects the current ``_DEMO_WORKFLOW`` definition (a stale file left by
    a prior package version would otherwise silently override schema
    updates).

    :returns: Path of the written workflow file, as a string.
    """
    processed_root = directories.get_processed_dir(
        current_session.scan_saving.filename
    )
    target = Path(processed_root) / "workflows" / "stop_scan_xrpd_id31.json"

    # The "workflows" sub-directory may not exist yet.
    target.parent.mkdir(parents=True, exist_ok=True)

    serialized = json.dumps(_DEMO_WORKFLOW, indent=2)
    target.write_text(serialized)
    return str(target)