Source code for blissoda.app.workflow_server

import json
import logging
from typing import Optional, Mapping

from ...exceptions import VersionError

# Note: import subscriber first because it might require patching
try:
    # blissdata >=1
    from .subscriberv1 import scan_iterator
except VersionError as exc:
    try:
        # blissdata >0.3.3,<1 (unreleased, branch id31_2.0)
        from .subscribervid31 import scan_iterator
    except VersionError:
        try:
            # blissdata <=0.3.3
            from .subscriberv0 import scan_iterator
        except VersionError:  # noqa F841
            _EXC = exc

            def scan_iterator(*args, **kw):
                raise _EXC


from ewoksjob.client import submit

logger = logging.getLogger(__name__)


[docs] def submit_scan_workflow(workflow=None, **kwargs) -> Optional[str]: if not workflow: return future = submit(args=(workflow,), kwargs=kwargs) return future.task_id
[docs] def main(args) -> None: for filename, scan_nb, workflows in scan_iterator(args.session): for wfname, nxprocess in workflows.items(): if not isinstance(nxprocess, Mapping): continue try: job_id = submit_scan_workflow( **json.loads(nxprocess["configuration"]["data"]) ) except Exception: logger.exception( f"Error when submitting workflow '{wfname}' for scan {scan_nb} of file '{filename}'" ) else: if job_id is not None: logger.info( f"Submitted workflow '{wfname}' (JOB ID {job_id}) for scan {scan_nb} of file '{filename}'" )