import os
import time
from contextlib import contextmanager
from typing import Generator
from typing import List
from ....bliss_globals import setup_globals
from ....processor import BlissScanType
from ....xrpd.plots import Xrpd2dIntegrationPlot
from ....xrpd.plots import XrpdCurvePlot
from ....xrpd.plots import XrpdImagePlot
from ... import testing
# Timeout handed to the plotter's ``wait_tasks`` and used as ``retry_timeout``
# when fetching processed data (presumably seconds -- TODO confirm).
_RETRY_TIMEOUT = 20
# Pause passed to ``time.sleep`` before every scan. Only non-zero when the
# ``CI`` environment variable is set to a non-empty value. The call sites
# relate it to the Lima1 error
# "Camera not Prepared nor Ready nor Armed (state: 5)".
_LIMA1_SLEEP = 10 if os.environ.get("CI") else 0
[docs]
def test_xrpd_ct_with_1d_integration(
    processor, detectors, nrepeats: int = None, expo=0.2
):
    """Take repeated ``pct`` counts and check the 1D integration plots.

    :param processor: object providing ``pct``, ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``processor.pct``
        after ``expo``.
    :param nrepeats: number of counts to take. ``None`` means
        ``processor.number_of_scans + 1``.
    :param expo: first argument of ``processor.pct`` (presumably the
        exposure time -- confirm against ``pct``).
    """
    if nrepeats is None:
        nrepeats = processor.number_of_scans + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(processor.pct(expo, *detectors))

            # Workflow is getting data from Lima directly so we cannot start
            # the next scan until the workflow is done.
            testing.wait_workflows()

        # The accumulated plot of each detector only shows the latest scan.
        for lima_name in processor.lima_names:
            latest_only = [_get_scan_legend(processor, scans[-1], lima_name)]
            _assert_1d_plot_accum(processor._plotter, latest_only, lima_name)

        # The "last" plot shows the scans that remain after dropping the
        # oldest ones in excess of ``number_of_scans``.
        first_kept = max(len(scans) - processor.number_of_scans, 0)
        kept_labels = []
        for kept_scan in scans[first_kept:]:
            for lima_name in processor.lima_names:
                kept_labels.append(
                    _get_scan_legend(processor, kept_scan, lima_name)
                )
        _assert_1d_plot_last(processor._plotter, kept_labels)
[docs]
def test_xrpd_ct_with_2d_integration(
    processor, detectors, nrepeats: int = None, expo=0.2
):
    """Take repeated ``pct`` counts and check the 2D integration plots.

    :param processor: object providing ``pct``, ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to ``processor.pct``
        after ``expo``.
    :param nrepeats: number of counts to take. ``None`` means
        ``processor.number_of_scans + 1``.
    :param expo: first argument of ``processor.pct`` (presumably the
        exposure time -- confirm against ``pct``).
    """
    if nrepeats is None:
        nrepeats = processor.number_of_scans + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(processor.pct(expo, *detectors))

            # Workflow is getting data from Lima directly so we cannot start
            # the next scan until the workflow is done.
            testing.wait_workflows()

        # Each detector's 2D plot is expected to show the latest scan only.
        for lima_name in processor.lima_names:
            latest_only = [_get_scan_legend(processor, scans[-1], lima_name)]
            _assert_2d_plot_last(processor._plotter, latest_only, lima_name)
