# Source code for blissoda.demo.tests.utils.xrpd

import os
import time
from contextlib import contextmanager
from typing import Generator
from typing import List

from ....bliss_globals import setup_globals
from ....processor import BlissScanType
from ....xrpd.plots import Xrpd2dIntegrationPlot
from ....xrpd.plots import XrpdCurvePlot
from ....xrpd.plots import XrpdImagePlot
from ... import testing

_RETRY_TIMEOUT = 20
_LIMA1_SLEEP = 10 if os.environ.get("CI") else 0


def test_xrpd_ct_with_1d_integration(
    processor, detectors, nrepeats: int = None, expo=0.2
):
    """Run repeated ``processor.pct`` counts and check the 1D integration plots.

    :param processor: object providing ``pct``, ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``processor.pct``.
    :param nrepeats: number of counts to run; when ``None`` it defaults to
        ``processor.number_of_scans + 1``.
    :param expo: first argument forwarded to ``processor.pct``.
    """
    repeat_count = processor.number_of_scans + 1 if nrepeats is None else nrepeats

    with _scan_context() as scans:
        for _ in range(repeat_count):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(processor.pct(expo, *detectors))

            # Workflow is getting data from Lima directly so we cannot start
            # the next scan until the workflow is done.
            testing.wait_workflows()

        # The accumulated image plot of each detector must only show the
        # most recent scan.
        for lima_name in processor.lima_names:
            latest_label = _get_scan_legend(processor, scans[-1], lima_name)
            _assert_1d_plot_accum(processor._plotter, [latest_label], lima_name)

        # The "last" curve plot must show the final `number_of_scans` scans
        # for every detector; earlier scans are expected to be gone.
        first_kept = max(len(scans) - processor.number_of_scans, 0)
        kept_labels = []
        for kept_scan in scans[first_kept:]:
            for lima_name in processor.lima_names:
                kept_labels.append(_get_scan_legend(processor, kept_scan, lima_name))
        _assert_1d_plot_last(processor._plotter, kept_labels)


def test_xrpd_ct_with_2d_integration(
    processor, detectors, nrepeats: int = None, expo=0.2
):
    """Run repeated ``processor.pct`` counts and check the 2D integration plot.

    :param processor: object providing ``pct``, ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``processor.pct``.
    :param nrepeats: number of counts to run; when ``None`` it defaults to
        ``processor.number_of_scans + 1``.
    :param expo: first argument forwarded to ``processor.pct``.
    """
    repeat_count = processor.number_of_scans + 1 if nrepeats is None else nrepeats

    with _scan_context() as scans:
        for _ in range(repeat_count):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(processor.pct(expo, *detectors))

            # Workflow is getting data from Lima directly so we cannot start
            # the next scan until the workflow is done.
            testing.wait_workflows()

        # The 2D plot of each detector must only show the most recent scan.
        for lima_name in processor.lima_names:
            latest_label = _get_scan_legend(processor, scans[-1], lima_name)
            _assert_2d_plot_last(processor._plotter, [latest_label], lima_name)


def test_xrpd_scan_with_1d_integration(
    processors,
    detectors,
    nrepeats: int = None,
    expo=0.2,
    npoints=10,
    plotting_fails: bool = True,
):
    """Run repeated loopscans and check 1D integration data and plots.

    :param processors: iterable of processors, each providing
        ``number_of_scans``, ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``loopscan``.
    :param nrepeats: number of scans to run; when ``None`` it defaults to the
        smallest ``number_of_scans`` among the processors, plus one.
    :param expo: second argument forwarded to ``loopscan``.
    :param npoints: first argument forwarded to ``loopscan``; also used for
        the expected data shapes.
    :param plotting_fails: when true (the default) the "last" curve plot is
        expected to hold the labels of the most recent scans for every
        processor; when false it is expected to hold no labels.
        NOTE(review): the flag name reads inverted relative to this
        behaviour - confirm the intent with the callers.
    """
    if nrepeats is None:
        nrepeats = min(p.number_of_scans for p in processors) + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(setup_globals.loopscan(npoints, expo, *detectors))

        testing.wait_workflows()

        for processor in processors:
            # Every scan must have produced integrated data for each detector.
            for scan in scans:
                scan_number = scan.scan_info["scan_nb"]
                for lima_name in processor.lima_names:
                    _assert_1d_data(lima_name, processor, scan, scan_number, npoints)

            # The accumulated image plot only shows the most recent scan.
            for lima_name in processor.lima_names:
                latest_label = _get_scan_legend(processor, scans[-1], lima_name)
                _assert_1d_plot_accum(processor._plotter, [latest_label], lima_name)

            expected_labels = []
            if plotting_fails:
                # The slice depends on the current processor, the labels are
                # collected over all processors.
                first_kept = max(len(scans) - processor.number_of_scans, 0)
                for kept_scan in scans[first_kept:]:
                    for other in processors:
                        for lima_name in other.lima_names:
                            expected_labels.append(
                                _get_scan_legend(other, kept_scan, lima_name)
                            )
            _assert_1d_plot_last(processor._plotter, expected_labels)


