import logging
import os
import uuid
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from ..bliss_globals import current_session
from ..ewoks_utils import get_future
from ..ewoks_utils import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..processor import BlissScanType
from ..utils import directories
_logger = logging.getLogger(__name__)
[docs]
class SinogramProcessor(
BaseProcessor,
parameters=[
ParameterInfo("sleep_time", category="sinogram"),
ParameterInfo("deltabeta", category="sinogram"),
ParameterInfo("backends", category="sinogram"),
ParameterInfo("cor_backend", category="sinogram"),
# TODO: Param should be common?
ParameterInfo("queue", category="workflows"),
],
):
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("_enabled", True)
defaults.setdefault("sleep_time", 0)
defaults.setdefault("deltabeta", 200)
defaults.setdefault("backends", "nabu,silx")
defaults.setdefault("cor_backend", "bliss-cor8")
# Set tomo wide defaults
defaults.setdefault("trigger_at", "END")
defaults.setdefault("queue", "tomo_sinogram")
super().__init__(config=config, defaults=defaults)
# TODO: Should move to common location
[docs]
def workflow_destination(self, scan: BlissScanType) -> str:
filename = self.get_filename(scan)
scan_nb = scan.scan_info.get("scan_nb")
root = self.scan_processed_directory(scan)
stem = os.path.splitext(os.path.basename(filename))[0]
basename = f"{stem}_{scan_nb:04d}.json"
return os.path.join(root, basename)
# TODO: Should move to common location
[docs]
def master_output_filename(self, scan: BlissScanType) -> str:
"""Filename which can be used to inspect the results after the processing."""
filename = self.get_filename(scan)
root = self.scan_processed_directory(scan)
basename = os.path.basename(filename)
return os.path.join(root, basename)
# TODO: Should move to common location
[docs]
def get_filename(self, scan: BlissScanType) -> str:
filename = scan.scan_info.get("filename")
if filename:
return filename
return current_session.scan_saving.filename
# TODO: Common
[docs]
def scan_processed_directory(self, scan: BlissScanType) -> str:
return directories.get_dataset_processed_dir(self.get_filename(scan))
[docs]
def get_submit_arguments(self, scan: BlissScanType) -> dict:
return {
"inputs": self.get_inputs(scan),
"outputs": [{"all": False}],
}
[docs]
def get_workflow(self):
return {
"graph": {
"id": "tomo-sinogram-reconstruction",
"label": "tomo sinogram reconstruction",
},
"nodes": [
{
"id": "SinogramReconstruction",
"task_type": "class",
"task_identifier": "tomovis.ewoks.tomo_sinogram_reconstruction.TomoSinogramReconstruction",
}
],
"links": [],
}
def _trigger_workflow_on_new_scan(self, scan):
_logger.debug("Trigger workflow on new scan")
return self.on_new_scan_metadata(scan)
[docs]
def scan_requires_processing(self, scan: BlissScanType) -> bool:
channels = scan.scan_info.get("channels", {})
sinogram_chan = channels.get("sinogram") is not None
rotation_chan = channels.get("rotation") is not None
translation_chan = channels.get("translation") is not None
sinogram = sinogram_chan and rotation_chan and translation_chan
return sinogram
def _on_new_scan(self, scan) -> Tuple[Optional[dict], Optional[Any]]:
_logger.debug("_on_new_scan")
if not self.scan_requires_processing(scan):
return None, None
workflow = self.get_workflow()
kwargs = self.get_submit_arguments(scan)
if scan.scan_info.get("save"):
kwargs["convert_destination"] = self.workflow_destination(scan)
_logger.debug("Execute workflow\n%s\nArgs:\n%s", workflow, kwargs)
_logger.debug("Queue: %s", self.queue)
future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
future = get_future(future.uuid)
return None, future
[docs]
def reprocess(
self,
params: Dict[str, Any],
):
_logger.debug("Reprocess: %s", params)
datacollectionid = params.pop("datacollectionid")
deltabeta = params.pop("deltabeta")
axisposition = params.pop("axisposition")
filename = params.pop("filename")
overwrite = params.pop("overwrite", False)
if len(params) != 0:
_logger.error("Unexpected reprocess parameters: %s", params)
workflow = self.get_workflow()
kwargs = {
"inputs": self.get_reprocess_inputs(
axisposition=axisposition,
deltabeta=deltabeta,
filename=filename,
datacollectionid=datacollectionid,
overwrite=overwrite,
),
"outputs": [{"all": False}],
}
future = submit(args=(workflow,), kwargs=kwargs, queue=self.queue)
future = get_future(future.uuid)