"""Unit tests for the ``StopIntegrateSum`` attenuation "freeze-mode" state machine.
These tests drive ``ScanPointIterator._freeze_setup_step`` directly with a
mocked parent preset, no Bliss session is needed. Integration tests are not adequate
to check freeze mode as the demo Bliss detector (``difflab6``) does not respond
to changes in attenuation (e.g. via ``setup_globals.atten.bits``).
"""
from __future__ import annotations
import logging
import types
from collections import deque
from unittest.mock import MagicMock
import numpy
import pytest
from ..id31 import stop_scan_preset
from ..id31.stop_scan_preset import ScanPointIterator
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
class _FakeFilter:
"""Minimal stand-in for the id31 Attenuator object: just holds `bits`."""
def __init__(self, bits: int = 0):
self.bits = bits
def _make_parent(
*,
attenuation_mode: str = "freeze",
ghost_threshold_per_frame: float = 1_000.0,
spottiness_threshold: float = 0.1,
spotty_safe_atten: int = 16,
spotty_stability_frames: int = 5,
spotty_stability_tol: float = 0.1,
spotty_extra_atten: int = 2,
metric_timeout: float = 0.2,
initial_bits: int = 0,
) -> MagicMock:
"""Build a mock parent preset exposing only the attributes the iterator uses."""
parent = MagicMock(name="StopIntegrateSum")
parent._atten_mode = attenuation_mode
parent._ghost_threshold_per_frame = ghost_threshold_per_frame
parent._spottiness_threshold = spottiness_threshold
parent._spotty_safe_atten = spotty_safe_atten
parent._spotty_stability_frames = spotty_stability_frames
parent._spotty_stability_tol = spotty_stability_tol
parent._spotty_extra_atten = spotty_extra_atten
parent._metric_timeout = metric_timeout
parent._setup_phase_done = False
parent.classification = None
parent._atten_frozen_at = None
parent._spotty_probe_start_iter = None
parent._metrics_by_frame = deque(maxlen=32)
parent._events_buffer = []
parent._filter = _FakeFilter(bits=initial_bits)
parent._cursor = MagicMock()
# _apply_filter mutates the filter's bits and records the call
def _apply_filter(bits: int) -> None:
parent._filter.bits = int(bits)
parent._apply_filter = MagicMock(side_effect=_apply_filter)
# predict that stepping down never crosses the threshold (tests override
# when a boundary scenario is needed)
parent._predict_frame_max_at = MagicMock(return_value=0.0)
# default the predicted-optimal helper to "unavailable" so existing tests
# that drive the ±1 path through _predict_frame_max_at keep working;
# tests exercising the jump path override side_effect explicitly.
parent._find_least_atten_below_threshold = MagicMock(return_value=None)
return parent
class _FrameView:
def __init__(self, frame: numpy.ndarray):
self._frame = frame
def get_data(self) -> numpy.ndarray:
return self._frame
def _queue_frames(parent: MagicMock, maxes: list[float]) -> None:
"""Programme the cursor to return a frame with each given max pixel in turn."""
def _read(timeout=None, last_only=True): # noqa: ARG001
try:
next_max = _read.queue.pop(0)
except IndexError:
next_max = _read.queue_last # keep returning the last one
_read.queue_last = next_max
return _FrameView(numpy.array([[next_max]], dtype=float))
_read.queue = list(maxes)
_read.queue_last = maxes[-1] if maxes else 0.0
parent._cursor.read = _read
def _push_metric(
parent: MagicMock, frame: int, spottiness: float, max_pixel: float = 0.0
) -> None:
"""Push a metric event onto the raw events buffer (byte-keyed like Redis)."""
parent._events_buffer.append(
{
b"type": b"metric",
b"frame": str(frame).encode(),
b"max_pixel": str(max_pixel).encode(),
b"spottiness": str(spottiness).encode(),
}
)
def _make_iter(parent: MagicMock, iter_nb: int) -> ScanPointIterator:
chain = MagicMock(name="AcquisitionChain")
it = ScanPointIterator(parent=parent, chain=chain, iter_nb=iter_nb)
return it
# ---------------------------------------------------------------------------
# Tests: event dispatch
# ---------------------------------------------------------------------------
[docs]
def test_event_dispatch_metric_stop_unknown(caplog):
"""_check_workflow_events routes each event type to the right sink."""
parent = _make_parent()
parent._events_buffer = [
{
b"type": b"metric",
b"frame": b"3",
b"max_pixel": b"42",
b"spottiness": b"0.5",
},
{b"type": b"unknown"},
{b"type": b"stop"},
]
it = _make_iter(parent, iter_nb=2)
with caplog.at_level(logging.DEBUG):
it._check_workflow_events()
# stop event triggers scan.stop; subsequent events after stop are not
# guaranteed to be processed (stop returns early)
it.chain.scan.stop.assert_called_once_with(wait=False)
assert list(parent._metrics_by_frame) == [
{"frame": 3, "max_pixel": 42.0, "spottiness": 0.5}
]
assert any("Unknown event type" in r.message for r in caplog.records)
# ---------------------------------------------------------------------------
# Tests: freeze-mode — standard branch
# ---------------------------------------------------------------------------
[docs]
def test_freeze_standard_convergence_jump_then_verify():
"""Standard-branch safe-frame loop jumps to the predicted-optimal
attenuator position and converges after a single verify iteration.
Frame 0 (at bits=0) is well above the ghost threshold. The helper
predicts the optimal landing position is bits=3. Iter 1 jumps 0→3;
iter 2 reads frame at bits=3 (below threshold) and the helper now
returns bits=3 again (no better position) → converged.
"""
parent = _make_parent(
ghost_threshold_per_frame=1000.0, spottiness_threshold=0.1, initial_bits=0
)
# frame 0 at bits=0 max=5000; subsequent frames at bits=3 max=300.
_queue_frames(parent, [5000.0, 300.0])
# helper: first call (frame_max=5000, cur=0) → jump to 3; second call (frame_max=300,
# cur=3) → already optimal, return 3 (verify).
parent._find_least_atten_below_threshold.side_effect = lambda frame_max, cur, thr: (
3 if cur == 0 else cur
)
_push_metric(parent, frame=0, spottiness=0.05)
# iter 1: classify standard, then jump 0 → 3
_make_iter(parent, iter_nb=1).prepare()
assert parent.classification == "standard"
assert parent._filter.bits == 3
assert parent._setup_phase_done is False
# iter 2: verify at 3 — helper returns 3, frame_max < threshold → converged
_make_iter(parent, iter_nb=2).prepare()
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 3
assert parent._filter.bits == 3
assert parent.classification == "standard"
# subsequent iters: no more filter moves
calls_before = parent._apply_filter.call_count
_make_iter(parent, iter_nb=3).prepare()
_make_iter(parent, iter_nb=10).prepare()
assert parent._apply_filter.call_count == calls_before
[docs]
def test_freeze_standard_jump_down_then_verify():
"""Starting over-attenuated (bits=4, frame max well below threshold) the
helper jumps directly down to bits=2; iter 2 verifies and freezes.
"""
parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=4)
# frame at bits=4 max=10; frame at bits=2 max=400 (still < threshold).
_queue_frames(parent, [10.0, 400.0])
parent._find_least_atten_below_threshold.side_effect = lambda frame_max, cur, thr: (
2 if cur == 4 else cur
)
_push_metric(parent, frame=0, spottiness=0.01)
# iter 1: classify standard, jump 4 → 2
_make_iter(parent, iter_nb=1).prepare()
assert parent._filter.bits == 2
assert parent._setup_phase_done is False
# iter 2: verify at 2, frame_max < threshold and helper returns 2 → converged
_make_iter(parent, iter_nb=2).prepare()
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 2
[docs]
def test_freeze_classification_timeout(caplog, mocker):
"""No frame-0 metric within metric_timeout → default to 'standard'."""
# ``_wait_for_frame0_metric`` calls ``gevent.sleep``; in a Bliss-less env
# the codebase's import guard turns it into an ``unavailable_module`` that
# raises on attribute access. Stub the module-level binding for this test
# only — same per-test patching style as ``tests/mock_id31``.
mocker.patch.object(
stop_scan_preset, "gevent", types.SimpleNamespace(sleep=lambda *_: None)
)
parent = _make_parent(
ghost_threshold_per_frame=1000.0, metric_timeout=0.05, initial_bits=0
)
_queue_frames(parent, [5000.0, 300.0])
parent._predict_frame_max_at.side_effect = lambda *a: 2000.0
# note: no _push_metric — the wait will time out
with caplog.at_level(logging.WARNING):
_make_iter(parent, iter_nb=1).prepare()
assert parent.classification == "standard"
assert any("no frame-0 metric" in r.message for r in caplog.records)
# iter 1 should still have done one atten step (frame 0 was saturating)
assert parent._filter.bits == 1
# ---------------------------------------------------------------------------
# Tests: freeze-mode — spotty stable / unstable
# ---------------------------------------------------------------------------
[docs]
def test_freeze_spotty_stable():
"""Spotty-stable branch jumps from safe_atten to predicted-optimal in one
move, verifies, applies the +extra_atten bump, then freezes.
"""
parent = _make_parent(
ghost_threshold_per_frame=1000.0,
spottiness_threshold=0.1,
spotty_safe_atten=16,
spotty_stability_frames=5,
spotty_stability_tol=0.1,
spotty_extra_atten=2,
initial_bits=4,
)
# classify spotty via a high frame-0 spottiness
_push_metric(parent, frame=0, spottiness=0.3)
# frames at safe_atten=16 read low; frames at bits=14 still under threshold
_queue_frames(parent, [200.0, 400.0])
# helper: at cur=16 jump to 14; at cur=14 verify-converged.
parent._find_least_atten_below_threshold.side_effect = lambda frame_max, cur, thr: (
14 if cur == 16 else cur
)
# iter 1: classify → spotty_probing → apply_filter(safe_atten=16)
_make_iter(parent, iter_nb=1).prepare()
assert parent.classification == "spotty_probing"
assert parent._filter.bits == 16
assert parent._spotty_probe_start_iter == 1
# feed 5 stable spottiness metrics (rel_change ≈ 0)
for f in range(1, 6):
_push_metric(parent, frame=f, spottiness=0.30 + f * 0.001)
# iter 6: probe window complete → classify spotty_stable_tuning, start
# safe-frame loop from bits=16. Helper returns 14 → jump 16 → 14.
_make_iter(parent, iter_nb=6).prepare()
assert parent.classification == "spotty_stable_tuning"
assert parent._filter.bits == 14
# iter 7: verify at 14 — helper returns 14, frame_max < threshold → converged.
# Apply +spotty_extra_atten (14 + 2 = 16). Freeze.
_make_iter(parent, iter_nb=7).prepare()
assert parent.classification == "spotty_stable"
assert parent._atten_frozen_at == 16
assert parent._filter.bits == 16
assert parent._setup_phase_done is True
[docs]
def test_freeze_standard_jump_then_verify_re_jumps():
"""Model under-predicted: first jump lands at bits=5 but the actual
frame max there is still ≥ threshold; helper picks bits=7; verify at
bits=7 → converged. Two jumps + one verify-converge.
"""
parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=0)
# frame at bits=0 max=5000; frame at bits=5 max=1200 (still > threshold);
# frame at bits=7 max=300 (below threshold)
_queue_frames(parent, [5000.0, 1200.0, 300.0])
def _helper(frame_max, cur, thr):
# First call (cur=0, frame_max=5000): predict landing at 5.
# Second call (cur=5, frame_max=1200): model now sees actual is higher than
# original prediction, re-picks 7.
# Third call (cur=7, frame_max=300): below threshold, already optimal.
if cur == 0:
return 5
if cur == 5:
return 7
return cur
parent._find_least_atten_below_threshold.side_effect = _helper
_push_metric(parent, frame=0, spottiness=0.05)
# iter 1: classify standard, jump 0 → 5
_make_iter(parent, iter_nb=1).prepare()
assert parent._filter.bits == 5
assert parent._setup_phase_done is False
# iter 2: verify at 5 fails (frame_max still ≥ threshold), re-jump 5 → 7
_make_iter(parent, iter_nb=2).prepare()
assert parent._filter.bits == 7
assert parent._setup_phase_done is False
# iter 3: verify at 7 — frame_max < threshold, helper returns 7 → converged
_make_iter(parent, iter_nb=3).prepare()
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 7
# exactly two jumps (apply_filter) happened during setup
assert parent._apply_filter.call_count == 2
[docs]
def test_freeze_standard_jump_falls_back_to_pm1():
"""When _find_least_atten_below_threshold returns None (e.g. missing
transmission data) the safe-frame loop falls back to the legacy ±1
step path driven by _predict_frame_max_at.
"""
parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=4)
_queue_frames(parent, [500.0])
# helper unavailable → fall back
parent._find_least_atten_below_threshold.side_effect = lambda *a: None
# predict that stepping from bits=2→1 would exceed threshold (boundary)
parent._predict_frame_max_at.side_effect = lambda frame_max, cur, tgt: (
2000.0 if (cur, tgt) == (2, 1) else 0.0
)
_push_metric(parent, frame=0, spottiness=0.01)
# iter 1: bits 4→3 (safe ±1 step)
_make_iter(parent, iter_nb=1).prepare()
assert parent._filter.bits == 3
# iter 2: bits 3→2
_make_iter(parent, iter_nb=2).prepare()
assert parent._filter.bits == 2
# iter 3: at 2, stepping down to 1 would exceed → freeze at 2
_make_iter(parent, iter_nb=3).prepare()
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 2
[docs]
def test_freeze_standard_jump_caps_at_max_atten(caplog):
"""Count rate too high for any position to predict below threshold: the
helper returns max_atten and the loop converges there with a warning.
"""
parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=0)
_queue_frames(parent, [1e9, 5e8])
# helper saturates at the strongest available attenuation
parent._find_least_atten_below_threshold.side_effect = (
lambda frame_max, cur, thr: 31
)
_push_metric(parent, frame=0, spottiness=0.05)
# iter 1: jump 0 → 31
_make_iter(parent, iter_nb=1).prepare()
assert parent._filter.bits == 31
# iter 2: verify at 31 — frame_max still ≥ threshold, helper returns 31 →
# freeze at max with warning
with caplog.at_level(logging.WARNING):
_make_iter(parent, iter_nb=2).prepare()
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 31
assert any("freezing at max attenuation" in r.message for r in caplog.records)
[docs]
def test_freeze_spotty_unstable():
"""Spotty-unstable branch keeps safe_atten and freezes immediately."""
parent = _make_parent(
ghost_threshold_per_frame=1000.0,
spottiness_threshold=0.1,
spotty_safe_atten=20,
spotty_stability_frames=5,
spotty_stability_tol=0.1,
initial_bits=4,
)
_push_metric(parent, frame=0, spottiness=0.3)
# iter 1: classify → spotty_probing → apply_filter(20)
_make_iter(parent, iter_nb=1).prepare()
assert parent.classification == "spotty_probing"
assert parent._filter.bits == 20
# 5 unstable spottiness values (rel_change > 0.1)
for f, s in zip(range(1, 6), [0.2, 0.4, 0.1, 0.35, 0.5]):
_push_metric(parent, frame=f, spottiness=s)
# queue a frame just so _safe_frame_step won't error if entered; but the
# spotty_unstable branch returns before the safe-frame loop
_queue_frames(parent, [200.0])
apply_calls_before = parent._apply_filter.call_count
_make_iter(parent, iter_nb=6).prepare()
assert parent.classification == "spotty_unstable"
assert parent._setup_phase_done is True
assert parent._atten_frozen_at == 20
# no further _apply_filter calls after the initial safe_atten set
assert parent._apply_filter.call_count == apply_calls_before
# ---------------------------------------------------------------------------
# Tests: reactive-mode back-compat guard
# ---------------------------------------------------------------------------
[docs]
def test_reactive_mode_does_not_run_frame0_setup():
"""In ``reactive`` mode, _freeze_setup_step is never entered."""
parent = _make_parent(attenuation_mode="reactive")
# reactive mode calls _adjust_filter, which we don't want to exercise here;
# stub it on the iterator.
it = _make_iter(parent, iter_nb=1)
it._adjust_filter = MagicMock()
it._freeze_setup_step = MagicMock()
it.prepare()
it._adjust_filter.assert_called_once()
it._freeze_setup_step.assert_not_called()
assert parent._setup_phase_done is False
# ---------------------------------------------------------------------------
# Invalid constructor arg
# ---------------------------------------------------------------------------
[docs]
def test_invalid_attenuation_mode_raises(mocker):
# ``__init__`` starts with ``assert_has_minimal_version("bliss", "2.2")``,
# which queries ``importlib.metadata`` for the bliss package. In a
# Bliss-less env that lookup raises ``PackageNotFoundError``, masking the
# validation under test. Patch the version check off for this test only.
mocker.patch.object(stop_scan_preset, "assert_has_minimal_version")
with pytest.raises(ValueError, match="attenuation_mode"):
stop_scan_preset.StopIntegrateSum.__init__(
object.__new__(stop_scan_preset.StopIntegrateSum),
workflow_threshold=0.01,
workflow_path="/tmp/nope.json",
detector_name="x",
detector_saturation=1e6,
pyfai_config_path="/tmp/nope.json",
attenuation_mode="ciao", # type: ignore
)
# ---------------------------------------------------------------------------
# Tests: _find_least_atten_below_threshold helper
# ---------------------------------------------------------------------------
[docs]
def test_find_optimal_atten_for_threshold_selection_rule(mocker):
"""The helper picks the smallest atten whose predicted intensity is
strictly below the threshold; falls back to the strongest available
attenuation when no position satisfies; returns None on missing or
invalid transmission data.
"""
preset = object.__new__(stop_scan_preset.StopIntegrateSum)
# synthetic transmissions: 1, 1/2, 1/4, …, 1/64 (7 positions)
transmissions = [1.0, 0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625]
mocker.patch.object(
stop_scan_preset.StopIntegrateSum,
"_get_filter_transmissions",
return_value=transmissions,
)
# frame_max=8000 at current=0 → count_rate=8000.
# predicted: 8000, 4000, 2000, 1000, 500, …
# first strictly < 1000 is idx 4 (500).
assert preset._find_least_atten_below_threshold(8000.0, 0, 1000.0) == 4
# over-attenuated: frame_max=300 at current=4 → count_rate=4800.
# predicted: 4800, 2400, 1200, 600, …
# first strictly < 1000 is idx 3 (600).
assert preset._find_least_atten_below_threshold(300.0, 4, 1000.0) == 3
# exactly on the cliff: frame_max=1000 at current=0 → count_rate=1000.
# predicted at idx 0 is 1000 (not strictly <), at idx 1 is 500 → idx 1.
assert preset._find_least_atten_below_threshold(1000.0, 0, 1000.0) == 1
# count rate too high: no position satisfies → return strongest available
assert preset._find_least_atten_below_threshold(1e6, 0, 1000.0) == 6
# missing transmission at current → None
mocker.patch.object(
stop_scan_preset.StopIntegrateSum,
"_get_filter_transmissions",
return_value=[None, 0.5, 0.25],
)
assert preset._find_least_atten_below_threshold(100.0, 0, 50.0) is None
# zero transmission at current → None
mocker.patch.object(
stop_scan_preset.StopIntegrateSum,
"_get_filter_transmissions",
return_value=[0.0, 0.5, 0.25],
)
assert preset._find_least_atten_below_threshold(100.0, 0, 50.0) is None
# exception during lookup → None
mocker.patch.object(
stop_scan_preset.StopIntegrateSum,
"_get_filter_transmissions",
side_effect=RuntimeError("kaboom"),
)
assert preset._find_least_atten_below_threshold(100.0, 0, 50.0) is None