# Source code for blissoda.scxrd.eiger2crysalis

import copy
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils.directories import get_dataset_processed_dir

# Output format used when ``output_file_format`` is unset or empty.
DEFAULT_OUTPUT_FILE_FORMAT = "esperanto"

# All bundled workflow definitions are looked up in the same resource group.
_SCXRD_RESOURCE_GROUP = "scxrd"

# Workflow files resolved once at import time through ``resource_filename``.
DEFAULT_EIGER2CRYSALIS_WORKFLOW = resource_filename(
    _SCXRD_RESOURCE_GROUP, "eiger2crysalis.json"
)
DEFAULT_EIGER2CRYSALIS_LIMA2_WORKFLOW = resource_filename(
    _SCXRD_RESOURCE_GROUP, "eiger2crysalis_lima2.json"
)
DEFAULT_EIGER2CBF_XDS_WORKFLOW = resource_filename(
    _SCXRD_RESOURCE_GROUP, "eiger2cbf_xds.json"
)
# Format names accepted for the ``output_file_format`` parameter.
VALID_OUTPUT_FILE_FORMATS = {"cbf", "esperanto"}

# Parameter names that are forwarded as inputs of the "Eiger2CBF" task;
# any other key of the merged user/scan parameters is dropped for that task.
EIGER2CBF_INPUT_NAMES = {
    "images", "output", "mask", "offset", "dummy", "pilatus", "dry_run",
    "energy", "wavelength", "distance", "beam",
    "alpha", "kappa", "chi", "phi", "omega", "rotation",
    "transpose", "flip_ud", "flip_lr",
    "verbose", "debug",
}  # fmt: skip

# Detector dimensions and pixel size shared by the XDS defaults below.
_DETECTOR_NX = 3108
_DETECTOR_NY = 3262
_DETECTOR_PIXEL_SIZE = 0.075

# Default value of the ``xds_parameters`` processor parameter.
DEFAULT_XDS_INP_PARAMETERS: Dict[str, Any] = {
    "detector": "EIGER",
    "nx": _DETECTOR_NX,
    "ny": _DETECTOR_NY,
    "qx": _DETECTOR_PIXEL_SIZE,
    "qy": _DETECTOR_PIXEL_SIZE,
}

# Bands excluded from processing, stored as four-integer rectangles.
# NOTE(review): presumably [x1, x2, y1, y2] covering the gaps between detector
# modules -- confirm against the CreateXDSInp task.
_UNTRUSTED_X_BANDS = [
    (513, 514),
    (1028, 1039),
    (1553, 1554),
    (2068, 2079),
    (2593, 2594),
]
_UNTRUSTED_Y_BANDS = [
    (512, 549),
    (1062, 1099),
    (1612, 1649),
    (2162, 2199),
    (2712, 2749),
]
DEFAULT_UNTRUSTED_RECTANGLES = [
    [first, last, 0, _DETECTOR_NY] for first, last in _UNTRUSTED_X_BANDS
] + [[0, _DETECTOR_NX, first, last] for first, last in _UNTRUSTED_Y_BANDS]

# Fallback values for CreateXDSInp inputs that are not derived from the scan.
DEFAULT_XDS_OPTIONAL_PARAMETERS: Dict[str, Any] = {
    "filename": "XDS.INP",
    "job": "XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
    "overload": 100000000,
    "space_group_number": 0,
    "friedels_law": "FALSE",
    "minimum_valid_pixel_value": 0,
    "trusted_region": [0.0, 1.41],
    "polarization_plane_normal": [0.0, 1.0, 0.0],
    "direction_of_detector_x_axis": [1.0, 0.0, 0.0],
    "direction_of_detector_y_axis": [0.0, 1.0, 0.0],
    "rotation_axis": [0.0, -1.0, 0.0],
    "incident_beam_direction": [0.0, 0.0, 1.0],
    "sensor_thickness": 0.75,
    "silicon": 18.023,
}

