Source code for blissoda.tomo.nabu_config

import configparser
import copy
import os
from pathlib import Path

try:
    from bliss.shell.cli.pt_widgets import BlissDialog
    from bliss.shell.cli.user_dialog import Container
    from bliss.shell.cli.user_dialog import UserCheckBox
    from bliss.shell.cli.user_dialog import UserChoice
    from bliss.shell.cli.user_dialog import UserFloatInput
    from bliss.shell.cli.user_dialog import UserInput
    from bliss.shell.cli.user_dialog import UserIntInput
    from tomo.helpers.select_dialog2 import select_dialog2
except ImportError as ex:
    from ..import_utils import unavailable_class
    from ..import_utils import unavailable_function

    BlissDialog = unavailable_class(ex)
    Container = unavailable_class(ex)
    UserCheckBox = unavailable_class(ex)
    UserChoice = unavailable_class(ex)
    UserFloatInput = unavailable_class(ex)
    UserInput = unavailable_class(ex)
    UserIntInput = unavailable_class(ex)
    select_dialog2 = unavailable_function(ex)

NABU_DEFAULT_CONFIG = {
    "dataset": {
        "location": "",
        "darks_flats_dir": "",
    },
    "preproc": {
        "flatfield": "1",
        "flatfield_loading_mode": "load_if_present",
        "ccd_filter_enabled": "0",
        "ccd_filter_threshold": "0.04",
        "double_flatfield": "0",
        "take_logarithm": "1",
        "sino_rings_correction": "",
    },
    "phase": {
        "method": "none",
        "delta_beta": "100.0",
        "unsharp_coeff": "0",
        "unsharp_sigma": "0",
        "unsharp_method": "gaussian",
        "ctf_geometry": "z1_v=None; z1_h=None; detec_pixel_size=None; magnification=True",
    },
    "reconstruction": {
        "method": "FBP",
        "angles_file": "",
        "rotation_axis_position": "sliding-window",
        "padding_type": "edges",
        "enable_halftomo": "auto",
        "clip_outer_circle": "0",
        "outer_circle_value": "0",
        "centered_axis": "1",
        "start_x": "0",
        "end_x": "-1",
        "start_y": "0",
        "end_y": "-1",
        "start_z": "0",
        "end_z": "-1",
    },
    "output": {
        "location": "",
        "file_prefix": "",
        "file_format": "hdf5",
        "overwrite_results": "1",
    },
    "postproc": {
        "output_histogram": "1",
    },
    "resources": {
        "method": "local",
    },
    "pipeline": {
        "save_steps": "",
        "resume_from_step": "",
        "ignore_checkpoint_config": "0",
        "verbosity": "2",
    },
}


def _bool_val(value: str) -> bool:
    """Convert a nabu boolean string to a Python bool."""
    return str(value).strip().lower() in ("1", "true", "yes")


def _int_or_default(value, default: int = 0) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _float_or_default(value, default: float = 0.0) -> float:
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _load_config(path: str) -> dict:
    cfg = configparser.ConfigParser(
        comment_prefixes=("#", ";"),
        inline_comment_prefixes=("#",),
        interpolation=None,
        allow_no_value=True,
    )
    cfg.optionxform = str  # preserve key case
    cfg.read(path)
    return cfg._sections


def _parse_options(opt_str: str) -> dict:
    """Parse a semicolon-separated key=value string into a dict."""
    res = {}
    if not opt_str:
        return res
    for item in opt_str.split(";"):
        if "=" in item:
            key, val = item.split("=", 1)
            res[key.strip()] = val.strip()
    return res


def _format_options(opt_dict: dict) -> str:
    """Format a dict into a semicolon-separated key=value string."""
    return "; ".join(f"{k} = {v}" for k, v in opt_dict.items())


def _save_config(cfg: dict, path: str) -> None:
    parser = configparser.ConfigParser(
        comment_prefixes=("#", ";"),
        inline_comment_prefixes=("#",),
        interpolation=None,
        allow_no_value=True,
    )
    parser.optionxform = str
    parser.read_dict(cfg)
    with open(path, "w") as fh:
        parser.write(fh)


