# Source code for blissoda.tests.test_stop_scan_preset

"""Unit tests for the ``StopIntegrateSum`` attenuation "freeze-mode" state machine.

These tests drive ``ScanPointIterator._freeze_setup_step`` directly with a
mocked parent preset; no Bliss session is needed. Integration tests are not adequate
to check freeze mode as the demo Bliss detector (``difflab6``) does not respond
to changes in attenuation (e.g. via ``setup_globals.atten.bits``).
"""

from __future__ import annotations

import logging
import types
from collections import deque
from unittest.mock import MagicMock

import numpy
import pytest

from ..id31 import stop_scan_preset
from ..id31.stop_scan_preset import ScanPointIterator


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
class _FakeFilter:
    """Test double for the id31 Attenuator: a plain holder for ``bits``.

    The only state is the ``bits`` attribute, which callers read and
    overwrite freely.
    """

    def __init__(self, bits: int = 0):
        # Starting attenuator position; defaults to 0.
        self.bits: int = bits


def _make_parent(
    *,
    attenuation_mode: str = "freeze",
    ghost_threshold_per_frame: float = 1_000.0,
    spottiness_threshold: float = 0.1,
    spotty_safe_atten: int = 16,
    spotty_stability_frames: int = 5,
    spotty_stability_tol: float = 0.1,
    spotty_extra_atten: int = 2,
    metric_timeout: float = 0.2,
    initial_bits: int = 0,
) -> MagicMock:
    """Return a ``MagicMock`` standing in for the parent ``StopIntegrateSum``.

    Every keyword argument except ``initial_bits`` is stored on the mock under
    the private attribute name listed below (``attenuation_mode`` becomes
    ``_atten_mode``). ``initial_bits`` seeds the ``_FakeFilter`` installed as
    ``parent._filter``. Setup-phase state starts out cleared, and the
    prediction helpers are mocks whose behaviour tests may override.
    """
    parent = MagicMock(name="StopIntegrateSum")

    # Caller-supplied tunables.
    settings = {
        "_atten_mode": attenuation_mode,
        "_ghost_threshold_per_frame": ghost_threshold_per_frame,
        "_spottiness_threshold": spottiness_threshold,
        "_spotty_safe_atten": spotty_safe_atten,
        "_spotty_stability_frames": spotty_stability_frames,
        "_spotty_stability_tol": spotty_stability_tol,
        "_spotty_extra_atten": spotty_extra_atten,
        "_metric_timeout": metric_timeout,
    }
    # Mutable state and collaborators, all starting from a clean slate.
    state = {
        "_setup_phase_done": False,
        "classification": None,
        "_atten_frozen_at": None,
        "_spotty_probe_start_iter": None,
        "_metrics_by_frame": deque(maxlen=32),
        "_events_buffer": [],
        "_filter": _FakeFilter(bits=initial_bits),
        "_cursor": MagicMock(),
    }
    for attr_name, value in {**settings, **state}.items():
        setattr(parent, attr_name, value)

    def _record_bits(bits: int) -> None:
        # Mirror the requested position onto the fake filter.
        parent._filter.bits = int(bits)

    # Wrapped in a MagicMock so tests can also inspect the recorded calls.
    parent._apply_filter = MagicMock(side_effect=_record_bits)
    # By default, predict that a step down never crosses the threshold; tests
    # needing a boundary scenario replace the side_effect.
    parent._predict_frame_max_at = MagicMock(return_value=0.0)
    # By default the predicted-optimal helper reports "unavailable" (None), so
    # tests relying on the ±1 path through _predict_frame_max_at keep working;
    # tests exercising the jump path set side_effect explicitly.
    parent._find_least_atten_below_threshold = MagicMock(return_value=None)
    return parent


