Source code for langchain_community.utilities.redis
from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING, Any, List, Optional, Pattern
from urllib.parse import urlparse
import numpy as np
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from redis.client import Redis as RedisType
def _array_to_buffer(array: List[float], dtype: Any = np.float32) -> bytes:
return np.array(array).astype(dtype).tobytes()
def _buffer_to_array(buffer: bytes, dtype: Any = np.float32) -> List[float]:
return np.frombuffer(buffer, dtype=dtype).tolist()
[docs]class TokenEscaper:
"""在输入字符串中转义标点符号。"""
# Characters that RediSearch requires us to escape during queries.
# Source: https://redis.io/docs/stack/search/reference/escaping/#the-rules-of-text-field-tokenization
DEFAULT_ESCAPED_CHARS = r"[,.<>{}\[\]\\\"\':;!@#$%^&*()\-+=~\/ ]"
[docs] def __init__(self, escape_chars_re: Optional[Pattern] = None):
if escape_chars_re:
self.escaped_chars_re = escape_chars_re
else:
self.escaped_chars_re = re.compile(self.DEFAULT_ESCAPED_CHARS)
[docs] def escape(self, value: str) -> str:
if not isinstance(value, str):
raise TypeError(
"Value must be a string object for token escaping."
f"Got type {type(value)}"
)
def escape_symbol(match: re.Match) -> str:
value = match.group(0)
return f"\\{value}"
return self.escaped_chars_re.sub(escape_symbol, value)
[docs]def check_redis_module_exist(client: RedisType, required_modules: List[dict]) -> None:
"""检查是否安装了正确的Redis模块。"""
installed_modules = client.module_list()
installed_modules = {
module[b"name"].decode("utf-8"): module for module in installed_modules
}
for module in required_modules:
if module["name"] in installed_modules and int(
installed_modules[module["name"]][b"ver"]
) >= int(module["ver"]):
return
# otherwise raise error
error_message = (
"Redis cannot be used as a vector database without RediSearch >=2.4"
"Please head to https://redis.io/docs/stack/search/quick_start/"
"to know more about installing the RediSearch module within Redis Stack."
)
logger.error(error_message)
raise ValueError(error_message)
[docs]def get_client(redis_url: str, **kwargs: Any) -> RedisType:
"""从给定的连接URL获取一个redis客户端。这个辅助函数接受Redis服务器的URL(TCP,带/不带TLS或UnixSocket)以及Redis Sentinel连接。
不支持Redis集群。
在创建连接之前,会检查数据库驱动程序的存在,否则会引发ValueError。
要使用,应该已安装``redis`` python包。
示例:
.. code-block:: python
from langchain_community.utilities.redis import get_client
redis_client = get_client(
redis_url="redis://username:password@localhost:6379"
index_name="my-index",
embedding_function=embeddings.embed_query,
)
要使用具有多个Redis服务器和Redis Sentinel的Redis复制设置,请将"redis_url"设置为"redis+sentinel://"方案。使用此URL格式需要一个路径,其中包含在Sentinels中获取正确的Redis服务器连接的Redis服务的名称。默认服务名称为"mymaster"。路径的可选第二部分是要连接的Redis数据库编号。
用于连接到rediserver和sentinel的可选用户名或密码,不支持为服务器和sentinel使用不同的密码。另一个约束是只能给出一个sentinel实例:
示例:
.. code-block:: python
from langchain_community.utilities.redis import get_client
redis_client = get_client(
redis_url="redis+sentinel://username:password@sentinelhost:26379/mymaster/0"
index_name="my-index",
embedding_function=embeddings.embed_query,
)
"""
# Initialize with necessary components.
try:
import redis
except ImportError:
raise ImportError(
"Could not import redis python package. "
"Please install it with `pip install redis>=4.1.0`."
)
# check if normal redis:// or redis+sentinel:// url
if redis_url.startswith("redis+sentinel"):
redis_client = _redis_sentinel_client(redis_url, **kwargs)
elif redis_url.startswith("rediss+sentinel"): # sentinel with TLS support enables
kwargs["ssl"] = True
if "ssl_cert_reqs" not in kwargs:
kwargs["ssl_cert_reqs"] = "none"
redis_client = _redis_sentinel_client(redis_url, **kwargs)
else:
# connect to redis server from url, reconnect with cluster client if needed
redis_client = redis.from_url(redis_url, **kwargs)
if _check_for_cluster(redis_client):
redis_client.close()
redis_client = _redis_cluster_client(redis_url, **kwargs)
return redis_client
def _redis_sentinel_client(redis_url: str, **kwargs: Any) -> RedisType:
"""用于解析(非官方)redis+sentinel url 并创建一个 Sentinel 连接以获取最终的 redis 客户端连接到一个主从复制的副本-主机进行读写操作的辅助方法。
如果提供了用于身份验证的用户名和/或密码,则相同的凭据将用于 Redis Sentinel 和 Redis 服务器。使用仅使用 redis url 的这种实现,不可能在两个系统上使用不同的身份验证数据。
"""
import redis
parsed_url = urlparse(redis_url)
# sentinel needs list with (host, port) tuple, use default port if none available
sentinel_list = [(parsed_url.hostname or "localhost", parsed_url.port or 26379)]
if parsed_url.path:
# "/mymaster/0" first part is service name, optional second part is db number
path_parts = parsed_url.path.split("/")
service_name = path_parts[1] or "mymaster"
if len(path_parts) > 2:
kwargs["db"] = path_parts[2]
else:
service_name = "mymaster"
sentinel_args = {}
if parsed_url.password:
sentinel_args["password"] = parsed_url.password
kwargs["password"] = parsed_url.password
if parsed_url.username:
sentinel_args["username"] = parsed_url.username
kwargs["username"] = parsed_url.username
# check for all SSL related properties and copy them into sentinel_kwargs too,
# add client_name also
for arg in kwargs:
if arg.startswith("ssl") or arg == "client_name":
sentinel_args[arg] = kwargs[arg]
# sentinel user/pass is part of sentinel_kwargs, user/pass for redis server
# connection as direct parameter in kwargs
sentinel_client = redis.sentinel.Sentinel(
sentinel_list, sentinel_kwargs=sentinel_args, **kwargs
)
# redis server might have password but not sentinel - fetch this error and try
# again without pass, everything else cannot be handled here -> user needed
try:
sentinel_client.execute_command("ping")
except redis.exceptions.AuthenticationError as ae:
if "no password is set" in ae.args[0]:
logger.warning(
"Redis sentinel connection configured with password but Sentinel \
answered NO PASSWORD NEEDED - Please check Sentinel configuration"
)
sentinel_client = redis.sentinel.Sentinel(sentinel_list, **kwargs)
else:
raise ae
return sentinel_client.master_for(service_name)
def _check_for_cluster(redis_client: RedisType) -> bool:
import redis
try:
cluster_info = redis_client.info("cluster")
return cluster_info["cluster_enabled"] == 1
except redis.exceptions.RedisError:
return False
def _redis_cluster_client(redis_url: str, **kwargs: Any) -> RedisType:
from redis.cluster import RedisCluster
return RedisCluster.from_url(redis_url, **kwargs) # type: ignore[return-value]