def _merge_defaults(cfg: dict) -> dict:
    """Return a copy of *cfg* with every key from NABU_DEFAULT_CONFIG filled in
    wherever it is absent.  Values already present in *cfg* are never touched,
    so user customisations are always preserved."""
    merged = copy.deepcopy(NABU_DEFAULT_CONFIG)
    for section, keys in cfg.items():
        if section not in merged:
            merged[section] = {}
        merged[section].update(keys)
    return merged


def _get(cfg, section, key, fallback=""):
    val = cfg.get(section, {}).get(key, fallback)
    return fallback if val is None else val


def _preproc_dialog(cfg: dict, submenu: bool = False) -> bool:
    if "dataset" not in cfg:
        cfg["dataset"] = {}
    sec = "preproc"

    # flatfield – can be 0/1/pca; represent as choice
    ff_options = [(0, "Disabled"), (1, "Enabled"), (2, "PCA")]
    ff_raw = str(_get(cfg, sec, "flatfield", "1")).lower()
    if ff_raw == "pca":
        ff_defval = 2
    elif _bool_val(ff_raw):
        ff_defval = 1
    else:
        ff_defval = 0
    dlg_flatfield = UserChoice(
        label="Flat-field normalization",
        values=ff_options,
        defval=ff_defval,
    )

    fl_modes = [
        (0, "Load_if_present"),
        (1, "Force-load"),
        (2, "Force-compute"),
    ]
    fl_raw = str(_get(cfg, sec, "flatfield_loading_mode", "Load_if_present")).lower()
    fl_map = {v.lower(): i for i, (_, v) in enumerate(fl_modes)}
    dlg_fl_mode = UserChoice(
        label="Flat-field loading mode",
        values=fl_modes,
        defval=fl_map.get(fl_raw, 0),
    )

    dlg_ccd_filter = UserCheckBox(
        name="ccd_filter_enabled",
        label="CCD hotspot correction enabled",
        defval=_bool_val(_get(cfg, sec, "ccd_filter_enabled", "0")),
    )
    dlg_ccd_thresh = UserFloatInput(
        label="CCD filter threshold",
        defval=_float_or_default(_get(cfg, sec, "ccd_filter_threshold"), 0.04),
    )

    # double_flatfield: 0/1/force-load/force-compute
    dff_options = [
        (0, "Disabled"),
        (1, "Enabled"),
        (2, "Force-load"),
        (3, "Force-compute"),
    ]
    dff_raw = str(_get(cfg, sec, "double_flatfield", "0")).lower()
    if dff_raw == "force-load":
        dff_def = 2
    elif dff_raw == "force-compute":
        dff_def = 3
    elif _bool_val(dff_raw):
        dff_def = 1
    else:
        dff_def = 0
    dlg_dff = UserChoice(
        label="Double flat-field",
        values=dff_options,
        defval=dff_def,
    )

    dlg_log = UserCheckBox(
        name="take_logarithm",
        label="Take logarithm after flat-field",
        defval=_bool_val(_get(cfg, sec, "take_logarithm", "1")),
    )

    rings_options = [
        (0, "None"),
        (1, "Munch"),
        (2, "Vo"),
        (3, "Wavelets-median"),
        (4, "Mean-subtraction"),
        (5, "Mean-division"),
    ]
    rings_raw = (
        str(_get(cfg, sec, "sino_rings_correction", "")).strip().lower() or "none"
    )
    rings_map = {v.lower(): i for i, (_, v) in enumerate(rings_options)}
    dlg_rings = UserChoice(
        label="Sinogram rings correction",
        values=rings_options,
        defval=rings_map.get(rings_raw, 0),
    )

    ct1 = Container(
        [dlg_flatfield, dlg_fl_mode, dlg_dff, dlg_log], title="Flat-field & Logarithm"
    )
    ct2 = Container([dlg_ccd_filter, dlg_ccd_thresh], title="CCD Filter")
    ct3 = Container([dlg_rings], title="Rings Correction")

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Pre-processing Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    # --- write back ---

    ff_value = ff_options[ret[dlg_flatfield]][1].lower()
    cfg[sec]["flatfield"] = (
        "pca" if ff_value == "pca" else ("1" if ff_value == "enabled" else "0")
    )

    cfg[sec]["flatfield_loading_mode"] = fl_modes[ret[dlg_fl_mode]][1].lower()

    cfg[sec]["ccd_filter_enabled"] = "1" if ret[dlg_ccd_filter] else "0"
    cfg[sec]["ccd_filter_threshold"] = str(ret[dlg_ccd_thresh])

    dff_value = dff_options[ret[dlg_dff]][1].lower()
    cfg[sec]["double_flatfield"] = (
        dff_value
        if dff_value in ("force-load", "force-compute")
        else ("1" if dff_value == "enabled" else "0")
    )

    cfg[sec]["take_logarithm"] = "1" if ret[dlg_log] else "0"

    rings_value = rings_options[ret[dlg_rings]][1].lower()
    cfg[sec]["sino_rings_correction"] = "" if rings_value == "none" else rings_value
    return True


