Source code for blissoda.xrpd.compatibility
from blissdata.beacon.data import BeaconData
from ..version_utils import has_minimal_version
if has_minimal_version("blissdata", "1.1"):
from pydantic import Field # noqa
from pydantic import field_validator
else:
from pydantic.v1 import Field # noqa
from pydantic.v1 import validator
def field_validator(*fields, mode="after", **kwargs):
pre = mode == "before"
return validator(*fields, pre=pre, **kwargs)
MODEL_ARGUMENTS = {}
if has_minimal_version("redis-om", "1.0.0"):
MODEL_ARGUMENTS["index"] = True
[docs]
def get_redis_db_url():
if has_minimal_version("blissdata", "1.0"):
return BeaconData().get_redis_db()
raw_url = BeaconData().get_redis_db()
_, url = raw_url.split(":")
if url.endswith("sock"):
return f"unix://{url}"
else:
return f"redis://{url}"