Source code for blissoda.tomo.tomo_processor

from __future__ import annotations

import json
import logging
import time
from configparser import ConfigParser
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..persistent.parameters import ParameterValue
from ..persistent.parameters import _format_info_category
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import directories
from .flint_tomo_imshow import SingleSliceImshow
from .nabu_config import nabu_config_setup
from .tomo_model import TomoProcessorModel
from .utils import calculate_relative_CoR_estimate
from .utils import get_estimate_cor_metadata

SUPPORTED_TOMO_SCANS = ["tomo:basic", "tomo:zseries", "tomo:fullturn", "tomo:halfturn"]
# The following scan types are not yet supported:
# tomo:helical, tomo:zhelical, tomo:ptychotomo, tomo:multitomo , tomo:holotomo
_logger = logging.getLogger(__name__)
_COR_ALGORITHMS_REQUIRING_360 = {
    "composite-coarse-to-fine",
    "sino-coarse-to-fine",
}


[docs] class TomoProcessor( BaseProcessor, parameters=[ ParameterInfo( "slice_reconstruction_workflow", category="workflows", doc="Workflow file for slice reconstruction", ), ParameterInfo( "volume_reconstruction_workflow", category="workflows", doc="Workflow file for volume reconstruction", ), ParameterInfo("queue", category="workflows"), ParameterInfo( "_bliss_hdf5_path", category="files", doc="HDF5 Dataset path (filled automatically)", ), ParameterInfo( "_output_path", category="files", doc="Nx output path (filled automatically)", ), ParameterInfo( "nabu_config_file", category="slice_reconstruction_parameters", doc="Nabu configuration file.\nIt can be edited using the tomo_processor.edit_nabu_config() command.", ), ParameterInfo( "_reference_dir_to_soft_link", category="files", doc="Directory from which the reduced darks and flats will be linked.", ), ParameterInfo( "offset_mm", category="estimate_center_of_rotation", doc="Offset (mm) subtracted from the translation_y motor position", ), ParameterInfo( "cor_algorithm", category="estimate_center_of_rotation", doc="Method to estimate the centre of rotation in the frame", ), ParameterInfo( "estimated_cor", category="estimate_center_of_rotation", doc="Pixel value of the CoR in the frame (relative, filled automatically)", ), ParameterInfo( "slice_index", category="slice_reconstruction_parameters", doc="Index of the slice that will be reconstructed online", ), ParameterInfo( "phase_retrieval_method", category="slice_reconstruction_parameters", doc="Phase retrieval method or 'None'", ), ParameterInfo( "delta_beta", category="slice_reconstruction_parameters", doc="For Paganin or CTF phase retrieval, default is 100", ), ParameterInfo( "volume_reconstruction", category="volume_reconstruction_parameters", doc="If True, performs volume reconstruction after slice reconstruction", ), ParameterInfo( "slice_index_range", category="volume_reconstruction_parameters", doc="Range of slice indices to reconstruct the volume, (z_bottom, z_top) or 'all'", ), ParameterInfo( "output_format", category="volume_reconstruction_parameters", doc="Output format for the volume reconstruction: 'hdf5', 'tiff', 'jp2', 'edf' or 'vol'", ), ParameterInfo( "show_last_slice", category="flint_display_parameters", doc="If True, displays the last reconstructed slice in Flint", ), ], ): def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, ) -> None: """ Initialize the TomoProcessor for converting BLISS HDF5 data to Nexus (NX) format. This processor can be integrated into a BLISS beamline configuration. Typical usage involves adding the following to a configuration yaml file: - name: tomo_blissoda plugin: generic class: TomoProcessor package: blissoda.tomo.tomo_processor """ if defaults is None: defaults = {} defaults.setdefault("trigger_at", "END") defaults.setdefault( "slice_reconstruction_workflow", "slice_reconstruction.json" ) defaults.setdefault( "volume_reconstruction_workflow", "volume_reconstruction.json" ) defaults.setdefault("nabu_config_file", resource_filename("tomo", "nabu.conf")) defaults.setdefault("_reference_dir_to_soft_link", None) defaults.setdefault("slice_index", "middle") defaults.setdefault("cor_algorithm", "sliding-window") defaults.setdefault("phase_retrieval_method", "None") defaults.setdefault("delta_beta", "100") defaults.setdefault("offset_mm", 0.0) defaults.setdefault("estimated_cor", 0.0) defaults.setdefault("volume_reconstruction", False) defaults.setdefault("slice_index_range", "all") defaults.setdefault("output_format", "tiff") defaults.setdefault("show_last_slice", False) super().__setattr__("_tomo_model", TomoProcessorModel(**defaults)) self.imshow = SingleSliceImshow(history=1) super().__init__(config=config, defaults=defaults) def __setattr__(self, name, value): if hasattr(self, "_tomo_model") and hasattr(self._tomo_model, name): setattr(self._tomo_model, name, value) value = getattr(self._tomo_model, name) super().__setattr__(name, value) def __info__(self) -> str: self.estimated_cor = self.estimate_CoR() categories = self._info_categories() for category in categories.values(): for key, info in list(category.items()): if key in dir(self) and isinstance(info, ParameterValue): category[key] = ParameterValue(getattr(self, key), info.doc) # Does not display delta_beta if phase_retrieval_method is None parameters = categories.get("slice_reconstruction_parameters") if parameters is not None: method = parameters.get("phase_retrieval_method") if method is not None and method.value == "None": parameters.pop("delta_beta", None) # Does not display volume reconstruction parameters if volume_reconstruction is False volume_params = categories.get("volume_reconstruction_parameters") if volume_params is not None: volume_reconstruction = volume_params.get("volume_reconstruction") if volume_reconstruction is not None and not volume_reconstruction.value: volume_params.pop("slice_index_range", None) volume_params.pop("output_format", None) return "\n" + "\n\n".join( [ f"{name.replace('_', ' ').title()}:\n {_format_info_category(category)}" for name, category in categories.items() if category ] )
[docs] def load_nabu_config_file(self, nabu_file, allow_no_value=False) -> Dict[str, Any]: """ Parse a configuration file and returns a dictionary. """ if nabu_file and Path(nabu_file).exists(): parser = ConfigParser( inline_comment_prefixes=("#",), allow_no_value=allow_no_value, ) with open(nabu_file) as fid: file_content = fid.read() parser.read_string(file_content) nabu_dict = parser._sections else: nabu_dict = dict() # Ensure nested dictionaries exist nabu_dict.setdefault("dataset", {}) nabu_dict.setdefault("reconstruction", {}) darks_flats_dir = Path(self._output_path).parent.parent / "references" nabu_dict["dataset"]["darks_flats_dir"] = str(darks_flats_dir) nabu_dict["reconstruction"]["rotation_axis_position"] = self.cor_algorithm current_cor_options = nabu_dict["reconstruction"].get("cor_options") nabu_dict["reconstruction"]["cor_options"] = self._update_cor_options( current_cor_options, self.estimated_cor ) nabu_dict.setdefault("phase", {}) nabu_dict["phase"]["method"] = self.phase_retrieval_method nabu_dict["phase"]["delta_beta"] = self.delta_beta nabu_dict.setdefault("output", {}) return nabu_dict
[docs] def edit_nabu_config(self): """ Open an interactive Bliss dialog to edit the nabu configuration file. """ nabu_config_setup(self.nabu_config_file)
def _update_cor_options( self, existing_options: Optional[str], side_value: Any ) -> str: """Replace the 'side' option and keep any others as-is.""" if not existing_options: return f"side={side_value}" options = [opt.strip() for opt in existing_options.split(";") if opt.strip()] options = [opt for opt in options if not opt.startswith("side")] return "; ".join([f"side={side_value}", *options])
[docs] def estimate_CoR(self) -> float: """ Estimate the relative center of rotation based on scan metadata and processor parameters. """ pixel_size_mm, _, translation_y_mm = get_estimate_cor_metadata() center_of_rotation = calculate_relative_CoR_estimate( pixel_size_mm=pixel_size_mm, translation_y_mm=translation_y_mm, offset_mm=self.offset_mm, ) return center_of_rotation
def _get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]: scan_parameters = dict() for key in list(scan.scan_info.keys()): scan_parameters[key] = scan.scan_info[key] return scan_parameters def _build_output_path(self, bliss_path: str) -> str: """ Build the Nx output path under PROCESSED_DATA/sample/sample_dataset/projections. """ processed_path = bliss_path.replace("RAW_DATA", "PROCESSED_DATA") nx_path = Path(processed_path).with_suffix(".nx") projections_dir = nx_path.parent / "projections" return str(projections_dir / nx_path.name) def _build_darks_flats_dir_path(self, _output_path: str, scan) -> None: """ Build the references output path under PROCESSED_DATA/sample/sample_dataset/references if the scan contains the required dark and flat images. """ if self._has_darks_and_flats(scan): darks_flats_dir = Path(_output_path).parent.parent / "references" # This is kept and only used for the ReduceDarkFlat that will create soft link self._reference_dir_to_soft_link = str(darks_flats_dir) elif self._reference_dir_to_soft_link is None: raise ValueError("Darks and flats directory cannot be determined") def _has_darks_and_flats(self, scan: BlissScanType) -> bool: """ Check if the scan contains at least one dark and one flat, regardless of their position. """ scan_parameters = self._get_scan_parameters(scan) scan_flags = scan_parameters.get("technique", {}).get("scan_flags", {}) has_dark_images = any( bool(scan_flags.get(key, False)) for key in ("dark_images_at_start", "dark_images_at_end") ) has_flat_images = any( bool(scan_flags.get(key, False)) for key in ("ref_images_at_start", "ref_images_at_end") ) return has_dark_images and has_flat_images def _cor_algorithm_for_scan_range(self, scan_range: Any) -> Any: cor_algorithm = self.cor_algorithm if ( cor_algorithm in _COR_ALGORITHMS_REQUIRING_360 and scan_range is not None and abs(float(scan_range)) < 360.0 ): _logger.info( "CoR algorithm %r requires a 360 degree scan, but scan range is " "%s degrees. Using 'sliding-window' instead.", cor_algorithm, scan_range, ) return "sliding-window" return cor_algorithm def _get_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]: scan_parameters = self._get_scan_parameters(scan) self.estimated_cor = self.estimate_CoR() self._bliss_hdf5_path = scan_parameters["filename"] self._output_path = self._build_output_path(self._bliss_hdf5_path) self._build_darks_flats_dir_path(self._output_path, scan) slice_index_range = ( None if self.slice_index_range == "all" else self.slice_index_range ) scan_range = ( scan_parameters.get("technique", {}).get("scan", {}).get("scan_range") ) cor_algorithm = self._cor_algorithm_for_scan_range(scan_range) inputs = list() inputs.append( { "task_identifier": "ewokstomo.tasks.nxtomomill.H5ToNx", "name": "bliss_hdf5_path", "value": self._bliss_hdf5_path, } ) inputs.append( { "task_identifier": "ewokstomo.tasks.nxtomomill.H5ToNx", "name": "nx_path", "value": self._output_path, } ) inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice", "name": "nx_path", "value": self._output_path, } ) if not self._has_darks_and_flats(scan): inputs.append( { "task_identifier": "ewokstomo.tasks.reducedarkflat.ReduceDarkFlat", "name": "reference_dir_to_soft_link", "value": self._reference_dir_to_soft_link, } ) nabu_dict = self.load_nabu_config_file(self.nabu_config_file) nabu_dict["reconstruction"]["rotation_axis_position"] = cor_algorithm inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice", "name": "config_dict", "value": nabu_dict, } ) inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice", "name": "slice_index", "value": self.slice_index, } ) if self.volume_reconstruction: inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume", "name": "nx_path", "value": self._output_path, } ) inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume", "name": "slice_index_range", "value": slice_index_range, } ) volume_nabu_dict = self.load_nabu_config_file(self.nabu_config_file) volume_nabu_dict["reconstruction"]["rotation_axis_position"] = cor_algorithm volume_nabu_dict["output"]["file_format"] = "hdf5" inputs.append( { "task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume", "name": "config_dict", "value": volume_nabu_dict, } ) inputs.append( { "task_identifier": "ewokstomo.tasks.convert_volume.ConvertVolumeTo16Bit", "name": "output_format", "value": self.output_format, } ) inputs.append( { "id": "dataportal_task_volumes", "task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload", "name": "dataset", "value": "volumes", } ) inputs.append( { "id": "dataportal_task_projections", "task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload", "name": "dataset", "value": "projections", } ) inputs.append( { "id": "dataportal_task_slices", "task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload", "name": "dataset", "value": "slices", } ) return inputs def _get_workflows_dir(self, dataset_filename: str) -> Path: processed_dir = Path(directories.get_dataset_processed_dir(dataset_filename)) return processed_dir / "workflows" / "gallery" def _get_workflow_upload_parameters( self, scan: BlissScanType ) -> Optional[Dict[str, Any]]: if not scan.scan_info.get("save"): return None scan_saving = current_session.scan_saving filename = scan.scan_info.get("filename") or scan_saving.filename metadata = {"Sample_name": scan_saving.dataset["Sample_name"]} workflows_dir = self._get_workflows_dir(filename) raw_directory = str(Path(filename).parent) return { "beamline": scan_saving.beamline, "proposal": scan_saving.proposal_name, "dataset": "workflows", "path": str(workflows_dir), "raw": [raw_directory], "metadata": metadata, } def _get_workflow(self): if self.volume_reconstruction: workflow = self.volume_reconstruction_workflow else: workflow = self.slice_reconstruction_workflow with open(resource_filename("tomo", workflow), "r") as wf: return json.load(wf) def _get_submit_arguments(self, scan) -> Dict[str, Any]: inputs = self._get_inputs(scan) kwargs = { "inputs": inputs, "outputs": [{"all": False}], "engine": "dask", "scheduler": "multithreading", } upload_parameters = self._get_workflow_upload_parameters(scan) if upload_parameters: kwargs["upload_parameters"] = upload_parameters return kwargs
[docs] def workflow_destination(self) -> str: """ Returns the destination path for the workflow output. """ workflows_dir = self._get_workflows_dir(self._bliss_hdf5_path) filename = Path(self._output_path).with_suffix(".json").name return str(workflows_dir / filename)
[docs] def execute_workflow(self, scan: BlissScanType) -> None: if ( "tomoconfig" not in scan.scan_info.get("technique", "") or scan.scan_info["title"] not in SUPPORTED_TOMO_SCANS ): return workflow = self._get_workflow() kwargs = self._get_submit_arguments(scan) kwargs["convert_destination"] = self.workflow_destination() time.sleep(2) submit(args=(workflow,), kwargs=kwargs, queue=self.queue) if self.show_last_slice: self.imshow._spawn(self.imshow.monitor_and_display_slice, self._output_path)
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None: self.execute_workflow(scan)