Source code for blissoda.bm02.xrpd_processor

import logging
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from esrf_pathlib import ESRFPath
from ewoksutils.task_utils import task_inputs

from ..persistent.parameters import ParameterInfo
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import scan_processed_directory
from ..xrpd.processor import XrpdProcessor
from .utils import assert_url_exists
from .utils import export_filename_prefix
from .utils import get_current_filename
from .utils import get_energy
from .utils import subtracted_nxprocess_name


[docs] class Bm02XrpdProcessor( XrpdProcessor, parameters=[ ParameterInfo("config_filename", category="PyFai"), ParameterInfo("integration_options", category="PyFai"), ParameterInfo("energy", category="PyFai"), ParameterInfo( "empty_cell_subtraction_options", category="Empty cell subtraction" ), ParameterInfo("ascii_export_enabled", category="ASCII export"), ParameterInfo("save_as_zip", category="ASCII export"), ParameterInfo("max_files", category="ASCII export"), ParameterInfo("exported_counters", category="ASCII export"), ParameterInfo("exported_positioners", category="ASCII export"), ], ): DEFAULT_WORKFLOW: Optional[str] = resource_filename( "bm02", "integrate_scan_with_saving_subtract_ascii.json" ) def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, **deprecated_defaults: Dict[str, Any], ) -> None: defaults = self._merge_defaults(deprecated_defaults, defaults) defaults.setdefault("config_filename", {"WOS": "", "D5": ""}) defaults.setdefault("energy", None) defaults.setdefault( "integration_options", { "WOS": { "method": "no_csr_cython", "nbpt_rad": 4096, "unit": "q_nm^-1", "error_model": "poisson", }, "D5": { "method": "no_csr_cython", "nbpt_rad": 4096, "unit": "q_nm^-1", "error_model": "poisson", }, }, ) defaults.setdefault( "empty_cell_subtraction_options", { "CdTe": { "enabled": False, "cell_pattern_url": None, "empty_cell_factor": "1", }, "WOS": { "enabled": False, "cell_pattern_url": None, "empty_cell_factor": 1, }, }, ) defaults.setdefault("ascii_export_enabled", False) defaults.setdefault("save_as_zip", False) defaults.setdefault("max_files", 1000) defaults.setdefault("exported_counters", None) defaults.setdefault("exported_positioners", None) super().__init__(config=config, defaults=defaults)
[docs] def get_config_filename(self, lima_name: str) -> Optional[str]: try: return self.config_filename[lima_name] except KeyError: raise RuntimeError( f"Missing pyfai configuration file (poni or json) for '{lima_name}'" ) from None
[docs] def get_integration_options(self, scan: BlissScanType, lima_name: str) -> dict: try: return self.integration_options.get(lima_name, {}) except KeyError: raise RuntimeError( f"Missing pyfai integration options for '{lima_name}'" ) from None
def _is_save_ascii_enabled(self, scan: BlissScanType, lima_name: str) -> bool: if not self.ascii_export_enabled: return False nb_sectors = self.get_integration_options(scan, lima_name).get("nbpt_azim", 1) npoints = scan.scan_info.get("npoints", float("inf")) if npoints * nb_sectors >= self.max_files: logging.warning(f""" Disabling ASCII export for scan {scan.scan_info.get("scan_nb")} since the number of exported files ({npoints}*{nb_sectors} = {npoints * nb_sectors}) will exceed the max number ({self.max_files}). """) return False return True
[docs] def enable_ascii_export(self): self.ascii_export_enabled = True
[docs] def disable_ascii_export(self): self.ascii_export_enabled = False
def _export_folder(self, scan: BlissScanType): return Path(scan_processed_directory(scan)) / "export" def _get_ascii_export_inputs( self, scan: BlissScanType, lima_name: str ) -> List[dict]: export_folder = self._export_folder(scan) counter_urls = [] if self.exported_counters: counter_urls.extend( f"{self.master_output_url(scan)}/measurement/{counter}" for counter in self.exported_counters ) if self.exported_positioners: counter_urls.extend( f"{self.master_output_url(scan)}/instrument/positioners/{positioner}" for positioner in self.exported_positioners ) raw_inputs = self._get_ascii_export_task_inputs( task_id="SaveRawNexusPatternsAsAscii", export_folder=export_folder, filename_prefix=export_filename_prefix(scan, lima_name), enabled=self._is_save_ascii_enabled(scan, lima_name), counter_urls=counter_urls, ) subtracted_inputs = self._get_ascii_export_task_inputs( task_id="SaveSubtractedNexusPatternsAsAscii", export_folder=export_folder, filename_prefix=f"{export_filename_prefix(scan, lima_name)}_subtracted", enabled=self._is_cell_subtraction_enabled(scan, lima_name) and self._is_save_ascii_enabled(scan, lima_name), counter_urls=counter_urls, ) return [*raw_inputs, *subtracted_inputs] def _get_ascii_export_task_inputs( self, task_id: str, export_folder: Path, filename_prefix: str, enabled: bool, counter_urls: List[str], ) -> List[dict]: ascii_basename_template = filename_prefix + "_f%04d.dat" if self.save_as_zip: output_filename_template = ascii_basename_template output_archive_filename = export_folder / f"{filename_prefix}.zip" else: output_filename_template = export_folder / ascii_basename_template output_archive_filename = "" return task_inputs( id=task_id, inputs={ "enabled": enabled, "output_filename_template": str(output_filename_template), "output_archive_filename": str(output_archive_filename), "sector_suffix_template": "sect%04d", "counter_urls": counter_urls, "raise_when_missing_counter": True, }, ) def _is_cell_subtraction_enabled(self, scan: BlissScanType, lima_name: str) -> bool: if "nbpt_azim" in self.get_integration_options(scan, lima_name): logging.warning( f"Cannot subtract empty cell from 2D integrated patterns. Disabling cell subtraction for detector {lima_name} of scan {scan.scan_info.get('scan_nb')}" ) return False if lima_name not in self.empty_cell_subtraction_options: return False return self.empty_cell_subtraction_options[lima_name].get("enabled", False) def _get_empty_cell_subtraction_inputs( self, scan: BlissScanType, lima_name: str ) -> List[dict]: inputs = self._get_data_access_inputs( scan, lima_name, "SubtractBackgroundPattern" ) subtraction_options = self.empty_cell_subtraction_options.get(lima_name, {}) inputs += task_inputs( task_identifier="SubtractBackgroundPattern", inputs={ "enabled": self._is_cell_subtraction_enabled(scan, lima_name), "output_nxprocess_url": f"{self.master_output_url(scan)}/{subtracted_nxprocess_name(lima_name)}", "background_nxdata_url": subtraction_options.get( "cell_pattern_url", None ), "background_factor": subtraction_options.get("empty_cell_factor", 1), }, ) external_output_url = self.external_output_url(scan, lima_name) if external_output_url: inputs.append( { "task_identifier": "SubtractBackgroundPattern", "name": "external_nxprocess_url", "value": f"{external_output_url}/{subtracted_nxprocess_name(lima_name)}", } ) return inputs def _data_to_plot_url(self, scan: BlissScanType, lima_name: str): if not scan.scan_info.get("save"): return None output_url = self.online_output_url(scan, lima_name) if self._is_cell_subtraction_enabled(scan, lima_name): return f"{output_url}/{subtracted_nxprocess_name(lima_name)}/integrated" else: return f"{output_url}/{lima_name}_integrate/integrated"
[docs] def set_cell_pattern_url( self, sample: str, dataset: str, scan_number: int, detector: str ) -> None: filename = ESRFPath(get_current_filename()) cell_filename = filename.replace_fields( collection=sample, dataset=dataset ).processed_dataset_file cell_pattern_url = ( f"{cell_filename}::/{scan_number}.1/{detector}_integrate/integrated" ) try: assert_url_exists(cell_pattern_url) except FileNotFoundError as e: logging.warning(f"{e}. Please select another URL.") return self.empty_cell_subtraction_options[detector][ "cell_pattern_url" ] = f"{cell_filename}::/{scan_number}.1/{detector}_integrate/integrated"
[docs] def enable_empty_cell_subtraction(self, *detectors): for detector_obj in detectors: detector: str = detector_obj.name if detector not in self.empty_cell_subtraction_options: self.empty_cell_subtraction_options[detector] = {} self.empty_cell_subtraction_options[detector]["enabled"] = True if ( self.empty_cell_subtraction_options[detector].get("cell_pattern", None) is None ): print( f"Do not forget to set a cell_pattern_url for {detector} now that cell subtraction is enabled!" )
[docs] def disable_empty_cell_subtraction(self, *detectors): for detector in detectors: if detector.name not in self.empty_cell_subtraction_options: self.empty_cell_subtraction_options[detector.name] = {} self.empty_cell_subtraction_options[detector.name]["enabled"] = False
[docs] def reset_energy(self): self.energy = None
[docs] def get_inputs(self, scan: BlissScanType, lima_name: str) -> List[dict]: inputs = super().get_inputs(scan, lima_name) inputs += self._get_ascii_export_inputs(scan, lima_name) inputs += self._get_empty_cell_subtraction_inputs(scan, lima_name) energy = get_energy(self.energy) if energy is not None: inputs += task_inputs( task_identifier="PyFaiConfig", inputs={"energy": energy} ) return inputs
def _info_categories(self) -> Dict[str, dict]: categories = super()._info_categories() if categories["PyFai"]["energy"].value is None: categories["PyFai"]["energy"] = "From JSON" return categories