Source code for blissoda.tomo.sinogram_processor

import logging
import os
import uuid
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from ..bliss_globals import current_session
from ..ewoks_utils import get_future
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils import directories

_logger = logging.getLogger(__name__)


[docs] class SinogramProcessor( BaseProcessor, parameters=[ ParameterInfo("sleep_time", category="sinogram"), ParameterInfo("deltabeta", category="sinogram"), ParameterInfo("backends", category="sinogram"), ParameterInfo("cor_backend", category="sinogram"), # TODO: Param should be common? ParameterInfo("queue", category="workflows"), ], ): def __init__( self, config: Optional[Dict[str, Any]] = None, defaults: Optional[Dict[str, Any]] = None, **deprecated_defaults: Dict[str, Any], ) -> None: defaults = self._merge_defaults(deprecated_defaults, defaults) defaults.setdefault("_enabled", True) defaults.setdefault("sleep_time", 0) defaults.setdefault("deltabeta", 200) defaults.setdefault("backends", "nabu,silx") defaults.setdefault("cor_backend", "bliss-cor8") # Set tomo wide defaults defaults.setdefault("trigger_at", "END") defaults.setdefault("queue", "tomo_sinogram") super().__init__(config=config, defaults=defaults) # TODO: Should move to common location
[docs] def workflow_destination(self, scan: BlissScanType) -> str: filename = self.get_filename(scan) scan_nb = scan.scan_info.get("scan_nb") root = self.scan_processed_directory(scan) stem = os.path.splitext(os.path.basename(filename))[0] basename = f"{stem}_{scan_nb:04d}.json" return os.path.join(root, basename)
# TODO: Should move to common location
[docs] def master_output_filename(self, scan: BlissScanType) -> str: """Filename which can be used to inspect the results after the processing.""" filename = self.get_filename(scan) root = self.scan_processed_directory(scan) basename = os.path.basename(filename) return os.path.join(root, basename)
# TODO: Should move to common location
[docs] def get_filename(self, scan: BlissScanType) -> str: filename = scan.scan_info.get("filename") if filename: return filename return current_session.scan_saving.filename
# TODO: Common
[docs] def scan_processed_directory(self, scan: BlissScanType) -> str: return directories.get_dataset_processed_dir(self.get_filename(scan))
[docs] def get_sinogram_inputs( self, sleep_time: Optional[float] = None, axisposition: Optional[float] = None, deltabeta: Optional[float] = None, backends: Optional[str] = None, cor_backend: Optional[str] = None, ): def arg_else_prop(arg, prop): return arg if arg is not None else prop params = { "sleep_time": arg_else_prop(sleep_time, self.sleep_time), "axisposition": axisposition, "deltabeta": arg_else_prop(deltabeta, self.deltabeta), "backends": arg_else_prop(backends, self.backends), "cor_backend": arg_else_prop(cor_backend, self.cor_backend), } inputs = [ { "task_identifier": "SinogramReconstruction", "name": p, "value": v, } for p, v in params.items() ] return inputs
[docs] def get_inputs(self, scan) -> List[dict]: filename = self.get_filename(scan) scan_nb = scan.scan_info.get("scan_nb") inputs = [ { "task_identifier": "SinogramReconstruction", "name": "filename", "value": filename, }, { "task_identifier": "SinogramReconstruction", "name": "output_filename", "value": self.master_output_filename(scan), }, { "task_identifier": "SinogramReconstruction", "name": "scan", "value": scan_nb, }, ] inputs += self.get_sinogram_inputs() return inputs
[docs] def get_reprocess_inputs( self, datacollectionid: int, filename: str, deltabeta: Optional[float], axisposition: Optional[float], overwrite: bool = False, ) -> List[dict]: process_root = directories.get_dataset_processed_dir(filename) if os.path.exists(process_root): job_uuid = str(uuid.uuid4()) process_root = f"{process_root}_{job_uuid}" os.makedirs(process_root) basename = os.path.basename(filename) inputs = [ { "task_identifier": "SinogramReconstruction", "name": "dataCollectionId", "value": datacollectionid, }, { "task_identifier": "SinogramReconstruction", "name": "filename", "value": filename, }, { "task_identifier": "SinogramReconstruction", "name": "output_filename", "value": os.path.join(process_root, basename), }, ] inputs += self.get_sinogram_inputs( deltabeta=deltabeta, axisposition=axisposition ) return inputs
[docs] def get_submit_arguments(self, scan: BlissScanType) -> dict: return { "inputs": self.get_inputs(scan), "outputs": [{"all": False}], }
[docs] def get_workflow(self): return { "graph": { "id": "tomo-sinogram-reconstruction", "label": "tomo sinogram reconstruction", }, "nodes": [ { "id": "SinogramReconstruction", "task_type": "class", "task_identifier": "tomovis.ewoks.tomo_sinogram_reconstruction.TomoSinogramReconstruction", } ], "links": [], }
def _trigger_workflow_on_new_scan(self, scan): _logger.debug("Trigger workflow on new scan") return self.on_new_scan_metadata(scan)
[docs] def scan_requires_processing(self, scan: BlissScanType) -> bool: channels = scan.scan_info.get("channels", {}) sinogram_chan = channels.get("sinogram") is not None rotation_chan = channels.get("rotation") is not None translation_chan = channels.get("translation") is not None sinogram = sinogram_chan and rotation_chan and translation_chan return sinogram
[docs] def on_new_scan_metadata(self, scan) -> Optional[dict]: metadata, _ = self._on_new_scan(scan) return metadata
def _on_new_scan(self, scan) -> Tuple[Optional[dict], Optional[Any]]: _logger.debug("_on_new_scan") if not self.scan_requires_processing(scan): return None, None workflow = self.get_workflow() kwargs = self.get_submit_arguments(scan) if scan.scan_info.get("save"): kwargs["convert_destination"] = self.workflow_destination(scan) _logger.debug("Execute workflow\n%s\nArgs:\n%s", workflow, kwargs) _logger.debug("Queue: %s", self.queue) future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue) future = get_future(future.uuid) return None, future
[docs] def reprocess( self, params: Dict[str, Any], ): _logger.debug("Reprocess: %s", params) datacollectionid = params.pop("datacollectionid") deltabeta = params.pop("deltabeta") axisposition = params.pop("axisposition") filename = params.pop("filename") overwrite = params.pop("overwrite", False) if len(params) != 0: _logger.error("Unexpected reprocess parameters: %s", params) workflow = self.get_workflow() kwargs = { "inputs": self.get_reprocess_inputs( axisposition=axisposition, deltabeta=deltabeta, filename=filename, datacollectionid=datacollectionid, overwrite=overwrite, ), "outputs": [{"all": False}], } future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue) future = get_future(future.uuid)