Source code for blissoda.tests.test_create_nxtomo

from types import SimpleNamespace

import numpy as np
import pytest

from ..tomo.create_nxtomo import CreateNxTomoProcessor


def _scan(tmp_path):
    filename = tmp_path / "RAW_DATA" / "sample" / "sample_0001" / "sample_0001.h5"
    scan_info = {
        "title": "tomo:basic",
        "filename": str(filename),
        "scan_nb": 10,
        "start_time": "2026-03-31T10:00:00",
        "end_time": "2026-03-31T10:01:00",
        "sample": {"name": "sample"},
        "instrument": {
            "name": "id00",
            "tomo_config": {"tomo_detector": ["tomodet_detector"]},
            "tomodet_detector": {"data_axes": ["-z", "y"]},
            "mrtomo_config": {"detector": ["legacy_detector_optic"]},
        },
        "measurement": {},
        "positioners": {
            "positioners_start": {
                "omega": 10.0,
                "sx": 1.5,
                "sy": 0.25,
                "sz": 2.5,
            }
        },
        "technique": {
            "tomoconfig": {
                "detector": ["detector"],
                "rotation": ["omega"],
                "sample_x": ["sx"],
                "sample_y": ["sy"],
                "translation_y": ["sy"],
                "translation_z": ["sz"],
                "sample_pixel_size": 1.0,
            },
            "active_tomo_config": ["tomo_config"],
            "scan": {
                "dark_n": 1,
                "flat_n": 2,
                "tomo_n": 3,
                "scan_range": 180.0,
                "energy": 20000.0,
                "energy@units": "eV",
                "exposure_time": 0.2,
                "sample_pixel_size": 1.0,
                "sample_pixel_size@units": "um",
                "flat_on": [3],
                "sample_detector_distance": 100.0,
                "source_sample_distance": 200.0,
                "field_of_view": "full",
                "effective_propagation_distance": 300.0,
            },
            "scan_flags": {"return_images_aligned_to_flats": False},
            "subscans": {
                "scan1": {"type": "tomo:dark"},
                "scan2": {"type": "tomo:flat"},
                "scan3": {"type": "tomo"},
                "scan4": {"type": "tomo:return_ref"},
            },
            "saving": {"frames_per_file": 2, "image_file_format": "HDF5"},
            "optic": {
                "optics_pixel_size": 1.25,
                "optics_pixel_size@units": "um",
            },
            "detector": {
                "detector": {
                    "size": [5, 4],
                    "depth": [2],
                    "pixel_size": [6.5e-6, 7.5e-6],
                    "pixel_size@units": "m",
                    "data_path": "/entry_0000/instrument/detector/data",
                }
            },
        },
    }
    images_path = str(tmp_path / "images" / "{scan_number}_{img_acq_device}_")
    scan_saving = SimpleNamespace(
        filename=scan_info["filename"],
        beamline="id00",
        sample_name="sample",
        images_path=images_path,
        scan_number_format="%04d",
    )
    return SimpleNamespace(scan_info=scan_info, scan_saving=scan_saving)


def _inputs_by_name(inputs):
    return {item["name"]: item["value"] for item in inputs}


def _input_sets_by_label(input_sets):
    return {item["label"]: _inputs_by_name(item["inputs"]) for item in input_sets}


