# Source code for blissoda.id16b.xeol_processor

"""User API for HDF5 conversion on the Bliss repl"""

import os
import shutil
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import counters
from ..utils import directories


def _coefficients_validator(value):
    """Validate polynomial coefficients.

    ``None`` is passed through unchanged. A list or tuple holding at least
    two items is returned as a new list.

    :raises TypeError: when the value is neither a list nor a tuple.
    :raises ValueError: when fewer than two items are provided.
    """
    if value is None:
        return None
    if isinstance(value, (list, tuple)):
        if len(value) >= 2:
            return list(value)
        raise ValueError("Coefficients must be a list of 2 or more values")
    raise TypeError("Coefficients must be a list or tuple")


def _map_shape_validator(value):
    """Validate a two-element map shape.

    ``None`` is passed through unchanged. A list or tuple with exactly two
    items is returned as a tuple.

    :raises TypeError: when the value is neither a list nor a tuple.
    :raises ValueError: when the number of items is not two.
    """
    if value is None:
        return None
    if isinstance(value, (list, tuple)):
        if len(value) == 2:
            return tuple(value)
        raise ValueError("Shape must have two numbers: (nslow, nfast)")
    raise TypeError("Shape must be a list or tuple")


def _positive_number_validator(value):
    """Validate that a number is not negative.

    ``None`` and any value that is not below zero (zero included) are
    returned unchanged.

    :raises ValueError: when the value is below zero.
    """
    if value is not None and value < 0:
        raise ValueError("Value cannot be negative")
    return value


def _path_validator(value):
    """Validate that a path exists on the file system.

    ``None`` and existing paths are returned unchanged.

    :raises FileNotFoundError: when the path does not exist.
    """
    if value is None or os.path.exists(value):
        return value
    raise FileNotFoundError(value)


