from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any
from typing import Dict
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from blissoda.import_utils import unavailable_class
from blissoda.import_utils import unavailable_function
from ..persistent.parameters import ParameterInfo
try:
from bliss.scanning.scan import ScanState as BlissScanState
from tomo.globals import get_active_tomo_config
except ImportError as ex:
BlissScanState = unavailable_class(ex)
get_active_tomo_config = unavailable_function(ex)
from esrf_pathlib import ESRFPath
from blissoda.resources import resource_filename
from blissoda.tomo.online_tomo_plotter import OnlineTomoAccumulatedPlotter
from blissoda.tomo.tomo_processor import TomoProcessor
from ..ewoks_utils import submit
from .tomo_model import TomoProcessorModel
from .utils import ImageKey
from .utils import calculate_CoR_estimate
from .utils import get_estimate_cor_metadata
from .utils import get_reconstruction_metadata
logger = logging.getLogger(__name__)
FUTURE_TIMEOUT = None
SUPPORTED_TOMO_SCANS = ["tomo:basic", "tomo:zseries", "tomo:fullturn", "tomo:halfturn"]
# The following scan types are not yet supported:
# tomo:helical, tomo:zhelical, tomo:ptychotomo, tomo:multitomo , tomo:holotomo
[docs]
class OnlineTomoSliceProcessor(
TomoProcessor,
parameters=[
ParameterInfo(
"batch_size",
category="slice_reconstruction_parameters",
doc="Number of projections to process in each batch",
),
ParameterInfo(
"flat_reduction_method",
category="flat_dark_reduction_parameters",
doc="Method for flat field reduction ('mean', 'median')",
),
ParameterInfo(
"padding_mode",
category="slice_reconstruction_parameters",
doc="Padding mode for reconstruction ('edges', 'zeros', etc.)",
),
ParameterInfo(
"extra_options",
category="slice_reconstruction_parameters",
doc="Additional options for reconstruction as a dictionary",
),
],
):
def __init__(
self,
config: Dict[str, Any] | None = None,
defaults: Dict[str, Any] | None = None,
) -> None:
"""
Initialize the OnlineTomoSliceProcessor for online tomographic slice reconstruction.
This processor can be integrated into a BLISS beamline configuration. Typical
usage involves adding the following to a configuration yaml file:
- name: tomo_blissoda
plugin: generic
class: OnlineTomoSliceProcessor
package: blissoda.tomo.online_tomo_slice_processor
"""
if defaults is None:
defaults = {}
defaults.setdefault("trigger_at", "PREPARED")
defaults.setdefault("workflow", "tomo_reconstruction_online.json")
defaults.setdefault("nabu_config_file", resource_filename("tomo", "nabu.conf"))
defaults.setdefault("slice_index", "middle")
defaults.setdefault("cor_algorithm", "sliding-window")
defaults.setdefault("phase_retrieval_method", "None")
defaults.setdefault("delta_beta", "100")
defaults.setdefault("offset_mm", 0.0)
defaults.setdefault("estimated_cor", 0.0)
defaults.setdefault("show_last_slice", False)
defaults.setdefault("flat_reduction_method", "mean")
defaults.setdefault("batch_size", 100)
defaults.setdefault("padding_mode", "edges")
defaults.setdefault("extra_options", None)
super(TomoProcessor, self).__setattr__(
"_tomo_model", TomoProcessorModel(**defaults)
)
# Initialize internal state
self._sequence_scan = None
self._dark_n = 0
self._flat_n = 0
self._sample_name = None
# Initialize workflow output paths
self.reduced_dark_path = None
self.reduced_flat_path = None
# Initialize workflow futures
self.reduced_dark_future = None
self.reduced_flat_future = None
# Workflow paths
self._reduced_dark_flat_workflow = "tomo_reduction_online.json"
self._reconstruction_workflow = "tomo_reconstruction_online.json"
# Initialize the plotter for accumulated slices
self.accumulated_plotter = OnlineTomoAccumulatedPlotter(
history=1,
)
super(TomoProcessor, self).__init__(config=config, defaults=defaults)
[docs]
def execute_workflow(self, scan) -> None:
"""Execute workflow based on scan type and state."""
# Check if this is a tomography configuration scan
technique = scan.scan_info.get("technique", "")
if (
"tomoconfig" in technique
and scan.scan_info["title"] in SUPPORTED_TOMO_SCANS
):
logger.info(
f"Starting tomography sequence for scan: {scan.scan_info.get('title', 'Unknown')}"
)
self._reset_state()
self._sequence_scan = scan
self._output_path = self.get_processed_path(scan.scan_saving.data_path)
# Set sample name from output path, format will be {sample}_{dataset}
self._sample_name = self._output_path.name
return
# Handle individual scans within the sequence
if self._sequence_scan is not None:
scan_type = self._get_scan_type(scan)
logger.info(f"Processing scan: {scan_type}")
if scan_type is ImageKey.DARK_FIELD:
self._dark_n = self._extract_frame_count(scan, "dark")
self.on_new_darkflat_scan("dark", index=self._flat_n)
elif scan_type is ImageKey.FLAT_FIELD:
self._flat_n = self._extract_frame_count(scan, "flat")
self.on_new_darkflat_scan("flat", index=self._dark_n)
elif scan_type is ImageKey.PROJECTION:
self.wait_for_futures()
# Only run reconstruction if we have both dark and flat processed
if self.reduced_dark_path and self.reduced_flat_path:
self.result = self.on_new_projection_scan()
else:
logger.warning(
"Cannot run reconstruction: missing reduced dark or flat files"
)
def _reset_state(self) -> None:
"""Reset internal state for new tomography sequence."""
self._sequence_scan = None
self._dark_n = 0
self._flat_n = 0
self._output_path = None
self._sample_name = None
self.reduced_dark_path = None
self.reduced_flat_path = None
self.reduced_dark_future = None
self.reduced_flat_future = None
def _extract_frame_count(self, scan, frame_type: str) -> int:
technique = scan.scan_info.get("technique", {})
frame_info = technique.get(frame_type, {})
return int(frame_info.get(f"{frame_type}_n", 0))
def _get_scan_type(self, scan) -> ImageKey:
"""Determine the type of scan based on the key in scan_info."""
image_key = int(scan.scan_info.get("technique", {}).get("image_key", 3))
return ImageKey(image_key)
[docs]
def estimate_CoR(self) -> float:
"""
Estimate the center of rotation based on scan metadata and processor parameters.
"""
pixel_size_mm, detector_width, translation_y_mm = get_estimate_cor_metadata()
center_of_rotation = calculate_CoR_estimate(
pixel_size_mm=pixel_size_mm,
detector_width=detector_width,
translation_y_mm=translation_y_mm,
offset_mm=self.offset_mm,
)
return center_of_rotation
[docs]
def wait_for_futures(self):
"""Wait for any ongoing reduction workflows to complete."""
for attr in ["reduced_dark_future", "reduced_flat_future"]:
future = getattr(self, attr, None)
if future is not None:
try:
future.result(timeout=FUTURE_TIMEOUT) # Wait for completion
logger.info(f"{attr} completed successfully")
except Exception as e:
logger.error(f"Error in {attr}: {e}")
[docs]
def get_processed_path(self, data_path: str) -> Path:
"""
Get the processed path for the reconstructed slices.
"""
output_path = ESRFPath(data_path)
return (
output_path.processed_data_path / output_path.collection / output_path.stem
)
def _get_workflow(self, workflow: str) -> Dict[str, Any]:
"""Load workflow definition from JSON file."""
try:
with open(resource_filename("tomo", workflow), "r") as wf:
return json.load(wf)
except Exception as e:
logger.error(f"Failed to load workflow {workflow}: {e}")
raise
def _connect_data_store(self) -> None:
"""Initialize connection to the data store."""
try:
beacon_client = BeaconData()
redis_url = beacon_client.get_redis_data_db()
data_store = DataStore(redis_url)
logger.info("Connected to beacon")
return data_store
except Exception as e:
logger.error(f"Failed to connect to beacon: {e}")
raise
[docs]
def on_new_darkflat_scan(self, scan_type: str, index: int = 0):
"""Run reduction workflow for dark or flat frames."""
try:
data_store = self._connect_data_store()
_, key = data_store.get_last_scan()
# Create output path
output_path = self._output_path / f"references/{scan_type}_field.h5"
output_path.parent.mkdir(parents=True, exist_ok=True)
# Determine reduction method based on scan type
if "dark" in scan_type.lower():
reduction_method = "mean"
convert_destination = str(
output_path.parent
/ f"workflows/{self._sample_name}_dark_online.json"
)
elif "flat" in scan_type.lower():
reduction_method = self.flat_reduction_method
convert_destination = str(
output_path.parent
/ f"workflows/{self._sample_name}_flat_online.json"
)
else:
logger.warning(f"Unknown scan type for reduction: {scan_type}")
return
logger.info(
f"Submitting reduction workflow for {scan_type} with method {reduction_method}"
)
# Submit the reduction workflow
future = self._submit_reduction_workflow(
scan_key=key,
output_path=output_path,
reduction_method=reduction_method,
index=index,
convert_destination=convert_destination,
)
# Use dynamic attribute names to store futures and paths
# e.g., reduced_dark_future, reduced_flat_future
attr_name = f"reduced_{scan_type}_future"
setattr(self, attr_name, future)
# e.g., reduced_dark_path, reduced_flat_path
attr_name = f"reduced_{scan_type}_path"
setattr(self, attr_name, str(output_path))
except Exception as e:
logger.error(f"Failed to run reduction workflow for {scan_type}: {e}")
[docs]
def on_new_projection_scan(self):
"""Run reconstruction workflow for projection data."""
try:
data_store = self._connect_data_store()
_, key = data_store.get_last_scan()
scan = data_store.load_scan(key)
# Create output path
output_path = self._output_path / "slices/online"
output_path.mkdir(parents=True, exist_ok=True)
# Get rotation motor name from tomo configuration
tomo_config = get_active_tomo_config()
rotation_motor = tomo_config.rotation_axis.name
# Extract reconstruction parameters from scan metadata
technique_info = scan.info.get("technique", {})
tomo_n = int(technique_info.get("proj", {}).get("proj_n"))
scan_info = self._sequence_scan.scan_info.get("technique", {}).get(
"scan", {}
)
halftomo = scan_info.get("half_acquisition", False)
# Get metadata for CoR estimation
center_of_rotation = self.estimate_CoR()
# Get reconstruction metadata
distance_m, energy_keV, pixel_size_m = get_reconstruction_metadata(
self._sequence_scan
)
logger.info(
f"Submitting reconstruction workflow - tomo_n: {tomo_n}, rotation_motor: {rotation_motor}"
)
# Submit the reconstruction workflow
job = self._submit_reconstruction_workflow(
scan_key=key,
output_path=output_path,
rotation_motor=rotation_motor,
total_nb_projection=tomo_n,
center_of_rotation=center_of_rotation,
batch_size=self.batch_size,
reduced_dark_path=self.reduced_dark_path,
reduced_flat_path=self.reduced_flat_path,
pixel_size_m=pixel_size_m,
distance_m=distance_m,
energy_keV=energy_keV,
slice_index=self.slice_index,
delta_beta=self.delta_beta,
halftomo=halftomo,
padding_mode=self.padding_mode,
extra_options=self.extra_options,
)
logger.info("Reconstruction workflow submitted successfully")
# Trigger the plotter if show_last_slice is enabled
if self.show_last_slice:
self.accumulated_plotter.handle_workflow_result(
future=job,
output_path=str(output_path),
slice_index=self.slice_index,
batch_size=self.batch_size,
)
return job
except Exception as e:
logger.error(f"Failed to run reconstruction workflow: {e}")
def _submit_reduction_workflow(
self,
scan_key: str,
output_path: Path,
reduction_method: str,
index: int,
convert_destination: str,
):
"""Submit a reduction workflow for dark or flat frames."""
# Load workflow definition
workflow = self._get_workflow(self._reduced_dark_flat_workflow)
# Prepare inputs for the workflow
inputs = [
{
"task_identifier": "ewokstomo.tasks.online.reducedarkflat.OnlineReduceDarkFlat",
"name": "scan_key",
"value": scan_key,
},
{
"task_identifier": "ewokstomo.tasks.online.reducedarkflat.OnlineReduceDarkFlat",
"name": "output_file_path",
"value": str(output_path),
},
{
"task_identifier": "ewokstomo.tasks.online.reducedarkflat.OnlineReduceDarkFlat",
"name": "reduction_method",
"value": reduction_method,
},
{
"task_identifier": "ewokstomo.tasks.online.reducedarkflat.OnlineReduceDarkFlat",
"name": "index",
"value": index,
},
]
# Submit the workflow
kwargs = {
"inputs": inputs,
"convert_destination": convert_destination,
}
try:
job = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
logger.info(f"Submitted reduction workflow job: {job}")
return job
except Exception as e:
logger.error(f"Failed to submit reduction workflow: {e}")
raise
def _submit_reconstruction_workflow(
self,
scan_key: str,
output_path: Path,
rotation_motor: str,
batch_size: int,
total_nb_projection: int,
center_of_rotation: float,
reduced_dark_path: str,
reduced_flat_path: str,
pixel_size_m: float,
distance_m: float,
energy_keV: float,
slice_index: str = "middle",
delta_beta: float = 100.0,
halftomo: bool = False,
padding_mode: str = "edges",
extra_options: dict | None = None,
):
"""Submit a reconstruction workflow for projection data."""
if extra_options is None:
extra_options = {"centered_axis": True}
# Load workflow definition
workflow = self._get_workflow(self._reconstruction_workflow)
convert_destination = str(
output_path.parent
/ f"workflows/{self._sample_name}_slice_reconstruction.json"
)
# Prepare inputs for the workflow
inputs = [
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "scan_key",
"value": scan_key,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "output_path",
"value": str(output_path),
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "rotation_motor",
"value": rotation_motor,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "total_nb_projection",
"value": total_nb_projection,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "center_of_rotation",
"value": center_of_rotation,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "batch_size",
"value": batch_size,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "reduced_dark_path",
"value": str(reduced_dark_path),
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "reduced_flat_path",
"value": str(reduced_flat_path),
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "pixel_size_m",
"value": pixel_size_m,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "distance_m",
"value": distance_m,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "energy_keV",
"value": energy_keV,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "slice_index",
"value": slice_index,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "delta_beta",
"value": delta_beta,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "halftomo",
"value": halftomo,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "padding_mode",
"value": padding_mode,
},
{
"task_identifier": "ewokstomo.tasks.online.reconstruct_slice.OnlineReconstructSlice",
"name": "extra_options",
"value": extra_options,
},
]
# Submit the workflow
kwargs = {
"inputs": inputs,
"convert_destination": convert_destination,
}
try:
job = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
logger.info(f"Submitted reconstruction workflow job: {job}")
return job
except Exception as e:
logger.error(f"Failed to submit reconstruction workflow: {e}")
raise