"""User API for HDF5 conversion on the Bliss repl"""
import os
import shutil
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import counters
from ..utils import directories
def _coefficients_validator(value):
if value is None:
return None
if not isinstance(value, (list, tuple)):
raise TypeError("Coefficients must be a list or tuple")
if len(value) < 2:
raise ValueError("Coefficients must be a list of 2 or more values")
return list(value)
def _map_shape_validator(value):
if value is None:
return None
if not isinstance(value, (list, tuple)):
raise TypeError("Shape must be a list or tuple")
if len(value) != 2:
raise ValueError("Shape must have two numbers: (nslow, nfast)")
return tuple(value)
def _positive_number_validator(value):
if value is None:
return None
if value < 0:
raise ValueError("Value cannot be negative")
return value
def _path_validator(value):
if value is None:
return None
if not os.path.exists(value):
raise FileNotFoundError(value)
return value
[docs]
class Id16bXeolProcessor(
BaseProcessor,
parameters=[
ParameterInfo("workflow", category="workflows"),
ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
ParameterInfo("main_counter", category="data"),
ParameterInfo(
"calibration_x",
category="calibration",
validator=_coefficients_validator,
doc="Spectral energy calibration polynomial coefficients.",
),
ParameterInfo(
"calibration_y",
category="calibration",
validator=_coefficients_validator,
doc="Spectral intensity calibration polynomial coefficients.",
),
ParameterInfo(
"threshold",
category="filters",
validator=_positive_number_validator,
doc="Total spectral intensity threshold.",
),
ParameterInfo("normalization_counter", category="normalization"),
ParameterInfo(
"configuration_path", validator=_path_validator, category="PyMCA"
),
ParameterInfo("output_suffix", category="results"),
ParameterInfo("data_portal_upload", category="data portal"),
],
):
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
) -> None:
if defaults is None:
defaults = {}
defaults.setdefault("trigger_at", "END")
defaults.setdefault("queue", "celery")
defaults.setdefault("calibration_x", None) # default could be None of [0,1]
defaults.setdefault("calibration_y", None) # default could be None of [0,1]
defaults.setdefault("threshold", 0)
defaults.setdefault("workflow", resource_filename("id16b", "id16b_xeol.json"))
defaults.setdefault("output_suffix", "_xeol")
super().__init__(config=config, defaults=defaults)
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
self.on_new_scan_metadata(scan)
[docs]
def scan_requires_processing(self, scan: BlissScanType) -> bool:
if not scan.scan_info.get("save"):
return False
if scan.scan_info["type"] not in ("fscan2d", "amesh", "dmesh"):
return False
if not self._has_xeol_controllers(scan):
return False
return True
def _has_xeol_controllers(self, scan: BlissScanType) -> bool:
controller_names = counters.counter_to_controller_names([self.main_counter])
ctrl_prefixes = [f"{ctrl_name}:" for ctrl_name in controller_names]
return any(
name
for name in scan.scan_info["channels"].keys()
if any(name.startswith(ctrl_prefix) for ctrl_prefix in ctrl_prefixes)
)
[docs]
def get_submit_arguments(self, scan: BlissScanType) -> dict:
return {
"inputs": self.get_inputs(scan),
"outputs": [{"all": False}],
}
[docs]
def get_filename(self, scan: BlissScanType) -> str:
"""Raw data."""
filename = scan.scan_info.get("filename")
if filename:
return filename
return current_session.scan_saving.filename
[docs]
def get_output_filename(self, scan: BlissScanType) -> str:
"""Processed data."""
filename = directories.master_output_filename(scan)
base, ext = os.path.splitext(filename)
return f"{base}{self.output_suffix or ''}{ext}"
[docs]
def get_workflow(self, scan: BlissScanType) -> Optional[str]:
"""Get the workflow to execute for the scan and ensure it is located
in the proposal directory for user reference and worker accessibility.
"""
src_file = self._get_workflow(scan)
if src_file is None:
return
dataset_filename = self.get_filename(scan)
workflow_directory = self._get_workflows_dir(dataset_filename)
dst_file = os.path.join(workflow_directory, os.path.basename(src_file))
if src_file != dst_file:
self._set_workflow(scan, dst_file)
if not os.path.exists(dst_file):
os.makedirs(workflow_directory, exist_ok=True)
shutil.copyfile(src_file, dst_file)
return dst_file
def _get_workflow(self, scan: BlissScanType) -> Optional[str]:
return resource_filename("id16b", "id16b_xeol.json")
def _set_workflow(self, scan: BlissScanType, filename) -> None:
self.workflow = filename
def _get_workflows_dir(self, dataset_filename: str) -> str:
"""Directory where to save the un-parameterized workflow."""
return directories.get_workflows_dir(dataset_filename)
def _get_workflow_destination(self, scan: BlissScanType) -> str:
"""Directory where to save the workflow parameterized for each scan."""
convert_destination = directories.workflow_destination(scan)
dirname, basename = os.path.split(convert_destination)
base, ext = os.path.splitext(basename)
return os.path.join(
dirname, "workflows", f"{base}{self.output_suffix or ''}{ext}"
)
def _get_workflow_upload_parameters(self, scan: BlissScanType) -> dict:
raw_directory = os.path.dirname(self.get_filename(scan))
processed_directory = directories.scan_processed_directory(scan)
scan_saving = current_session.scan_saving
metadata = {"Sample_name": scan_saving.dataset["Sample_name"]}
return {
"beamline": scan_saving.beamline,
"proposal": scan_saving.proposal_name,
"dataset": "xeol_fit",
"path": processed_directory,
"raw": [raw_directory],
"metadata": metadata,
}