from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Literal
from ...bliss_globals import current_session # type: ignore
from ...id31.stop_scan_preset import StopIntegrateSum
from ...utils import directories
from .streamline_id31 import id31_patching
from .utils.pyfai_calib import ensure_pyfai_config
_logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Minimal Ewoks workflow: PyFaiConfig → IntegrateAndSendMsg → WriteSumFrame
# (mirrors the graph built by DemoId31StreamlineScanner.init_auto_stop_workflow)
# ---------------------------------------------------------------------------
# The graph is assembled from two small tables:
#   * nodes: (node id, task class identifier) — every node is a "class" task;
#   * links: (source id, target id, port names) — every mapped output feeds the
#     target input of the same name.
# Comprehension variables do not leak, so the only name defined here is
# ``_DEMO_WORKFLOW`` itself.
_DEMO_WORKFLOW = {
    "graph": {"schema_version": "1.1", "id": "stop_scan_pyfai_demo"},
    "nodes": [
        {"id": node_id, "task_type": "class", "task_identifier": task_class}
        for node_id, task_class in (
            ("config", "ewoksxrpd.tasks.pyfaiconfig.PyFaiConfig"),
            (
                "integrate",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.IntegrateAndSendMsg",
            ),
            (
                "write_sum",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.WriteSumFrame",
            ),
        )
    ],
    "links": [
        {
            "source": source_id,
            "target": target_id,
            "data_mapping": [
                {"source_output": port, "target_input": port} for port in ports
            ],
        }
        for source_id, target_id, ports in (
            (
                "config",
                "integrate",
                (
                    "detector_config",
                    "energy",
                    "integration_options",
                    "detector",
                    "mask",
                    "geometry",
                ),
            ),
            (
                "integrate",
                "write_sum",
                (
                    "acc_profile_threshold",
                    "sum_frame_max_threshold",
                    "acc_profile_threshold_reached",
                    "sum_frame_max_threshold_reached",
                    "acc_profile_value_at_stop",
                    "sum_frame_max_value_at_stop",
                ),
            ),
        )
    ],
}
# ---------------------------------------------------------------------------
# DemoStopIntegrateSum
# ---------------------------------------------------------------------------
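# [docs]  (Sphinx "view source" link marker; kept as a comment because a bare ``[docs]`` references an undefined name)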
class DemoStopIntegrateSum(StopIntegrateSum):
    """``StopIntegrateSum`` variant wired up for the Bliss demo session.

    Construction performs the demo setup first and only then initialises the
    parent class:

    * ``id31_patching()`` is called so that ``setup_globals.atten`` resolves
      to the mock ``controllers.Attenuator`` (with ``.bits`` attribute) and
      ``from id31.attenuator import Attenuator, SiO2trans`` resolves to the
      mock implementations;
    * the PyFAI config JSON and the Ewoks workflow JSON are auto-created in
      the processed directory.

    Workflow submission, Redis stream feedback and the
    ``ScanPointIterator`` / ``_adjust_filter`` logic are inherited unchanged.
    The only override is ``_start_workflow``, which replays canned metric
    events instead of submitting the workflow when ``canned_metrics`` was
    given.
    """

    def __init__(
        self,
        workflow_threshold: float,
        detector_name: str = "difflab6",
        detector_saturation: float = 1e6,
        frame_target_max: float | None = None,
        frame_target_min: float | None = None,
        attenuation_mode: Literal["freeze", "reactive"] = "reactive",
        ghost_threshold_per_frame: float = 3750.0,
        spottiness_threshold: float = 0.1,
        spotty_safe_atten: int = 16,
        spotty_stability_frames: int = 5,
        spotty_stability_tol: float = 0.1,
        spotty_extra_atten: int = 2,
        metric_timeout: float = 2.0,
        sum_frame_max_threshold: float = 3e5,
        canned_metrics: list[dict] | None = None,
    ) -> None:
        """Patch the demo session, create the demo files, then init the parent.

        Every parameter except ``canned_metrics`` is forwarded unchanged to
        :class:`StopIntegrateSum`. The workflow path, the PyFAI config path,
        the attenuator name (``"atten"``) and the Ewoks job queue
        (``"celery"``) are fixed by this demo class.

        :param canned_metrics: If provided, the real Ewoks workflow submission
            is skipped and these events are xadded onto the feedback Redis
            stream at scan.start() time. Each item is a dict with keys
            ``frame``, ``max_pixel``, ``spottiness`` (``type`` is always set
            to ``"metric"``). Useful for deterministic "freeze" attenuation
            mode tests that do not want to depend on the demo workflow
            producing events.

        Note: ``attenuation_mode`` defaults to ``"reactive"`` here to keep the
        existing ``test_attenuation_increase`` / ``test_attenuation_decrease``
        tests passing unchanged. Freeze mode tests must opt in explicitly.
        """
        # Patching comes before anything else: it is needed for proper
        # attenuator integration tests.
        id31_patching(energy=75.05)

        # NOTE(review): the PyFAI config is always requested for "difflab6",
        # whatever ``detector_name`` is — confirm this is intended.
        config_file = ensure_pyfai_config("difflab6")
        workflow_file = _ensure_demo_workflow()

        super().__init__(
            # Settings fixed by the demo.
            workflow_path=workflow_file,
            pyfai_config_path=config_file,
            attenuator_name="atten",
            ewoksjob_queue="celery",
            # Caller-supplied settings, passed straight through.
            workflow_threshold=workflow_threshold,
            detector_name=detector_name,
            detector_saturation=detector_saturation,
            frame_target_max=frame_target_max,
            frame_target_min=frame_target_min,
            attenuation_mode=attenuation_mode,
            ghost_threshold_per_frame=ghost_threshold_per_frame,
            spottiness_threshold=spottiness_threshold,
            spotty_safe_atten=spotty_safe_atten,
            spotty_stability_frames=spotty_stability_frames,
            spotty_stability_tol=spotty_stability_tol,
            spotty_extra_atten=spotty_extra_atten,
            metric_timeout=metric_timeout,
            sum_frame_max_threshold=sum_frame_max_threshold,
        )

        # Private copy, so later changes to the caller's list have no effect;
        # ``None`` and an empty list both mean "no canned metrics".
        self._canned_metrics = list(canned_metrics or [])

    def _start_workflow(self) -> None:
        """Start the workflow, or replay the canned metrics in its place.

        Without canned metrics this defers entirely to
        ``StopIntegrateSum._start_workflow``. With canned metrics the Celery
        submission is skipped: the feedback stream is cleared, the read
        position is reset, one ``"metric"`` event per canned item is xadded
        onto the stream, and the background event collector is spawned.
        """
        if self._canned_metrics:
            # Imported only on the canned path, before the stream is touched.
            import gevent

            # Start from an empty stream and read it from the beginning.
            self._redis.delete(self._stream_name)
            self._last_id = "0-0"

            for metric in self._canned_metrics:
                payload = {"type": "metric"}
                # Only the known fields are forwarded, stringified; fields
                # missing from the canned item are left out.
                payload.update(
                    (field, str(metric[field]))
                    for field in ("frame", "max_pixel", "spottiness")
                    if field in metric
                )
                self._redis.xadd(self._stream_name, payload)

            self._greenlet = gevent.spawn(self._background_event_collector)
        else:
            super()._start_workflow()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ensure_demo_workflow() -> str:
    """Dump ``_DEMO_WORKFLOW`` as JSON under the processed directory.

    The file is ``workflows/stop_scan_xrpd_id31.json`` inside the processed
    directory derived from the current session's scan-saving filename; the
    ``workflows`` folder is created when missing.

    The file is rewritten on every call so that the on-disk JSON always
    matches the current ``_DEMO_WORKFLOW`` definition (otherwise a stale file
    from a prior package version would silently override schema updates).

    :returns: the path of the written workflow file, as a string.
    """
    scan_file = current_session.scan_saving.filename
    workflows_dir = Path(directories.get_processed_dir(scan_file)) / "workflows"
    workflows_dir.mkdir(parents=True, exist_ok=True)

    target = workflows_dir / "stop_scan_xrpd_id31.json"
    serialized = json.dumps(_DEMO_WORKFLOW, indent=2)
    target.write_text(serialized)
    return str(target)