Source code for blissoda.xrpd.compatibility

from blissdata.beacon.data import BeaconData

from ..version_utils import has_minimal_version

if has_minimal_version("blissdata", "1.1"):
    from pydantic import Field  # noqa
    from pydantic import field_validator
else:
    from pydantic.v1 import Field  # noqa
    from pydantic.v1 import validator

    def field_validator(*fields, mode="after", **kwargs):
        pre = mode == "before"
        return validator(*fields, pre=pre, **kwargs)


MODEL_ARGUMENTS = {}

if has_minimal_version("redis-om", "1.0.0"):
    MODEL_ARGUMENTS["index"] = True


[docs] def get_redis_db_url(): if has_minimal_version("blissdata", "1.0"): return BeaconData().get_redis_db() raw_url = BeaconData().get_redis_db() _, url = raw_url.split(":") if url.endswith("sock"): return f"unix://{url}" else: return f"redis://{url}"