import os
import shutil
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from ewoksjob.client import submit
from ..persistent.parameters import ParameterInfo
from ..processor import BaseProcessor
from ..resources import resource_filename
from ..utils import directories
from ..xrpd.processor import _get_scan_memory_url
# ID13 auto-alignment processor.
class Id13AutoAlignProcessor(
    BaseProcessor,
    parameters=[
        ParameterInfo("counter_name", category="AutoAlign"),
        ParameterInfo("motor1_name", category="AutoAlign"),
        ParameterInfo("motor2_name", category="AutoAlign"),
        ParameterInfo("workflow_autoalign", category="AutoAlign"),
        ParameterInfo("model_filename", category="AutoAlign"),
        ParameterInfo("queue", category="workflows"),
    ],
):
    """Processor that submits an auto-alignment workflow for new scans.

    For each scan handled by :meth:`_on_new_scan`, the workflow file is made
    available in a ``SCRIPTS`` directory derived from the scan's file name
    and is then submitted with ``ewoksjob.client.submit`` to the queue named
    by the ``queue`` parameter.

    Declared parameters: ``counter_name``, ``motor1_name``, ``motor2_name``,
    ``workflow_autoalign`` and ``model_filename`` (category ``AutoAlign``),
    plus ``queue`` (category ``workflows``).
    """

    # Default value of the ``workflow_autoalign`` parameter.
    DEFAULT_WORKFLOW_AUTOALIGN: Optional[str] = resource_filename(
        "id13", "autoalign_nn.json"
    )
    # Default value of the ``queue`` parameter.
    QUEUE_TORCH = "gpu3"
    # Default value of the ``model_filename`` parameter.
    MODEL_FILENAME = "/data/id13/inhouse/Nicolas/Test_python/cnn_siemens_align/cnn_center_siemens.pth"

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        defaults: Optional[Dict[str, Any]] = None,
        **deprecated_defaults: Any,
    ) -> None:
        """Initialise the processor and fill in missing parameter defaults.

        :param config: forwarded unchanged to the base class constructor.
        :param defaults: default parameter values; combined with
            ``deprecated_defaults`` by ``self._merge_defaults``.
        :param deprecated_defaults: default parameter values given as
            individual keyword arguments.
        """
        defaults = self._merge_defaults(deprecated_defaults, defaults)
        # ``setdefault`` only fills in keys the caller did not provide.
        defaults.setdefault("_plotting_enabled", False)
        defaults.setdefault("counter_name", "")
        defaults.setdefault("motor1_name", "")
        defaults.setdefault("motor2_name", "")
        defaults.setdefault("workflow_autoalign", self.DEFAULT_WORKFLOW_AUTOALIGN)
        defaults.setdefault("queue", self.QUEUE_TORCH)
        defaults.setdefault("model_filename", self.MODEL_FILENAME)
        super().__init__(config=config, defaults=defaults)

    def trigger_workflow_on_new_scan(self, scan):
        """Delegate to ``self.on_new_scan_metadata(scan)`` and return its result."""
        return self.on_new_scan_metadata(scan)

    def get_submit_arguments(self, scan) -> dict:
        """Build the keyword arguments passed to ``submit`` for ``scan``.

        :returns: a dict with the workflow ``inputs`` and an ``outputs``
            selection of ``[{"all": False}]``.
        """
        # NOTE(review): ``get_inputs`` is not defined in this class as shown;
        # presumably it is provided elsewhere (e.g. by a base class) - confirm.
        return {
            "inputs": self.get_inputs(scan),
            "outputs": [{"all": False}],
        }

    def get_workflow(self, scan) -> Optional[str]:
        """Get the workflow to execute for the scan and ensure it is located
        in the proposal directory for user reference and worker accessibility.

        The target is ``<prefix>/SCRIPTS/autoalign_nn.json``, where ``<prefix>``
        is the part of ``scan.scan_info["filename"]`` that precedes the first
        occurrence of ``"RAW_DATA"``. The directory is created when missing and
        the current ``workflow_autoalign`` file is copied there unless a file
        with that name already exists.

        :returns: the value of ``workflow_autoalign`` after it has been set to
            the target path.
        """
        dataset_filename = scan.scan_info["filename"]
        # NOTE(review): when "RAW_DATA" does not occur in the file name, the
        # prefix is the whole file name and SCRIPTS ends up beneath it.
        prefix = dataset_filename.split("RAW_DATA")[0]
        script_dir = os.path.join(prefix, "SCRIPTS")
        Path(script_dir).mkdir(parents=True, exist_ok=True)

        workflow_path = os.path.join(script_dir, "autoalign_nn.json")
        if not os.path.exists(workflow_path):
            # An existing copy is never overwritten.
            shutil.copy(self.workflow_autoalign, workflow_path)

        # NOTE(review): the parameter is overwritten with the copy's path, so
        # later calls copy from that file rather than from the packaged
        # default - confirm this is intended.
        self.workflow_autoalign = workflow_path
        return self.workflow_autoalign

    def _get_workflows_dir(self, dataset_filename: str) -> str:
        """Return ``directories.get_workflows_dir(dataset_filename)``."""
        return directories.get_workflows_dir(dataset_filename)

    def _on_new_scan(self, scan) -> Tuple[Optional[dict], Optional[Any]]:
        """Submit the auto-alignment workflow for ``scan``.

        :returns: the object returned by ``ewoksjob.client.submit``.
        """
        # NOTE(review): the annotation declares a 2-tuple while a single object
        # is returned; left unchanged until the caller's expectation is
        # confirmed.
        workflow = self.get_workflow(scan)
        kwargs = self.get_submit_arguments(scan)
        # Trigger workflow from the current process.
        return submit(args=(workflow,), kwargs=kwargs, queue=self.queue)