Source code for blissoda.demo.processors.stop_scan
import logging
from typing import Generator
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from esrf_pathlib import ESRFPath
from ewoksutils.task_utils import task_inputs
from ...ewoks_utils import submit
from ...import_utils import unavailable_class
from ...import_utils import unavailable_module
from ...version_utils import assert_has_minimal_version
try:
import gevent
except ImportError as ex:
gevent = unavailable_module(ex)
try:
from bliss.scanning.chain import AcquisitionChain
except ImportError as ex:
AcquisitionChain = unavailable_class(ex)
try:
from bliss.scanning.chain import ChainPreset
except ImportError as ex:
ChainPreset = unavailable_class(ex)
try:
from bliss.scanning.chain import ChainIterationPreset
except ImportError as ex:
ChainIterationPreset = unavailable_class(ex)
logger = logging.getLogger(__name__)
[docs]
class DemoPreset(ChainPreset):
"""Start an Ewoks workflow at the start of the scan. This workflow analyses
scan data and provides feedback in the form of events in a Redis stream.
Each scan point the events are checked with the following result:
- stop the scan
- do nothing
"""
def __init__(self, lima_limits: dict, scalar_counter_limits: dict) -> None:
assert_has_minimal_version("bliss", "2.2")
self._lima_limits = lima_limits
self._scalar_counter_limits = scalar_counter_limits
self._future = None
self._filename = None
self._scan_number = None
self._scan_key = None
self._redis_url = BeaconData().get_redis_data_db()
self._data_store = DataStore(self._redis_url)
self._stream_name = "blissoda:feedback:stop_task"
self._last_id = "0-0"
self._events_buffer = []
self._greenlet = None
[docs]
def start(self, chain: AcquisitionChain) -> None:
self._cache_scan_info(chain)
self._start_workflow()
[docs]
def stop(self, chain: AcquisitionChain) -> None:
self._stop_workflow()
def _cache_scan_info(self, chain: AcquisitionChain) -> None:
self._filename = chain.scan.scan_info["filename"]
self._scan_number = chain.scan.scan_info["scan_nb"]
self._scan_key = chain.scan._scan_data.key
def _start_workflow(self) -> None:
"""Start the Ewoks workflow and the background greenlet."""
if self._filename is None:
raise RuntimeError("Scan file name is not known")
workflow = {
"graph": {"schema_version": "1.1", "id": "stop_scan"},
"nodes": [
{
"task_type": "class",
"task_identifier": "blissoda.demo.tasks.stop_scan.StopScan",
}
],
}
inputs = task_inputs(
task_identifier="StopScan",
inputs=dict(
filename=self._filename,
scan_number=self._scan_number,
scan_key=self._scan_key,
lima_limits=self._lima_limits,
scalar_counter_limits=self._scalar_counter_limits,
redis_url=self._redis_url,
response_stream_name=self._stream_name,
retry_timeout=60,
),
)
# clear previous stream
self._data_store._redis.delete(self._stream_name)
self._last_id = "0-0"
# useful for manual workflow execution during development phase
convert_destination = str(
ESRFPath(self._filename).nobackup_path / "last_stop_scan.json"
)
# submit workflow
self._future = submit(
args=(workflow,),
kwargs=dict(inputs=inputs, convert_destination=convert_destination),
)
logger.debug("Stop scan workflow started: %r", self._future.uuid)
# start background greenlet
self._greenlet = gevent.spawn(self._background_event_collector)
def _stop_workflow(self) -> None:
"""Kill the background greenlet and ensure workflow is finished."""
if self._greenlet is not None:
self._greenlet.kill(block=True)
self._greenlet = None
# ensure workflow finished
if self._future is None:
return
if self._future.done():
try:
self._future.result()
except Exception as ex:
logger.error("Stop scan workflow failed: %s", ex)
else:
self._future.cancel()
[docs]
def get_iterator(
self, chain: AcquisitionChain
) -> Generator[ChainIterationPreset, None, None]:
"""Consume feedback events each iteration."""
while True:
yield ChainIterationPreset()
# consume and clear buffered events
events_to_process = list(self._events_buffer)
self._events_buffer.clear()
for event in events_to_process:
if event.get(b"type") == b"stop":
chain.scan.stop(wait=False)
break
def _background_event_collector(self) -> None:
"""Continuously fetch and buffer feedback events."""
while True:
try:
data = self._data_store._redis.xread(
{self._stream_name: self._last_id},
count=None, # read all available
block=1000, # wait for 1 sec until at least one new event
)
if not data:
continue
for _, events in data:
for event_id, event in events:
self._last_id = event_id
self._events_buffer.append(event)
except Exception as ex:
logger.exception("Error reading Redis stream: %s", ex)