# Source code for blissoda.tests.test_create_nxtomo
from types import SimpleNamespace
import numpy as np
import pytest
from ..tomo.create_nxtomo import CreateNxTomoProcessor
def _scan(tmp_path):
filename = tmp_path / "RAW_DATA" / "sample" / "sample_0001" / "sample_0001.h5"
scan_info = {
"title": "tomo:basic",
"filename": str(filename),
"scan_nb": 10,
"start_time": "2026-03-31T10:00:00",
"end_time": "2026-03-31T10:01:00",
"sample": {"name": "sample"},
"instrument": {
"name": "id00",
"tomo_config": {"tomo_detector": ["tomodet_detector"]},
"tomodet_detector": {"data_axes": ["-z", "y"]},
"mrtomo_config": {"detector": ["legacy_detector_optic"]},
},
"measurement": {},
"positioners": {
"positioners_start": {
"omega": 10.0,
"sx": 1.5,
"sy": 0.25,
"sz": 2.5,
}
},
"technique": {
"tomoconfig": {
"detector": ["detector"],
"rotation": ["omega"],
"sample_x": ["sx"],
"sample_y": ["sy"],
"translation_y": ["sy"],
"translation_z": ["sz"],
"sample_pixel_size": 1.0,
},
"active_tomo_config": ["tomo_config"],
"scan": {
"dark_n": 1,
"flat_n": 2,
"tomo_n": 3,
"scan_range": 180.0,
"energy": 20000.0,
"energy@units": "eV",
"exposure_time": 0.2,
"sample_pixel_size": 1.0,
"sample_pixel_size@units": "um",
"flat_on": [3],
"sample_detector_distance": 100.0,
"source_sample_distance": 200.0,
"field_of_view": "full",
"effective_propagation_distance": 300.0,
},
"scan_flags": {"return_images_aligned_to_flats": False},
"subscans": {
"scan1": {"type": "tomo:dark"},
"scan2": {"type": "tomo:flat"},
"scan3": {"type": "tomo"},
"scan4": {"type": "tomo:return_ref"},
},
"saving": {"frames_per_file": 2, "image_file_format": "HDF5"},
"optic": {
"optics_pixel_size": 1.25,
"optics_pixel_size@units": "um",
},
"detector": {
"detector": {
"size": [5, 4],
"depth": [2],
"pixel_size": [6.5e-6, 7.5e-6],
"pixel_size@units": "m",
"data_path": "/entry_0000/instrument/detector/data",
}
},
},
}
images_path = str(tmp_path / "images" / "{scan_number}_{img_acq_device}_")
scan_saving = SimpleNamespace(
filename=scan_info["filename"],
beamline="id00",
sample_name="sample",
images_path=images_path,
scan_number_format="%04d",
)
return SimpleNamespace(scan_info=scan_info, scan_saving=scan_saving)
def _inputs_by_name(inputs):
return {item["name"]: item["value"] for item in inputs}
def _input_sets_by_label(input_sets):
    """Index a list of labelled input sets by their label.

    :param input_sets: iterable of mappings, each with a ``"label"`` key and an
        ``"inputs"`` key holding a list of name/value records.
    :returns: dictionary mapping every label to the name -> value dictionary
        built by ``_inputs_by_name`` from that set's ``"inputs"``.
    """
    return {item["label"]: _inputs_by_name(item["inputs"]) for item in input_sets}
