import copy
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import get_dataset_processed_dir
# Value used for the "output_file_format" processor parameter when none is given.
DEFAULT_OUTPUT_FILE_FORMAT = "esperanto"

# Workflow files, resolved through resource_filename under "scxrd".
DEFAULT_EIGER2CRYSALIS_WORKFLOW = resource_filename(
    "scxrd",
    "eiger2crysalis.json",
)
DEFAULT_EIGER2CRYSALIS_LIMA2_WORKFLOW = resource_filename(
    "scxrd",
    "eiger2crysalis_lima2.json",
)
DEFAULT_EIGER2CBF_XDS_WORKFLOW = resource_filename(
    "scxrd",
    "eiger2cbf_xds.json",
)

# Output file formats named in the "unsupported format" error messages.
VALID_OUTPUT_FILE_FORMATS = {"esperanto", "cbf"}
# Names of the parameters that are forwarded to the "Eiger2CBF" task; any
# other key of the merged user/scan parameters is dropped for that task.
EIGER2CBF_INPUT_NAMES = {
    # input / output locations
    "images",
    "output",
    "mask",
    # frame handling and run options
    "offset",
    "dummy",
    "pilatus",
    "dry_run",
    "verbose",
    "debug",
    # beam and geometry
    "energy",
    "wavelength",
    "distance",
    "beam",
    # angles
    "alpha",
    "kappa",
    "chi",
    "phi",
    "omega",
    # image orientation
    "rotation",
    "transpose",
    "flip_ud",
    "flip_lr",
}
# Default value of the "xds_parameters" processor parameter.
DEFAULT_XDS_INP_PARAMETERS: Dict[str, Any] = dict(
    detector="EIGER",
    nx=3108,
    ny=3262,
    qx=0.075,
    qy=0.075,
)

# Rectangles passed as "untrusted_rectangles" to the CreateXDSInp task.
# The first group spans 0..3262 in the second coordinate pair, the second
# group spans 0..3108 in the first coordinate pair.
# NOTE(review): presumably [x_min, x_max, y_min, y_max] detector regions
# to ignore -- confirm against the CreateXDSInp task.
DEFAULT_UNTRUSTED_RECTANGLES = [
    *(
        [x_min, x_max, 0, 3262]
        for x_min, x_max in (
            (513, 514),
            (1028, 1039),
            (1553, 1554),
            (2068, 2079),
            (2593, 2594),
        )
    ),
    *(
        [0, 3108, y_min, y_max]
        for y_min, y_max in (
            (512, 549),
            (1062, 1099),
            (1612, 1649),
            (2162, 2199),
            (2712, 2749),
        )
    ),
]

# CreateXDSInp inputs that are filled in only when not already set.
DEFAULT_XDS_OPTIONAL_PARAMETERS: Dict[str, Any] = dict(
    filename="XDS.INP",
    job="XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
    overload=100000000,
    space_group_number=0,
    friedels_law="FALSE",
    minimum_valid_pixel_value=0,
    trusted_region=[0.0, 1.41],
    polarization_plane_normal=[0.0, 1.0, 0.0],
    direction_of_detector_x_axis=[1.0, 0.0, 0.0],
    direction_of_detector_y_axis=[0.0, 1.0, 0.0],
    rotation_axis=[0.0, -1.0, 0.0],
    incident_beam_direction=[0.0, 0.0, 1.0],
    sensor_thickness=0.75,
    silicon=18.023,
)
# Default content of the "user_parameters" processor parameter.
DEFAULT_USER_PARS: Dict[str, Any] = dict(
    flip_ud=False,
    flip_lr=False,
    wavelength=0.1,
    distance=100,
    beam=[1000, 1100],
    polarization=0.99,
    kappa=0,
    alpha=50,
    theta=0,
    phi=0,
    omega="",
    rotation=180,
    dummy=-1,
    offset=1,
    dry_run=False,
    calc_mask=False,
)

# Default content of the "scan_parameters" processor parameter; the values
# are replaced per scan by update_scan_parameters().
DEFAULT_SCAN_PARS: Dict[str, Any] = dict(images="", output="")

