Source code for blissoda.app.workflow_server
import json
import logging
from typing import Mapping
from typing import Optional
from ...version_utils import has_minimal_version
# Note: import subscriber first because it might require patching
if has_minimal_version("blissdata", "2.0.0rc1"):
from .subscriberv2 import scan_iterator
elif has_minimal_version("blissdata", "1"):
from .subscriberv1 import scan_iterator
else:
from .subscriberv0 import scan_iterator
from ...ewoks_utils import submit # noqa E402
logger = logging.getLogger(__name__)
[docs]
def submit_scan_workflow(workflow=None, **kwargs) -> Optional[str]:
    """Submit a workflow and return the identifier of the resulting job.

    :param workflow: workflow to submit; it is forwarded to ``submit`` as the
        single positional argument. Any falsy value (``None``, empty
        container, ...) means there is nothing to submit.
    :param kwargs: forwarded unchanged to ``submit`` as its ``kwargs``.
    :returns: the ``uuid`` attribute of the object returned by ``submit``,
        or ``None`` when no workflow was provided.
    """
    if workflow:
        return submit(args=(workflow,), kwargs=kwargs).uuid
    # Nothing to submit: ``submit`` is never called on this path.
    return None
[docs]
def main(args) -> None:
    """Submit the workflows attached to every scan yielded for a session.

    ``scan_iterator(args.session)`` is consumed as a stream of
    ``(filename, scan_nb, workflows)`` tuples. For each entry of
    ``workflows`` that is a mapping, the JSON document stored under
    ``["configuration"]["data"]`` is decoded and its items are passed as
    keyword arguments to ``submit_scan_workflow``.

    A failure while decoding or submitting one workflow is logged with its
    traceback and does not stop the processing of the remaining workflows.

    :param args: object exposing a ``session`` attribute
        (presumably parsed command-line arguments -- confirm against caller).
    """
    for filename, scan_nb, workflows in scan_iterator(args.session):
        # Entries that are not mappings are skipped, exactly as before.
        candidates = (
            (name, entry)
            for name, entry in workflows.items()
            if isinstance(entry, Mapping)
        )
        for wfname, entry in candidates:
            try:
                parameters = json.loads(entry["configuration"]["data"])
                job_id = submit_scan_workflow(**parameters)
            except Exception:
                # Per-workflow boundary: report the failure and move on.
                logger.exception(
                    f"Error when submitting workflow '{wfname}' for scan {scan_nb} of file '{filename}'"
                )
                continue
            if job_id is None:
                # The configuration did not define a workflow: nothing was submitted.
                continue
            logger.info(
                f"Submitted workflow '{wfname}' (JOB ID {job_id}) for scan {scan_nb} of file '{filename}'"
            )