Source code for blissoda.id15b.eiger2crysalis

import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils import directories

_DEFAULT_OUTPUT_FILE_FORMAT = "esperanto"
_VALID_OUTPUT_FILE_FORMATS = {"cbf", "esperanto"}
_EIGER2CBF_INPUT_NAMES = {
    "images",
    "output",
    "mask",
    "offset",
    "dummy",
    "pilatus",
    "dry_run",
    "energy",
    "wavelength",
    "distance",
    "beam",
    "alpha",
    "kappa",
    "chi",
    "phi",
    "omega",
    "rotation",
    "transpose",
    "flip_ud",
    "flip_lr",
    "verbose",
    "debug",
}
_DEFAULT_XDS_INP_PARAMETERS: Dict[str, Any] = {
    "detector": "EIGER",
    "nx": 3108,
    "ny": 3262,
    "qx": 0.075,
    "qy": 0.075,
}
_DEFAULT_UNTRUSTED_RECTANGLES = [
    [513, 514, 0, 3262],
    [1028, 1039, 0, 3262],
    [1553, 1554, 0, 3262],
    [2068, 2079, 0, 3262],
    [2593, 2594, 0, 3262],
    [0, 3108, 512, 549],
    [0, 3108, 1062, 1099],
    [0, 3108, 1612, 1649],
    [0, 3108, 2162, 2199],
    [0, 3108, 2712, 2749],
]
_DEFAULT_XDS_OPTIONAL_PARAMETERS: Dict[str, Any] = {
    "filename": "XDS.INP",
    "job": "XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
    "overload": 100000000,
    "space_group_number": 0,
    "friedels_law": "FALSE",
    "minimum_valid_pixel_value": 0,
    "trusted_region": [0.0, 1.41],
    "polarization_plane_normal": [0.0, 1.0, 0.0],
    "direction_of_detector_x_axis": [1.0, 0.0, 0.0],
    "direction_of_detector_y_axis": [0.0, 1.0, 0.0],
    "rotation_axis": [0.0, -1.0, 0.0],
    "incident_beam_direction": [0.0, 0.0, 1.0],
    "sensor_thickness": 0.75,
    "silicon": 18.023,
}
_DEFAULT_USER_PARS: Dict[str, Any] = {
    "flip_ud": False,
    "flip_lr": False,
    "wavelength": 0.1,
    "distance": 100,
    "beam": [1000, 1100],
    "polarization": 0.99,
    "kappa": 0,
    "alpha": 50,
    "theta": 0,
    "phi": 0,
    "omega": "",
    "rotation": 180,
    "dummy": -1,
    "offset": 1,
    "dry_run": False,
    "calc_mask": False,
}

_DEFAULT_SCAN_PARS: Dict[str, Any] = {
    "images": "",
    "output": "",
}