def _phase_dialog(cfg: dict, submenu: bool = False) -> bool:
    sec = "phase"

    method_options = [(0, "None"), (1, "Paganin"), (2, "CTF")]
    method_raw = str(_get(cfg, sec, "method", "none")).lower()
    method_map = {v.lower(): i for i, (_, v) in enumerate(method_options)}
    dlg_method = UserChoice(
        label="Phase retrieval method",
        values=method_options,
        defval=method_map.get(method_raw, 0),
    )

    dlg_delta_beta = UserFloatInput(
        label="Delta/Beta ratio (Paganin / CTF)",
        defval=_float_or_default(_get(cfg, sec, "delta_beta"), 100.0),
    )

    dlg_unsharp_coeff = UserFloatInput(
        label="Unsharp mask coefficient (0 = disabled)",
        defval=_float_or_default(_get(cfg, sec, "unsharp_coeff"), 0.0),
    )
    dlg_unsharp_sigma = UserFloatInput(
        label="Unsharp mask Gaussian sigma (0 = disabled)",
        defval=_float_or_default(_get(cfg, sec, "unsharp_sigma"), 0.0),
    )

    unsharp_method_options = [(0, "Gaussian"), (1, "Laplacian"), (2, "ImageJ")]
    um_raw = str(_get(cfg, sec, "unsharp_method", "gaussian")).lower()
    um_map = {v.lower(): i for i, (_, v) in enumerate(unsharp_method_options)}
    dlg_unsharp_method = UserChoice(
        label="Unsharp mask filter type",
        values=unsharp_method_options,
        defval=um_map.get(um_raw, 0),
    )

    ctf_geom_raw = _get(
        cfg,
        sec,
        "ctf_geometry",
        "z1_v=None; z1_h=None; detec_pixel_size=None; magnification=True",
    )
    ctf_opts = _parse_options(ctf_geom_raw)

    dlg_z1_v = UserInput(label="z1_v (m)", defval=ctf_opts.get("z1_v", "None"))
    dlg_z1_h = UserInput(label="z1_h (m)", defval=ctf_opts.get("z1_h", "None"))
    dlg_pixel_size = UserInput(
        label="Detector pixel size (m)",
        defval=ctf_opts.get("detec_pixel_size", "None"),
    )
    dlg_magnification = UserInput(
        label="Magnification (float or True/False)",
        defval=ctf_opts.get("magnification", "True"),
    )

    ct1 = Container([dlg_method, dlg_delta_beta], title="Phase Retrieval")
    ct2 = Container(
        [dlg_unsharp_coeff, dlg_unsharp_sigma, dlg_unsharp_method], title="Unsharp Mask"
    )
    ct3 = Container(
        [dlg_z1_v, dlg_z1_h, dlg_pixel_size, dlg_magnification], title="CTF Geometry"
    )

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Phase Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    cfg[sec]["method"] = method_options[ret[dlg_method]][1].lower()
    cfg[sec]["delta_beta"] = str(ret[dlg_delta_beta])
    cfg[sec]["unsharp_coeff"] = str(ret[dlg_unsharp_coeff])
    cfg[sec]["unsharp_sigma"] = str(ret[dlg_unsharp_sigma])
    cfg[sec]["unsharp_method"] = unsharp_method_options[ret[dlg_unsharp_method]][
        1
    ].lower()

    new_ctf_opts = {
        "z1_v": ret[dlg_z1_v],
        "z1_h": ret[dlg_z1_h],
        "detec_pixel_size": ret[dlg_pixel_size],
        "magnification": ret[dlg_magnification],
    }
    cfg[sec]["ctf_geometry"] = _format_options(new_ctf_opts)

    return True


