import json
import sys
from pathlib import Path
from typing import Any
from typing import Dict
from typing import Optional
from ...bliss_globals import current_session # type: ignore
from ...bliss_globals import setup_globals # type: ignore
from ...id31.streamline_scanner import Id31StreamlineScanner
from ...utils import directories
from ..processors.utils.pyfai_calib import DEFAULT_CALIB
from .utils.id31 import ensure_difflab6_id31_flats
from .utils.pyfai_calib import ensure_pyfai_config
from .utils.streamline import DemoStreamlineScannerMixIn
[docs]
class DemoId31StreamlineScanner(DemoStreamlineScannerMixIn, Id31StreamlineScanner):
    """Demo flavour of the ID31 streamline scanner.

    Construction fills in demo values for settings the caller did not
    supply and calls :func:`id31_patching` before the regular scanner
    initialisation runs.
    """

    # Result paths and ASCII file suffixes of the demo test data, with and
    # without calibration. They are not read in this class; presumably the
    # demo mix-in consumes them (TODO confirm).
    _TEST_DATA_PATHS_WITHOUT_CALIB = (
        "/results/difflab6_integrate_q/integrated/q",
        "/results/difflab6_integrate_2th/integrated/2th",
        "/results/difflab6_integrate_q_no_sigmaclip/integrated/q",
    )
    _TEST_DATA_PATHS_WITH_CALIB = (
        "/results/difflab6_integrate_q/integrated/q",
        "/results/difflab6_integrate_2th/integrated/2th",
    )
    _TEST_ASCII_SUFFIXES_WITHOUT_CALIB = ("_2th.xye", "_q.xye")
    _TEST_ASCII_SUFFIXES_WITH_CALIB = ("_2th.xye", "_q.xye")

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Dict[str, Any],
    ):
        """Set up the demo scanner.

        :param config: forwarded unchanged to the parent initialiser.
        :param defaults: default settings; demo values are added for every
            key still missing after merging.
        :param deprecated_defaults: defaults given as keyword arguments,
            combined with ``defaults`` by ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)

        # Only keys that are absent get a demo value; caller-provided
        # settings are left untouched.
        demo_fallbacks = (
            ("optimize_exposure_per", "sample"),
            ("default_attenuator", 4),
            ("energy_name", "energy"),
            ("auto_stop_threshold", 0.01),
            ("detector_saturation", 1e6),
            ("auto_stop_scan_npoints", 100),
        )
        for key, fallback in demo_fallbacks:
            defaults.setdefault(key, fallback)

        # Install the mocked id31 environment, then delegate to the parent
        # initialiser.
        id31_patching()
        super().__init__(config=config, defaults=defaults)

        if not self._HAS_BLISS:
            return
        self.newflat, self.oldflat = ensure_difflab6_id31_flats()

    def init_auto_stop_workflow(self) -> None:
        """Prepare the Ewoks workflow used by the demo auto-stop.

        Stores the result of ``ensure_pyfai_config("difflab6")`` in
        ``pyfai_config``, then writes a three-node workflow
        (``PyFaiConfig`` → ``IntegrateAndSendMsg`` → ``WriteSumFrame``) as
        ``streamline_auto_stop.json`` into the workflows directory obtained
        from the current session's scan-saving filename. The resulting path
        is stored in ``auto_stop_workflow_path`` and printed.
        """
        self.pyfai_config = ensure_pyfai_config("difflab6")

        def same_name_mapping(*names):
            # Every listed output feeds the target input of the same name.
            return [{"source_output": name, "target_input": name} for name in names]

        node_specs = (
            ("config", "ewoksxrpd.tasks.pyfaiconfig.PyFaiConfig"),
            (
                "integrate",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.IntegrateAndSendMsg",
            ),
            (
                "write_sum",
                "blissoda.demo.tasks.stop_scan_xrpd_integrate.WriteSumFrame",
            ),
        )
        nodes = [
            {"id": node_id, "task_type": "class", "task_identifier": identifier}
            for node_id, identifier in node_specs
        ]

        links = [
            {
                "source": "config",
                "target": "integrate",
                "data_mapping": same_name_mapping(
                    "detector_config",
                    "energy",
                    "integration_options",
                    "detector",
                    "mask",
                    "geometry",
                ),
            },
            {
                "source": "integrate",
                "target": "write_sum",
                "data_mapping": same_name_mapping(
                    "acc_profile_threshold",
                    "sum_frame_max_threshold",
                    "acc_profile_threshold_reached",
                    "sum_frame_max_threshold_reached",
                    "acc_profile_value_at_stop",
                    "sum_frame_max_value_at_stop",
                ),
            },
        ]

        workflow = {
            "graph": {"schema_version": "1.1", "id": "streamline_auto_stop"},
            "nodes": nodes,
            "links": links,
        }

        workflows_dir = directories.get_workflows_dir(
            current_session.scan_saving.filename
        )
        target_dir = Path(workflows_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        workflow_file = target_dir / "streamline_auto_stop.json"
        workflow_file.write_text(json.dumps(workflow, indent=2))

        self.auto_stop_workflow_path = str(workflow_file)
        print(f"Auto-stop workflow initialized: {workflow_file}")
[docs]
def id31_patching(energy: Optional[float] = None):
    """Install the mocked ID31 environment.

    Does nothing when an ``id31`` module is already registered in
    ``sys.modules``; in that case ``energy`` is ignored as well.

    :param energy: value passed to ``setup_globals.energy.move``. When
        ``None``, ``DEFAULT_CALIB["difflab6"]["energy"]`` is used instead.
    """
    if "id31" in sys.modules:
        return

    from ...tests import mock_id31

    # Register the mock under the real names so that
    # "from id31 import attenuator" resolves to it.
    sys.modules["id31"] = mock_id31
    sys.modules["id31.attenuator"] = mock_id31.attenuator

    from ...tests.mock_id31.setup_globals import atten as mock_atten
    from ...tests.mock_id31.setup_globals import ehss as mock_ehss

    setup_globals.shopen = _mock_shopen

    target_energy = DEFAULT_CALIB["difflab6"]["energy"] if energy is None else energy
    setup_globals.energy.move(target_energy)

    setup_globals.atten = mock_atten
    setup_globals.att = _att
    setup_globals.ehss = mock_ehss

    # Imported only after the module patching above.
    from ...id31 import optimize_exposure

    optimize_exposure.id31_attenuator = mock_id31.attenuator
    # Nothing is restored afterwards: keep it patched for manual testing.
def _mock_shopen(**kwargs):
    """Stand-in for ``shopen`` that only prints the call it received.

    The keyword arguments are rendered as ``name=value`` pairs, in the
    order given, inside ``shopen(...)``.
    """
    rendered = [f"{name}={value}" for name, value in kwargs.items()]
    print("shopen(" + ", ".join(rendered) + ")")
def _att(value):
    """Store ``value`` in the ``bits`` attribute of ``setup_globals.atten``."""
    attenuator = setup_globals.atten
    attenuator.bits = value