from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..bliss_globals import current_session
from ..persistent.parameters import ParameterInfo
from ..processor import BlissScanType
from ..resources import resource_filename
from ..utils import validators
from ..utils.serialization import ensure_numpy_pickle_compat
from ..xrpd.processor import XrpdProcessor
from .utils import get_data_url
from .utils import get_positions
from .utils import is_multigeometry
[docs]
class Id06XrpdProcessor(
XrpdProcessor,
parameters=[
ParameterInfo("pyfai_config", category="PyFai", validator=validators.is_file),
ParameterInfo("integration_options", category="PyFai"),
ParameterInfo("goniometer_file", category="PyFai"),
ParameterInfo("multigeometry_workflow", category="workflows"),
ParameterInfo("queue_singlegeometry", category="workflows"),
ParameterInfo("queue_multigeometry", category="workflows"),
],
):
MULTIGEOMETRY_WORKFLOW = resource_filename(
"id06", "integrate_2dmultigeometry_with_saving.json"
)
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("multigeometry_enabled", False)
defaults.setdefault("step", 10)
defaults.setdefault("multigeometry_workflow", self.MULTIGEOMETRY_WORKFLOW)
defaults.setdefault("queue", "celery")
defaults.setdefault("queue_singlegeometry", "celery")
defaults.setdefault("queue_multigeometry", "lvp")
super().__init__(config=config, defaults=defaults)
ensure_numpy_pickle_compat()
def _get_workflow(self, scan: BlissScanType) -> str:
if is_multigeometry(scan):
return self.multigeometry_workflow
elif scan.scan_info["save"]:
return self.workflow_with_saving
else:
return self.workflow_without_saving
def _set_workflow(self, scan: BlissScanType, filename: str) -> None:
if is_multigeometry(scan):
self.multigeometry_workflow = filename
elif scan.scan_info.get("save"):
self.workflow_with_saving = filename
else:
self.workflow_without_saving = filename
def _init_queue(self, scan: BlissScanType):
self.queue = self.queue_singlegeometry
if is_multigeometry(scan):
self.queue = self.queue_multigeometry
def _on_new_scan(self, scan: BlissScanType):
self._init_queue(scan)
return super()._on_new_scan(scan)
[docs]
def get_config_filename(self, lima_name: str) -> Optional[str]:
return self.pyfai_config
[docs]
def get_integration_options(
self, scan: BlissScanType, lima_name: str
) -> Optional[dict]:
integration_options = self.integration_options
if integration_options:
return integration_options.to_dict()
return None
def _get_multigeometry_inputs(
self, scan: BlissScanType, lima_name: str
) -> List[dict]:
task_identifier = "Integrate2DMultiGeometry"
multigeometry_inputs = [
{
"task_identifier": task_identifier,
"name": "goniometer_file",
"value": self._get_parameter("goniometer_file"),
},
{
"task_identifier": task_identifier,
"name": "positions",
"value": self._get_positions(),
},
{
"task_identifier": task_identifier,
"name": "images",
"value": get_data_url(scan, lima_name),
},
{
"task_identifier": task_identifier,
"name": "retry_timeout",
"value": self.retry_timeout,
},
{
"task_identifier": task_identifier,
"name": "retry_period",
"value": self.retry_period,
},
]
multigeometry_inputs.append(
{
"task_identifier": "SaveNexusIntegrated",
"name": "nxprocess_name",
"value": f"{lima_name}_integrate",
}
)
return multigeometry_inputs
def _get_positions(self) -> List[float]:
fscan_parameters = current_session.setup_globals.fscan.pars
return get_positions(
start=fscan_parameters.start_pos,
step=fscan_parameters.step_size,
npts=fscan_parameters.npoints,
)