class Id16bXeolProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo("main_counter", category="data"),
        ParameterInfo(
            "calibration_x",
            category="calibration",
            validator=_coefficients_validator,
            doc="Spectral energy calibration polynomial coefficients.",
        ),
        ParameterInfo(
            "calibration_y",
            category="calibration",
            validator=_coefficients_validator,
            doc="Spectral intensity calibration polynomial coefficients.",
        ),
        ParameterInfo(
            "threshold",
            category="filters",
            validator=_positive_number_validator,
            doc="Total spectral intensity threshold.",
        ),
        ParameterInfo("normalization_counter", category="normalization"),
        ParameterInfo(
            "configuration_path", validator=_path_validator, category="PyMCA"
        ),
        ParameterInfo("output_suffix", category="results"),
        ParameterInfo("data_portal_upload", category="data portal"),
    ],
):
    """Processor that submits the ID16B XEOL workflow for saved mesh scans.

    A scan is processed when it is saved, its type is ``fscan2d``, ``amesh``
    or ``dmesh`` and it has at least one channel belonging to the controller
    of ``main_counter``.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Fill in the processor defaults and initialize the base processor.

        :param config: forwarded unchanged to the base class.
        :param defaults: default parameter values; entries provided by the
            caller take precedence over the defaults set here.
        """
        # Work on a copy so that the caller's dictionary is left untouched.
        defaults = dict(defaults) if defaults is not None else {}
        defaults.setdefault("trigger_at", "END")
        defaults.setdefault("queue", "celery")
        defaults.setdefault("calibration_x", None)  # default could be None or [0, 1]
        defaults.setdefault("calibration_y", None)  # default could be None or [0, 1]
        defaults.setdefault("threshold", 0)
        defaults.setdefault("workflow", resource_filename("id16b", "id16b_xeol.json"))
        defaults.setdefault("output_suffix", "_xeol")
        super().__init__(config=config, defaults=defaults)

    def on_new_scan_metadata(self, scan: BlissScanType) -> None:
        """Submit the workflow for `scan` when the scan requires processing."""
        if not self.scan_requires_processing(scan):
            return

        # Build the arguments first: `get_inputs` raises when the
        # configuration path is missing, before any workflow file is copied.
        kwargs = self.get_submit_arguments(scan)
        workflow = self.get_workflow(scan)

        with_saving = scan.scan_info.get("save")
        if with_saving:
            kwargs["convert_destination"] = self._get_workflow_destination(scan)

        upload_parameters = None
        if self.data_portal_upload and with_saving:
            # It is allowed to upload the same processed directory more than once.
            upload_parameters = self._get_workflow_upload_parameters(scan)
        if upload_parameters:
            kwargs["upload_parameters"] = upload_parameters

        submit(args=(workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Delegate to :meth:`on_new_scan_metadata`.

        NOTE(review): presumably a hook invoked by the base processor - confirm.
        """
        self.on_new_scan_metadata(scan)

    def scan_requires_processing(self, scan: BlissScanType) -> bool:
        """Return ``True`` when `scan` is saved, is a supported mesh scan and
        has channels of the main counter's controller.
        """
        if not scan.scan_info.get("save"):
            return False
        if scan.scan_info["type"] not in ("fscan2d", "amesh", "dmesh"):
            return False
        if not self._has_xeol_controllers(scan):
            return False
        return True

    def _has_xeol_controllers(self, scan: BlissScanType) -> bool:
        """Return ``True`` when a scan channel name starts with
        ``"<controller>:"`` for one of the controllers of ``main_counter``.
        """
        controller_names = counters.counter_to_controller_names([self.main_counter])
        # `str.startswith` accepts a tuple of candidate prefixes.
        ctrl_prefixes = tuple(f"{ctrl_name}:" for ctrl_name in controller_names)
        return any(
            name.startswith(ctrl_prefixes) for name in scan.scan_info["channels"]
        )

    def get_submit_arguments(self, scan: BlissScanType) -> dict:
        """Return the keyword arguments used to submit the workflow."""
        return {
            "inputs": self.get_inputs(scan),
            "outputs": [{"all": False}],
        }

    def get_filename(self, scan: BlissScanType) -> str:
        """Raw data."""
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        # Fall back on the file name of the current session.
        return current_session.scan_saving.filename

    def get_output_filename(self, scan: BlissScanType) -> str:
        """Processed data."""
        filename = directories.master_output_filename(scan)
        base, ext = os.path.splitext(filename)
        return f"{base}{self.output_suffix or ''}{ext}"

    def get_workflow(self, scan: BlissScanType) -> Optional[str]:
        """Get the workflow to execute for the scan and ensure it is located
        in the proposal directory for user reference and worker accessibility.

        :returns: the path of the workflow copy, or ``None`` when there is
            no source workflow.
        """
        src_file = self._get_workflow(scan)
        if src_file is None:
            return None

        dataset_filename = self.get_filename(scan)
        workflow_directory = self._get_workflows_dir(dataset_filename)
        dst_file = os.path.join(workflow_directory, os.path.basename(src_file))

        if src_file != dst_file:
            self._set_workflow(scan, dst_file)

        # An existing copy is never overwritten.
        if not os.path.exists(dst_file):
            os.makedirs(workflow_directory, exist_ok=True)
            shutil.copyfile(src_file, dst_file)

        return dst_file

    def _get_workflow(self, scan: BlissScanType) -> Optional[str]:
        """Return the workflow file shipped as a package resource.

        The ``workflow`` parameter is not consulted here.
        """
        return resource_filename("id16b", "id16b_xeol.json")

    def _set_workflow(self, scan: BlissScanType, filename) -> None:
        """Store `filename` in the ``workflow`` parameter."""
        self.workflow = filename

    def _get_workflows_dir(self, dataset_filename: str) -> str:
        """Directory where to save the un-parameterized workflow."""
        return directories.get_workflows_dir(dataset_filename)

    def _get_workflow_destination(self, scan: BlissScanType) -> str:
        """Directory where to save the workflow parameterized for each scan."""
        convert_destination = directories.workflow_destination(scan)
        dirname, basename = os.path.split(convert_destination)
        base, ext = os.path.splitext(basename)
        return os.path.join(
            dirname, "workflows", f"{base}{self.output_suffix or ''}{ext}"
        )

    def _get_workflow_upload_parameters(self, scan: BlissScanType) -> dict:
        """Return the parameters describing the data portal upload of the
        processed directory of `scan`.
        """
        raw_directory = os.path.dirname(self.get_filename(scan))
        processed_directory = directories.scan_processed_directory(scan)
        scan_saving = current_session.scan_saving
        metadata = {"Sample_name": scan_saving.dataset["Sample_name"]}
        return {
            "beamline": scan_saving.beamline,
            "proposal": scan_saving.proposal_name,
            "dataset": "xeol_fit",
            "path": processed_directory,
            "raw": [raw_directory],
            "metadata": metadata,
        }

    def get_inputs(self, scan: BlissScanType) -> List[dict]:
        """Return the workflow task inputs for `scan`.

        :raises ValueError: when ``configuration_path`` is not set.
        """
        if not self.configuration_path:
            raise ValueError(
                "Configuration path is required. Please set it before start"
            )
        inputs = []

        # Raw data parameters
        # Parameters used to read data from HDF5
        input_filename = self.get_filename(scan)
        scan_nb = scan.scan_info.get("scan_nb")
        # Parameters used to read data from Redis
        scan_id = scan._scan_data.key
        for task_identifier in ["ReadStack", "SaveXeolH5"]:
            inputs += [
                {
                    "name": "stack_path",
                    "value": input_filename,
                    "task_identifier": task_identifier,
                },
                {
                    "name": "scan_number",
                    "value": scan_nb,
                    "task_identifier": task_identifier,
                },
                {
                    "name": "key",
                    "value": scan_id,
                    "task_identifier": task_identifier,
                },
            ]

        # Optional read parameters
        if self.main_counter:
            inputs.append(
                {
                    "name": "counter",
                    "value": self.main_counter,
                    "task_identifier": "ReadStack",
                }
            )
        if self.normalization_counter:
            inputs.append(
                {
                    "name": "norm_counter",
                    "value": self.normalization_counter,
                    "task_identifier": "ReadStack",
                }
            )
        if self.threshold is not None:
            inputs.append(
                {
                    "name": "threshold",
                    "value": self.threshold,
                    "task_identifier": "ReadStack",
                }
            )

        # Spectral energy calibration
        calibration_x = self.calibration_x
        if calibration_x is None:
            # Tolerate an "instrument" entry that is present but None.
            instrument = scan.scan_info.get("instrument") or {}
            calibration_x = instrument.get("calibration")
        if calibration_x:
            inputs += [
                {
                    "name": "calibration_x",
                    "value": calibration_x,
                    "task_identifier": "ReadStack",
                },
                {
                    "name": "calibration_x",
                    "value": calibration_x,
                    "task_identifier": "SaveXeolH5",
                },
            ]

        # Spectral intensity calibration
        if self.calibration_y:
            inputs += [
                {
                    "name": "calibration_y",
                    "value": self.calibration_y,
                    "task_identifier": "ReadStack",
                },
                {
                    "name": "calibration_y",
                    "value": self.calibration_y,
                    "task_identifier": "SaveXeolH5",
                },
            ]

        # Fit parameters
        inputs.append(
            {
                "name": "config_path",
                "value": self.configuration_path,
                "task_identifier": "ReadCorrectConfig",
            }
        )

        # Save parameters
        output_filename = self.get_output_filename(scan)
        inputs += [
            {
                "name": "save_path",
                "value": output_filename,
                "task_identifier": "SaveXeolH5",
            },
        ]
        return inputs