# Names of the user parameters, in declaration order.
PAR_METADATA_KEYS = [*DEFAULT_USER_PARS]
class Eiger2CrysalisProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo(
            "lima_name",
            category="Eiger2Crysalis",
            doc="Lima name of the camera: (i.e. eiger)",
        ),
        ParameterInfo(
            "scan_parameters",
            category="Eiger2Crysalis",
            doc="Derived from scan and motors",
        ),
        ParameterInfo(
            "user_parameters", category="Eiger2Crysalis", doc="Specify explicitly"
        ),
        ParameterInfo(
            "ini_file",
            category="ExtraFiles",
            doc="CrysalisExpSettings.ini path",
        ),
        ParameterInfo(
            "par_file",
            category="ExtraFiles",
            doc=".par file path",
        ),
        ParameterInfo(
            "set_file",
            category="ExtraFiles",
            doc=".set file path",
        ),
        ParameterInfo(
            "ccd_file",
            category="ExtraFiles",
            doc=".ccd file path",
        ),
    ],
):
    """Submit a conversion workflow for fscan scans recorded with a Lima detector.

    ``run_conversion`` submits ``self.workflow`` to ``self.queue`` for every
    scan whose type contains ``"fscan"`` and whose channels include
    ``"<lima_name>:<detector_channel_suffix>"``.

    NOTE(review): ``_merge_defaults`` and ``get_inputs`` are used by this class
    but not defined in it; they are presumably provided by ``BaseProcessor``
    -- confirm.
    """

    # Channel suffix looked up in scan_info["channels"] to detect the detector.
    detector_channel_suffix = "image"
    # Appended to the formatted image path to build the Lima file name.
    lima_file_suffix = "0000.h5"
    default_user_parameters = DEFAULT_USER_PARS
    default_scan_parameters = DEFAULT_SCAN_PARS
    require_bliss_for_trigger_registration = False

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Merge the defaults, complete them and initialise the base processor.

        :param config: forwarded unchanged to ``BaseProcessor.__init__``.
        :param defaults: default parameter values.
        :param deprecated_defaults: defaults given as keyword arguments; merged
            with ``defaults`` by ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        self.set_default_parameters(defaults)
        super().__init__(config=config, defaults=defaults)

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place with every default that is not yet set.

        :param defaults: mapping of default parameter values, modified in place.
        """
        defaults.setdefault("trigger_at", "END")
        defaults.setdefault("workflow", DEFAULT_EIGER2CRYSALIS_WORKFLOW)
        # Deep copies: the class-level defaults contain nested lists ("beam"),
        # which a shallow copy would share between instances and with the
        # module-level constant.
        defaults.setdefault(
            "user_parameters", copy.deepcopy(self.default_user_parameters)
        )
        defaults.setdefault(
            "scan_parameters", copy.deepcopy(self.default_scan_parameters)
        )
        # NOTE(review): "crysalis_ini" is not among the parameters declared on
        # this class (the declared one is "ini_file") -- confirm it is needed.
        defaults.setdefault("crysalis_ini", "")

    def _info_categories(self) -> Dict[str, dict]:
        """Refresh the scan parameters, then return the base class categories."""
        self.update_scan_parameters()
        return super()._info_categories()

    def update_scan_parameters(self, scan: Any = None) -> Dict[str, Any]:
        """Store the scan-derived paths in ``self.scan_parameters``.

        :param scan: scan to derive the paths from; when ``None`` nothing is
            derived and an empty mapping is stored and returned.
        :returns: the newly derived parameters (``images``, ``output`` and
            ``processed_output`` when a scan is given).
        """
        scan_parameters: Dict[str, Any] = {}
        if scan is not None:
            scan_parameters["images"] = self.get_lima_filenames(scan)
            scan_parameters["output"] = self.get_output_path(scan)
            scan_parameters["processed_output"] = self.get_processed_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_filename(self, scan: BlissScanType) -> str:
        """Return the scan's ``filename``, or the session's when it is empty."""
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def get_lima_name(self, scan: BlissScanType) -> str:
        """Return ``self.lima_name`` when its channel is part of the scan.

        :raises AssertionError: when ``"<lima_name>:<detector_channel_suffix>"``
            is not among the scan channels.
        """
        channels = scan.scan_info.get("channels", dict())
        if f"{self.lima_name}:{self.detector_channel_suffix}" in channels:
            return self.lima_name
        raise AssertionError(f"{self.lima_name} detector not found.")

    def get_output_path(self, scan: BlissScanType) -> str:
        """Return the esperanto output path template of the scan.

        The result is the processed output path followed by
        ``"_1_{index}.esperanto"``; ``{index}`` is left as a literal
        placeholder.
        """
        return self.get_processed_output_path(scan) + "_1_{index}.esperanto"

    def get_processed_output_path(self, scan: BlissScanType) -> str:
        """Return ``<processed dir>/scanNNNN/<dataset>_NNNN`` for the scan.

        ``NNNN`` is ``scan_info["scan_nb"]`` zero-padded to four digits and
        ``<dataset>`` is the base name of the dataset's processed directory.
        """
        scan_nb = scan.scan_info["scan_nb"]
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return os.path.join(scan_processed_dir, f"{dataset_name}_{scan_nb:04d}")

    def workflow_destination(self, scan: BlissScanType) -> str:
        """Return the processed output path with a ``.json`` extension."""
        return f"{self.get_processed_output_path(scan)}.json"

    def get_lima_filenames(self, scan: BlissScanType) -> List[str]:
        """Return a one-element list with the Lima file name of the scan.

        The name is the scan's ``images_path`` template, formatted with the
        scan number and ``self.lima_name``, followed by ``lima_file_suffix``.
        """
        scan_number = scan.scan_number
        image_path = scan.scan_saving.images_path.format(
            scan_number=scan_number, img_acq_device=self.lima_name
        )
        return [f"{image_path}{self.lima_file_suffix}"]

    def get_omega(self, scan: BlissScanType) -> str:
        """Return the omega expression ``"<start>+index*<step>"`` of the scan."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        start = fscan_parameters["start_pos"]
        step = fscan_parameters["step_size"]
        return f"{start}+index*{step}"

    def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the omega expression and exposure time read from the scan."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        return {
            "omega": self.get_omega(scan),
            "exposure_time": fscan_parameters["acq_time"],
        }

    def get_run_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the run description built from the scan and user parameters.

        NOTE(review): ``alpha`` (50) and ``mono`` (0.99) are fixed here and not
        read from the "alpha" / "polarization" user parameters -- confirm this
        is intended.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        step_size = fscan_parameters["step_size"]
        start_pos = fscan_parameters["start_pos"]
        npoints = scan.scan_info["npoints"]
        beam = self.user_parameters["beam"]
        wavelength = self.user_parameters["wavelength"]
        return {
            "count": npoints,
            "omega": 0,
            "omega_start": start_pos,
            "omega_end": start_pos + npoints * step_size,
            "pixel_size": 0.075,
            "omega_runs": None,
            "theta": 0,
            "kappa": 0,
            "phi": 0,
            "domega": step_size,
            "dtheta": 0,
            "dkappa": 0,
            "dphi": 0,
            "center_x": beam[0],
            "center_y": beam[1],
            "alpha": 50,
            "dist": self.user_parameters["distance"],
            "l1": wavelength,
            "l2": wavelength,
            "l12": wavelength,
            "b": wavelength,
            "mono": 0.99,
            "monotype": "SYNCHROTRON",
            "chip": [1024, 1024],
            "Exposure_time": fscan_parameters["acq_time"],
        }

    def _get_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Merge user parameters, scan paths and scan parameters (later wins)."""
        parameters = self.user_parameters.to_dict()
        parameters.update(self.update_scan_parameters(scan))
        parameters.update(self.get_scan_parameters(scan))
        return parameters

    def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the keyword arguments used to submit the workflow."""
        return {
            "inputs": self.get_inputs(scan),
            # NOTE(review): "False" is a (truthy) string, not the boolean
            # False -- confirm which one the workflow engine expects.
            "outputs": [{"all": "False"}],
        }

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit the workflow when the scan is an fscan using the detector.

        Scans without a ``"type"`` entry are skipped instead of raising
        ``KeyError``.

        :raises AssertionError: for an fscan that does not contain the
            configured detector channel (see ``get_lima_name``).
        """
        scan_type = scan.scan_info.get("type") or ""
        if "fscan" in scan_type and self.get_lima_name(scan):
            kwargs = self.get_submit_arguments(scan)
            kwargs["convert_destination"] = self.workflow_destination(scan)
            submit(args=(self.workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Run the conversion for ``scan``."""
        self.run_conversion(scan)
