import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..bliss_globals import current_session
from ..ewoks_utils import convert_graph
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils import directories
# Default user parameters for par file
_DEFAULT_USER_PARS: Dict[str, Any] = {
"flip_ud": False,
"flip_lr": True,
"wavelength": 0.1,
"distance": 100,
"beam": [1000, 1100],
"polarization": 0.99,
"kappa": 0,
"alpha": 50,
"theta": 0,
"phi": 0,
"omega": "",
"rotation": 180,
"dummy": -1,
"offset": 1,
"dry_run": False,
"calc_mask": False,
}
_DEFAULT_SCAN_PARS: Dict[str, Any] = {
"images": "",
"output": "",
}
# Keys eligible for par-file metadata overwrite
_PAR_METADATA_KEYS = list(_DEFAULT_USER_PARS.keys())
[docs]
class Id11Eiger2Crysalis(
BaseProcessor,
parameters=[
ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
ParameterInfo("workflow", category="workflows"),
ParameterInfo(
"lima_name",
category="Eiger2Crysalis",
doc="Lima name of the camera: (i.e. eiger)",
),
ParameterInfo(
"scan_parameters",
category="Eiger2Crysalis",
doc="Derived from scan and motors",
),
ParameterInfo(
"user_parameters", category="Eiger2Crysalis", doc="Specify explicitly"
),
ParameterInfo(
"ini_file",
category="ExtraFiles",
doc="CrysalisExpSettings.ini path",
),
ParameterInfo(
"par_file",
category="ExtraFiles",
doc=".par file path",
),
ParameterInfo(
"set_file",
category="ExtraFiles",
doc=".set file path",
),
ParameterInfo(
"ccd_file",
category="ExtraFiles",
doc=".ccd file path",
),
ParameterInfo(
"mode",
category="Mode",
doc="Choose 'Execute' to run the conversion or 'Json' just to generate the .json file without executing",
),
ParameterInfo(
"overwrite_par_file_metadata",
category="Eiger2Crysalis",
doc="If True, include individual user and scan parameters (distance, beam, etc.) will be overwritten in the par file",
),
],
):
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("trigger_at", "END")
defaults.setdefault("workflow", "")
defaults.setdefault("user_parameters", _DEFAULT_USER_PARS.copy())
defaults.setdefault("scan_parameters", _DEFAULT_SCAN_PARS.copy())
defaults.setdefault("crysalis_ini", "")
defaults.setdefault("overwrite_par_file_metadata", False)
super().__init__(config=config, defaults=defaults)
def _info_categories(self) -> Dict[str, dict]:
self.update_scan_parameters()
return super()._info_categories()
[docs]
def update_scan_parameters(self, scan: Any = None) -> None:
scan_parameters = dict()
if scan:
scan_parameters["images"] = self.get_lima_filenames(scan)
scan_parameters["output"] = self.get_output_path(scan)
scan_parameters["processed_output"] = self.get_processed_output_path(scan)
self.scan_parameters.update(scan_parameters)
return scan_parameters
[docs]
def get_filename(self, scan: BlissScanType) -> str:
filename = scan.scan_info.get("filename")
if filename:
return filename
return current_session.scan_saving.filename
[docs]
def get_lima_name(self, scan: BlissScanType) -> List[str]:
channels = scan.scan_info.get("channels", dict())
if f"{self.lima_name}:image" in channels:
return self.lima_name
else:
raise AssertionError(f"{self.lima_name} detector not found.")
[docs]
def get_output_path(self, scan: BlissScanType) -> str:
scan_nb = scan.scan_info["scan_nb"]
dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
dataset_name = os.path.basename(dataset_processed_dir)
output = os.path.join(
scan_processed_dir, f"{dataset_name}_{scan_nb:04d}" + "_1_{index}.esperanto"
)
return output
[docs]
def get_processed_output_path(self, scan: BlissScanType) -> str:
scan_nb = scan.scan_info["scan_nb"]
dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
dataset_name = os.path.basename(dataset_processed_dir)
output = os.path.join(scan_processed_dir, f"{dataset_name}_{scan_nb:04d}")
return output
[docs]
def workflow_destination(self, scan) -> str:
return f"{self.get_processed_output_path(scan)}.json"
[docs]
def get_lima_filenames(self, scan: BlissScanType) -> List[str]:
scan_number = scan.scan_number
lima_files = [
f"{scan.scan_saving.images_path.format(scan_number=scan_number, img_acq_device=self.lima_name)}0000.h5"
]
return lima_files
[docs]
def get_omega(self, scan: BlissScanType) -> str:
"""
Provide the omega parameter as a formatted string based on scan info.
The scan is assumed to be centered around zero.
The omega string is computed after ensuring:
- The step_size is negative (as required by CrysAlis).
"""
# Extract scan parameters.
params = scan.scan_info["instrument"]["fscan_parameters"]
omega_start = params["start_pos"]
step_size = params["step_size"]
nsteps = params["npoints"]
if step_size >= 0:
print(
"WARNING! CrysAlis cannot handle positive steps!! Changing to negative"
)
step_size = -step_size
end = nsteps * step_size + omega_start
npts = (end - omega_start) / step_size
center = step_size * npts / 2 + omega_start
omega_start_symetric = omega_start - center
omega_end_symetric = omega_start + center
self._omega_start_symetric = omega_start_symetric
self._omega_end_symetric = -omega_end_symetric
self._step_size = step_size
self._omega_start = omega_start
self._omega_end = end
return f"-{omega_start}-index*{step_size}"
[docs]
def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
scan_parameters = dict()
scan_parameters["omega"] = self.get_omega(scan)
scan_parameters["exposure_time"] = scan.scan_info["instrument"][
"fscan_parameters"
]["acq_time"]
return scan_parameters
[docs]
def get_run_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
return {
"count": scan.scan_info["npoints"],
"omega": 0,
"omega_start": self._omega_start,
"omega_end": self._omega_end,
"pixel_size": 0.075,
"omega_runs": None,
"theta": 0,
"kappa": 0,
"phi": 0,
"domega": self._step_size,
"dtheta": 0,
"dkappa": 0,
"dphi": 0,
"center_x": self.user_parameters["beam"][0],
"center_y": self.user_parameters["beam"][1],
"alpha": 50,
"dist": self.user_parameters["distance"],
"l1": self.user_parameters["wavelength"],
"l2": self.user_parameters["wavelength"],
"l12": self.user_parameters["wavelength"],
"b": self.user_parameters["wavelength"],
"mono": 0.99,
"monotype": "SYNCHROTRON",
"chip": [1024, 1024],
"Exposure_time": scan.scan_info["instrument"]["fscan_parameters"][
"acq_time"
],
}
[docs]
def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]:
return {
"inputs": self.get_inputs(scan),
"outputs": [{"all": "False"}],
}
[docs]
def run_conversion(self, scan: BlissScanType) -> None:
"""Executes on given scan"""
if ("fscan" in scan.scan_info["type"]) and self.get_lima_name(
scan
): # or ("fscan2d" in scan.scan_info["type"])
if self.mode == "Execute":
kwargs = self.get_submit_arguments(scan)
kwargs["convert_destination"] = self.workflow_destination(scan)
submit(args=(self.workflow,), kwargs=kwargs, queue=self.queue)
elif self.mode == "Json":
kwargs = {"inputs": self.get_inputs(scan)}
kwargs["destination"] = self.workflow_destination(scan)
convert_graph(args=(self.workflow,), kwargs=kwargs)
else:
raise TypeError(f"{self.mode} is not Execute or Json")
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
self.run_conversion(scan)
[docs]
def get_dataset_processed_dir(
dataset_filename: str,
) -> str: # Temporary, waiting for !173 to be merged
root = directories.get_processed_dir(dataset_filename)
collection = os.path.basename(directories.get_collection_dir(dataset_filename))
dataset = os.path.basename(directories.get_dataset_dir(dataset_filename))
return os.path.join(root, collection, dataset)