from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from esrf_pathlib import ESRFPath
from ewoksjob.client import submit
from ewoksutils.task_utils import task_inputs
from ..bliss_globals import current_session
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from .xrdct_model import XrdctProcessorModel
# Unit registry used to attach physical units to scan metadata values.
# Prefer the registry shipped with BLISS; outside a BLISS environment fall
# back to a standalone pint registry.
try:
    from bliss.physics.units import ur as ureg
except ImportError:
    from pint import UnitRegistry

    ureg = UnitRegistry()
class XrdctProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("engine", category="workflows"),
        ParameterInfo("queue", category="workflows"),
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo(
            "lima_name",
            category="Parameters",
            doc="Name of the image detector (Ex:pilatus4).",
        ),
        ParameterInfo(
            "detector_distance",
            category="Parameters",
            doc="Variable in HDF5 file containing the distance between the sample and the detector.",
        ),
        ParameterInfo(
            "nx_save_path",
            category="saving",
            doc="Path to save Nx file.",
        ),
        ParameterInfo(
            "data_portal_upload",
            category="data portal",
            doc="Boolean to determine if the results should be uploaded to the data portal.",
        ),
    ],
):
    """
    Processor that collects metadata from a BLISS scan and submits an ewoks
    workflow whose inputs target the ``CreateNxxrdct`` task.

    Parameter values are mirrored into an ``XrdctProcessorModel`` instance so
    that assignments are validated by the model.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Initialize the XrdctProcessor for converting BLISS HDF5 data to Nexus (NX) format.

        This processor can be integrated into a BLISS beamline configuration. Typical
        usage involves adding the following to a configuration yaml file:

            - name: xrdct_blissoda
              plugin: generic
              class: XrdctProcessor
              package: blissoda.xrdct.xrdct_processor

        Defaults are resolved with increasing priority: model defaults, values
        from the ``xrdct_parameters`` session configuration (when present),
        then the ``defaults`` argument.

        :param config: forwarded unchanged to the base processor.
        :param defaults: optional default parameter values; the dictionary
            passed by the caller is not modified.
        """
        # Work on a private copy so the caller's dictionary is never mutated.
        defaults = dict(defaults) if defaults else {}
        defaults.setdefault("trigger_at", "END")

        model_defaults = XrdctProcessorModel().model_dump()
        if "xrdct_parameters" in current_session.config.names_list:
            self._apply_yaml_to_xrdct_model("xrdct_parameters", model_defaults)
        merged_defaults = {**model_defaults, **defaults}

        # Bypass this class' __setattr__: the validation model does not exist yet.
        # Store the initial defaults for reset_to_default_values()
        super().__setattr__("_initial_defaults", merged_defaults.copy())
        super().__setattr__("_xrdct_model", XrdctProcessorModel(**merged_defaults))
        super().__init__(config=config, defaults=merged_defaults)

    def __setattr__(self, name, value):
        """
        Set an attribute, routing it through the xrdct model first when the
        model declares an attribute with the same name.

        The value finally stored on the processor is the one read back from
        the model after assignment.
        """
        if hasattr(self, "_xrdct_model") and hasattr(self._xrdct_model, name):
            setattr(self._xrdct_model, name, value)
            value = getattr(self._xrdct_model, name)
        super().__setattr__(name, value)

    def _apply_yaml_to_xrdct_model(self, yml_name: str, model_defaults: dict):
        """
        Overwrite entries of ``model_defaults`` (in place) with the values of
        the session configuration object ``yml_name``.

        Configuration keys that are not already present in ``model_defaults``
        are ignored.
        """
        yaml_values = current_session.config.get_config(yml_name)
        for key, value in yaml_values.items():
            if key in model_defaults:
                model_defaults[key] = value

    def _find_lima_name(self, channels) -> str:
        """
        Return the detector name taken from the first channel whose key ends
        with ``image`` or ``frame`` and whose ``dim`` entry is at least 2.

        The name is the part of the channel key before the first ``:``.

        :param channels: mapping of channel key to channel info dictionary.
        :raises ValueError: when no channel matches.
        """
        for key, info in channels.items():
            if not key.endswith(("image", "frame")):
                continue
            # A channel without a "dim" entry is treated as non-image instead
            # of failing on a None comparison.
            if (info.get("dim") or 0) >= 2:
                return key.split(":")[0]
        raise ValueError(
            "Cannot determine lima_name automatically. Please enter a string for lima_name"
        )

    def _build_nx_save_path(self) -> str:
        """
        Build the NX file path from the current scan file: the
        ``processed_dataset_file`` of the scan's ESRF path with its suffix
        replaced by ``.nx``.
        """
        nx_path = self._bliss_hdf5_path.processed_dataset_file.with_suffix(".nx")
        return str(nx_path)

    def _resolve_nx_save_path(self) -> str:
        """
        Return the ``nx_save_path`` parameter when it is set, otherwise the
        path derived from the current scan file.

        The derived path is intentionally NOT written back to the parameter,
        so a path computed for one scan is not reused for later scans.
        """
        return self.nx_save_path or self._build_nx_save_path()

    def _get_dict_entry(self, d: dict, *dict_keys: str, default_val=None):
        """
        Traverse a nested dictionary using `dict_keys`.

        If the final key has a companion '<key>@units' in the same dictionary,
        the value multiplied by the parsed unit is returned. Otherwise the raw
        value is returned.

        :param d: dictionary to traverse.
        :param dict_keys: successive keys; with no keys `d` itself is returned.
        :param default_val: returned when any key along the path is missing or
            an intermediate value is not a dictionary.
        """
        if not dict_keys:
            return d

        parent = None
        current = d
        for dict_key in dict_keys:
            if not isinstance(current, dict) or dict_key not in current:
                return default_val
            parent, current = current, current[dict_key]

        # `parent` is now the dictionary that holds the final key.
        units_key = f"{dict_keys[-1]}@units"
        if units_key not in parent:
            return current
        try:
            # parse_expression is the non-deprecated equivalent of ureg[...]
            return current * ureg.parse_expression(parent[units_key])
        except Exception:
            # Best effort: fall back to the raw value if units cannot be
            # parsed or applied.
            return current

    def _get_nxxrdct_inputs(self, scan: BlissScanType) -> Dict[str, Any]:
        """
        Assemble the input values for the NXxrdct creation task from
        ``scan.scan_info``.

        Entries that are not available from the scan info are passed as
        ``None``. When ``lima_name`` is not set it is detected from the scan
        channels and stored on the processor.
        """
        scan_parameters = scan.scan_info
        get = self._get_dict_entry

        # Detector name: auto-detected once, then kept as the parameter value.
        self.lima_name = self.lima_name or self._find_lima_name(
            scan_parameters["channels"]
        )

        return {
            # Save path for NXxrdct file
            "nx_path": self._resolve_nx_save_path(),
            # Scan information
            "title": get(scan_parameters, "title"),
            "start_time": get(scan_parameters, "start_time"),
            "end_time": get(scan_parameters, "end_time"),
            # Beam energy
            "beam_incident_energy": get(
                scan_parameters, "positioners", "positioners_start", "energy"
            ),
            # Detector information
            "detector_data": None,  # Data from AI processing
            "detector_polar_angle": None,  # Polar angle output from AI
            "detector_count_time": get(scan_parameters, "count_time"),
            "detector_distance": None,
            "detector_x_pixel_size": get(
                scan_parameters, "instrument", self.lima_name, "x_pixel_size"
            ),
            "detector_y_pixel_size": get(
                scan_parameters, "instrument", self.lima_name, "y_pixel_size"
            ),
            "detector_diffraction_channel": None,
            # Source information
            "source_name": get(scan_parameters, "instrument", "machine", "name"),
            "source_type": get(scan_parameters, "instrument", "machine", "type"),
            "source_probe": "x-ray",
            # Monochromator information
            "monochromator_wavelength": get(
                scan_parameters, "instrument", "zaxis", "wavelength"
            ),
            # Sample information
            "sample_name": get(
                scan_parameters, "dataset_metadata_snapshot", "Sample_name"
            ),
            "sample_rotation_angles": None,
            "sample_translation_values": None,
            "sample_x_translation": None,
            "sample_y_translation": None,
            "sample_z_translation": None,
            # Monitor information
            "monitor_data": None,
            "monitor_mode": None,
            "monitor_preset": None,
            "monitor_integral": None,
        }

    def _collect_workflow_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Return the workflow inputs addressed to the ``CreateNxxrdct`` task."""
        return task_inputs(
            task_identifier="ewoksxrdct.tasks.create_nxxrdct.CreateNxxrdct",
            inputs=self._get_nxxrdct_inputs(scan),
        )

    def _get_workflows_dir(self) -> Path:
        """
        Get the directory where the workflow output will be saved:
        ``<processed_dataset_path>/workflows/gallery`` of the current scan file.
        """
        return self._bliss_hdf5_path.processed_dataset_path / "workflows" / "gallery"

    def _get_workflow_upload_parameters(
        self, scan: BlissScanType
    ) -> Optional[Dict[str, Any]]:
        """
        Build the data portal upload parameters for the scan.

        :returns: ``None`` when the scan info has no truthy ``save`` entry,
            otherwise a dictionary describing beamline, proposal, dataset,
            output path, raw data directory and metadata.
        """
        if not scan.scan_info.get("save"):
            return None
        return {
            "beamline": self.scan_saving.beamline,
            "proposal": self.scan_saving.proposal_name,
            "dataset": "workflows",
            "path": str(self._get_workflows_dir()),
            "raw": [str(Path(self.filename).parent)],
            "metadata": {"Sample_name": self.scan_saving.dataset["Sample_name"]},
        }

    def _get_workflow(self):
        """Load the workflow named by the ``workflow`` parameter from the ``xrdct`` resources."""
        workflow_file = resource_filename("xrdct", self.workflow)
        with open(workflow_file, "r", encoding="utf-8") as wf:
            return json.load(wf)

    def _get_submit_arguments(self, scan) -> Dict[str, Any]:
        """
        Build the keyword arguments passed to the workflow on submission.

        ``upload_parameters`` is only included when ``data_portal_upload`` is
        enabled and upload parameters could be built for the scan.
        """
        kwargs = {
            "inputs": self._collect_workflow_inputs(scan),
            "outputs": [{"all": False}],
            "engine": self.engine,
        }
        if self.data_portal_upload:
            upload_parameters = self._get_workflow_upload_parameters(scan)
            if upload_parameters:
                kwargs["upload_parameters"] = upload_parameters
        return kwargs

    def workflow_destination(self) -> str:
        """
        Returns the destination path for the workflow output: a ``.json`` file
        in the workflows directory, named after the NX file.
        """
        output_filename = Path(self._resolve_nx_save_path()).with_suffix(".json").name
        return str(self._get_workflows_dir() / output_filename)

    def execute_workflow(self, scan: BlissScanType) -> None:
        """
        Submit the configured workflow for ``scan`` to the configured queue.

        The scan file name is taken from the scan info, falling back to the
        scan saving object when the scan info has none.
        """
        self.scan_saving = scan.scan_saving
        self.filename = scan.scan_info.get("filename") or self.scan_saving.filename
        self._bliss_hdf5_path = ESRFPath(self.filename)

        workflow = self._get_workflow()
        kwargs = self._get_submit_arguments(scan)
        kwargs["convert_destination"] = self.workflow_destination()

        # NOTE(review): fixed 2 s pause before submission; presumably gives the
        # scan file time to be fully written - confirm.
        time.sleep(2)
        submit(args=(workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Run the workflow for the scan that triggered the processor."""
        self.execute_workflow(scan)

    def reset_to_default_values(self) -> None:
        """
        Reset the processor to its initial default values by recreating the Pydantic model and updating attributes accordingly.
        """
        # Recompute defaults: model defaults overridden by the values captured
        # at construction time.
        model_defaults = XrdctProcessorModel().model_dump()
        merged_defaults = {**model_defaults, **self._initial_defaults}
        # Reset _xrdct_model defaults
        super().__setattr__("_xrdct_model", XrdctProcessorModel(**merged_defaults))
        # Reset class values to their defaults (the model was just rebuilt
        # from the same values, so it is bypassed here).
        for name, value in merged_defaults.items():
            super().__setattr__(name, value)