class _FrameView:
    """Wrapper that hands back a stored frame array through ``get_data()``."""

    def __init__(self, frame: numpy.ndarray):
        # Kept by reference; no copy is made.
        self._frame = frame

    def get_data(self) -> numpy.ndarray:
        """Return the array given at construction time."""
        return self._frame


def _queue_frames(parent: MagicMock, maxes: list[float]) -> None:
    """Replace ``parent._cursor.read`` with a fake serving one frame per call.

    Each call returns a ``_FrameView`` around a 1x1 float array holding the
    next value from ``maxes``. Once the values run out, the last value served
    is repeated (``0.0`` if ``maxes`` was empty to begin with).
    """

    def _read(timeout=None, last_only=True):  # noqa: ARG001
        # Advance to the next queued value when one is left; otherwise keep
        # serving the previous one.
        if _read.queue:
            _read.queue_last = _read.queue.pop(0)
        return _FrameView(numpy.array([[_read.queue_last]], dtype=float))

    # State lives on the function object itself.
    _read.queue = list(maxes)
    _read.queue_last = maxes[-1] if maxes else 0.0
    parent._cursor.read = _read


def _push_metric(
    parent: MagicMock, frame: int, spottiness: float, max_pixel: float = 0.0
) -> None:
    """Append one metric event to ``parent._events_buffer``.

    Keys and values are bytes (Redis-like); each numeric field is encoded
    from its ``str()`` form.
    """
    numeric_fields = {
        b"frame": frame,
        b"max_pixel": max_pixel,
        b"spottiness": spottiness,
    }
    event = {b"type": b"metric"}
    for key, value in numeric_fields.items():
        event[key] = str(value).encode()
    parent._events_buffer.append(event)


def _make_iter(parent: MagicMock, iter_nb: int) -> ScanPointIterator:
    """Build a ``ScanPointIterator`` for ``parent`` with a mocked chain."""
    return ScanPointIterator(
        parent=parent,
        chain=MagicMock(name="AcquisitionChain"),
        iter_nb=iter_nb,
    )


# ---------------------------------------------------------------------------
# Tests: event dispatch
# ---------------------------------------------------------------------------
def test_event_dispatch_metric_stop_unknown(caplog):
    """_check_workflow_events routes each event type to the right sink."""
    metric_event = {
        b"type": b"metric",
        b"frame": b"3",
        b"max_pixel": b"42",
        b"spottiness": b"0.5",
    }
    parent = _make_parent()
    parent._events_buffer = [
        metric_event,
        {b"type": b"unknown"},
        {b"type": b"stop"},
    ]
    iterator = _make_iter(parent, iter_nb=2)

    with caplog.at_level(logging.DEBUG):
        iterator._check_workflow_events()

    # The stop event triggers scan.stop; events that follow a stop are not
    # guaranteed to be processed (stop returns early).
    iterator.chain.scan.stop.assert_called_once_with(wait=False)
    # The metric event is decoded into native Python types.
    assert list(parent._metrics_by_frame) == [
        {"frame": 3, "max_pixel": 42.0, "spottiness": 0.5}
    ]
    assert any("Unknown event type" in record.message for record in caplog.records)
def test_event_dispatch_drops_malformed_metric(caplog):
    """Malformed metric events are logged and dropped without crashing."""
    # The frame field cannot be parsed as an integer.
    malformed_event = {
        b"type": b"metric",
        b"frame": b"not-an-int",
        b"max_pixel": b"0",
        b"spottiness": b"0",
    }
    parent = _make_parent()
    parent._events_buffer = [malformed_event]
    iterator = _make_iter(parent, iter_nb=2)

    with caplog.at_level(logging.DEBUG):
        iterator._check_workflow_events()

    assert not parent._metrics_by_frame
    assert any(
        "Malformed metric event" in record.message for record in caplog.records
    )
