"""Source code for blissoda.tomo.nxtomo.backandforth."""

from __future__ import annotations

from typing import Any
from typing import Optional

import numpy as np

SEQUENCES = frozenset({"tomo:backandforthtomo"})


def matches(processor: Any, entry: Any) -> bool:
    """Tell whether *entry* belongs to a sequence handled by this module.

    The sequence name is obtained from ``processor._sequence_name(entry)``
    and tested for membership in the module-level ``SEQUENCES`` set.
    """
    sequence_name = processor._sequence_name(entry)
    return sequence_name in SEQUENCES
def _turns(entry: Any) -> int:
    """Return ``technique.scan.nb_turns`` as an int, never less than 1.

    A missing ``nb_turns`` key is treated as a single turn.
    """
    requested = int(entry["technique"]["scan"].get("nb_turns", 1))
    return requested if requested > 1 else 1


def _loop_range(entry: Any) -> float:
    """Return ``technique.scan.scan_range`` converted to ``float``."""
    scan_info = entry["technique"]["scan"]
    return float(scan_info["scan_range"])


def _tomo_n(entry: Any) -> int:
    """Return ``technique.scan.tomo_n`` converted to ``int``."""
    scan_info = entry["technique"]["scan"]
    return int(scan_info["tomo_n"])


def _projection_logical_ranges(
    processor: Any, entry: Any, plan: list[tuple[Optional[str], int]]
) -> list[float]:
    """Compute the signed range covered by each projection block of *plan*.

    Blocks whose kind is rejected by ``processor._is_projection_kind`` are
    skipped.  Every remaining block contributes
    ``direction * abs(scan_range) * count / tomo_n``.  The direction starts
    with the sign of ``scan_range`` (``+1`` when that sign is zero) and is
    reversed each time the running point count reaches exactly ``tomo_n``.

    Raises:
        ValueError: if ``technique.scan.tomo_n`` is not strictly positive.
    """
    points_per_loop = _tomo_n(entry)
    if points_per_loop <= 0:
        raise ValueError("Invalid technique.scan.tomo_n for back-and-forth NXtomo")

    signed_span = _loop_range(entry)
    span = abs(signed_span)
    # np.sign yields 0 for a zero range; fall back to the forward direction.
    heading = np.sign(signed_span) or 1.0

    points_in_loop = 0
    logical_ranges = []
    for kind, count in plan:
        if processor._is_projection_kind(kind):
            logical_ranges.append(
                heading * span * float(count) / float(points_per_loop)
            )
            points_in_loop += int(count)
            if points_in_loop == points_per_loop:
                # A full loop has been consumed: the next one runs backwards.
                points_in_loop = 0
                heading = -heading
    return logical_ranges
def expanded_subscan_plan(
    processor: Any, entry: Any
) -> list[tuple[Optional[str], int]]:
    """Return the sub-scan plan of *entry*.

    The plan returned by ``processor._subscan_plan(entry)`` is passed
    through unchanged.
    """
    plan = processor._subscan_plan(entry)
    return plan
def synthetic_translation(processor: Any, entry: Any, alias_name: str):
    """Return the positions recorded for *alias_name* in *entry*.

    The value is whatever ``processor._position_array(entry, alias_name)``
    returns; it is handed back unchanged.
    """
    positions = processor._position_array(entry, alias_name)
    return positions
