from typing import List, Dict, Any, Optional
try:
from bliss import current_session
except ImportError:
scan_meta = None
current_session = None
user_print = print
import os
from ewoksjob.client import submit
from ..utils import directories
from ..persistent.parameters import (
WithPersistentParameters,
ParameterInfo,
)
_DEFAULT_USER_PARS: Dict[str, Any] = {
"flip_ud": False,
"flip_lr": True,
"wavelength": 0.1,
"distance": 100,
"beam": [1000, 1100],
"polarization": 0.99,
"kappa": 0,
"alpha": 50,
"theta": 0,
"phi": 0,
"omega": "",
"rotation": 180,
"dummy": -1,
"offset": 1,
"dry_run": False,
"calc_mask": False,
}
_DEFAULT_SCAN_PARS: Dict[str, Any] = {
"images": "",
"output": "",
}
[docs]
class Id11Eiger2Crysalis(
WithPersistentParameters,
parameters=[
ParameterInfo("worker", category="workflows"),
ParameterInfo("workflow", category="workflows"),
ParameterInfo(
"lima_name",
category="Eiger2Crysalis",
doc="Lima name of the camera: (i.e. eiger)",
),
ParameterInfo(
"scan_parameters",
category="Eiger2Crysalis",
doc="Derived from scan and motors",
),
ParameterInfo(
"user_parameters", category="Eiger2Crysalis", doc="Specify explicitly"
),
ParameterInfo(
"frameset_copy",
category="Eiger2Crysalis",
doc="frame.set path that will be copied automatically inside the output folder",
),
],
):
def __init__(self, **defaults: Any) -> None:
if current_session is None:
raise ImportError("bliss")
defaults.setdefault("workflow", "")
defaults.setdefault("user_parameters", _DEFAULT_USER_PARS)
defaults.setdefault("scan_parameters", _DEFAULT_SCAN_PARS)
defaults.setdefault("frameset_copy", "")
super().__init__(**defaults)
def _info_categories(self) -> Dict[str, dict]:
self.update_scan_parameters()
return super()._info_categories()
[docs]
def update_scan_parameters(self, scan: Any = None) -> None:
scan_parameters = dict()
if scan:
scan_parameters["images"] = self.get_lima_filenames(scan)
scan_parameters["output"] = self.get_output_path(scan)
self.scan_parameters.update(scan_parameters)
return scan_parameters
[docs]
def get_filename(self, scan) -> str:
filename = scan.scan_info.get("filename")
if filename:
return filename
return current_session.scan_saving.filename
[docs]
def get_output_path(self, scan: Any) -> str:
scan_nb = scan.scan_info["scan_nb"]
dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
output = os.path.join(scan_processed_dir, "frame_1_{index}.esperanto")
return output
[docs]
def get_lima_filenames(self, scan: Any) -> List[str]:
scan_number = scan.scan_number
lima_files = [
f"{scan.scan_saving.images_path.format(scan_number=scan_number,img_acq_device=self.lima_name)}0000.h5"
]
return lima_files
[docs]
def get_submit_arguments(self, scan) -> Dict[str, Any]:
return {
"inputs": self.get_inputs(scan),
"outputs": [{"all": "False"}],
}
[docs]
def run_conversion(self, scan: Optional[Any]) -> None:
"""Executes on given scan"""
kwargs = self.get_submit_arguments(scan)
submit(args=(self.workflow,), kwargs=kwargs, queue=self.worker)
[docs]
def get_dataset_processed_dir(
dataset_filename: str,
) -> str: # Temporary, waiting for !173 to be merged
root = directories.get_processed_dir(dataset_filename)
collection = os.path.basename(directories.get_collection_dir(dataset_filename))
dataset = os.path.basename(directories.get_dataset_dir(dataset_filename))
return os.path.join(root, collection, dataset)