# ---------------------------------------------------------------------------
# Tests: freeze-mode — standard branch
# ---------------------------------------------------------------------------
def test_freeze_standard_convergence_jump_then_verify():
    """Standard-branch safe-frame loop jumps to the predicted-optimal
    attenuator position and converges after a single verify iteration.

    Frame 0 (at bits=0) is well above the ghost threshold. The helper predicts
    the optimal landing position is bits=3. Iter 1 jumps 0→3; iter 2 reads a
    frame at bits=3 (below threshold) and the helper returns bits=3 again
    (no better position) → converged.
    """
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0, spottiness_threshold=0.1, initial_bits=0
    )
    # frame 0 at bits=0 max=5000; subsequent frames at bits=3 max=300.
    _queue_frames(parent, [5000.0, 300.0])

    def _predicted_optimum(frame_max, cur, thr):
        # First call (frame_max=5000, cur=0) → jump to 3; second call
        # (frame_max=300, cur=3) → already optimal, return 3 (verify).
        return 3 if cur == 0 else cur

    parent._find_least_atten_below_threshold.side_effect = _predicted_optimum
    _push_metric(parent, frame=0, spottiness=0.05)

    # iter 1: classify standard, then jump 0 → 3
    _make_iter(parent, iter_nb=1).prepare()
    assert parent.classification == "standard"
    assert parent._filter.bits == 3
    assert parent._setup_phase_done is False

    # iter 2: verify at 3 — helper returns 3, frame_max < threshold → converged
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 3
    assert parent._filter.bits == 3
    assert parent.classification == "standard"

    # subsequent iters: no more filter moves
    moves_so_far = parent._apply_filter.call_count
    for later_iter in (3, 10):
        _make_iter(parent, iter_nb=later_iter).prepare()
    assert parent._apply_filter.call_count == moves_so_far
def test_freeze_standard_jump_down_then_verify():
    """Starting over-attenuated (bits=4, frame max well below threshold) the
    helper jumps directly down to bits=2; iter 2 verifies and freezes.
    """
    parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=4)
    # frame at bits=4 max=10; frame at bits=2 max=400 (still < threshold).
    _queue_frames(parent, [10.0, 400.0])

    def _predicted_optimum(frame_max, cur, thr):
        # From bits=4 go straight to 2; anywhere else, stay put.
        return 2 if cur == 4 else cur

    parent._find_least_atten_below_threshold.side_effect = _predicted_optimum
    _push_metric(parent, frame=0, spottiness=0.01)

    # iter 1: classify standard, jump 4 → 2
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._filter.bits == 2
    assert parent._setup_phase_done is False

    # iter 2: verify at 2, frame_max < threshold and helper returns 2 → converged
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 2
def test_freeze_classification_timeout(caplog, mocker):
    """No frame-0 metric within metric_timeout → default to 'standard'."""
    # ``_wait_for_frame0_metric`` calls ``gevent.sleep``; in a Bliss-less env
    # the codebase's import guard turns it into an ``unavailable_module`` that
    # raises on attribute access. Stub the module-level binding for this test
    # only — same per-test patching style as ``tests/mock_id31``.
    fake_gevent = types.SimpleNamespace(sleep=lambda *_: None)
    mocker.patch.object(stop_scan_preset, "gevent", fake_gevent)

    parent = _make_parent(
        ghost_threshold_per_frame=1000.0, metric_timeout=0.05, initial_bits=0
    )
    _queue_frames(parent, [5000.0, 300.0])
    parent._predict_frame_max_at.side_effect = lambda *a: 2000.0

    # note: no _push_metric — the wait will time out
    with caplog.at_level(logging.WARNING):
        _make_iter(parent, iter_nb=1).prepare()

    assert parent.classification == "standard"
    assert any("no frame-0 metric" in record.message for record in caplog.records)
    # iter 1 should still have done one atten step (frame 0 was saturating)
    assert parent._filter.bits == 1