def test_get_inputs_uses_tomoconfig_aliases(mock_persistent, tmp_path):
    """Pin every input returned by ``_get_inputs`` for the unmodified fixture scan.

    The expectations cover the per-frame arrays, the detector file layout, the
    positions looked up through ``technique.tomoconfig`` and the output
    destinations derived from the raw data filename.
    """
    processor = CreateNxTomoProcessor()
    inputs = _inputs_by_name(processor._get_inputs(_scan(tmp_path)))

    images_dir = tmp_path / "images"
    processed_dir = tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001"
    expected_length = 9

    np.testing.assert_array_equal(
        inputs["image_key_control"], np.array([2, 1, 1, 0, 0, 0, -1, -1, -1])
    )
    # Fixture start positions are sx=1.5, sy=0.25 and sz=2.5.
    for input_name, position in (
        ("x_translation_mm", 0.25),
        ("y_translation_mm", 2.5),
        ("z_translation_mm", 1.5),
    ):
        np.testing.assert_array_equal(inputs[input_name], np.array([position]))
    np.testing.assert_array_equal(
        inputs["count_time_s"], np.full(expected_length, 0.2)
    )
    np.testing.assert_array_equal(
        inputs["sequence_number"], np.arange(expected_length, dtype=np.uint32)
    )

    detector_files = (
        "0011_detector_0000.h5",
        "0012_detector_0000.h5",
        "0013_detector_0000.h5",
        "0013_detector_0001.h5",
        "0014_detector_0000.h5",
        "0014_detector_0001.h5",
    )
    n_files = len(detector_files)
    assert inputs["detector_data_file_paths"] == [
        str(images_dir / file_name) for file_name in detector_files
    ]
    assert inputs["detector_data_shapes"] == [
        [n_images, 4, 5] for n_images in (1, 2, 2, 1, 2, 1)
    ]
    assert inputs["detector_data_dtype"] == ["uint16"] * n_files
    assert (
        inputs["detector_data_h5_url"]
        == ["/entry_0000/instrument/detector/data"] * n_files
    )

    np.testing.assert_array_equal(
        inputs["rotation_angle_deg"],
        np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 100.0, 10.0]),
    )

    expected_values = {
        "sample_name": "sample",
        "nx_path": str(processed_dir / "projections" / "sample_0001_seq_start.nx"),
        "instrument_name": "id00",
        "detector_data_axes": ["-z", "y"],
        "detector_x_pixel_size_um": 1.25,
        "detector_y_pixel_size_um": 1.25,
        "sample_x_pixel_size_um": 1.0,
        "sample_y_pixel_size_um": 1.0,
        "estimated_cor": pytest.approx(250.0),
        "sample_detector_distance_mm": 100.0,
        "source_sample_distance_mm": 200.0,
        "field_of_view": "full",
        "propagation_distance_mm": 300.0,
        "energy_kev": 20.0,
    }
    for input_name, expected in expected_values.items():
        assert inputs[input_name] == expected, input_name

    assert processor.workflow_destination() == str(
        processed_dir / "workflows" / "gallery" / "sample_0001_seq_start_nx.json"
    )
