import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils import directories
# Output format used as the default value of the ``output_file_format``
# processor parameter.
_DEFAULT_OUTPUT_FILE_FORMAT = "esperanto"

# Output formats for which a workflow can be selected.
_VALID_OUTPUT_FILE_FORMATS = {"cbf", "esperanto"}

# Names of the parameters that are forwarded to the ``Eiger2CBF`` task; any
# other user or scan parameter is dropped when the CBF inputs are built.
_EIGER2CBF_INPUT_NAMES = {
    # input / output files
    "images", "output", "mask",
    # frame handling
    "offset", "dummy", "pilatus", "dry_run",
    # beam and geometry
    "energy", "wavelength", "distance", "beam",
    # goniometer
    "alpha", "kappa", "chi", "phi", "omega",
    # image orientation
    "rotation", "transpose", "flip_ud", "flip_lr",
    # logging
    "verbose", "debug",
}
# Default value of the ``xds_parameters`` processor parameter (overrides for
# the ``CreateXDSInp`` task).
_DEFAULT_XDS_INP_PARAMETERS: Dict[str, Any] = dict(
    detector="EIGER",
    nx=3108,
    ny=3262,
    qx=0.075,
    qy=0.075,
)

# Rectangles passed to ``CreateXDSInp`` as ``untrusted_rectangles``.
# The first group covers the full 0..3262 range in its last two coordinates,
# the second group covers the full 0..3108 range in its first two.
# NOTE(review): presumably the gaps between detector modules, given as
# [x1, x2, y1, y2] -- confirm against the XDS UNTRUSTED_RECTANGLE convention.
_DEFAULT_UNTRUSTED_RECTANGLES = [
    [first, second, 0, 3262]
    for first, second in (
        (513, 514),
        (1028, 1039),
        (1553, 1554),
        (2068, 2079),
        (2593, 2594),
    )
] + [
    [0, 3108, third, fourth]
    for third, fourth in (
        (512, 549),
        (1062, 1099),
        (1612, 1649),
        (2162, 2199),
        (2712, 2749),
    )
]

# Values added to the ``CreateXDSInp`` inputs for every key that was not
# already derived from the scan or the user parameters.
_DEFAULT_XDS_OPTIONAL_PARAMETERS: Dict[str, Any] = dict(
    filename="XDS.INP",
    job="XYCORR INIT COLSPOT IDXREF DEFPIX INTEGRATE CORRECT",
    overload=100000000,
    space_group_number=0,
    friedels_law="FALSE",
    minimum_valid_pixel_value=0,
    trusted_region=[0.0, 1.41],
    polarization_plane_normal=[0.0, 1.0, 0.0],
    direction_of_detector_x_axis=[1.0, 0.0, 0.0],
    direction_of_detector_y_axis=[0.0, 1.0, 0.0],
    rotation_axis=[0.0, -1.0, 0.0],
    incident_beam_direction=[0.0, 0.0, 1.0],
    sensor_thickness=0.75,
    silicon=18.023,
)
# Default value of the ``user_parameters`` processor parameter.
_DEFAULT_USER_PARS: Dict[str, Any] = dict(
    flip_ud=False,
    flip_lr=False,
    wavelength=0.1,
    distance=100,
    beam=[1000, 1100],
    polarization=0.99,
    kappa=0,
    alpha=50,
    theta=0,
    phi=0,
    omega="",
    rotation=180,
    dummy=-1,
    offset=1,
    dry_run=False,
    calc_mask=False,
)