# ---------------------------------------------------------------------------
# Tests: freeze-mode — spotty stable / unstable
# ---------------------------------------------------------------------------
def test_freeze_spotty_stable():
    """Spotty-stable branch jumps from safe_atten to predicted-optimal in one
    move, verifies, applies the +extra_atten bump, then freezes.
    """
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0,
        spottiness_threshold=0.1,
        spotty_safe_atten=16,
        spotty_stability_frames=5,
        spotty_stability_tol=0.1,
        spotty_extra_atten=2,
        initial_bits=4,
    )
    # classify spotty via a high frame-0 spottiness
    _push_metric(parent, frame=0, spottiness=0.3)
    # frames at safe_atten=16 read low; frames at bits=14 still under threshold
    _queue_frames(parent, [200.0, 400.0])

    def _predicted_optimum(frame_max, cur, thr):
        # at cur=16 jump to 14; at cur=14 verify-converged.
        return 14 if cur == 16 else cur

    parent._find_least_atten_below_threshold.side_effect = _predicted_optimum

    # iter 1: classify → spotty_probing → apply_filter(safe_atten=16)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent.classification == "spotty_probing"
    assert parent._filter.bits == 16
    assert parent._spotty_probe_start_iter == 1

    # feed 5 stable spottiness metrics (rel_change ≈ 0)
    for frame_nb in range(1, 6):
        _push_metric(parent, frame=frame_nb, spottiness=0.30 + frame_nb * 0.001)

    # iter 6: probe window complete → classify spotty_stable_tuning, start
    # safe-frame loop from bits=16. Helper returns 14 → jump 16 → 14.
    _make_iter(parent, iter_nb=6).prepare()
    assert parent.classification == "spotty_stable_tuning"
    assert parent._filter.bits == 14

    # iter 7: verify at 14 — helper returns 14, frame_max < threshold → converged.
    # Apply +spotty_extra_atten (14 + 2 = 16). Freeze.
    _make_iter(parent, iter_nb=7).prepare()
    assert parent.classification == "spotty_stable"
    assert parent._atten_frozen_at == 16
    assert parent._filter.bits == 16
    assert parent._setup_phase_done is True
def test_freeze_standard_jump_then_verify_re_jumps():
    """Model under-predicted: first jump lands at bits=5 but the actual frame
    max there is still ≥ threshold; helper picks bits=7; verify at bits=7 →
    converged. Two jumps + one verify-converge.
    """
    parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=0)
    # frame at bits=0 max=5000; frame at bits=5 max=1200 (still > threshold);
    # frame at bits=7 max=300 (below threshold)
    _queue_frames(parent, [5000.0, 1200.0, 300.0])

    # Current position → position the helper picks next:
    #   cur=0 (frame_max=5000): predict landing at 5.
    #   cur=5 (frame_max=1200): the actual max is higher than originally
    #       predicted, so re-pick 7.
    # Any other position (e.g. cur=7, frame_max=300) is already optimal.
    next_position = {0: 5, 5: 7}

    def _helper(frame_max, cur, thr):
        return next_position.get(cur, cur)

    parent._find_least_atten_below_threshold.side_effect = _helper
    _push_metric(parent, frame=0, spottiness=0.05)

    # iter 1: classify standard, jump 0 → 5
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._filter.bits == 5
    assert parent._setup_phase_done is False

    # iter 2: verify at 5 fails (frame_max still ≥ threshold), re-jump 5 → 7
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._filter.bits == 7
    assert parent._setup_phase_done is False

    # iter 3: verify at 7 — frame_max < threshold, helper returns 7 → converged
    _make_iter(parent, iter_nb=3).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 7

    # exactly two jumps (apply_filter) happened during setup
    assert parent._apply_filter.call_count == 2
