Source code for blissoda.demo.tests.utils.streamline

import os
from glob import glob
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple

import h5py

from ....bliss_globals import current_session
from ....bliss_globals import setup_globals
from ....streamline.scanner import StreamlineScanner
from ....utils import directories
from ....version_utils import has_minimal_version
from ... import testing


def test_streamline_without_autocalibration(streamline_scanner: StreamlineScanner):
    """Run the streamline workflow without auto-calibration on two holders.

    Ejects whatever is loaded, resets the tray to two holders, measures all
    of them and checks the integrated results for 16 samples per holder
    (dataset index 1 for each sample).

    :param streamline_scanner: The StreamlineScanner object to test.
    """
    n_holders = 2
    n_samples = 16

    # Start with nothing loaded and a freshly reset tray.
    streamline_scanner.eject()
    streamline_scanner.sample_changer.reset_tray(n_holders)
    pattern, first_holder = _unused_holder_counter(streamline_scanner)

    # This workflow has no calibration step.
    streamline_scanner.init_workflow(with_autocalibration=False)

    # Measure every holder in the tray.
    streamline_scanner.run(0.1, setup_globals.difflab6)

    # Expected results: holder counter -> sample index -> dataset indices.
    expected = {}
    for holder in range(first_holder, first_holder + n_holders):
        expected[holder] = {sample: [1] for sample in range(n_samples)}
    _assert_results(streamline_scanner, pattern, expected)
def test_streamline_with_autocalibration(streamline_scanner: StreamlineScanner):
    """Run the streamline workflow with auto-calibration, then measure two holders.

    The test has two phases:

    1. With one holder in the tray, initialize the workflow with
       auto-calibration, load the holder, calibrate on sample 0 and check the
       integrated result of that sample.
    2. With two holders in the tray, measure all holders and check the
       integrated results for 16 samples per holder.

    :param streamline_scanner: The StreamlineScanner object to test.
    """
    # --- Phase 1: calibration on a single holder ---------------------------
    streamline_scanner.eject()
    streamline_scanner.sample_changer.reset_tray(1)
    pattern, first_holder = _unused_holder_counter(streamline_scanner)

    streamline_scanner.init_workflow(with_autocalibration=True)

    # Load, measure and perform the calibration.
    streamline_scanner.load()
    streamline_scanner.calib(0.1, setup_globals.difflab6, sample_index=0)
    testing.wait_workflows(raise_on_error=True)

    # Only sample 0 of the calibration holder has been measured.
    calib_expected = {first_holder: {0: [1]}}
    _assert_results(streamline_scanner, pattern, calib_expected)

    # --- Phase 2: measure two fresh holders --------------------------------
    n_holders = 2
    n_samples = 16

    streamline_scanner.eject()
    streamline_scanner.sample_changer.reset_tray(n_holders)
    pattern, first_holder = _unused_holder_counter(streamline_scanner)

    streamline_scanner.run(0.1, setup_globals.difflab6)

    # Expected results: holder counter -> sample index -> dataset indices.
    run_expected = {}
    for holder in range(first_holder, first_holder + n_holders):
        run_expected[holder] = {sample: [1] for sample in range(n_samples)}
    _assert_results(streamline_scanner, pattern, run_expected)
def _assert_results(
    streamline_scanner,
    result_dir_pattern: str,
    samples: Dict[int, Dict[int, List[int]]],
):
    """Wait for the workflows to finish and check the result files of each dataset.

    For every dataset the following must exist in its result directory:
    the HDF5 file with the expected datasets of shape ``(4096,)``, one ASCII
    file per expected suffix, and the ``gallery/integrate.png`` thumbnail.

    :param streamline_scanner: Scanner whose ``workflow_has_calib`` flag
        selects the ``_TEST_*_WITH_CALIB`` or ``_TEST_*_WITHOUT_CALIB``
        expectations.
    :param result_dir_pattern: Format string with the fields
        ``holder_counter``, ``sample_index`` and ``dataset_index``.
    :param samples: Mapping holder counter -> sample index -> dataset indices.
    """
    testing.wait_workflows(raise_on_error=True)

    # Select the expectations that match the active workflow.
    if not streamline_scanner.workflow_has_calib:
        data_paths = streamline_scanner._TEST_DATA_PATHS_WITHOUT_CALIB
        ascii_suffixes = streamline_scanner._TEST_ASCII_SUFFIXES_WITHOUT_CALIB
    else:
        data_paths = streamline_scanner._TEST_DATA_PATHS_WITH_CALIB
        ascii_suffixes = streamline_scanner._TEST_ASCII_SUFFIXES_WITH_CALIB

    # Flatten the nested mapping into (holder, sample, dataset) triples.
    datasets = (
        (holder, sample_index, dataset_index)
        for holder, sample in samples.items()
        for sample_index, dataset_indices in sample.items()
        for dataset_index in dataset_indices
    )

    for holder, sample_index, dataset_index in datasets:
        result_dir = Path(
            result_dir_pattern.format(
                holder_counter=holder,
                sample_index=sample_index,
                dataset_index=dataset_index,
            )
        )
        # The HDF5 file carries the same name as its directory.
        file_path = result_dir / f"{result_dir.name}.h5"

        for data_path in data_paths:
            testing.assert_hdf5_dataset_exists(file_path, data_path, (4096,))

        for ascii_suffix in ascii_suffixes:
            ascii_file = file_path.parent / f"{file_path.stem}{ascii_suffix}"
            assert ascii_file.exists(), f"File '{ascii_file}' does not exist"

        thumbnail = result_dir / "gallery" / "integrate.png"
        assert thumbnail.exists(), f"File '{thumbnail}' does not exist"
