import logging
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from esrf_pathlib import ESRFPath
from ewoksutils.task_utils import task_inputs
from ..persistent.parameters import ParameterInfo
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import scan_processed_directory
from ..xrpd.processor import XrpdProcessor
from .utils import assert_url_exists
from .utils import export_filename_prefix
from .utils import get_current_filename
from .utils import get_energy
from .utils import subtracted_nxprocess_name
[docs]
class Bm02XrpdProcessor(
XrpdProcessor,
parameters=[
ParameterInfo("config_filename", category="PyFai"),
ParameterInfo("integration_options", category="PyFai"),
ParameterInfo("energy", category="PyFai"),
ParameterInfo(
"empty_cell_subtraction_options", category="Empty cell subtraction"
),
ParameterInfo("ascii_export_enabled", category="ASCII export"),
ParameterInfo("save_as_zip", category="ASCII export"),
ParameterInfo("max_files", category="ASCII export"),
ParameterInfo("exported_counters", category="ASCII export"),
ParameterInfo("exported_positioners", category="ASCII export"),
],
):
DEFAULT_WORKFLOW: Optional[str] = resource_filename(
"bm02", "integrate_scan_with_saving_subtract_ascii.json"
)
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("config_filename", {"WOS": "", "D5": ""})
defaults.setdefault("energy", None)
defaults.setdefault(
"integration_options",
{
"WOS": {
"method": "no_csr_cython",
"nbpt_rad": 4096,
"unit": "q_nm^-1",
"error_model": "poisson",
},
"D5": {
"method": "no_csr_cython",
"nbpt_rad": 4096,
"unit": "q_nm^-1",
"error_model": "poisson",
},
},
)
defaults.setdefault(
"empty_cell_subtraction_options",
{
"CdTe": {
"enabled": False,
"cell_pattern_url": None,
"empty_cell_factor": "1",
},
"WOS": {
"enabled": False,
"cell_pattern_url": None,
"empty_cell_factor": 1,
},
},
)
defaults.setdefault("ascii_export_enabled", False)
defaults.setdefault("save_as_zip", False)
defaults.setdefault("max_files", 1000)
defaults.setdefault("exported_counters", None)
defaults.setdefault("exported_positioners", None)
super().__init__(config=config, defaults=defaults)
[docs]
def get_config_filename(self, lima_name: str) -> Optional[str]:
try:
return self.config_filename[lima_name]
except KeyError:
raise RuntimeError(
f"Missing pyfai configuration file (poni or json) for '{lima_name}'"
) from None
[docs]
def get_integration_options(self, scan: BlissScanType, lima_name: str) -> dict:
try:
return self.integration_options.get(lima_name, {})
except KeyError:
raise RuntimeError(
f"Missing pyfai integration options for '{lima_name}'"
) from None
def _is_save_ascii_enabled(self, scan: BlissScanType, lima_name: str) -> bool:
if not self.ascii_export_enabled:
return False
nb_sectors = self.get_integration_options(scan, lima_name).get("nbpt_azim", 1)
npoints = scan.scan_info.get("npoints", float("inf"))
if npoints * nb_sectors >= self.max_files:
logging.warning(f"""
Disabling ASCII export for scan {scan.scan_info.get("scan_nb")}
since the number of exported files ({npoints}*{nb_sectors} = {npoints * nb_sectors})
will exceed the max number ({self.max_files}).
""")
return False
return True
[docs]
def enable_ascii_export(self):
self.ascii_export_enabled = True
[docs]
def disable_ascii_export(self):
self.ascii_export_enabled = False
def _export_folder(self, scan: BlissScanType):
return Path(scan_processed_directory(scan)) / "export"
def _get_ascii_export_inputs(
self, scan: BlissScanType, lima_name: str
) -> List[dict]:
export_folder = self._export_folder(scan)
counter_urls = []
if self.exported_counters:
counter_urls.extend(
f"{self.master_output_url(scan)}/measurement/{counter}"
for counter in self.exported_counters
)
if self.exported_positioners:
counter_urls.extend(
f"{self.master_output_url(scan)}/instrument/positioners/{positioner}"
for positioner in self.exported_positioners
)
raw_inputs = self._get_ascii_export_task_inputs(
task_id="SaveRawNexusPatternsAsAscii",
export_folder=export_folder,
filename_prefix=export_filename_prefix(scan, lima_name),
enabled=self._is_save_ascii_enabled(scan, lima_name),
counter_urls=counter_urls,
)
subtracted_inputs = self._get_ascii_export_task_inputs(
task_id="SaveSubtractedNexusPatternsAsAscii",
export_folder=export_folder,
filename_prefix=f"{export_filename_prefix(scan, lima_name)}_subtracted",
enabled=self._is_cell_subtraction_enabled(scan, lima_name)
and self._is_save_ascii_enabled(scan, lima_name),
counter_urls=counter_urls,
)
return [*raw_inputs, *subtracted_inputs]
def _get_ascii_export_task_inputs(
self,
task_id: str,
export_folder: Path,
filename_prefix: str,
enabled: bool,
counter_urls: List[str],
) -> List[dict]:
ascii_basename_template = filename_prefix + "_f%04d.dat"
if self.save_as_zip:
output_filename_template = ascii_basename_template
output_archive_filename = export_folder / f"{filename_prefix}.zip"
else:
output_filename_template = export_folder / ascii_basename_template
output_archive_filename = ""
return task_inputs(
id=task_id,
inputs={
"enabled": enabled,
"output_filename_template": str(output_filename_template),
"output_archive_filename": str(output_archive_filename),
"sector_suffix_template": "sect%04d",
"counter_urls": counter_urls,
"raise_when_missing_counter": True,
},
)
def _is_cell_subtraction_enabled(self, scan: BlissScanType, lima_name: str) -> bool:
if "nbpt_azim" in self.get_integration_options(scan, lima_name):
logging.warning(
f"Cannot subtract empty cell from 2D integrated patterns. Disabling cell subtraction for detector {lima_name} of scan {scan.scan_info.get('scan_nb')}"
)
return False
if lima_name not in self.empty_cell_subtraction_options:
return False
return self.empty_cell_subtraction_options[lima_name].get("enabled", False)
def _get_empty_cell_subtraction_inputs(
self, scan: BlissScanType, lima_name: str
) -> List[dict]:
inputs = self._get_data_access_inputs(
scan, lima_name, "SubtractBackgroundPattern"
)
subtraction_options = self.empty_cell_subtraction_options.get(lima_name, {})
inputs += task_inputs(
task_identifier="SubtractBackgroundPattern",
inputs={
"enabled": self._is_cell_subtraction_enabled(scan, lima_name),
"output_nxprocess_url": f"{self.master_output_url(scan)}/{subtracted_nxprocess_name(lima_name)}",
"background_nxdata_url": subtraction_options.get(
"cell_pattern_url", None
),
"background_factor": subtraction_options.get("empty_cell_factor", 1),
},
)
external_output_url = self.external_output_url(scan, lima_name)
if external_output_url:
inputs.append(
{
"task_identifier": "SubtractBackgroundPattern",
"name": "external_nxprocess_url",
"value": f"{external_output_url}/{subtracted_nxprocess_name(lima_name)}",
}
)
return inputs
def _data_to_plot_url(self, scan: BlissScanType, lima_name: str):
if not scan.scan_info.get("save"):
return None
output_url = self.online_output_url(scan, lima_name)
if self._is_cell_subtraction_enabled(scan, lima_name):
return f"{output_url}/{subtracted_nxprocess_name(lima_name)}/integrated"
else:
return f"{output_url}/{lima_name}_integrate/integrated"
[docs]
def set_cell_pattern_url(
self, sample: str, dataset: str, scan_number: int, detector: str
) -> None:
filename = ESRFPath(get_current_filename())
cell_filename = filename.replace_fields(
collection=sample, dataset=dataset
).processed_dataset_file
cell_pattern_url = (
f"{cell_filename}::/{scan_number}.1/{detector}_integrate/integrated"
)
try:
assert_url_exists(cell_pattern_url)
except FileNotFoundError as e:
logging.warning(f"{e}. Please select another URL.")
return
self.empty_cell_subtraction_options[detector][
"cell_pattern_url"
] = f"{cell_filename}::/{scan_number}.1/{detector}_integrate/integrated"
[docs]
def enable_empty_cell_subtraction(self, *detectors):
for detector_obj in detectors:
detector: str = detector_obj.name
if detector not in self.empty_cell_subtraction_options:
self.empty_cell_subtraction_options[detector] = {}
self.empty_cell_subtraction_options[detector]["enabled"] = True
if (
self.empty_cell_subtraction_options[detector].get("cell_pattern", None)
is None
):
print(
f"Do not forget to set a cell_pattern_url for {detector} now that cell subtraction is enabled!"
)
[docs]
def disable_empty_cell_subtraction(self, *detectors):
for detector in detectors:
if detector.name not in self.empty_cell_subtraction_options:
self.empty_cell_subtraction_options[detector.name] = {}
self.empty_cell_subtraction_options[detector.name]["enabled"] = False
[docs]
def reset_energy(self):
self.energy = None
def _info_categories(self) -> Dict[str, dict]:
categories = super()._info_categories()
if categories["PyFai"]["energy"].value is None:
categories["PyFai"]["energy"] = "From JSON"
return categories