Source code for blissoda.scxrd.eiger2cbf

from __future__ import annotations

import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import get_dataset_processed_dir

_EIGER2CBF_INPUT_NAMES = {
    "images",
    "output",
    "mask",
    "offset",
    "dummy",
    "pilatus",
    "dry_run",
    "energy",
    "wavelength",
    "distance",
    "beam",
    "alpha",
    "kappa",
    "chi",
    "phi",
    "omega",
    "rotation",
    "transpose",
    "flip_ud",
    "flip_lr",
    "verbose",
    "debug",
}
_DEFAULT_USER_PARAMETERS: Dict[str, Any] = {
    "offset": 1,
    "dummy": -1,
    "pilatus": False,
    "dry_run": False,
    "wavelength": 0.1,
    "distance": 0.1,
    "beam": [1000, 1100],
    "alpha": None,
    "kappa": None,
    "chi": None,
    "phi": None,
    "omega": "",
    "rotation": 0,
    "transpose": False,
    "flip_ud": False,
    "flip_lr": False,
}


[docs] class Eiger2CbfProcessor( BaseProcessor, parameters=[ ParameterInfo("queue", category="workflows", deprecated_names=["worker"]), ParameterInfo("workflow", category="workflows"), ParameterInfo( "lima_name", category="Eiger2CBF", doc="Lima name of the camera, for example 'eiger'", ), ParameterInfo( "scan_parameters", category="Eiger2CBF", doc="Derived from scan and motors", ), ParameterInfo( "user_parameters", category="Eiger2CBF", doc="FabIO eiger2cbf parameters", ), ], ): def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, **deprecated_defaults: Dict[str, Any], ) -> None: defaults = self._merge_defaults(deprecated_defaults, defaults) defaults.setdefault("trigger_at", "END") defaults.setdefault("workflow", resource_filename("scxrd", "eiger2cbf.json")) defaults.setdefault("lima_name", "eiger") defaults.setdefault("scan_parameters", {}) defaults.setdefault("user_parameters", _DEFAULT_USER_PARAMETERS) super().__init__(config=config, defaults=defaults) def _info_categories(self) -> Dict[str, dict]: self.update_scan_parameters() return super()._info_categories()
[docs] def update_scan_parameters(self, scan: BlissScanType = None) -> Dict[str, Any]: scan_parameters: Dict[str, Any] = {} if scan is not None: scan_parameters["images"] = self.get_lima_filenames(scan) scan_parameters["output"] = self.get_output_path(scan) self.scan_parameters.update(scan_parameters) return scan_parameters
[docs] def get_filename(self, scan: BlissScanType) -> str: filename = scan.scan_info.get("filename") if filename: return filename return current_session.scan_saving.filename
[docs] def get_lima_name(self, scan: BlissScanType) -> str: channels = scan.scan_info.get("channels", {}) if f"{self.lima_name}:image" in channels: return self.lima_name raise AssertionError(f"{self.lima_name} detector not found.")
[docs] def get_lima_filenames(self, scan: BlissScanType) -> List[str]: scan_number = scan.scan_number image_path = scan.scan_saving.images_path.format( scan_number=scan_number, img_acq_device=self.lima_name ) return [f"{image_path}0000.h5"]
[docs] def get_output_path(self, scan: BlissScanType) -> str: scan_nb = scan.scan_info.get("scan_nb", scan.scan_number) dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan)) scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}") dataset_name = os.path.basename(dataset_processed_dir) return os.path.join( scan_processed_dir, "cbf", f"{dataset_name}_{scan_nb:04d}_{{index:04d}}.cbf", )
[docs] def workflow_destination(self, scan: BlissScanType) -> str: cbf_dir = os.path.dirname(self.get_output_path(scan)) dataset_name = os.path.basename( get_dataset_processed_dir(self.get_filename(scan)) ) scan_nb = scan.scan_info.get("scan_nb", scan.scan_number) return os.path.join(cbf_dir, f"{dataset_name}_{scan_nb:04d}_eiger2cbf.json")
[docs] def get_omega(self, scan: BlissScanType) -> Optional[str]: fscan_parameters = scan.scan_info.get("instrument", {}).get("fscan_parameters") if not fscan_parameters: return self.user_parameters.to_dict().get("omega") start = fscan_parameters["start_pos"] step = fscan_parameters["step_size"] return f"{start}+index*{step}"
[docs] def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]: return {"omega": self.get_omega(scan)}
[docs] def get_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]: parameters = self.user_parameters.to_dict() parameters.update(self.get_scan_parameters(scan)) parameters.update( { "images": self.get_lima_filenames(scan), "output": self.get_output_path(scan), } ) inputs = [] for key, value in parameters.items(): if key not in _EIGER2CBF_INPUT_NAMES: continue if value is None: continue inputs.append( { "task_identifier": "Eiger2CBF", "name": key, "value": value, } ) return inputs
[docs] def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]: return {"inputs": self.get_inputs(scan), "outputs": [{"all": "False"}]}
[docs] def run_conversion(self, scan: BlissScanType) -> None: if "fscan" in scan.scan_info.get("type", "") and self.get_lima_name(scan): kwargs = self.get_submit_arguments(scan) kwargs["convert_destination"] = self.workflow_destination(scan) submit(args=(self.workflow,), kwargs=kwargs, queue=self.queue)
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None: self.run_conversion(scan)