Source code for blissoda.id13.xrpd_processor
import json
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ..persistent.parameters import ParameterInfo
from ..resources import resource_filename
from ..utils.pyfai import read_config
from ..xrpd.processor import BlissScanType
from ..xrpd.processor import XrpdProcessor
from ..xrpd.processor import _get_scan_memory_url
logger = logging.getLogger(__name__)
SLURM_JOB_PARAMETERS_INTEGRATE = {
"name": "ID13_pyFAI-integrate",
"partition": "gpu",
"time": "01:00:00",
"tasks_per_node": 1,
"cpus_per_task": 1,
"memory_per_cpu": "50G",
"tres_per_job": "gres/gpu:1",
"constraints": "l40s",
}
SLURM_JOB_PARAMETERS_BACKGROUND = {
"name": "ID13_spi-background",
"partition": "nice",
"time": "02:00:00",
"tasks_per_node": 1,
"cpus_per_task": 32,
"memory_per_cpu": "2G",
"tres_per_job": None,
"constraints": None,
}
SLURM_JOB_PARAMETERS_CNMF = {
"name": "ID13_spi-CNMF",
"partition": "gpu",
"time": "01:00:00",
"tasks_per_node": 1,
"cpus_per_task": 1,
"memory_per_cpu": "50G",
"tres_per_job": "gres/gpu:1",
"constraints": "l40s",
}
QUEUES_GPU_ID13 = [
"gpu3",
"gpu2",
]
QUEUE_SLURM_ID13 = "slurm"
WORKER_MODULE = "scattering"
PRE_SCRIPT = "module load {WORKER_MODULE}; python3 -m ewoksid13.scripts.utils.slurm_python_pre_script"
PYTHON_CMD = "python3"
POST_SCRIPT = "python3 -m ewoksid13.scripts.utils.slurm_python_post_script"
[docs]
class Id13XrpdProcessor(
XrpdProcessor,
parameters=[
ParameterInfo("pyfai_config", category="PyFai"),
ParameterInfo("integration_options", category="PyFai"),
ParameterInfo("workflow_juno", category="workflows"),
ParameterInfo("current_workflow", category="workflows"),
ParameterInfo("counters_to_copy", category="workflows"),
ParameterInfo("normalization_counter", category="Processing"),
ParameterInfo("do_diffmap", category="Processing"),
ParameterInfo("do_average", category="Processing"),
ParameterInfo("do_stackedf", category="Processing"),
ParameterInfo("average_reference", category="Processing"),
ParameterInfo("save_external_files", category="Processing"),
ParameterInfo("radial_limits", category="Processing"),
ParameterInfo("directory_cif_phases", category="Processing"),
ParameterInfo("do_background_removal", category="Processing"),
ParameterInfo("do_cnmf", category="Processing"),
ParameterInfo("nb_components", category="Processing"),
],
):
DEFAULT_LIMA_URL_TEMPLATE: Optional[str] = (
"{dirname}/scan{scan_number_as_str}/{images_prefix}{{file_index}}.h5::/entry_0000/measurement/data"
)
DEFAULT_WORKFLOW_JUNO: Optional[str] = resource_filename("id13", "juno.json")
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault("current_workflow", "")
defaults.setdefault("save_scans_separately", True)
defaults.setdefault("do_diffmap", False)
defaults.setdefault("normalization_counter", None)
defaults.setdefault("do_average", False)
defaults.setdefault("do_stackedf", False)
defaults.setdefault("average_reference", "hydrocerussite")
defaults.setdefault("save_external_files", False)
defaults.setdefault("workflow_juno", self.DEFAULT_WORKFLOW_JUNO)
defaults.setdefault("do_background_removal", False)
defaults.setdefault("radial_limits", None)
defaults.setdefault("do_cnmf", False)
defaults.setdefault("directory_cif_phases", None)
defaults.setdefault("nb_components", None)
self._do_phase_inference = False # Not available for online
self._inference_weights_filename = None
self._submit_slurm_integration = False
self._slurm_module = WORKER_MODULE
self._slurm_pre_script = PRE_SCRIPT
self._slurm_python_cmd = PYTHON_CMD
self._slurm_post_script = POST_SCRIPT
self._slurm_parameters_integration = SLURM_JOB_PARAMETERS_INTEGRATE
self._slurm_parameters_background = SLURM_JOB_PARAMETERS_BACKGROUND
self._slurm_parameters_cnmf = SLURM_JOB_PARAMETERS_CNMF
self._slurm_queue = QUEUE_SLURM_ID13
self._queues = QUEUES_GPU_ID13
super().__init__(config=config, defaults=defaults)
[docs]
def get_integration_options(
self, scan: BlissScanType, lima_name: str
) -> Optional[dict]:
if self.pyfai_config:
integration_options = read_config(filename=self.pyfai_config)
else:
integration_options = dict()
if self.integration_options:
integration_options.update(self.integration_options.to_dict())
return integration_options
def _get_lima_url_template_args(
self, scan: BlissScanType, lima_name: str
) -> Optional[Dict[str, str]]:
if self.lima_url_template_args:
lima_url_template_args = dict(self.lima_url_template_args)
else:
lima_url_template_args = dict()
eval_dict = {"img_acq_device": lima_name, "scan_number": scan.scan_number}
images_prefix = scan.scan_saving.images_prefix.format(**eval_dict)
lima_url_template_args["images_prefix"] = images_prefix
lima_url_template_args["scan_number_as_str"] = scan.scan_number
return lima_url_template_args
[docs]
def get_external_output_filename(self, scan: BlissScanType, lima_name: str):
external_output_filename = self.external_output_filename(
scan=scan, lima_name=lima_name
)
if not external_output_filename:
scan_nb = scan.scan_info.get("scan_nb")
master_output_filename = self.master_output_filename(scan)
external_output_filename = master_output_filename.replace(
".h5", f"_{scan_nb:04d}.h5"
)
return external_output_filename
[docs]
def get_inputs(self, scan: BlissScanType, lima_name: str) -> List[dict]:
"""Additional inputs for the Id13 XrpdProcessor."""
inputs = super().get_inputs(scan=scan, lima_name=lima_name)
inputs += self.get_diff_extra_inputs(scan=scan, lima_name=lima_name)
inputs += self.get_neuralnetwork_inputs(scan=scan, lima_name=lima_name)
return inputs
[docs]
def get_diff_extra_inputs(
self,
scan: BlissScanType,
lima_name: str,
) -> List[dict]:
"""Get additional inputs for the diffmap, average, and stacked EDF tasks."""
filename = self.get_filename(scan)
scan_nb = scan.scan_info.get("scan_nb")
master_output_filename = self.master_output_filename(scan)
external_output_filename = self.get_external_output_filename(
scan=scan, lima_name=lima_name
)
external_output_filename_edf = external_output_filename.replace(".h5", ".edf")
if self.save_external_files:
external_output_filename = external_output_filename
else:
external_output_filename = master_output_filename
params = {
"CreateDiffMapFile": {
"master_filename": filename,
"scan_nb": scan_nb,
"output_filename": master_output_filename,
"external_output_filename": external_output_filename,
"integration_options": self.get_integration_options(
scan=scan, lima_name=lima_name
),
"lima_name": lima_name,
"do_diffmap": self.do_diffmap,
"scan_memory_url": _get_scan_memory_url(scan),
"normalization_counter": self.normalization_counter,
"processing_info": self.get_saving_info(scan),
},
"AverageIntegration": {
"master_filename": filename,
"scan_nb": scan_nb,
"lima_name": lima_name,
"output_filename": master_output_filename,
"external_output_filename": external_output_filename,
"do_average": self.do_average,
"reference": self.average_reference,
"scan_memory_url": _get_scan_memory_url(scan),
"normalization_counter": self.normalization_counter,
"processing_info": self.get_saving_info(scan),
},
"StackToEdf": {
"do_stackedf": self.do_stackedf,
"external_output_filename": external_output_filename_edf,
"processing_info": self.get_saving_info(scan),
},
}
return [
{"name": name, "value": value, "task_identifier": task_id}
for task_id, task_params in params.items()
for name, value in task_params.items()
]
[docs]
def get_neuralnetwork_inputs(
self, scan: BlissScanType, lima_name: str
) -> List[dict]:
"""Get additional inputs for the neural network (spi) tasks: background removal, constrained NMF and phase inference."""
external_output_filename = self.get_external_output_filename(
scan=scan, lima_name=lima_name
)
wait_for_bg = self.do_cnmf
do_background_removal = self.do_background_removal or self.do_cnmf
common_params = {
"wavelength": self.get_wavelength(angstroms=True),
"radial_limits": self.radial_limits,
"worker_module": self._slurm_module,
"processing_info": self.get_saving_info(scan),
}
if not do_background_removal:
params = {
"BackgroundRemoval": {
"do_background_removal": False,
**common_params,
},
"ConstrainedNMF": {
"do_matrix_factorization": False,
**common_params,
},
"PhaseInference": {
"do_phase_inference": False,
"references_directory": self.directory_cif_phases,
**common_params,
},
}
else:
params = {
"BackgroundRemoval": {
"do_background_removal": True,
"force_training": False, # Not good for online processing
"use_neuralnetwork": False, # Not good for online processing
"submit_to_slurm": True, # Background needs multiprocessing (--pool=process, which appears to jeopardize the integration sometimes)
"destination_file": external_output_filename.replace(
".h5", "_background.json"
),
"slurm_job_parameters": self._slurm_parameters_background,
"wait_to_finish": wait_for_bg,
**common_params,
},
"ConstrainedNMF": {
"references_directory": self.directory_cif_phases,
"nb_components": self.nb_components,
"do_matrix_factorization": self.do_cnmf,
"submit_to_slurm": True, # This task is too heavy to leave it running on gpu3 during the experiment
"destination_file": external_output_filename.replace(
".h5", "_cnmf.json"
),
"slurm_job_parameters": self._slurm_parameters_cnmf,
"wait_to_finish": False,
**common_params,
},
"PhaseInference": {
"do_phase_inference": self._do_phase_inference,
"references_directory": self.directory_cif_phases,
"inference_weights_filename": self._inference_weights_filename,
**common_params,
},
}
return [
{"name": name, "value": value, "task_identifier": task_id}
for task_id, task_params in params.items()
for name, value in task_params.items()
]
[docs]
def get_wavelength(self, angstroms: bool = True) -> float:
"""Load the wavelength from the PyFai configuration file."""
with open(self.pyfai_config) as f:
config = json.load(f)
if "wavelength" in config:
return float(config["wavelength"]) * (1e10 if angstroms else 1)
if "poni" in config:
return float(config["poni"]["wavelength"]) * (1e10 if angstroms else 1)
def _get_workflow(self, scan: BlissScanType) -> Optional[str]:
"""Get the workflow filename for the scan"""
if scan.scan_info.get("save"):
self.current_workflow = self.workflow_juno
else:
self.current_workflow = self.workflow_without_saving
return self.current_workflow
def _set_workflow(self, scan: BlissScanType, filename: str) -> None:
"""Set the workflow filename for the scan"""
if scan.scan_info.get("save"):
self.workflow_with_saving = filename
else:
self.workflow_without_saving = filename
self.current_workflow = filename
[docs]
def get_submit_arguments(self, scan: BlissScanType, lima_name) -> dict:
"""Additional submit arguments in case of slurm queue, also switch the queue if needed."""
submit_arguments = super().get_submit_arguments(scan, lima_name)
if self.queue == self._slurm_queue:
slurm_parameters = {
**self._slurm_parameters_integration,
"environment": {
"HOME": "/tmp_14_days/dau_id13_opid13",
},
}
return {
**submit_arguments,
"slurm_arguments": {
"parameters": slurm_parameters,
"pre_script": self._slurm_pre_script.format(
WORKER_MODULE=self._slurm_module
),
"python_cmd": self._slurm_python_cmd,
"post_script": self._slurm_post_script,
},
}
return submit_arguments
[docs]
def get_saving_info(self, scan: BlissScanType) -> dict:
return {
"base_path": scan.scan_saving.base_path,
"template": scan.scan_saving.template,
"proposal_dirname": scan.scan_saving.proposal_dirname,
"beamline": scan.scan_saving.beamline,
"proposal_session_name": scan.scan_saving.proposal_session_name,
"collection_name": scan.scan_saving.collection_name,
"dataset_name": scan.scan_saving.dataset_name,
}
[docs]
def get_integrate_inputs(
self, scan: BlissScanType, lima_name: str, task_identifier: str
) -> List[dict]:
return super().get_integrate_inputs(
scan=scan, lima_name=lima_name, task_identifier=task_identifier
) + [
{
"task_identifier": task_identifier,
"name": "measurements_to_copy",
"value": self.counters_to_copy if self.counters_to_copy else [],
},
]