import os
from glob import glob
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
import h5py
from ....bliss_globals import current_session
from ....bliss_globals import setup_globals
from ....streamline.scanner import StreamlineScanner
from ....utils import directories
from ....version_utils import has_minimal_version
from ... import testing
[docs]
def test_streamline_without_autocalibration(streamline_scanner: StreamlineScanner):
# Nothing loaded and 2 holders in the tray:
streamline_scanner.eject()
streamline_scanner.sample_changer.reset_tray(2)
result_dir_pattern, holder_counter = _unused_holder_counter(streamline_scanner)
# Initialize workflow without calibration
streamline_scanner.init_workflow(with_autocalibration=False)
# Measure all holders
streamline_scanner.run(0.1, setup_globals.difflab6)
# Verify integrated results
samples = {
holder: {sample: [1] for sample in range(16)}
for holder in range(holder_counter, holder_counter + 2)
}
_assert_results(streamline_scanner, result_dir_pattern, samples)
[docs]
def test_streamline_with_autocalibration(streamline_scanner: StreamlineScanner):
# Nothing loaded and 1 holder in the tray:
streamline_scanner.eject()
streamline_scanner.sample_changer.reset_tray(1)
result_dir_pattern, holder_counter = _unused_holder_counter(streamline_scanner)
# Initialize workflow with calibration
streamline_scanner.init_workflow(with_autocalibration=True)
# Load, measure and perform calibration
streamline_scanner.load()
streamline_scanner.calib(0.1, setup_globals.difflab6, sample_index=0)
testing.wait_workflows(raise_on_error=True)
# Verify integrated results
samples = {holder_counter: {0: [1]}}
_assert_results(streamline_scanner, result_dir_pattern, samples)
# Nothing loaded and 2 holders in the tray:
streamline_scanner.eject()
streamline_scanner.sample_changer.reset_tray(2)
result_dir_pattern, holder_counter = _unused_holder_counter(streamline_scanner)
# Measure all holders
streamline_scanner.run(0.1, setup_globals.difflab6)
# Verify integrated results
samples = {
holder: {sample: [1] for sample in range(16)}
for holder in range(holder_counter, holder_counter + 2)
}
_assert_results(streamline_scanner, result_dir_pattern, samples)
def _assert_results(
streamline_scanner,
result_dir_pattern: str,
samples: Dict[int, Dict[int, List[int]]],
):
testing.wait_workflows(raise_on_error=True)
if streamline_scanner.workflow_has_calib:
data_paths = streamline_scanner._TEST_DATA_PATHS_WITH_CALIB
ascii_suffixes = streamline_scanner._TEST_ASCII_SUFFIXES_WITH_CALIB
else:
data_paths = streamline_scanner._TEST_DATA_PATHS_WITHOUT_CALIB
ascii_suffixes = streamline_scanner._TEST_ASCII_SUFFIXES_WITHOUT_CALIB
for holder_counter, sample in samples.items():
for sample_index, dataset_indices in sample.items():
for dataset_index in dataset_indices:
result_dir = Path(
result_dir_pattern.format(
holder_counter=holder_counter,
sample_index=sample_index,
dataset_index=dataset_index,
)
)
file_path = result_dir / f"{result_dir.name}.h5"
for data_path in data_paths:
testing.assert_hdf5_dataset_exists(file_path, data_path, (4096,))
for ascii_suffix in ascii_suffixes:
ascii_file = file_path.parent / f"{file_path.stem}{ascii_suffix}"
assert ascii_file.exists(), f"File '{ascii_file}' does not exist"
thumbnail = result_dir / "gallery" / "integrate.png"
assert thumbnail.exists(), f"File '{thumbnail}' does not exist"
[docs]
def test_streamline_id31_auto_stop(
streamline_scanner: StreamlineScanner, auto_stop_threshold: float = 0.005
):
"""Integration test for the "auto-stop" accumulation mode.
Configures the scanner for auto-stop mode, runs a single sample, and
verifies that:
- A ``*_sum.h5`` file is written with a (1, H, W) dataset.
- The normal streamline pipeline produces integrated 1D results.
:param streamline_scanner: The StreamlineScanner object to test.
:param auto_stop_threshold: The value above or below which the Ewoks auto stop
workflow will send a signal to stop the loopscan. See the definition of the
``IntegrateAndSendMsg`` task in the workflow (which may change over time)
to inspect the computation that involves this threshold. Default: 0.005.
"""
if not has_minimal_version("bliss", "2.3"):
testing.skip_integration_test(
"Requires bliss>=2.3 (lima simulator race on 2.2, see issue #176)"
)
# Nothing loaded, 1 holder in the tray
streamline_scanner.eject()
streamline_scanner.sample_changer.reset_tray(1)
result_dir_pattern, holder_counter = _unused_holder_counter(streamline_scanner)
# Enable auto-stop mode
streamline_scanner.auto_stop_acc_mode = True
streamline_scanner.auto_stop_threshold = auto_stop_threshold
streamline_scanner.optimize_exposure_per = None
streamline_scanner.queue = "celery"
# Init the streamline workflow and the auto-stop workflow
streamline_scanner.init_workflow(with_autocalibration=False)
streamline_scanner.init_auto_stop_workflow()
# Measure sample 0 and 1 only
streamline_scanner.run(0.2, setup_globals.difflab6, sample_indices=[0, 1])
# Assert streamline pipeline produced integrated results
samples = {holder_counter: {0: [1]}}
_assert_results(streamline_scanner, result_dir_pattern, samples)
# Assert that a *_sum.h5 file was created with the (1, H, W) frame sum
# readable at /<scan>.1/measurement/<det> (a SoftLink to the canonical
# NXdetector path /<scan>.1/instrument/<det>/data), and the auto-stop
# scan stopped early.
# The sum file lives in the PROCESSED_DATA tree mirroring the dataset's
# raw location.
dataset_filename = current_session.scan_saving.filename
sample_processed_dir = Path(
directories.get_dataset_processed_dir(dataset_filename)
).parent
sum_files = list(sample_processed_dir.rglob("*_sum.h5"))
assert len(sum_files) > 0, f"No *_sum.h5 file found under '{sample_processed_dir}'"
detector_path = "/1.1/measurement/difflab6"
nframes_path = "/1.1/auto_stop_sum/results/nframes"
with h5py.File(str(sum_files[0]), "r") as f:
assert (
detector_path in f
), f"'{detector_path}' dataset not found in {sum_files[0]}"
ds = f[detector_path]
assert isinstance(
ds, h5py.Dataset
), f"'{detector_path}' is not a dataset in {sum_files[0]}"
sum_frame = ds[:]
nframes_ds = f[nframes_path]
assert isinstance(nframes_ds, h5py.Dataset)
nframes = int(nframes_ds[()])
assert (
sum_frame.ndim == 3 and sum_frame.shape[0] == 1
), f"Expected (1, H, W) sum_frame, got shape {sum_frame.shape}"
assert (
nframes < 50
), f"Expected auto-stop to trigger before 50 frames, got {nframes}"
def _unused_holder_counter(streamline_scanner: StreamlineScanner) -> Tuple[str, int]:
dataset_filename = current_session.scan_saving.filename
raw_data = directories.get_raw_dir(dataset_filename)
sample_changer = streamline_scanner.sample_changer
# Holders start counting from 1
holder_counter = sample_changer._holder_counter or 1
while glob(os.path.join(raw_data, f"holder{holder_counter}_*")):
holder_counter += 1
# Holder counter increments every time we load a new holder
sample_changer._holder_counter = holder_counter - 1
processed_data = directories.get_processed_dir(dataset_filename)
subdir = "holder{holder_counter}_sample{sample_index:02d}_lab6_{dataset_index:04d}"
result_dir_pattern = os.path.join(processed_data, "streamline", subdir)
return result_dir_pattern, holder_counter