Source code for blissoda.flint.plotter
import time
from ..import_utils import unavailable_module
from .access import WithFlintAccess
from .colors import ColorCycler
try:
import gevent
except ImportError as ex:
gevent = unavailable_module(ex)
[docs]
class BasePlotter(WithFlintAccess):
def __init__(self, max_plots) -> None:
super().__init__()
self._max_plots = max_plots
self._tasks = []
self._color_cycler = ColorCycler(max_colors=max_plots + 1)
@property
def number_of_scans(self):
"""Maximum number of scans to be plotted"""
return self._max_plots
@number_of_scans.setter
def number_of_scans(self, value):
self._max_plots = max(value, 0)
self._on_color_reset()
def _on_color_reset(self) -> None:
self._color_cycler.max_colors = self._max_plots
[docs]
def handle_workflow_result(self):
raise NotImplementedError()
def _spawn(self, *args, **kw):
task = gevent.spawn(*args, **kw)
self._tasks.append(task)
self.purge_tasks()
[docs]
def purge_tasks(self) -> int:
"""Remove references to tasks that have finished."""
self._tasks = [t for t in self._tasks if t]
return len(self._tasks)
[docs]
def kill_tasks(self) -> int:
"""Kill all tasks."""
gevent.killall(self._tasks)
return self.purge_tasks()
[docs]
def wait_tasks(self, timeout: int = 10) -> int:
"""Wait for all plot tasks to be finished."""
deadline = time.time() + timeout
for task in list(self._tasks):
timeleft = deadline - time.time()
if timeleft < 0:
raise TimeoutError(f"{timeout} seconds")
task.join(timeout=timeleft)
if task:
raise TimeoutError(f"{timeout} seconds")
return self.purge_tasks()
[docs]
def replot(self, **retry_options) -> None:
raise NotImplementedError()