# Source code for langchain_community.utilities.redis

from __future__ import annotations

import logging
import re
from typing import TYPE_CHECKING, Any, List, Optional, Pattern
from urllib.parse import urlparse

import numpy as np

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from redis.client import Redis as RedisType


def _array_to_buffer(array: List[float], dtype: Any = np.float32) -> bytes:
    """Serialize a sequence of numbers into a raw byte string.

    Args:
        array: Values to serialize.
        dtype: NumPy dtype the values are cast to before serialization
            (``np.float32`` by default).

    Returns:
        The bytes produced by ``ndarray.tobytes()`` for the cast array.
    """
    as_ndarray = np.array(array)
    typed = as_ndarray.astype(dtype)
    return typed.tobytes()


def _buffer_to_array(buffer: bytes, dtype: Any = np.float32) -> List[float]:
    """Decode a raw byte string back into a Python list of numbers.

    Args:
        buffer: Bytes to interpret.
        dtype: NumPy dtype used to interpret ``buffer``
            (``np.float32`` by default).

    Returns:
        The decoded values as a plain Python list.
    """
    decoded = np.frombuffer(buffer, dtype=dtype)
    return decoded.tolist()


class TokenEscaper:
    """Escape punctuation within an input string.

    Every character matched by the configured pattern is prefixed with a
    single backslash; all other characters are left untouched.
    """

    # Characters that RediSearch requires us to escape during queries.
    # Source: https://redis.io/docs/stack/search/reference/escaping/#the-rules-of-text-field-tokenization
    DEFAULT_ESCAPED_CHARS = r"[,.<>{}\[\]\\\"\':;!@#$%^&*()\-+=~\/ ]"

    def __init__(self, escape_chars_re: Optional[Pattern] = None):
        """Create an escaper.

        Args:
            escape_chars_re: Compiled pattern matching the characters to
                escape. When omitted (or falsy), a pattern compiled from
                ``DEFAULT_ESCAPED_CHARS`` is used.
        """
        if escape_chars_re:
            self.escaped_chars_re = escape_chars_re
        else:
            self.escaped_chars_re = re.compile(self.DEFAULT_ESCAPED_CHARS)

    def escape(self, value: str) -> str:
        """Return ``value`` with every matched character backslash-escaped.

        Args:
            value: Text to escape.

        Returns:
            The escaped text.

        Raises:
            TypeError: If ``value`` is not a ``str``.
        """
        if not isinstance(value, str):
            raise TypeError(
                "Value must be a string object for token escaping. "
                f"Got type {type(value)}"
            )

        # Replacement template: a literal backslash followed by the whole match.
        return self.escaped_chars_re.sub(r"\\\g<0>", value)
def check_redis_module_exist(client: RedisType, required_modules: List[dict]) -> None:
    """Check that at least one of the required Redis modules is installed.

    The modules reported by ``client.module_list()`` are compared against
    ``required_modules``; the function returns as soon as one required module
    is found with an installed version greater than or equal to the requested
    one.

    Args:
        client: Redis client; only its ``module_list()`` method is used.
        required_modules: Candidate modules, each a dict providing ``"name"``
            and ``"ver"`` (compared as integers).

    Raises:
        ValueError: If none of the required modules is installed in a
            sufficient version. The message is also logged at error level.
    """
    installed = {
        _module_field(module, "name"): module for module in client.module_list()
    }

    for required in required_modules:
        found = installed.get(required["name"])
        if found is not None and int(_module_field(found, "ver")) >= int(
            required["ver"]
        ):
            return

    # None of the candidates matched: report and fail.
    error_message = (
        "Redis cannot be used as a vector database without RediSearch >=2.4. "
        "Please head to https://redis.io/docs/stack/search/quick_start/ "
        "to know more about installing the RediSearch module within Redis Stack."
    )
    logger.error(error_message)
    raise ValueError(error_message)