def test_get_inputs_supports_helical_metadata(mock_persistent, tmp_path):
    """Check the flat inputs for a ``tomo:helical`` scan with two turns.

    The fixture is switched to a helical sequence (``nb_turns=2``,
    ``z_step=0.5``); the expected arrays then hold 12 entries instead of 9 and
    ``y_translation_mm`` varies across the middle entries.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    scan.scan_info["title"] = "tomo:helical"
    scan.scan_info["technique"]["scan"].update(
        {"sequence": "tomo:helical", "nb_turns": 2, "z_step": 0.5}
    )

    inputs = _inputs_by_name(processor._get_inputs(scan))

    expected_image_keys = np.array([2, 1, 1, 0, 0, 0, 0, 0, 0, -1, -1, -1])
    expected_angles = np.array(
        [10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 250.0, 310.0, 190.0, 100.0, 10.0]
    )
    expected_y = np.array(
        [2.5] * 4
        + [2.66666667, 2.83333333, 3.0, 3.16666667, 3.33333333]
        + [3.5] * 3
    )

    np.testing.assert_array_equal(inputs["image_key_control"], expected_image_keys)
    np.testing.assert_array_equal(inputs["rotation_angle_deg"], expected_angles)
    # The expected literals are rounded, hence the tolerance-based comparison.
    np.testing.assert_allclose(inputs["y_translation_mm"], expected_y)
    assert inputs["detector_data_shapes"] == [
        [n_images, 4, 5] for n_images in (1, 2, 2, 2, 2, 2, 1)
    ]
    np.testing.assert_array_equal(
        inputs["sequence_number"], np.arange(12, dtype=np.uint32)
    )
def test_get_input_sets_split_zhelical_into_bottom_helix_top(mock_persistent, tmp_path):
    """Check that a three-projection-subscan helical scan yields three input sets.

    The fixture is turned into a ``tomo:helical`` scan whose subscans contain
    three ``tomo`` entries; ``_get_input_sets`` is expected to return the sets
    ``bottom``, ``helix`` and ``top`` (in that order), each with its own output
    directory suffix.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    technique = scan.scan_info["technique"]
    scan.scan_info["title"] = "tomo:helical"
    technique["scan"].update(
        {"sequence": "tomo:helical", "nb_turns": 2, "z_step": 0.5}
    )
    subscan_types = (
        "tomo:dark",
        "tomo:flat",
        "tomo",
        "tomo",
        "tomo",
        "tomo:return_ref",
    )
    technique["subscans"] = {
        f"scan{number}": {"type": subscan_type}
        for number, subscan_type in enumerate(subscan_types, start=1)
    }
    technique["scan"]["flat_on"] = [3, 3, 3]

    input_sets = _input_sets_by_label(processor._get_input_sets(scan))
    labels = ["bottom", "helix", "top"]
    assert list(input_sets) == labels
    bottom, helix, top = (input_sets[label] for label in labels)

    np.testing.assert_array_equal(
        bottom["image_key_control"], np.array([2, 1, 1, 0, 0, 0])
    )
    np.testing.assert_array_equal(helix["image_key_control"], np.zeros(6, dtype=int))
    np.testing.assert_array_equal(
        top["image_key_control"], np.array([0, 0, 0, -1, -1, -1])
    )

    np.testing.assert_allclose(bottom["y_translation_mm"], np.full(6, 2.5))
    np.testing.assert_allclose(
        helix["y_translation_mm"],
        np.array([2.5, 2.66666667, 2.83333333, 3.0, 3.16666667, 3.33333333]),
    )
    np.testing.assert_allclose(top["y_translation_mm"], np.full(6, 3.5))

    # Every set is expected to number its entries from zero.
    for label in labels:
        np.testing.assert_array_equal(
            input_sets[label]["sequence_number"], np.arange(6, dtype=np.uint32)
        )

    sample_dir = tmp_path / "PROCESSED_DATA" / "sample"
    for label in labels:
        assert input_sets[label]["nx_path"] == str(
            sample_dir
            / f"sample_0001_{label}"
            / "projections"
            / f"sample_0001_seq_start_{label}.nx"
        )
    for label in labels:
        assert processor.workflow_destination(label) == str(
            sample_dir
            / f"sample_0001_{label}"
            / "workflows"
            / "gallery"
            / f"sample_0001_seq_start_{label}_nx.json"
        )
@pytest.mark.parametrize("sequence_name", ["tomo:multitomo", "tomo:pcotomo"])
def test_get_input_sets_split_multiturn_sequences_per_turn(
    mock_persistent, tmp_path, sequence_name
):
    """Check that multi-turn sequences yield one input set per turn.

    ``technique.proj`` declares two turns of three projections, with one frame
    per file.  Both sets are expected to hold the same nine-entry image keys
    and the same non-projection files, and to differ only in their projection
    angles and projection files.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    technique = scan.scan_info["technique"]
    scan.scan_info["title"] = sequence_name
    technique["scan"]["sequence"] = sequence_name
    # Deliberately disagrees with technique.proj.nb_turns: only two turns are
    # expected below.
    technique["scan"]["nb_turns"] = 99
    technique["proj"] = {"proj_n": 3, "nb_turns": 2}
    technique["saving"]["frames_per_file"] = 1

    input_sets = _input_sets_by_label(processor._get_input_sets(scan))
    labels = ["turn_001", "turn_002"]
    assert list(input_sets) == labels
    turn_001, turn_002 = (input_sets[label] for label in labels)

    expected_image_keys = np.array([2, 1, 1, 0, 0, 0, -1, -1, -1])
    for turn in (turn_001, turn_002):
        np.testing.assert_array_equal(turn["image_key_control"], expected_image_keys)

    np.testing.assert_array_equal(
        turn_001["rotation_angle_deg"],
        np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 100.0, 10.0]),
    )
    np.testing.assert_array_equal(
        turn_002["rotation_angle_deg"],
        np.array([10.0, 10.0, 10.0, 190.0, 250.0, 310.0, 190.0, 100.0, 10.0]),
    )

    for turn in (turn_001, turn_002):
        np.testing.assert_array_equal(
            turn["sequence_number"], np.arange(9, dtype=np.uint32)
        )

    sample_dir = tmp_path / "PROCESSED_DATA" / "sample"
    for label in labels:
        assert input_sets[label]["nx_path"] == str(
            sample_dir
            / f"sample_0001_{label}"
            / "projections"
            / f"sample_0001_seq_start_{label}.nx"
        )

    images_dir = tmp_path / "images"

    def expected_files(projection_indices):
        """Expected file list for a turn using the given 0013 file indices."""
        names = [
            "0011_detector_0000.h5",
            "0012_detector_0000.h5",
            "0012_detector_0001.h5",
        ]
        names += [f"0013_detector_{index:04d}.h5" for index in projection_indices]
        names += [f"0014_detector_{index:04d}.h5" for index in range(3)]
        return [str(images_dir / name) for name in names]

    assert turn_001["detector_data_file_paths"] == expected_files(range(0, 3))
    assert turn_002["detector_data_file_paths"] == expected_files(range(3, 6))
def test_multiturn_split_requires_turn_aligned_file_boundaries(
    mock_persistent, tmp_path
):
    """Expect a ``ValueError`` when files and turns do not line up.

    The fixture keeps ``frames_per_file=2`` while ``technique.proj`` declares
    three projections per turn.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    technique = scan.scan_info["technique"]
    scan.scan_info["title"] = "tomo:multitomo"
    technique["scan"]["sequence"] = "tomo:multitomo"
    technique["proj"] = {"proj_n": 3, "nb_turns": 2}

    with pytest.raises(ValueError, match="frames_per_file does not align"):
        processor._get_input_sets(scan)
