Source code for blissoda.flint.plotter

import time

from ..import_utils import unavailable_module
from .access import WithFlintAccess
from .colors import ColorCycler

try:
    import gevent
except ImportError as ex:
    gevent = unavailable_module(ex)


[docs] class BasePlotter(WithFlintAccess): def __init__(self, max_plots) -> None: super().__init__() self._max_plots = max_plots self._tasks = [] self._color_cycler = ColorCycler(max_colors=max_plots + 1) @property def number_of_scans(self): """Maximum number of scans to be plotted""" return self._max_plots @number_of_scans.setter def number_of_scans(self, value): self._max_plots = max(value, 0) self._on_color_reset() def _on_color_reset(self) -> None: self._color_cycler.max_colors = self._max_plots
[docs] def handle_workflow_result(self): raise NotImplementedError()
def _spawn(self, *args, **kw): task = gevent.spawn(*args, **kw) self._tasks.append(task) self.purge_tasks()
[docs] def purge_tasks(self) -> int: """Remove references to tasks that have finished.""" self._tasks = [t for t in self._tasks if t] return len(self._tasks)
[docs] def kill_tasks(self) -> int: """Kill all tasks.""" gevent.killall(self._tasks) return self.purge_tasks()
[docs] def wait_tasks(self, timeout: int = 10) -> int: """Wait for all plot tasks to be finished.""" deadline = time.time() + timeout for task in list(self._tasks): timeleft = deadline - time.time() if timeleft < 0: raise TimeoutError(f"{timeout} seconds") task.join(timeout=timeleft) if task: raise TimeoutError(f"{timeout} seconds") return self.purge_tasks()
[docs] def replot(self, **retry_options) -> None: raise NotImplementedError()