import configparser
import copy
import os
from pathlib import Path
try:
from bliss.shell.cli.pt_widgets import BlissDialog
from bliss.shell.cli.user_dialog import Container
from bliss.shell.cli.user_dialog import UserCheckBox
from bliss.shell.cli.user_dialog import UserChoice
from bliss.shell.cli.user_dialog import UserFloatInput
from bliss.shell.cli.user_dialog import UserInput
from bliss.shell.cli.user_dialog import UserIntInput
from tomo.helpers.select_dialog2 import select_dialog2
except ImportError as ex:
from ..import_utils import unavailable_class
from ..import_utils import unavailable_function
BlissDialog = unavailable_class(ex)
Container = unavailable_class(ex)
UserCheckBox = unavailable_class(ex)
UserChoice = unavailable_class(ex)
UserFloatInput = unavailable_class(ex)
UserInput = unavailable_class(ex)
UserIntInput = unavailable_class(ex)
select_dialog2 = unavailable_function(ex)
# Fallback nabu configuration, laid out as {section: {key: value}}.
# Every value is a string, in the form it is written to the configuration
# file. The order of sections and keys is the order used when the file is
# written out.
NABU_DEFAULT_CONFIG = {
    # Both keys are left empty here.
    "dataset": {
        "location": "",
        "darks_flats_dir": "",
    },
    # Flat-field, CCD filter, logarithm and sinogram rings correction.
    "preproc": {
        "flatfield": "1",
        "flatfield_loading_mode": "load_if_present",
        "ccd_filter_enabled": "0",
        "ccd_filter_threshold": "0.04",
        "double_flatfield": "0",
        "take_logarithm": "1",
        "sino_rings_correction": "",
    },
    # Phase retrieval and unsharp mask.
    "phase": {
        "method": "none",
        "delta_beta": "100.0",
        "unsharp_coeff": "0",
        "unsharp_sigma": "0",
        "unsharp_method": "gaussian",
        # Semicolon-separated key=value list.
        "ctf_geometry": "z1_v=None; z1_h=None; detec_pixel_size=None; magnification=True",
    },
    # Reconstruction method, geometry and sub-volume bounds.
    "reconstruction": {
        "method": "FBP",
        "angles_file": "",
        "rotation_axis_position": "sliding-window",
        "padding_type": "edges",
        "enable_halftomo": "auto",
        "clip_outer_circle": "0",
        "outer_circle_value": "0",
        "centered_axis": "1",
        "start_x": "0",
        "end_x": "-1",
        "start_y": "0",
        "end_y": "-1",
        "start_z": "0",
        "end_z": "-1",
    },
    "output": {
        "location": "",
        "file_prefix": "",
        "file_format": "hdf5",
        "overwrite_results": "1",
    },
    "postproc": {
        "output_histogram": "1",
    },
    "resources": {
        "method": "local",
    },
    "pipeline": {
        "save_steps": "",
        "resume_from_step": "",
        "ignore_checkpoint_config": "0",
        "verbosity": "2",
    },
}
def _bool_val(value: str) -> bool:
"""Convert a nabu boolean string to a Python bool."""
return str(value).strip().lower() in ("1", "true", "yes")
def _int_or_default(value, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _float_or_default(value, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _load_config(path: str) -> dict:
cfg = configparser.ConfigParser(
comment_prefixes=("#", ";"),
inline_comment_prefixes=("#",),
interpolation=None,
allow_no_value=True,
)
cfg.optionxform = str # preserve key case
cfg.read(path)
return cfg._sections
def _parse_options(opt_str: str) -> dict:
"""Parse a semicolon-separated key=value string into a dict."""
res = {}
if not opt_str:
return res
for item in opt_str.split(";"):
if "=" in item:
key, val = item.split("=", 1)
res[key.strip()] = val.strip()
return res
def _format_options(opt_dict: dict) -> str:
"""Format a dict into a semicolon-separated key=value string."""
return "; ".join(f"{k} = {v}" for k, v in opt_dict.items())
def _save_config(cfg: dict, path: str) -> None:
parser = configparser.ConfigParser(
comment_prefixes=("#", ";"),
inline_comment_prefixes=("#",),
interpolation=None,
allow_no_value=True,
)
parser.optionxform = str
parser.read_dict(cfg)
with open(path, "w") as fh:
parser.write(fh)
def _merge_defaults(cfg: dict) -> dict:
    """Overlay *cfg* on a deep copy of ``NABU_DEFAULT_CONFIG``.

    The result contains every section and key of the defaults, plus every
    section and key of *cfg*; where both define a key, the value from *cfg*
    is kept.  Neither *cfg* nor ``NABU_DEFAULT_CONFIG`` is modified.
    """
    result = copy.deepcopy(NABU_DEFAULT_CONFIG)
    for name, options in cfg.items():
        # Sections unknown to the defaults start out empty.
        result.setdefault(name, {}).update(options)
    return result
def _get(cfg, section, key, fallback=""):
val = cfg.get(section, {}).get(key, fallback)
return fallback if val is None else val
def _preproc_dialog(cfg: dict, submenu: bool = False) -> bool:
    """Edit the ``preproc`` section of *cfg* through a Bliss dialog.

    The widgets are pre-filled from ``cfg["preproc"]``, using built-in
    fallbacks for absent keys.  When the dialog is validated, the answers are
    converted back to strings and stored in ``cfg["preproc"]``; that section
    is created if *cfg* does not have it yet.

    Parameters
    ----------
    cfg : dict
        Configuration as ``{section: {key: value}}``; modified in place.
    submenu : bool
        When true the cancel button is labelled "Back" instead of "Cancel".

    Returns
    -------
    bool
        ``True`` when the answers were written to *cfg*; ``False`` when the
        dialog's ``show()`` returned ``False`` (treated as a cancel), in which
        case no ``preproc`` value is changed.
    """
    sec = "preproc"

    # Side effect kept from the previous implementation for backward
    # compatibility: an empty "dataset" section is added when missing, even
    # if the dialog is later cancelled.
    if "dataset" not in cfg:
        cfg["dataset"] = {}

    # flatfield – can be 0/1/pca; represent as choice
    ff_options = [(0, "Disabled"), (1, "Enabled"), (2, "PCA")]
    ff_raw = str(_get(cfg, sec, "flatfield", "1")).lower()
    if ff_raw == "pca":
        ff_defval = 2
    elif _bool_val(ff_raw):
        ff_defval = 1
    else:
        ff_defval = 0
    dlg_flatfield = UserChoice(
        label="Flat-field normalization",
        values=ff_options,
        defval=ff_defval,
    )

    # The lower-cased labels double as the values stored in the config.
    fl_modes = [
        (0, "Load_if_present"),
        (1, "Force-load"),
        (2, "Force-compute"),
    ]
    fl_raw = str(_get(cfg, sec, "flatfield_loading_mode", "Load_if_present")).lower()
    fl_map = {v.lower(): i for i, (_, v) in enumerate(fl_modes)}
    dlg_fl_mode = UserChoice(
        label="Flat-field loading mode",
        values=fl_modes,
        defval=fl_map.get(fl_raw, 0),  # unknown value -> first entry
    )

    dlg_ccd_filter = UserCheckBox(
        name="ccd_filter_enabled",
        label="CCD hotspot correction enabled",
        defval=_bool_val(_get(cfg, sec, "ccd_filter_enabled", "0")),
    )
    dlg_ccd_thresh = UserFloatInput(
        label="CCD filter threshold",
        defval=_float_or_default(_get(cfg, sec, "ccd_filter_threshold"), 0.04),
    )

    # double_flatfield: 0/1/force-load/force-compute
    dff_options = [
        (0, "Disabled"),
        (1, "Enabled"),
        (2, "Force-load"),
        (3, "Force-compute"),
    ]
    dff_raw = str(_get(cfg, sec, "double_flatfield", "0")).lower()
    if dff_raw == "force-load":
        dff_def = 2
    elif dff_raw == "force-compute":
        dff_def = 3
    elif _bool_val(dff_raw):
        dff_def = 1
    else:
        dff_def = 0
    dlg_dff = UserChoice(
        label="Double flat-field",
        values=dff_options,
        defval=dff_def,
    )

    dlg_log = UserCheckBox(
        name="take_logarithm",
        label="Take logarithm after flat-field",
        defval=_bool_val(_get(cfg, sec, "take_logarithm", "1")),
    )

    rings_options = [
        (0, "None"),
        (1, "Munch"),
        (2, "Vo"),
        (3, "Wavelets-median"),
        (4, "Mean-subtraction"),
        (5, "Mean-division"),
    ]
    # An empty value in the config means "no correction".
    rings_raw = (
        str(_get(cfg, sec, "sino_rings_correction", "")).strip().lower() or "none"
    )
    rings_map = {v.lower(): i for i, (_, v) in enumerate(rings_options)}
    dlg_rings = UserChoice(
        label="Sinogram rings correction",
        values=rings_options,
        defval=rings_map.get(rings_raw, 0),
    )

    ct1 = Container(
        [dlg_flatfield, dlg_fl_mode, dlg_dff, dlg_log], title="Flat-field & Logarithm"
    )
    ct2 = Container([dlg_ccd_filter, dlg_ccd_thresh], title="CCD Filter")
    ct3 = Container([dlg_rings], title="Rings Correction")

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Pre-processing Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    # --- write back ---
    # Make sure the section being written exists, so that validated input is
    # not lost to a KeyError when *cfg* came without a "preproc" section.
    section = cfg.setdefault(sec, {})

    # The answer of a choice widget is used as an index into its option list.
    ff_value = ff_options[ret[dlg_flatfield]][1].lower()
    if ff_value == "pca":
        section["flatfield"] = "pca"
    else:
        section["flatfield"] = "1" if ff_value == "enabled" else "0"

    section["flatfield_loading_mode"] = fl_modes[ret[dlg_fl_mode]][1].lower()
    section["ccd_filter_enabled"] = "1" if ret[dlg_ccd_filter] else "0"
    section["ccd_filter_threshold"] = str(ret[dlg_ccd_thresh])

    dff_value = dff_options[ret[dlg_dff]][1].lower()
    if dff_value in ("force-load", "force-compute"):
        section["double_flatfield"] = dff_value
    else:
        section["double_flatfield"] = "1" if dff_value == "enabled" else "0"

    section["take_logarithm"] = "1" if ret[dlg_log] else "0"

    rings_value = rings_options[ret[dlg_rings]][1].lower()
    section["sino_rings_correction"] = "" if rings_value == "none" else rings_value
    return True
def _phase_dialog(cfg: dict, submenu: bool = False) -> bool:
    """Edit the ``phase`` section of *cfg* through a Bliss dialog.

    The widgets are pre-filled from ``cfg["phase"]``, using built-in
    fallbacks for absent keys.  When the dialog is validated, the answers are
    stored as strings in ``cfg["phase"]``; that section is created if *cfg*
    does not have it yet.

    Parameters
    ----------
    cfg : dict
        Configuration as ``{section: {key: value}}``; modified in place.
    submenu : bool
        When true the cancel button is labelled "Back" instead of "Cancel".

    Returns
    -------
    bool
        ``True`` when the answers were written to *cfg*; ``False`` when the
        dialog's ``show()`` returned ``False`` (treated as a cancel), in which
        case *cfg* is left untouched.
    """
    sec = "phase"

    # For the choice widgets, the lower-cased label is the stored value.
    method_options = [(0, "None"), (1, "Paganin"), (2, "CTF")]
    method_raw = str(_get(cfg, sec, "method", "none")).lower()
    method_map = {v.lower(): i for i, (_, v) in enumerate(method_options)}
    dlg_method = UserChoice(
        label="Phase retrieval method",
        values=method_options,
        defval=method_map.get(method_raw, 0),  # unknown value -> first entry
    )
    dlg_delta_beta = UserFloatInput(
        label="Delta/Beta ratio (Paganin / CTF)",
        defval=_float_or_default(_get(cfg, sec, "delta_beta"), 100.0),
    )

    dlg_unsharp_coeff = UserFloatInput(
        label="Unsharp mask coefficient (0 = disabled)",
        defval=_float_or_default(_get(cfg, sec, "unsharp_coeff"), 0.0),
    )
    dlg_unsharp_sigma = UserFloatInput(
        label="Unsharp mask Gaussian sigma (0 = disabled)",
        defval=_float_or_default(_get(cfg, sec, "unsharp_sigma"), 0.0),
    )
    unsharp_method_options = [(0, "Gaussian"), (1, "Laplacian"), (2, "ImageJ")]
    um_raw = str(_get(cfg, sec, "unsharp_method", "gaussian")).lower()
    um_map = {v.lower(): i for i, (_, v) in enumerate(unsharp_method_options)}
    dlg_unsharp_method = UserChoice(
        label="Unsharp mask filter type",
        values=unsharp_method_options,
        defval=um_map.get(um_raw, 0),
    )

    # ctf_geometry is a single "key=value; ..." string; it is split into one
    # text input per key and re-assembled on write-back.
    ctf_geom_raw = _get(
        cfg,
        sec,
        "ctf_geometry",
        "z1_v=None; z1_h=None; detec_pixel_size=None; magnification=True",
    )
    ctf_opts = _parse_options(ctf_geom_raw)
    dlg_z1_v = UserInput(label="z1_v (m)", defval=ctf_opts.get("z1_v", "None"))
    dlg_z1_h = UserInput(label="z1_h (m)", defval=ctf_opts.get("z1_h", "None"))
    dlg_pixel_size = UserInput(
        label="Detector pixel size (m)",
        defval=ctf_opts.get("detec_pixel_size", "None"),
    )
    dlg_magnification = UserInput(
        label="Magnification (float or True/False)",
        defval=ctf_opts.get("magnification", "True"),
    )

    ct1 = Container([dlg_method, dlg_delta_beta], title="Phase Retrieval")
    ct2 = Container(
        [dlg_unsharp_coeff, dlg_unsharp_sigma, dlg_unsharp_method], title="Unsharp Mask"
    )
    ct3 = Container(
        [dlg_z1_v, dlg_z1_h, dlg_pixel_size, dlg_magnification], title="CTF Geometry"
    )

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Phase Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    # --- write back ---
    # Reads above tolerate a missing section, so the write-back must too:
    # create it instead of losing validated input to a KeyError.
    section = cfg.setdefault(sec, {})

    # The answer of a choice widget is used as an index into its option list.
    section["method"] = method_options[ret[dlg_method]][1].lower()
    section["delta_beta"] = str(ret[dlg_delta_beta])
    section["unsharp_coeff"] = str(ret[dlg_unsharp_coeff])
    section["unsharp_sigma"] = str(ret[dlg_unsharp_sigma])
    section["unsharp_method"] = unsharp_method_options[ret[dlg_unsharp_method]][
        1
    ].lower()

    new_ctf_opts = {
        "z1_v": ret[dlg_z1_v],
        "z1_h": ret[dlg_z1_h],
        "detec_pixel_size": ret[dlg_pixel_size],
        "magnification": ret[dlg_magnification],
    }
    section["ctf_geometry"] = _format_options(new_ctf_opts)
    return True
def _reconstruction_dialog(cfg: dict, submenu: bool = False) -> bool:
    """Edit the ``reconstruction`` section of *cfg* through a Bliss dialog.

    The widgets are pre-filled from ``cfg["reconstruction"]``, using built-in
    fallbacks for absent keys.  When the dialog is validated, the answers are
    stored as strings in ``cfg["reconstruction"]``; that section is created
    if *cfg* does not have it yet.

    Parameters
    ----------
    cfg : dict
        Configuration as ``{section: {key: value}}``; modified in place.
    submenu : bool
        When true the cancel button is labelled "Back" instead of "Cancel".

    Returns
    -------
    bool
        ``True`` when the answers were written to *cfg*; ``False`` when the
        dialog's ``show()`` returned ``False`` (treated as a cancel), in which
        case *cfg* is left untouched.
    """
    sec = "reconstruction"

    method_options = [(0, "FBP"), (1, "HBP"), (2, "Cone"), (3, "MLEM"), (4, "None")]
    method_raw = str(_get(cfg, sec, "method", "FBP")).lower()
    method_map = {v.lower(): i for i, (_, v) in enumerate(method_options)}
    dlg_method = UserChoice(
        label="Reconstruction method",
        values=method_options,
        defval=method_map.get(method_raw, 0),  # unknown value -> first entry
    )
    dlg_angles_file = UserInput(
        label="Angles file (empty = use metadata)",
        defval=_get(cfg, sec, "angles_file"),
    )
    dlg_cor = UserInput(
        label="Rotation axis position (number or method name)",
        defval=_get(cfg, sec, "rotation_axis_position", ""),
    )

    padding_options = [(0, "Edges"), (1, "Zeros")]
    pad_raw = str(_get(cfg, sec, "padding_type", "edges")).lower()
    pad_map = {v.lower(): i for i, (_, v) in enumerate(padding_options)}
    dlg_padding = UserChoice(
        label="Padding type (FBP)",
        values=padding_options,
        defval=pad_map.get(pad_raw, 0),
    )

    # enable_halftomo is "auto" or a boolean string.
    halftomo_options = [(0, "Auto"), (1, "Enabled"), (2, "Disabled")]
    ht_raw = str(_get(cfg, sec, "enable_halftomo", "auto")).lower()
    if ht_raw == "auto":
        ht_def = 0
    elif _bool_val(ht_raw):
        ht_def = 1
    else:
        ht_def = 2
    dlg_halftomo = UserChoice(
        label="Half-acquisition mode",
        values=halftomo_options,
        defval=ht_def,
    )

    dlg_clip_circle = UserCheckBox(
        name="clip_outer_circle",
        label="Clip voxels outside reconstruction region",
        defval=_bool_val(_get(cfg, sec, "clip_outer_circle", "0")),
    )
    dlg_outer_value = UserFloatInput(
        label="Outer circle fill value",
        defval=_float_or_default(_get(cfg, sec, "outer_circle_value"), 0.0),
    )
    dlg_centered_axis = UserCheckBox(
        name="centered_axis",
        label="Center reconstructed image on rotation axis",
        defval=_bool_val(_get(cfg, sec, "centered_axis", "1")),
    )

    # Sub-volume bounds: (config key, fallback). The key is also the widget
    # label; the order here is both the display and the write-back order.
    bound_specs = (
        ("start_x", 0),
        ("end_x", -1),
        ("start_y", 0),
        ("end_y", -1),
        ("start_z", 0),
        ("end_z", -1),
    )
    bound_inputs = [
        (
            key,
            UserIntInput(
                label=key, defval=_int_or_default(_get(cfg, sec, key), fallback)
            ),
        )
        for key, fallback in bound_specs
    ]

    ct1 = Container(
        [dlg_method, dlg_angles_file, dlg_cor, dlg_padding, dlg_halftomo], title="Main"
    )
    ct2 = Container(
        [dlg_clip_circle, dlg_outer_value, dlg_centered_axis], title="Geometry"
    )
    ct3 = Container(
        [widget for _, widget in bound_inputs],
        title="Sub-volume (indices, upper bound INCLUDED, -1 = full)",
    )

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog(
        [[ct1], [ct2], [ct3]], title="Reconstruction Setup", cancel_text=cancel_text
    ).show()
    if ret is False:
        return False

    # --- write back ---
    # Reads above tolerate a missing section, so the write-back must too:
    # create it instead of losing validated input to a KeyError.
    section = cfg.setdefault(sec, {})

    # The answer of a choice widget is used as an index into its option list.
    # FBP, HBP and MLEM are stored with their upper-case label; the other
    # methods are stored lower-cased ("cone", "none").
    method_label = method_options[ret[dlg_method]][1]
    if method_label in ("FBP", "HBP", "MLEM"):
        section["method"] = method_label
    else:
        section["method"] = method_label.lower()

    section["angles_file"] = ret[dlg_angles_file]
    section["rotation_axis_position"] = ret[dlg_cor]
    section["padding_type"] = padding_options[ret[dlg_padding]][1].lower()

    ht_value = halftomo_options[ret[dlg_halftomo]][1].lower()
    if ht_value == "auto":
        section["enable_halftomo"] = "auto"
    else:
        section["enable_halftomo"] = "1" if ht_value == "enabled" else "0"

    section["clip_outer_circle"] = "1" if ret[dlg_clip_circle] else "0"
    section["outer_circle_value"] = str(ret[dlg_outer_value])
    section["centered_axis"] = "1" if ret[dlg_centered_axis] else "0"

    for key, widget in bound_inputs:
        section[key] = str(ret[widget])
    return True
def _output_dialog(cfg: dict, submenu: bool = False) -> bool:
    """Edit the ``output`` section of *cfg* through a Bliss dialog.

    Only ``file_prefix`` and ``overwrite_results`` are offered for editing;
    other keys of the section are not touched.  When the dialog is
    validated, the answers are stored in ``cfg["output"]``; that section is
    created if *cfg* does not have it yet.

    Parameters
    ----------
    cfg : dict
        Configuration as ``{section: {key: value}}``; modified in place.
    submenu : bool
        When true the cancel button is labelled "Back" instead of "Cancel".

    Returns
    -------
    bool
        ``True`` when the answers were written to *cfg*; ``False`` when the
        dialog's ``show()`` returned ``False`` (treated as a cancel), in which
        case *cfg* is left untouched.
    """
    sec = "output"

    dlg_prefix = UserInput(
        label="File prefix (empty = inferred from dataset)",
        defval=_get(cfg, sec, "file_prefix"),
    )
    dlg_overwrite = UserCheckBox(
        name="overwrite_results",
        label="Overwrite existing output files",
        defval=_bool_val(_get(cfg, sec, "overwrite_results", "1")),
    )
    ct = Container([dlg_prefix, dlg_overwrite], title="Output")

    cancel_text = "Back" if submenu else "Cancel"
    ret = BlissDialog([[ct]], title="Output Setup", cancel_text=cancel_text).show()
    if ret is False:
        return False

    # --- write back ---
    # Reads above tolerate a missing section, so the write-back must too:
    # create it instead of losing validated input to a KeyError.
    section = cfg.setdefault(sec, {})
    section["file_prefix"] = ret[dlg_prefix]
    section["overwrite_results"] = "1" if ret[dlg_overwrite] else "0"
    return True
# Editable sections: config section name -> (dialog function, menu label).
# The insertion order is the order of the entries in the selection menu.
_SECTION_DIALOGS = {
    "preproc": (_preproc_dialog, "Pre-processing"),
    "phase": (_phase_dialog, "Phase Retrieval"),
    "reconstruction": (_reconstruction_dialog, "Reconstruction"),
    "output": (_output_dialog, "Output"),
}
# --- public entry point ---
def nabu_config_setup(config_path: str) -> None:
    """
    Interactively edit a nabu configuration file through Bliss dialogs.

    If *config_path* does not exist, its parent directory is created as
    needed and the file is first written with ``NABU_DEFAULT_CONFIG``.
    Otherwise the file is loaded.  In both cases the configuration is then
    completed with the defaults, and a menu of editable sections is shown
    repeatedly until it returns ``None``.  Each time a section dialog reports
    that it saved, the whole configuration is written back to the file.

    Parameters
    ----------
    config_path : str
        Path to the nabu ``*.conf`` file to edit.
    """
    path = Path(config_path)

    if os.path.exists(config_path):
        cfg = _load_config(str(path))
    else:
        print(
            f"[nabu_config_editor] Creating new file with internal fallback defaults: {config_path}"
        )
        parent_dir = os.path.dirname(os.path.abspath(config_path))
        os.makedirs(parent_dir, exist_ok=True)
        cfg = copy.deepcopy(NABU_DEFAULT_CONFIG)
        _save_config(cfg, config_path)

    # Complete the configuration with the defaults so that every section and
    # key nabu expects is present, including the sections that have no
    # dialog (postproc, resources, pipeline) and the dataset keys
    # (location, darks_flats_dir) that are filled in at runtime.
    cfg = _merge_defaults(cfg)

    menu_title = f"Nabu Config – {path.name}"
    choice = None  # last selected section, offered again as the preselection
    while True:
        value_list = []
        for key, entry in _SECTION_DIALOGS.items():
            value_list.append((key, entry[1]))

        choice = select_dialog2(
            title=menu_title,
            values=value_list,
            selection=choice,
            cancel_text="Close",
        )
        if choice is None:
            return

        dialog_fn = _SECTION_DIALOGS[choice][0]
        if dialog_fn(cfg, submenu=True):
            _save_config(cfg, str(path))
            print(f"[nabu_config_editor] Saved [{choice}] → {path}")