def test_multiturn_requires_proj_metadata(mock_persistent, tmp_path):
    """Expect a ``ValueError`` when a multitomo scan has no ``technique.proj``.

    The fixture has no ``proj`` group; only the title and sequence name are
    switched to ``tomo:multitomo``.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    sequence_name = "tomo:multitomo"
    scan.scan_info["title"] = sequence_name
    scan.scan_info["technique"]["scan"]["sequence"] = sequence_name

    with pytest.raises(ValueError, match="technique.proj.proj_n"):
        processor._get_input_sets(scan)
def test_get_input_sets_split_backandforth_per_360deg(mock_persistent, tmp_path):
    """Check that four 180-degree back-and-forth turns yield two input sets.

    The fixture is turned into a ``tomo:backandforthtomo`` scan with four
    ``tomo`` subscans followed by a flat and a dark subscan, one frame per
    file.  Two sets (``turn_001`` and ``turn_002``) are expected.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    technique = scan.scan_info["technique"]
    scan.scan_info["title"] = "tomo:backandforthtomo"
    technique["scan"].update(
        {
            "sequence": "tomo:backandforthtomo",
            "nb_turns": 4,
            "scan_range": 180.0,
            "flat_on": [3, 3, 3, 3],
        }
    )
    subscan_types = (
        "tomo:dark",
        "tomo:flat",
        "tomo",
        "tomo",
        "tomo",
        "tomo",
        "tomo:flat",
        "tomo:dark",
    )
    technique["subscans"] = {
        f"scan{number}": {"type": subscan_type}
        for number, subscan_type in enumerate(subscan_types, start=1)
    }
    technique["saving"]["frames_per_file"] = 1

    input_sets = _input_sets_by_label(processor._get_input_sets(scan))
    labels = ["turn_001", "turn_002"]
    assert list(input_sets) == labels
    turn_001, turn_002 = (input_sets[label] for label in labels)

    np.testing.assert_array_equal(
        turn_001["image_key_control"],
        np.array([2, 1, 1, 0, 0, 0, 0, 0, 0]),
    )
    np.testing.assert_array_equal(
        turn_002["image_key_control"],
        np.array([0, 0, 0, 0, 0, 0, 1, 1, 2]),
    )
    np.testing.assert_array_equal(
        turn_001["rotation_angle_deg"],
        np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 130.0, 70.0]),
    )
    np.testing.assert_array_equal(
        turn_002["rotation_angle_deg"],
        np.array([10.0, 70.0, 130.0, 190.0, 130.0, 70.0, 70.0, 70.0, 70.0]),
    )

    sample_dir = tmp_path / "PROCESSED_DATA" / "sample"
    for label in labels:
        assert input_sets[label]["nx_path"] == str(
            sample_dir
            / f"sample_0001_{label}"
            / "projections"
            / f"sample_0001_seq_start_{label}.nx"
        )