[docs]
def test_xrpd_scan_with_1d_integration(
    processors,
    detectors,
    nrepeats: int = None,
    expo=0.2,
    npoints=10,
    plotting_fails: bool = True,
):
    """Run repeated loopscans and check 1D integration data and plots.

    :param processors: iterable of objects providing ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to
        ``setup_globals.loopscan`` after ``npoints`` and ``expo``.
    :param nrepeats: number of scans to run. ``None`` means one more than
        the smallest ``number_of_scans`` among ``processors``.
    :param expo: second argument of ``loopscan``.
    :param npoints: first argument of ``loopscan``; also used for the
        expected data shapes.
    :param plotting_fails: when true, the "last" plot is expected to hold
        the legends of the retained scans for every processor; when false
        it is expected to be empty.
        NOTE(review): the flag name suggests the opposite meaning --
        confirm against the callers.
    """
    if nrepeats is None:
        nrepeats = min(p.number_of_scans for p in processors) + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(setup_globals.loopscan(npoints, expo, *detectors))

        testing.wait_workflows()

        for processor in processors:
            # Saved data of every scan, for every detector of this processor.
            for scan in scans:
                for lima_name in processor.lima_names:
                    scan_number = scan.scan_info["scan_nb"]
                    _assert_1d_data(lima_name, processor, scan, scan_number, npoints)

            # The accumulated plot of each detector only shows the latest scan.
            for lima_name in processor.lima_names:
                latest_only = [_get_scan_legend(processor, scans[-1], lima_name)]
                _assert_1d_plot_accum(processor._plotter, latest_only, lima_name)

            expected_labels = []
            if plotting_fails:
                # The window of retained scans is sized by the processor of
                # the enclosing loop, while legends are built for all
                # processors.
                first_kept = max(len(scans) - processor.number_of_scans, 0)
                for kept_scan in scans[first_kept:]:
                    for other in processors:
                        for lima_name in other.lima_names:
                            expected_labels.append(
                                _get_scan_legend(other, kept_scan, lima_name)
                            )
            _assert_1d_plot_last(processor._plotter, expected_labels)
[docs]
def test_xrpd_scan_with_2d_integration(
    processors, detectors, nrepeats: int = None, expo=0.2, npoints=10
):
    """Run repeated loopscans and check 2D integration data and plots.

    :param processors: iterable of objects providing ``number_of_scans``,
        ``lima_names`` and ``_plotter``.
    :param detectors: positional arguments forwarded to
        ``setup_globals.loopscan`` after ``npoints`` and ``expo``.
    :param nrepeats: number of scans to run. ``None`` means one more than
        the smallest ``number_of_scans`` among ``processors``.
    :param expo: second argument of ``loopscan``.
    :param npoints: first argument of ``loopscan``; also used for the
        expected data shapes.
    :returns: the list of scans that were run.
    """
    if nrepeats is None:
        nrepeats = min(p.number_of_scans for p in processors) + 1

    with _scan_context() as scans:
        for _ in range(nrepeats):
            # Lima1: Camera not Prepared nor Ready nor Armed (state: 5)
            time.sleep(_LIMA1_SLEEP)
            scans.append(setup_globals.loopscan(npoints, expo, *detectors))

        testing.wait_workflows()

        for processor in processors:
            # Saved data of every scan, for every detector of this processor.
            for scan in scans:
                for lima_name in processor.lima_names:
                    scan_number = scan.scan_info["scan_nb"]
                    _assert_2d_data(lima_name, processor, scan, scan_number, npoints)

            # Each detector's 2D plot is expected to show the latest scan only.
            for lima_name in processor.lima_names:
                latest_only = [_get_scan_legend(processor, scans[-1], lima_name)]
                _assert_2d_plot_last(processor._plotter, latest_only, lima_name)

    return scans
def _assert_1d_plot_last(plotter, expected_labels):
    """Check the curve plot holding the last 1D diffractogram of each scan.

    :param plotter: plotter passed on to ``_assert_plot``.
    :param expected_labels: labels the plot is expected to contain.
    """
    plot_id = "Integrated (Last)"
    _assert_plot(plotter, expected_labels, plot_id, XrpdCurvePlot)
def _assert_1d_plot_accum(plotter, expected_labels, lima_name):
    """Check the image plot holding the 1D diffractograms of the last scan.

    :param plotter: plotter passed on to ``_assert_plot``.
    :param expected_labels: labels the plot is expected to contain.
    :param lima_name: detector name, used to build the plot identifier.
    """
    plot_id = f"Integrated {lima_name}"
    _assert_plot(plotter, expected_labels, plot_id, XrpdImagePlot)
def _assert_2d_plot_last(plotter, expected_labels, lima_name):
    """Check the image plot holding the last 2D cake of the last scan.

    :param plotter: plotter passed on to ``_assert_plot``.
    :param expected_labels: labels the plot is expected to contain.
    :param lima_name: detector name, used to build the plot identifier.
    """
    plot_id = f"2D Integrated {lima_name} (last)"
    _assert_plot(plotter, expected_labels, plot_id, Xrpd2dIntegrationPlot)
