Source code for blissoda.ewoks_utils
import logging
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple
from ewoksjob.client import Future
from ewoksjob.client import convert_graph # noqa F401
from ewoksjob.client import get_future # noqa F401
from ewoksjob.client import submit as _native_submit
# Module-level logger named after this module; submit() uses it to warn when
# ICAT upload parameters are dropped.
logger = logging.getLogger(__name__)
[docs]
def submit(
    args: Tuple[Any, ...],
    kwargs: Optional[Dict[str, Any]] = None,
    queue: str = "celery",
) -> Future:
    """Submit a job through ``ewoksjob.client.submit``, honouring the ICAT switch.

    While the module-level flag ``_ICAT_UPLOAD_DISABLED`` is true (see
    :func:`disable_icat_upload`), a non-``None`` ``"upload_parameters"``
    entry in *kwargs* is reported with a warning and replaced by ``None``
    before the job is submitted. The override is applied to a shallow copy,
    so the dictionary passed in by the caller is never modified.

    :param args: positional arguments forwarded unchanged to the native
        submit function.
    :param kwargs: keyword arguments forwarded to the native submit
        function. While ICAT uploading is disabled, ``None`` or an empty
        mapping is forwarded as an empty dictionary.
    :param queue: forwarded unchanged as the ``queue`` argument of the
        native submit function.
    :returns: whatever the native submit function returns.
    """
    if _ICAT_UPLOAD_DISABLED:
        # Copy before editing: the caller may reuse its kwargs for later
        # submissions and must not lose its upload parameters as a side effect.
        kwargs = dict(kwargs) if kwargs else {}
        upload_parameters = kwargs.get("upload_parameters")
        if upload_parameters is not None:
            logger.warning("ICAT uploading is disabled: %s", upload_parameters)
            kwargs["upload_parameters"] = None
    return _native_submit(args=args, kwargs=kwargs, queue=queue)
# Module-wide switch: submit() reads it on every call and, while it is True,
# replaces "upload_parameters" with None. disable_icat_upload() sets it to True.
_ICAT_UPLOAD_DISABLED = False
[docs]
def disable_icat_upload() -> None:
    """Turn off ICAT uploading for jobs submitted through this module.

    Sets the module-level flag ``_ICAT_UPLOAD_DISABLED`` to ``True``.
    Calling this function more than once has no additional effect.
    """
    # Rebind the module-level name rather than creating a function local.
    global _ICAT_UPLOAD_DISABLED
    _ICAT_UPLOAD_DISABLED = True