Source code for blissoda.persistent.ndarray.ndarrayv1

import pickle
from typing import Dict
from typing import List
from typing import Union

import numpy

from ...import_utils import unavailable_function

try:
    from bliss.config.conductor.client import get_redis_proxy
except ImportError as ex:
    get_redis_proxy = unavailable_function(ex)


[docs] class PersistentNdArray: def __init__(self, redis_key: str) -> None: self._connection = get_redis_proxy(db=0) self._name = redis_key
[docs] def append(self, value: numpy.ndarray) -> None: self._connection.xadd(self._name, _encode(value[numpy.newaxis, ...]))
[docs] def extend(self, value: numpy.ndarray) -> None: self._connection.xadd(self._name, _encode(value))
def __getitem__(self, idx) -> Union[numpy.ndarray, List[numpy.ndarray]]: if idx == 0: adict = self._connection.xrange(self._name, count=1)[0][1] return _decode(adict)[0] if idx == -1: adict = self._connection.xrevrange(self._name, count=1)[0][1] return _decode(adict)[-1] arr = [_decode(adict) for _, adict in self._connection.xrange(self._name)] if arr: arr = numpy.concatenate(arr) else: arr = numpy.array([]) if isinstance(idx, tuple) and not idx: return arr return arr[idx]
[docs] def remove(self) -> None: self._connection.delete(self._name)
def _encode(data: numpy.ndarray) -> Dict[bytes, bytes]: return {b"data": pickle.dumps(data)} def _decode(data: Dict[bytes, bytes]) -> numpy.ndarray: return pickle.loads(data[b"data"])