def test_get_input_sets_store_holotomo_subsequences_under_projection_index(
    mock_persistent, tmp_path
):
    """Check the single input set produced for a holotomo sub-sequence.

    With ``num_position=0`` and a ``tomo:fulltomo+tomo:holostep`` scan
    category, one set labelled ``"0000"`` is expected.  Its files carry the
    ``_0000`` suffix while the dataset directory stays ``sample_0001``.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    scan.scan_info["title"] = "holotomo_distance1"
    scan.scan_info["technique"].update(
        {"num_position": 0, "scan_category": "tomo:fulltomo+tomo:holostep"}
    )

    input_sets = _input_sets_by_label(processor._get_input_sets(scan))
    label = "0000"
    assert list(input_sets) == [label]

    dataset_dir = tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001"
    assert input_sets[label]["nx_path"] == str(
        dataset_dir / "projections" / "sample_0001_seq_start_0000.nx"
    )
    assert processor.workflow_destination(label, processor._entry(scan)) == str(
        dataset_dir / "workflows" / "gallery" / "sample_0001_seq_start_0000_nx.json"
    )
def test_top_level_holotomo_container_is_not_supported(mock_persistent, tmp_path):
    """Expect ``_is_tomo_sequence_scan`` to return ``False`` for ``tomo:holotomo``.

    Both the sequence name and the scan category of the fixture are set to
    ``tomo:holotomo``.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    technique = scan.scan_info["technique"]
    scan.scan_info["title"] = "holotomo_sequence"
    technique["scan"]["sequence"] = "tomo:holotomo"
    technique["scan_category"] = "tomo:holotomo"

    is_sequence = processor._is_tomo_sequence_scan(scan)
    assert is_sequence is False
def test_sequence_name_falls_back_to_scan_category_before_title(mock_persistent):
    """Expect ``scan_category`` to be used when ``technique.scan`` is empty.

    The entry also carries a free-form title, which must not be returned.
    """
    processor = CreateNxTomoProcessor()
    technique = {"scan": {}, "scan_category": "tomo:backandforthtomo"}
    entry = {"title": "something-freeform", "technique": technique}

    sequence_name = processor._sequence_name(entry)
    assert sequence_name == "tomo:backandforthtomo"
def test_get_inputs_converts_exposure_time_from_milliseconds(mock_persistent, tmp_path):
    """Check that an exposure time given in ``ms`` is reported in seconds.

    The fixture's exposure time is replaced by ``7`` with units ``"ms"``;
    ``count_time_s`` is then expected to hold nine entries of ``0.007``.
    """
    processor = CreateNxTomoProcessor()
    scan = _scan(tmp_path)
    scan_metadata = scan.scan_info["technique"]["scan"]
    scan_metadata["exposure_time"] = 7
    scan_metadata["exposure_time@units"] = "ms"

    inputs = _inputs_by_name(processor._get_inputs(scan))

    # The value goes through a unit conversion, so compare with a tolerance
    # rather than bit-for-bit; assert_allclose still rejects a shape mismatch.
    np.testing.assert_allclose(inputs["count_time_s"], np.full(9, 0.007))
def test_detector_name_does_not_fallback_to_instrument_mrtomo_config(mock_persistent):
    """Expect a ``KeyError`` when ``technique.tomoconfig`` names no detector.

    The entry lists a detector only under ``instrument.mrtomo_config``, which
    must not be used as a substitute.
    """
    processor = CreateNxTomoProcessor()
    legacy_config = {"detector": ["legacy_detector_optic"]}
    entry = {
        "technique": {"tomoconfig": {}},
        "instrument": {"mrtomo_config": legacy_config},
    }

    with pytest.raises(KeyError, match="detector"):
        processor._detector_name(entry)
def test_detector_data_axes_falls_back_to_active_tomo_detector_metadata(
    mock_persistent,
):
    """Check that data axes are found through the active tomo configuration.

    ``technique.active_tomo_config`` names ``tomo2_config``, whose
    ``tomo_detector`` entry names the instrument group holding ``data_axes``.
    The axes are stored as a tuple and expected back as a list.
    """
    processor = CreateNxTomoProcessor()
    instrument = {
        "tomo2_config": {"tomo_detector": ["tomodet_tomo2cam"]},
        "tomodet_tomo2cam": {"data_axes": ("-z", "y")},
    }
    technique = {
        "active_tomo_config": ["tomo2_config"],
        "tomoconfig": {"detector": ["tomo2cam"]},
    }
    entry = {"instrument": instrument, "technique": technique}

    data_axes = processor._detector_data_axes(entry)
    assert data_axes == ["-z", "y"]