def _module_field(module: dict, field: str) -> Any:
    """Read ``field`` from a module entry keyed by either bytes or str.

    Bytes keys are tried first; bytes values are decoded as UTF-8 so callers
    always get text or numbers back.
    """
    raw_key = field.encode("utf-8")
    value = module[raw_key] if raw_key in module else module[field]
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value
def get_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Get a redis client from the given connection URL.

    Plain URLs are handed to ``redis.from_url``. If the connected server
    reports that cluster mode is enabled, that client is closed and a Redis
    Cluster client for the same URL is returned instead.

    To use, you should have the ``redis`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.utilities.redis import get_client

            redis_client = get_client(
                redis_url="redis://username:password@localhost:6379",
            )

    To use a Redis replication setup with multiple Redis servers and Redis
    Sentinels, set ``redis_url`` to the ``redis+sentinel://`` scheme. With
    this URL format a path is needed holding the name of the Redis service
    within the Sentinels to get the correct Redis server connection. The
    default service name is "mymaster". The optional second part of the path
    is the Redis db number to connect to.

    The optional username or password is used for both the connection to the
    Redis server and to the Sentinel; different passwords for server and
    Sentinel are not supported. As another constraint, only one Sentinel
    instance can be given.

    URLs starting with ``rediss+sentinel`` additionally set ``ssl=True`` and,
    unless the caller passes ``ssl_cert_reqs``, default it to ``"none"``.

    Example:
        .. code-block:: python

            from langchain_community.utilities.redis import get_client

            redis_client = get_client(
                redis_url="redis+sentinel://username:password@sentinelhost:26379/mymaster/0",
            )

    Args:
        redis_url: Connection URL (see above for the supported schemes).
        **kwargs: Extra keyword arguments forwarded to the client constructor.

    Returns:
        A connected redis client.

    Raises:
        ImportError: If the ``redis`` python package is not installed.
    """
    try:
        import redis
    except ImportError as e:
        raise ImportError(
            "Could not import redis python package. "
            "Please install it with `pip install redis>=4.1.0`."
        ) from e

    # Sentinel URLs are resolved through the Sentinel helper.
    if redis_url.startswith("redis+sentinel"):
        return _redis_sentinel_client(redis_url, **kwargs)

    if redis_url.startswith("rediss+sentinel"):
        # Sentinel with TLS: force ssl on, but respect a caller-supplied
        # certificate policy. NOTE: the default disables verification.
        kwargs["ssl"] = True
        kwargs.setdefault("ssl_cert_reqs", "none")
        return _redis_sentinel_client(redis_url, **kwargs)

    # Connect to the redis server from the URL; reconnect with the cluster
    # client if the server turns out to be running in cluster mode.
    redis_client = redis.from_url(redis_url, **kwargs)
    if _check_for_cluster(redis_client):
        redis_client.close()
        redis_client = _redis_cluster_client(redis_url, **kwargs)
    return redis_client
def _redis_sentinel_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Build a client for the master of a Sentinel-managed Redis service.

    Parses the (unofficial) ``redis+sentinel`` URL, opens a Sentinel
    connection and returns the client obtained from ``master_for`` for the
    service named in the URL path.

    If a username and/or password is given in the URL, the same credentials
    are used for the Redis Sentinel as well as the Redis server. Because only
    one URL is available, different authentication data for the two systems
    cannot be expressed.
    """
    import redis

    url = urlparse(redis_url)

    # Sentinel expects a list of (host, port) tuples; fall back to defaults.
    host = url.hostname or "localhost"
    port = url.port or 26379
    sentinels = [(host, port)]

    # "/mymaster/0": first part is the service name, the optional second part
    # is the db number.
    service_name = "mymaster"
    if url.path:
        segments = url.path.split("/")
        service_name = segments[1] or "mymaster"
        if len(segments) > 2:
            kwargs["db"] = segments[2]

    # Credentials go to the Sentinel (via sentinel_kwargs) and to the Redis
    # server (directly in kwargs).
    sentinel_options = {}
    for credential in ("password", "username"):
        secret = getattr(url, credential)
        if secret:
            sentinel_options[credential] = secret
            kwargs[credential] = secret

    # Mirror all SSL related settings, plus client_name, onto the Sentinel.
    sentinel_options.update(
        (key, val)
        for key, val in kwargs.items()
        if key.startswith("ssl") or key == "client_name"
    )

    sentinel = redis.sentinel.Sentinel(
        sentinels, sentinel_kwargs=sentinel_options, **kwargs
    )

    # The Redis server might have a password while the Sentinel has none:
    # detect that specific error and retry without Sentinel credentials.
    # Anything else cannot be handled here and is left to the caller.
    try:
        sentinel.execute_command("ping")
    except redis.exceptions.AuthenticationError as ae:
        if "no password is set" not in ae.args[0]:
            raise ae
        logger.warning(
            "Redis sentinel connection configured with password but Sentinel "
            "answered NO PASSWORD NEEDED - Please check Sentinel configuration"
        )
        sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)

    return sentinel.master_for(service_name)


def _check_for_cluster(redis_client: RedisType) -> bool:
    """Report whether the connected server has cluster mode enabled.

    Reads the ``cluster`` section of ``INFO``; any ``RedisError`` raised while
    doing so is treated as "not a cluster".
    """
    import redis

    try:
        section = redis_client.info("cluster")
        enabled_flag = section["cluster_enabled"]
    except redis.exceptions.RedisError:
        return False
    return enabled_flag == 1


def _redis_cluster_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Create a Redis Cluster client for ``redis_url``."""
    from redis.cluster import RedisCluster

    cluster = RedisCluster.from_url(redis_url, **kwargs)
    return cluster  # type: ignore[return-value]