Source code for blissoda.xrpd.models

import datetime
import logging
import os
from typing import List
from typing import Literal
from typing import Optional
from typing import Type
from typing import Union

from redis import Redis
from redis_om import JsonModel
from redis_om.model.model import NotFoundError

from ..bliss_globals import current_session
from ..import_utils import UnavailableObject
from ..import_utils import is_available
from ..persistent.ndarray import PersistentNdArray
from .compatibility import MODEL_ARGUMENTS
from .compatibility import field_validator
from .compatibility import get_redis_db_url

logger = logging.getLogger(__name__)

XrpdFieldName = Literal["radial", "azimuthal", "intensity"]


def _create_database_proxy() -> Union[Redis, UnavailableObject]:
    try:
        return Redis.from_url(get_redis_db_url())
    except Exception as ex:
        # We are here because one of these reasons:
        #  - BEACON_HOST is not defined (ValueError).
        #  - BEACON_HOST is wrong (some socket exception).
        #  - Beacon or Redis are down (some socket exception).
        if is_available(current_session):
            # We are in a Bliss session. We should always have access to Redis.
            raise
        # We are not in a Bliss session, most likely the test suite.
        logger.warning(
            "Redis connection not available (%s). If this is not the "
            "test suite there is an issue with BEACON_HOST=%r or Redis itself.",
            ex,
            os.environ.get("BEACON_HOST", None),
        )
        return UnavailableObject(ImportError())


[docs] class XrpdPlotInfo(JsonModel, frozen=False, **MODEL_ARGUMENTS): scan_name: str lima_name: str radial_label: str azim_label: Optional[str] hdf5_url: Optional[str] timestamp: datetime.datetime field_names: List[XrpdFieldName] color: Optional[str] = None is_last_scan: bool = True is_sum: bool = False
[docs] @field_validator("timestamp", mode="before") def set_timestamp(cls, value): if isinstance(value, str): try: value = datetime.datetime.fromisoformat(value) except ValueError: pass if isinstance(value, str): try: value = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ") except ValueError: pass if isinstance(value, str): try: value = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f") except ValueError: pass if isinstance(value, str): logger.warning("XrpdPlotInfo: ignore invalid timestamp %r", value) value = None if value is None: value = datetime.datetime.now() if isinstance(value, datetime.datetime): value = value.astimezone() return value
@property def legend(self) -> str: return f"{self.scan_name} ({self.lima_name})" def _get_redis_key(self, field_name: XrpdFieldName) -> str: # TODO: remove spaces from redis key but handle existing # Redis memory allocated in production return f"{self.scan_name}:{self.lima_name}:plot_data:{field_name}"
[docs] def get_data_array(self, field_name: XrpdFieldName) -> PersistentNdArray: if field_name not in self.field_names: raise KeyError(f"No field {field_name} in this PlotInfo") redis_key = self._get_redis_key(field_name) logger.debug("XrpdPlotInfo GET %r", redis_key) return PersistentNdArray(redis_key)
[docs] def delete_data_arrays(self) -> None: for field_name in self.field_names: redis_key = self._get_redis_key(field_name) logger.debug("XrpdPlotInfo DEL %r", redis_key) PersistentNdArray(redis_key).remove()
[docs] @classmethod def get(cls, pk) -> Type["XrpdPlotInfo"]: try: return super().get(pk) except NotFoundError: # Reraise NotFoundError to get more info in the error message raise KeyError(f"PlotInfo not found at {pk}@{cls._meta.database}")
[docs] class Meta: database = _create_database_proxy()