def _reconstruction_dialog(cfg: dict, submenu: bool = False) -> bool:
    sec = "reconstruction"

    method_options = [(0, "FBP"), (1, "HBP"), (2, "Cone"), (3, "MLEM"), (4, "None")]
    method_raw = str(_get(cfg, sec, "method", "FBP")).lower()
    method_map = {v.lower(): i for i, (_, v) in enumerate(method_options)}
    dlg_method = UserChoice(
        label="Reconstruction method",
        values=method_options,
        defval=method_map.get(method_raw, 0),
    )

    dlg_angles_file = UserInput(
        label="Angles file (empty = use metadata)",
        defval=_get(cfg, sec, "angles_file"),
    )

    dlg_cor = UserInput(
        label="Rotation axis position (number or method name)",
        defval=_get(cfg, sec, "rotation_axis_position", ""),
    )

    padding_options = [(0, "Edges"), (1, "Zeros")]
    pad_raw = str(_get(cfg, sec, "padding_type", "edges")).lower()
    pad_map = {v.lower(): i for i, (_, v) in enumerate(padding_options)}
    dlg_padding = UserChoice(
        label="Padding type (FBP)",
        values=padding_options,
        defval=pad_map.get(pad_raw, 0),
    )

    halftomo_options = [(0, "Auto"), (1, "Enabled"), (2, "Disabled")]
    ht_raw = str(_get(cfg, sec, "enable_halftomo", "auto")).lower()
    if ht_raw == "auto":
        ht_def = 0
    elif _bool_val(ht_raw):
        ht_def = 1
    else:
        ht_def = 2
    dlg_halftomo = UserChoice(
        label="Half-acquisition mode",
        values=halftomo_options,
        defval=ht_def,
    )

    dlg_clip_circle = UserCheckBox(
        name="clip_outer_circle",
        label="Clip voxels outside reconstruction region",
        defval=_bool_val(_get(cfg, sec, "clip_outer_circle", "0")),
    )
    dlg_outer_value = UserFloatInput(
        label="Outer circle fill value",
        defval=_float_or_default(_get(cfg, sec, "outer_circle_value"), 0.0),
    )
    dlg_centered_axis = UserCheckBox(
        name="centered_axis",
        label="Center reconstructed image on rotation axis",
        defval=_bool_val(_get(cfg, sec, "centered_axis", "1")),
    )

    dlg_start_x = UserIntInput(
        label="start_x", defval=_int_or_default(_get(cfg, sec, "start_x"), 0)
    )
    dlg_end_x = UserIntInput(
        label="end_x", defval=_int_or_default(_get(cfg, sec, "end_x"), -1)
    )
    dlg_start_y = UserIntInput(
        label="start_y", defval=_int_or_default(_get(cfg, sec, "start_y"), 0)
    )
    dlg_end_y = UserIntInput(
        label="end_y", defval=_int_or_default(_get(cfg, sec, "end_y"), -1)
    )
    dlg_start_z = UserIntInput(
        label="start_z", defval=_int_or_default(_get(cfg, sec, "start_z"), 0)
    )
    dlg_end_z = UserIntInput(
        label="end_z", defval=_int_or_default(_get(cfg, sec, "end_z"), -1)
    )

    ct1 = Container(
        [dlg_method, dlg_angles_file, dlg_cor, dlg_padding, dlg_halftomo], title="Main"
    )
    ct2 = Container(
        [dlg_clip_circle, dlg_outer_value, dlg_centered_axis], title="Geometry"
    )
    ct3 = Container(
        [dlg_start_x, dlg_end_x, dlg_start_y, dlg_end_y, dlg_start_z, dlg_end_z],
        title="Sub-volume (indices, upper bound INCLUDED, -1 = full)",
    )

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Reconstruction Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    cfg[sec]["method"] = (
        method_options[ret[dlg_method]][1].lower()
        if method_options[ret[dlg_method]][1].lower() != "fbp"
        and method_options[ret[dlg_method]][1].lower() != "hbp"
        and method_options[ret[dlg_method]][1].lower() != "mlem"
        else method_options[ret[dlg_method]][1]
    )
    cfg[sec]["angles_file"] = ret[dlg_angles_file]
    cfg[sec]["rotation_axis_position"] = ret[dlg_cor]
    cfg[sec]["padding_type"] = padding_options[ret[dlg_padding]][1].lower()

    ht_value = halftomo_options[ret[dlg_halftomo]][1].lower()
    cfg[sec]["enable_halftomo"] = (
        "auto" if ht_value == "auto" else ("1" if ht_value == "enabled" else "0")
    )

    cfg[sec]["clip_outer_circle"] = "1" if ret[dlg_clip_circle] else "0"
    cfg[sec]["outer_circle_value"] = str(ret[dlg_outer_value])
    cfg[sec]["centered_axis"] = "1" if ret[dlg_centered_axis] else "0"

    cfg[sec]["start_x"] = str(ret[dlg_start_x])
    cfg[sec]["end_x"] = str(ret[dlg_end_x])
    cfg[sec]["start_y"] = str(ret[dlg_start_y])
    cfg[sec]["end_y"] = str(ret[dlg_end_y])
    cfg[sec]["start_z"] = str(ret[dlg_start_z])
    cfg[sec]["end_z"] = str(ret[dlg_end_z])
    return True


