"""Unit tests for the ``StopIntegrateSum`` attenuation "freeze-mode" state machine.

These tests exercise ``ScanPointIterator._frame0_setup_step`` (reached through
``ScanPointIterator.prepare``) against a mocked parent preset, so no Bliss
session is needed. Integration tests are not adequate to check freeze mode
because the demo Bliss detector (``difflab6``) does not respond to changes in
attenuation (e.g. via ``setup_globals.atten.bits``).
"""
from __future__ import annotations
import logging
import types
from collections import deque
from unittest.mock import MagicMock
import numpy
import pytest
from ..id31 import stop_scan_preset
from ..id31.stop_scan_preset import ScanPointIterator
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
class _FakeFilter:
"""Minimal stand-in for the id31 Attenuator object: just holds `bits`."""
def __init__(self, bits: int = 0):
self.bits = bits
def _make_parent(
    *,
    attenuation_mode: str = "freeze",
    ghost_threshold_per_frame: float = 1_000.0,
    spottiness_threshold: float = 0.1,
    spotty_safe_atten: int = 16,
    spotty_stability_frames: int = 5,
    spotty_stability_tol: float = 0.1,
    spotty_extra_atten: int = 2,
    metric_timeout: float = 0.2,
    initial_bits: int = 0,
) -> MagicMock:
    """Build a mock parent preset exposing only the attributes the iterator uses.

    Every argument is keyword-only and is stored on the mock under the
    matching private attribute (``attenuation_mode`` becomes ``_atten_mode``,
    the others keep their name with a leading underscore). ``initial_bits``
    seeds the ``bits`` value of the fake filter.

    Returns:
        A ``MagicMock`` named ``"StopIntegrateSum"`` whose ``_apply_filter``
        records each call and writes ``int(bits)`` to ``parent._filter.bits``,
        and whose ``_predict_intensity_at`` returns ``0.0`` by default.
    """
    parent = MagicMock(name="StopIntegrateSum")

    # Configuration copied from the keyword arguments.
    parent._atten_mode = attenuation_mode
    parent._ghost_threshold_per_frame = ghost_threshold_per_frame
    parent._spottiness_threshold = spottiness_threshold
    parent._spotty_safe_atten = spotty_safe_atten
    parent._spotty_stability_frames = spotty_stability_frames
    parent._spotty_stability_tol = spotty_stability_tol
    parent._spotty_extra_atten = spotty_extra_atten
    parent._metric_timeout = metric_timeout

    # Setup-phase state starts out blank; the tests assert on how it changes.
    parent._setup_phase_done = False
    parent._classification = None
    parent._atten_frozen_at = None
    parent._spotty_probe_start_iter = None
    parent._metrics_by_frame = deque(maxlen=32)
    parent._events_buffer = []

    parent._filter = _FakeFilter(bits=initial_bits)
    parent._cursor = MagicMock()

    # _apply_filter mutates the filter's bits and records the call
    def _apply_filter(bits: int) -> None:
        parent._filter.bits = int(bits)

    parent._apply_filter = MagicMock(side_effect=_apply_filter)

    # predict that stepping down never crosses the threshold (tests override
    # when a boundary scenario is needed)
    parent._predict_intensity_at = MagicMock(return_value=0.0)
    return parent
class _FrameView:
def __init__(self, frame: numpy.ndarray):
self._frame = frame
def get_data(self) -> numpy.ndarray:
return self._frame
def _queue_frames(parent: MagicMock, maxes: list[float]) -> None:
    """Programme the cursor to return a frame with each given max pixel in turn.

    ``parent._cursor.read`` is replaced by a function that, on each call,
    returns a ``_FrameView`` around a 1x1 float array holding the next value
    of ``maxes``. Once ``maxes`` is exhausted the last value served is
    returned again on every further call; with an empty ``maxes`` every call
    yields ``0.0``.
    """

    def _read(timeout=None, last_only=True):  # noqa: ARG001
        try:
            next_max = _read.queue.popleft()
        except IndexError:
            next_max = _read.queue_last  # keep returning the last one
        _read.queue_last = next_max
        return _FrameView(numpy.array([[next_max]], dtype=float))

    # State lives on the function object so it travels with the installed
    # reader; a deque gives O(1) pops from the front.
    _read.queue = deque(maxes)
    _read.queue_last = maxes[-1] if maxes else 0.0
    parent._cursor.read = _read
