Source code for blissoda.app.workflow_server.subscriberv0
try:
from gevent import monkey
except ImportError:
pass
else:
monkey.patch_all()
import logging # noqa E402
from typing import Dict # noqa E402
from typing import Iterator # noqa E402
from typing import Tuple # noqa E402
from ...import_utils import unavailable_function # noqa E402
try:
from blissdata.data.node import get_session_node
except ImportError as ex:
get_session_node = unavailable_function(ex)
logger = logging.getLogger(__name__)
[docs]
def scan_iterator(session_name) -> Iterator[Tuple[str, int, Dict]]:
session = get_session_node(session_name)
logger.info(f"Started listening to Bliss session '{session_name}'")
exclude_scan_types = ("scan", "scan_group")
for ev in session.walk_on_new_events(
exclude_children=exclude_scan_types, wait=True
):
if ev.type == ev.type.NEW_NODE and ev.node.type == "scan":
info = ev.node.info
workflows = info.get("workflows")
if not workflows:
continue
filename = info.get("filename")
scan_nb = info.get("scan_nb")
yield filename, scan_nb, workflows