from __future__ import annotations
import json
import logging
import time
from configparser import ConfigParser
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..persistent.parameters import ParameterValue
from ..persistent.parameters import _format_info_category
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import directories
from .flint_tomo_imshow import SingleSliceImshow
from .nabu_config import nabu_config_setup
from .tomo_model import TomoProcessorModel
from .utils import calculate_relative_CoR_estimate
from .utils import get_estimate_cor_metadata
SUPPORTED_TOMO_SCANS = ["tomo:basic", "tomo:zseries", "tomo:fullturn", "tomo:halfturn"]
# The following scan types are not yet supported:
# tomo:helical, tomo:zhelical, tomo:ptychotomo, tomo:multitomo , tomo:holotomo
_logger = logging.getLogger(__name__)
_COR_ALGORITHMS_REQUIRING_360 = {
"composite-coarse-to-fine",
"sino-coarse-to-fine",
}
[docs]
class TomoProcessor(
BaseProcessor,
parameters=[
ParameterInfo(
"slice_reconstruction_workflow",
category="workflows",
doc="Workflow file for slice reconstruction",
),
ParameterInfo(
"volume_reconstruction_workflow",
category="workflows",
doc="Workflow file for volume reconstruction",
),
ParameterInfo("queue", category="workflows"),
ParameterInfo(
"_bliss_hdf5_path",
category="files",
doc="HDF5 Dataset path (filled automatically)",
),
ParameterInfo(
"_output_path",
category="files",
doc="Nx output path (filled automatically)",
),
ParameterInfo(
"nabu_config_file",
category="slice_reconstruction_parameters",
doc="Nabu configuration file.\nIt can be edited using the tomo_processor.edit_nabu_config() command.",
),
ParameterInfo(
"_reference_dir_to_soft_link",
category="files",
doc="Directory from which the reduced darks and flats will be linked.",
),
ParameterInfo(
"offset_mm",
category="estimate_center_of_rotation",
doc="Offset (mm) subtracted from the translation_y motor position",
),
ParameterInfo(
"cor_algorithm",
category="estimate_center_of_rotation",
doc="Method to estimate the centre of rotation in the frame",
),
ParameterInfo(
"estimated_cor",
category="estimate_center_of_rotation",
doc="Pixel value of the CoR in the frame (relative, filled automatically)",
),
ParameterInfo(
"slice_index",
category="slice_reconstruction_parameters",
doc="Index of the slice that will be reconstructed online",
),
ParameterInfo(
"phase_retrieval_method",
category="slice_reconstruction_parameters",
doc="Phase retrieval method or 'None'",
),
ParameterInfo(
"delta_beta",
category="slice_reconstruction_parameters",
doc="For Paganin or CTF phase retrieval, default is 100",
),
ParameterInfo(
"volume_reconstruction",
category="volume_reconstruction_parameters",
doc="If True, performs volume reconstruction after slice reconstruction",
),
ParameterInfo(
"slice_index_range",
category="volume_reconstruction_parameters",
doc="Range of slice indices to reconstruct the volume, (z_bottom, z_top) or 'all'",
),
ParameterInfo(
"output_format",
category="volume_reconstruction_parameters",
doc="Output format for the volume reconstruction: 'hdf5', 'tiff', 'jp2', 'edf' or 'vol'",
),
ParameterInfo(
"show_last_slice",
category="flint_display_parameters",
doc="If True, displays the last reconstructed slice in Flint",
),
],
):
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
) -> None:
"""
Initialize the TomoProcessor for converting BLISS HDF5 data to Nexus (NX) format.
This processor can be integrated into a BLISS beamline configuration. Typical
usage involves adding the following to a configuration yaml file:
- name: tomo_blissoda
plugin: generic
class: TomoProcessor
package: blissoda.tomo.tomo_processor
"""
if defaults is None:
defaults = {}
defaults.setdefault("trigger_at", "END")
defaults.setdefault(
"slice_reconstruction_workflow", "slice_reconstruction.json"
)
defaults.setdefault(
"volume_reconstruction_workflow", "volume_reconstruction.json"
)
defaults.setdefault("nabu_config_file", resource_filename("tomo", "nabu.conf"))
defaults.setdefault("_reference_dir_to_soft_link", None)
defaults.setdefault("slice_index", "middle")
defaults.setdefault("cor_algorithm", "sliding-window")
defaults.setdefault("phase_retrieval_method", "None")
defaults.setdefault("delta_beta", "100")
defaults.setdefault("offset_mm", 0.0)
defaults.setdefault("estimated_cor", 0.0)
defaults.setdefault("volume_reconstruction", False)
defaults.setdefault("slice_index_range", "all")
defaults.setdefault("output_format", "tiff")
defaults.setdefault("show_last_slice", False)
super().__setattr__("_tomo_model", TomoProcessorModel(**defaults))
self.imshow = SingleSliceImshow(history=1)
super().__init__(config=config, defaults=defaults)
def __setattr__(self, name, value):
if hasattr(self, "_tomo_model") and hasattr(self._tomo_model, name):
setattr(self._tomo_model, name, value)
value = getattr(self._tomo_model, name)
super().__setattr__(name, value)
def __info__(self) -> str:
self.estimated_cor = self.estimate_CoR()
categories = self._info_categories()
for category in categories.values():
for key, info in list(category.items()):
if key in dir(self) and isinstance(info, ParameterValue):
category[key] = ParameterValue(getattr(self, key), info.doc)
# Does not display delta_beta if phase_retrieval_method is None
parameters = categories.get("slice_reconstruction_parameters")
if parameters is not None:
method = parameters.get("phase_retrieval_method")
if method is not None and method.value == "None":
parameters.pop("delta_beta", None)
# Does not display volume reconstruction parameters if volume_reconstruction is False
volume_params = categories.get("volume_reconstruction_parameters")
if volume_params is not None:
volume_reconstruction = volume_params.get("volume_reconstruction")
if volume_reconstruction is not None and not volume_reconstruction.value:
volume_params.pop("slice_index_range", None)
volume_params.pop("output_format", None)
return "\n" + "\n\n".join(
[
f"{name.replace('_', ' ').title()}:\n {_format_info_category(category)}"
for name, category in categories.items()
if category
]
)
[docs]
def load_nabu_config_file(self, nabu_file, allow_no_value=False) -> Dict[str, Any]:
"""
Parse a configuration file and returns a dictionary.
"""
if nabu_file and Path(nabu_file).exists():
parser = ConfigParser(
inline_comment_prefixes=("#",),
allow_no_value=allow_no_value,
)
with open(nabu_file) as fid:
file_content = fid.read()
parser.read_string(file_content)
nabu_dict = parser._sections
else:
nabu_dict = dict()
# Ensure nested dictionaries exist
nabu_dict.setdefault("dataset", {})
nabu_dict.setdefault("reconstruction", {})
darks_flats_dir = Path(self._output_path).parent.parent / "references"
nabu_dict["dataset"]["darks_flats_dir"] = str(darks_flats_dir)
nabu_dict["reconstruction"]["rotation_axis_position"] = self.cor_algorithm
current_cor_options = nabu_dict["reconstruction"].get("cor_options")
nabu_dict["reconstruction"]["cor_options"] = self._update_cor_options(
current_cor_options, self.estimated_cor
)
nabu_dict.setdefault("phase", {})
nabu_dict["phase"]["method"] = self.phase_retrieval_method
nabu_dict["phase"]["delta_beta"] = self.delta_beta
nabu_dict.setdefault("output", {})
return nabu_dict
[docs]
def edit_nabu_config(self):
"""
Open an interactive Bliss dialog to edit the nabu configuration file.
"""
nabu_config_setup(self.nabu_config_file)
def _update_cor_options(
self, existing_options: Optional[str], side_value: Any
) -> str:
"""Replace the 'side' option and keep any others as-is."""
if not existing_options:
return f"side={side_value}"
options = [opt.strip() for opt in existing_options.split(";") if opt.strip()]
options = [opt for opt in options if not opt.startswith("side")]
return "; ".join([f"side={side_value}", *options])
[docs]
def estimate_CoR(self) -> float:
"""
Estimate the relative center of rotation based on scan metadata and processor parameters.
"""
pixel_size_mm, _, translation_y_mm = get_estimate_cor_metadata()
center_of_rotation = calculate_relative_CoR_estimate(
pixel_size_mm=pixel_size_mm,
translation_y_mm=translation_y_mm,
offset_mm=self.offset_mm,
)
return center_of_rotation
def _get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
scan_parameters = dict()
for key in list(scan.scan_info.keys()):
scan_parameters[key] = scan.scan_info[key]
return scan_parameters
def _build_output_path(self, bliss_path: str) -> str:
"""
Build the Nx output path under PROCESSED_DATA/sample/sample_dataset/projections.
"""
processed_path = bliss_path.replace("RAW_DATA", "PROCESSED_DATA")
nx_path = Path(processed_path).with_suffix(".nx")
projections_dir = nx_path.parent / "projections"
return str(projections_dir / nx_path.name)
def _build_darks_flats_dir_path(self, _output_path: str, scan) -> None:
"""
Build the references output path under PROCESSED_DATA/sample/sample_dataset/references if
the scan contains the required dark and flat images.
"""
if self._has_darks_and_flats(scan):
darks_flats_dir = Path(_output_path).parent.parent / "references"
# This is kept and only used for the ReduceDarkFlat that will create soft link
self._reference_dir_to_soft_link = str(darks_flats_dir)
elif self._reference_dir_to_soft_link is None:
raise ValueError("Darks and flats directory cannot be determined")
def _has_darks_and_flats(self, scan: BlissScanType) -> bool:
"""
Check if the scan contains at least one dark and one flat, regardless of their position.
"""
scan_parameters = self._get_scan_parameters(scan)
scan_flags = scan_parameters.get("technique", {}).get("scan_flags", {})
has_dark_images = any(
bool(scan_flags.get(key, False))
for key in ("dark_images_at_start", "dark_images_at_end")
)
has_flat_images = any(
bool(scan_flags.get(key, False))
for key in ("ref_images_at_start", "ref_images_at_end")
)
return has_dark_images and has_flat_images
def _cor_algorithm_for_scan_range(self, scan_range: Any) -> Any:
cor_algorithm = self.cor_algorithm
if (
cor_algorithm in _COR_ALGORITHMS_REQUIRING_360
and scan_range is not None
and abs(float(scan_range)) < 360.0
):
_logger.info(
"CoR algorithm %r requires a 360 degree scan, but scan range is "
"%s degrees. Using 'sliding-window' instead.",
cor_algorithm,
scan_range,
)
return "sliding-window"
return cor_algorithm
def _get_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
scan_parameters = self._get_scan_parameters(scan)
self.estimated_cor = self.estimate_CoR()
self._bliss_hdf5_path = scan_parameters["filename"]
self._output_path = self._build_output_path(self._bliss_hdf5_path)
self._build_darks_flats_dir_path(self._output_path, scan)
slice_index_range = (
None if self.slice_index_range == "all" else self.slice_index_range
)
scan_range = (
scan_parameters.get("technique", {}).get("scan", {}).get("scan_range")
)
cor_algorithm = self._cor_algorithm_for_scan_range(scan_range)
inputs = list()
inputs.append(
{
"task_identifier": "ewokstomo.tasks.nxtomomill.H5ToNx",
"name": "bliss_hdf5_path",
"value": self._bliss_hdf5_path,
}
)
inputs.append(
{
"task_identifier": "ewokstomo.tasks.nxtomomill.H5ToNx",
"name": "nx_path",
"value": self._output_path,
}
)
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice",
"name": "nx_path",
"value": self._output_path,
}
)
if not self._has_darks_and_flats(scan):
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reducedarkflat.ReduceDarkFlat",
"name": "reference_dir_to_soft_link",
"value": self._reference_dir_to_soft_link,
}
)
nabu_dict = self.load_nabu_config_file(self.nabu_config_file)
nabu_dict["reconstruction"]["rotation_axis_position"] = cor_algorithm
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice",
"name": "config_dict",
"value": nabu_dict,
}
)
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_slice.ReconstructSlice",
"name": "slice_index",
"value": self.slice_index,
}
)
if self.volume_reconstruction:
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume",
"name": "nx_path",
"value": self._output_path,
}
)
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume",
"name": "slice_index_range",
"value": slice_index_range,
}
)
volume_nabu_dict = self.load_nabu_config_file(self.nabu_config_file)
volume_nabu_dict["reconstruction"]["rotation_axis_position"] = cor_algorithm
volume_nabu_dict["output"]["file_format"] = "hdf5"
inputs.append(
{
"task_identifier": "ewokstomo.tasks.reconstruct_volume.ReconstructVolume",
"name": "config_dict",
"value": volume_nabu_dict,
}
)
inputs.append(
{
"task_identifier": "ewokstomo.tasks.convert_volume.ConvertVolumeTo16Bit",
"name": "output_format",
"value": self.output_format,
}
)
inputs.append(
{
"id": "dataportal_task_volumes",
"task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload",
"name": "dataset",
"value": "volumes",
}
)
inputs.append(
{
"id": "dataportal_task_projections",
"task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload",
"name": "dataset",
"value": "projections",
}
)
inputs.append(
{
"id": "dataportal_task_slices",
"task_identifier": "ewokstomo.tasks.dataportalupload.DataPortalUpload",
"name": "dataset",
"value": "slices",
}
)
return inputs
def _get_workflows_dir(self, dataset_filename: str) -> Path:
processed_dir = Path(directories.get_dataset_processed_dir(dataset_filename))
return processed_dir / "workflows" / "gallery"
def _get_workflow_upload_parameters(
self, scan: BlissScanType
) -> Optional[Dict[str, Any]]:
if not scan.scan_info.get("save"):
return None
scan_saving = current_session.scan_saving
filename = scan.scan_info.get("filename") or scan_saving.filename
metadata = {"Sample_name": scan_saving.dataset["Sample_name"]}
workflows_dir = self._get_workflows_dir(filename)
raw_directory = str(Path(filename).parent)
return {
"beamline": scan_saving.beamline,
"proposal": scan_saving.proposal_name,
"dataset": "workflows",
"path": str(workflows_dir),
"raw": [raw_directory],
"metadata": metadata,
}
def _get_workflow(self):
if self.volume_reconstruction:
workflow = self.volume_reconstruction_workflow
else:
workflow = self.slice_reconstruction_workflow
with open(resource_filename("tomo", workflow), "r") as wf:
return json.load(wf)
def _get_submit_arguments(self, scan) -> Dict[str, Any]:
inputs = self._get_inputs(scan)
kwargs = {
"inputs": inputs,
"outputs": [{"all": False}],
"engine": "dask",
"scheduler": "multithreading",
}
upload_parameters = self._get_workflow_upload_parameters(scan)
if upload_parameters:
kwargs["upload_parameters"] = upload_parameters
return kwargs
[docs]
def workflow_destination(self) -> str:
"""
Returns the destination path for the workflow output.
"""
workflows_dir = self._get_workflows_dir(self._bliss_hdf5_path)
filename = Path(self._output_path).with_suffix(".json").name
return str(workflows_dir / filename)
[docs]
def execute_workflow(self, scan: BlissScanType) -> None:
if (
"tomoconfig" not in scan.scan_info.get("technique", "")
or scan.scan_info["title"] not in SUPPORTED_TOMO_SCANS
):
return
workflow = self._get_workflow()
kwargs = self._get_submit_arguments(scan)
kwargs["convert_destination"] = self.workflow_destination()
time.sleep(2)
submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
if self.show_last_slice:
self.imshow._spawn(self.imshow.monitor_and_display_slice, self._output_path)
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
self.execute_workflow(scan)