import logging
import os
import re
import shutil
from enum import Enum
from enum import auto
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

from ..bliss_globals import current_session
from ..ewoks_utils import get_future
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import counters
from ..utils import directories
from .parameters.fluoxas import fluoxas_workflow_inputs
from .parameters.mosaic_xrfmap import mosaic_xrfmap_workflow_inputs
from .parameters.utils import detectors
from .parameters.xrfmap import xrfmap_workflow_inputs
logger = logging.getLogger(__name__)
class ProcessingType(Enum):
    """Kinds of FLUO processing that can be triggered for a scan.

    Each member corresponds to one workflow-input builder used by
    ``FluoProcessor.get_submit_arguments``.
    """

    # Single XRF map scan.
    xrfmap = auto()
    # FLUOXAS scan sequence.
    fluoxas = auto()
    # XRF map made of several patches (scan sequence).
    mosaic_xrfmap = auto()
def _as_list(value):
if isinstance(value, (str, int)):
return [value]
return list(value)
class FluoProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo(
            "workflow",
            category="workflows",
            validator=str,
            doc="Workflow to submit for every scan with MCA detectors.",
        ),
        ParameterInfo(
            "queue",
            category="workflows",
            validator=str,
            doc="Ewoks queue to submit the jobs to.",
        ),
        ParameterInfo(
            "instrument_name",
            category="workflows",
            validator=str,
            doc="Instrument name passed to the workflow inputs.",
        ),
        ParameterInfo(
            "detectors",
            category="PyMCA",
            validator=_as_list,
            deprecated_names=["xrf_names"],
            doc="MCA detectors defined by Bliss channel name, HDF5 name or number",
        ),
        ParameterInfo(
            "pymca_configs",
            category="PyMCA",
            validator=_as_list,
            doc="One configuration file per detector or a single one to fit the sum",
        ),
        ParameterInfo(
            "energy_name",
            category="PyMCA",
            validator=str,
            doc="Name of the primary beam energy counter",
        ),
        ParameterInfo(
            "quantification",
            category="PyMCA",
            validator=bool,
            doc="PyMca quantification",
        ),
        ParameterInfo(
            "norm_counter_name",
            category="Normalization",
            validator=str,
            doc="Name of the counter for normalization",
        ),
        ParameterInfo(
            "virtual_axes",
            category="regrid",
            validator=dict,
            doc="Virtual positioners which are typically the sum of scalar and moving motor positions.",
        ),
        ParameterInfo(
            "ignore_axes",
            category="regrid",
            # A list of positioner names (the default is an empty list), so
            # it must not be coerced to a boolean.
            validator=_as_list,
            doc="Positioners to ignore when detecting the map motors",
        ),
        ParameterInfo(
            "output_suffix",
            category="results",
            validator=str,
            doc="Output file name suffix",
        ),
        ParameterInfo(
            "data_portal_upload",
            category="data portal",
            validator=bool,
            doc="Upload results to the ESRF Data Portal",
        ),
    ],
):
    """A class that holds parameters related to online workflow triggering for PyMCA."""

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Fill in missing default parameter values and initialize the base class.

        :param config: forwarded unchanged to ``BaseProcessor.__init__``.
        :param defaults: default parameter values; keys that are absent
            get the values set below.
        :param deprecated_defaults: legacy keyword defaults which are merged
            with ``defaults`` by ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("workflow", "")
        defaults.setdefault("detectors", [])
        defaults.setdefault("output_suffix", "")
        defaults.setdefault("quantification", True)
        defaults.setdefault("virtual_axes", {})
        defaults.setdefault("ignore_axes", [])
        defaults.setdefault("data_portal_upload", False)
        defaults.setdefault("trigger_at", "END")
        super().__init__(config=config, defaults=defaults)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> Optional[dict]:
        """Run :meth:`_on_new_scan` and return only its metadata part."""
        metadata, _ = self._on_new_scan(scan)
        return metadata

    def _on_new_scan(self, scan: BlissScanType) -> Tuple[Optional[dict], Optional[Any]]:
        """Executed at the end of every scan

        Submits the workflow matching the scan's processing type, if any.

        :returns: ``(metadata, future)``. ``metadata`` is always ``None``
            here; ``future`` is ``None`` when nothing was submitted.
        """
        future = None
        metadata = None
        if not self.scan_requires_processing(scan):
            return metadata, future

        # Submit arguments
        processing_type = self._get_processing_type(scan)
        if processing_type is None:
            return metadata, future
        workflow, kwargs = self.get_submit_arguments(scan, processing_type)

        with_saving = scan.scan_info.get("save")
        if with_saving:
            kwargs["convert_destination"] = self._get_workflow_destination(scan)

        upload_parameters = None
        if self.data_portal_upload and with_saving:
            # It is allowed to upload the same processed directory more than once.
            upload_parameters = self._get_workflow_upload_parameters(scan)
        if upload_parameters:
            kwargs["upload_parameters"] = upload_parameters

        # Trigger workflow from the current process.
        logger.info("Starting workflow %r", workflow)
        future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
        future = get_future(future.uuid)
        return metadata, future

    def scan_requires_processing(self, scan: BlissScanType) -> bool:
        """Check whether raw data is saved and whether it contains MCA spectra."""
        if not scan.scan_info.get("save"):
            return False
        return self._has_mca_controllers(scan)

    def get_submit_arguments(
        self, scan: BlissScanType, processing_type: ProcessingType
    ) -> Tuple[Optional[str], dict]:
        """Build the workflow and the keyword arguments for job submission.

        :param scan: scan to be processed.
        :param processing_type: selects which workflow inputs are generated.
        :returns: the workflow returned by :meth:`get_workflow` (may be
            ``None``) and a dict with the keys ``"inputs"`` and ``"outputs"``.
        :raises ValueError: when ``processing_type`` is not a known type.
        """
        if processing_type == ProcessingType.xrfmap:
            kwargs = self.get_xrfmap_arguments(scan)
            workflow_stem, inputs = xrfmap_workflow_inputs(**kwargs)
        elif processing_type == ProcessingType.fluoxas:
            kwargs = self.get_fluoxas_arguments(scan)
            workflow_stem, inputs = fluoxas_workflow_inputs(**kwargs)
        elif processing_type == ProcessingType.mosaic_xrfmap:
            kwargs = self.get_mosaic_xrfmap_arguments(scan)
            workflow_stem, inputs = mosaic_xrfmap_workflow_inputs(**kwargs)
        else:
            raise ValueError("Unknown FLUO processing type")
        workflow = self.get_workflow(scan, workflow_stem)
        return workflow, {"inputs": inputs, "outputs": [{"all": False}]}

    def _set_workflow(self, scan: BlissScanType, filename) -> None:
        """Set the workflow filename for the scan"""
        self.workflow = filename

    def get_filename(self, scan: BlissScanType) -> str:
        """Return the scan's ``"filename"`` from ``scan_info``.

        Falls back to the file name of the current session's ``scan_saving``
        when the scan info has no (or an empty) file name.
        """
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def _get_axis_units_from_scan(
        self, virtual_axes: Iterable[str], scan: BlissScanType
    ) -> Dict[str, str]:
        """Collect the units of all motors referenced by virtual axis expressions.

        :param virtual_axes: virtual axis expressions in which motors are
            referenced as ``<motor_name>``.
        :param scan: scan whose ``positioners_units`` are looked up.
        :returns: motor name -> unit, for the motors that have a unit.
        """
        units = scan.scan_info["positioners"]["positioners_units"]
        units_dict = {}
        for expression in virtual_axes:
            for motor_name in _get_motors_from_expression(expression):
                motor_unit = units[motor_name]
                if motor_unit is None:
                    logger.warning(
                        "Unit for motor `%s` is not set in beacon!", motor_name
                    )
                else:
                    units_dict[motor_name] = motor_unit
        return units_dict

    def get_xrfmap_arguments(self, scan: BlissScanType) -> dict:
        """Keyword arguments for ``xrfmap_workflow_inputs``."""
        return dict(
            filename=self.get_filename(scan),
            output_root_uri=self.master_output_url(scan),
            scan_number=scan.scan_info.get("scan_nb"),
            config_filenames=self.pymca_configs,
            instrument_name=self.instrument_name,
            detectors=self.h5_detector_names,
            counter_name=self.norm_counter_name,
            energy_name=self.energy_name,
            quantification=self.quantification,
            real_axes=self.get_scanned_motor_names(scan),
            virtual_axes=self.virtual_axes,
            ignore_axes=self.ignore_axes,
            axis_units=self._get_axis_units_from_scan(self.virtual_axes.values(), scan),
        )

    def get_mosaic_xrfmap_arguments(self, scan: BlissScanType) -> dict:
        """Keyword arguments for ``mosaic_xrfmap_workflow_inputs``."""
        # Only touch the file extension: a plain ``str.replace`` would also
        # rewrite ".h5" occurrences elsewhere in the path.
        master_base, master_ext = os.path.splitext(
            directories.master_output_filename(scan)
        )
        concat_filename = f"{master_base}_concat{master_ext}"
        scan_ranges = _get_scan_ranges_from_sequence(scan)
        return dict(
            filenames=[self.get_filename(scan)],
            output_root_uri=self.master_output_url(scan),
            scan_ranges=[scan_ranges],
            concat_bliss_scan_uri=f"{concat_filename}::/{scan_ranges[0]}.1",
            config_filenames=self.pymca_configs,
            instrument_name=self.instrument_name,
            detectors=self.h5_detector_names,
            counter_name=self.norm_counter_name,
            energy_name=self.energy_name,
            quantification=self.quantification,
            real_axes=self.get_scanned_motor_names(scan),
            virtual_axes=self.virtual_axes,
            ignore_axes=self.ignore_axes,
            axis_units=self._get_axis_units_from_scan(self.virtual_axes.values(), scan),
        )

    def get_fluoxas_arguments(self, scan: BlissScanType) -> dict:
        """Keyword arguments for ``fluoxas_workflow_inputs``."""
        return dict(
            filenames=[self.get_filename(scan)],
            output_root_uri=self.master_output_url(scan),
            scan_ranges=[_get_scan_ranges_from_sequence(scan)],
            config_filenames=self.pymca_configs,
            instrument_name=self.instrument_name,
            detectors=self.h5_detector_names,
            counter_name=self.norm_counter_name,
            energy_name=self.energy_name,
            quantification=self.quantification,
            real_axes=self.get_scanned_motor_names(scan),
            virtual_axes=self.virtual_axes
            or None,  # use None when empty (fallback to defaults)
            ignore_axes=self.ignore_axes
            or None,  # use None when empty (fallback to defaults)
            axis_units=self._get_axis_units_from_scan(self.virtual_axes.values(), scan),
        )

    def _get_processing_type(self, scan: BlissScanType) -> Optional[ProcessingType]:
        """Return the processing type flagged in the scan info, if any."""
        # Note: we already know the scan contains MCA spectra so no need to check for that
        if self._is_xrfmap(scan):
            return ProcessingType.xrfmap
        if self._is_fluoxas(scan):
            return ProcessingType.fluoxas
        if self._is_mosaic_xrfmap(scan):
            return ProcessingType.mosaic_xrfmap
        return None

    def _is_xrfmap(self, scan: BlissScanType) -> bool:
        """Whether the scan info has a truthy ``"is_xrfmap"`` flag."""
        return bool(scan.scan_info.get("is_xrfmap"))

    def _is_fluoxas(self, scan: BlissScanType) -> bool:
        """Whether the scan info has a truthy ``"is_fluoxas"`` flag."""
        return bool(scan.scan_info.get("is_fluoxas"))

    def _is_mosaic_xrfmap(self, scan: BlissScanType) -> bool:
        """Whether the scan info has a truthy ``"is_xrfmap_patch"`` flag."""
        return bool(scan.scan_info.get("is_xrfmap_patch"))

    def get_workflow(self, scan: BlissScanType, workflow_stem: str) -> Optional[str]:
        """Get the workflow to execute for the scan and ensure it is located
        in the proposal directory for user reference and worker accessibility.

        :returns: path of the workflow copy, or ``None`` when
            :meth:`_get_workflow` yields no source file.
        """
        src_file = self._get_workflow(scan, workflow_stem)
        if src_file is None:
            return None
        dataset_filename = self.get_filename(scan)
        workflow_directory = self._get_workflows_dir(dataset_filename)
        dst_file = os.path.join(workflow_directory, os.path.basename(src_file))
        if src_file != dst_file:
            self._set_workflow(scan, dst_file)
        # An existing copy is never overwritten.
        if not os.path.exists(dst_file):
            os.makedirs(workflow_directory, exist_ok=True)
            shutil.copyfile(src_file, dst_file)
        return dst_file

    def _get_workflow(self, scan: BlissScanType, workflow_stem: str) -> Optional[str]:
        """Locate the packaged ``fluo`` workflow resource ``<workflow_stem>.json``."""
        return resource_filename("fluo", workflow_stem + ".json")

    def _get_workflows_dir(self, dataset_filename: str) -> str:
        """Directory where to save the un-parameterized workflow."""
        return directories.get_workflows_dir(dataset_filename)

    def _get_workflow_destination(self, scan: BlissScanType) -> str:
        """Directory where to save the workflow parameterized for each scan."""
        convert_destination = directories.workflow_destination(scan)
        dirname, basename = os.path.split(convert_destination)
        base, ext = os.path.splitext(basename)
        return os.path.join(
            dirname, "workflows", f"{base}{self.output_suffix or ''}{ext}"
        )

    @property
    def detector_names(self) -> List[str]:
        """Bliss channel names or HDF5 names. Detector numbers are resolved as strings."""
        return detectors.get_detector_names(self.detectors)

    @property
    def h5_detector_names(self) -> List[str]:
        """HDF5 names only."""
        return counters.counter_to_h5_names(self.detector_names)

    @property
    def xrf_names(self) -> List[str]:
        """Bliss channel names or HDF5 names. Detector numbers are resolved as strings."""
        return self.detector_names

    @xrf_names.setter
    def xrf_names(self, value) -> None:
        """Bliss channel names, HDF5 names or detector numbers."""
        self.detectors = value

    def add_xrf_names(self, detector, *spectra: int) -> None:
        """Add an xrf detector and its spectra to the processor

        :param detector: object whose ``counters`` are inspected; only the
            counters named ``spectrum_det<N>`` are considered.
        :param spectra: spectrum numbers ``N`` to add. When omitted, all
            spectrum counters of the detector are added.
        """
        detector_names = set(self.detector_names)
        for counter in detector.counters:
            if not counter.name.startswith("spectrum_det"):
                continue
            spectrum_id = int(counter.name.replace("spectrum_det", ""))
            # ``spectra`` is a (possibly empty) tuple and never ``None``:
            # an empty selection means "all spectra".
            if not spectra or spectrum_id in spectra:
                detector_names.add(counter.fullname)
        self.detectors = sorted(detector_names)

    def clear_xrf_names(self):
        """Remove all detectors from the processor."""
        self.detectors = []

    def get_xrf_names(self, scan: BlissScanType) -> List[str]:
        """Sorted configured detector names that are channels of the scan."""
        # If this is a sequence check the first real scan
        if scan.scan_info.get("is_scan_sequence"):
            channels = scan.streams["SUBSCANS"][0].info.get("channels", dict())
        else:
            channels = scan.scan_info.get("channels", dict())
        return sorted(
            detector_name
            for detector_name in self.detector_names
            if detector_name in channels
        )

    def get_scanned_motor_names(self, scan: BlissScanType) -> Optional[List[str]]:
        """Names of the scanned motors; currently always ``None``."""
        # TODO: extract motors for certain scans
        return None

    def master_output_url(self, scan: BlissScanType) -> str:
        """URL which can be used to inspect the results after the processing."""
        scan_nb = scan.scan_info.get("scan_nb")
        filename = directories.master_output_filename(scan)
        base, ext = os.path.splitext(filename)
        return f"{base}{self.output_suffix or ''}{ext}::/{scan_nb}.1"

    def _get_workflow_upload_parameters(self, scan: BlissScanType) -> dict:
        """Parameters describing the processed dataset to upload.

        Beamline, proposal and sample name are read from the current
        session's ``scan_saving``.
        """
        raw_directory = os.path.dirname(self.get_filename(scan))
        processed_directory = directories.scan_processed_directory(scan)
        scan_saving = current_session.scan_saving
        metadata = {"Sample_name": scan_saving.dataset["Sample_name"]}
        return {
            "beamline": scan_saving.beamline,
            "proposal": scan_saving.proposal_name,
            "dataset": "xrf_fit",
            "path": processed_directory,
            "raw": [raw_directory],
            "metadata": metadata,
        }

    def _has_mca_controllers(self, scan: BlissScanType) -> bool:
        """Whether any scan channel belongs to a controller of the configured detectors."""
        controller_names = counters.counter_to_controller_names(self.detector_names)
        # ``str.startswith`` accepts a tuple of prefixes (an empty tuple
        # never matches).
        ctrl_prefixes = tuple(f"{ctrl_name}:" for ctrl_name in controller_names)
        return any(
            name.startswith(ctrl_prefixes) for name in scan.scan_info["channels"]
        )
def _get_motors_from_expression(
expression: str,
start_var: str = "<",
end_var: str = ">",
) -> Tuple[str, str]:
pattern = rf"{re.escape(start_var)}([^{re.escape(end_var)}]+){re.escape(end_var)}"
return re.findall(pattern, expression)
def _get_scan_ranges_from_sequence(scan: BlissScanType):
    """Return the ``(first, last)`` scan numbers of a scan sequence.

    The last number is the ``"scan_nb"`` of the final entry of the
    ``"SUBSCANS"`` stream. The first number is fixed to 2.
    """
    # NOTE(review): the hard-coded 2 presumably skips the sequence's own
    # scan number -- confirm against the Bliss scan numbering.
    subscans = scan.streams["SUBSCANS"][:]
    last_scan_number = subscans[-1].info["scan_nb"]
    return (2, last_scan_number)