def test_streamline_id31_auto_stop(
    streamline_scanner: StreamlineScanner, auto_stop_threshold: float = 0.005
):
    """Integration test for the "auto-stop" accumulation mode.

    Configures the scanner for auto-stop mode, measures samples 0 and 1 of a
    single holder, and verifies that:

    - The normal streamline pipeline produces integrated 1D results
      (checked for sample 0).
    - A ``*_sum.h5`` file is written with a (1, H, W) dataset.
    - The auto-stop scan stopped before 50 frames.

    :param streamline_scanner: The StreamlineScanner object to test.
    :param auto_stop_threshold: The value above or below which the Ewoks
        auto stop workflow will send a signal to stop the loopscan. See the
        definition of the ``IntegrateAndSendMsg`` task in the workflow (which
        may change over time) to inspect the computation that involves this
        threshold. Default: 0.005.
    """
    if not has_minimal_version("bliss", "2.3"):
        testing.skip_integration_test(
            "Requires bliss>=2.3 (lima simulator race on 2.2, see issue #176)"
        )

    # Nothing loaded, 1 holder in the tray
    streamline_scanner.eject()
    streamline_scanner.sample_changer.reset_tray(1)
    result_dir_pattern, holder_counter = _unused_holder_counter(streamline_scanner)

    # Enable auto-stop mode
    streamline_scanner.auto_stop_acc_mode = True
    streamline_scanner.auto_stop_threshold = auto_stop_threshold
    streamline_scanner.optimize_exposure_per = None
    streamline_scanner.queue = "celery"

    # Init the streamline workflow and the auto-stop workflow
    streamline_scanner.init_workflow(with_autocalibration=False)
    streamline_scanner.init_auto_stop_workflow()

    # Measure sample 0 and 1 only
    streamline_scanner.run(0.2, setup_globals.difflab6, sample_indices=[0, 1])

    # Assert streamline pipeline produced integrated results (sample 0)
    samples = {holder_counter: {0: [1]}}
    _assert_results(streamline_scanner, result_dir_pattern, samples)

    # Assert that a *_sum.h5 file was created with the (1, H, W) frame sum
    # readable at /<scan>.1/measurement/<det> (a SoftLink to the canonical
    # NXdetector path /<scan>.1/instrument/<det>/data), and the auto-stop
    # scan stopped early.
    # The sum file lives in the PROCESSED_DATA tree mirroring the dataset's
    # raw location.
    dataset_filename = current_session.scan_saving.filename
    sample_processed_dir = Path(
        directories.get_dataset_processed_dir(dataset_filename)
    ).parent
    # Sort so that the inspected file does not depend on directory
    # traversal order when several sum files are present.
    sum_files = sorted(sample_processed_dir.rglob("*_sum.h5"))
    assert sum_files, f"No *_sum.h5 file found under '{sample_processed_dir}'"
    sum_file = sum_files[0]

    detector_path = "/1.1/measurement/difflab6"
    nframes_path = "/1.1/auto_stop_sum/results/nframes"

    with h5py.File(str(sum_file), "r") as f:
        assert (
            detector_path in f
        ), f"'{detector_path}' dataset not found in {sum_file}"
        ds = f[detector_path]
        assert isinstance(
            ds, h5py.Dataset
        ), f"'{detector_path}' is not a dataset in {sum_file}"
        sum_frame = ds[:]

        # Check the frame counter the same way as the detector data so a
        # missing dataset fails with a descriptive message, not a KeyError.
        assert (
            nframes_path in f
        ), f"'{nframes_path}' dataset not found in {sum_file}"
        nframes_ds = f[nframes_path]
        assert isinstance(
            nframes_ds, h5py.Dataset
        ), f"'{nframes_path}' is not a dataset in {sum_file}"
        nframes = int(nframes_ds[()])

    assert (
        sum_frame.ndim == 3 and sum_frame.shape[0] == 1
    ), f"Expected (1, H, W) sum_frame, got shape {sum_frame.shape}"
    assert (
        nframes < 50
    ), f"Expected auto-stop to trigger before 50 frames, got {nframes}"
def _unused_holder_counter(streamline_scanner: StreamlineScanner) -> Tuple[str, int]:
    """Find the first holder counter with no raw data and prepare the sample changer.

    Starting from the sample changer's current ``_holder_counter`` (or 1 when
    it is unset or zero), the counter is advanced until no
    ``holder<counter>_*`` entry exists in the raw data directory of the
    current session. The sample changer's ``_holder_counter`` is then set to
    one less than that value.

    :param streamline_scanner: Scanner whose sample changer is updated.
    :returns: ``(result_dir_pattern, holder_counter)``. The pattern is a
        format string for the processed result directories with the fields
        ``holder_counter``, ``sample_index`` and ``dataset_index``.
    """
    filename = current_session.scan_saving.filename
    raw_root = directories.get_raw_dir(filename)
    changer = streamline_scanner.sample_changer

    # Holders start counting from 1
    candidate = changer._holder_counter or 1
    while True:
        if not glob(os.path.join(raw_root, f"holder{candidate}_*")):
            break
        candidate += 1

    # Holder counter increments every time we load a new holder, so store
    # the value preceding the first unused one.
    changer._holder_counter = candidate - 1

    processed_root = directories.get_processed_dir(filename)
    result_dir_pattern = os.path.join(
        processed_root,
        "streamline",
        "holder{holder_counter}_sample{sample_index:02d}_lab6_{dataset_index:04d}",
    )
    return result_dir_pattern, candidate