Source code for blissoda.demo.streamline

import os
import json
from typing import Tuple, Any, Generator

import numpy

try:
    from bliss import setup_globals
except ImportError:
    setup_globals = None
try:
    from bliss import current_session
except ImportError:
    current_session = None

from ..streamline.scanner import StreamlineScanner
from ..utils import directories


[docs] def energy_wavelength(x): """keV to m and vice versa""" return 12.398419843320026 * 1e-10 / x
_DEFAULT_CALIB = { "dist": 5e-2, # 5 cm "poni1": 10e-2, # 10 cm "poni2": 10e-2, # 10 cm "rot1": numpy.radians(10), # 10 deg "rot2": 0, # 0 deg "rot3": 0, # 0 deg "wavelength": energy_wavelength(12), # 12 keV "detector": "Pilatus1M", }
[docs] class DemoStreamlineScanner(StreamlineScanner): def __init__(self, **defaults): defaults.setdefault("detector_name", "difflab6") defaults.setdefault("calibrant", "LaB6") defaults.setdefault("sample_changer_name", "streamline_sc") defaults.setdefault( "integration_options", { "method": "no_csr_cython", "integrator_name": "sigma_clip_ng", "extra_options": {"max_iter": 3, "thres": 0}, "error_model": "azimuthal", # hybrid gives weird results "nbpt_rad": 4096, "unit": "q_nm^-1", }, ) super().__init__(**defaults)
[docs] def measure_sample(self, *args, has_qrcode: bool = True, **kwargs): return setup_globals.sct(*args, setup_globals.difflab6, **kwargs)
[docs] def calib(self, *args, sample_index=0, **kwargs): return super().calib(*args, sample_index=sample_index, **kwargs)
[docs] def init_workflow(self, with_autocalibration: bool = True): self._ensure_pyfai_config() return super().init_workflow(with_autocalibration=with_autocalibration)
def _ensure_pyfai_config(self): """Set the pyfai_config variable and ensure the file exists.""" if not self.pyfai_config: root_dir = self._get_config_dir(current_session.scan_saving.filename) self.pyfai_config = os.path.join(root_dir, "pyfaicalib.json") if os.path.exists(self.pyfai_config): return os.makedirs(os.path.dirname(self.pyfai_config), exist_ok=True) poni = _DEFAULT_CALIB with open(self.pyfai_config, "w") as f: json.dump(poni, f) def _get_demo_result_dir(self, dataset_filename: str) -> str: root_dir = directories.get_processed_dir(dataset_filename) return os.path.join(root_dir, "demo", "streamline") def _get_workflows_dir(self, dataset_filename: str) -> str: root_dir = self._get_demo_result_dir(dataset_filename) return os.path.join(root_dir, "workflows") def _get_config_dir(self, dataset_filename: str) -> str: root_dir = self._get_demo_result_dir(dataset_filename) return os.path.join(root_dir, "config")
[docs] def run(self, *args, **kwargs): self._ensure_pyfai_config() # in case the file was deleted if not os.path.exists(self.workflow): raise RuntimeError( "the workflow file no longer exists, execute 'init_workflow' again" ) super().run(*args, **kwargs)
@property def sample_changer(self): return streamline_sc def _job_arguments(self, *args, **kw): args, kwargs = super()._job_arguments(*args, **kw) is_even = not bool( setup_globals.difflab6.image.width % 2 ) # lima-camera-simulator<1.9.10 does not support odd image widths kwargs["inputs"].append( {"task_identifier": "Integrate1D", "name": "demo", "value": is_even} ) return args, kwargs def _get_workflow_upload_parameters(self, *args) -> None: return None
[docs] class MockQrReader: QRCODE_NOT_READABLE = "QRCODE_NOT_READABLE"
[docs] class MockSampleChanger: def __init__(self, number_of_samples: int = 16) -> None: translation = MockSampleTranslation() self._vibration_speed = 0 self._translation = translation self._number_of_samples = number_of_samples self._allowed_sample_indices = tuple(range(number_of_samples)) self._qrreader = MockQrReader() # should be in Redis but good enough for a demo self._nholders_in_tray = 2 self._loaded = False self._holder_counter = 0 start = 0 stop = number_of_samples - 1 intervals = number_of_samples - 1 self._ascan_arguments = translation, start, stop, intervals @property def qrreader(self) -> MockQrReader: return self._qrreader
[docs] def ascan_arguments(self) -> Tuple[Any, float, float, int]: return self._ascan_arguments
[docs] def fill_tray(self, n=2) -> None: self._nholders_in_tray = n
[docs] def qr_read(self, **_) -> str: if not self._loaded: return self.qrreader.QRCODE_NOT_READABLE position = self._translation.position sample_index = int(position) on_sample = abs(position - sample_index) < 0.01 if not on_sample or sample_index not in self._allowed_sample_indices: return self.qrreader.QRCODE_NOT_READABLE return f"holder{self._holder_counter}_sample{sample_index:02d}_lab6"
[docs] def select_sample(self, sample_index: int, **qrread_options) -> str: for qr_response in self.iterate_samples([sample_index], **qrread_options): return qr_response
[docs] def select_sample_without_qr(self, sample_index) -> str: return self.select_sample(sample_index)
[docs] def iterate_samples( self, sample_indices=None, **qrread_options ) -> Generator[str, None, None]: if not sample_indices: sample_indices = range(self._number_of_samples) for i in sample_indices: self.translation.position = i yield self.qr_read(**qrread_options)
[docs] def iterate_samples_without_qr( self, sample_indices=None ) -> Generator[str, None, None]: yield from self.iterate_samples(sample_indices=sample_indices)
[docs] def eject_old_baguette(self) -> None: if not self._loaded: print("Streamline sample changer: nothing loaded") return self._loaded = False self.translation.position = 20
[docs] def load_baguette_with_homing(self) -> None: if self._loaded: print("Streamline sample changer: already loaded") return if self._nholders_in_tray == 0: print("Streamline sample changer: no more holders in tray") return self._nholders_in_tray = self._nholders_in_tray - 1 self._loaded = True self._holder_counter += 1 self.translation.position = 0
[docs] def has_remaining_baguettes(self) -> bool: return bool(self._nholders_in_tray)
@property def number_of_remaining_baguettes(self) -> int: return self._nholders_in_tray @property def translation(self): return self._translation @property def vibration_speed(self): return self._vibration_speed @vibration_speed.setter def vibration_speed(self, speed): print("SETTING VIBRATION SPEED TO", speed) if speed < 0 or speed > 100: raise RuntimeError( "Speed for the fluidization system out of range (0-100, as in %)" ) self._vibration_speed = speed
[docs] def tune_qrreader(self, force=False) -> str: return self.qr_read()
[docs] def tune_qrreader_for_baguette(self) -> None: pass
[docs] class MockSampleTranslation: def __init__(self) -> None: self._position = 0
[docs] def on(self): pass
@property def position(self): return self._position @position.setter def position(self, value): print("Move streamline_translation from", self._position, "to", value) self._position = value
if setup_globals is None: streamline_scanner = None streamline_sc = None streamline_translation = None else: streamline_scanner = DemoStreamlineScanner() streamline_sc = MockSampleChanger() streamline_translation = MockSampleTranslation()