Source code for blissoda.id15b.xdi_conversion

import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType

_DEFAULT_SCAN_PARS: Dict[str, Any] = {}


[docs] class TiffFilesProcessor( BaseProcessor, parameters=[ ParameterInfo("queue", category="workflows", deprecated_names=["worker"]), ParameterInfo("workflow", category="workflows"), ParameterInfo( "lima_name", category="TiffFiles", doc="Name of the detector used for the conversion", ), ParameterInfo( "scan_parameters", category="TiffFiles", doc="Derived from scan_info", ), ], ): def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, **deprecated_defaults: Dict[str, Any], ) -> None: defaults = self._merge_defaults(deprecated_defaults, defaults) # default trigger at end of scan defaults.setdefault("trigger_at", "END") defaults.setdefault("workflow", "") defaults.setdefault("lima_name", "eiger") # initialize empty scan parameters defaults.setdefault("scan_parameters", _DEFAULT_SCAN_PARS) super().__init__(config=config, defaults=defaults)
[docs] def update_scan_parameters(self, scan: BlissScanType = None) -> Dict[str, Any]: scan_parameters: Dict[str, Any] = {} if scan is not None: # HDF5 input file from LIMA scan_number = scan.scan_number image_path = scan.scan_saving.images_path.format( scan_number=scan_number, img_acq_device=self.lima_name if hasattr(self, "lima_name") else "", ) # typically the first frame file scan_parameters["images"] = [f"{image_path}0000.h5"] # output base name and folder processed_dir = os.path.dirname( scan_parameters["images"][0].replace("RAW_DATA", "PROCESSED_DATA") ) # folder where TIFFs will be created output_folder = os.path.join(processed_dir, "xdi") scan_parameters["output"] = output_folder self.scan_parameters.update(scan_parameters) return scan_parameters
[docs] def get_inputs(self, scan: BlissScanType) -> List[Dict[str, Any]]: # ensure scan_parameters are up to date params = self.update_scan_parameters(scan) inputs: List[Dict[str, Any]] = [ { "task_identifier": "TiffFiles", "name": "images", "value": params["images"], }, { "task_identifier": "TiffFiles", "name": "output", "value": params["output"], }, ] if self.lima_name: inputs.append( { "task_identifier": "TiffFiles", "name": "detector_name", "value": self.lima_name, } ) return inputs
[docs] def workflow_destination(self, scan) -> str: scan_number = scan.scan_info.get("scan_nb", 0) dataset_name = os.path.basename(os.path.dirname(scan.scan_saving.filename)) return os.path.join( self.scan_parameters["output"], f"{dataset_name}_{scan_number}.json" )
[docs] def get_submit_arguments(self, scan: BlissScanType) -> Dict[str, Any]: return {"inputs": self.get_inputs(scan), "outputs": [{"all": "False"}]}
[docs] def run_conversion(self, scan: BlissScanType) -> None: kw = self.get_submit_arguments(scan) # destination for workflow metadata kw["convert_destination"] = self.workflow_destination(scan) submit(args=(self.workflow,), kwargs=kw, queue=self.queue)
def _trigger_workflow_on_new_scan(self, scan: BlissScanType) -> None: # trigger only for fscan-type scans if "dmesh" in scan.scan_info.get("type", ""): self.run_conversion(scan)