Source code for blissoda.ewoks_utils

import logging
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple

from ewoksjob.client import Future
from ewoksjob.client import convert_graph  # noqa F401
from ewoksjob.client import get_future  # noqa F401
from ewoksjob.client import submit as _native_submit

logger = logging.getLogger(__name__)


[docs] def submit( args: Tuple[Any, ...], kwargs: Optional[Dict[str, Any]] = None, queue: str = "celery", ) -> Future: if _ICAT_UPLOAD_DISABLED: kwargs = kwargs or {} upload_parameters = kwargs.get("upload_parameters") if upload_parameters is not None: logger.warning("ICAT uploading is disabled: %s", upload_parameters) kwargs["upload_parameters"] = None return _native_submit(args=args, kwargs=kwargs, queue=queue)
_ICAT_UPLOAD_DISABLED = False
[docs] def disable_icat_upload() -> None: global _ICAT_UPLOAD_DISABLED _ICAT_UPLOAD_DISABLED = True