Source code for blissoda.id01.cdi_uploader
from ..bliss_globals import current_session
from ..ewoks_utils import Future
from ..import_utils import unavailable_module
try:
import gevent
except ImportError as ex:
gevent = unavailable_module(ex)
[docs]
class CdiUploader:
    """Run ``send_figures_to_elogbook`` in background gevent tasks and track them.

    Every spawned task is kept in ``self._tasks`` so that it can later be
    purged (once it no longer evaluates as truthy) or killed.
    """

    def __init__(self) -> None:
        super().__init__()
        # Greenlets spawned by upload_workflow_result() that are still tracked.
        self._tasks: list = []

    def upload_workflow_result(
        self, future: Future, retry_timeout: int, retry_period: int
    ) -> None:
        """Spawn a background task that handles the result of *future*.

        The three arguments are forwarded unchanged to
        ``send_figures_to_elogbook``; their exact meaning is defined there.
        The new task is recorded and already-falsy tasks are purged.
        """
        greenlet = gevent.spawn(
            send_figures_to_elogbook,
            future,
            retry_timeout,
            retry_period,
        )
        self._tasks.append(greenlet)
        self.purge_tasks()

    def purge_tasks(self) -> int:
        """Drop tracked tasks that evaluate as falsy and return how many remain.

        NOTE(review): this relies on gevent greenlets being falsy once they
        have finished -- confirm against the gevent version in use.
        """
        # filter(None, ...) keeps exactly the truthy entries, in order.
        self._tasks = list(filter(None, self._tasks))
        return len(self._tasks)

    def kill_tasks(self) -> int:
        """Kill every tracked task, purge, and return the number still tracked."""
        gevent.killall(self._tasks)
        remaining = self.purge_tasks()
        return remaining