def test_freeze_standard_jump_falls_back_to_pm1():
    """When _find_least_atten_below_threshold returns None (e.g. missing
    transmission data) the safe-frame loop falls back to the legacy ±1 step
    path driven by _predict_frame_max_at.
    """
    parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=4)
    _queue_frames(parent, [500.0])
    # helper unavailable → fall back
    parent._find_least_atten_below_threshold.side_effect = lambda *a: None

    def _predicted_max(frame_max, cur, tgt):
        # Only the step from bits=2 down to 1 would exceed the threshold
        # (boundary); every other step is predicted safe.
        return 2000.0 if (cur, tgt) == (2, 1) else 0.0

    parent._predict_frame_max_at.side_effect = _predicted_max
    _push_metric(parent, frame=0, spottiness=0.01)

    # iter 1: bits 4→3 (safe ±1 step)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._filter.bits == 3

    # iter 2: bits 3→2
    _make_iter(parent, iter_nb=2).prepare()
    assert parent._filter.bits == 2

    # iter 3: at 2, stepping down to 1 would exceed → freeze at 2
    _make_iter(parent, iter_nb=3).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 2
def test_freeze_standard_jump_caps_at_max_atten(caplog):
    """Count rate too high for any position to predict below threshold: the
    helper returns max_atten and the loop converges there with a warning.
    """
    parent = _make_parent(ghost_threshold_per_frame=1000.0, initial_bits=0)
    _queue_frames(parent, [1e9, 5e8])

    def _always_strongest(frame_max, cur, thr):
        # helper saturates at the strongest available attenuation
        return 31

    parent._find_least_atten_below_threshold.side_effect = _always_strongest
    _push_metric(parent, frame=0, spottiness=0.05)

    # iter 1: jump 0 → 31
    _make_iter(parent, iter_nb=1).prepare()
    assert parent._filter.bits == 31

    # iter 2: verify at 31 — frame_max still ≥ threshold, helper returns 31 →
    # freeze at max with warning
    with caplog.at_level(logging.WARNING):
        _make_iter(parent, iter_nb=2).prepare()
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 31
    assert any(
        "freezing at max attenuation" in record.message for record in caplog.records
    )
def test_freeze_spotty_unstable():
    """Spotty-unstable branch keeps safe_atten and freezes immediately."""
    parent = _make_parent(
        ghost_threshold_per_frame=1000.0,
        spottiness_threshold=0.1,
        spotty_safe_atten=20,
        spotty_stability_frames=5,
        spotty_stability_tol=0.1,
        initial_bits=4,
    )
    _push_metric(parent, frame=0, spottiness=0.3)

    # iter 1: classify → spotty_probing → apply_filter(20)
    _make_iter(parent, iter_nb=1).prepare()
    assert parent.classification == "spotty_probing"
    assert parent._filter.bits == 20

    # 5 unstable spottiness values (rel_change > 0.1) for frames 1..5
    unstable_values = [0.2, 0.4, 0.1, 0.35, 0.5]
    for frame_nb, value in enumerate(unstable_values, start=1):
        _push_metric(parent, frame=frame_nb, spottiness=value)

    # queue a frame just so _safe_frame_step won't error if entered; but the
    # spotty_unstable branch returns before the safe-frame loop
    _queue_frames(parent, [200.0])

    moves_so_far = parent._apply_filter.call_count
    _make_iter(parent, iter_nb=6).prepare()
    assert parent.classification == "spotty_unstable"
    assert parent._setup_phase_done is True
    assert parent._atten_frozen_at == 20
    # no further _apply_filter calls after the initial safe_atten set
    assert parent._apply_filter.call_count == moves_so_far
# ---------------------------------------------------------------------------
# Tests: reactive-mode back-compat guard
# ---------------------------------------------------------------------------
def test_reactive_mode_does_not_run_frame0_setup():
    """In ``reactive`` mode, _freeze_setup_step is never entered."""
    parent = _make_parent(attenuation_mode="reactive")
    iterator = _make_iter(parent, iter_nb=1)
    # reactive mode calls _adjust_filter, which we don't want to exercise here;
    # stub it (and the freeze entry point) on the iterator.
    iterator._adjust_filter = MagicMock()
    iterator._freeze_setup_step = MagicMock()

    iterator.prepare()

    iterator._adjust_filter.assert_called_once()
    iterator._freeze_setup_step.assert_not_called()
    assert parent._setup_phase_done is False
