# Source code for blissoda.streamline.scanner

"""
.. code-block:: python

    DEMO_SESSION [1]: from blissoda.demo.streamline_scanner import streamline_scanner,sample_changer
    DEMO_SESSION [2]: streamline_scanner.eject()
    DEMO_SESSION [3]: streamline_scanner.load()
    DEMO_SESSION [4]: streamline_scanner.calib(1, sample_index=0)
    DEMO_SESSION [5]: streamline_scanner.run(0.1)
"""

import os
import re
import shutil
from numbers import Number
from contextlib import contextmanager
from typing import Optional, NamedTuple, Tuple, Dict, Generator

import numpy

try:
    from bliss import current_session
    from bliss import setup_globals
    from bliss.common.logtools import elog_print
except ImportError:
    setup_globals = None
    current_session = None
    elog_print = print

try:
    from ewoksjob.client import submit
except ImportError:
    submit = None

from ..persistent.parameters import WithPersistentParameters
from ..persistent.parameters import ParameterInfo
from ..utils import directories
from ..resources.streamline import RESOURCE_ROOT


class ScanInfo(NamedTuple):
    """Location of a scan: the file it was saved in and its scan number."""

    filename: str
    scan_nb: int

    @property
    def url(self) -> str:
        """Scan URL of the form ``<filename>::/<scan_nb>.1``."""
        scan_url = f"{self.filename}::/{self.scan_nb}.1"
        return scan_url
