from __future__ import annotations
import logging
import shutil
import sys
import tempfile
import time
import warnings
from collections import deque
from collections.abc import Generator
from collections.abc import Sequence
from pathlib import Path
from typing import Literal
from typing import Optional
import redis
from blissdata.beacon.data import BeaconData
from ewoksutils.task_utils import task_inputs
from blissoda.utils.directories import get_workflows_dir
from ..ewoks_utils import submit
from ..import_utils import unavailable_class
from ..import_utils import unavailable_module
from ..version_utils import assert_has_minimal_version
try:
import gevent
except ImportError as ex:
gevent = unavailable_module(ex)
try:
from bliss import setup_globals # type: ignore
except ImportError as ex:
setup_globals = unavailable_class(ex)
try:
from bliss.scanning.chain import AcquisitionChain # type: ignore
except ImportError as ex:
AcquisitionChain = unavailable_class(ex)
try:
from bliss.scanning.chain import ChainIterationPreset # type: ignore
except ImportError as ex:
ChainIterationPreset = unavailable_class(ex)
try:
from bliss.scanning.chain import ChainPreset # type: ignore
except ImportError as ex:
ChainPreset = unavailable_class(ex)
try:
from bliss.common.auto_filter.filterset import FilterSet # type: ignore
except ImportError as ex:
FilterSet = unavailable_class(ex)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Timing constants used by the polling / blocking loops in this module.
_MOTOR_POLL_INTERVAL = 0.05 # s to wait for filter/atten motor move
_CURSOR_READ_TIMEOUT = 0.5 # s to wait for a new blissdata frame
_REDIS_XREAD_BLOCK_MS = 100 # ms to block waiting for stream events
_SETUP_POLL_INTERVAL = 0.05 # s between buffer checks while waiting on a metric
# Number of ID31 attenuator positions: the code below indexes positions
# 0 .. _ID31_ATTENUATOR_BITS - 1 (it is used as a count, not as a bit width).
_ID31_ATTENUATOR_BITS = 32
# ANSI escape sequences looked up by _color(); "reset" ends the colored span.
_COLORS = {
    "red": "\033[31m",
    "green": "\033[32m",
    "yellow": "\033[33m",
    "blue": "\033[34m",
    "reset": "\033[0m",
}
def _color(text: str, color: str) -> str:
    """Colorize ``text`` with ANSI escape codes when stderr is a terminal.

    ``text`` is returned unchanged when ``sys.stderr`` has no ``isatty``
    attribute or reports that it is not a TTY. An unknown ``color`` name
    contributes no opening code, but the reset sequence is still appended.
    """
    is_terminal = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
    if is_terminal:
        opening = _COLORS.get(color, "")
        closing = _COLORS["reset"]
        return f"{opening}{text}{closing}"
    return text