@testing.demo_assert("Check {plot_id} plot for {expected_labels}")
def _assert_plot(plotter, expected_labels, plot_id, plot_cls):
    """Assert that a plot shows exactly the expected set of labels.

    Order and duplicates are ignored because both sides are compared as sets.

    :param plotter: object providing ``wait_tasks`` and ``_get_plot``.
    :param expected_labels: labels the plot is expected to contain.
    :param plot_id: first argument of ``plotter._get_plot``.
    :param plot_cls: second argument of ``plotter._get_plot``.
    :raises AssertionError: when the label sets differ.
    """
    # Wait for the plotter's tasks before inspecting the plot.
    plotter.wait_tasks(_RETRY_TIMEOUT)
    found_labels = plotter._get_plot(plot_id, plot_cls).get_labels()
    mismatch = f"{found_labels} instead of {expected_labels}"
    assert set(found_labels) == set(expected_labels), mismatch
def _get_scan_legend(processor, scan, lima_name) -> str:
    """Return the plot legend ``"<scan name> (<lima_name>)"`` for a scan.

    :param processor: object providing ``_get_scan_name``.
    :param scan: scan passed to ``processor._get_scan_name``.
    :param lima_name: detector name appended in parentheses.
    """
    return f"{processor._get_scan_name(scan)} ({lima_name})"
@testing.demo_assert("Check 1D integration data for {lima_name} of scan #{scan_number}")
def _assert_1d_data(lima_name, processor, scan, scan_number, npoints):
    """Assert the shapes of the 1D integration data of one scan and detector.

    :param lima_name: detector name.
    :param processor: object providing ``get_data_keys``, ``get_data`` and
        ``_expected_data_shapes_1d``.
    :param scan: scan whose data is checked.
    :param scan_number: not used in the body; it appears in the decorator's
        message template.
    :param npoints: passed to ``processor._expected_data_shapes_1d``.
    :raises AssertionError: when the shapes differ from the expected ones.
    """
    retry = {"retry_timeout": _RETRY_TIMEOUT, "retry_period": 0.2}
    found_shapes = {}
    for key in processor.get_data_keys(scan, lima_name, **retry):
        found_shapes[key] = processor.get_data(scan, key, **retry).shape
    wanted_shapes = processor._expected_data_shapes_1d(lima_name, npoints)
    assert found_shapes == wanted_shapes, found_shapes
@testing.demo_assert("Check 2D integration data for {lima_name} of scan #{scan_number}")
def _assert_2d_data(lima_name, processor, scan, scan_number, npoints):
    """Assert the shapes of the 2D integration data of one scan and detector.

    The expected shapes are hard-coded: 360 ``chi`` values, 4096 ``q``
    values, an ``(npoints, 360, 4096)`` intensity stack and ``npoints``
    points.

    :param lima_name: detector name, used as prefix of the data keys.
    :param processor: object providing ``get_data_keys`` and ``get_data``.
    :param scan: scan whose data is checked.
    :param scan_number: not used in the body; it appears in the decorator's
        message template.
    :param npoints: number of scan points expected in the data.
    :raises AssertionError: when the shapes differ from the expected ones.
    """
    retry = {"retry_timeout": _RETRY_TIMEOUT, "retry_period": 0.2}
    found_shapes = {}
    for key in processor.get_data_keys(scan, lima_name, **retry):
        found_shapes[key] = processor.get_data(scan, key, **retry).shape
    wanted_shapes = {
        f"{lima_name}:chi": (360,),
        f"{lima_name}:q": (4096,),
        f"{lima_name}:intensity": (npoints, 360, 4096),
        f"{lima_name}:points": (npoints,),
    }
    assert found_shapes == wanted_shapes, found_shapes
@contextmanager
def _scan_context() -> Generator[List[BlissScanType], None, None]:
    """Yield an empty list for collecting scans and clean them up on exit.

    On exit, whether or not the body raised, the ``_Scan__user_scan_meta``
    attribute of every collected scan is set to ``None``.
    """
    collected: List[BlissScanType] = []
    try:
        yield collected
    finally:
        for collected_scan in collected:
            # Holds references to processor instances
            collected_scan._Scan__user_scan_meta = None