Source code for blissoda.persistent.ndarray.ndarrayv0

from typing import List
from typing import Union

import numpy

from ...import_utils import unavailable_class

try:
    from blissdata.data.events.channel import ChannelDataEvent
    from blissdata.streaming import DataStream
except ImportError as ex:
    ChannelDataEvent = unavailable_class(ex)
    DataStream = unavailable_class(ex)


[docs] class PersistentNdArray: def __init__(self, redis_key: str) -> None: self._datastream = DataStream(redis_key)
[docs] def append(self, value: numpy.ndarray) -> None: self._datastream.add_event(_encode(value[numpy.newaxis, ...]))
[docs] def extend(self, value: numpy.ndarray) -> None: self._datastream.add_event(_encode(value))
def __getitem__(self, idx) -> Union[numpy.ndarray, List[numpy.ndarray]]: if idx == 0: events = self._datastream.range(count=1) if not events: raise IndexError("index out of range") adict = events[0][1] return _decode(adict)[0] if idx == -1: events = self._datastream.rev_range(count=1) if not events: raise IndexError("index out of range") adict = events[0][1] return _decode(adict)[-1] events = self._datastream.range() if events: event = ChannelDataEvent.merge(events) arr = _data_from_event(event) else: arr = numpy.array([]) if idx == (): return arr return arr[idx]
[docs] def remove(self) -> None: self._datastream.clear()
def _encode(data: numpy.ndarray) -> ChannelDataEvent: desc = {"shape": data.shape[1:], "dtype": data.dtype} return ChannelDataEvent(data, desc) def _decode(data: dict) -> numpy.ndarray: event = ChannelDataEvent(raw=data) return _data_from_event(event) def _data_from_event(event: ChannelDataEvent) -> numpy.ndarray: if event.npoints == 1: return event.data[numpy.newaxis, ...] return event.data