class StreamlineScanner(
    WithPersistentParameters,
    parameters=[
        ParameterInfo("workflow", category="data processing"),
        ParameterInfo("sample_changer_name", category="names"),
        ParameterInfo("detector_name", category="names"),
        ParameterInfo("energy_name", category="names"),
        ParameterInfo("calibration_scans", category="calibration"),
        ParameterInfo("calibration_motor", category="calibration"),
        ParameterInfo("image_slice", category="PyFai"),
        ParameterInfo("integration_options", category="PyFai"),
        ParameterInfo("pyfai_config", category="PyFai"),
        ParameterInfo("calibrant", category="calibration"),
        ParameterInfo("trigger_workflows", category="data processing"),
        ParameterInfo("vibration_speed_during_scan", category="sample changer"),
        ParameterInfo("verify_qrcode", category="robust vs. speed"),
        ParameterInfo("autotune_qrreader_per", category="robust vs. speed"),
        ParameterInfo("dryrun", category="testing"),
        ParameterInfo("calib_ring_detector_name", category="calibration"),
    ],
):
    """Measure the samples of sample holders ("baguettes") one by one and
    submit a data processing workflow for every measured sample.

    The sample changer is looked up by name in the current BLISS session and
    the scan itself is delegated to ``setup_globals.sct``.
    """

    def __init__(self, **defaults) -> None:
        """Fill in default parameter values that the caller did not provide."""
        defaults.setdefault("image_slice", 0)
        defaults.setdefault("trigger_workflows", True)
        defaults.setdefault("vibration_speed_during_scan", 40)
        defaults.setdefault("dryrun", False)
        defaults.setdefault("verify_qrcode", False)
        defaults.setdefault("autotune_qrreader_per", "baguette")
        super().__init__(**defaults)

    def _info_categories(self) -> Dict[str, dict]:
        """Extend the parent categories with a "status" category.

        The status is omitted when the sample changer cannot be resolved
        (``AttributeError`` or ``KeyError``).
        """
        categories = super()._info_categories()
        try:
            sample_changer = self.sample_changer
        except (AttributeError, KeyError):
            return categories
        categories["status"] = {
            "# sample holders left": sample_changer.number_of_remaining_baguettes,
            "selected sample": int(sample_changer.translation.position),
            "vibration speed (%)": sample_changer.vibration_speed,
            "automatic calibration": self.workflow_has_calib,
            "calibration": "OFF",
            "flat-field": "OFF",
        }
        if self.workflow_has_calib:
            categories["status"]["calibration"] = self._get_calibration().get("image")
        if self.workflow_has_flatfield:
            categories["status"]["flat-field"] = "ON"
        return categories

    @property
    def autotune_qrreader_per(self) -> Optional[str]:
        """When to auto-tune the QR reader: ``"sample"``, ``"baguette"`` or ``None``."""
        return self._get_parameter("autotune_qrreader_per")

    @autotune_qrreader_per.setter
    def autotune_qrreader_per(self, value: Optional[str]):
        if value not in (None, "sample", "baguette"):
            raise ValueError("Allowed values are 'sample', 'baguette' or None")
        self._set_parameter("autotune_qrreader_per", value)

    def measure_sample(self, *args, has_qrcode: bool = True, **kwargs):
        """Run ``setup_globals.sct(*args, **kwargs)`` and return its result.

        Nothing is measured and ``None`` is returned when ``dryrun`` is set.
        ``has_qrcode`` is accepted but not used by this implementation.
        """
        if self.dryrun:
            print("Dry-run: skip measurement")
            return None
        return setup_globals.sct(*args, **kwargs)

    @property
    def sample_changer(self):
        """Sample changer object from the environment of the current session."""
        self._raise_when_missing("sample_changer_name")
        return current_session.env_dict[self.sample_changer_name]

    def eject(self):
        """Eject the old sample holder and print how many holders remain."""
        self.sample_changer.translation.on()
        self.sample_changer.eject_old_baguette()
        print(
            "\n\nNumber of remaining sample holders:",
            self.sample_changer.number_of_remaining_baguettes,
        )

    def load(self):
        """Load a sample holder (with homing) and tune the QR reader when
        ``autotune_qrreader_per`` is ``"baguette"``.
        """
        self.sample_changer.translation.on()
        self.sample_changer.load_baguette_with_homing()
        if self.autotune_qrreader_per == "baguette":
            self.tune_qrreader_for_baguette()

    def run(
        self,
        *scan_args,
        nholders: Optional[int] = None,
        use_qr_code: bool = True,
        current_holder: bool = False,
        sample_indices: Optional[Tuple[int, ...]] = None,
        **scan_kwargs,
    ):
        """Measure the samples of one or more sample holders.

        :param scan_args: positional arguments of the scan
        :param nholders: number of holders to measure; all remaining holders
                         when ``None``
        :param use_qr_code: read the QR-code of each sample
        :param current_holder: only (re)load and measure the current holder;
                               takes precedence over ``nholders``
        :param sample_indices: passed on to the sample iterator of the sample
                               changer
        :param scan_kwargs: named arguments of the scan
        :raises RuntimeError: calibration, workflow or pyFAI configuration is
                              missing
        """
        if self.workflow_has_calib and not self._get_calibration():
            raise RuntimeError("measure a calibration standard first")
        if self.trigger_workflows:
            if not self.workflow:
                raise RuntimeError("initialize a workflow first")
            if self.pyfai_config and not os.path.exists(self.pyfai_config):
                raise RuntimeError("the pyFAI configuration file no longer exists")

        def measure_holder() -> None:
            self._run_holder(
                scan_args,
                scan_kwargs,
                use_qr_code=use_qr_code,
                sample_indices=sample_indices,
            )

        def exchange_and_measure() -> None:
            self.eject()
            self.load()
            measure_holder()

        with self.run_context():
            if current_holder:
                self.load()
                measure_holder()
            elif nholders is None:
                while self.sample_changer.has_remaining_baguettes():
                    exchange_and_measure()
                # Eject the holder that was measured last
                self.eject()
            else:
                for _ in range(nholders):
                    exchange_and_measure()
                # Eject the holder that was measured last
                self.eject()

    def _run_holder(
        self,
        scan_args: tuple,
        scan_kwargs: dict,
        use_qr_code: bool = True,
        sample_indices: Optional[Tuple[int, ...]] = None,
    ):
        """Process every sample yielded by the sample changer for the loaded holder."""
        if use_qr_code:
            itfunc = self.sample_changer.iterate_samples
        else:
            itfunc = self.sample_changer.iterate_samples_without_qr
        print("")
        print("========== RUN HOLDER ==========")
        for qrcode in itfunc(
            sample_indices=sample_indices,
            autoTuningAllowed=self.autotune_qrreader_per == "sample",
        ):
            print()
            self._process_sample(qrcode, scan_args, scan_kwargs)
        print("================================")

    def calib(
        self,
        *scan_args,
        sample_index: Optional[int] = None,
        use_qr_code: bool = True,
        **scan_kwargs,
    ):
        """Measure one sample of the loaded holder and register the scan as
        calibration scan.

        :raises ValueError: ``sample_index`` is not provided
        """
        if sample_index is None:
            raise ValueError("argument 'sample_index' not provided")
        with self.run_context():
            qrcode = self._select_sample(sample_index, use_qr_code=use_qr_code)
            self._process_sample(qrcode, scan_args, scan_kwargs, is_calibration=True)

    def select_sample(self, sample_index: int, use_qr_code: bool = True) -> str:
        """Switch the translation on and select a sample of the loaded holder."""
        self.sample_changer.translation.on()
        return self._select_sample(sample_index, use_qr_code=use_qr_code)

    def _select_sample(self, sample_index: int, use_qr_code: bool = True) -> str:
        """Select a sample with or without reading its QR-code and return
        what the sample changer returns.
        """
        if use_qr_code:
            select_sample = self.sample_changer.select_sample
        else:
            select_sample = self.sample_changer.select_sample_without_qr
        return select_sample(
            sample_index, autoTuningAllowed=self.autotune_qrreader_per == "sample"
        )

    def init_workflow(self, with_autocalibration: bool = False):
        """Copy the workflow resource to the workflows directory of the
        current dataset (unless it already exists there) and make it the
        active workflow.
        """
        if with_autocalibration:
            basename = "streamline_with_calib"
        else:
            basename = "streamline_without_calib"
        filename = f"{basename}.json"
        dirname = self._get_workflows_dir(current_session.scan_saving.filename)
        destination = os.path.join(dirname, filename)
        if not os.path.exists(destination):
            os.makedirs(dirname, exist_ok=True)
            source = os.path.join(RESOURCE_ROOT, filename)
            shutil.copyfile(source, destination)
        self.workflow = destination
        print(f"Active data processing workflow: {destination}")

    @property
    def workflow_has_calib(self) -> bool:
        """``True`` when a workflow is set and its name contains "with_calib"."""
        return bool(self.workflow) and "with_calib" in self.workflow

    @property
    def workflow_has_flatfield(self) -> bool:
        """``True`` when a workflow is set and its name contains "with_flat"."""
        return bool(self.workflow) and "with_flat" in self.workflow

    def _set_calibration_scan(self, scan_info: ScanInfo):
        """Register a scan as calibration for the current calibration position."""
        if not scan_info.filename:
            print("Cannot use as calibration because no data was collected")
            return
        info = {
            "image": self._get_image_url(scan_info),
            "gallery_directory": self._get_gallery_directory(scan_info.filename),
        }
        position = self._get_calibration_position()
        if self.calibration_scans is None:
            self.calibration_scans = dict()
        self.calibration_scans[position] = info

    def _trigger_processing(self, scan_info: ScanInfo, processed_metadata: dict):
        """Submit the workflow for a scan, unless the scan has no file."""
        if not scan_info.filename:
            print("Cannot trigger workflow because no data was collected")
            return
        args, kwargs = self._job_arguments(scan_info, processed_metadata)
        submit(args=args, kwargs=kwargs)

    @contextmanager
    def run_context(self):
        """Switch the translation on and vibrate the samples for the duration
        of the context. The vibration is always stopped on exit.
        """
        self.sample_changer.translation.on()
        self.sample_changer.vibration_speed = self.vibration_speed_during_scan
        elog_print("Start streamline run")
        try:
            yield
        finally:
            self.sample_changer.vibration_speed = 0
            elog_print("End streamline run")

    def _process_sample(
        self,
        qrcode: str,
        scan_args: tuple,
        scan_kwargs: dict,
        is_calibration: bool = False,
    ):
        """Create a dataset for the sample, measure it and trigger the
        data processing. The dataset is always closed at the end.
        """
        try:
            # NOTE(review): the QR-code is re-checked once everything below is
            # done, just before the dataset is closed -- confirm this scope.
            with self._verify_qrcode(qrcode) as qrcode:
                has_qrcode = qrcode != self._qrcode_error
                self._newsample(qrcode)
                self._set_scan_metadata(scan_args, scan_kwargs)
                self._set_raw_dataset_metadata(scan_args, scan_kwargs)
                scan = self.measure_sample(
                    *scan_args, has_qrcode=has_qrcode, **scan_kwargs
                )
                scan_info = self._get_scan_info(scan)
                if is_calibration:
                    self._set_calibration_scan(scan_info)
                if self.trigger_workflows:
                    processed_metadata = self._get_processed_dataset_metadata(
                        scan_args
                    )
                    if self.dryrun:
                        print("Dry-run: skip workflow triggering")
                    else:
                        self._trigger_processing(scan_info, processed_metadata)
                if is_calibration:
                    elog_print(
                        f"Streamline calibrant {self.calibrant}: {current_session.scan_saving.filename}"
                    )
        finally:
            setup_globals.enddataset()

    def qr_read(self) -> str:
        """Read the QR-code of the selected sample."""
        return self.sample_changer.qr_read(
            autoTuningAllowed=self.autotune_qrreader_per == "sample"
        )

    @property
    def _qrcode_error(self) -> str:
        """Value the QR reader uses for a code that could not be read."""
        return self.sample_changer.qrreader.QRCODE_NOT_READABLE

    def tune_qrreader(self, force=False) -> str:
        """Tune the QR reader and return the result of the sample changer.

        The result is compared with the "not readable" code by
        :meth:`tune_qrreader_for_baguette`, so it must be passed on.
        """
        return self.sample_changer.tune_qrreader(force=force)

    def tune_qrreader_for_baguette(self) -> None:
        """Tune the QR reader on the samples of the loaded holder until the
        tuning result differs from the "not readable" code.
        """
        for _ in self.sample_changer.iterate_samples_without_qr():
            qrcode = self.tune_qrreader()
            if qrcode != self._qrcode_error:
                break
        else:
            # Only reached when no sample gave a readable code
            print("QR-reader tuning failed when loading the baguette")

    @contextmanager
    def _verify_qrcode(self, qrcode: str) -> Generator[str, None, None]:
        """Check the QR-code before (yields the new code when it changed)
        and check the QR-code after (raises an exception when it changed)"""
        if not self.verify_qrcode:
            yield qrcode
            return
        qrcode_now = self.qr_read()
        if qrcode_now != qrcode and qrcode_now != self._qrcode_error:
            print(
                f"Reading the QR-code twice gives first '{qrcode}' and then '{qrcode_now}'"
            )
            qrcode = qrcode_now
        try:
            yield qrcode
        finally:
            qrcode_now = self.qr_read()
            if qrcode_now != qrcode and qrcode_now != self._qrcode_error:
                if qrcode == self._qrcode_error:
                    # The code was unreadable before: only report the name
                    msg = f"The sample name of dataset '{current_session.scan_saving.filename}' is '{qrcode_now}' (read at the end of the dataset)"
                    elog_print(msg)
                else:
                    # Two different readable codes: the sample name is uncertain
                    msg = f"The sample name of dataset '{current_session.scan_saving.filename}' might be '{qrcode_now}' (read at the end of the dataset)"
                    elog_print(msg)
                    raise RuntimeError(msg)

    def _newsample(self, qrcode: str) -> None:
        """Start a new sample named after the QR-code and a new dataset."""
        setup_globals.newsample(qrcode)
        setup_globals.newdataset()

    def _set_raw_dataset_metadata(self, scan_args: tuple, scan_kwargs: dict) -> None:
        """Copy the raw dataset metadata into the current dataset."""
        for k, v in self._get_raw_dataset_metadata(scan_args).items():
            current_session.scan_saving.dataset[k] = v

    def _set_scan_metadata(self, scan_args: tuple, scan_kwargs: dict) -> None:
        """Add a "scan_info" entry to ``scan_kwargs`` (in place) when there
        is scan metadata.
        """
        scan_info = self._get_scan_metadata()
        if scan_info:
            scan_kwargs["scan_info"] = scan_info

    def _get_scan_metadata(self) -> Optional[dict]:
        """Scan metadata; there is none in this implementation."""
        return None

    def _get_raw_dataset_metadata(self, scan_args: tuple) -> dict:
        """HTXRPD metadata of the raw dataset. Optional fields are only
        present when their value is available.
        """
        metadata = {"definition": "HTXRPD"}
        if self.energy_name:
            metadata["HTXRPD_energy"] = getattr(
                setup_globals, self.energy_name
            ).position
        # The first scan argument is used as exposure time when it is a number
        if scan_args and isinstance(scan_args[0], Number):
            metadata["HTXRPD_exposureTime"] = scan_args[0]
        speed = self.vibration_speed_during_scan
        if speed is not None:
            metadata["HTXRPD_sampleVibration"] = speed
        position = self._get_calibration_position()
        if position is not None:
            metadata["HTXRPD_distance"] = position
        return metadata

    def _get_processed_dataset_metadata(self, scan_args: tuple) -> dict:
        """Metadata of the processed dataset: the sample name of the current dataset."""
        metadata = dict()
        metadata["Sample_name"] = current_session.scan_saving.dataset["Sample_name"]
        return metadata

    def _get_calibration_position(self) -> Optional[Number]:
        """Position of the calibration motor or ``None`` when no motor is configured."""
        if self.calibration_motor:
            return getattr(setup_globals, self.calibration_motor).position
        return None

    def _get_calibration(self) -> dict:
        """Calibration info registered for the position closest to the
        current calibration position. Empty when there is none.
        """
        calibration_scans = self.calibration_scans
        if not calibration_scans:
            return dict()
        position = self._get_calibration_position()
        if position is not None:
            positions = [p for p in calibration_scans if p is not None]
            if positions:
                # Use the registered position nearest to the current one
                idx = (numpy.abs(numpy.array(positions) - position)).argmin()
                position = positions[idx]
        return calibration_scans.get(position, dict())

    def _get_scan_info(self, scan) -> ScanInfo:
        """Convert the result of :meth:`measure_sample` to a :class:`ScanInfo`.

        ``None`` gives a :class:`ScanInfo` with an empty filename.
        """
        if scan is None:
            return ScanInfo(filename="", scan_nb=0)
        if isinstance(scan, ScanInfo):
            return scan
        filename = scan.scan_info.get("filename")
        scan_nb = scan.scan_info.get("scan_nb")
        return ScanInfo(filename=filename, scan_nb=scan_nb)

    def _get_image_url(self, scan_info: ScanInfo) -> str:
        """silx URL of the detector data of a scan, including the image
        slice when ``image_slice`` is not ``None``.
        """
        url = f"silx://{scan_info.filename}?path=/{scan_info.scan_nb}.1/measurement/{self.detector_name}"
        image_slice = self.image_slice
        if image_slice is not None:
            image_slice = str(image_slice)
            # Strip whitespace and parentheses, e.g. from a stringified tuple
            image_slice = re.sub(r"[\s\(\)]+", "", image_slice)
            url = f"{url}&slice={image_slice}"
        return url

    def _get_output_dir(self, dataset_filename: str) -> str:
        """"streamline" sub-directory of the processed data directory."""
        return os.path.join(
            directories.get_processed_dir(dataset_filename), "streamline"
        )

    def _get_transient_dirname(self, dataset_filename: str) -> str:
        """Result of ``directories.get_nobackup_dir`` for the dataset."""
        return directories.get_nobackup_dir(dataset_filename)

    def _get_workflows_dir(self, dataset_filename: str) -> str:
        """Result of ``directories.get_workflows_dir`` for the dataset."""
        return directories.get_workflows_dir(dataset_filename)

    def _get_output_dirname(self, dataset_filename: str) -> str:
        """Output directory of a dataset, named after the dataset file
        without its extension.
        """
        filename = os.path.basename(dataset_filename)
        return os.path.join(
            self._get_output_dir(dataset_filename), os.path.splitext(filename)[0]
        )

    def _get_gallery_dirname(self, dataset_filename: str) -> str:
        """"gallery" sub-directory of the output directory of a dataset."""
        return os.path.join(self._get_output_dirname(dataset_filename), "gallery")

    def _get_hdf5_output_filename(self, dataset_filename: str) -> str:
        """Output file with the same base name as the dataset file."""
        return os.path.join(
            self._get_output_dirname(dataset_filename),
            os.path.basename(dataset_filename),
        )

    def _get_ascii_output_filename(self, dataset_filename: str, unit: str) -> str:
        """Output file ``<dataset base name>_<unit>.xye``."""
        basename, _ = os.path.splitext(os.path.basename(dataset_filename))
        return os.path.join(
            self._get_output_dirname(dataset_filename), f"{basename}_{unit}.xye"
        )

    def _get_gallery_directory(self, dataset_filename: str) -> str:
        """Same as :meth:`_get_gallery_dirname`."""
        return self._get_gallery_dirname(dataset_filename)

    def _get_workflow_save_filename(self, dataset_filename: str) -> str:
        """File in the output directory named like the active workflow file."""
        basename = os.path.basename(self.workflow)
        return os.path.join(
            self._get_output_dirname(dataset_filename),
            basename,
        )

    def _get_workflow_upload_parameters(
        self, dataset_filename: str, processed_metadata: dict
    ) -> Optional[dict]:
        """Value of the "upload_parameters" job argument."""
        raw = os.path.dirname(dataset_filename)
        dataset = "integrate"
        scan_saving = current_session.scan_saving
        proposal = scan_saving.proposal_name
        beamline = scan_saving.beamline
        path = self._get_output_dirname(dataset_filename)
        return {
            "beamline": beamline,
            "proposal": proposal,
            "dataset": dataset,
            "path": path,
            "raw": [raw],
            "metadata": processed_metadata,
        }

    def _job_arguments(self, scan_info: ScanInfo, processed_metadata: dict):
        """Arguments for the workflow execution

        :returns: positional and named arguments of the job
        :raises RuntimeError: the workflow needs a calibration but none is found
        """
        self._raise_when_missing("workflow")
        if self.workflow_has_calib:
            self._raise_when_missing("calibration_scans", "calibrant")
        inputs = list()
        integrate_image_url = self._get_image_url(scan_info)

        # Configuration
        if self.integration_options:
            inputs.append(
                {
                    "task_identifier": "PyFaiConfig",
                    "name": "integration_options",
                    "value": self.integration_options.to_dict(),
                }
            )
        if self.pyfai_config:
            inputs.append(
                {
                    "task_identifier": "PyFaiConfig",
                    "name": "filename",
                    "value": self.pyfai_config,
                }
            )
        if self.calibrant:
            inputs.append(
                {
                    "task_identifier": "PyFaiConfig",
                    "name": "calibrant",
                    "value": self.calibrant,
                }
            )

        # Calibration
        if self.workflow_has_calib:
            calibration = self._get_calibration()
            if not calibration:
                raise RuntimeError("no valid calibration found")
            inputs += [
                {
                    "task_identifier": "CalibrateSingle",
                    "name": "image",
                    "value": calibration["image"],
                },
                {
                    "task_identifier": "CalibrateSingle",
                    "name": "fixed",
                    "value": ["energy"],
                },
                {
                    "task_identifier": "CalibrateSingle",
                    "name": "robust",
                    "value": False,
                },
                {
                    "task_identifier": "CalibrateSingle",
                    "name": "ring_detector",
                    "value": self.calib_ring_detector_name,
                },
                {
                    "task_identifier": "DiagnoseCalibrateSingleResults",
                    "name": "image",
                    "value": calibration["image"],
                },
                {
                    "task_identifier": "DiagnoseCalibrateSingleResults",
                    "name": "filename",
                    "value": os.path.join(
                        calibration["gallery_directory"], "ring_detection.png"
                    ),
                },
            ]
            # Only when the image being integrated is the calibration image
            if calibration["image"] == integrate_image_url and self.calibrant:
                inputs += [
                    {
                        "task_identifier": "DiagnoseIntegrate1D",
                        "name": "calibrant",
                        "value": self.calibrant,
                    },
                ]

        # Integration
        inputs += [
            {
                "task_identifier": "Integrate1D",
                "name": "image",
                "value": integrate_image_url,
            },
            {
                "task_identifier": "Integrate1D",
                "name": "maximum_persistent_workers",
                "value": 2,  # Q and 2theta
            },
            {
                "task_identifier": "SaveNexusPattern1D",
                "name": "url",
                "value": self._get_hdf5_output_filename(scan_info.filename),
            },
            {
                "task_identifier": "SaveNexusPattern1D",
                "name": "bliss_scan_url",
                "value": scan_info.url,
            },
        ]

        # Different outputs depending on the unit
        detector_name = self.detector_name
        for unit in ("q", "2th"):
            inputs += [
                {
                    "task_identifier": "SaveNexusPattern1D",
                    "label": f"save_{unit}_hdf5",
                    "name": "nxprocess_name",
                    "value": f"{detector_name}_integrate_{unit}",
                },
                {
                    "task_identifier": "SaveNexusPattern1D",
                    "label": f"save_{unit}_hdf5",
                    "name": "nxmeasurement_name",
                    "value": f"{detector_name}_integrated_{unit}",
                },
                {
                    "task_identifier": "SaveNexusPattern1D",
                    "label": f"save_{unit}_hdf5",
                    "name": "metadata",
                    "value": {
                        f"{detector_name}_integrate_{unit}": {
                            "configuration": {"workflow": self.workflow}
                        }
                    },
                },
                {
                    "task_identifier": "SaveAsciiPattern1D",
                    "label": f"save_{unit}_ascii",
                    "name": "filename",
                    "value": self._get_ascii_output_filename(scan_info.filename, unit),
                },
            ]

        inputs += [
            {
                "task_identifier": "DiagnoseIntegrate1D",
                "name": "filename",
                "value": os.path.join(
                    self._get_gallery_directory(scan_info.filename),
                    "integrate.png",
                ),
            },
        ]

        # Job arguments
        args = (self.workflow,)
        convert_destination = self._get_workflow_save_filename(scan_info.filename)
        upload_parameters = self._get_workflow_upload_parameters(
            scan_info.filename, processed_metadata
        )
        if self.workflow_has_calib:
            varinfo = {
                "root_uri": self._get_transient_dirname(scan_info.filename),
                "scheme": "nexus",
            }
        else:
            varinfo = None
        kwargs = {
            "engine": None,
            "inputs": inputs,
            "varinfo": varinfo,
            "convert_destination": convert_destination,
            "upload_parameters": upload_parameters,
            "save_options": {"indent": 2},
        }
        return args, kwargs