# Source code for blissoda.id32.processor

import json
import logging
import os
from typing import Dict, Optional

from blissoda.resources import resource_filename

try:
    from bliss import current_session
except ImportError:
    current_session = None

from ewoksjob.client import submit

from ..persistent.parameters import (
    ParameterInfo,
    WithPersistentParameters,
    autocomplete_property,
)
from ..utils import trigger
from ..utils.directories import get_dataset_processed_dir

logger = logging.getLogger(__name__)


# NOTE: dead example code kept for reference — an earlier HashSetting-based
# parameter store, superseded by WithPersistentParameters below:
# ~ from bliss import setup_globals
# ~ from bliss.config.settings import HashSetting
# ~ class SpecGenParameters:
# ~ def __init__(self):
# ~ andor_setting = {"slope": 0}
# ~ self.andor = HashSetting('andor1_setting', default_values=andor_setting)


class Id32SpecGenProcessor(
    WithPersistentParameters,
    parameters=[
        ParameterInfo("_enabled"),
        ParameterInfo("trigger_at", category="workflows"),
        ParameterInfo("parameters", category="workflows"),
    ],
):
    """Trigger ID32 "specgen" processing workflows on new BLISS scans.

    For every configured detector whose ``<detector>:image`` channel is
    present in a saved scan, an Ewoks workflow converting the image to a
    spectrum is submitted to the :data:`WORKER` queue.  Per-detector
    processing parameters are persisted between sessions.
    """

    # Ewoks worker queue the workflows are submitted to.
    WORKER = "ewoksworker_lid32rixs1"
    # Workflow description shipped as a package resource (see _get_workflow).
    WORKFLOW_FILENAME = "convert_image_to_spectrum.json"
    OUTPUT_FILENAME = "spectrum.txt"

    def __init__(self, detectors=(), **defaults) -> None:
        """
        :param detectors: detector names to process; defaults now use an
            immutable tuple instead of a shared mutable ``[]`` (same
            behavior for callers).
        :param defaults: default values for the persistent parameters.
        :raises ImportError: when not running inside a BLISS session.
        """
        if current_session is None:
            raise ImportError("bliss")
        defaults.setdefault("_enabled", False)
        defaults.setdefault("trigger_at", "PREPARED")
        defaults.setdefault(
            "parameters",
            {
                detector: {
                    "slope": -0.0688,
                    "energy calibration (meV/px)": 21.5,
                    "points per pixel": 2.7,
                    "low threshold": 0.12,
                    "high threshold": 1,
                    "mask size": 5,
                    "SPC": True,
                    "SPC grid size": 3,
                    "SPC low threshold": 0.2,
                    "SPC high threshold": 1.0,
                    "SPC single event threshold": 0.4,
                    "SPC double event threshold": 1.5,
                }
                for detector in detectors
            },
        )
        self.detectors = list(detectors)
        super().__init__(**defaults)
        # Honor the persisted enabled state on (re)construction.
        if self._enabled:
            self._register_workflow_trigger()
        else:
            self._unregister_workflow_trigger()

    def _register_workflow_trigger(self):
        # Hook workflow submission into the scan life-cycle at `trigger_at`.
        workflows_category = trigger.register_workflow_category(timing=self.trigger_at)
        workflows_category.set("processing", self._trigger_workflow_on_new_scan)

    def _unregister_workflow_trigger(self):
        trigger.unregister_workflow_category()

    def enable(self):
        """Enable automatic workflow triggering on new scans."""
        self._enabled = True
        self._register_workflow_trigger()

    def disable(self):
        """Disable automatic workflow triggering."""
        self._enabled = False
        self._unregister_workflow_trigger()

    def setup(self):
        """Interactively edit the processing parameters of one detector.

        Prompts for a detector index, then for each parameter of that
        detector; empty input keeps the current value.
        """
        spacer = 8 * " "
        req = " ".join(
            "(%d) %s" % (i + 1, det) for i, det in enumerate(self.detectors)
        )
        ret = input(spacer + "Detector? " + req + ": ")
        try:
            # Parse the full answer (the original read only ret[0], which
            # broke with 10+ detectors) and reject out-of-range indices
            # (negative ints would otherwise silently wrap around).
            index = int(ret) - 1
            if not 0 <= index < len(self.detectors):
                raise IndexError(index)
            det = self.detectors[index]
        except (ValueError, IndexError):
            print("Detector unknown")
            return
        for par in self.parameters[det].keys():
            old_par = self.parameters[det][par]
            req = "%s (%s): " % (par, str(old_par))
            ret = input(spacer + req)
            if not ret:
                continue  # keep current value
            try:
                if isinstance(old_par, bool):
                    # bool("False") is True — parse textual booleans so the
                    # user can actually switch a flag off.
                    new_par = ret.strip().lower() in ("1", "true", "yes", "y", "on")
                elif "size" in par:
                    new_par = int(float(ret))
                else:
                    new_par = float(ret)
            except ValueError:
                print("Invalid input")
                continue
            self.parameters[det][par] = new_par

    def _info_categories(self) -> Dict[str, dict]:
        """Add an 'Enabled' status line to the persistent-parameter info."""
        categories = super()._info_categories()
        categories["status"] = {"Enabled": self._enabled}
        return categories

    @autocomplete_property
    def trigger_at(self) -> Optional[str]:
        # Scan life-cycle stage name (e.g. "PREPARED"); the previous
        # Optional[int] annotation contradicted the stored default.
        return self._get_parameter("trigger_at")

    @trigger_at.setter
    def trigger_at(self, value: str):
        self._set_parameter("trigger_at", value)
        # Re-register the trigger so the new timing takes effect immediately.
        if self._enabled:
            self.disable()
            self.enable()

    def _get_workflow(self) -> dict:
        """Load the workflow description from the package resources."""
        with open(resource_filename("id32", self.WORKFLOW_FILENAME), "r") as wf:
            return json.load(wf)

    def _get_workflow_inputs(self, scan, detector) -> list:
        """Build the Ewoks input list for one scan/detector pair."""
        return [
            {
                "name": "scan_number",
                "value": scan.scan_number,
            },
            {
                "name": "input_path",
                "value": scan.scan_saving.filename,
            },
            {
                "name": "output_path",
                "value": self._get_scan_processed_directory(scan),
            },
            {"name": "detector", "value": detector},
            {"name": "specgen_parameters", "value": dict(self.parameters[detector])},
        ]

    def _get_scan_filename(self, scan) -> str:
        """Return the scan's data file, falling back to the session's."""
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def _get_scan_processed_directory(self, scan) -> str:
        return get_dataset_processed_dir(self._get_scan_filename(scan))

    def _get_workflow_destination(self, scan) -> str:
        """Builds the path where the workflow JSON will be saved."""
        filename = self._get_scan_filename(scan)
        scan_nb = scan.scan_info.get("scan_nb")
        root = self._get_scan_processed_directory(scan)
        stem = os.path.splitext(os.path.basename(filename))[0]
        basename = f"{stem}_{scan_nb:04d}.json"
        return os.path.join(root, basename)

    def _trigger_workflow_on_new_scan(self, scan) -> None:
        """Submit one conversion workflow per configured detector whose
        image channel appears in the (saved) scan."""
        if not scan.scan_info["save"]:
            return
        channels = scan.scan_info["channels"]
        for detector in self.detectors:
            if "%s:image" % detector not in channels:
                continue
            workflow = self._get_workflow()
            inputs = self._get_workflow_inputs(scan, detector)
            kwargs = {
                "inputs": inputs,
                "outputs": [{"all": False}],
                "convert_destination": self._get_workflow_destination(scan),
            }
            submit(args=(workflow,), kwargs=kwargs, queue=self.WORKER)