# Default value of the ``scan_parameters`` processor parameter.
_DEFAULT_SCAN_PARS: Dict[str, Any] = dict(images="", output="")
# Eiger2Crysalis processor definition.
class Id15bEiger2Crysalis(
    BaseProcessor,
    parameters=[
        ParameterInfo("queue", category="workflows", deprecated_names=["worker"]),
        ParameterInfo(
            "_workflow",
            category="workflows",
            deprecated_names=["workflow"],
        ),
        ParameterInfo(
            "_cbf_workflow",
            category="workflows",
            deprecated_names=["cbf_workflow"],
            doc="Workflow used when output_file_format includes 'cbf'",
        ),
        ParameterInfo(
            "lima_name",
            category="Eiger2Crysalis",
            doc="Lima name of the camera: (i.e. eiger)",
        ),
        ParameterInfo(
            "output_file_format",
            category="Eiger2Crysalis",
            doc="Output file format: 'esperanto', 'cbf', or both as a list, e.g. ['esperanto', 'cbf']",
        ),
        ParameterInfo(
            "scan_parameters",
            category="Eiger2Crysalis",
            doc="Derived from scan and motors",
        ),
        ParameterInfo(
            "user_parameters", category="Eiger2Crysalis", doc="Specify explicitly"
        ),
        ParameterInfo(
            "xds_parameters",
            category="Eiger2Crysalis",
            doc="Optional overrides for the CreateXDSInp task used in the CBF workflow",
        ),
        ParameterInfo(
            "ini_file",
            category="ExtraFiles",
            doc="CrysalisExpSettings.ini path",
        ),
        ParameterInfo(
            "par_file",
            category="ExtraFiles",
            doc=".par file path",
        ),
        ParameterInfo(
            "set_file",
            category="ExtraFiles",
            doc=".set file path",
        ),
        ParameterInfo(
            "ccd_file",
            category="ExtraFiles",
            doc=".ccd file path",
        ),
    ],
):
    """Submit Eiger conversion workflows for ``fscan`` scans.

    For every configured output file format (``"esperanto"`` and/or ``"cbf"``)
    one workflow is submitted with inputs derived from the scan, from
    ``user_parameters`` and, for CBF, from ``xds_parameters``.

    NOTE(review): ``get_output_file_formats``, ``get_inputs`` and
    ``get_icat_metadata`` are called by this class but are not defined in it;
    presumably they are provided by ``BaseProcessor`` or elsewhere -- confirm.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Fill in the processor defaults and initialise the base class.

        Values already present in the merged defaults are kept; only missing
        keys receive the defaults below.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        defaults.setdefault("trigger_at", "END")
        defaults.setdefault("_workflow", "")
        defaults.setdefault("_cbf_workflow", "")
        defaults.setdefault("lima_name", "eiger")
        defaults.setdefault("output_file_format", _DEFAULT_OUTPUT_FILE_FORMAT)
        # Hand out copies so that no instance can modify the module-level
        # default dictionaries (or the lists they contain).
        defaults.setdefault(
            "user_parameters", self._copy_defaults(_DEFAULT_USER_PARS)
        )
        defaults.setdefault(
            "xds_parameters", self._copy_defaults(_DEFAULT_XDS_INP_PARAMETERS)
        )
        defaults.setdefault(
            "scan_parameters", self._copy_defaults(_DEFAULT_SCAN_PARS)
        )
        # NOTE(review): no parameter named "crysalis_ini" is declared in the
        # class header (the declared one is "ini_file"); kept as-is -- confirm
        # whether this default is still needed.
        defaults.setdefault("crysalis_ini", "")
        super().__init__(config=config, defaults=defaults)

    @staticmethod
    def _copy_defaults(defaults: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *defaults* in which list values are copied as well."""
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in defaults.items()
        }

    def _info_categories(self) -> Dict[str, dict]:
        """Call :meth:`update_scan_parameters` (without a scan) and return the
        categories of the parent class."""
        self.update_scan_parameters()
        return super()._info_categories()

    def update_scan_parameters(self, scan: Any = None) -> Dict[str, Any]:
        """Update ``self.scan_parameters`` with the file paths of *scan*.

        When *scan* is falsy nothing is derived and an empty dictionary is
        returned. Otherwise the Lima file names are stored under ``"images"``
        and, depending on the configured output file formats, the esperanto
        paths (``"output"``, ``"processed_output"``) and/or the CBF path
        (``"cbf_output"``).

        Returns the dictionary that was merged into ``self.scan_parameters``.
        """
        scan_parameters: Dict[str, Any] = {}
        if scan:
            scan_parameters["images"] = self.get_lima_filenames(scan)
            if "esperanto" in self.get_output_file_formats():
                scan_parameters["output"] = self.get_output_path(scan)
                scan_parameters["processed_output"] = (
                    self.get_processed_output_path(scan)
                )
            if "cbf" in self.get_output_file_formats():
                scan_parameters["cbf_output"] = self.get_cbf_output_path(scan)
        self.scan_parameters.update(scan_parameters)
        return scan_parameters

    def get_filename(self, scan: BlissScanType) -> str:
        """Return ``scan.scan_info["filename"]`` or, when that is missing or
        empty, the file name of the current session's ``scan_saving``."""
        filename = scan.scan_info.get("filename")
        if filename:
            return filename
        return current_session.scan_saving.filename

    def get_lima_name(self, scan: BlissScanType) -> str:
        """Return ``self.lima_name`` when the scan has a ``<lima_name>:image``
        channel.

        Raises ``AssertionError`` when the channel is not part of the scan.
        """
        channels = scan.scan_info.get("channels", dict())
        if f"{self.lima_name}:image" not in channels:
            raise AssertionError(f"{self.lima_name} detector not found.")
        return self.lima_name

    @staticmethod
    def _get_scan_nb(scan: BlissScanType) -> int:
        """Return ``scan.scan_info["scan_nb"]``, falling back to
        ``scan.scan_number`` when the key is missing."""
        try:
            return scan.scan_info["scan_nb"]
        except KeyError:
            return scan.scan_number

    def _get_scan_output_location(self, scan: BlissScanType) -> tuple:
        """Return ``(scan_processed_dir, file_stem)`` for *scan*.

        ``scan_processed_dir`` is ``<dataset processed dir>/scanNNNN`` and
        ``file_stem`` is ``<dataset name>_NNNN``, where ``NNNN`` is the scan
        number padded to four digits.
        """
        scan_nb = self._get_scan_nb(scan)
        dataset_processed_dir = get_dataset_processed_dir(self.get_filename(scan))
        scan_processed_dir = os.path.join(dataset_processed_dir, f"scan{scan_nb:04d}")
        dataset_name = os.path.basename(dataset_processed_dir)
        return scan_processed_dir, f"{dataset_name}_{scan_nb:04d}"

    def get_output_path(self, scan: BlissScanType) -> str:
        """Return the esperanto output path template of *scan*.

        The returned string still contains the literal ``{index}`` placeholder.
        """
        scan_processed_dir, file_stem = self._get_scan_output_location(scan)
        # Plain (non f-) string on purpose: "{index}" must stay a placeholder.
        return os.path.join(scan_processed_dir, file_stem + "_1_{index}.esperanto")

    def get_cbf_output_path(self, scan: BlissScanType) -> str:
        """Return the CBF output path template of *scan*.

        The files go to the ``cbf`` sub-directory of the scan directory; the
        returned string still contains the literal ``{index:04d}`` placeholder.
        """
        scan_processed_dir, file_stem = self._get_scan_output_location(scan)
        return os.path.join(
            scan_processed_dir,
            "cbf",
            f"{file_stem}_{{index:04d}}.cbf",
        )

    def get_processed_output_path(self, scan: BlissScanType) -> str:
        """Return ``<scan processed dir>/<dataset name>_NNNN`` (no extension)."""
        scan_processed_dir, file_stem = self._get_scan_output_location(scan)
        return os.path.join(scan_processed_dir, file_stem)

    def workflow_destination(
        self, scan: BlissScanType, output_file_format: str
    ) -> str:
        """Return the JSON path passed as ``convert_destination`` on submission.

        Raises ``ValueError`` for any format other than ``"esperanto"`` or
        ``"cbf"``.
        """
        if output_file_format == "esperanto":
            return f"{self.get_processed_output_path(scan)}.json"
        if output_file_format == "cbf":
            cbf_dir = os.path.dirname(self.get_cbf_output_path(scan))
            base_name = os.path.basename(self.get_processed_output_path(scan))
            return os.path.join(cbf_dir, f"{base_name}_cbf.json")
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r} for workflow destination."
        )

    def get_lima_filenames(self, scan: BlissScanType) -> List[str]:
        """Return a one-element list with the Lima HDF5 file of *scan*.

        The name is ``scan.scan_saving.images_path`` formatted with the scan
        number and ``self.lima_name``, followed by ``0000.h5``.
        """
        images_prefix = scan.scan_saving.images_path.format(
            scan_number=scan.scan_number, img_acq_device=self.lima_name
        )
        return [f"{images_prefix}0000.h5"]

    def get_omega(self, scan: BlissScanType) -> str:
        """Return the omega expression ``"<start_pos>+index*<step_size>"``.

        Both values are read from the ``fscan_parameters`` of the scan and are
        used as they are.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        start = fscan_parameters["start_pos"]
        step = fscan_parameters["step_size"]
        return f"{start}+index*{step}"

    def get_scan_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the conversion parameters derived from the scan itself:
        ``"omega"`` (see :meth:`get_omega`) and ``"exposure_time"`` (the
        ``acq_time`` of the fscan)."""
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        return {
            "omega": self.get_omega(scan),
            "exposure_time": fscan_parameters["acq_time"],
        }

    def get_run_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the ``run_parameters`` input of the ``CreateRunFiles`` task.

        Scan-dependent values come from ``scan.scan_info``; beam centre,
        distance and wavelength come from ``self.user_parameters``; the
        remaining values are fixed.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        npoints = scan.scan_info["npoints"]
        start_pos = fscan_parameters["start_pos"]
        step_size = fscan_parameters["step_size"]
        wavelength = self.user_parameters["wavelength"]
        # NOTE(review): "alpha", "mono" and "pixel_size" are hard-coded here
        # although ``user_parameters`` has "alpha"/"polarization" entries and
        # the XDS defaults carry the same pixel size -- confirm this is intended.
        return {
            "count": npoints,
            "omega": 0,
            "omega_start": start_pos,
            "omega_end": start_pos + npoints * step_size,
            "pixel_size": 0.075,
            "omega_runs": None,
            "theta": 0,
            "kappa": 0,
            "phi": 0,
            "domega": step_size,
            "dtheta": 0,
            "dkappa": 0,
            "dphi": 0,
            "center_x": self.user_parameters["beam"][0],
            "center_y": self.user_parameters["beam"][1],
            "alpha": 50,
            "dist": self.user_parameters["distance"],
            "l1": wavelength,
            "l2": wavelength,
            "l12": wavelength,
            "b": wavelength,
            "mono": 0.99,
            "monotype": "SYNCHROTRON",
            "chip": [1024, 1024],
            "Exposure_time": fscan_parameters["acq_time"],
        }

    def _get_esperanto_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Return the workflow inputs for the esperanto conversion.

        Every user/scan parameter becomes an ``Eiger2Crysalis`` input; they are
        followed by the inputs of the file-creating tasks, addressed by node
        id.
        """
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_output_path(scan),
                "processed_output": self.get_processed_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        icat_metadata = self.get_icat_metadata(scan)
        inputs = [
            {
                "task_identifier": "Eiger2Crysalis",
                "name": name,
                "value": value,
            }
            for name, value in parameters.items()
        ]
        inputs += [
            {
                "id": "1",
                "task_identifier": "CreateIniFiles",
                "name": "ini_file",
                "value": self.ini_file,
            },
            {
                "id": "2",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.ccd_file,
            },
            {
                "id": "3",
                "task_identifier": "CreateSetCcdFiles",
                "name": "ccd_set_file",
                "value": self.set_file,
            },
            {
                "id": "4",
                "task_identifier": "CreateRunFiles",
                "name": "run_parameters",
                "value": self.get_run_parameters(scan),
            },
            {
                "id": "5",
                "task_identifier": "CreateParFiles",
                "name": "par_file",
                "value": self.par_file,
            },
            {
                "id": "6",
                "task_identifier": "AverageFrames",
                "name": "images",
                "value": parameters["images"],
            },
            {
                "id": "7",
                "task_identifier": "DataPortal",
                "name": "metadata",
                "value": icat_metadata,
            },
        ]
        return inputs

    def _get_cbf_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]:
        """Return the workflow inputs for the CBF conversion.

        Only parameters listed in ``_EIGER2CBF_INPUT_NAMES`` are passed to
        ``Eiger2CBF``; all XDS parameters are passed to ``CreateXDSInp``.
        """
        parameters = self.user_parameters.to_dict()
        parameters.update(
            {
                "images": self.get_lima_filenames(scan),
                "output": self.get_cbf_output_path(scan),
            }
        )
        parameters.update(self.get_scan_parameters(scan))
        inputs = [
            {
                "task_identifier": "Eiger2CBF",
                "name": name,
                "value": value,
            }
            for name, value in parameters.items()
            if name in _EIGER2CBF_INPUT_NAMES
        ]
        inputs.extend(
            {
                "task_identifier": "CreateXDSInp",
                "name": name,
                "value": value,
            }
            for name, value in self.get_xds_parameters(scan).items()
        )
        return inputs

    def get_xds_parameters(self, scan: BlissScanType) -> Dict[str, Any]:
        """Return the inputs of the ``CreateXDSInp`` task for *scan*.

        Precedence, lowest to highest: ``_DEFAULT_XDS_OPTIONAL_PARAMETERS``,
        values derived from the scan and the user parameters,
        ``self.xds_parameters``. ``"output"`` is always the CBF output path of
        the scan and cannot be overridden.
        """
        fscan_parameters = scan.scan_info["instrument"]["fscan_parameters"]
        offset = int(self.user_parameters["offset"])
        count = int(scan.scan_info["npoints"])
        cbf_output = self.get_cbf_output_path(scan)
        # Inclusive frame range: `count` frames starting at `offset`.
        data_range = [offset, offset + count - 1]
        parameters = {
            "background_range": list(data_range),
            "beam": list(self.user_parameters["beam"]),
            "data_range": data_range,
            "distance": self.user_parameters["distance"],
            "fraction_of_polarization": self.user_parameters["polarization"],
            "oscillation_range": abs(fscan_parameters["step_size"]),
            "output": cbf_output,
            "spot_range": list(data_range),
            "starting_angle": fscan_parameters["start_pos"],
            "starting_frame": offset,
            "untrusted_rectangles": [list(v) for v in _DEFAULT_UNTRUSTED_RECTANGLES],
            "wavelength": self.user_parameters["wavelength"],
        }
        optional = self._copy_defaults(_DEFAULT_XDS_OPTIONAL_PARAMETERS)
        for key, value in optional.items():
            parameters.setdefault(key, value)
        parameters.update(self.xds_parameters.to_dict())
        # Re-apply after the user overrides: the output location is not
        # configurable through ``xds_parameters``.
        parameters["output"] = cbf_output
        return parameters

    def get_submit_arguments(
        self, scan: BlissScanType, output_file_format: str
    ) -> Dict[str, Any]:
        """Return the keyword arguments used to submit the workflow."""
        return {
            "inputs": self.get_inputs(scan, output_file_format=output_file_format),
            # NOTE(review): "False" is a (truthy) string, not the boolean
            # False; kept unchanged -- confirm what the workflow engine expects.
            "outputs": [{"all": "False"}],
        }

    def get_workflow(self, output_file_format: str) -> str:
        """Return the workflow configured for *output_file_format*.

        Raises ``ValueError`` when the format is unknown or when ``"cbf"`` is
        requested without a configured CBF workflow.
        """
        if output_file_format == "esperanto":
            return self._workflow
        if output_file_format == "cbf":
            if not self._cbf_workflow:
                raise ValueError(
                    "cbf_workflow must be configured when output_file_format includes 'cbf'."
                )
            return self._cbf_workflow
        raise ValueError(
            f"Unsupported output_file_format {output_file_format!r}. Expected one of {_VALID_OUTPUT_FILE_FORMATS}."
        )

    def run_conversion(self, scan: BlissScanType) -> None:
        """Submit one conversion workflow per configured output file format.

        Scans whose type does not contain ``"fscan"`` are ignored. For an fscan
        without the configured Lima detector, :meth:`get_lima_name` raises
        ``AssertionError``.
        """
        if "fscan" not in scan.scan_info["type"] or not self.get_lima_name(scan):
            return
        for output_file_format in self.get_output_file_formats():
            kwargs = self.get_submit_arguments(scan, output_file_format)
            kwargs["convert_destination"] = self.workflow_destination(
                scan, output_file_format
            )
            submit(
                args=(self.get_workflow(output_file_format),),
                kwargs=kwargs,
                queue=self.queue,
            )

    def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
        """Run the conversion for *scan*.

        NOTE(review): presumably the hook called by ``BaseProcessor`` for each
        new scan -- confirm.
        """
        self.run_conversion(scan)
# Module-level helpers.
def get_dataset_processed_dir(dataset_filename: str) -> str:
    """Return ``<processed dir>/<collection name>/<dataset name>``.

    All three parts are derived from *dataset_filename* through the
    ``directories`` helpers; collection and dataset contribute only the last
    component of their directory.

    Temporary, waiting for !173 to be merged.
    """
    processed_root = directories.get_processed_dir(dataset_filename)
    collection_dir = directories.get_collection_dir(dataset_filename)
    dataset_dir = directories.get_dataset_dir(dataset_filename)
    return os.path.join(
        processed_root,
        os.path.basename(collection_dir),
        os.path.basename(dataset_dir),
    )