Source code for blissoda.demo.processors.stop_scan

import logging
from typing import Generator

from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from esrf_pathlib import ESRFPath
from ewoksutils.task_utils import task_inputs

from ...ewoks_utils import submit
from ...import_utils import unavailable_class
from ...import_utils import unavailable_module
from ...version_utils import assert_has_minimal_version

try:
    import gevent
except ImportError as ex:
    gevent = unavailable_module(ex)

try:
    from bliss.scanning.chain import AcquisitionChain
except ImportError as ex:
    AcquisitionChain = unavailable_class(ex)


try:
    from bliss.scanning.chain import ChainPreset
except ImportError as ex:
    ChainPreset = unavailable_class(ex)

try:
    from bliss.scanning.chain import ChainIterationPreset
except ImportError as ex:
    ChainIterationPreset = unavailable_class(ex)

logger = logging.getLogger(__name__)


[docs] class DemoPreset(ChainPreset): """Start an Ewoks workflow at the start of the scan. This workflow analyses scan data and provides feedback in the form of events in a Redis stream. Each scan point the events are checked with the following result: - stop the scan - do nothing """ def __init__(self, lima_limits: dict, scalar_counter_limits: dict) -> None: assert_has_minimal_version("bliss", "2.2") self._lima_limits = lima_limits self._scalar_counter_limits = scalar_counter_limits self._future = None self._filename = None self._scan_number = None self._scan_key = None self._redis_url = BeaconData().get_redis_data_db() self._data_store = DataStore(self._redis_url) self._stream_name = "blissoda:feedback:stop_task" self._last_id = "0-0" self._events_buffer = [] self._greenlet = None
[docs] def start(self, chain: AcquisitionChain) -> None: self._cache_scan_info(chain) self._start_workflow()
[docs] def stop(self, chain: AcquisitionChain) -> None: self._stop_workflow()
def _cache_scan_info(self, chain: AcquisitionChain) -> None: self._filename = chain.scan.scan_info["filename"] self._scan_number = chain.scan.scan_info["scan_nb"] self._scan_key = chain.scan._scan_data.key def _start_workflow(self) -> None: """Start the Ewoks workflow and the background greenlet.""" if self._filename is None: raise RuntimeError("Scan file name is not known") workflow = { "graph": {"schema_version": "1.1", "id": "stop_scan"}, "nodes": [ { "task_type": "class", "task_identifier": "blissoda.demo.tasks.stop_scan.StopScan", } ], } inputs = task_inputs( task_identifier="StopScan", inputs=dict( filename=self._filename, scan_number=self._scan_number, scan_key=self._scan_key, lima_limits=self._lima_limits, scalar_counter_limits=self._scalar_counter_limits, redis_url=self._redis_url, response_stream_name=self._stream_name, retry_timeout=60, ), ) # clear previous stream self._data_store._redis.delete(self._stream_name) self._last_id = "0-0" # useful for manual workflow execution during development phase convert_destination = str( ESRFPath(self._filename).nobackup_path / "last_stop_scan.json" ) # submit workflow self._future = submit( args=(workflow,), kwargs=dict(inputs=inputs, convert_destination=convert_destination), ) logger.debug("Stop scan workflow started: %r", self._future.uuid) # start background greenlet self._greenlet = gevent.spawn(self._background_event_collector) def _stop_workflow(self) -> None: """Kill the background greenlet and ensure workflow is finished.""" if self._greenlet is not None: self._greenlet.kill(block=True) self._greenlet = None # ensure workflow finished if self._future is None: return if self._future.done(): try: self._future.result() except Exception as ex: logger.error("Stop scan workflow failed: %s", ex) else: self._future.cancel()
[docs] def get_iterator( self, chain: AcquisitionChain ) -> Generator[ChainIterationPreset, None, None]: """Consume feedback events each iteration.""" while True: yield ChainIterationPreset() # consume and clear buffered events events_to_process = list(self._events_buffer) self._events_buffer.clear() for event in events_to_process: if event.get(b"type") == b"stop": chain.scan.stop(wait=False) break
def _background_event_collector(self) -> None: """Continuously fetch and buffer feedback events.""" while True: try: data = self._data_store._redis.xread( {self._stream_name: self._last_id}, count=None, # read all available block=1000, # wait for 1 sec until at least one new event ) if not data: continue for _, events in data: for event_id, event in events: self._last_id = event_id self._events_buffer.append(event) except Exception as ex: logger.exception("Error reading Redis stream: %s", ex)