def test_detector_data_axes_defaults_to_no_flip_axes_when_metadata_is_missing(
    mock_persistent,
):
    """Check the data axes returned when no active tomo configuration exists.

    The entry has no ``active_tomo_config`` and no ``tomodet_*`` group; the
    axes are only present under ``instrument.tomo2cam``.

    NOTE(review): the test name says "no flip", yet the expected value
    ``["-z", "y"]`` equals the tuple stored under ``instrument.tomo2cam``.
    Whether the processor reads that group or applies a built-in default
    cannot be told from this test -- confirm against the implementation.
    """
    processor = CreateNxTomoProcessor()
    instrument = {"tomo2cam": {"data_axes": ("-z", "y")}}
    technique = {"tomoconfig": {"detector": ["tomo2cam"]}}
    entry = {"instrument": instrument, "technique": technique}

    data_axes = processor._detector_data_axes(entry)
    assert data_axes == ["-z", "y"]
def test_detector_shape_dtype_uses_depth_when_dtype_is_missing(mock_persistent):
    """Check the dtype derived from ``depth`` when no dtype is stored.

    The detector metadata holds only ``size`` and ``depth=[4]``; the expected
    dtype is ``"uint32"`` (presumably depth is the number of bytes per pixel
    -- confirm against the implementation).
    """
    processor = CreateNxTomoProcessor()
    detector_metadata = {"size": [128, 128], "depth": [4]}
    entry = {"technique": {"detector": {"tomo2cam": detector_metadata}}}

    shape, dtype = processor._detector_shape_dtype(entry, "tomo2cam")

    assert shape == [128, 128]
    assert dtype == "uint32"
def test_pixel_size_defaults_match_bliss_metadata_sources(mock_persistent):
    """Check which metadata groups provide the sample and detector pixel sizes.

    All values are given in ``mm`` and expected back in micrometres.  The
    detector pixel size is expected to come from ``technique.optic`` (6.6 for
    both axes) even though ``technique.detector`` also stores a pixel size.
    """
    processor = CreateNxTomoProcessor()
    technique = {
        "optic": {
            "optics_pixel_size": 0.0066,
            "optics_pixel_size@units": "mm",
        },
        "scan": {
            "sample_pixel_size": 0.0024,
            "sample_pixel_size@units": "mm",
        },
        "detector": {
            "tomo2cam": {
                "pixel_size": [0.0065, 0.0075],
                "pixel_size@units": "mm",
            }
        },
    }
    entry = {"technique": technique}

    assert processor._sample_pixel_size_um(entry) == pytest.approx(2.4)
    assert processor._detector_pixel_size_um(entry, "tomo2cam") == pytest.approx(
        (6.6, 6.6)
    )
def test_detector_pixel_size_falls_back_to_detector_metadata(mock_persistent):
    """Check the detector pixel size when no ``technique.optic`` group exists.

    Only ``technique.detector.tomo2cam`` stores a pixel size (in ``mm``); the
    two values are expected back in micrometres.
    """
    processor = CreateNxTomoProcessor()
    detector_metadata = {
        "pixel_size": [0.0065, 0.0075],
        "pixel_size@units": "mm",
    }
    entry = {"technique": {"detector": {"tomo2cam": detector_metadata}}}

    pixel_size_um = processor._detector_pixel_size_um(entry, "tomo2cam")
    assert pixel_size_um == pytest.approx((6.5, 7.5))
def test_position_array_warns_and_defaults_to_zero(mock_persistent, caplog):
    """Check the fallback used when a configured motor has no recorded position.

    ``technique.tomoconfig`` maps ``sample_x`` to ``sx`` but
    ``positioners_start`` is empty; a one-element zero array and a logged
    warning are expected.
    """
    processor = CreateNxTomoProcessor()
    entry = {
        "technique": {"tomoconfig": {"sample_x": ["sx"]}},
        "positioners": {"positioners_start": {}},
    }

    with caplog.at_level("WARNING"):
        position = processor._position_array(entry, "sample_x")

    np.testing.assert_array_equal(position, np.array([0.0]))
    expected_message = "Could not resolve sample_x position from scan metadata"
    assert expected_message in caplog.text