def synthetic_rotation(processor: Any, entry: Any) -> np.ndarray:
    """Synthesise one rotation angle per frame for a back-and-forth scan.

    The angles are derived from the sub-scan plan rather than read back:

    * the first value of ``processor._position_array(entry, "rotation")``
      is the start angle;
    * the step is ``abs(scan_range) / tomo_n``; its sign is negative when
      ``technique.tomoconfig.rotation_is_clockwise`` is truthy;
    * blocks of kind ``tomo:dark``, ``tomo:flat`` or ``tomo:return_ref``
      repeat the last angle reached so far;
    * every other block is treated as projections.  Even-numbered loops
      step away from the start angle, odd-numbered loops start at
      ``start + direction * abs(scan_range)`` and step back.  A loop is
      complete once exactly ``tomo_n`` projection frames were accumulated.

    Returns:
        A 1-D float array with the concatenated angles of all blocks
        (empty when the plan is empty).

    Raises:
        ValueError: if ``technique.scan.tomo_n`` is not strictly positive.
    """
    start_angle = float(processor._position_array(entry, "rotation")[0])

    tomo_n = _tomo_n(entry)
    if tomo_n <= 0:
        # Fail with an explicit message instead of a ZeroDivisionError (or
        # silently meaningless angles for a negative count) further down.
        raise ValueError("Invalid technique.scan.tomo_n for back-and-forth NXtomo")

    # Loop-invariant quantities, read from the entry only once.
    loop_span = abs(_loop_range(entry))
    step_deg = loop_span / float(tomo_n)
    clockwise = entry["technique"]["tomoconfig"].get("rotation_is_clockwise")
    direction = -1.0 if clockwise else 1.0
    turnaround_angle = start_angle + direction * loop_span
    stationary_kinds = {"tomo:dark", "tomo:flat", "tomo:return_ref"}

    current_angle = start_angle
    loop_index = 0
    loop_projection_offset = 0
    segments = []
    for kind, frame_count in processor._subscan_plan(entry):
        if kind in stationary_kinds:
            # The rotation does not move: hold the last angle reached.
            segment = np.full(frame_count, current_angle, dtype=float)
        else:
            if loop_index % 2 == 0:
                loop_start, loop_direction = start_angle, direction
            else:
                loop_start, loop_direction = turnaround_angle, -direction
            local_start = (
                loop_start + loop_direction * step_deg * loop_projection_offset
            )
            segment = local_start + loop_direction * step_deg * np.arange(
                frame_count, dtype=float
            )
            loop_projection_offset += frame_count
            if loop_projection_offset == tomo_n:
                loop_projection_offset = 0
                loop_index += 1
        if frame_count:
            current_angle = float(segment[-1])
        segments.append(segment)

    return np.concatenate(segments) if segments else np.asarray([], dtype=float)
def segment_specs(processor: Any, entry: Any, records: list[dict[str, Any]]):
    """Group *records* into consecutive ``turn_NNN`` segments.

    Each projection record (as decided by ``processor._is_projection_kind``
    on ``record["kind"]``) adds the absolute value of its logical range to
    the current segment.  Once a segment has accumulated at least
    ``360.0 - 1e-6``, it is closed when the next projection record arrives;
    non-projection records seen in between stay with the segment being
    closed.  Any records left over at the end form a final segment.

    Returns:
        A list of dicts with keys ``"label"`` (``turn_001``, ``turn_002``,
        ...) and ``"pieces"`` (the records of that segment, in order).
    """
    full_turn = 360.0 - 1e-6
    logical_ranges = _projection_logical_ranges(
        processor, entry, processor._subscan_plan(entry)
    )

    def _spec(ordinal: int, pieces: list) -> dict:
        return {
            "label": f"turn_{ordinal:03d}",
            "pieces": pieces,
        }

    turns = []
    pending = []
    swept = 0.0
    turn_complete = False
    next_range = 0
    for record in records:
        is_projection = processor._is_projection_kind(record["kind"])

        # A completed turn is closed only when the next projection shows up.
        if is_projection and turn_complete:
            turns.append(_spec(len(turns) + 1, pending))
            pending = []
            swept = 0.0
            turn_complete = False

        pending.append(record)
        if not is_projection:
            continue

        swept += abs(logical_ranges[next_range])
        next_range += 1
        if swept >= full_turn:
            turn_complete = True

    if pending:
        turns.append(_spec(len(turns) + 1, pending))
    return turns