# Source code for blissoda.id02.processor

import logging
import os
from typing import Any
from typing import Dict
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import get_future
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils.directories import get_dataset_processed_dir
from .plotter import Id02Plotter

logger = logging.getLogger(__name__)


class Id02BaseProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo("number_of_scans", category="plotting"),
    ],
    deprecated_class_attributes={"DEFAULT_WORKER": "DEFAULT_QUEUE"},
):
    """Base class for ID02 processors that submit a workflow for each saved scan.

    Subclasses must implement :meth:`_set_up_preset`. The object it returns is
    used to list the detectors of a scan (``getDetectors``), to build the
    workflow (``buildWorkflow``) and to build the workflow inputs
    (``getInputs``).

    Parameters declared on the class:

    * ``queue`` (category "workflows", formerly ``worker``): passed as the
      ``queue`` argument when submitting a workflow.
    * ``number_of_scans`` (category "plotting"): passed to ``Id02Plotter``.
    """

    # Default value of the ``queue`` parameter when none is provided.
    DEFAULT_QUEUE = None

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Set parameter defaults, then create the preset and the plotter.

        :param config: forwarded unchanged to the base class constructor.
        :param defaults: default parameter values. ``queue`` falls back to
            ``DEFAULT_QUEUE`` and ``number_of_scans`` to ``4`` when absent.
        :param deprecated_defaults: defaults given as keyword arguments; they
            are combined with ``defaults`` by ``_merge_defaults``
            (presumably inherited from the base class -- not defined here).
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("queue", self.DEFAULT_QUEUE)
        defaults.setdefault("number_of_scans", 4)
        super().__init__(config=config, defaults=defaults)

        # The preset is created after the base class initialization and
        # before the plotter, as in the original statement order.
        self._preset = self._set_up_preset()
        self._plotter = Id02Plotter(number_of_scans=self.number_of_scans)
        self._plotter.replot()

    def _set_up_preset(self):
        """Return the preset object used by this processor.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def _detector_names(self, scan: BlissScanType) -> list:
        """Return the names of the detectors the preset reports for ``scan``."""
        return [detector.name for detector in self._preset.getDetectors(scan)]

    def scan_requires_processing(self, scan: BlissScanType) -> bool:
        """Return ``True`` when the ``"save"`` entry of the scan info is truthy.

        A missing ``"save"`` entry is treated as "do not process" instead of
        raising ``KeyError``, consistent with how the same flag is read when
        the workflow is triggered.
        """
        return bool(scan.scan_info.get("save"))

    def get_workflow(self, scan: BlissScanType) -> dict:
        """Return the workflow the preset builds for the detectors of ``scan``."""
        return self._preset.buildWorkflow(self._detector_names(scan))

    def get_inputs(self, scan: BlissScanType) -> list:
        """Return the workflow inputs the preset builds for ``scan``."""
        return self._preset.getInputs(scan, self._detector_names(scan))

    def get_filename(self, scan: BlissScanType) -> str:
        """Return the file name associated with ``scan``.

        The ``"filename"`` entry of the scan info is used when it is set and
        non-empty; otherwise the file name of the current session's
        ``scan_saving`` is returned.
        """
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def scan_processed_directory(self, scan: BlissScanType) -> str:
        """Return the processed-data directory derived from the scan file name."""
        return get_dataset_processed_dir(self.get_filename(scan))

    def workflow_destination(self, scan: BlissScanType) -> str:
        """Return the path of the JSON file the workflow is converted to.

        The file is located in :meth:`scan_processed_directory` and named
        ``<stem>_<scan_nb>.json``, where ``<stem>`` is the scan file name
        without directory and extension and ``<scan_nb>`` is zero-padded to
        four digits.

        NOTE: the ``"scan_nb"`` entry of the scan info must be an integer;
        the ``04d`` format fails with ``TypeError`` when it is missing.
        """
        filename = self.get_filename(scan)
        scan_nb = scan.scan_info.get("scan_nb")
        root = self.scan_processed_directory(scan)
        stem = os.path.splitext(os.path.basename(filename))[0]
        basename = f"{stem}_{scan_nb:04d}.json"
        return os.path.join(root, basename)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> Optional[Any]:
        """Submit the workflow for ``scan`` and return a future for the job.

        :returns: ``None`` when :meth:`scan_requires_processing` is false,
            otherwise the object returned by ``get_future`` for the
            submitted job.
        """
        if not self.scan_requires_processing(scan):
            return None

        workflow = self.get_workflow(scan)
        kwargs = {"inputs": self.get_inputs(scan), "outputs": [{"all": False}]}
        # Checked again here because subclasses may override
        # scan_requires_processing with a criterion other than "save".
        if scan.scan_info.get("save"):
            kwargs["convert_destination"] = self.workflow_destination(scan)

        future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
        # TODO: Handle plotting with Flint
        future = get_future(future.uuid)
        return future