def _push_metric(
parent: MagicMock, frame: int, spottiness: float, max_pixel: float = 0.0
) -> None:
"""Push a metric event onto the raw events buffer (byte-keyed like Redis)."""
parent._events_buffer.append(
{
b"type": b"metric",
b"frame": str(frame).encode(),
b"max_pixel": str(max_pixel).encode(),
b"spottiness": str(spottiness).encode(),
}
)
def _make_iter(parent: MagicMock, iter_nb: int) -> ScanPointIterator:
    """Create a ``ScanPointIterator`` for *parent* with a mocked acquisition chain."""
    chain = MagicMock(name="AcquisitionChain")
    return ScanPointIterator(parent=parent, chain=chain, iter_nb=iter_nb)
# ---------------------------------------------------------------------------
# Tests: event dispatch
# ---------------------------------------------------------------------------
def test_event_dispatch_metric_stop_unknown(caplog):
    """_check_workflow_events routes each event type to the right sink."""
    parent = _make_parent()
    # One event of each kind: a metric, an unrecognised type, and a stop.
    parent._events_buffer = [
        {
            b"type": b"metric",
            b"frame": b"3",
            b"max_pixel": b"42",
            b"spottiness": b"0.5",
        },
        {b"type": b"unknown"},
        {b"type": b"stop"},
    ]
    it = _make_iter(parent, iter_nb=2)
    with caplog.at_level(logging.DEBUG):
        it._check_workflow_events()

    # stop event triggers scan.stop; subsequent events after stop are not
    # guaranteed to be processed (stop returns early)
    it.chain.scan.stop.assert_called_once_with(wait=False)
    # The metric event is decoded from bytes into typed values.
    assert list(parent._metrics_by_frame) == [
        {"frame": 3, "max_pixel": 42.0, "spottiness": 0.5}
    ]
    assert any("Unknown event type" in r.message for r in caplog.records)
# ---------------------------------------------------------------------------
# Tests: freeze-mode — standard branch
# ---------------------------------------------------------------------------
def test_freeze_standard_convergence():
    """Standard-branch safe-frame loop converges and then freezes attenuation.

    Frame 0 is above the ghost threshold, frame at bits=1 still above, frame at
    bits=2 below the threshold. Stepping down from 2 to 1 would cross back
    above (predict_at returns ≥ threshold), so bits=2 is the sweet spot.
    """
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0, spottiness_threshold=0.1, initial_bits=0
    )
    # frame queue: frame 0 max=5000 (>>thresh), frame 1 max=2000 (>thresh),
    # frame 2 max=300 (<thresh). Any subsequent frame returns 300.
    _queue_frames(parent, [5000.0, 2000.0, 300.0])
    # step-down from 2→1 would put imax back above threshold → return ≥ threshold
    parent._predict_intensity_at.side_effect = lambda imax, cur, tgt: (
        1500.0 if (cur, tgt) == (2, 1) else 0.0
    )
    # inject the frame-0 "standard" classification metric
    _push_metric(parent, frame=0, spottiness=0.05)

    # iter 1 sees frame 0 (max=5000 ≥ thresh) → atten 0→1
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._classification == "standard"
    assert parent._filter.bits == 1
    assert parent._setup_phase_done is False

    # iter 2 sees frame 1 (max=2000 ≥ thresh) → atten 1→2
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._filter.bits == 2
    assert parent._setup_phase_done is False

    # iter 3 sees frame 2 (max=300 < thresh); predict at 1 ≥ thresh → keep/freeze
    _make_iter(parent, iter_nb=3).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 2
    assert parent._filter.bits == 2
    assert parent._classification == "standard"

    # subsequent iters: no more filter moves
    calls_before = parent._apply_filter.call_count
    _make_iter(parent, iter_nb=4).prepare()
    _make_iter(parent, iter_nb=5).prepare()
    _make_iter(parent, iter_nb=10).prepare()
    assert parent._apply_filter.call_count == calls_before
