Source code for blissoda.tomo.nxtomo.backandforth
from __future__ import annotations
from typing import Any
from typing import Optional
import numpy as np
SEQUENCES = frozenset({"tomo:backandforthtomo"})
[docs]
def matches(processor: Any, entry: Any) -> bool:
return processor._sequence_name(entry) in SEQUENCES
def _turns(entry: Any) -> int:
scan_info = entry["technique"]["scan"]
return max(int(scan_info.get("nb_turns", 1)), 1)
def _loop_range(entry: Any) -> float:
return float(entry["technique"]["scan"]["scan_range"])
def _tomo_n(entry: Any) -> int:
return int(entry["technique"]["scan"]["tomo_n"])
def _projection_logical_ranges(
processor: Any, entry: Any, plan: list[tuple[Optional[str], int]]
) -> list[float]:
tomo_n = _tomo_n(entry)
if tomo_n <= 0:
raise ValueError("Invalid technique.scan.tomo_n for back-and-forth NXtomo")
loop_range = _loop_range(entry)
current_loop_points = 0
current_direction = np.sign(loop_range) or 1.0
ranges = []
for kind, count in plan:
if not processor._is_projection_kind(kind):
continue
block_range = current_direction * abs(loop_range) * float(count) / float(tomo_n)
ranges.append(block_range)
current_loop_points += int(count)
if current_loop_points == tomo_n:
current_loop_points = 0
current_direction *= -1.0
return ranges
[docs]
def expanded_subscan_plan(
processor: Any, entry: Any
) -> list[tuple[Optional[str], int]]:
return processor._subscan_plan(entry)
[docs]
def synthetic_translation(processor: Any, entry: Any, alias_name: str):
return processor._position_array(entry, alias_name)
[docs]
def synthetic_rotation(processor: Any, entry: Any) -> np.ndarray:
start_angle = float(processor._position_array(entry, "rotation")[0])
tomo_n = _tomo_n(entry)
step_deg = abs(_loop_range(entry)) / float(tomo_n)
direction = (
-1.0 if entry["technique"]["tomoconfig"].get("rotation_is_clockwise") else 1.0
)
plan = processor._subscan_plan(entry)
current_angle = start_angle
loop_index = 0
loop_projection_offset = 0
segments = []
for kind, frame_count in plan:
if kind in {"tomo:dark", "tomo:flat", "tomo:return_ref"}:
segment = np.full(frame_count, current_angle, dtype=float)
else:
loop_start = start_angle
if loop_index % 2 == 1:
loop_start = start_angle + direction * abs(_loop_range(entry))
loop_direction = direction if loop_index % 2 == 0 else -direction
local_start = (
loop_start + loop_direction * step_deg * loop_projection_offset
)
segment = local_start + loop_direction * step_deg * np.arange(
frame_count, dtype=float
)
loop_projection_offset += frame_count
if loop_projection_offset == tomo_n:
loop_projection_offset = 0
loop_index += 1
if frame_count:
current_angle = float(np.asarray(segment).reshape(-1)[-1])
segments.append(np.asarray(segment, dtype=float).reshape(-1))
return np.concatenate(segments) if segments else np.asarray([], dtype=float)
[docs]
def segment_specs(processor: Any, entry: Any, records: list[dict[str, Any]]):
plan = processor._subscan_plan(entry)
projection_ranges = _projection_logical_ranges(processor, entry, plan)
projection_index = 0
specs = []
current_pieces = []
current_abs_range = 0.0
ready_to_flush = False
for record in records:
if processor._is_projection_kind(record["kind"]) and ready_to_flush:
specs.append(
{
"label": f"turn_{len(specs) + 1:03d}",
"pieces": current_pieces,
}
)
current_pieces = []
current_abs_range = 0.0
ready_to_flush = False
current_pieces.append(record)
if processor._is_projection_kind(record["kind"]):
current_abs_range += abs(projection_ranges[projection_index])
projection_index += 1
if current_abs_range >= 360.0 - 1e-6:
ready_to_flush = True
if current_pieces:
specs.append(
{
"label": f"turn_{len(specs) + 1:03d}",
"pieces": current_pieces,
}
)
return specs