from __future__ import annotations

import copy
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import get_dataset_processed_dir
_EIGER2CBF_INPUT_NAMES = {
"images",
"output",
"mask",
"offset",
"dummy",
"pilatus",
"dry_run",
"energy",
"wavelength",
"distance",
"beam",
"alpha",
"kappa",
"chi",
"phi",
"omega",
"rotation",
"transpose",
"flip_ud",
"flip_lr",
"verbose",
"debug",
}
_DEFAULT_USER_PARAMETERS: Dict[str, Any] = {
"offset": 1,
"dummy": -1,
"pilatus": False,
"dry_run": False,
"wavelength": 0.1,
"distance": 0.1,
"beam": [1000, 1100],
"alpha": None,
"kappa": None,
"chi": None,
"phi": None,
"omega": "",
"rotation": 0,
"transpose": False,
"flip_ud": False,
"flip_lr": False,
}
class Eiger2CbfProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo(
            "lima_name",
            category="Eiger2CBF",
            doc="Lima name of the camera, for example 'eiger'",
        ),
        ParameterInfo(
            "scan_parameters",
            category="Eiger2CBF",
            doc="Derived from scan and motors",
        ),
        ParameterInfo(
            "user_parameters",
            category="Eiger2CBF",
            doc="FabIO eiger2cbf parameters",
        ),
    ],
):
    """Submit an eiger2cbf conversion workflow for scans of type "fscan".

    For each scan handed to :meth:`run_conversion` whose ``scan_info["type"]``
    contains ``"fscan"`` and whose channels include ``"<lima_name>:image"``,
    the configured workflow is submitted to ``queue`` through ``submit``.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Fill in the processor defaults and initialise the base processor.

        :param config: forwarded unchanged to ``BaseProcessor.__init__``.
        :param defaults: default parameter values; entries already present
            take precedence over the defaults set here.
        :param deprecated_defaults: defaults given as keyword arguments; they
            are combined with ``defaults`` by ``_merge_defaults`` (defined
            outside this class body).
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("trigger_at", "END")
        defaults.setdefault("workflow", resource_filename("scxrd", "eiger2cbf.json"))
        defaults.setdefault("lima_name", "eiger")
        defaults.setdefault("scan_parameters", {})
        if "user_parameters" not in defaults:
            # Deep copy so that no instance ever holds (and can mutate) the
            # module-level defaults, including the nested "beam" list.
            defaults["user_parameters"] = copy.deepcopy(_DEFAULT_USER_PARAMETERS)
        super().__init__(config=config, defaults=defaults)

    def _info_categories(self) -> Dict[str, dict]:
        """Return the base class info categories.

        ``update_scan_parameters`` is called without a scan first, which
        merges an empty dict into ``self.scan_parameters``.
        """
        self.update_scan_parameters()
        return super()._info_categories()

    def update_scan_parameters(
        self, scan: Optional[BlissScanType] = None
    ) -> Dict[str, Any]:
        """Merge the scan-derived entries into ``self.scan_parameters``.

        :param scan: when given, ``"images"`` and ``"output"`` are computed
            from it; when ``None`` nothing is computed.
        :returns: only the entries computed by this call (empty without scan).
        """
        scan_parameters: Dict[str, Any] = {}
        if scan is not None:
            scan_parameters["images"] = self.get_lima_filenames(scan)
            scan_parameters["output"] = self.get_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_filename(self, scan: BlissScanType) -> str:
        """Return ``scan.scan_info["filename"]`` or, when that is missing or
        empty, the file name of the current session's ``scan_saving``.
        """
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def get_lima_name(self, scan: BlissScanType) -> str:
        """Return ``self.lima_name`` when the scan has its image channel.

        :raises AssertionError: when ``"<lima_name>:image"`` is not among the
            keys of ``scan.scan_info["channels"]``.
        """
        channels = scan.scan_info.get("channels", {})
        if f"{self.lima_name}:image" in channels:
            return self.lima_name
        raise AssertionError(f"{self.lima_name} detector not found.")

    def get_lima_filenames(self, scan: BlissScanType) -> List[str]:
        """Return a one-element list with the Lima HDF5 file of the scan.

        The name is ``scan.scan_saving.images_path`` formatted with the scan
        number and ``self.lima_name``, followed by ``"0000.h5"``.
        """
        scan_number = scan.scan_number
        image_path = scan.scan_saving.images_path.format(
            scan_number=scan_number, img_acq_device=self.lima_name
        )
        # NOTE(review): only the file with suffix 0000 is listed -- presumably
        # the first file of the series; confirm multi-file scans are handled.
        return [f"{image_path}0000.h5"]

    def get_output_path(self, scan: BlissScanType) -> str:
        """Return the CBF output path template for the scan.

        The result has the form
        ``<processed>/scan<nb:04d>/cbf/<dataset>_<nb:04d>_{index:04d}.cbf``;
        ``{index:04d}`` is left in the string as a literal placeholder.
        """
        scan_nb = scan.scan_info.get("scan_nb", scan.scan_number)
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return os.path.join(
            scan_processed_dir,
            "cbf",
            # Doubled braces keep "{index:04d}" unformatted in the result.
            f"{dataset_name}_{scan_nb:04d}_{{index:04d}}.cbf",
        )

    def workflow_destination(self, scan: BlissScanType) -> str:
        """Return the JSON path used as ``convert_destination`` on submission.

        The file is placed in the same ``cbf`` directory as the output of
        :meth:`get_output_path` and is named
        ``<dataset>_<nb:04d>_eiger2cbf.json``.
        """
        cbf_dir = os.path.dirname(self.get_output_path(scan))
        dataset_name = os.path.basename(
            get_dataset_processed_dir(self.get_filename(scan))
        )
        scan_nb = scan.scan_info.get("scan_nb", scan.scan_number)
        return os.path.join(cbf_dir, f"{dataset_name}_{scan_nb:04d}_eiger2cbf.json")

    def get_omega(self, scan: BlissScanType) -> Optional[str]:
        """Return the omega expression for the scan.

        When ``scan.scan_info["instrument"]["fscan_parameters"]`` is present
        and non-empty the result is ``"<start_pos>+index*<step_size>"``;
        otherwise the ``"omega"`` entry of the user parameters is returned.
        """
        fscan_parameters = scan.scan_info.get("instrument", {}).get("fscan_parameters")
        if not fscan_parameters:
            return self.user_parameters.to_dict().get("omega")
        start = fscan_parameters["start_pos"]
        step = fscan_parameters["step_size"]
        return f"{start}+index*{step}"

    def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the parameters derived from the scan (currently ``omega``)."""
        return {"omega": self.get_omega(scan)}

    def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the keyword arguments passed to the workflow on submission.

        ``get_inputs`` is not defined in this class body; presumably it is
        provided elsewhere (e.g. by the base class) -- TODO confirm.
        """
        # NOTE(review): "all" is the *string* "False", not the boolean False.
        # Kept as-is; confirm the workflow engine interprets it as intended.
        return {"inputs": self.get_inputs(scan), "outputs": [{"all": "False"}]}

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit the conversion workflow when the scan qualifies.

        Nothing is submitted unless ``scan.scan_info["type"]`` contains
        ``"fscan"``.

        :raises AssertionError: from :meth:`get_lima_name` when an fscan does
            not have the ``"<lima_name>:image"`` channel.
        """
        # ``or ""`` guards against a "type" key that is present but None.
        scan_type = scan.scan_info.get("type") or ""
        if "fscan" not in scan_type:
            return
        if not self.get_lima_name(scan):
            return
        kwargs = self.get_submit_arguments(scan)
        kwargs["convert_destination"] = self.workflow_destination(scan)
        submit(args=(self.workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Run the conversion for ``scan``; delegates to :meth:`run_conversion`."""
        self.run_conversion(scan)