def test_xrpd_scan_with_2d_integration(
    processors, detectors, nrepeats: int = None, expo=0.2, npoints=10
):
    """Run repeated loopscans and check 2D integration data and plots.

    :param processors: iterable of processors, each providing
        ``number_of_scans``, ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``loopscan``.
    :param nrepeats: number of scans to run; when ``None`` it defaults to the
        smallest ``number_of_scans`` among the processors, plus one.
    :param expo: second argument forwarded to ``loopscan``.
    :param npoints: first argument forwarded to ``loopscan``; also used for
        the expected data shapes.
    :returns: the list of scans that were run.
    """
    if nrepeats is None:
        nrepeats = min(p.number_of_scans for p in processors) + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(setup_globals.loopscan(npoints, expo, *detectors))

        testing.wait_workflows()

        for processor in processors:
            # Every scan must have produced integrated data for each detector.
            for scan in scans:
                scan_number = scan.scan_info["scan_nb"]
                for lima_name in processor.lima_names:
                    _assert_2d_data(lima_name, processor, scan, scan_number, npoints)

            # The 2D plot of each detector only shows the most recent scan.
            for lima_name in processor.lima_names:
                latest_label = _get_scan_legend(processor, scans[-1], lima_name)
                _assert_2d_plot_last(processor._plotter, [latest_label], lima_name)

    return scans


def _assert_1d_plot_last(plotter, expected_labels):
    """The last 1D diffractogram of each scan is plotted as a curve."""
    _assert_plot(plotter, expected_labels, "Integrated (Last)", XrpdCurvePlot)


def _assert_1d_plot_accum(plotter, expected_labels, lima_name):
    """The 1D diffractograms of the last scan are plotted as an image."""
    _assert_plot(plotter, expected_labels, f"Integrated {lima_name}", XrpdImagePlot)


def _assert_2d_plot_last(plotter, expected_labels, lima_name):
    """The last 2D cake of the last scan is plotted as an image."""
    _assert_plot(
        plotter,
        expected_labels,
        f"2D Integrated {lima_name} (last)",
        Xrpd2dIntegrationPlot,
    )


@testing.demo_assert("Check {plot_id} plot for {expected_labels}")
def _assert_plot(plotter, expected_labels, plot_id, plot_cls):
    """Assert that the plot `plot_id` shows exactly the expected labels.

    The comparison is done on sets, so ordering and duplicates are ignored.

    :param plotter: object providing ``wait_tasks`` and ``_get_plot``.
    :param expected_labels: labels the plot is expected to contain.
    :param plot_id: identifier passed to ``plotter._get_plot``.
    :param plot_cls: plot class passed to ``plotter._get_plot``.
    :raises AssertionError: when the plot labels differ from the expected ones.
    """
    # Wait (up to the retry timeout) before inspecting the plot.
    plotter.wait_tasks(_RETRY_TIMEOUT)
    plot = plotter._get_plot(plot_id, plot_cls)
    labels = plot.get_labels()
    err_msg = f"{labels} instead of {expected_labels}"
    assert set(labels) == set(expected_labels), err_msg


def _get_scan_legend(processor, scan, lima_name) -> str:
    """Return the plot label expected for `scan` and detector `lima_name`."""
    scan_name = processor._get_scan_name(scan)
    return f"{scan_name} ({lima_name})"


def _get_data_shapes(processor, scan, lima_name, retry_period: float = 0.2) -> dict:
    """Map each data key of `lima_name` in `scan` to the shape of its data.

    Both the key lookup and the data retrieval are retried by the processor
    for at most ``_RETRY_TIMEOUT`` with a period of `retry_period`.
    """
    data_keys = processor.get_data_keys(
        scan, lima_name, retry_timeout=_RETRY_TIMEOUT, retry_period=retry_period
    )
    return {
        name: processor.get_data(
            scan, name, retry_timeout=_RETRY_TIMEOUT, retry_period=retry_period
        ).shape
        for name in data_keys
    }


@testing.demo_assert("Check 1D integration data for {lima_name} of scan #{scan_number}")
def _assert_1d_data(lima_name, processor, scan, scan_number, npoints):
    """Assert that the 1D integration data of `scan` has the expected shapes.

    The expected shapes come from ``processor._expected_data_shapes_1d``.

    :param scan_number: not used in the body; presumably consumed by the
        ``demo_assert`` message template - confirm against the decorator.
    :raises AssertionError: when the data shapes differ from the expected ones.
    """
    shapes = _get_data_shapes(processor, scan, lima_name)
    expected = processor._expected_data_shapes_1d(lima_name, npoints)
    assert shapes == expected, f"{shapes} instead of {expected}"


@testing.demo_assert("Check 2D integration data for {lima_name} of scan #{scan_number}")
def _assert_2d_data(lima_name, processor, scan, scan_number, npoints):
    """Assert that the 2D integration data of `scan` has the expected shapes.

    :param scan_number: not used in the body; presumably consumed by the
        ``demo_assert`` message template - confirm against the decorator.
    :raises AssertionError: when the data shapes differ from the expected ones.
    """
    shapes = _get_data_shapes(processor, scan, lima_name)
    # NOTE(review): 360 x 4096 is hard-coded here; presumably it matches the
    # integration settings used by the demo - confirm against the workflow.
    expected = {
        f"{lima_name}:chi": (360,),
        f"{lima_name}:q": (4096,),
        f"{lima_name}:intensity": (npoints, 360, 4096),
        f"{lima_name}:points": (npoints,),
    }
    assert shapes == expected, f"{shapes} instead of {expected}"


@contextmanager
def _scan_context() -> Generator[List[BlissScanType], None, None]:
    """Yield a list to collect scans in and clean the scans up on exit.

    On exit, including when the body raises, the private
    ``_Scan__user_scan_meta`` attribute of every collected scan is reset.
    """
    scans = []
    try:
        yield scans
    finally:
        # Holds references to processor instances
        for scan in scans:
            scan._Scan__user_scan_meta = None