from __future__ import annotations
from typing import Any
from typing import Optional
import numpy as np
# Sequence names that `matches` accepts for this module.
SEQUENCES = frozenset({"tomo:helical", "tomo:zhelical"})
[docs]
def matches(processor: Any, entry: Any) -> bool:
    """Tell whether *entry* belongs to one of the sequences in ``SEQUENCES``.

    Args:
        processor: Object providing ``_sequence_name(entry)``.
        entry: Scan entry handed to ``processor._sequence_name``.

    Returns:
        ``True`` when the sequence name reported by the processor is a
        member of ``SEQUENCES``, ``False`` otherwise.
    """
    sequence_name = processor._sequence_name(entry)
    return sequence_name in SEQUENCES
def _helical_turns(entry: Any) -> int:
    """Return the number of helical turns recorded for *entry*, at least 1.

    The value is read from ``entry["technique"]["scan"]["nb_turns"]`` and
    converted with ``int``.  A missing key, or a value below 1, yields 1.
    """
    scan_info = entry["technique"]["scan"]
    if "nb_turns" not in scan_info:
        return 1
    turns = int(scan_info["nb_turns"])
    # Clamp so callers can always multiply frame counts by the result.
    return turns if turns > 1 else 1
def _projection_indices(
    processor: Any, plan: list[tuple[Optional[str], int]]
) -> list[int]:
    """Return the positions in *plan* of its projection sub-scans.

    Args:
        processor: Object providing ``_is_projection_kind(kind)``.
        plan: Sequence of ``(kind, count)`` pairs.

    Returns:
        Indices into *plan*, in ascending order, of every pair whose kind
        the processor reports as a projection kind.
    """
    positions: list[int] = []
    for position, step in enumerate(plan):
        kind, _count = step
        if processor._is_projection_kind(kind):
            positions.append(position)
    return positions
def _is_split_zhelical(
    processor: Any,
    entry: Any,
    plan: Optional[list[tuple[Optional[str], int]]] = None,
) -> bool:
    """Tell whether *entry* is a z-helical scan split into three projection sub-scans.

    Args:
        processor: Object providing ``_subscan_plan`` and
            ``_is_projection_kind``.
        entry: Scan entry; ``entry["technique"]["scan"]`` is inspected.
        plan: Optional pre-computed sub-scan plan.  When ``None`` it is
            fetched with ``processor._subscan_plan(entry)``.

    Returns:
        ``True`` only when the scan info contains both ``"nb_turns"`` and
        ``"z_step"`` and the plan holds exactly three projection sub-scans.
    """
    scan_info = entry["technique"]["scan"]
    has_helix_keys = "nb_turns" in scan_info and "z_step" in scan_info
    if not has_helix_keys:
        return False
    if plan is None:
        plan = processor._subscan_plan(entry)
    projection_count = len(_projection_indices(processor, plan))
    return projection_count == 3
[docs]
def expanded_subscan_plan(
    processor: Any, entry: Any
) -> list[tuple[Optional[str], int]]:
    """Return the sub-scan plan with projection counts scaled by the helical turns.

    The plan comes from ``processor._subscan_plan(entry)``.  Every count is
    converted with ``int``.  For projection sub-scans the count is then
    multiplied by ``_helical_turns(entry)``:

    * when the scan is not a split z-helical one, for every projection
      sub-scan;
    * when it is, only for the second projection sub-scan encountered.

    Non-projection sub-scans keep their (integer) count.

    Returns:
        A new list of ``(kind, count)`` pairs in the same order as the plan.
    """
    base_plan = processor._subscan_plan(entry)
    is_split = _is_split_zhelical(processor, entry, base_plan)

    result = []
    projections_seen = 0
    for kind, count in base_plan:
        frames = int(count)
        if processor._is_projection_kind(kind):
            # In split mode only the middle projection sub-scan is scaled.
            scale_by_turns = (projections_seen == 1) if is_split else True
            if scale_by_turns:
                frames = frames * _helical_turns(entry)
            projections_seen += 1
        result.append((kind, frames))
    return result