def _timestamp() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS,mmm``.

    The millisecond suffix is truncated (not rounded) from the fractional
    part of ``time.time()``.
    """
    epoch = time.time()
    whole_seconds = int(epoch)
    millis = int((epoch - whole_seconds) * 1000)
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch))
    return stamp + ",%03d" % millis
def _log_atten_action(
    level: int,
    symbol: str,
    color: str,
    frame_nb: int,
    frame_max: float,
    threshold: float,
    comparator: str,
    atten_msg: str,
) -> None:
    """Log one attenuation decision on the module logger.

    The record reads ``[timestamp] <symbol> #<frame> max: <frame_max>
    <comparator> <threshold>, <atten_msg>``. ``symbol`` and ``atten_msg``
    are passed through :func:`_color` with ``color``.

    :param level: Logging level of the record (e.g. ``logging.INFO``).
    :param symbol: Short marker identifying the kind of action.
    :param color: Color name understood by :func:`_color`.
    :param frame_nb: Index of the frame the decision is based on.
    :param frame_max: Maximum pixel value of that frame.
    :param threshold: Value ``frame_max`` was compared against.
    :param comparator: Textual comparison operator shown between the two.
    :param atten_msg: Description of the filter/attenuator action.
    """
    message_args = (
        _timestamp(),
        _color(symbol, color),
        frame_nb,
        frame_max,
        comparator,
        threshold,
        _color(atten_msg, color),
    )
    logger.log(level, "[%s] %s #%2.0f max: %2.2e %s %2.2e, %s", *message_args)
[docs]
class StopIntegrateSum(ChainPreset): # pyright: ignore[reportGeneralTypeIssues]
def __init__(
    self,
    workflow_threshold: float,
    workflow_path: str | Path,
    detector_name: str,
    detector_saturation: float,
    pyfai_config_path: str | Path,
    attenuator_name: str = "filt_wheel",
    frame_target_max: float | None = None,
    frame_target_min: float | None = None,
    ewoksjob_queue: str = "mt",
    attenuation_mode: Literal["freeze", "reactive"] = "freeze",
    ghost_threshold_per_frame: float = 3750.0,
    spottiness_threshold: float = 0.1,
    spotty_safe_atten: int = 16,
    spotty_stability_frames: int = 5,
    spotty_stability_tol: float = 0.1,
    spotty_extra_atten: int = 2,
    metric_timeout: float = 2.0,
    sum_frame_max_threshold: float = 3e5,
) -> None:
    """An acquisition chain preset for automatic attenuation and scan stopping in
    XRPD measurements.

    To be used with:

    - a continuous scan mimicking detector accumulation, e.g.
      ``timescan(0.2, npoints=0)``.
    - an Ewoks workflow containing an ``IntegrateAndSendMsg`` task inheriting
      from ``ewoksxrpd.tasks.integrate._BaseIntegrateSingleGeometry`` that
      publishes events on the ``blissoda:feedback:stop_task`` Redis stream:

      - ``{'type': 'stop'}`` — sent once the integration metric crosses
        ``workflow_threshold``; the preset stops the scan.
      - ``{'type': 'metric', 'frame': i, 'max_pixel': ..., 'spottiness': ...}``
        — sent per integrated frame; consumed by the "freeze" auto attenuation
        mode during its setup phase to classify the sample and steer the
        attenuator.

    Attach this preset to a scan to:

    - **adjust attenuation automatically** in one of two modes (selectable
      via ``attenuation_mode``):

      - ``"freeze"`` (default): the sample is classified from the first
        frame's ``spottiness`` metric, the attenuator is then tuned
        iteratively to keep the frame max below ``ghost_threshold_per_frame``
        (with a dedicated branch for spotty samples that probes spottiness
        stability over ``spotty_stability_frames`` frames at
        ``spotty_safe_atten`` before deciding), and finally the attenuator
        is **frozen** for the rest of the scan to remove the per-iteration
        blocking read overhead.
      - ``"reactive"`` (legacy): the attenuator is re-evaluated at every
        scan point against the ``frame_target_min`` / ``frame_target_max``
        window — kept as a fallback.

    - **stop the scan** based on the output of the Ewoks workflow
      triggered at scan start.

    The Pilatus protection trip is handled at the scanner level
    (``streamline_scanner._measure_sample_auto_stop`` retries the scan with
    a higher attenuator) — a fresh preset instance is created on each retry,
    so freeze-mode classification simply re-runs from the new frame 0.

    :param workflow_threshold: The "statistic" computed by the Ewoks workflow, upon
        reaching which the workflow sends a signal to stop the scan.
    :param workflow_path: Path to the Ewoks workflow `.json`.
    :param detector_name: Name of the detector used in the scan, whose `.image`
        counter (i.e. raw frames) will be fed to the Ewoks workflow.
    :param detector_saturation: Max of detector dynamic range. Damage above.
    :param pyfai_config_path: Path to the PyFAI configuration `.json`.
    :param attenuator_name: Name of the filter or attenuator Bliss object to
        insert/remove dynamically.
    :param frame_target_max: Only used in ``attenuation_mode="reactive"``.
        Maximum number of desired counts in a frame. Select attenuation to get
        closest to this value, and above `frame_target_min`. If None, set to
        `0.9 * detector_saturation`.
    :param frame_target_min: Only used in ``attenuation_mode="reactive"``.
        Minimum number of desired counts in a frame. Select attenuation to be
        above this value, and closest to `frame_target_max`. If None, set to
        `0.1 * detector_saturation`.
    :param ewoksjob_queue: Name of the Celery queue to route the Ewoks tasks
        submitted by the preset to. Default: 'mt'.
    :param attenuation_mode: ``"freeze"`` (default) classifies the sample from
        the first collected frame and runs a one-time attenuation tuning loop
        before freezing the attenuator for the rest of the scan; ``"reactive"``
        uses per-scan-point tuning based on ``frame_target_min/_max``.
    :param ghost_threshold_per_frame: Only used in ``"freeze"`` mode. Max
        per-frame pixel count above which "ghost peaks" are expected. The
        attenuator tuning procedure jumps directly to the predicted optimal
        attenuator value - i.e. the value whose predicted transmitted intensity
        is just below this threshold, as computed from the SiO2 transmission
        table, and verifies the prediction on the next iteration before
        freezing. Falls back to a ±1 step loop when the transmission table
        is unavailable.
    :param spottiness_threshold: Only used in ``"freeze"`` mode. Value of
        ``pyFAI.Integrate1dResult.calc_spottiness(weighted=False)`` for the first
        collected frame, above which the sample is classified as spotty.
    :param spotty_safe_atten: Only used in ``"freeze"`` mode. Safe attenuator
        position applied at the start of the spotty stability test.
    :param spotty_stability_frames: Only used in ``"freeze"`` mode. Number of
        consecutive spottiness values to collect at ``spotty_safe_atten``
        before deciding stable vs. unstable. Must be at least 1.
    :param spotty_stability_tol: Only used in ``"freeze"`` mode. Maximum
        ``(max(S) - min(S)) / mean(S)`` with ``S`` being the spottiness,
        over ``spotty_stability_frames`` for a spotty sample to be considered
        stable.
    :param spotty_extra_atten: Only used in ``"freeze"`` mode. Additional
        attenuator steps added on top of the safe-frame sweet spot for
        stable-spotty samples to extend the accumulation time.
    :param metric_timeout: Only used in ``"freeze"`` mode. Seconds to block at
        the start of iter 1 waiting for the Ewoks workflow's frame-0 spottiness
        event. Falls back to the ``"standard"`` branch on timeout.
    :param sum_frame_max_threshold: Stop the scan once the maximum counts in
        the **accumulated** 2D detector frame exceed this value. Evaluated
        inside the Ewoks workflow alongside the ``acc_profile_threshold``
        (rel_err) criterion; first-to-fire wins. Default ``3e5`` cts.
    :raises ValueError: If ``attenuation_mode`` is unknown,
        ``spotty_stability_frames`` is smaller than 1 or
        ``sum_frame_max_threshold`` is negative.
    :raises RuntimeError: If ``attenuator_name`` is not an attribute of
        ``setup_globals``.
    :raises FileNotFoundError: If ``pyfai_config_path`` does not exist.
    :raises NotImplementedError: If ``pyfai_config_path`` is ``None``.
    """
    assert_has_minimal_version("bliss", "2.2")
    self._detector_saturation = detector_saturation
    self._workflow_threshold = workflow_threshold
    self._target_max = (
        frame_target_max
        if frame_target_max is not None
        else detector_saturation * 0.9
    )
    self._target_min = (
        frame_target_min
        if frame_target_min is not None
        else detector_saturation * 0.1
    )
    if attenuation_mode not in ("freeze", "reactive"):
        raise ValueError(
            f"attenuation_mode must be 'freeze' or 'reactive', got "
            f"{attenuation_mode!r}"
        )
    self._atten_mode: str = attenuation_mode
    self._ghost_threshold_per_frame = float(ghost_threshold_per_frame)
    self._spottiness_threshold = float(spottiness_threshold)
    self._spotty_safe_atten = int(spotty_safe_atten)
    self._spotty_stability_frames = int(spotty_stability_frames)
    # The stability probe averages over this many values; zero or a negative
    # count would surface as a ZeroDivisionError in the middle of a scan.
    if self._spotty_stability_frames < 1:
        raise ValueError(
            f"spotty_stability_frames must be >= 1, "
            f"got {spotty_stability_frames!r}"
        )
    self._spotty_stability_tol = float(spotty_stability_tol)
    self._spotty_extra_atten = int(spotty_extra_atten)
    self._metric_timeout = float(metric_timeout)
    if sum_frame_max_threshold < 0:
        raise ValueError(
            f"sum_frame_max_threshold must be >= 0, "
            f"got {sum_frame_max_threshold!r}"
        )
    self._sum_frame_max_threshold = float(sum_frame_max_threshold)
    # freeze-mode state (per preset instance, i.e. per sample scan)
    self._setup_phase_done: bool = False
    self.classification: Optional[str] = None  # standard | spotty_* | None
    self._atten_frozen_at: Optional[int] = None
    self._spotty_probe_start_iter: Optional[int] = None
    # bounded history of the per-frame metric events sent by the workflow
    self._metrics_by_frame: deque = deque(
        maxlen=max(self._spotty_stability_frames + 5, 16)
    )
    try:
        self._filter = getattr(setup_globals, attenuator_name)
    except AttributeError as ex:
        # chain the cause so the original lookup failure stays in the traceback
        raise RuntimeError(
            f"The attenuator {attenuator_name} is not available "
            f"in setup_globals: {ex}."
        ) from ex
    self._filter_name = attenuator_name
    # scan identifiers, filled in by _cache_scan_info() at scan.prepare()
    self._filename = None
    self._scan_number = None
    self._scan_key = None
    self._detector_name = detector_name
    # goes through the property setter, which checks that the file exists
    self.pyfai_config = pyfai_config_path
    if self.pyfai_config is None:
        raise NotImplementedError(
            "pyfai_config cannot be None: workflow needs detector geometry params. "
            "Could be supplied via _integration_options. TBD."
        )
    self._energy = None
    self._workflow_path = workflow_path
    self._integration_options = dict(
        method=("no", "csr", "cython"),
        unit="2th_deg",
        integrator_name="sigma_clip_ng",
        extra_options={"max_iter": 5, "thres": 3},
        error_model="azimuthal",
        correct_solid_angle=True,
        polarization_factor=0.97,
    )
    # Redis feedback channel shared with the Ewoks workflow
    self._redis_url = BeaconData().get_redis_data_db()
    self._redis = redis.Redis.from_url(self._redis_url)
    self._stream_name = "blissoda:feedback:stop_task"
    self._last_id = "0-0"
    self._future = None
    self._events_buffer = []
    self._greenlet = None
    self._cursor = None
    self._queue = ewoksjob_queue
    # (cache_key, transmissions) once computed, empty tuple until then
    self._transmissions_cache: tuple[tuple, Sequence] | tuple[()] = ()
def __repr__(self) -> str:
    """Return a multi-line summary: detector, attenuator, pyFAI config path
    and one line per integration option.
    """
    header_lines = [
        "StopSumPreset",
        f" Detector: {self._detector_name}",
        f" Attenuator: {self._filter_name}",
        f" PyFAI config: {self.pyfai_config}",
        " Integration options: (precedence over config)",
    ]
    option_lines = []
    for option, setting in self._integration_options.items():
        option_lines.append(f" {option}: {setting}")
    # the header always ends with a newline, even without any option
    return "\n".join(header_lines) + "\n" + "\n".join(option_lines)
@property
def pyfai_integration_options(self) -> dict:
    """Integration options passed to the ``PyFaiConfig`` workflow task.

    The internal dict is returned as-is (no copy).
    """
    return self._integration_options

@pyfai_integration_options.setter
def pyfai_integration_options(self, value: dict) -> None:
    """Replace the integration options; ``value`` is stored without
    validation or copying.
    """
    self._integration_options = value
@property
def pyfai_config(self) -> str | Path | None:
    """Absolute path of the pyFAI configuration file, or ``None`` if unset."""
    return self._pyfai_config

@pyfai_config.setter
def pyfai_config(self, value: str | Path) -> None:
    """Store ``value`` as an absolute path string.

    ``None`` clears the configuration. Any other value must point to an
    existing filesystem entry.

    :raises FileNotFoundError: If ``value`` does not exist.
    """
    if value is not None:
        config_path = Path(value)
        if config_path.exists():
            self._pyfai_config = str(config_path.absolute())
            return
        raise FileNotFoundError(
            f"pyFAI config file does not exist: {config_path}"
        )
    self._pyfai_config = None
[docs]
def prepare(self, chain: AcquisitionChain) -> None: # type: ignore
    """Preset hook: cache the scan's identifiers (filename, scan number,
    scan key, energy) from ``chain`` via :meth:`_cache_scan_info`.
    """
    self._cache_scan_info(chain)
[docs]
def start(self, chain: AcquisitionChain) -> None: # type: ignore
    """Preset hook: submit the Ewoks workflow and spawn the background
    event collector via :meth:`_start_workflow`. ``chain`` is not used.
    """
    self._start_workflow()
[docs]
def stop(self, chain: AcquisitionChain) -> None: # type: ignore
    """Preset hook: stop the background event collector via
    :meth:`_stop_workflow`. ``chain`` is not used.
    """
    self._stop_workflow()
def _cache_scan_info(self, chain: AcquisitionChain) -> None:  # type: ignore
    """Remember the scan object and the identifiers the workflow needs.

    Called at ``scan.prepare()``. Reads ``filename``, ``scan_nb`` and the
    start position of the ``energy`` positioner from ``scan_info``, and the
    scan key from ``scan._scan_data``.
    """
    scan = chain.scan
    info = scan.scan_info
    self._scan = scan
    self._filename = info["filename"]
    self._scan_number = info["scan_nb"]
    self._scan_key = scan._scan_data.key
    start_positions = info["positioners"]["positioners_start"]
    self._energy = start_positions["energy"]
def _get_workflow_task_inputs(self) -> list[dict]:
    """Build the Ewoks inputs for the three workflow tasks.

    Emits a warning when no pyFAI configuration file is set. The returned
    list concatenates the inputs of ``PyFaiConfig``, ``IntegrateAndSendMsg``
    and ``WriteSumFrame``, in that order.
    """
    if not self._pyfai_config:
        warnings.warn("No pyFAI config file specified")

    # pyfai config task
    # out: energy / geometry (dict) / detector name (str)
    #      detector config (dict) / mask (array|None) / ff (array|None)
    #      integration options
    config_inputs = dict(
        energy=self._energy,  # priority 1
        filename=self._pyfai_config,  # priority 4
        integration_options=self._integration_options,  # priority 2
    )
    # integration task, uses iter_bliss_data
    integrate_inputs = dict(
        scan_key=self._scan_key,  # checked first
        filename=self._filename,  # fall back to fname/scan
        scan_number=self._scan_number,
        detector_name=self._detector_name,
        response_stream_name=self._stream_name,
        acc_profile_threshold=self._workflow_threshold,
        sum_frame_max_threshold=self._sum_frame_max_threshold,
        redis_url=self._redis_url,
        retry_timeout=1000,  # 1s
    )
    # write frame sum task: reads frames from output loopscan and writes a
    # single summed frame in *_sum.h5, then fed to the xrpd workflow
    sum_inputs = dict(
        filename=self._filename,
        scan_number=self._scan_number,
        detector_name=self._detector_name,
    )

    inputs = task_inputs(task_identifier="PyFaiConfig", inputs=config_inputs)
    remaining_tasks = (
        ("IntegrateAndSendMsg", integrate_inputs),
        ("WriteSumFrame", sum_inputs),
    )
    for identifier, values in remaining_tasks:
        inputs += task_inputs(task_identifier=identifier, inputs=values)
    return inputs
def _resolve_workflow_path(self) -> str:
    """Return the path of the workflow copy that is handed to the worker.

    Follows the same pattern as the blissoda xrpd processor: the source
    workflow is copied next to the processed data (directory given by
    ``get_workflows_dir`` for the dataset filename) so that the Ewoks worker
    can reach it through an absolute path. Without a known filename the
    system temporary directory is used instead. An already existing
    destination file is left untouched.

    :raises FileNotFoundError: If the source workflow file does not exist.
    """
    source = Path(self._workflow_path).absolute()
    if not source.is_file():
        raise FileNotFoundError(f"Ewoks workflow not found: {source}")

    target_dir = Path(
        get_workflows_dir(self._filename)
        if self._filename
        else tempfile.gettempdir()
    )
    target = target_dir / source.name
    needs_copy = source != target and not target.is_file()
    if needs_copy:
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, target)
    return str(target)
def _start_workflow(self) -> None:
    """Submit the Ewoks workflow and start the feedback collector.

    Called at ``scan.start()``. The feedback stream, the stream read position
    and the local event buffer are reset first, so that events left over from
    an earlier scan (in particular a ``stop`` event) cannot act on this one.
    The workflow is then submitted to the configured queue and
    :meth:`_background_event_collector` is spawned as a greenlet.

    :raises RuntimeError: If neither the scan key nor the filename is known.
    :raises FileNotFoundError: If the workflow file does not exist.
    """
    if not self._filename and not self._scan_key:
        raise RuntimeError("Neither scan key nor filename are known")
    # clear previous stream and anything buffered from it
    self._redis.delete(self._stream_name)
    self._last_id = "0-0"
    self._events_buffer = []
    # submit workflow
    workflow = self._resolve_workflow_path()
    inputs = self._get_workflow_task_inputs()
    self._future = submit(
        args=(workflow,),
        kwargs=dict(
            inputs=inputs,
            convert_destination="last_stop_scan.json",
        ),
        queue=self._queue,
    )
    logger.info(
        "[%s] Workflow %s started for fname=%s key=%s, queue=%s",
        _timestamp(),
        self._future.uuid,
        self._filename,
        self._scan_key,
        self._queue,
    )
    # start background greenlet
    self._greenlet = gevent.spawn(self._background_event_collector)
def _background_event_collector(self) -> None:
    """Continuously fetch feedback events and append them to the buffer.

    Runs until the greenlet is killed (see :meth:`_stop_workflow`). Each
    round blocks up to ``_REDIS_XREAD_BLOCK_MS`` on the feedback stream,
    reading everything newer than ``_last_id``. Errors are logged and the
    loop keeps going after a short pause, so that a persistent failure
    (e.g. Redis unreachable) does not turn into a tight, log-flooding loop.
    """
    while True:
        try:
            data = self._redis.xread(
                {self._stream_name: self._last_id},
                count=None,  # read all available
                block=_REDIS_XREAD_BLOCK_MS,
            )
            if not data:
                continue
            for _, events in data:
                for event_id, event in events:
                    # advance the read position before buffering the event
                    self._last_id = event_id
                    self._events_buffer.append(event)
        except Exception as ex:
            logger.exception("Error reading Redis stream: %s", ex)
            # back off for one read period and yield to other greenlets
            gevent.sleep(_REDIS_XREAD_BLOCK_MS / 1000)
def _stop_workflow(self) -> None:
    """Stop the background event collector; called at ``scan.stop()``.

    The workflow itself is not stopped: it needs ``scan.stop()`` to return
    before it can sum the frames. Its future is only inspected, when already
    done, so that a failure gets logged.
    """
    collector = self._greenlet
    if collector is not None:
        collector.kill(block=True)
        self._greenlet = None

    # keep check only to catch exceptions: the WriteSumFrame task will
    # not fire until the data is flushed, which only happens at scan.stop()
    future = self._future
    if future is None or not future.done():
        return
    try:
        future.result()
    except Exception as ex:
        logger.error("Stop scan workflow failed: %s", ex)
def _get_filter_transmissions(self) -> Sequence:
    """Return the calculated transmission for every filter/attenuator position.

    The result is cached on the instance, keyed by the identity of the filter
    object and the energy; a different key triggers a recomputation.

    :raises RuntimeError: If the filter is neither a ``FilterSet`` nor an
        ID31 ``Attenuator``.
    """
    import numpy
    from id31.attenuator import Attenuator  # pyright: ignore[reportMissingImports]
    from id31.attenuator import SiO2trans  # pyright: ignore[reportMissingImports]

    attenuator = self._filter
    key = (id(attenuator), self._energy)
    cached = self._transmissions_cache
    if cached and cached[0] == key:
        return cached[1]

    if isinstance(attenuator, FilterSet):
        table = [
            entry.get("transmission_calc", None)
            for entry in attenuator.get_filters()
        ]
    elif isinstance(attenuator, Attenuator):
        positions = numpy.arange(_ID31_ATTENUATOR_BITS)
        table = SiO2trans(self._energy, positions)
    else:
        raise RuntimeError(f"Filter {self._filter} is not supported.")

    self._transmissions_cache = (key, table)
    return table
def _predicted_frame_maxima(
    self, frame_max: float, current_atten_pos: int
) -> list[tuple[int, float]] | None:
    """Scale a measured frame max to every usable attenuator position.

    ``frame_max`` was measured at ``current_atten_pos``; dividing it by that
    position's transmission gives the unattenuated level, which is then
    multiplied by each position's transmission.

    Returns ``(position, predicted_max)`` pairs for all positions whose
    transmission is neither ``None`` nor negative, or ``None`` when the
    transmission at the current position is ``None``, zero or negative.

    Shared by :meth:`_find_position_nearest_target` (reactive mode) and
    :meth:`_find_least_atten_below_threshold` (freeze mode), which apply
    their own selection rule to the predictions.
    """
    table = self._get_filter_transmissions()
    reference = table[current_atten_pos]
    if reference is None or reference <= 0:
        return None

    unattenuated = frame_max / reference
    predictions = []
    for position, transmission in enumerate(table):
        if transmission is not None and transmission >= 0:
            predictions.append((position, unattenuated * transmission))
    return predictions
def _find_position_nearest_target(
    self, frame_max: float, current_atten_pos: int
) -> int | None:
    """Pick the position predicted to land closest to ``_target_max``.

    Only positions whose predicted frame max stays strictly below
    ``_detector_saturation`` are considered; on a tie the lowest index wins.
    Returns ``None`` when no position qualifies or when the prediction cannot
    be made (e.g. missing transmission data); any exception is logged as a
    warning and also yields ``None``.
    """
    try:
        predictions = self._predicted_frame_maxima(frame_max, current_atten_pos)
        if predictions is None:
            return None

        chosen = None
        smallest_gap = float("inf")
        for position, expected in predictions:
            if not expected < self._detector_saturation:
                continue  # would saturate the detector
            gap = abs(expected - self._target_max)
            if gap < smallest_gap:
                chosen, smallest_gap = position, gap
        return chosen
    except Exception as ex:
        logger.warning("Cannot compute optimal filter position: %s", ex)
        return None
def _find_least_atten_below_threshold(
    self,
    frame_max: float,
    current_atten_pos: int,
    threshold: float,
) -> int | None:
    """Lowest-index position whose predicted frame max is strictly below
    ``threshold``.

    Used by the ``"freeze"`` mode loop to jump to the predicted optimal
    attenuation in a single step. Returns:

    - the first index (in table order) whose predicted frame max is below
      ``threshold``;
    - the last usable index when no position satisfies the constraint — the
      caller is expected to log a warning and freeze there;
    - ``None`` when the calculation cannot be performed (no prediction
      available, no usable position, exception during lookup) — the caller
      is expected to fall back to the ±1 step loop.
    """
    try:
        predictions = self._predicted_frame_maxima(frame_max, current_atten_pos)
        if predictions is None:
            return None

        last_usable: int | None = None
        for position, expected in predictions:
            if expected < threshold:
                return position  # first hit in table order wins
            last_usable = position
        # nothing predicted below threshold: hand back the last usable
        # position, or None when there was none at all
        return last_usable
    except Exception as ex:
        logger.warning(
            "Cannot compute optimal attenuation for threshold %g: %s",
            threshold,
            ex,
        )
        return None
def _predict_frame_max_at(
    self, frame_max: float, current_atten: int, target_atten: int
) -> float:
    """Predict the max pixel intensity at ``target_atten`` from a measurement
    taken at ``current_atten``.

    The measurement is scaled by the ratio of the two transmissions. ``inf``
    is returned whenever the prediction cannot be made (unknown or
    non-positive transmission, any error during lookup), so that callers
    treat the unknown as saturation (conservative).
    """
    unknown = float("inf")
    try:
        table = self._get_filter_transmissions()
        source_t = table[current_atten]
        target_t = table[target_atten]
        if target_t is None or not source_t:
            return unknown
        if source_t > 0:
            return frame_max * target_t / source_t
    except Exception:
        # deliberate best effort: every failure maps to "assume saturation"
        pass
    return unknown
def _apply_filter(self, target_atten: int) -> None:
    """Move the filter/attenuator to ``target_atten`` and block until done.

    A ``FilterSet`` is moved with ``set_filter`` and polled on
    ``rotation_axis.is_moving``. Anything else is treated as the ID31
    attenuator: it is driven through ``setup_globals.att`` and polled until
    its ``bits`` attribute equals the target. Both loops yield to other
    greenlets for ``_MOTOR_POLL_INTERVAL`` seconds between checks.

    A ``ValueError`` raised along the way is logged and swallowed, so the
    position may be unchanged when this method returns.
    """
    # NOTE(review): neither polling loop has a timeout; if the device never
    # reports the target state this call does not return — confirm acceptable.
    try:
        if isinstance(self._filter, FilterSet):
            self._filter.set_filter(target_atten)
            while self._filter.rotation_axis.is_moving:
                gevent.sleep(_MOTOR_POLL_INTERVAL)
        else: # it's Attenuator
            # use ID31 custom `att` function: changes atten.bits internally
            setup_globals.att(target_atten)
            # self._filter.bits = target_atten
            # TODO proper way to block atten.bits?
            while self._filter.bits != target_atten:
                gevent.sleep(_MOTOR_POLL_INTERVAL)
        logger.debug(
            "[%s] %s moved to position %d",
            _timestamp(),
            self._filter_name,
            target_atten,
        )
    except ValueError as err:
        logger.error(
            "[%s] Could not set %s to %d: %s",
            _timestamp(),
            self._filter_name,
            target_atten,
            err,
        )
[docs]
def get_iterator(
    self,
    chain: AcquisitionChain,  # type: ignore
) -> Generator[ChainIterationPreset, None, None]:  # type: ignore
    """Yield one :class:`ScanPointIterator` per scan point, without end.

    Each iterator receives this preset, the chain and a running point index
    starting at 0; the feedback events are consumed by those iterators.
    """
    point_index = 0
    while True:
        point_preset = ScanPointIterator(self, chain, point_index)
        point_index += 1
        yield point_preset
[docs]
class ScanPointIterator(ChainIterationPreset): # type: ignore
def __init__(
    self,
    parent: StopIntegrateSum,
    chain: AcquisitionChain, # type: ignore
    iter_nb: int,
):
    """Per-scan-point preset created by ``StopIntegrateSum.get_iterator``.

    :param parent: The preset holding the shared state (attenuator, event
        buffer, freeze-mode flags).
    :param chain: Acquisition chain of the running scan.
    :param iter_nb: Index of the scan point, starting at 0.
    """
    self.iter = iter_nb
    self.parent = parent
    self.chain = chain
# executed synchronously
[docs]
def prepare(self) -> None:
    """Process workflow feedback and adjust the attenuator/filter before
    each scan point; the first point is skipped.

    Buffered events are drained first so that stop requests and
    ``_metrics_by_frame`` are up to date before any attenuation decision.
    In ``"reactive"`` mode the filter is then re-evaluated at every point;
    otherwise the freeze-mode setup runs until it reports being done.
    """
    if self.iter == 0:
        return

    self._check_workflow_events()
    preset = self.parent
    if preset._atten_mode == "reactive":
        self._adjust_filter()
        return
    if not preset._setup_phase_done:
        self._freeze_setup_step()
[docs]
def start(self) -> None:
    """Open the blissdata cursor on the detector image stream.

    Only done at the start of the *first* scan point; later points reuse the
    cursor stored on the parent preset.
    """
    if self.iter != 0:
        return
    preset = self.parent
    image_stream = preset._scan.streams[f"{preset._detector_name}:image"]
    preset._cursor = image_stream.cursor()
def _adjust_filter(self) -> None:
    """Read the frame acquired in the previous iter and adjust the
    attenuator/filter position (``"reactive"`` mode).

    If:

    1. **Saturating** (max >= ``detector_saturation``): increase attenuation.
       Use transmission ratios to jump to the optimal position; fall back
       to +1. When already at the maximum attenuation nothing can be done
       and a warning is logged.
    2. **Too dim** (max < ``frame_target_min``): decrease attenuation.
       Predict intensity at the target to avoid saturating; stay put if it
       would. When no attenuation is inserted there is nothing to remove.
    3. **Acceptable range**: do nothing.

    Every outcome is logged through :func:`_log_atten_action`; the boundary
    cases of 1. and 2. are reported as such rather than as "in range".
    """
    parent = self.parent
    frame_max = self._read_last_frame_max()
    current_atten, max_atten = self._current_atten_pos()
    saturation = parent._detector_saturation
    target_min = parent._target_min
    frame_nb = self.iter - 1  # the frame just read belongs to the previous point

    # last frame was saturating: *increase* atten
    if frame_max >= saturation:
        if current_atten >= max_atten:
            _log_atten_action(
                logging.WARNING,
                "⨉",
                "red",
                frame_nb,
                frame_max,
                saturation,
                "≥",
                "atten: %d (keep; already at max attenuation)" % current_atten,
            )
            return
        best = parent._find_position_nearest_target(frame_max, current_atten)
        if best is not None and best > current_atten:
            target_atten = best
        else:
            target_atten = current_atten + 1  # fallback: +1
        target_atten = min(target_atten, max_atten)
        _log_atten_action(
            logging.WARNING,
            "↓",
            "red",
            frame_nb,
            frame_max,
            saturation,
            "≥",
            "atten: %d → %d" % (current_atten, target_atten),
        )
        parent._apply_filter(target_atten)
        return

    # last frame was too dim: *decrease* atten
    if frame_max < target_min:
        if current_atten <= 0:
            _log_atten_action(
                logging.INFO,
                "⨉",
                "yellow",
                frame_nb,
                frame_max,
                target_min,
                "<",
                "atten: %d (keep; no attenuation left to remove)" % current_atten,
            )
            return
        best = parent._find_position_nearest_target(frame_max, current_atten)
        if best is not None and best < current_atten:
            target_atten = best
        else:
            target_atten = current_atten - 1  # fallback: step down
        target_atten = max(target_atten, 0)
        # if removing atten would fry detector: do nothing
        predicted = parent._predict_frame_max_at(
            frame_max, current_atten, target_atten
        )
        if predicted >= saturation:
            _log_atten_action(
                logging.WARNING,
                "⨉",
                "yellow",
                frame_nb,
                frame_max,
                target_min,
                "<",
                "atten: %d (keep)" % current_atten,
            )
            return
        _log_atten_action(
            logging.WARNING,
            "↑",
            "blue",
            frame_nb,
            frame_max,
            target_min,
            "<",
            "atten: %d → %d" % (current_atten, target_atten),
        )
        parent._apply_filter(target_atten)
        return

    # acceptable range: leave the attenuator where it is
    _log_atten_action(
        logging.INFO,
        "✓",
        "green",
        frame_nb,
        frame_max,
        saturation,
        "<",
        "atten: %d (in range [%.2e, %.2e])"
        % (current_atten, target_min, parent._target_max),
    )
def _check_workflow_events(self) -> None:
    """Drain the buffered workflow events and dispatch on their ``type``.

    - ``b"stop"`` → stop the scan (without waiting) and return at once; any
      event still pending in this batch is discarded.
    - ``b"metric"`` → parse ``frame``, ``max_pixel`` and ``spottiness`` and
      append them to ``parent._metrics_by_frame`` for the freeze-mode setup
      loop; a malformed event is logged at DEBUG and dropped.
    - anything else → logged at DEBUG and dropped.
    """
    parent = self.parent
    # take ownership of the current batch and hand the collector a new list
    pending = parent._events_buffer
    parent._events_buffer = []

    for event in pending:
        kind = event.get(b"type", b"")
        if kind == b"stop":
            self.chain.scan.stop(wait=False)
            return
        if kind != b"metric":
            logger.debug("Unknown event type %r: %r", kind, event)
            continue
        try:
            metric = {
                "frame": int(event.get(b"frame", -1)),
                "max_pixel": float(event.get(b"max_pixel", 0)),
                "spottiness": float(event.get(b"spottiness", 0)),
            }
        except (TypeError, ValueError):
            logger.debug("Malformed metric event dropped: %r", event)
        else:
            parent._metrics_by_frame.append(metric)
# ---------------------------------------------------------------
# freeze-mode state machine
# ---------------------------------------------------------------
def _freeze_setup_step(self) -> None:
    """Drive the ``attenuation_mode="freeze"`` setup phase.

    Called each iteration (except iter 0) until ``parent._setup_phase_done``
    flips to True. The state machine, keyed on ``parent.classification``:

    1. ``None``: classify exactly once from the frame-0 spottiness metric
       published by the Ewoks workflow. On timeout, fall back to the
       ``"standard"`` branch.
    2. ``"standard"`` branch: iteratively tune the attenuator until the frame
       max drops below ``ghost_threshold_per_frame``, then freeze.
    3. ``"spotty_probing"`` branch: set atten to ``spotty_safe_atten`` and
       accumulate ``spotty_stability_frames`` spottiness values to decide
       stable vs. unstable.
    4. ``"spotty_stable_tuning"`` branch: run the same safe-frame loop
       from ``spotty_safe_atten``, then apply ``+spotty_extra_atten``
       (capped at the maximum position), relabel as ``"spotty_stable"``
       and freeze.
    5. ``"spotty_unstable"`` branch: freeze at the current position.
    """
    parent = self.parent
    # step 1: classify from frame 0 (blocking, bounded wait)
    if parent.classification is None:
        spottiness = self._wait_for_frame0_metric()
        if spottiness is None:
            logger.warning(
                "[%s] no frame-0 metric within %.1fs; "
                "defaulting to 'standard' branch",
                _timestamp(),
                parent._metric_timeout,
            )
            parent.classification = "standard"
        elif spottiness >= parent._spottiness_threshold:
            logger.info(
                "[%s] frame-0 spottiness=%.4f ≥ %.4f → spotty branch; "
                "setting atten to safe-large value %d",
                _timestamp(),
                spottiness,
                parent._spottiness_threshold,
                parent._spotty_safe_atten,
            )
            parent.classification = "spotty_probing"
            # remember where the probe started: only metrics from this
            # iteration onwards are used for the stability test
            parent._spotty_probe_start_iter = self.iter
            parent._apply_filter(parent._spotty_safe_atten)
            # nothing more to do until frames at the safe position arrive
            return
        else:
            logger.info(
                "[%s] frame-0 spottiness=%.4f < %.4f → standard branch",
                _timestamp(),
                spottiness,
                parent._spottiness_threshold,
            )
            parent.classification = "standard"
    # step 3: spotty probe — accumulate X spottiness values at safe_atten
    if parent.classification == "spotty_probing":
        probe_start = parent._spotty_probe_start_iter or self.iter
        spotty_values = [
            m["spottiness"]
            for m in parent._metrics_by_frame
            if m["frame"] >= probe_start
        ]
        if len(spotty_values) < parent._spotty_stability_frames:
            return # wait for more metric events
        # judge stability on the first N values collected since the probe start
        window = spotty_values[: parent._spotty_stability_frames]
        mean = sum(window) / len(window)
        # relative spread; a non-positive mean is treated as "no change"
        rel_change = (max(window) - min(window)) / mean if mean > 0 else 0.0
        logger.info(
            "[%s] spotty stability over %d frames: rel_change=%.3f (tol=%.3f)",
            _timestamp(),
            parent._spotty_stability_frames,
            rel_change,
            parent._spotty_stability_tol,
        )
        if rel_change <= parent._spotty_stability_tol:
            # stable: continue below with the safe-frame loop in this same call
            parent.classification = "spotty_stable_tuning"
        else:
            parent.classification = "spotty_unstable"
            self._freeze_at(self._current_atten_pos()[0])
            return
    # step 2 & 4: run (or continue) the safe-frame loop
    if parent.classification in ("standard", "spotty_stable_tuning"):
        converged = self._safe_frame_step()
        if not converged:
            return
        # converged: in standard just freeze; in spotty_stable_tuning apply
        # the extra attenuation bump and then freeze
        current_atten, max_atten = self._current_atten_pos()
        if parent.classification == "spotty_stable_tuning":
            target = min(current_atten + parent._spotty_extra_atten, max_atten)
            if target != current_atten:
                logger.info(
                    "[%s] spotty_stable: atten %d → %d (+%d extra)",
                    _timestamp(),
                    current_atten,
                    target,
                    parent._spotty_extra_atten,
                )
                parent._apply_filter(target)
                current_atten = target
            parent.classification = "spotty_stable"
        self._freeze_at(current_atten)
def _wait_for_frame0_metric(self) -> Optional[float]:
    """Wait up to ``parent._metric_timeout`` seconds for the frame-0 metric.

    Returns the spottiness of the first buffered metric whose frame is 0, or
    ``None`` on timeout. While waiting, the event buffer is drained every
    ``_SETUP_POLL_INTERVAL`` seconds.

    Blocking here naturally pauses the loopscan:
    ``ScanPointIterator.prepare`` runs synchronously *before* the next frame
    is acquired, so iter 1's acquisition is delayed until the metric arrives.
    """
    parent = self.parent

    def _frame0_spottiness() -> Optional[float]:
        return next(
            (
                metric["spottiness"]
                for metric in parent._metrics_by_frame
                if metric["frame"] == 0
            ),
            None,
        )

    value = _frame0_spottiness()
    if value is not None:
        return value

    deadline = time.monotonic() + parent._metric_timeout
    while time.monotonic() < deadline:
        gevent.sleep(_SETUP_POLL_INTERVAL)
        self._check_workflow_events()
        value = _frame0_spottiness()
        if value is not None:
            return value
    return None
def _current_atten_pos(self) -> tuple[int, int]:
    """Return ``(current_atten, max_atten)`` for the attached filter.

    For a ``FilterSet`` the maximum is the index of its last filter; for
    anything else the ``bits`` attribute is read and the maximum is
    ``_ID31_ATTENUATOR_BITS - 1``.
    """
    attenuator = self.parent._filter
    if not isinstance(attenuator, FilterSet):
        return self.parent._filter.bits, _ID31_ATTENUATOR_BITS - 1
    last_index = len(attenuator.get_filters()) - 1
    return self.parent._filter.get_filter(), last_index
def _read_last_frame_max(self) -> float:
    """Return the max pixel value of the most recent detector frame.

    The frame is read through the blissdata cursor opened at the first scan
    point, waiting at most ``_CURSOR_READ_TIMEOUT`` seconds.

    :raises RuntimeError: If the cursor was never started.
    """
    cursor = self.parent._cursor
    if cursor is None:
        raise RuntimeError(
            "No blissdata Cursor present, did the first scan point run?"
        )
    latest = cursor.read(timeout=_CURSOR_READ_TIMEOUT, last_only=True)
    frame = latest.get_data()
    return float(frame.max())
def _freeze_at(self, current_atten: int) -> None:
    """Finish the freeze-mode setup at ``current_atten``.

    Marks the setup phase as done, records the frozen position on the parent
    preset and logs the freeze. The caller is expected to have set
    ``parent.classification`` to its final value beforehand.
    """
    preset = self.parent
    preset._atten_frozen_at = current_atten
    preset._setup_phase_done = True
    label = preset.classification
    logger.info(
        "[%s] ● freeze (%s): atten=%d", _timestamp(), label, current_atten
    )
def _safe_frame_step(self) -> bool:
    """Run one step of the attenuator optimisation loop.

    Reads the last frame max, asks
    ``parent._find_least_atten_below_threshold`` for the position predicted
    to bring it just below ``ghost_threshold_per_frame``, jumps there, and
    lets the next iteration verify by reading the actual frame max at the
    new position. Returns ``True`` when the sweet spot has been reached:

    - **jump case** (target != current): apply filter, return False.
    - **verify case** (target == current): if ``frame_max < threshold``,
      converged; if ``frame_max >= threshold`` this is taken to mean that
      the strongest available attenuation is already in, so a warning is
      logged and the loop converges anyway (best we can do).

    Falls back to the legacy ±1 step loop when the helper cannot
    produce a target (missing transmission data, exception).
    """
    parent = self.parent
    frame_max = self._read_last_frame_max()
    current_atten, max_atten = self._current_atten_pos()
    threshold = parent._ghost_threshold_per_frame
    target = parent._find_least_atten_below_threshold(
        frame_max, current_atten, threshold
    )
    if target is None:
        # no prediction available: step by one position instead
        return self._safe_frame_step_pm1(
            frame_max, current_atten, max_atten, threshold
        )
    def _log(
        level: int, symbol: str, color: str, comparator: str, msg: str
    ) -> None:
        # local shorthand: frame number, frame max and threshold are fixed
        # for this step; the frame read above belongs to the previous point
        _log_atten_action(
            level,
            symbol,
            color,
            self.iter - 1,
            frame_max,
            threshold,
            comparator,
            "setup: " + msg,
        )
    # jump case: model picked a different position, jump and let the next
    # iter verify the actual frame max at that position.
    if target != current_atten:
        _log(
            logging.INFO,
            "⇉",
            "yellow",
            "≥" if frame_max >= threshold else "<",
            "atten %d → %d (predicted-optimal jump)" % (current_atten, target),
        )
        parent._apply_filter(target)
        return False
    # verify case: helper says current_atten is already optimal.
    if frame_max < threshold:
        _log(
            logging.INFO,
            "⇢",
            "green",
            "<",
            "atten %d (predicted-optimal, converged)" % current_atten,
        )
        return True
    # frame_max still above threshold and helper returned current → we are at
    # the strongest available attenuation. Freeze here, log a warning.
    _log(
        logging.WARNING,
        "⨉",
        "red",
        "≥",
        "atten %d (freezing at max attenuation: no position predicts < threshold)"
        % current_atten,
    )
    return True
def _safe_frame_step_pm1(
    self,
    frame_max: float,
    current_atten: int,
    max_atten: int,
    threshold: float,
) -> bool:
    """Legacy ±1 safe-frame step.

    Used as a fallback when the predicted-optimal helper cannot produce a
    target (missing transmission data, exception during lookup). Returns
    ``True`` when the loop has converged, ``False`` after moving the
    attenuator by one position:

    - frame max at/above ``threshold`` with attenuation left: step up.
    - frame max below ``threshold`` with attenuation inserted: step down,
      unless the prediction for the lower position reaches ``threshold``.
    - otherwise (no position left in the needed direction): stop.
    """
    parent = self.parent

    def _report(comparator: str, msg: str) -> None:
        _log_atten_action(
            logging.INFO,
            "⇢",
            "yellow",
            self.iter - 1,
            frame_max,
            threshold,
            comparator,
            "setup: " + msg,
        )

    too_bright = frame_max >= threshold
    too_dim = frame_max < threshold

    if too_bright and current_atten < max_atten:
        stronger = min(current_atten + 1, max_atten)
        _report("≥", "atten %d → %d" % (current_atten, stronger))
        parent._apply_filter(stronger)
        return False

    if too_dim and current_atten > 0:
        weaker = current_atten - 1
        expected = parent._predict_frame_max_at(frame_max, current_atten, weaker)
        if expected < threshold:
            _report("<", "atten %d → %d" % (current_atten, weaker))
            parent._apply_filter(weaker)
            return False
        _report("<", "atten %d (keep; -1 predicts ≥ threshold)" % current_atten)
        return True

    _report("=", "atten %d (boundary, stop)" % current_atten)
    return True