# Source code for blissoda.xrdct.xrdct_processor

from __future__ import annotations

import json
import time
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from esrf_pathlib import ESRFPath
from ewoksjob.client import submit
from ewoksutils.task_utils import task_inputs

from ..bliss_globals import current_session
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from .xrdct_model import XrdctProcessorModel

try:
    from bliss.physics.units import ur as ureg
except ImportError:
    from pint import UnitRegistry

    ureg = UnitRegistry()


class XrdctProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("engine", category="workflows"),
        ParameterInfo("queue", category="workflows"),
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo(
            "lima_name",
            category="Parameters",
            doc="Name of the image detector (Ex:pilatus4).",
        ),
        ParameterInfo(
            "detector_distance",
            category="Parameters",
            doc="Variable in HDF5 file containing the distance between the sample and the detector.",
        ),
        ParameterInfo(
            "nx_save_path",
            category="saving",
            doc="Path to save Nx file.",
        ),
        ParameterInfo(
            "data_portal_upload",
            category="data portal",
            doc="Boolean to determine if the results should be uploaded to the data portal.",
        ),
    ],
):
    """Submit the XRD-CT ewoks workflow (BLISS HDF5 -> NXxrdct) for BLISS scans.

    Parameter assignments are mirrored into an internal ``XrdctProcessorModel``
    instance (see ``__setattr__``).
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Initialize the XrdctProcessor for converting BLISS HDF5 data to Nexus (NX) format.

        This processor can be integrated into a BLISS beamline configuration.
        Typical usage involves adding the following to a configuration yaml file:

            - name: xrdct_blissoda
              plugin: generic
              class: XrdctProcessor
              package: blissoda.xrdct.xrdct_processor

        Default precedence (lowest to highest): ``XrdctProcessorModel`` defaults,
        then the keys of the ``xrdct_parameters`` session config object (if it
        exists), then ``defaults``. ``trigger_at`` defaults to ``"END"``.
        """
        # Work on a copy so the caller's dict is not mutated by setdefault().
        defaults = dict(defaults) if defaults else {}
        defaults.setdefault("trigger_at", "END")

        model_defaults = XrdctProcessorModel().model_dump()
        if "xrdct_parameters" in current_session.config.names_list:
            self._apply_yaml_to_xrdct_model("xrdct_parameters", model_defaults)
        merged_defaults = {**model_defaults, **defaults}

        # Bypass our own __setattr__: the model does not exist yet at this point.
        # _initial_defaults is what reset_to_default_values() restores.
        super().__setattr__("_initial_defaults", merged_defaults.copy())
        super().__setattr__("_xrdct_model", XrdctProcessorModel(**merged_defaults))
        super().__init__(config=config, defaults=merged_defaults)

    def __setattr__(self, name: str, value: Any) -> None:
        """Set an attribute, routing names known to ``_xrdct_model`` through it.

        When ``name`` is an attribute of the internal model, the value is first
        assigned to the model and read back, so the stored value is whatever
        the model kept. Presumably this applies pydantic validation/coercion
        (requires ``validate_assignment`` on the model; TODO confirm).
        """
        if hasattr(self, "_xrdct_model") and hasattr(self._xrdct_model, name):
            setattr(self._xrdct_model, name, value)
            value = getattr(self._xrdct_model, name)
        super().__setattr__(name, value)

    def _apply_yaml_to_xrdct_model(self, yml_name: str, model_defaults: dict) -> None:
        """Override ``model_defaults`` in place from the BLISS config object ``yml_name``.

        Only keys already present in ``model_defaults`` are copied; any other
        key of that configuration is ignored.
        """
        yml_config = current_session.config.get_config(yml_name)
        for key, value in yml_config.items():
            if key in model_defaults:
                model_defaults[key] = value

    def _find_lima_name(self, channels: Dict[str, Any]) -> str:
        """Guess the image detector name from the scan channels.

        Returns the part before the first ``:`` of the first channel whose name
        ends with ``image`` or ``frame`` and whose ``dim`` is at least 2.

        :raises ValueError: if no channel matches.
        """
        for key, info in channels.items():
            # A missing (or None) "dim" means the channel is not an image;
            # comparing None with an int would raise TypeError.
            if key.endswith(("image", "frame")) and (info.get("dim") or 0) >= 2:
                return key.split(":")[0]
        raise ValueError(
            "Cannot determine lima_name automatically. Please enter a string for lima_name"
        )

    def _build_nx_save_path(self) -> str:
        """
        Build the default NX file path from the current scan file.

        The path is the processed-dataset file of the scan file with an ``.nx``
        suffix. Ex: For ESRF_v3 files, the file will be under:
        PROCESSED_DATA/sample/sample_dataset/
        """
        nx_path = self._bliss_hdf5_path.processed_dataset_file.with_suffix(".nx")
        return str(nx_path)

    def _resolve_nx_save_path(self) -> str:
        """Return the user-set ``nx_save_path`` or, when unset, the default for the current scan.

        The result is deliberately not stored back into ``nx_save_path``.
        Storing it would make every later scan (even from another dataset)
        reuse the first scan's file.
        """
        return self.nx_save_path or self._build_nx_save_path()

    def _get_dict_entry(self, d: dict, *dict_keys: str, default_val=None) -> Any:
        """
        Traverse a nested dictionary using ``dict_keys``.

        Returns ``default_val`` as soon as a key is missing or an intermediate
        value is not a dict. If the final key has a companion ``'<key>@units'``
        entry next to it, returns ``value * units`` (a pint Quantity for numeric
        values). If those units cannot be applied, returns the raw value.
        Otherwise returns the raw value. With no keys, ``d`` itself is returned.
        """
        if not dict_keys:
            return d

        # Single traversal that keeps track of the dict holding the final key.
        parent: Dict[str, Any] = d
        current: Any = d
        for dict_key in dict_keys:
            if not isinstance(current, dict) or dict_key not in current:
                return default_val
            parent, current = current, current[dict_key]

        units_key = f"{dict_keys[-1]}@units"
        if units_key not in parent:
            return current
        try:
            # parse_expression is what the deprecated ureg[...] delegated to.
            return current * ureg.parse_expression(parent[units_key])
        except Exception:
            # Deliberate best effort: unknown units or a non-numeric value
            # fall back to the raw value instead of failing the whole scan.
            return current

    def _get_nxxrdct_inputs(self, scan: BlissScanType) -> Dict[str, Any]:
        """Build the input values of the ``CreateNxxrdct`` task from ``scan.scan_info``.

        Entries that are not extracted from the scan yet are passed as ``None``.

        :raises ValueError: if ``lima_name`` is unset and cannot be guessed
            from the scan channels.
        """
        scan_parameters = scan.scan_info

        # Per-scan values are resolved locally. They are not written back to the
        # persistent parameters, which would freeze them for all later scans.
        nx_path = self._resolve_nx_save_path()
        lima_name = self.lima_name or self._find_lima_name(
            scan_parameters.get("channels", {})
        )

        return {
            "nx_path": nx_path,
            # Scan information
            "title": self._get_dict_entry(scan_parameters, "title"),
            "start_time": self._get_dict_entry(scan_parameters, "start_time"),
            "end_time": self._get_dict_entry(scan_parameters, "end_time"),
            # Beam energy, read from the "energy" positioner at scan start
            "beam_incident_energy": self._get_dict_entry(
                scan_parameters, "positioners", "positioners_start", "energy"
            ),
            # Detector information
            "detector_data": None,  # Data from AI processing (not provided yet)
            "detector_polar_angle": None,  # Polar angle output from AI (not provided yet)
            "detector_count_time": self._get_dict_entry(
                scan_parameters, "count_time"
            ),
            # NOTE(review): the "detector_distance" parameter is not used yet.
            "detector_distance": None,
            "detector_x_pixel_size": self._get_dict_entry(
                scan_parameters, "instrument", lima_name, "x_pixel_size"
            ),
            "detector_y_pixel_size": self._get_dict_entry(
                scan_parameters, "instrument", lima_name, "y_pixel_size"
            ),
            "detector_diffraction_channel": None,
            # Source information
            "source_name": self._get_dict_entry(
                scan_parameters, "instrument", "machine", "name"
            ),
            "source_type": self._get_dict_entry(
                scan_parameters, "instrument", "machine", "type"
            ),
            "source_probe": "x-ray",
            # Monochromator information.
            # NOTE(review): "zaxis" looks beamline-specific; consider a parameter.
            "monochromator_wavelength": self._get_dict_entry(
                scan_parameters, "instrument", "zaxis", "wavelength"
            ),
            # Sample information
            "sample_name": self._get_dict_entry(
                scan_parameters, "dataset_metadata_snapshot", "Sample_name"
            ),
            "sample_rotation_angles": None,
            "sample_translation_values": None,
            "sample_x_translation": None,
            "sample_y_translation": None,
            "sample_z_translation": None,
            # Monitor information (not provided yet)
            "monitor_data": None,
            "monitor_mode": None,
            "monitor_preset": None,
            "monitor_integral": None,
        }

    def _collect_workflow_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Wrap the NXxrdct inputs as ewoks task inputs of the ``CreateNxxrdct`` task."""
        return task_inputs(
            task_identifier="ewoksxrdct.tasks.create_nxxrdct.CreateNxxrdct",
            inputs=self._get_nxxrdct_inputs(scan),
        )

    def _get_workflows_dir(self) -> Path:
        """
        Get the directory where the workflow description is saved.

        It is ``workflows/gallery`` inside the processed dataset directory.
        Ex: For ESRF_v3 files: PROCESSED_DATA/sample/sample_dataset/workflows/gallery
        """
        return self._bliss_hdf5_path.processed_dataset_path / "workflows" / "gallery"

    def _get_workflow_upload_parameters(
        self, scan: BlissScanType
    ) -> Optional[Dict[str, Any]]:
        """Return data-portal upload parameters, or ``None`` for scans that are not saved."""
        if not scan.scan_info.get("save"):
            return None
        metadata = {"Sample_name": self.scan_saving.dataset["Sample_name"]}
        return {
            "beamline": self.scan_saving.beamline,
            "proposal": self.scan_saving.proposal_name,
            "dataset": "workflows",
            "path": str(self._get_workflows_dir()),
            "raw": [str(Path(self.filename).parent)],
            "metadata": metadata,
        }

    def _get_workflow(self):
        """Load the JSON workflow description named by ``self.workflow`` from the xrdct resources."""
        with open(resource_filename("xrdct", self.workflow), "r", encoding="utf-8") as wf:
            return json.load(wf)

    def _get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]:
        """Build the keyword arguments passed to the ewoks job for ``scan``.

        ``upload_parameters`` is only added when ``data_portal_upload`` is
        enabled and the scan is saved.
        """
        kwargs: Dict[str, Any] = {
            "inputs": self._collect_workflow_inputs(scan),
            "outputs": [{"all": False}],
            "engine": self.engine,
        }
        if self.data_portal_upload:
            upload_parameters = self._get_workflow_upload_parameters(scan)
            if upload_parameters:
                kwargs["upload_parameters"] = upload_parameters
        return kwargs

    def workflow_destination(self) -> str:
        """
        Returns the destination path for the workflow output.

        This is the NX file name with a ``.json`` suffix, placed in the
        workflows directory of the current scan's dataset.
        """
        output_filename = Path(self._resolve_nx_save_path()).with_suffix(".json").name
        return str(self._get_workflows_dir() / output_filename)

    def execute_workflow(self, scan: BlissScanType) -> None:
        """Submit the workflow for ``scan`` to the ewoks job queue ``self.queue``."""
        # Per-scan state read by the path/upload helpers.
        self.scan_saving = scan.scan_saving
        self.filename = scan.scan_info.get("filename") or self.scan_saving.filename
        self._bliss_hdf5_path = ESRFPath(self.filename)

        workflow = self._get_workflow()
        kwargs = self._get_submit_arguments(scan)
        kwargs["convert_destination"] = self.workflow_destination()
        # NOTE(review): presumably gives the scan file time to be flushed
        # before the worker reads it; confirm and document the reason.
        time.sleep(2)
        submit(args=(workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Processor hook: run the workflow for the new scan."""
        self.execute_workflow(scan)

    def reset_to_default_values(self) -> None:
        """
        Reset the processor to its initial default values by recreating
        the Pydantic model and updating attributes accordingly.
        """
        # Recompute defaults; the values captured in __init__ take precedence.
        model_defaults = XrdctProcessorModel().model_dump()
        merged_defaults = {**model_defaults, **self._initial_defaults}

        # Rebuild the model first, then bypass our __setattr__ (the values are
        # already model-validated) to restore each attribute.
        super().__setattr__("_xrdct_model", XrdctProcessorModel(**merged_defaults))
        for name, value in merged_defaults.items():
            super().__setattr__(name, value)