def test_freeze_standard_decrement_then_below_threshold():
    """Frame 0 below threshold and stepping down stays below → keep stepping
    down until the predict-back-up check triggers the final +1 hysteresis.
    """
    parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=4)
    # every frame reads 500 (below threshold). Stepping down is always "safe"
    # except once bits is low enough — the iterator stops when
    # _predict_intensity_at ≥ threshold.
    _queue_frames(parent, [500.0])
    # predict that stepping from bits=2→1 would exceed threshold
    parent._predict_intensity_at.side_effect = lambda imax, cur, tgt: (
        2000.0 if (cur, tgt) == (2, 1) else 0.0
    )
    _push_metric(parent, frame=0, spottiness=0.01)

    # iter 1: bits 4→3 (safe)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._filter.bits == 3

    # iter 2: bits 3→2 (safe)
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._filter.bits == 2

    # iter 3: at 2, stepping down to 1 would exceed → freeze at 2
    _make_iter(parent, iter_nb=3).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 2
def test_freeze_classification_timeout(caplog, mocker):
    """No frame-0 metric within metric_timeout → default to 'standard'."""
    # ``_wait_for_frame0_metric`` calls ``gevent.sleep``; in a Bliss-less env
    # the codebase's import guard turns it into an ``unavailable_module`` that
    # raises on attribute access. Stub the module-level binding for this test
    # only — same per-test patching style as ``tests/mock_id31``.
    mocker.patch.object(
        stop_scan_preset, "gevent", types.SimpleNamespace(sleep=lambda *_: None)
    )
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0, metric_timeout=0.05, initial_bits=0
    )
    _queue_frames(parent, [5000.0, 300.0])
    # Every prediction is above the 1000.0 threshold configured above.
    parent._predict_intensity_at.side_effect = lambda *a: 2000.0

    # note: no _push_metric — the wait will time out
    with caplog.at_level(logging.WARNING):
        _make_iter(parent, iter_nb=1).prepare()

    assert parent._classification == "standard"
    assert any("no frame-0 metric" in r.message for r in caplog.records)
    # iter 1 should still have done one atten step (frame 0 was saturating)
    assert parent._filter.bits == 1
# ---------------------------------------------------------------------------
# Tests: freeze-mode — spotty stable / unstable
# ---------------------------------------------------------------------------
def test_freeze_spotty_stable():
    """Spotty-stable branch runs the safe-frame loop then adds extra atten."""
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0,
        spottiness_threshold=0.1,
        spotty_safe_atten=16,
        spotty_stability_frames=5,
        spotty_stability_tol=0.1,
        spotty_extra_atten=2,
        initial_bits=4,
    )
    # classify spotty via a high frame-0 spottiness
    _push_metric(parent, frame=0, spottiness=0.3)
    # frames at safe_atten: all below ghost threshold, low variance
    _queue_frames(parent, [200.0])
    # stepping down at bits=14 would exceed threshold → settle at 14 then +2
    parent._predict_intensity_at.side_effect = lambda imax, cur, tgt: (
        2000.0 if (cur, tgt) == (14, 13) else 0.0
    )

    # iter 1: classify → spotty_probing → apply_filter(safe_atten=16)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._classification == "spotty_probing"
    assert parent._filter.bits == 16
    assert parent._spotty_probe_start_iter == 1

    # feed 5 stable spottiness metrics (rel_change ≈ 0)
    for f in range(1, 6):
        _push_metric(parent, frame=f, spottiness=0.30 + f * 0.001)

    # iter 6: probe window complete → classify spotty_stable_tuning, start
    # safe-frame loop from bits=16. At 200<threshold, step down to 15.
    _make_iter(parent, iter_nb=6).prepare()
    assert parent._classification == "spotty_stable_tuning"
    assert parent._filter.bits == 15

    # iter 7: 15→14
    _make_iter(parent, iter_nb=7).prepare()
    assert parent._filter.bits == 14

    # iter 8: at 14, stepping down predicts ≥ threshold → converge and apply
    # +spotty_extra_atten (14 + 2 = 16). Freeze.
    _make_iter(parent, iter_nb=8).prepare()
    assert parent._classification == "spotty_stable"
    assert parent._atten_frozen_at == 16
    assert parent._filter.bits == 16
    assert parent._setup_phase_done is True
