"""Automatic pyfai integration for every scan with saving and plotting"""
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ewoksutils.task_utils import task_inputs
from ..bliss_globals import setup_globals
from ..import_utils import unavailable_class
from ..persistent.parameters import ParameterInfo
from ..processor import BlissScanType
from ..utils import validators
from ..xrpd.processor import XrpdProcessor
try:
from bliss.common.axis import Axis
except ImportError as ex:
Axis = unavailable_class(ex)
[docs]
def is_motor_name(value) -> str:
if isinstance(value, Axis):
return str(value.name)
return str(value)
LIMA2_DETECTORS = "p4", "smartpix"
[docs]
class Id31XrpdProcessor(
XrpdProcessor,
parameters=[
ParameterInfo("pyfai_config", category="PyFai", validator=validators.is_file),
ParameterInfo("integration_options", category="PyFai"),
ParameterInfo(
"second_detector_pyfai_config",
category="PyFai",
validator=validators.is_file,
),
ParameterInfo("second_detector_integration_options", category="PyFai"),
ParameterInfo("flat_enabled", category="Flat-field", validator=bool),
ParameterInfo("newflat", category="Flat-field", validator=validators.is_file),
ParameterInfo("oldflat", category="Flat-field", validator=validators.is_file),
ParameterInfo(
"ascii_options",
category="Data saving",
),
ParameterInfo(
"tomo_enabled",
category="Data saving/Tomo",
validator=bool,
),
ParameterInfo(
"tomo_rot_name",
category="Data saving/Tomo",
validator=is_motor_name,
),
ParameterInfo(
"tomo_y_name",
category="Data saving/Tomo",
validator=is_motor_name,
),
ParameterInfo(
"tomo_scans",
category="Data saving/Tomo",
),
],
):
DEFAULT_LIMA_URL_TEMPLATE: Optional[str] = (
"{dirname}/{images_path_template}/{images_prefix}{{file_index}}.h5::/entry_0000/measurement/data"
)
DEFAULT_WORKFLOW = "integrate_with_saving_with_flat.json"
DEFAULT_WORKFLOW_NO_SAVE = "integrate_without_saving_with_flat.json"
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("queue", "online")
defaults.setdefault(
"integration_options",
{
"method": "no_csr_ocl_gpu",
"nbpt_rad": 4096,
"unit": "q_nm^-1",
},
)
defaults.setdefault("second_detector_integration_options", {})
defaults.setdefault(
"ascii_options",
{
"enabled": False,
"scans": [
"ascan",
"dscan",
"a2scan",
"d2scan",
"loopscan",
"timescan",
"sct",
],
"max_npoints": 1000,
},
)
defaults.setdefault("flat_enabled", True)
defaults.setdefault("newflat", "/data/id31/inhouse/P3/flats.mat")
defaults.setdefault("oldflat", "/data/id31/inhouse/P3/flats_old.mat")
defaults.setdefault("tomo_enabled", False)
defaults.setdefault("tomo_rot_name", "srot")
defaults.setdefault("tomo_y_name", "saby")
defaults.setdefault("tomo_scans", ["fscan2d"])
super().__init__(config=config, defaults=defaults)
self.workflow_with_saving = self.DEFAULT_WORKFLOW
self.workflow_without_saving = self.DEFAULT_WORKFLOW_NO_SAVE
[docs]
def get_config_filename(self, lima_name: str) -> Optional[str]:
if len(self.lima_names) >= 2 and lima_name == self.lima_names[1]:
return self.second_detector_pyfai_config
else:
return self.pyfai_config
[docs]
def get_integration_options(
self, scan: BlissScanType, lima_name: str
) -> Optional[dict]:
if len(self.lima_names) >= 2 and lima_name == self.lima_names[1]:
integration_options = self.second_detector_integration_options
else:
integration_options = self.integration_options
if integration_options:
return integration_options.to_dict()
return None
def _is_save_tomo_enabled(self, scan) -> bool:
"""Returns whether results need to be exported as tomo for this scan"""
if not self.tomo_enabled:
return False
scan_type = scan.scan_info.get("type", "")
return scan_type in self.tomo_scans
def _is_save_ascii_enabled(self, scan) -> bool:
"""Returns whether results need to be exported as ASCII for this scan"""
if not self.ascii_options:
return False
ascii_options = self.ascii_options.to_dict()
if not ascii_options.get("enabled", False):
return False
scan_type = scan.scan_info.get("type", "")
# sct's scan_type is ct with saving enabled
if scan_type == "ct" and scan.scan_info.get("save"):
scan_type = "sct"
return scan_type in ascii_options.get("scans", []) and (
0
< scan.scan_info.get("npoints", float("inf"))
<= ascii_options.get("max_npoints", 0)
)
def _export_filename_prefix(self, scan, lima_name: str) -> str:
"""Returns the prefix to use for exported filenames"""
basename = os.path.basename(self.get_filename(scan))
stem = os.path.splitext(basename)[0]
scan_nb = scan.scan_info.get("scan_nb")
return f"{stem}_{scan_nb:04d}_{lima_name}"
def _export_folder(self, scan) -> str:
"""Returns the folder where to store exported files"""
return os.path.join(
self.scan_processed_directory(scan),
"export",
)
[docs]
def get_workflow(self, scan) -> Optional[str]:
return self._get_workflow(scan)
def _get_default_workflow(self, scan) -> Optional[str]:
# Make sure default workflows are never used
return self._get_workflow(scan)
[docs]
def get_submit_arguments(self, scan, lima_name) -> dict:
kwargs = super().get_submit_arguments(scan, lima_name)
kwargs["outputs"] = [
{"all": False},
{"name": "nxdata_url", "task_identifier": "IntegrateBlissScan"},
]
kwargs["load_options"] = {"root_module": "ewoksid31.workflows"}
return kwargs
# TODO: Redis events don't show up
handler = {
"class": "ewoksjob.events.handlers.RedisEwoksEventHandler",
"arguments": [
{
"name": "url",
"value": "redis://bibhelm:25001/4",
},
{"name": "ttl", "value": 86400},
],
}
kwargs["execinfo"] = {"handlers": [handler]}
return kwargs
[docs]
def enabled_flatfield(self, enable: bool) -> None:
# Kept for backward compatibility
self.flat_enabled = enable
[docs]
def ensure_workflow_accessible(self, scan) -> None:
pass
def _get_lima_url_template_args(
self, scan: BlissScanType, lima_name: str
) -> Optional[Dict[str, str]]:
if self.lima_url_template_args:
lima_url_template_args = dict(self.lima_url_template_args)
else:
lima_url_template_args = dict()
eval_dict = {"img_acq_device": lima_name, "scan_number": scan.scan_number}
images_prefix = scan.scan_saving.images_prefix.format(**eval_dict)
images_path_template = scan.scan_saving.images_path_template.format(**eval_dict)
lima_url_template_args["images_prefix"] = images_prefix
lima_url_template_args["images_path_template"] = images_path_template
return lima_url_template_args