Source code for blissoda.app.workflow_server

import json
import logging
from typing import Mapping
from typing import Optional

from ...version_utils import has_minimal_version

# Note: import subscriber first because it might require patching.
# This is why the `ewoks_utils` import below comes after it (hence the E402
# suppression). NOTE(review): the nature of the patching is not visible from
# this module -- confirm in the subscriber modules.

# Select the `scan_iterator` implementation from the `has_minimal_version`
# checks on "blissdata", testing the highest version threshold first.
if has_minimal_version("blissdata", "2.0.0rc1"):
    from .subscriberv2 import scan_iterator
elif has_minimal_version("blissdata", "1"):
    from .subscriberv1 import scan_iterator
else:
    from .subscriberv0 import scan_iterator

from ...ewoks_utils import submit  # noqa E402

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def submit_scan_workflow(workflow=None, **kwargs) -> Optional[str]:
    """Submit a workflow for execution and return the identifier of the job.

    :param workflow: workflow to submit; handed to ``submit`` as its single
        positional argument. When it is falsy (``None``, empty container,
        empty string, ...) nothing is submitted.
    :param kwargs: forwarded to ``submit`` as its ``kwargs`` argument.
    :returns: the ``uuid`` attribute of the object returned by ``submit``,
        or ``None`` when no workflow was provided.
    """
    if not workflow:
        # Nothing to run. Return ``None`` explicitly so that every return
        # statement yields a value, in line with the ``Optional[str]`` hint.
        return None
    future = submit(args=(workflow,), kwargs=kwargs)
    return future.uuid
def main(args) -> None:
    """Submit the workflows attached to every scan yielded by ``scan_iterator``.

    ``scan_iterator(args.session)`` yields ``(filename, scan_nb, workflows)``
    tuples. Each value of ``workflows`` that is a ``Mapping`` must provide a
    JSON document under ``["configuration"]["data"]``; the decoded document
    is passed as keyword arguments to ``submit_scan_workflow``.

    A failure while decoding or submitting one workflow is logged with its
    traceback and does not interrupt the processing of the others.

    :param args: object exposing a ``session`` attribute (presumably the
        parsed command-line arguments -- confirm against the caller).
    """
    for filename, scan_nb, workflows in scan_iterator(args.session):
        for wfname, process_info in workflows.items():
            # Entries that are not mappings carry no workflow configuration.
            if not isinstance(process_info, Mapping):
                continue

            try:
                parameters = json.loads(process_info["configuration"]["data"])
                job_id = submit_scan_workflow(**parameters)
            except Exception:
                logger.exception(
                    f"Error when submitting workflow '{wfname}' for scan {scan_nb} of file '{filename}'"
                )
                continue

            # ``None`` means that nothing was submitted: nothing to report.
            if job_id is None:
                continue

            logger.info(
                f"Submitted workflow '{wfname}' (JOB ID {job_id}) for scan {scan_nb} of file '{filename}'"
            )