def test_freeze_spotty_unstable():
    """Spotty-unstable branch keeps safe_atten and freezes immediately."""
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0,
        spottiness_threshold=0.1,
        spotty_safe_atten=20,
        spotty_stability_frames=5,
        spotty_stability_tol=0.1,
        initial_bits=4,
    )
    _push_metric(parent, frame=0, spottiness=0.3)

    # iter 1: classify → spotty_probing → apply_filter(20)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._classification == "spotty_probing"
    assert parent._filter.bits == 20

    # 5 unstable spottiness values (rel_change > 0.1), for frames 1..5
    for f, s in enumerate([0.2, 0.4, 0.1, 0.35, 0.5], start=1):
        _push_metric(parent, frame=f, spottiness=s)

    # queue a frame just so _safe_frame_step won't error if entered; but the
    # spotty_unstable branch returns before the safe-frame loop
    _queue_frames(parent, [200.0])
    apply_calls_before = parent._apply_filter.call_count
    _make_iter(parent, iter_nb=6).prepare()

    assert parent._classification == "spotty_unstable"
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 20
    # no further _apply_filter calls after the initial safe_atten set
    assert parent._apply_filter.call_count == apply_calls_before
# ---------------------------------------------------------------------------
# Tests: reactive-mode back-compat guard
# ---------------------------------------------------------------------------
def test_reactive_mode_does_not_run_frame0_setup():
    """In ``reactive`` mode, _frame0_setup_step is never entered."""
    parent = _make_parent(attenuation_mode="reactive")
    # reactive mode calls _adjust_filter, which we don't want to exercise here;
    # stub it on the iterator.
    it = _make_iter(parent, iter_nb=1)
    it._adjust_filter = MagicMock()
    # Stubbed as well so that any call into the freeze path would be recorded.
    it._frame0_setup_step = MagicMock()

    it.prepare()

    it._adjust_filter.assert_called_once()
    it._frame0_setup_step.assert_not_called()
    assert parent._setup_phase_done is False
# ---------------------------------------------------------------------------
# Invalid constructor arg
# ---------------------------------------------------------------------------
def test_invalid_attenuation_mode_raises(mocker):
    """An unsupported ``attenuation_mode`` makes ``__init__`` raise ``ValueError``."""
    # ``__init__`` starts with ``assert_has_minimal_version("bliss", "2.2")``,
    # which queries ``importlib.metadata`` for the bliss package. In a
    # Bliss-less env that lookup raises ``PackageNotFoundError``, masking the
    # validation under test. Patch the version check off for this test only.
    mocker.patch.object(stop_scan_preset, "assert_has_minimal_version")
    with pytest.raises(ValueError, match="attenuation_mode"):
        # Call ``__init__`` on a bare instance so only the constructor body
        # runs against the arguments given here.
        stop_scan_preset.StopIntegrateSum.__init__(
            object.__new__(stop_scan_preset.StopIntegrateSum),
            workflow_threshold=0.01,
            workflow_path="/tmp/nope.json",
            detector_name="x",
            detector_saturation=1e6,
            pyfai_config_path="/tmp/nope.json",
            attenuation_mode="ciao",  # type: ignore
        )