def _output_dialog(cfg: dict, submenu: bool = False) -> bool:
    sec = "output"

    dlg_prefix = UserInput(
        label="File prefix (empty = inferred from dataset)",
        defval=_get(cfg, sec, "file_prefix"),
    )

    dlg_overwrite = UserCheckBox(
        name="overwrite_results",
        label="Overwrite existing output files",
        defval=_bool_val(_get(cfg, sec, "overwrite_results", "1")),
    )

    ct = Container([dlg_prefix, dlg_overwrite], title="Output")
    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog([[ct]], title="Output Setup", cancel_text=cancel_text).show()
    if ret is False:
        return False

    cfg[sec]["file_prefix"] = ret[dlg_prefix]
    cfg[sec]["overwrite_results"] = "1" if ret[dlg_overwrite] else "0"
    return True


_SECTION_DIALOGS = {
    "preproc": (_preproc_dialog, "Pre-processing"),
    "phase": (_phase_dialog, "Phase Retrieval"),
    "reconstruction": (_reconstruction_dialog, "Reconstruction"),
    "output": (_output_dialog, "Output"),
}


[docs] def nabu_config_setup(config_path: str) -> None: """ Open an interactive Bliss dialog to edit a nabu configuration file. Parameters ---------- config_path : str Path to the nabu ``*.conf`` file to edit. The file is read at startup and written back every time a section is saved. """ path = Path(config_path) if not os.path.exists(config_path): print( f"[nabu_config_editor] Creating new file with internal fallback defaults: {config_path}" ) os.makedirs(os.path.dirname(os.path.abspath(config_path)), exist_ok=True) cfg = copy.deepcopy(NABU_DEFAULT_CONFIG) _save_config(cfg, config_path) else: cfg = _load_config(str(path)) # Merge with defaults so that every section/key nabu expects is present, # including hidden sections (postproc, resources, pipeline) and hidden # dataset keys (location, darks_flats_dir) that are filled in at runtime. cfg = _merge_defaults(cfg) choice = None while True: value_list = [(key, label) for key, (_, label) in _SECTION_DIALOGS.items()] choice = select_dialog2( title=f"Nabu Config – {path.name}", values=value_list, selection=choice, cancel_text="Close", ) if choice is None: break dialog_fn, _ = _SECTION_DIALOGS[choice] saved = dialog_fn(cfg, submenu=True) if saved: _save_config(cfg, str(path)) print(f"[nabu_config_editor] Saved [{choice}] → {path}")