# Source code for blissoda.id11.xrpd_processor

"""Automatic pyfai integration for every scan with saving and plotting with pdf extraction"""

import configparser
import os
import tempfile
from glob import glob
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ewoksutils.task_utils import task_inputs

from ..persistent.parameters import ParameterInfo
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.pyfai import read_config
from ..xrpd.processor import XrpdProcessor
from .plotter import Id11XrpdPlotter


class Id11XrpdProcessor(
    XrpdProcessor,
    parameters=[
        ParameterInfo("pyfai_config_directory", category="PyFai"),
        ParameterInfo("integration_options", category="PyFai"),
        ParameterInfo("pdf_enable", category="PDF"),
        ParameterInfo("pdf_options", category="PDF"),
        ParameterInfo("pdf_config_file", category="PDF"),
        ParameterInfo("pdf_average_every", category="PDF"),
        ParameterInfo("sum_enable", category="Sum"),
    ],
):
    """XRPD processor with optional PDF extraction and summed integration.

    On top of :class:`XrpdProcessor` this class registers parameters in three
    categories:

    * ``PyFai``: ``pyfai_config_directory`` and ``integration_options``
    * ``PDF``: ``pdf_enable``, ``pdf_options``, ``pdf_config_file`` and
      ``pdf_average_every``
    * ``Sum``: ``sum_enable``

    PyFAI integration options are merged from the most recent ``*.json`` and
    ``*.poni`` files found in ``<pyfai_config_directory>/<lima_name>/`` and
    from the user-provided ``integration_options``.
    """

    plotter_class = Id11XrpdPlotter

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Dict[str, Any],
    ) -> None:
        """Fill in the ID11 specific defaults and initialise the base class.

        :param config: forwarded unchanged to the base class.
        :param defaults: default parameter values; entries already present
            are never overwritten by the defaults set here.
        :param deprecated_defaults: merged with ``defaults`` through
            ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("pyfai_config_directory", tempfile.gettempdir())
        # NOTE(review): this default is a directory, not a file.
        # ``ConfigParser.read`` ignores paths it cannot open, so presumably no
        # file options are loaded until a real file is configured - confirm.
        defaults.setdefault("pdf_config_file", tempfile.gettempdir())
        defaults.setdefault("sum_enable", False)
        defaults.setdefault(
            "workflow_with_saving",
            resource_filename("id11", "integrate_scan_with_saving.json"),
        )
        defaults.setdefault(
            "workflow_without_saving",
            resource_filename("id11", "integrate_scan_without_saving.json"),
        )
        super().__init__(config=config, defaults=defaults)

    def _info_categories(self) -> Dict[str, dict]:
        """Extend the base class categories with PyFai, PDF and Sum details.

        The base class categories are returned untouched when no lima
        detector is configured. Otherwise the first entry of ``lima_names``
        is used to resolve the configuration shown.
        """
        categories = super()._info_categories()
        if not self.lima_names:
            return categories

        # The full options are listed in the "PyFai integration" category.
        categories["PyFai"]["integration_options"] = "... (see below)"
        lima_name = self.lima_names[0]
        categories["PyFai integration"] = {
            "1. JSON file": self._detector_config_filename(lima_name, ".json"),
            "2. PONI file": self._detector_config_filename(lima_name, ".poni"),
            "3. User": self.integration_options,
            "Merged": self.get_integration_options(scan=None, lima_name=lima_name),
        }
        categories["PDF"] = {
            "PDF Config File Path": self.pdf_config_file,
            "PDF Average Every N points": self.pdf_average_every,
            "PDF Config From User": self.pdf_options,
            "PDFGetX Config Parameters Merged": self.get_pdfgetx_options(lima_name),
        }
        categories["Sum"] = {"Sum last integration": self.sum_enable}
        return categories

    def _set_parameter(self, name, value):
        """Set a parameter and re-select the workflow when ``pdf_enable`` changes."""
        super()._set_parameter(name, value)
        if name == "pdf_enable":
            self._update_pdfgetx_status()

    def get_config_filename(self, lima_name: str) -> Optional[str]:
        """Always return ``None``.

        NOTE(review): presumably overrides a base class hook because the
        options are assembled by :meth:`get_integration_options` - confirm.
        """
        return None

    def get_average_every(self) -> Optional[str]:
        """Return the ``pdf_average_every`` parameter."""
        return self.pdf_average_every

    def get_integration_options(
        self, scan: BlissScanType, lima_name: str
    ) -> Optional[dict]:
        """Return the merged PyFAI integration options for a detector.

        Later sources override earlier ones: latest ``*.json`` file, latest
        ``*.poni`` file, then the user ``integration_options``.

        :param scan: not used by this implementation.
        :param lima_name: detector name, used as configuration sub-directory.
        """
        options = self._default_azint_options(lima_name)
        options.update(self._default_calib_options(lima_name))
        integration_options = self.integration_options
        if integration_options:
            options.update(integration_options.to_dict())
        return options

    def _default_azint_options(self, lima_name: str) -> dict:
        """Read the options from the latest ``*.json`` file of a detector."""
        filename = self._detector_config_filename(lima_name, ".json")
        return read_config(filename)

    def _default_calib_options(self, lima_name: str) -> dict:
        """Read the options from the latest ``*.poni`` file of a detector."""
        filename = self._detector_config_filename(lima_name, ".poni")
        return read_config(filename)

    def _detector_config_filename(self, lima_name: str, ext: str) -> Optional[str]:
        """Return the last file (sorted by name) matching ``*<ext>``.

        Files are searched in ``<pyfai_config_directory>/<lima_name>/``; the
        system temporary directory is used when ``pyfai_config_directory`` is
        empty.

        :returns: the file name or ``None`` when nothing matches.
        """
        pyfai_config_directory = self.pyfai_config_directory
        if not pyfai_config_directory:
            pyfai_config_directory = tempfile.gettempdir()
        pattern = os.path.join(pyfai_config_directory, lima_name, f"*{ext}")
        files = sorted(glob(pattern))
        if not files:
            return None
        return files[-1]

    def get_inputs(self, scan, lima_name: str) -> List[dict]:
        """Return the workflow inputs for one scan and one detector.

        The base class inputs are extended with the inputs of the sum task
        and, when ``pdf_enable`` is set, with the PDFGetX inputs.
        """
        inputs = super().get_inputs(scan, lima_name)
        inputs += self._get_sum_inputs(scan, lima_name)
        if self.pdf_enable:
            inputs += self._get_pdfgetx_config_inputs(lima_name)
            inputs += self._get_pdfgetx_save_inputs(scan, lima_name)
        return inputs

    def _update_pdfgetx_status(self):
        """Select the saving workflow matching ``pdf_enable`` and return the flag."""
        if self.pdf_enable:
            self.workflow_with_saving = resource_filename(
                "id11", "integrate_scan_with_saving_pdf.json"
            )
        else:
            self.workflow_with_saving = resource_filename(
                "id11", "integrate_scan_with_saving.json"
            )
        return self.pdf_enable

    @staticmethod
    def _parse_pdfgetx_value(value: str) -> Any:
        """Convert a configuration string to ``float`` when it is a finite number.

        ``ConfigParser`` yields strings only. Anything ``float`` cannot parse
        is returned unchanged, and so are non-finite spellings ("nan", "inf")
        so that textual options are never mistaken for numbers.
        """
        try:
            number = float(value)
        except ValueError:
            return value
        # ``number != number`` is only true for NaN.
        if number != number or abs(number) == float("inf"):
            return value
        return number

    def get_pdfgetx_options(self, lima_name: str) -> Optional[dict]:
        """Return the merged PDFGetX options.

        The ``DEFAULT`` section of ``pdf_config_file`` is read first, with
        numeric values converted to ``float``, and then updated with the user
        ``pdf_options``.

        :param lima_name: not used by this implementation.
        """
        config = configparser.ConfigParser()
        pdf_config_file = self.pdf_config_file
        # ``ConfigParser.read`` cannot handle ``None``; an unset file simply
        # means that there are no file based options.
        if pdf_config_file:
            config.read(pdf_config_file)
        options = {
            key: self._parse_pdfgetx_value(value)
            for key, value in dict(config["DEFAULT"]).items()
        }
        pdf_options = self.pdf_options
        if pdf_options:
            options.update(pdf_options.to_dict())
        return options

    def _get_pdfgetx_config_inputs(self, lima_name: str) -> List[dict]:
        """Return the inputs of the ``PdfGetXConfig`` and ``PdfGetXAverage`` tasks."""
        inputs = [
            {
                "task_identifier": "PdfGetXConfig",
                "name": "pdfgetx_options_dict",
                "value": self.get_pdfgetx_options(lima_name),
            },
            {
                "task_identifier": "PdfGetXAverage",
                "name": "average_every",
                "value": self.get_average_every(),
            },
        ]
        return inputs

    def _get_pdfgetx_save_inputs(self, scan, lima_name: str) -> List[dict]:
        """Return the inputs of the ``PdfGetXSaveAscii`` and ``PdfGetXSaveNexus`` tasks."""
        # Save PDF data in the same Nxentry as the PyFAI results
        # Save PDF data in ascii files as well
        ascii_filename = self._get_pdfgetx_ascii_filename(scan, lima_name)
        inputs = [
            {
                "task_identifier": "PdfGetXSaveAscii",
                "name": "filename",
                "value": ascii_filename,
            },
            {
                "task_identifier": "PdfGetXSaveNexus",
                "name": "pdfgetx_options",
                "value": self.get_pdfgetx_options(lima_name),
            },
            {
                "task_identifier": "PdfGetXSaveNexus",
                "name": "detector_name",
                "value": lima_name,
            },
            {
                "task_identifier": "PdfGetXSaveNexus",
                "name": "output_filename",
                "value": self.master_output_filename(scan),
            },
            {
                "task_identifier": "PdfGetXSaveNexus",
                "name": "scan",
                "value": scan.scan_info.get("scan_nb"),
            },
        ]
        return inputs

    def _get_pdfgetx_ascii_filename(self, scan, lima_name: str) -> str:
        """Unique name per scan and per detector when more than one."""
        filename = self.get_filename(scan)
        root = self.scan_processed_directory(scan)
        basename = os.path.basename(filename)
        stem, _ = os.path.splitext(basename)
        stem_parts = [stem]
        if len(self.lima_names) > 1:
            stem_parts.append(lima_name)
        scan_nb = scan.scan_info.get("scan_nb")
        stem_parts.append(f"{scan_nb:04d}")
        basename = "_".join(stem_parts)
        # PdfGetXSaveAscii expects a name with the .h5 extension
        return os.path.join(root, f"{basename}.h5")

    def _summed_nxprocess_name(self, lima_name: str) -> str:
        """Return the name used for the summed results of a detector."""
        return f"{lima_name}_summed"

    def _get_sum_output_url(self, scan, lima_name: str) -> str:
        """Return the URL of the summed results below the master output URL."""
        return (
            f"{self.master_output_url(scan)}/{self._summed_nxprocess_name(lima_name)}"
        )

    def _get_sum_inputs(self, scan, lima_name: str):
        """Return the inputs of the ``SumIntegrationResults`` task.

        ``external_nxprocess_url`` is only provided when the scan has an
        external output URL.
        """
        inputs = {
            "enabled": self.sum_enable,
            "output_nxprocess_url": self._get_sum_output_url(scan, lima_name),
        }
        external_output_url = self.external_output_url(scan, lima_name)
        if external_output_url:
            # A plain string like ``output_nxprocess_url``: a trailing comma
            # inside the parentheses used to turn this value into a 1-tuple.
            summed_name = self._summed_nxprocess_name(lima_name)
            inputs["external_nxprocess_url"] = f"{external_output_url}/{summed_name}"
        return task_inputs(task_identifier="SumIntegrationResults", inputs=inputs)

    def _trigger_plotting(self, scan: BlissScanType, lima_name: str, future) -> None:
        """Hand the workflow result over to the plotter, when there is one.

        The plotter receives the URL of the integrated data under the key
        ``"integrate"`` and, when ``sum_enable`` is set, the URL of the summed
        data under the key ``"sum"``.
        """
        if self.plotter is None:
            return
        start_time = scan.scan_info.get("start_time")
        scan_name = self._get_scan_name(scan)
        output_urls = {"integrate": self._data_to_plot_url(scan, lima_name)}
        if self.sum_enable:
            output_urls["sum"] = (
                f"{self._get_sum_output_url(scan, lima_name)}/integrated"
            )
        self.plotter.handle_workflow_result(
            future=future,
            timestamp=start_time,
            lima_name=lima_name,
            scan_name=scan_name,
            output_url=output_urls,
            retry_timeout=self.retry_timeout,
            retry_period=self.retry_period,
        )