Source code for blissoda.flint.access

import logging
from contextlib import contextmanager
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Type

from ..bliss_globals import current_session
from ..import_utils import is_available
from ..import_utils import unavailable_class
from ..import_utils import unavailable_function
from . import BasePlot
from . import FlintClient

try:
    from gevent.lock import RLock
except ImportError as ex:
    RLock = unavailable_class(ex)

try:
    from bliss.common.plot import get_flint as _native_get_flint
except ImportError as ex:
    _native_get_flint = unavailable_function(ex)


logger = logging.getLogger(__name__)


[docs] class WithFlintAccess: _HAS_BLISS = is_available(current_session) def __init__(self) -> None: self.__plots: Dict[str, BasePlot] = dict() self.__flint_client_id = None self.__plots_lock = None self.__get_plot_stack = 0 @property def _plots_lock(self) -> RLock: if self.__plots_lock is None: self.__plots_lock = RLock() return self.__plots_lock def _get_plot(self, plot_id: str, plot_cls: Type[BasePlot]) -> BasePlot: """Launches Flint and creates the plot when either is missing.""" with self._plots_lock: with self._access_flint_context() as flint_client: plot = self.__plots.get(plot_id) if plot is None: plot = flint_client.get_plot(plot_cls, unique_name=plot_id) logger.info("Created Flint plot %r", plot_id) self.__plots[plot_id] = plot return plot @contextmanager def _access_flint_context( self, reset: bool = False ) -> Generator[FlintClient, None, None]: with self._plots_lock: self.__get_plot_stack += 1 try: flint_client = _get_flint(reset=reset) flint_client_id = id(flint_client), flint_client.pid if flint_client_id != self.__flint_client_id: self.__flint_client_id = flint_client_id if self.__get_plot_stack > 1: if self.__plots: logger.warning( "Cannot resetting Flint plots after restart (recursive call)" ) else: self._on_flint_restart(flint_client) yield flint_client finally: self.__get_plot_stack -= 1
[docs] def reset_flint(self) -> None: with self._access_flint_context(reset=True) as _: pass
def _on_flint_restart(self, flint_client: FlintClient) -> None: """Called whenever a new Flint client is instantiated.""" with self._plots_lock: plots = {} for plot_id in self.__plots: plot_instance = self.__plots[plot_id] plot_cls = type(plot_instance) plots[plot_id] = flint_client.get_plot(plot_cls, unique_name=plot_id) self.__plots = plots
_flint_lock: Optional[RLock] = None _flint_client: Optional[FlintClient] = None _flint_pid: Optional[int] = None def _get_flint(reset: bool = False) -> FlintClient: """Create the Flint client when needed: - not created yet - different process id - not available """ global _flint_client, _flint_pid, _flint_lock if _flint_lock is None: _flint_lock = RLock() with _flint_lock: if reset: _flint_client = None _flint_pid = None try: new_client = ( _flint_client is None or _flint_pid != _flint_client.pid or not _flint_client.is_available() ) except FileNotFoundError: new_client = True if new_client: _flint_client = _native_get_flint() _flint_pid = _flint_client.pid return _flint_client