# ---------------------------------------------------------------------------
# Invalid constructor arg
# ---------------------------------------------------------------------------
def test_invalid_attenuation_mode_raises(mocker):
    """An unrecognised ``attenuation_mode`` makes ``__init__`` raise ValueError."""
    # ``__init__`` starts with ``assert_has_minimal_version("bliss", "2.2")``,
    # which queries ``importlib.metadata`` for the bliss package. In a
    # Bliss-less env that lookup raises ``PackageNotFoundError``, masking the
    # validation under test. Patch the version check off for this test only.
    mocker.patch.object(stop_scan_preset, "assert_has_minimal_version")

    preset_cls = stop_scan_preset.StopIntegrateSum
    init_kwargs = {
        "workflow_threshold": 0.01,
        "workflow_path": "/tmp/nope.json",
        "detector_name": "x",
        "detector_saturation": 1e6,
        "pyfai_config_path": "/tmp/nope.json",
        "attenuation_mode": "ciao",  # deliberately not a supported mode
    }
    with pytest.raises(ValueError, match="attenuation_mode"):
        # Call __init__ on an uninitialised instance.
        preset_cls.__init__(object.__new__(preset_cls), **init_kwargs)
# ---------------------------------------------------------------------------
# Tests: _find_least_atten_below_threshold helper
# ---------------------------------------------------------------------------
def test_find_optimal_atten_for_threshold_selection_rule(mocker):
    """The helper picks the smallest atten whose predicted intensity is
    strictly below the threshold; falls back to the strongest available
    attenuation when no position satisfies; returns None on missing or
    invalid transmission data.
    """
    preset_cls = stop_scan_preset.StopIntegrateSum
    preset = object.__new__(preset_cls)

    def _stub_transmissions(**mock_kwargs):
        # Re-patch the class-level transmission lookup for the next scenario.
        mocker.patch.object(preset_cls, "_get_filter_transmissions", **mock_kwargs)

    # synthetic transmissions: 1, 1/2, 1/4, …, 1/64 (7 positions)
    _stub_transmissions(
        return_value=[1.0, 0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625]
    )

    # frame_max=8000 at current=0 → count_rate=8000.
    # predicted: 8000, 4000, 2000, 1000, 500, …
    # first strictly < 1000 is idx 4 (500).
    assert preset._find_least_atten_below_threshold(8000.0, 0, 1000.0) == 4

    # over-attenuated: frame_max=300 at current=4 → count_rate=4800.
    # predicted: 4800, 2400, 1200, 600, …
    # first strictly < 1000 is idx 3 (600).
    assert preset._find_least_atten_below_threshold(300.0, 4, 1000.0) == 3

    # exactly on the cliff: frame_max=1000 at current=0 → count_rate=1000.
    # predicted at idx 0 is 1000 (not strictly <), at idx 1 is 500 → idx 1.
    assert preset._find_least_atten_below_threshold(1000.0, 0, 1000.0) == 1

    # count rate too high: no position satisfies → return strongest available
    assert preset._find_least_atten_below_threshold(1e6, 0, 1000.0) == 6

    # Each of these lookups must make the helper give up and return None.
    unusable_lookups = [
        {"return_value": [None, 0.5, 0.25]},  # missing transmission at current
        {"return_value": [0.0, 0.5, 0.25]},  # zero transmission at current
        {"side_effect": RuntimeError("kaboom")},  # exception during lookup
    ]
    for mock_kwargs in unusable_lookups:
        _stub_transmissions(**mock_kwargs)
        assert preset._find_least_atten_below_threshold(100.0, 0, 50.0) is None