Source code for blissoda.tests.test_scxrd_eiger2cbf
import json
from types import SimpleNamespace
from unittest.mock import patch
from .. import resources
from ..scxrd.eiger2cbf import Eiger2CbfProcessor
def _scan(tmp_path, scan_type="fscan"):
filename = tmp_path / "RAW_DATA" / "sample" / "sample_0001" / "sample_0001.h5"
scan_info = {
"filename": str(filename),
"scan_nb": 10,
"npoints": 5,
"type": scan_type,
"channels": {"eiger:image": {}},
"instrument": {
"fscan_parameters": {
"start_pos": -5.0,
"step_size": 0.5,
"acq_time": 0.1,
}
},
}
scan_saving = SimpleNamespace(
filename=scan_info["filename"],
images_path=str(tmp_path / "images" / "{scan_number}_{img_acq_device}_"),
)
return SimpleNamespace(
scan_number=scan_info["scan_nb"],
scan_info=scan_info,
scan_saving=scan_saving,
)
def _inputs_by_name(inputs):
return {item["name"]: item["value"] for item in inputs}
[docs]
def test_default_workflow_resource_exists(mock_persistent):
processor = Eiger2CbfProcessor()
assert processor.workflow == resources.resource_filename("scxrd", "eiger2cbf.json")
with open(processor.workflow) as stream:
workflow = json.load(stream)
assert (
workflow["nodes"][0]["task_identifier"]
== "ewoksscxrd.tasks.eiger2cbf.Eiger2CBF"
)
[docs]
def test_run_conversion_dispatches_workflow(mock_persistent, tmp_path):
processor = Eiger2CbfProcessor(defaults={"workflow": "eiger2cbf.json"})
with patch("blissoda.scxrd.eiger2cbf.submit") as submit_mock:
processor.run_conversion(_scan(tmp_path))
submit_mock.assert_called_once()
call = submit_mock.call_args.kwargs
assert call["args"] == ("eiger2cbf.json",)
assert call["kwargs"]["convert_destination"].endswith("_eiger2cbf.json")
assert call["kwargs"]["inputs"][0]["task_identifier"] == "Eiger2CBF"
[docs]
def test_run_conversion_ignores_non_fscan(mock_persistent, tmp_path):
processor = Eiger2CbfProcessor(defaults={"workflow": "eiger2cbf.json"})
with patch("blissoda.scxrd.eiger2cbf.submit") as submit_mock:
processor.run_conversion(_scan(tmp_path, scan_type="ct"))
submit_mock.assert_not_called()