[docs] class Id15bEiger2Crysalis( BaseProcessor, parameters=[ ParameterInfo("queue", category="workflows", deprecated_names=["worker"]), ParameterInfo( "_workflow", category="workflows", deprecated_names=["workflow"], ), ParameterInfo( "_cbf_workflow", category="workflows", deprecated_names=["cbf_workflow"], doc="Workflow used when output_file_format includes 'cbf'", ), ParameterInfo( "lima_name", category="Eiger2Crysalis", doc="Lima name of the camera: (i.e. eiger)", ), ParameterInfo( "output_file_format", category="Eiger2Crysalis", doc="Output file format: 'esperanto', 'cbf', or both as a list, e.g. ['esperanto', 'cbf']", ), ParameterInfo( "scan_parameters", category="Eiger2Crysalis", doc="Derived from scan and motors", ), ParameterInfo( "user_parameters", category="Eiger2Crysalis", doc="Specify explicitly" ), ParameterInfo( "xds_parameters", category="Eiger2Crysalis", doc="Optional overrides for the CreateXDSInp task used in the CBF workflow", ), ParameterInfo( "ini_file", category="ExtraFiles", doc="CrysalisExpSettings.ini path", ), ParameterInfo( "par_file", category="ExtraFiles", doc=".par file path", ), ParameterInfo( "set_file", category="ExtraFiles", doc=".set file path", ), ParameterInfo( "ccd_file", category="ExtraFiles", doc=".ccd file path", ), ], ): def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, **deprecated_defaults: Dict[str, Any], ) -> None: defaults = self._merge_defaults(deprecated_defaults, defaults) defaults.setdefault("trigger_at", "END") defaults.setdefault("_workflow", "") defaults.setdefault("_cbf_workflow", "") defaults.setdefault("lima_name", "eiger") defaults.setdefault("output_file_format", _DEFAULT_OUTPUT_FILE_FORMAT) defaults.setdefault("user_parameters", _DEFAULT_USER_PARS) defaults.setdefault("xds_parameters", _DEFAULT_XDS_INP_PARAMETERS.copy()) defaults.setdefault("scan_parameters", _DEFAULT_SCAN_PARS) defaults.setdefault("crysalis_ini", "") super().__init__(config=config, defaults=defaults) def _info_categories(self) -> Dict[str, dict]: self.update_scan_parameters() return super()._info_categories()
[docs] def update_scan_parameters(self, scan: Any = None) -> None: scan_parameters = dict() if scan: scan_parameters["images"] = self.get_lima_filenames(scan) if "esperanto" in self.get_output_file_formats(): scan_parameters["output"] = self.get_output_path(scan) scan_parameters["processed_output"] = self.get_processed_output_path( scan ) if "cbf" in self.get_output_file_formats(): scan_parameters["cbf_output"] = self.get_cbf_output_path(scan) self.scan_parameters.update(scan_parameters) return scan_parameters
[docs] def get_output_file_formats(self) -> List[str]: output_file_format = self.output_file_format if output_file_format is None: values = [_DEFAULT_OUTPUT_FILE_FORMAT] elif isinstance(output_file_format, str): values = [item.strip() for item in output_file_format.split(",")] else: values = list(output_file_format) normalized = [] for value in values: if not value: continue if not isinstance(value, str): raise TypeError( "output_file_format must be a string or an iterable of strings" ) format_name = value.strip().lower() if format_name not in _VALID_OUTPUT_FILE_FORMATS: raise ValueError( "Unsupported output_file_format " f"{value!r}. Expected one of {_VALID_OUTPUT_FILE_FORMATS}." ) if format_name not in normalized: normalized.append(format_name) return normalized or [_DEFAULT_OUTPUT_FILE_FORMAT]
[docs] def get_filename(self, scan: BlissScanType) -> str: filename = scan.scan_info.get("filename") if filename: return filename return current_session.scan_saving.filename
[docs] def get_lima_name(self, scan: BlissScanType) -> List[str]: channels = scan.scan_info.get("channels", dict()) if f"{self.lima_name}:image" in channels: return self.lima_name else: raise AssertionError(f"{self.lima_name} detector not found.")
[docs] def get_output_path(self, scan: BlissScanType) -> str: scan_nb = scan.scan_info["scan_nb"] dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan)) scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}") dataset_name = os.path.basename(dataset_processed_dir) output = os.path.join( scan_processed_dir, f"{dataset_name}_{scan_nb:04d}" + "_1_{index}.esperanto" ) return output
[docs] def get_cbf_output_path(self, scan: BlissScanType) -> str: scan_nb = scan.scan_info.get("scan_nb", scan.scan_number) dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan)) scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}") dataset_name = os.path.basename(dataset_processed_dir) return os.path.join( scan_processed_dir, "cbf", f"{dataset_name}_{scan_nb:04d}_{{index:04d}}.cbf", )
[docs] def get_processed_output_path(self, scan: BlissScanType) -> str: scan_nb = scan.scan_info["scan_nb"] dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan)) scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}") dataset_name = os.path.basename(dataset_processed_dir) output = os.path.join(scan_processed_dir, f"{dataset_name}_{scan_nb:04d}") return output
[docs] def workflow_destination(self, scan, output_file_format: str) -> str: if output_file_format == "esperanto": return f"{self.get_processed_output_path(scan)}.json" if output_file_format == "cbf": cbf_dir = os.path.dirname(self.get_cbf_output_path(scan)) base_name = os.path.basename(self.get_processed_output_path(scan)) return os.path.join(cbf_dir, f"{base_name}_cbf.json") raise ValueError( f"Unsupported output_file_format {output_file_format!r} for workflow destination." )
[docs] def get_lima_filenames(self, scan: BlissScanType) -> List[str]: scan_number = scan.scan_number lima_files = [ f"{scan.scan_saving.images_path.format(scan_number=scan_number, img_acq_device=self.lima_name)}0000.h5" ] return lima_files
[docs] def get_omega(self, scan: BlissScanType) -> str: """ Provide the omega parameter as a formatted string based on scan info. The scan is assumed to be centered around zero. """ # Extract scan parameters. start = scan.scan_info["instrument"]["fscan_parameters"]["start_pos"] step = scan.scan_info["instrument"]["fscan_parameters"]["step_size"] return f"{start}+index*{step}"
[docs] def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]: scan_parameters = dict() scan_parameters["omega"] = self.get_omega(scan) scan_parameters["exposure_time"] = scan.scan_info["instrument"][ "fscan_parameters" ]["acq_time"] return scan_parameters
[docs] def get_run_parameters(self, scan: BlissScanType) -> Dict[str, Any]: return { "count": scan.scan_info["npoints"], "omega": 0, "omega_start": scan.scan_info["instrument"]["fscan_parameters"][ "start_pos" ], "omega_end": scan.scan_info["instrument"]["fscan_parameters"]["start_pos"] + scan.scan_info["npoints"] * scan.scan_info["instrument"]["fscan_parameters"]["step_size"], "pixel_size": 0.075, "omega_runs": None, "theta": 0, "kappa": 0, "phi": 0, "domega": scan.scan_info["instrument"]["fscan_parameters"]["step_size"], "dtheta": 0, "dkappa": 0, "dphi": 0, "center_x": self.user_parameters["beam"][0], "center_y": self.user_parameters["beam"][1], "alpha": 50, "dist": self.user_parameters["distance"], "l1": self.user_parameters["wavelength"], "l2": self.user_parameters["wavelength"], "l12": self.user_parameters["wavelength"], "b": self.user_parameters["wavelength"], "mono": 0.99, "monotype": "SYNCHROTRON", "chip": [1024, 1024], "Exposure_time": scan.scan_info["instrument"]["fscan_parameters"][ "acq_time" ], }
[docs] def get_icat_metadata(self, scan): metadata = { "Sample_name": scan.scan_saving.sample_name, "SCXRD_sample_detector_distance": self.user_parameters["distance"] * 1e-3, "SCXRD_beam_center_x": self.user_parameters["beam"][0], "SCXRD_beam_center_y": self.user_parameters["beam"][1], "InstrumentMonochromator_wavelength": self.user_parameters["wavelength"], # "SCXRD_detector_tilts" : [0,0,0], "SCXRD_rotation_axis": "omega", "SCXRD_rotation_range": scan.scan_info["npoints"] * scan.scan_info["instrument"]["fscan_parameters"]["step_size"], "SCXRD_rotation_step": scan.scan_info["instrument"]["fscan_parameters"][ "step_size" ], "SCXRD_exposure_time": scan.scan_info["instrument"]["fscan_parameters"][ "acq_time" ], } return metadata
def _get_esperanto_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]: parameters = self.user_parameters.to_dict() parameters.update( { "images": self.get_lima_filenames(scan), "output": self.get_output_path(scan), "processed_output": self.get_processed_output_path(scan), } ) parameters.update(self.get_scan_parameters(scan)) icat_metadata = self.get_icat_metadata(scan) inputs = [] for key, value in parameters.items(): inputs.append( { "task_identifier": "Eiger2Crysalis", "name": key, "value": value, } ) inputs += [ { "id": "1", "task_identifier": "CreateIniFiles", "name": "ini_file", "value": self.ini_file, }, { "id": "2", "task_identifier": "CreateSetCcdFiles", "name": "ccd_set_file", "value": self.ccd_file, }, { "id": "3", "task_identifier": "CreateSetCcdFiles", "name": "ccd_set_file", "value": self.set_file, }, { "id": "4", "task_identifier": "CreateRunFiles", "name": "run_parameters", "value": self.get_run_parameters(scan), }, { "id": "5", "task_identifier": "CreateParFiles", "name": "par_file", "value": self.par_file, }, { "id": "6", "task_identifier": "AverageFrames", "name": "images", "value": parameters["images"], }, { "id": "7", "task_identifier": "DataPortal", "name": "metadata", "value": icat_metadata, }, ] return inputs def _get_cbf_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]: parameters = self.user_parameters.to_dict() parameters.update( { "images": self.get_lima_filenames(scan), "output": self.get_cbf_output_path(scan), } ) parameters.update(self.get_scan_parameters(scan)) inputs = [] for key, value in parameters.items(): if key not in _EIGER2CBF_INPUT_NAMES: continue inputs.append( { "task_identifier": "Eiger2CBF", "name": key, "value": value, } ) xds_parameters = self.get_xds_parameters(scan) for key, value in xds_parameters.items(): inputs.append( { "task_identifier": "CreateXDSInp", "name": key, "value": value, } ) return inputs
[docs] def get_xds_parameters(self, scan: BlissScanType) -> Dict[str, Any]: fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"] offset = int(self.user_parameters["offset"]) count = int(scan.scan_info["npoints"]) data_range = [offset, offset + count - 1] parameters = { "background_range": list(data_range), "beam": list(self.user_parameters["beam"]), "data_range": data_range, "distance": self.user_parameters["distance"], "fraction_of_polarization": self.user_parameters["polarization"], "oscillation_range": abs(fscan_parameters["step_size"]), "output": self.get_cbf_output_path(scan), "spot_range": list(data_range), "starting_angle": fscan_parameters["start_pos"], "starting_frame": offset, "untrusted_rectangles": [list(v) for v in _DEFAULT_UNTRUSTED_RECTANGLES], "wavelength": self.user_parameters["wavelength"], } for key, value in _DEFAULT_XDS_OPTIONAL_PARAMETERS.items(): parameters.setdefault( key, list(value) if isinstance(value, list) else value ) parameters.update(self.xds_parameters.to_dict()) parameters["output"] = self.get_cbf_output_path(scan) return parameters
[docs] def get_inputs( self, scan: BlissScanType, output_file_format: Optional[str] = None ) -> List[Dict[str, Any]]: if output_file_format is None: inputs = [] for format_name in self.get_output_file_formats(): inputs.extend(self.get_inputs(scan, output_file_format=format_name)) return inputs if output_file_format == "esperanto": return self._get_esperanto_inputs(scan) if output_file_format == "cbf": return self._get_cbf_inputs(scan) raise ValueError( f"Unsupported output_file_format {output_file_format!r}. Expected one of {_VALID_OUTPUT_FILE_FORMATS}." )
[docs] def get_submit_arguments( self, scan: BlissScanType, output_file_format: str ) -> Dict[str, Any]: return { "inputs": self.get_inputs(scan, output_file_format=output_file_format), "outputs": [{"all": "False"}], }
[docs] def get_workflow(self, output_file_format: str) -> str: if output_file_format == "esperanto": return self._workflow if output_file_format == "cbf": if not self._cbf_workflow: raise ValueError( "cbf_workflow must be configured when output_file_format includes 'cbf'." ) return self._cbf_workflow raise ValueError( f"Unsupported output_file_format {output_file_format!r}. Expected one of {_VALID_OUTPUT_FILE_FORMATS}." )
[docs] def run_conversion(self, scan: BlissScanType) -> None: """Executes on given scan""" if "fscan" in scan.scan_info["type"] and self.get_lima_name(scan): for output_file_format in self.get_output_file_formats(): kwargs = self.get_submit_arguments(scan, output_file_format) kwargs["convert_destination"] = self.workflow_destination( scan, output_file_format ) submit( args=(self.get_workflow(output_file_format),), kwargs=kwargs, queue=self.queue, )
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None: self.run_conversion(scan)
[docs] def get_dataset_processed_dir( dataset_filename: str, ) -> str: # Temporary, waiting for !173 to be merged root = directories.get_processed_dir(dataset_filename) collection = os.path.basename(directories.get_collection_dir(dataset_filename)) dataset = os.path.basename(directories.get_dataset_dir(dataset_filename)) return os.path.join(root, collection, dataset)