[docs] def test_get_inputs_uses_tomoconfig_aliases(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) inputs = _inputs_by_name(processor._get_inputs(scan)) np.testing.assert_array_equal( inputs["image_key_control"], np.array([2, 1, 1, 0, 0, 0, -1, -1, -1]) ) np.testing.assert_array_equal(inputs["x_translation_mm"], np.array([0.25])) np.testing.assert_array_equal(inputs["y_translation_mm"], np.array([2.5])) np.testing.assert_array_equal(inputs["z_translation_mm"], np.array([1.5])) np.testing.assert_array_equal(inputs["count_time_s"], np.full(9, 0.2)) np.testing.assert_array_equal( inputs["sequence_number"], np.arange(9, dtype=np.uint32) ) assert inputs["detector_data_file_paths"] == [ str(tmp_path / "images" / "0011_detector_0000.h5"), str(tmp_path / "images" / "0012_detector_0000.h5"), str(tmp_path / "images" / "0013_detector_0000.h5"), str(tmp_path / "images" / "0013_detector_0001.h5"), str(tmp_path / "images" / "0014_detector_0000.h5"), str(tmp_path / "images" / "0014_detector_0001.h5"), ] assert inputs["detector_data_shapes"] == [ [1, 4, 5], [2, 4, 5], [2, 4, 5], [1, 4, 5], [2, 4, 5], [1, 4, 5], ] assert inputs["detector_data_dtype"] == ["uint16"] * 6 assert ( inputs["detector_data_h5_url"] == ["/entry_0000/instrument/detector/data"] * 6 ) np.testing.assert_array_equal( inputs["rotation_angle_deg"], np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 100.0, 10.0]), ) assert inputs["sample_name"] == "sample" assert inputs["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001" / "projections" / "sample_0001_seq_start.nx" ) assert inputs["instrument_name"] == "id00" assert inputs["detector_data_axes"] == ["-z", "y"] assert inputs["detector_x_pixel_size_um"] == 1.25 assert inputs["detector_y_pixel_size_um"] == 1.25 assert inputs["sample_x_pixel_size_um"] == 1.0 assert inputs["sample_y_pixel_size_um"] == 1.0 assert inputs["estimated_cor"] == pytest.approx(250.0) assert inputs["sample_detector_distance_mm"] == 100.0 assert inputs["source_sample_distance_mm"] == 200.0 assert inputs["field_of_view"] == "full" assert inputs["propagation_distance_mm"] == 300.0 assert inputs["energy_kev"] == 20.0 assert processor.workflow_destination() == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001" / "workflows" / "gallery" / "sample_0001_seq_start_nx.json" )
[docs] def test_get_inputs_supports_helical_metadata(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = "tomo:helical" entry["sequence"] = "tomo:helical" entry["nb_turns"] = 2 entry["z_step"] = 0.5 inputs = _inputs_by_name(processor._get_inputs(scan)) np.testing.assert_array_equal( inputs["image_key_control"], np.array([2, 1, 1, 0, 0, 0, 0, 0, 0, -1, -1, -1]), ) np.testing.assert_array_equal( inputs["rotation_angle_deg"], np.array( [ 10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 250.0, 310.0, 190.0, 100.0, 10.0, ] ), ) np.testing.assert_allclose( inputs["y_translation_mm"], np.array( [ 2.5, 2.5, 2.5, 2.5, 2.66666667, 2.83333333, 3.0, 3.16666667, 3.33333333, 3.5, 3.5, 3.5, ] ), ) assert inputs["detector_data_shapes"] == [ [1, 4, 5], [2, 4, 5], [2, 4, 5], [2, 4, 5], [2, 4, 5], [2, 4, 5], [1, 4, 5], ] np.testing.assert_array_equal( inputs["sequence_number"], np.arange(12, dtype=np.uint32) )
[docs] def test_get_input_sets_split_zhelical_into_bottom_helix_top(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = "tomo:helical" entry["sequence"] = "tomo:helical" entry["nb_turns"] = 2 entry["z_step"] = 0.5 scan.scan_info["technique"]["subscans"] = { "scan1": {"type": "tomo:dark"}, "scan2": {"type": "tomo:flat"}, "scan3": {"type": "tomo"}, "scan4": {"type": "tomo"}, "scan5": {"type": "tomo"}, "scan6": {"type": "tomo:return_ref"}, } entry["flat_on"] = [3, 3, 3] input_sets = _input_sets_by_label(processor._get_input_sets(scan)) assert list(input_sets) == ["bottom", "helix", "top"] bottom = input_sets["bottom"] helix = input_sets["helix"] top = input_sets["top"] np.testing.assert_array_equal( bottom["image_key_control"], np.array([2, 1, 1, 0, 0, 0]) ) np.testing.assert_array_equal(helix["image_key_control"], np.zeros(6, dtype=int)) np.testing.assert_array_equal( top["image_key_control"], np.array([0, 0, 0, -1, -1, -1]) ) np.testing.assert_allclose(bottom["y_translation_mm"], np.full(6, 2.5)) np.testing.assert_allclose( helix["y_translation_mm"], np.array([2.5, 2.66666667, 2.83333333, 3.0, 3.16666667, 3.33333333]), ) np.testing.assert_allclose(top["y_translation_mm"], np.full(6, 3.5)) np.testing.assert_array_equal( bottom["sequence_number"], np.arange(6, dtype=np.uint32) ) np.testing.assert_array_equal( helix["sequence_number"], np.arange(6, dtype=np.uint32) ) np.testing.assert_array_equal(top["sequence_number"], np.arange(6, dtype=np.uint32)) assert bottom["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_bottom" / "projections" / "sample_0001_seq_start_bottom.nx" ) assert helix["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_helix" / "projections" / "sample_0001_seq_start_helix.nx" ) assert top["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_top" / "projections" / "sample_0001_seq_start_top.nx" ) assert processor.workflow_destination("bottom") == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_bottom" / "workflows" / "gallery" / "sample_0001_seq_start_bottom_nx.json" ) assert processor.workflow_destination("helix") == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_helix" / "workflows" / "gallery" / "sample_0001_seq_start_helix_nx.json" ) assert processor.workflow_destination("top") == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_top" / "workflows" / "gallery" / "sample_0001_seq_start_top_nx.json" )
[docs] @pytest.mark.parametrize("sequence_name", ["tomo:multitomo", "tomo:pcotomo"]) def test_get_input_sets_split_multiturn_sequences_per_turn( mock_persistent, tmp_path, sequence_name ): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = sequence_name entry["sequence"] = sequence_name entry["nb_turns"] = 99 scan.scan_info["technique"]["proj"] = {"proj_n": 3, "nb_turns": 2} scan.scan_info["technique"]["saving"]["frames_per_file"] = 1 input_sets = _input_sets_by_label(processor._get_input_sets(scan)) assert list(input_sets) == ["turn_001", "turn_002"] turn_001 = input_sets["turn_001"] turn_002 = input_sets["turn_002"] expected_image_keys = np.array([2, 1, 1, 0, 0, 0, -1, -1, -1]) np.testing.assert_array_equal(turn_001["image_key_control"], expected_image_keys) np.testing.assert_array_equal(turn_002["image_key_control"], expected_image_keys) np.testing.assert_array_equal( turn_001["rotation_angle_deg"], np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 100.0, 10.0]), ) np.testing.assert_array_equal( turn_002["rotation_angle_deg"], np.array([10.0, 10.0, 10.0, 190.0, 250.0, 310.0, 190.0, 100.0, 10.0]), ) np.testing.assert_array_equal( turn_001["sequence_number"], np.arange(9, dtype=np.uint32) ) np.testing.assert_array_equal( turn_002["sequence_number"], np.arange(9, dtype=np.uint32) ) assert turn_001["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_turn_001" / "projections" / "sample_0001_seq_start_turn_001.nx" ) assert turn_002["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_turn_002" / "projections" / "sample_0001_seq_start_turn_002.nx" ) assert turn_001["detector_data_file_paths"] == [ str(tmp_path / "images" / "0011_detector_0000.h5"), str(tmp_path / "images" / "0012_detector_0000.h5"), str(tmp_path / "images" / "0012_detector_0001.h5"), str(tmp_path / "images" / "0013_detector_0000.h5"), str(tmp_path / "images" / "0013_detector_0001.h5"), str(tmp_path / "images" / "0013_detector_0002.h5"), str(tmp_path / "images" / "0014_detector_0000.h5"), str(tmp_path / "images" / "0014_detector_0001.h5"), str(tmp_path / "images" / "0014_detector_0002.h5"), ] assert turn_002["detector_data_file_paths"] == [ str(tmp_path / "images" / "0011_detector_0000.h5"), str(tmp_path / "images" / "0012_detector_0000.h5"), str(tmp_path / "images" / "0012_detector_0001.h5"), str(tmp_path / "images" / "0013_detector_0003.h5"), str(tmp_path / "images" / "0013_detector_0004.h5"), str(tmp_path / "images" / "0013_detector_0005.h5"), str(tmp_path / "images" / "0014_detector_0000.h5"), str(tmp_path / "images" / "0014_detector_0001.h5"), str(tmp_path / "images" / "0014_detector_0002.h5"), ]
[docs] def test_multiturn_split_requires_turn_aligned_file_boundaries( mock_persistent, tmp_path ): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = "tomo:multitomo" entry["sequence"] = "tomo:multitomo" scan.scan_info["technique"]["proj"] = {"proj_n": 3, "nb_turns": 2} with pytest.raises(ValueError, match="frames_per_file does not align"): processor._get_input_sets(scan)
[docs] def test_multiturn_requires_proj_metadata(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = "tomo:multitomo" entry["sequence"] = "tomo:multitomo" with pytest.raises(ValueError, match="technique.proj.proj_n"): processor._get_input_sets(scan)
[docs] def test_get_input_sets_split_backandforth_per_360deg(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) entry = scan.scan_info["technique"]["scan"] scan.scan_info["title"] = "tomo:backandforthtomo" entry["sequence"] = "tomo:backandforthtomo" entry["nb_turns"] = 4 entry["scan_range"] = 180.0 entry["flat_on"] = [3, 3, 3, 3] scan.scan_info["technique"]["subscans"] = { "scan1": {"type": "tomo:dark"}, "scan2": {"type": "tomo:flat"}, "scan3": {"type": "tomo"}, "scan4": {"type": "tomo"}, "scan5": {"type": "tomo"}, "scan6": {"type": "tomo"}, "scan7": {"type": "tomo:flat"}, "scan8": {"type": "tomo:dark"}, } scan.scan_info["technique"]["saving"]["frames_per_file"] = 1 input_sets = _input_sets_by_label(processor._get_input_sets(scan)) assert list(input_sets) == ["turn_001", "turn_002"] turn_001 = input_sets["turn_001"] turn_002 = input_sets["turn_002"] np.testing.assert_array_equal( turn_001["image_key_control"], np.array([2, 1, 1, 0, 0, 0, 0, 0, 0]), ) np.testing.assert_array_equal( turn_002["image_key_control"], np.array([0, 0, 0, 0, 0, 0, 1, 1, 2]), ) np.testing.assert_array_equal( turn_001["rotation_angle_deg"], np.array([10.0, 10.0, 10.0, 10.0, 70.0, 130.0, 190.0, 130.0, 70.0]), ) np.testing.assert_array_equal( turn_002["rotation_angle_deg"], np.array([10.0, 70.0, 130.0, 190.0, 130.0, 70.0, 70.0, 70.0, 70.0]), ) assert turn_001["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_turn_001" / "projections" / "sample_0001_seq_start_turn_001.nx" ) assert turn_002["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001_turn_002" / "projections" / "sample_0001_seq_start_turn_002.nx" )
[docs] def test_get_input_sets_store_holotomo_subsequences_under_projection_index( mock_persistent, tmp_path ): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) scan.scan_info["title"] = "holotomo_distance1" scan.scan_info["technique"]["num_position"] = 0 scan.scan_info["technique"]["scan_category"] = "tomo:fulltomo+tomo:holostep" input_sets = _input_sets_by_label(processor._get_input_sets(scan)) assert list(input_sets) == ["0000"] subsequence = input_sets["0000"] assert subsequence["nx_path"] == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001" / "projections" / "sample_0001_seq_start_0000.nx" ) assert processor.workflow_destination("0000", processor._entry(scan)) == str( tmp_path / "PROCESSED_DATA" / "sample" / "sample_0001" / "workflows" / "gallery" / "sample_0001_seq_start_0000_nx.json" )
[docs] def test_top_level_holotomo_container_is_not_supported(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) scan.scan_info["title"] = "holotomo_sequence" scan.scan_info["technique"]["scan"]["sequence"] = "tomo:holotomo" scan.scan_info["technique"]["scan_category"] = "tomo:holotomo" assert processor._is_tomo_sequence_scan(scan) is False
[docs] def test_sequence_name_falls_back_to_scan_category_before_title(mock_persistent): processor = CreateNxTomoProcessor() entry = { "title": "something-freeform", "technique": { "scan": {}, "scan_category": "tomo:backandforthtomo", }, } assert processor._sequence_name(entry) == "tomo:backandforthtomo"
[docs] def test_get_inputs_converts_exposure_time_from_milliseconds(mock_persistent, tmp_path): processor = CreateNxTomoProcessor() scan = _scan(tmp_path) scan.scan_info["technique"]["scan"]["exposure_time"] = 7 scan.scan_info["technique"]["scan"]["exposure_time@units"] = "ms" inputs = _inputs_by_name(processor._get_inputs(scan)) np.testing.assert_array_equal(inputs["count_time_s"], np.full(9, 0.007))
[docs] def test_detector_name_does_not_fallback_to_instrument_mrtomo_config(mock_persistent): processor = CreateNxTomoProcessor() entry = { "technique": {"tomoconfig": {}}, "instrument": {"mrtomo_config": {"detector": ["legacy_detector_optic"]}}, } with pytest.raises(KeyError, match="detector"): processor._detector_name(entry)
[docs] def test_detector_data_axes_falls_back_to_active_tomo_detector_metadata( mock_persistent, ): processor = CreateNxTomoProcessor() entry = { "instrument": { "tomo2_config": {"tomo_detector": ["tomodet_tomo2cam"]}, "tomodet_tomo2cam": {"data_axes": ("-z", "y")}, }, "technique": { "active_tomo_config": ["tomo2_config"], "tomoconfig": {"detector": ["tomo2cam"]}, }, } assert processor._detector_data_axes(entry) == ["-z", "y"]
[docs] def test_detector_data_axes_defaults_to_no_flip_axes_when_metadata_is_missing( mock_persistent, ): processor = CreateNxTomoProcessor() entry = { "instrument": {"tomo2cam": {"data_axes": ("-z", "y")}}, "technique": {"tomoconfig": {"detector": ["tomo2cam"]}}, } assert processor._detector_data_axes(entry) == ["-z", "y"]
[docs] def test_detector_shape_dtype_uses_depth_when_dtype_is_missing(mock_persistent): processor = CreateNxTomoProcessor() entry = { "technique": { "detector": { "tomo2cam": { "size": [128, 128], "depth": [4], } } } } shape, dtype = processor._detector_shape_dtype(entry, "tomo2cam") assert shape == [128, 128] assert dtype == "uint32"
[docs] def test_pixel_size_defaults_match_bliss_metadata_sources(mock_persistent): processor = CreateNxTomoProcessor() entry = { "technique": { "optic": { "optics_pixel_size": 0.0066, "optics_pixel_size@units": "mm", }, "scan": { "sample_pixel_size": 0.0024, "sample_pixel_size@units": "mm", }, "detector": { "tomo2cam": { "pixel_size": [0.0065, 0.0075], "pixel_size@units": "mm", } }, } } assert processor._sample_pixel_size_um(entry) == pytest.approx(2.4) assert processor._detector_pixel_size_um(entry, "tomo2cam") == pytest.approx( (6.6, 6.6) )
[docs] def test_detector_pixel_size_falls_back_to_detector_metadata(mock_persistent): processor = CreateNxTomoProcessor() entry = { "technique": { "detector": { "tomo2cam": { "pixel_size": [0.0065, 0.0075], "pixel_size@units": "mm", } }, } } assert processor._detector_pixel_size_um(entry, "tomo2cam") == pytest.approx( (6.5, 7.5) )
[docs] def test_position_array_warns_and_defaults_to_zero(mock_persistent, caplog): processor = CreateNxTomoProcessor() entry = { "technique": {"tomoconfig": {"sample_x": ["sx"]}}, "positioners": {"positioners_start": {}}, } with caplog.at_level("WARNING"): value = processor._position_array(entry, "sample_x") np.testing.assert_array_equal(value, np.array([0.0])) assert "Could not resolve sample_x position from scan metadata" in caplog.text