class Eiger2CrysalisWithCbfProcessor(
    Eiger2CrysalisProcessor,
    parameters=[
        ParameterInfo(
            "_workflow",
            category="workflows",
            deprecated_names=["workflow"],
        ),
        ParameterInfo(
            "_cbf_workflow",
            category="workflows",
            deprecated_names=["cbf_workflow"],
            doc="Workflow used when output_file_format includes 'cbf'",
        ),
        ParameterInfo(
            "output_file_format",
            category="Eiger2Crysalis",
            doc="Output file format: 'esperanto', 'cbf', or both as a list, e.g. ['esperanto', 'cbf']",
        ),
        ParameterInfo(
            "xds_parameters",
            category="Eiger2Crysalis",
            doc="Optional overrides for the CreateXDSInp task used in the CBF workflow",
        ),
    ],
):
    """Processor that submits one workflow per requested output file format.

    ``"esperanto"`` uses ``self._workflow`` and ``"cbf"`` uses
    ``self._cbf_workflow``.

    NOTE(review): ``get_crysalis_inputs`` and ``get_extra_inputs`` are used but
    not defined in this class or its visible parent; they are presumably
    inherited -- confirm.
    """

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place with the parent's and this class's defaults."""
        super().set_default_parameters(defaults)
        defaults.setdefault("_workflow", DEFAULT_EIGER2CRYSALIS_WORKFLOW)
        defaults.setdefault("_cbf_workflow", DEFAULT_EIGER2CBF_XDS_WORKFLOW)
        defaults.setdefault("lima_name", "eiger")
        defaults.setdefault("output_file_format", DEFAULT_OUTPUT_FILE_FORMAT)
        defaults.setdefault("xds_parameters", DEFAULT_XDS_INP_PARAMETERS.copy())

    def get_output_file_formats(self) -> List[str]:
        """Return the configured output file formats as a validated list.

        ``self.output_file_format`` may be a single string or an iterable of
        strings; ``None`` falls back to ``DEFAULT_OUTPUT_FILE_FORMAT``.
        Duplicates are dropped, keeping the first occurrence.

        NOTE(review): this method is called by this class but was not defined
        anywhere in the visible source; it is added here so those calls
        resolve -- confirm that no inherited implementation is being shadowed.

        :raises ValueError: when a format is not in
            ``VALID_OUTPUT_FILE_FORMATS``.
        """
        configured = self.output_file_format
        if configured is None:
            return [DEFAULT_OUTPUT_FILE_FORMAT]
        requested = [configured] if isinstance(configured, str) else list(configured)
        formats: List[str] = []
        for output_file_format in requested:
            if output_file_format not in VALID_OUTPUT_FILE_FORMATS:
                raise ValueError(
                    f"Unsupported output_file_format {output_file_format!r}. "
                    f"Expected one of {sorted(VALID_OUTPUT_FILE_FORMATS)}."
                )
            if output_file_format not in formats:
                formats.append(output_file_format)
        return formats

    def update_scan_parameters(self, scan: Any = None) -> Dict[str, Any]:
        """Store the scan-derived paths in ``self.scan_parameters``.

        :param scan: scan to derive the paths from; when ``None`` nothing is
            derived and an empty mapping is stored and returned.
        :returns: the newly derived parameters: ``images`` always, ``output``
            and ``processed_output`` when "esperanto" is requested and
            ``cbf_output`` when "cbf" is requested.
        """
        scan_parameters: Dict[str, Any] = {}
        if scan is not None:
            formats = self.get_output_file_formats()
            scan_parameters["images"] = self.get_lima_filenames(scan)
            if "esperanto" in formats:
                scan_parameters["output"] = self.get_output_path(scan)
                scan_parameters["processed_output"] = self.get_processed_output_path(
                    scan
                )
            if "cbf" in formats:
                scan_parameters["cbf_output"] = self.get_cbf_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_cbf_output_path(self, scan: BlissScanType) -> str:
        """Return ``<processed dir>/scanNNNN/cbf/<dataset>_NNNN_{index:04d}.cbf``.

        ``NNNN`` is ``scan_info["scan_nb"]`` (or ``scan.scan_number`` when that
        key is absent) zero-padded to four digits; ``{index:04d}`` is left as a
        literal placeholder.
        """
        scan_nb = scan.scan_info.get("scan_nb", scan.scan_number)
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return os.path.join(
            scan_processed_dir,
            "cbf",
            f"{dataset_name}_{scan_nb:04d}_{{index:04d}}.cbf",
        )

    def workflow_destination(
        self,
        scan: BlissScanType,
        output_file_format: str = DEFAULT_OUTPUT_FILE_FORMAT,
    ) -> str:
        """Return the path of the JSON file written for the given format.

        ``output_file_format`` defaults to ``DEFAULT_OUTPUT_FILE_FORMAT`` so
        the method can still be called with the parent's one-argument
        signature.

        :raises ValueError: for a format other than "esperanto" or "cbf".
        """
        if output_file_format == "esperanto":
            return f"{self.get_processed_output_path(scan)}.json"
        if output_file_format == "cbf":
            cbf_dir = os.path.dirname(self.get_cbf_output_path(scan))
            base_name = os.path.basename(self.get_processed_output_path(scan))
            return os.path.join(cbf_dir, f"{base_name}_cbf.json")
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r} for workflow destination."
        )

    def _get_esperanto_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Build the workflow inputs for the "esperanto" format."""
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_output_path(scan),
                "processed_output": self.get_processed_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        inputs = self.get_crysalis_inputs(parameters)
        inputs.extend(self.get_extra_inputs(scan, parameters))
        return inputs

    def _get_cbf_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Build the workflow inputs for the "cbf" format.

        Parameters named in ``EIGER2CBF_INPUT_NAMES`` go to the "Eiger2CBF"
        task; every XDS parameter goes to the "CreateXDSInp" task.
        """
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_cbf_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        inputs = [
            {"task_identifier": "Eiger2CBF", "name": key, "value": value}
            for key, value in parameters.items()
            if key in EIGER2CBF_INPUT_NAMES
        ]
        inputs.extend(
            {"task_identifier": "CreateXDSInp", "name": key, "value": value}
            for key, value in self.get_xds_parameters(scan).items()
        )
        return inputs

    def get_inputs(
        self,
        scan: BlissScanType,
        output_file_format: str = DEFAULT_OUTPUT_FILE_FORMAT,
    ) -> List[Dict[str, Any]]:
        """Return the workflow inputs for the given output file format.

        NOTE(review): ``get_submit_arguments`` calls this method with an
        ``output_file_format`` keyword, but no such method was defined in the
        visible source and the two private input builders were otherwise
        unreachable; this dispatcher is added to connect them -- confirm that
        no inherited implementation is being shadowed.

        :raises ValueError: for a format other than "esperanto" or "cbf".
        """
        if output_file_format == "esperanto":
            return self._get_esperanto_inputs(scan)
        if output_file_format == "cbf":
            return self._get_cbf_inputs(scan)
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r}. "
            f"Expected one of {sorted(VALID_OUTPUT_FILE_FORMATS)}."
        )

    def get_xds_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the inputs of the "CreateXDSInp" task for the scan.

        Precedence, lowest to highest: values derived from the scan and user
        parameters, ``DEFAULT_XDS_OPTIONAL_PARAMETERS`` for keys not yet set,
        the ``xds_parameters`` processor parameter, and finally ``output``,
        which is always the CBF output path.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        offset = int(self.user_parameters["offset"])
        count = int(scan.scan_info["npoints"])
        # Inclusive frame range: first frame is "offset", "count" frames in total.
        data_range = [offset, offset + count - 1]
        cbf_output = self.get_cbf_output_path(scan)
        parameters = {
            "background_range": list(data_range),
            "beam": list(self.user_parameters["beam"]),
            "data_range": data_range,
            "distance": self.user_parameters["distance"],
            "fraction_of_polarization": self.user_parameters["polarization"],
            "oscillation_range": abs(fscan_parameters["step_size"]),
            "output": cbf_output,
            "spot_range": list(data_range),
            "starting_angle": fscan_parameters["start_pos"],
            "starting_frame": offset,
            "untrusted_rectangles": [list(v) for v in DEFAULT_UNTRUSTED_RECTANGLES],
            "wavelength": self.user_parameters["wavelength"],
        }
        for key, value in DEFAULT_XDS_OPTIONAL_PARAMETERS.items():
            # Copy list values so the module-level defaults are never shared.
            parameters.setdefault(
                key, list(value) if isinstance(value, list) else value
            )
        parameters.update(self.xds_parameters.to_dict())
        # The output location cannot be overridden through "xds_parameters".
        parameters["output"] = cbf_output
        return parameters

    def get_submit_arguments(
        self,
        scan: BlissScanType,
        output_file_format: str = DEFAULT_OUTPUT_FILE_FORMAT,
    ) -> Dict[str, Any]:
        """Return the keyword arguments used to submit the workflow.

        ``output_file_format`` defaults to ``DEFAULT_OUTPUT_FILE_FORMAT`` so
        the method can still be called with the parent's one-argument
        signature.
        """
        return {
            "inputs": self.get_inputs(scan, output_file_format=output_file_format),
            # NOTE(review): "False" is a (truthy) string, not the boolean
            # False -- confirm which one the workflow engine expects.
            "outputs": [{"all": "False"}],
        }

    def get_workflow(self, output_file_format: str) -> str:
        """Return the workflow configured for the given output file format.

        :raises ValueError: when the format is unsupported, or when "cbf" is
            requested while ``_cbf_workflow`` is empty.
        """
        if output_file_format == "esperanto":
            return self._workflow
        if output_file_format == "cbf":
            if not self._cbf_workflow:
                raise ValueError(
                    "cbf_workflow must be configured when output_file_format includes 'cbf'."
                )
            return self._cbf_workflow
        # sorted(): a set has no stable order, which made the message vary.
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r}. "
            f"Expected one of {sorted(VALID_OUTPUT_FILE_FORMATS)}."
        )

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit one workflow per requested format for an fscan using the detector.

        Scans without a ``"type"`` entry are skipped instead of raising
        ``KeyError``.

        :raises AssertionError: for an fscan that does not contain the
            configured detector channel (see ``get_lima_name``).
        """
        scan_type = scan.scan_info.get("type") or ""
        if "fscan" in scan_type and self.get_lima_name(scan):
            for output_file_format in self.get_output_file_formats():
                kwargs = self.get_submit_arguments(scan, output_file_format)
                kwargs["convert_destination"] = self.workflow_destination(
                    scan, output_file_format
                )
                submit(
                    args=(self.get_workflow(output_file_format),),
                    kwargs=kwargs,
                    queue=self.queue,
                )
class Eiger2CrysalisLima2Processor(Eiger2CrysalisProcessor):
    """Variant of the processor with a different channel suffix, file suffix
    and default workflow (``DEFAULT_EIGER2CRYSALIS_LIMA2_WORKFLOW``).
    """

    detector_channel_suffix = "raw_frame"
    lima_file_suffix = "frame_0_00000.h5"

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place, using the Lima2 workflow as fallback.

        A "workflow" entry already present in ``defaults`` is kept as is;
        otherwise the Lima2 workflow replaces the parent's default.
        """
        if "workflow" in defaults:
            requested_workflow = defaults.pop("workflow")
        else:
            requested_workflow = DEFAULT_EIGER2CRYSALIS_LIMA2_WORKFLOW
        # The parent sets its own default workflow; it is overwritten below.
        super().set_default_parameters(defaults)
        defaults["workflow"] = requested_workflow