Source code for blissoda.id32.hdf5_to_spec_processor
import json
import logging
import os
import pathlib
from typing import Any
from typing import Dict
from typing import Optional
from ..bliss_globals import current_session
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..resources import resource_filename
from ..utils.directories import get_dataset_processed_dir
logger = logging.getLogger(__name__)
[docs]
class ID32Hdf5ToSpecProcessor(
BaseProcessor,
parameters=[
ParameterInfo("save_single_scans", category="workflows"),
],
):
QUEUE = "lid32xmcd2"
WORKFLOW_FILENAME = "convert_hdf5_to_spec.json"
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("trigger_at", "END")
defaults.setdefault("save_single_scans", True)
super().__init__(config=config, defaults=defaults)
def _get_workflow(self) -> dict:
with open(resource_filename("id32", self.WORKFLOW_FILENAME), "r") as wf:
return json.load(wf)
def _get_workflow_inputs(self, scan) -> list:
return [
{
"name": "scan_numbers",
"value": [scan.scan_info.get("scan_nb")],
},
{
"name": "save_single_scans",
"value": self.save_single_scans,
},
{
"name": "input_file",
"value": self._get_scan_filename(scan),
},
{
"name": "output_path",
"value": self._get_scan_processed_directory(scan),
},
]
def _get_scan_filename(self, scan) -> str:
filename = scan.scan_info.get("filename")
if filename:
return filename
return current_session.scan_saving.filename
def _get_scan_processed_directory(self, scan) -> str:
return get_dataset_processed_dir(self._get_scan_filename(scan))
def _get_workflow_destination(self, scan) -> str:
"""Builds the path where the workflow JSON will be saved."""
filename = self._get_scan_filename(scan)
scan_nb = scan.scan_info.get("scan_nb")
root = self._get_scan_processed_directory(scan)
stem = os.path.splitext(os.path.basename(filename))[0]
wf_path = os.path.join(root, "workflows")
pathlib.Path(wf_path).mkdir(parents=True, exist_ok=True)
basename = f"{stem}_{scan_nb:04d}_make_specfile.json"
return os.path.join(wf_path, basename)
def _trigger_workflow_on_new_scan(self, scan) -> None:
if not scan.scan_info["save"]:
return
workflow = self._get_workflow()
inputs = self._get_workflow_inputs(scan)
kwargs = {"inputs": inputs, "outputs": [{"all": False}]}
kwargs["convert_destination"] = self._get_workflow_destination(scan)
_ = submit(args=(workflow,), kwargs=kwargs, queue=self.QUEUE)