"""User API for HDF5 conversion on the Bliss repl"""
import logging
import os
import shutil
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from esrf_pathlib import ESRFPath
from ewoksjob.client.futures import FutureInterface
from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..import_utils import unavailable_module
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..resources import resource_filename
try:
import gevent
except ImportError as ex:
gevent = unavailable_module(ex)
logger = logging.getLogger(__name__)
def _optional_float(value):
if value is None:
return None
return float(value)
def _optional_list_of_strings(value):
if value is None:
return None
if isinstance(value, str):
return [value]
return list(map(str, value))
[docs]
class Bm08Hdf5ToXdiConverter(
BaseProcessor,
parameters=[
ParameterInfo(
"workflow",
category="workflows",
validator=str,
doc="Workflows to submit for every Kscan.",
),
ParameterInfo(
"queue",
category="workflows",
validator=str,
doc="Ewoks queue to submit the jobs to.",
),
ParameterInfo(
"mono_counter",
category="parameters",
validator=str,
doc="Monochromator energy or theta counter name.",
),
ParameterInfo(
"crystal_motor",
category="parameters",
validator=str,
doc="Motor that selects the monochromator crystal.",
),
ParameterInfo(
"optional_counters",
category="parameters",
validator=_optional_list_of_strings,
doc="Other counter name to be saved in XDI.",
),
ParameterInfo(
"optional_mca_counters",
category="parameters",
validator=_optional_list_of_strings,
doc="MCA counter names like ROI names to be saved in XDI.",
),
ParameterInfo(
"livetime_normalization",
category="parameters",
validator=_optional_float,
doc="Live-time normalization in seconds for the MCA counters.\n"
" None: no normalization\n"
" `<=0` the median of the elapsed per point",
),
ParameterInfo(
"retry_timeout",
category="data access",
validator=_optional_float,
doc="Timeout for HDF5 reading.\n None: wait forever",
),
ParameterInfo(
"mono_edge_theoretical",
category="calibration",
validator=_optional_float,
doc="The theoretical edge position in 'mono_counter' units.",
),
ParameterInfo(
"mono_edge_experimental",
category="calibration",
validator=_optional_float,
doc="The experimental edge position in 'mono_counter' units.",
),
],
):
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
) -> None:
if defaults is None:
defaults = dict()
defaults.setdefault("trigger_at", "END")
defaults.setdefault("queue", "online")
defaults.setdefault("mono_counter", "mono_enc")
defaults.setdefault("crystal_motor", "c_sel")
defaults.setdefault(
"optional_counters",
[
"I0_eh1",
"I1_eh1",
"IX_eh1",
"I0_eh2",
"I1_eh2",
"IX_eh2",
"IR_eh2",
"volt1",
"volt2",
],
)
defaults.setdefault("optional_mca_counters", [])
defaults.setdefault("workflow", resource_filename("bm08", "convert.json"))
defaults.setdefault("retry_timeout", 60)
defaults.setdefault("livetime_normalization", -1)
super().__init__(config=config, defaults=defaults)
# For integration tests
self._future = None
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None:
self.on_new_scan_metadata(scan)
[docs]
def scan_requires_processing(self, scan: BlissScanType) -> bool:
if not scan.scan_info["save"]:
return False
scan_parameters = scan.scan_info.get("scan_parameters", {})
scan_type = scan_parameters.get("scan_type")
if scan_type != "Kscan":
return False
counter_names = self._get_counter_names(scan)
if self.mono_counter not in counter_names:
logger.warning(
"Do not trigger XDI conversion: %r is not used in the scan",
self.mono_counter,
)
return False
# TODO: check the motor self.crystal_motor is available
return True
[docs]
def get_submit_arguments(self, scan: BlissScanType) -> dict:
return {
"inputs": self.get_inputs(scan),
"outputs": [{"all": False}],
"convert_destination": self._get_convert_destination(scan),
"upload_parameters": self._get_workflow_upload_parameters(scan),
}
def _get_workflow(self, scan: BlissScanType) -> Optional[str]:
"""Get the workflow to execute for the scan and ensure it is located
in the proposal directory for user reference and worker accessibility.
"""
src_file = self.workflow
if src_file is None:
return
if not os.path.isfile(src_file):
src_file = resource_filename("bm08", "xdi_convert.json")
filename = ESRFPath(scan.scan_info["filename"])
dst_file = filename.scripts_path / "workflows" / "xdi_convert.json"
str_dst_file = str(dst_file)
if src_file != str_dst_file:
self.workflow = str_dst_file
if not dst_file.exists():
dst_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(src_file, str_dst_file)
return str_dst_file
def _get_counter_names(self, scan: BlissScanType) -> List[str]:
"""Names in the measurement group."""
counter_names = []
for full_name in scan.scan_info.get("channels", dict()):
_, _, name = full_name.rpartition(":")
counter_names.append(name)
return counter_names
def _get_optional_counters(self, scan: BlissScanType) -> List[str]:
"""Names in the measurement group."""
if not self.optional_counters:
return []
counter_names = self._get_counter_names(scan)
return [s for s in counter_names if s in self.optional_counters]
def _get_convert_destination(self, scan: BlissScanType) -> str:
filename = ESRFPath(scan.scan_info["filename"])
output_directory = filename.processed_dataset_path
output_stem = f"{filename.stem}_scan{scan.scan_info['scan_nb']}"
return str(output_directory / "workflows" / f"{output_stem}.json")
def _get_output_filename(self, scan: BlissScanType) -> str:
filename = ESRFPath(scan.scan_info["filename"])
output_directory = filename.processed_dataset_path
output_stem = scan.scan_info["title"].replace(" ", "_")
return str(output_directory / f"{output_stem}.xdi")
def _get_workflow_upload_parameters(self, scan: BlissScanType) -> dict:
raw_directory = ESRFPath(scan.scan_info["filename"]).raw_dataset_path
processed_directory = raw_directory.processed_dataset_path
scan_saving = current_session.scan_saving
metadata = {"Sample_name": scan_saving.dataset["Sample_name"]}
return {
"beamline": scan_saving.beamline,
"proposal": scan_saving.proposal_name,
"dataset": "integrate",
"path": str(processed_directory),
"raw": [str(raw_directory)],
"metadata": metadata,
}
def _print_result(future: FutureInterface, timeout: Optional[float]) -> None:
try:
output_filename = future.result(timeout)["output_filename"]
print(f"XDI file saved: {output_filename}")
except Exception as ex:
logger.warning("XDI conversion failed: %s", ex)