[docs]
def synthetic_translation(processor: Any, entry: Any, alias_name: str) -> np.ndarray:
    """Build per-frame positions for *alias_name* over the expanded sub-scan plan.

    For any alias other than ``"translation_z"`` the value returned by
    ``processor._position_array(entry, alias_name)`` is passed through
    unchanged.  For ``"translation_z"`` one position is synthesised for
    every frame of ``expanded_subscan_plan(processor, entry)``:

    * ``"tomo:dark"`` / ``"tomo:flat"`` frames repeat the last position
      reached so far (initially the start position);
    * ``"tomo:return_ref"`` frames sit at the end position;
    * every other sub-scan is handled as a projection sub-scan.  In the
      regular case its frames continue a single linear ramp from the start
      position towards the end position (end excluded) that spans all
      projection frames.  In the split z-helical case the first projection
      sub-scan stays at the start position, the third stays at the end
      position, and any other one ramps from start towards end (end
      excluded) over its own frames.

    The start position is the first value of the ``translation_z`` position
    array; the end position is ``start + z_step * turns`` where ``z_step``
    is read from ``entry["technique"]["scan"]`` and ``turns`` comes from
    ``_helical_turns(entry)``.

    Args:
        processor: Object providing ``_position_array``, ``_subscan_plan``
            and ``_is_projection_kind``.
        entry: Scan entry; ``entry["technique"]["scan"]`` is inspected.
        alias_name: Name of the positioner alias being requested.

    Returns:
        For ``"translation_z"``, a 1-D float array with one value per frame
        of the expanded plan.  When the plan has no projection frames the
        array is filled with the start position.  For other aliases, the
        processor's position array as-is.
    """
    start_positions = processor._position_array(entry, alias_name)
    if alias_name != "translation_z":
        return start_positions

    scan_info = entry["technique"]["scan"]
    expanded_plan = expanded_subscan_plan(processor, entry)
    nframes = sum(count for _kind, count in expanded_plan)
    start_position = float(start_positions[0])
    split_zhelical = _is_split_zhelical(
        processor, entry, processor._subscan_plan(entry)
    )
    total_projection_frames = sum(
        count for kind, count in expanded_plan if processor._is_projection_kind(kind)
    )
    if total_projection_frames <= 0:
        # Nothing moves: every frame is reported at the start position.
        return np.full(nframes, start_position, dtype=float)

    z_step = float(scan_info["z_step"])
    end_position = start_position + z_step * _helical_turns(entry)
    travel = end_position - start_position
    projection_step = travel / float(total_projection_frames)

    projection_index = 0  # frames already emitted on the shared ramp
    projection_ordinal = 0  # how many projection sub-scans were handled
    current_position = start_position
    segments = []
    for kind, frame_count in expanded_plan:
        if kind in {"tomo:dark", "tomo:flat"}:
            segment = np.full(frame_count, current_position, dtype=float)
        elif kind == "tomo:return_ref":
            segment = np.full(frame_count, end_position, dtype=float)
        else:
            if split_zhelical and projection_ordinal == 0:
                segment = np.full(frame_count, start_position, dtype=float)
            elif split_zhelical and projection_ordinal == 2:
                segment = np.full(frame_count, end_position, dtype=float)
            elif split_zhelical:
                if frame_count > 0:
                    segment = start_position + (
                        travel / float(frame_count)
                    ) * np.arange(frame_count, dtype=float)
                else:
                    # An empty ramp sub-scan contributes no frames; dividing
                    # by its length would raise ZeroDivisionError.
                    segment = np.asarray([], dtype=float)
            else:
                segment = start_position + projection_step * (
                    projection_index + np.arange(frame_count, dtype=float)
                )
                projection_index += frame_count
            projection_ordinal += 1
        if segment.size:
            # Dark/flat frames that follow are held at this position.
            current_position = float(segment[-1])
        segments.append(segment)
    return np.concatenate(segments) if segments else np.asarray([], dtype=float)
[docs]
def segment_specs(processor: Any, entry: Any, records: list[dict[str, Any]]):
    """Group *records* into labelled segments for a scan entry.

    When the entry is not a split z-helical scan, a single unlabelled
    segment holding all records is returned.  Otherwise the records are cut
    at the plan indices of the second and third projection sub-scans,
    giving three segments: the part before the helix, the ``"helix"``
    itself, and the part from the third projection sub-scan onwards.  The
    outer two are labelled ``"bottom"``/``"top"`` when the start z position
    is not above the end z position, and ``"top"``/``"bottom"`` otherwise.

    Args:
        processor: Object providing ``_subscan_plan``, ``_position_array``
            and ``_is_projection_kind``.
        entry: Scan entry; ``entry["technique"]["scan"]["z_step"]`` is read
            in the split case.
        records: Sliced with plan indices, so presumably one record per
            sub-scan of the plan (to be confirmed against the caller).

    Returns:
        A list of ``{"label": ..., "pieces": ...}`` dictionaries.
    """
    plan = processor._subscan_plan(entry)
    if not _is_split_zhelical(processor, entry, plan):
        return [{"label": None, "pieces": records}]

    indices = _projection_indices(processor, plan)
    helix_start, helix_stop = indices[1], indices[2]
    chunks = (
        records[:helix_start],
        records[helix_start:helix_stop],
        records[helix_stop:],
    )

    start_z = float(processor._position_array(entry, "translation_z")[0])
    travel = float(entry["technique"]["scan"]["z_step"]) * _helical_turns(entry)
    end_z = start_z + travel

    if start_z <= end_z:
        labels = ("bottom", "helix", "top")
    else:
        labels = ("top", "helix", "bottom")

    return [
        {"label": label, "pieces": chunk} for label, chunk in zip(labels, chunks)
    ]