# Default value of the ``user_parameters`` processor parameter.
DEFAULT_USER_PARS: Dict[str, Any] = {
    "flip_ud": False,
    "flip_lr": False,
    "wavelength": 0.1,
    "distance": 100,
    "beam": [1000, 1100],
    "polarization": 0.99,
    "kappa": 0,
    "alpha": 50,
    "theta": 0,
    "phi": 0,
    "omega": "",
    "rotation": 180,
    "dummy": -1,
    "offset": 1,
    "dry_run": False,
    "calc_mask": False,
}

# Default value of the ``scan_parameters`` processor parameter; the entries
# are overwritten per scan.
DEFAULT_SCAN_PARS: Dict[str, Any] = {"images": "", "output": ""}

# Names of the user parameters, in declaration order.
PAR_METADATA_KEYS = list(DEFAULT_USER_PARS)


class Eiger2CrysalisProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo("workflow", category="workflows"),
        ParameterInfo(
            "lima_name",
            category="Eiger2Crysalis",
            doc="Lima name of the camera: (i.e. eiger)",
        ),
        ParameterInfo(
            "scan_parameters",
            category="Eiger2Crysalis",
            doc="Derived from scan and motors",
        ),
        ParameterInfo(
            "user_parameters", category="Eiger2Crysalis", doc="Specify explicitly"
        ),
        ParameterInfo(
            "ini_file",
            category="ExtraFiles",
            doc="CrysalisExpSettings.ini path",
        ),
        ParameterInfo(
            "par_file",
            category="ExtraFiles",
            doc=".par file path",
        ),
        ParameterInfo(
            "set_file",
            category="ExtraFiles",
            doc=".set file path",
        ),
        ParameterInfo(
            "ccd_file",
            category="ExtraFiles",
            doc=".ccd file path",
        ),
    ],
):
    """Submit the Eiger-to-CrysAlis conversion workflow for fscan scans.

    For every triggering scan whose type contains ``"fscan"`` and which has a
    ``<lima_name>:<detector_channel_suffix>`` channel, the workflow inputs are
    assembled from the user parameters, the scan information and the
    configured extra files, and the workflow is submitted to ``queue``.
    """

    # Suffix of the detector channel that must be present in the scan.
    detector_channel_suffix = "image"
    # Appended to the formatted image path to obtain the Lima file name.
    lima_file_suffix = "0000.h5"
    # Templates that are copied into the defaults of each instance.
    default_user_parameters = DEFAULT_USER_PARS
    default_scan_parameters = DEFAULT_SCAN_PARS
    require_bliss_for_trigger_registration = False

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Merge the defaults, complete them and initialise the base processor.

        :param config: forwarded unchanged to the base class.
        :param defaults: default parameter values.
        :param deprecated_defaults: defaults given as keyword arguments; they
            are merged with ``defaults`` by ``_merge_defaults``.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        self.set_default_parameters(defaults)
        super().__init__(config=config, defaults=defaults)

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place with every default that is not yet set.

        :param defaults: mapping to complete; existing keys are left untouched.
        """
        defaults.setdefault("trigger_at", "END")
        defaults.setdefault("workflow", DEFAULT_EIGER2CRYSALIS_WORKFLOW)
        # Deep copies: the templates hold mutable values (e.g. the "beam"
        # list) that must not be shared between instances or with the
        # module-level constants.
        defaults.setdefault(
            "user_parameters", copy.deepcopy(self.default_user_parameters)
        )
        defaults.setdefault(
            "scan_parameters", copy.deepcopy(self.default_scan_parameters)
        )
        # NOTE(review): no parameter called "crysalis_ini" is declared on this
        # class ("ini_file" is) -- confirm whether this default is still used.
        defaults.setdefault("crysalis_ini", "")

    def _info_categories(self) -> Dict[str, dict]:
        self.update_scan_parameters()
        return super()._info_categories()

    def update_scan_parameters(self, scan: Any = None) -> Dict[str, Any]:
        """Derive the scan-dependent parameters and store them.

        :param scan: scan to derive the parameters from; when falsy nothing is
            derived and an empty mapping is stored and returned.
        :returns: the derived parameters (also merged into
            ``self.scan_parameters``).
        """
        scan_parameters: Dict[str, Any] = {}
        if scan:
            scan_parameters["images"] = self.get_lima_filenames(scan)
            scan_parameters["output"] = self.get_output_path(scan)
            scan_parameters["processed_output"] = self.get_processed_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_filename(self, scan: BlissScanType) -> str:
        """Return the scan's ``filename`` entry, else the session's file name."""
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def get_lima_name(self, scan: BlissScanType) -> str:
        """Return ``lima_name`` when its detector channel is part of the scan.

        :raises AssertionError: when the channel
            ``<lima_name>:<detector_channel_suffix>`` is not in the scan.
        """
        channels = scan.scan_info.get("channels", dict())
        if f"{self.lima_name}:{self.detector_channel_suffix}" in channels:
            return self.lima_name
        raise AssertionError(f"{self.lima_name} detector not found.")

    def _scan_output_prefix(self, scan: BlissScanType) -> str:
        """Return ``<processed dir>/scanNNNN/<dataset>_NNNN`` for ``scan``."""
        scan_nb = scan.scan_info["scan_nb"]
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return os.path.join(scan_processed_dir, f"{dataset_name}_{scan_nb:04d}")

    def get_output_path(self, scan: BlissScanType) -> str:
        """Return the esperanto output template (contains ``{index}``)."""
        # "{index}" is deliberately left unformatted in the returned string.
        return self._scan_output_prefix(scan) + "_1_{index}.esperanto"

    def get_processed_output_path(self, scan: BlissScanType) -> str:
        """Return the extension-less output path prefix of ``scan``."""
        return self._scan_output_prefix(scan)

    def workflow_destination(self, scan: BlissScanType) -> str:
        """Return the path passed as ``convert_destination`` on submission."""
        return f"{self.get_processed_output_path(scan)}.json"

    def get_lima_filenames(self, scan: BlissScanType) -> List[str]:
        """Return a one-element list with the Lima file name of ``scan``."""
        image_path = scan.scan_saving.images_path.format(
            scan_number=scan.scan_number, img_acq_device=self.lima_name
        )
        return [f"{image_path}{self.lima_file_suffix}"]

    def get_omega(self, scan: BlissScanType) -> str:
        """Return the omega expression ``"<start>+index*<step>"`` as a string."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        start = fscan_parameters["start_pos"]
        step = fscan_parameters["step_size"]
        return f"{start}+index*{step}"

    def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the parameters taken from the fscan parameters of ``scan``."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        return {
            "omega": self.get_omega(scan),
            "exposure_time": fscan_parameters["acq_time"],
        }

    def get_run_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the ``run_parameters`` input of the "CreateRunFiles" task.

        NOTE(review): "alpha", "mono" and "pixel_size" are fixed here although
        the user parameters also carry "alpha" and "polarization" -- confirm
        that this is intended.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        step_size = fscan_parameters["step_size"]
        start_pos = fscan_parameters["start_pos"]
        npoints = scan.scan_info["npoints"]
        beam = self.user_parameters["beam"]
        wavelength = self.user_parameters["wavelength"]
        return {
            "count": npoints,
            "omega": 0,
            "omega_start": start_pos,
            "omega_end": start_pos + npoints * step_size,
            "pixel_size": 0.075,
            "omega_runs": None,
            "theta": 0,
            "kappa": 0,
            "phi": 0,
            "domega": step_size,
            "dtheta": 0,
            "dkappa": 0,
            "dphi": 0,
            "center_x": beam[0],
            "center_y": beam[1],
            "alpha": 50,
            "dist": self.user_parameters["distance"],
            "l1": wavelength,
            "l2": wavelength,
            "l12": wavelength,
            "b": wavelength,
            "mono": 0.99,
            "monotype": "SYNCHROTRON",
            "chip": [1024, 1024],
            "Exposure_time": fscan_parameters["acq_time"],
        }

    def get_icat_metadata(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the ``metadata`` input of the "DataPortal" task."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        step_size = fscan_parameters["step_size"]
        beam = self.user_parameters["beam"]
        return {
            "Sample_name": scan.scan_saving.sample_name,
            # The user value is scaled by 1e-3 for the metadata field.
            "SCXRD_sample_detector_distance": self.user_parameters["distance"] * 1e-3,
            "SCXRD_beam_center_x": beam[0],
            "SCXRD_beam_center_y": beam[1],
            "InstrumentMonochromator_wavelength": self.user_parameters["wavelength"],
            "SCXRD_rotation_axis": "omega",
            "SCXRD_rotation_range": scan.scan_info["npoints"] * step_size,
            "SCXRD_rotation_step": step_size,
            "SCXRD_exposure_time": fscan_parameters["acq_time"],
        }

    def _get_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        # Later updates win: user parameters < derived paths < fscan values.
        parameters = self.user_parameters.to_dict()
        parameters.update(self.update_scan_parameters(scan))
        parameters.update(self.get_scan_parameters(scan))
        return parameters

    def get_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Return all workflow inputs for ``scan``."""
        parameters = self._get_parameters(scan)
        inputs = self.get_crysalis_inputs(parameters)
        inputs.extend(self.get_extra_inputs(scan, parameters))
        return inputs

    def get_crysalis_inputs(self, parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return one "Eiger2Crysalis" task input per entry of ``parameters``."""
        return [
            {
                "task_identifier": "Eiger2Crysalis",
                "name": key,
                "value": value,
            }
            for key, value in parameters.items()
        ]

    def get_extra_inputs(
        self, scan: BlissScanType, parameters: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Return the inputs of the tasks that run besides the conversion.

        :param scan: scan used for the run parameters and the metadata.
        :param parameters: merged parameters; only ``"images"`` is read.
        """
        return [
            {
                "id": "1",
                "task_identifier": "CreateIniFiles",
                "name": "ini_file",
                "value": self.ini_file,
            },
            {
                "id": "2",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.ccd_file,
            },
            {
                "id": "3",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.set_file,
            },
            {
                "id": "4",
                "task_identifier": "CreateRunFiles",
                "name": "run_parameters",
                "value": self.get_run_parameters(scan),
            },
            {
                "id": "5",
                "task_identifier": "CreateParFiles",
                "name": "par_file",
                "value": self.par_file,
            },
            {
                "id": "6",
                "task_identifier": "AverageFrames",
                "name": "images",
                "value": parameters["images"],
            },
            {
                "id": "7",
                "task_identifier": "DataPortal",
                "name": "metadata",
                "value": self.get_icat_metadata(scan),
            },
        ]

    def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the keyword arguments of the workflow submission."""
        return {
            "inputs": self.get_inputs(scan),
            # A real boolean: the previous string "False" is truthy.
            "outputs": [{"all": False}],
        }

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit the workflow when ``scan`` is an fscan with the detector.

        :raises AssertionError: propagated from ``get_lima_name`` when the
            scan is an fscan without the detector channel.
        """
        if "fscan" not in scan.scan_info["type"]:
            return
        if not self.get_lima_name(scan):
            return
        kwargs = self.get_submit_arguments(scan)
        kwargs["convert_destination"] = self.workflow_destination(scan)
        submit(args=(self.workflow,), kwargs=kwargs, queue=self.queue)

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        self.run_conversion(scan)


class Eiger2CrysalisWithCbfProcessor(
    Eiger2CrysalisProcessor,
    parameters=[
        ParameterInfo(
            "_workflow",
            category="workflows",
            deprecated_names=["workflow"],
        ),
        ParameterInfo(
            "_cbf_workflow",
            category="workflows",
            deprecated_names=["cbf_workflow"],
            doc="Workflow used when output_file_format includes 'cbf'",
        ),
        ParameterInfo(
            "output_file_format",
            category="Eiger2Crysalis",
            doc="Output file format: 'esperanto', 'cbf', or both as a list, e.g. ['esperanto', 'cbf']",
        ),
        ParameterInfo(
            "xds_parameters",
            category="Eiger2Crysalis",
            doc="Optional overrides for the CreateXDSInp task used in the CBF workflow",
        ),
    ],
):
    """Conversion processor that can produce esperanto and/or CBF output.

    One workflow is submitted per configured output format: ``_workflow`` for
    "esperanto" and ``_cbf_workflow`` for "cbf".
    """

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place, adding the format-specific defaults."""
        super().set_default_parameters(defaults)
        defaults.setdefault("_workflow", DEFAULT_EIGER2CRYSALIS_WORKFLOW)
        defaults.setdefault("_cbf_workflow", DEFAULT_EIGER2CBF_XDS_WORKFLOW)
        defaults.setdefault("lima_name", "eiger")
        defaults.setdefault("output_file_format", DEFAULT_OUTPUT_FILE_FORMAT)
        defaults.setdefault("xds_parameters", DEFAULT_XDS_INP_PARAMETERS.copy())

    def update_scan_parameters(self, scan: Any = None) -> Dict[str, Any]:
        """Derive the scan-dependent parameters for the enabled formats.

        :param scan: scan to derive the parameters from; when falsy nothing is
            derived (and the configured formats are not validated).
        :returns: the derived parameters (also merged into
            ``self.scan_parameters``).
        """
        scan_parameters: Dict[str, Any] = {}
        if scan:
            scan_parameters["images"] = self.get_lima_filenames(scan)
            # Normalise the configured formats once for both checks.
            formats = self.get_output_file_formats()
            if "esperanto" in formats:
                scan_parameters["output"] = self.get_output_path(scan)
                scan_parameters["processed_output"] = self.get_processed_output_path(
                    scan
                )
            if "cbf" in formats:
                scan_parameters["cbf_output"] = self.get_cbf_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_output_file_formats(self) -> List[str]:
        """Return the configured output formats, normalised and de-duplicated.

        ``output_file_format`` may be ``None``, a comma-separated string or an
        iterable of strings. Names are stripped and lower-cased, falsy items
        are skipped and the first-seen order is kept.

        :returns: the format names, or the default format when none remain.
        :raises TypeError: when a truthy item is not a string.
        :raises ValueError: when a name is not a supported format.
        """
        output_file_format = self.output_file_format
        if output_file_format is None:
            values = [DEFAULT_OUTPUT_FILE_FORMAT]
        elif isinstance(output_file_format, str):
            values = [item.strip() for item in output_file_format.split(",")]
        else:
            values = list(output_file_format)

        normalized: List[str] = []
        for value in values:
            if not value:
                continue
            if not isinstance(value, str):
                raise TypeError(
                    "output_file_format must be a string or an iterable of strings"
                )
            format_name = value.strip().lower()
            if format_name not in VALID_OUTPUT_FILE_FORMATS:
                raise ValueError(
                    "Unsupported output_file_format "
                    f"{value!r}. Expected one of {VALID_OUTPUT_FILE_FORMATS}."
                )
            if format_name not in normalized:
                normalized.append(format_name)
        return normalized or [DEFAULT_OUTPUT_FILE_FORMAT]

    def get_cbf_output_path(self, scan: BlissScanType) -> str:
        """Return the CBF output template (contains ``{index:04d}``)."""
        scan_nb = scan.scan_info.get("scan_nb", scan.scan_number)
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return os.path.join(
            scan_processed_dir,
            "cbf",
            # The doubled braces keep "{index:04d}" literally in the result.
            f"{dataset_name}_{scan_nb:04d}_{{index:04d}}.cbf",
        )

    def workflow_destination(
        self, scan: BlissScanType, output_file_format: str = DEFAULT_OUTPUT_FILE_FORMAT
    ) -> str:
        """Return the ``convert_destination`` path for one output format.

        :param output_file_format: "esperanto" or "cbf"; defaults to the
            default format so that the base-class call signature keeps working.
        :raises ValueError: for any other format name.
        """
        if output_file_format == "esperanto":
            return f"{self.get_processed_output_path(scan)}.json"
        if output_file_format == "cbf":
            cbf_dir = os.path.dirname(self.get_cbf_output_path(scan))
            base_name = os.path.basename(self.get_processed_output_path(scan))
            return os.path.join(cbf_dir, f"{base_name}_cbf.json")
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r} for workflow destination."
        )

    def _get_esperanto_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        # Built without update_scan_parameters() so that self.scan_parameters
        # is not modified here.
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_output_path(scan),
                "processed_output": self.get_processed_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        inputs = self.get_crysalis_inputs(parameters)
        inputs.extend(self.get_extra_inputs(scan, parameters))
        return inputs

    def _get_cbf_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_cbf_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        # Only the names listed in EIGER2CBF_INPUT_NAMES reach the task.
        inputs = [
            {
                "task_identifier": "Eiger2CBF",
                "name": key,
                "value": value,
            }
            for key, value in parameters.items()
            if key in EIGER2CBF_INPUT_NAMES
        ]
        inputs.extend(
            {
                "task_identifier": "CreateXDSInp",
                "name": key,
                "value": value,
            }
            for key, value in self.get_xds_parameters(scan).items()
        )
        return inputs

    def get_xds_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the inputs of the "CreateXDSInp" task for ``scan``.

        Precedence, lowest first: ``DEFAULT_XDS_OPTIONAL_PARAMETERS``, values
        derived from the scan and the user parameters, the ``xds_parameters``
        overrides. ``"output"`` is always the CBF output path.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        offset = int(self.user_parameters["offset"])
        count = int(scan.scan_info["npoints"])
        data_range = [offset, offset + count - 1]
        cbf_output = self.get_cbf_output_path(scan)
        parameters = {
            # Separate list objects so that the ranges do not alias each other.
            "background_range": list(data_range),
            "beam": list(self.user_parameters["beam"]),
            "data_range": data_range,
            "distance": self.user_parameters["distance"],
            "fraction_of_polarization": self.user_parameters["polarization"],
            "oscillation_range": abs(fscan_parameters["step_size"]),
            "output": cbf_output,
            "spot_range": list(data_range),
            "starting_angle": fscan_parameters["start_pos"],
            "starting_frame": offset,
            "untrusted_rectangles": [list(v) for v in DEFAULT_UNTRUSTED_RECTANGLES],
            "wavelength": self.user_parameters["wavelength"],
        }
        for key, value in DEFAULT_XDS_OPTIONAL_PARAMETERS.items():
            # Copy list defaults so that the module constant is never exposed.
            parameters.setdefault(
                key, list(value) if isinstance(value, list) else value
            )
        parameters.update(self.xds_parameters.to_dict())
        # The overrides must not redirect the output.
        parameters["output"] = cbf_output
        return parameters

    def get_inputs(
        self, scan: BlissScanType, output_file_format: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the workflow inputs for one or for all configured formats.

        :param output_file_format: "esperanto", "cbf", or ``None`` to
            concatenate the inputs of every configured format.
        :raises ValueError: for any other format name.
        """
        if output_file_format is None:
            inputs: List[Dict[str, Any]] = []
            for format_name in self.get_output_file_formats():
                inputs.extend(self.get_inputs(scan, output_file_format=format_name))
            return inputs
        if output_file_format == "esperanto":
            return self._get_esperanto_inputs(scan)
        if output_file_format == "cbf":
            return self._get_cbf_inputs(scan)
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r}. Expected one of {VALID_OUTPUT_FILE_FORMATS}."
        )

    def get_submit_arguments(
        self, scan: BlissScanType, output_file_format: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return the keyword arguments of the workflow submission.

        :param output_file_format: forwarded to ``get_inputs``; optional so
            that the base-class call signature keeps working.
        """
        return {
            "inputs": self.get_inputs(scan, output_file_format=output_file_format),
            # A real boolean: the previous string "False" is truthy.
            "outputs": [{"all": False}],
        }

    def get_workflow(self, output_file_format: str) -> str:
        """Return the workflow configured for ``output_file_format``.

        :raises ValueError: when the format is unsupported, or when "cbf" is
            requested while ``_cbf_workflow`` is empty.
        """
        if output_file_format == "esperanto":
            return self._workflow
        if output_file_format == "cbf":
            if not self._cbf_workflow:
                raise ValueError(
                    "cbf_workflow must be configured when output_file_format includes 'cbf'."
                )
            return self._cbf_workflow
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r}. Expected one of {VALID_OUTPUT_FILE_FORMATS}."
        )

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit one workflow per configured format for an fscan scan.

        :raises AssertionError: propagated from ``get_lima_name`` when the
            scan is an fscan without the detector channel.
        """
        if "fscan" not in scan.scan_info["type"]:
            return
        if not self.get_lima_name(scan):
            return
        for output_file_format in self.get_output_file_formats():
            kwargs = self.get_submit_arguments(scan, output_file_format)
            kwargs["convert_destination"] = self.workflow_destination(
                scan, output_file_format
            )
            submit(
                args=(self.get_workflow(output_file_format),),
                kwargs=kwargs,
                queue=self.queue,
            )


class Eiger2CrysalisLima2Processor(
    Eiger2CrysalisProcessor,
    parameters=[
        # Read by get_extra_inputs(); it was not declared on any class of the
        # hierarchy visible in this module.
        ParameterInfo(
            "scale_factor",
            category="Eiger2Crysalis",
            doc="Scale factor passed to the lima2Thresholding task",
        ),
    ],
):
    """Conversion processor for detectors acquired through Lima2.

    Differs from the parent class by the detector channel suffix, the Lima
    file suffix, the default workflow and an additional "lima2Thresholding"
    task in the workflow inputs.
    """

    detector_channel_suffix = "raw_frame"
    lima_file_suffix = "frame_0_00000.h5"

    def set_default_parameters(self, defaults: Dict[str, Any]) -> None:
        """Fill ``defaults`` in place, using the Lima2 workflow by default."""
        # Take "workflow" out first so that the parent cannot install its own
        # default workflow, then put the caller's or the Lima2 value back.
        workflow = defaults.pop("workflow", DEFAULT_EIGER2CRYSALIS_LIMA2_WORKFLOW)
        super().set_default_parameters(defaults)
        defaults["workflow"] = workflow

    def get_extra_inputs(
        self, scan: BlissScanType, parameters: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Return the inputs of the tasks that run besides the conversion.

        :param scan: scan used for the file names and the run parameters.
        :param parameters: merged parameters; only ``"images"`` is read.
        """
        return [
            # The three "lima2Thresholding" inputs share the node id "0".
            {
                "id": "0",
                "task_identifier": "lima2Thresholding",
                "name": "images",
                "value": self.get_lima_filenames(scan),
            },
            {
                "id": "0",
                "task_identifier": "lima2Thresholding",
                "name": "output",
                "value": self.get_processed_output_path(scan),
            },
            {
                "id": "0",
                "task_identifier": "lima2Thresholding",
                "name": "scale_factor",
                "value": self.scale_factor,
            },
            {
                "id": "2",
                "task_identifier": "CreateIniFiles",
                "name": "ini_file",
                "value": self.ini_file,
            },
            {
                "id": "3",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.ccd_file,
            },
            {
                "id": "4",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.set_file,
            },
            {
                "id": "5",
                "task_identifier": "CreateRunFiles",
                "name": "run_parameters",
                "value": self.get_run_parameters(scan),
            },
            {
                "id": "6",
                "task_identifier": "CreateParFiles",
                "name": "par_file",
                "value": self.par_file,
            },
            {
                "id": "7",
                "task_identifier": "AverageFrames",
                "name": "images",
                "value": parameters["images"],
            },
        ]