Source code for langchain_community.vectorstores.redis.base

"""Redis向量数据库的封装。"""

from __future__ import annotations

import logging
import os
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
)

import numpy as np
import yaml
from langchain_core._api import deprecated
from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore, VectorStoreRetriever

from langchain_community.utilities.redis import (
    _array_to_buffer,
    _buffer_to_array,
    check_redis_module_exist,
    get_client,
)
from langchain_community.vectorstores.redis.constants import (
    REDIS_REQUIRED_MODULES,
    REDIS_TAG_SEPARATOR,
)
from langchain_community.vectorstores.utils import maximal_marginal_relevance

logger = logging.getLogger(__name__)
ListOfDict = List[Dict[str, str]]

if TYPE_CHECKING:
    from redis.client import Redis as RedisType
    from redis.commands.search.query import Query

    from langchain_community.vectorstores.redis.filters import RedisFilterExpression
    from langchain_community.vectorstores.redis.schema import RedisModel


def _default_relevance_score(val: float) -> float:
    return 1 - val


[docs]def check_index_exists(client: RedisType, index_name: str) -> bool: """检查Redis索引是否存在。""" try: client.ft(index_name).info() except: # noqa: E722 logger.debug("Index does not exist") return False logger.debug("Index already exists") return True
[docs]class Redis(VectorStore): """Redis向量数据库。 要使用,应该安装``redis`` python包并运行Redis Enterprise或Redis-Stack服务器。 对于生产用例,建议使用Redis Enterprise,因为扩展性、性能、稳定性和可用性要比Redis-Stack好得多。 然而,对于测试和原型设计,并不需要这样做。Redis-Stack可作为Docker容器使用,提供完整的向量搜索API。 .. code-block:: bash # 在本地Docker中运行redis stack docker run -d -p 6379:6379 -p 8001:8001 redis/redis-stack:latest 运行后,可以使用以下URL模式连接到redis服务器: - redis://<host>:<port> # 简单连接 - redis://<username>:<password>@<host>:<port> # 带身份验证的连接 - rediss://<host>:<port> # 带SSL连接 - rediss://<username>:<password>@<host>:<port> # 带SSL和身份验证的连接 示例: 以下示例展示了如何使用LangChain中的Redis VectorStore。 对于以下所有示例,假设我们有以下导入: .. code-block:: python from langchain_community.vectorstores import Redis from langchain_community.embeddings import OpenAIEmbeddings 初始化、创建索引并加载文档 .. code-block:: python from langchain_community.vectorstores import Redis from langchain_community.embeddings import OpenAIEmbeddings rds = Redis.from_documents( documents, # 来自加载器或创建的Document对象列表 embeddings, # 一个Embeddings对象 redis_url="redis://localhost:6379", ) 初始化、创建索引并加载带元数据的文档 .. code-block:: python rds = Redis.from_texts( texts, # 字符串列表 metadata, # 元数据字典列表 embeddings, # 一个Embeddings对象 redis_url="redis://localhost:6379", ) 初始化、创建索引并加载带元数据的文档并返回键 .. code-block:: python rds, keys = Redis.from_texts_return_keys( texts, # 字符串列表 metadata, # 元数据字典列表 embeddings, # 一个Embeddings对象 redis_url="redis://localhost:6379", ) 对于需要保持索引活动的用例,可以初始化一个索引名称,这样以后更容易引用 .. code-block:: python rds = Redis.from_texts( texts, # 字符串列表 metadata, # 元数据字典列表 embeddings, # 一个Embeddings对象 index_name="my-index", redis_url="redis://localhost:6379", ) 初始化并连接到现有索引(来自上述) .. code-block:: python # 必须从另一个索引传入schema和key_prefix existing_rds = Redis.from_existing_index( embeddings, # 一个Embeddings对象 index_name="my-index", schema=rds.schema, # 从另一个索引转储的schema key_prefix=rds.key_prefix, # 从另一个索引的键前缀 redis_url="redis://localhost:6379", ) 高级示例: 可以提供自定义向量模式以更改Redis创建的底层向量模式。这对于生产用例很有用,可以优化向量模式以适应您的用例。例如,使用HNSW而不是默认的FLAT(knn) .. code-block:: python vector_schema = { "algorithm": "HNSW" } rds = Redis.from_texts( texts, # 字符串列表 metadata, # 元数据字典列表 embeddings, # 一个Embeddings对象 vector_schema=vector_schema, redis_url="redis://localhost:6379", ) 可以提供自定义索引模式以更改元数据的索引方式。如果想要使用Redis的混合查询(过滤)功能,这将很有用。 默认情况下,此实现将根据以下规则自动生成索引模式: - 所有字符串都作为文本字段索引 - 所有数字都作为数值字段索引 - 所有字符串列表都作为标签字段索引(由langchain_community.vectorstores.redis.constants.REDIS_TAG_SEPARATOR连接) - 所有None值不被索引,但仍存储在Redis中,这些值无法通过此接口检索,但可以使用原始Redis客户端检索。 - 所有其他类型不被索引 要覆盖这些规则,可以传入类似以下的自定义索引模式 .. code-block:: yaml tag: - name: credit_score text: - name: user - name: job 通常,“credit_score”字段应该是文本字段,因为它是一个字符串,但我们可以通过上面显示的yaml配置(也可以是字典)和下面的代码来指定字段类型以覆盖此行为。 .. code-block:: python rds = Redis.from_texts( texts, # 字符串列表 metadata, # 元数据字典列表 embeddings, # 一个Embeddings对象 index_schema="path/to/index_schema.yaml", # 也可以是字典 redis_url="redis://localhost:6379", ) 当连接到应用了自定义模式的现有索引时,重要的是将相同的模式传递给``from_existing_index``方法。否则,新添加的样本的模式将不正确,并且元数据将不会被返回。""" DEFAULT_VECTOR_SCHEMA = { "name": "content_vector", "algorithm": "FLAT", "dims": 1536, "distance_metric": "COSINE", "datatype": "FLOAT32", }
[docs] def __init__( self, redis_url: str, index_name: str, embedding: Embeddings, index_schema: Optional[Union[Dict[str, ListOfDict], str, os.PathLike]] = None, vector_schema: Optional[Dict[str, Union[str, int]]] = None, relevance_score_fn: Optional[Callable[[float], float]] = None, key_prefix: Optional[str] = None, **kwargs: Any, ): """使用必要的组件初始化Redis向量存储。""" self._check_deprecated_kwargs(kwargs) try: # TODO use importlib to check if redis is installed import redis # noqa: F401 except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e self.index_name = index_name self._embeddings = embedding try: redis_client = get_client(redis_url=redis_url, **kwargs) # check if redis has redisearch module installed check_redis_module_exist(redis_client, REDIS_REQUIRED_MODULES) except ValueError as e: raise ValueError(f"Redis failed to connect: {e}") self.client = redis_client self.relevance_score_fn = relevance_score_fn self._schema = self._get_schema_with_defaults(index_schema, vector_schema) self.key_prefix = key_prefix if key_prefix is not None else f"doc:{index_name}"
@property def embeddings(self) -> Optional[Embeddings]: """如果可用,访问查询嵌入对象。""" return self._embeddings
[docs] @classmethod def from_texts_return_keys( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, index_name: Optional[str] = None, index_schema: Optional[Union[Dict[str, ListOfDict], str, os.PathLike]] = None, vector_schema: Optional[Dict[str, Union[str, int]]] = None, **kwargs: Any, ) -> Tuple[Redis, List[str]]: """从原始文档创建一个Redis向量存储。 这是一个用户友好的接口,它: 1. 嵌入文档。 2. 如果索引不存在,则创建一个新的Redis索引。 3. 将文档添加到新创建的Redis索引中。 4. 存储后返回新创建文档的键。 如果未定义`index_schema`,此方法将根据传入的元数据生成模式。如果定义了`index_schema`,它将与生成的模式进行比较,并在存在差异时发出警告。如果您有意为元数据定义模式,则可以忽略该警告。 要查看模式选项,请初始化此类的一个实例,并使用`Redis.schema`属性打印模式。这将包括始终存在于langchain模式中的content和content_vector类。 示例: .. code-block:: python from langchain_community.vectorstores import Redis from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() redis, keys = Redis.from_texts_return_keys( texts, embeddings, redis_url="redis://localhost:6379" ) 参数: texts(List[str]):要添加到向量存储中的文本列表。 embedding(Embeddings):用于向量存储的嵌入。 metadatas(Optional[List[dict],可选):要添加到向量存储中的元数据字典的可选列表。默认为None。 index_name(Optional[str],可选):要创建或添加的索引的可选名称。默认为None。 index_schema(Optional[Union[Dict[str,ListOfDict],str,os.PathLike],可选):要在元数据中索引的可选字段。覆盖生成的模式。默认为None。 vector_schema(Optional[Dict[str,Union[str,int]],可选):要使用的向量模式。默认为None。 **kwargs(Any):传递给Redis客户端的其他关键字参数。 返回: Tuple[Redis,List[str]]:Redis实例和新创建文档的键的元组。 Raises: ValueError:如果元数据的数量与文本的数量不匹配。 """ try: # TODO use importlib to check if redis is installed import redis # noqa: F401 from langchain_community.vectorstores.redis.schema import read_schema except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL") if "redis_url" in kwargs: kwargs.pop("redis_url") # flag to use generated schema if "generate" in kwargs: kwargs.pop("generate") # see if the user specified keys keys = None if "keys" in kwargs: keys = kwargs.pop("keys") # Name of the search index if not given if not index_name: index_name = uuid.uuid4().hex # type check for metadata if metadatas: if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore # noqa: E501 raise ValueError("Number of metadatas must match number of texts") if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") generated_schema = _generate_field_schema(metadatas[0]) if index_schema: # read in the schema solely to compare to the generated schema user_schema = read_schema(index_schema) # type: ignore # the very rare case where a super user decides to pass the index # schema and a document loader is used that has metadata which # we need to map into fields. if user_schema != generated_schema: logger.warning( "`index_schema` does not match generated metadata schema.\n" + "If you meant to manually override the schema, please " + "ignore this message.\n" + f"index_schema: {user_schema}\n" + f"generated_schema: {generated_schema}\n" ) else: # use the generated schema index_schema = generated_schema # Create instance # init the class -- if Redis is unavailable, will throw exception instance = cls( redis_url, index_name, embedding, index_schema=index_schema, vector_schema=vector_schema, **kwargs, ) # Add data to Redis keys = instance.add_texts(texts, metadatas, keys=keys) return instance, keys
[docs] @classmethod def from_texts( cls: Type[Redis], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, index_name: Optional[str] = None, index_schema: Optional[Union[Dict[str, ListOfDict], str, os.PathLike]] = None, vector_schema: Optional[Dict[str, Union[str, int]]] = None, **kwargs: Any, ) -> Redis: """从文本列表创建一个Redis向量存储。 这是一个用户友好的接口,可以: 1. 嵌入文档。 2. 如果索引不存在,则创建一个新的Redis索引。 3. 将文档添加到新创建的Redis索引中。 如果未定义`index_schema`,此方法将根据传入的元数据生成模式。如果定义了`index_schema`,它将与生成的模式进行比较,并在存在差异时发出警告。如果您有意为元数据定义模式,则可以忽略该警告。 要查看模式选项,请初始化此类的一个实例,并使用`Redis.schema`属性打印出模式。这将包括langchain模式中始终存在的content和content_vector类。 示例: .. code-block:: python from langchain_community.vectorstores import Redis from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() redisearch = RediSearch.from_texts( texts, embeddings, redis_url="redis://username:password@localhost:6379" ) 参数: texts (List[str]): 要添加到向量存储中的文本列表。 embedding (Embeddings): 嵌入模型类(例如OpenAIEmbeddings)用于嵌入查询。 metadatas (Optional[List[dict]], optional): 要添加到向量存储中的元数据字典的可选列表。默认为None。 index_name (Optional[str], optional): 要创建或添加的索引的可选名称。默认为None。 index_schema (Optional[Union[Dict[str, ListOfDict], str, os.PathLike]], optional): 要在元数据中索引的可选字段。覆盖生成的模式。默认为None。 vector_schema (Optional[Dict[str, Union[str, int]]], optional): 要使用的向量模式。默认为None。 **kwargs (Any): 要传递给Redis客户端的其他关键字参数。 返回: Redis: Redis向量存储实例。 引发: ValueError: 如果元数据的数量与文本的数量不匹配。 ImportError: 如果未安装redis python包。 """ instance, _ = cls.from_texts_return_keys( texts, embedding, metadatas=metadatas, index_name=index_name, index_schema=index_schema, vector_schema=vector_schema, **kwargs, ) return instance
[docs] @classmethod def from_existing_index( cls, embedding: Embeddings, index_name: str, schema: Union[Dict[str, ListOfDict], str, os.PathLike, Dict[str, ListOfDict]], key_prefix: Optional[str] = None, **kwargs: Any, ) -> Redis: """连接到现有的Redis索引。 示例: .. code-block:: python from langchain_community.vectorstores import Redis from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() # 必须从另一个索引中传递模式和键前缀 existing_rds = Redis.from_existing_index( embeddings, index_name="my-index", schema=rds.schema, # 从另一个索引中导出的模式 key_prefix=rds.key_prefix, # 从另一个索引中的键前缀 redis_url="redis://username:password@localhost:6379", ) 参数: embedding (Embeddings): 嵌入模型类(例如OpenAIEmbeddings)用于嵌入查询。 index_name (str): 要连接的索引的名称。 schema (Union[Dict[str, str], str, os.PathLike, Dict[str, ListOfDict]]): 索引和向量模式的模式。可以是字典或yaml文件的路径。 key_prefix (Optional[str]): 与此索引关联的Redis中所有键的前缀。 **kwargs (Any): 传递给Redis客户端的其他关键字参数。 返回: Redis: Redis VectorStore实例。 引发: ValueError: 如果索引不存在。 ImportError: 如果未安装redis python包。 """ redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL") # We need to first remove redis_url from kwargs, # otherwise passing it to Redis will result in an error. if "redis_url" in kwargs: kwargs.pop("redis_url") # Create instance # init the class -- if Redis is unavailable, will throw exception instance = cls( redis_url, index_name, embedding, index_schema=schema, key_prefix=key_prefix, **kwargs, ) # Check for existence of the declared index if not check_index_exists(instance.client, index_name): # Will only raise if the running Redis server does not # have a record of this particular index raise ValueError( f"Redis failed to connect: Index {index_name} does not exist." ) return instance
@property def schema(self) -> Dict[str, List[Any]]: """返回索引的模式。""" return self._schema.as_dict()
[docs] def write_schema(self, path: Union[str, os.PathLike]) -> None: """将模式写入一个yaml文件。""" with open(path, "w+") as f: yaml.dump(self.schema, f)
[docs] @staticmethod def delete( ids: Optional[List[str]] = None, **kwargs: Any, ) -> bool: """删除Redis条目。 参数: ids:要删除的id列表(在redis中的键)。 redis_url:Redis连接URL。这应该在kwargs中传递或设置为环境变量:REDIS_URL。 返回: bool:删除是否成功。 引发: ValueError:如果未安装redis python包。 ValueError:如果未提供ids(在redis中的键)。 """ redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL") if ids is None: raise ValueError("'ids' (keys)() were not provided.") try: import redis # noqa: F401 except ImportError: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) try: # We need to first remove redis_url from kwargs, # otherwise passing it to Redis will result in an error. if "redis_url" in kwargs: kwargs.pop("redis_url") client = get_client(redis_url=redis_url, **kwargs) except ValueError as e: raise ValueError(f"Your redis connected error: {e}") # Check if index exists try: client.delete(*ids) logger.info("Entries deleted") return True except: # noqa: E722 # ids does not exist return False
[docs] @staticmethod def drop_index( index_name: str, delete_documents: bool, **kwargs: Any, ) -> bool: """删除一个Redis搜索索引。 参数: index_name(str):要删除的索引名称。 delete_documents(bool):是否删除关联的文档。 返回: bool:删除是否成功。 """ redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL") try: import redis # noqa: F401 except ImportError: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) try: # We need to first remove redis_url from kwargs, # otherwise passing it to Redis will result in an error. if "redis_url" in kwargs: kwargs.pop("redis_url") client = get_client(redis_url=redis_url, **kwargs) except ValueError as e: raise ValueError(f"Your redis connected error: {e}") # Check if index exists try: client.ft(index_name).dropindex(delete_documents) logger.info("Drop index") return True except: # noqa: E722 # Index not exist return False
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, embeddings: Optional[List[List[float]]] = None, batch_size: int = 1000, clean_metadata: bool = True, **kwargs: Any, ) -> List[str]: """向向量存储中添加更多文本。 参数: texts (Iterable[str]): 要添加到向量存储中的字符串/文本的可迭代对象。 metadatas (Optional[List[dict]], optional): 元数据的可选列表。默认为None。 embeddings (Optional[List[List[float]]], optional): 可选的预生成的嵌入。默认为None。 keys (List[str]) or ids (List[str]): 条目的标识符。默认为None。 batch_size (int, optional): 用于写入的批处理大小。默认为1000。 返回: List[str]: 添加到向量存储中的id列表 """ ids = [] # Get keys or ids from kwargs # Other vectorstores use ids keys_or_ids = kwargs.get("keys", kwargs.get("ids")) # type check for metadata if metadatas: if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore # noqa: E501 raise ValueError("Number of metadatas must match number of texts") if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") embeddings = embeddings or self._embeddings.embed_documents(list(texts)) self._create_index_if_not_exist(dim=len(embeddings[0])) # Write data to redis pipeline = self.client.pipeline(transaction=False) for i, text in enumerate(texts): # Use provided values by default or fallback key = keys_or_ids[i] if keys_or_ids else str(uuid.uuid4().hex) if not key.startswith(self.key_prefix + ":"): key = self.key_prefix + ":" + key metadata = metadatas[i] if metadatas else {} metadata = _prepare_metadata(metadata) if clean_metadata else metadata pipeline.hset( key, mapping={ self._schema.content_key: text, self._schema.content_vector_key: _array_to_buffer( embeddings[i], self._schema.vector_dtype ), **metadata, }, ) ids.append(key) # Write batch if i % batch_size == 0: pipeline.execute() # Cleanup final batch pipeline.execute() return ids
[docs] def as_retriever(self, **kwargs: Any) -> RedisVectorStoreRetriever: tags = kwargs.pop("tags", None) or [] tags.extend(self._get_retriever_tags()) return RedisVectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)
[docs] @deprecated("0.0.1", alternative="similarity_search(distance_threshold=0.1)") def similarity_search_limit_score( self, query: str, k: int = 4, score_threshold: float = 0.2, **kwargs: Any ) -> List[Document]: """返回与查询文本在score_threshold范围内最相似的索引文档。 已弃用:请改用带有distance_threshold的similarity_search。 参数: query(str):要查找相似文档的查询文本。 k(int):要返回的文档数量。默认为4。 score_threshold(float):要被视为匹配所需的最小匹配*距离*。默认为0.2。 返回: List[Document]:与查询文本最相似的文档列表,包括每个文档的匹配分数。 注意: 如果没有文档满足score_threshold值,则返回一个空列表。 """ return self.similarity_search( query, k=k, distance_threshold=score_threshold, **kwargs )
[docs] def similarity_search_with_score( self, query: str, k: int = 4, filter: Optional[RedisFilterExpression] = None, return_metadata: bool = True, **kwargs: Any, ) -> List[Tuple[Document, float]]: """运行使用 **向量距离** 进行相似性搜索。 从此函数返回的“scores”是从查询向量到原始向量距离。对于相似性分数,请使用``similarity_search_with_relevance_scores``。 参数: query (str): 要查找相似文档的查询文本。 k (int): 要返回的文档数量。默认为4。 filter (RedisFilterExpression, optional): 可选的元数据过滤器。 默认为None。 return_metadata (bool, optional): 是否返回元数据。 默认为True。 返回: List[Tuple[Document, float]]: 与查询最相似的文档列表,以及每个文档的距离。 """ try: import redis except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e if "score_threshold" in kwargs: logger.warning( "score_threshold is deprecated. Use distance_threshold instead." + "score_threshold should only be used in " + "similarity_search_with_relevance_scores." + "score_threshold will be removed in a future release.", ) query_embedding = self._embeddings.embed_query(query) redis_query, params_dict = self._prepare_query( query_embedding, k=k, filter=filter, with_metadata=return_metadata, with_distance=True, **kwargs, ) # Perform vector search # ignore type because redis-py is wrong about bytes try: results = self.client.ft(self.index_name).search(redis_query, params_dict) # type: ignore # noqa: E501 except redis.exceptions.ResponseError as e: # split error message and see if it starts with "Syntax" if str(e).split(" ")[0] == "Syntax": raise ValueError( "Query failed with syntax error. " + "This is likely due to malformation of " + "filter, vector, or query argument" ) from e raise e # Prepare document results docs_with_scores: List[Tuple[Document, float]] = [] for result in results.docs: metadata = {} if return_metadata: metadata = {"id": result.id} metadata.update(self._collect_metadata(result)) doc = Document(page_content=result.content, metadata=metadata) distance = self._calculate_fp_distance(result.distance) docs_with_scores.append((doc, distance)) return docs_with_scores
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[RedisFilterExpression] = None, return_metadata: bool = True, distance_threshold: Optional[float] = None, **kwargs: Any, ) -> List[Document]: """运行查询向量和索引向量之间的相似性搜索。 参数: embedding (List[float]): 要查找相似文档的查询向量。 k (int): 要返回的文档数量。默认为4。 filter (RedisFilterExpression, optional): 可选的元数据过滤器。默认为None。 return_metadata (bool, optional): 是否返回元数据。默认为True。 distance_threshold (Optional[float], optional): 选定文档与查询向量之间的最大向量距离。默认为None。 返回: List[Document]: 与查询文本最相似的文档列表。 """ try: import redis except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e if "score_threshold" in kwargs: logger.warning( "score_threshold is deprecated. Use distance_threshold instead." + "score_threshold should only be used in " + "similarity_search_with_relevance_scores." + "score_threshold will be removed in a future release.", ) redis_query, params_dict = self._prepare_query( embedding, k=k, filter=filter, distance_threshold=distance_threshold, with_metadata=return_metadata, with_distance=False, ) # Perform vector search # ignore type because redis-py is wrong about bytes try: results = self.client.ft(self.index_name).search(redis_query, params_dict) # type: ignore # noqa: E501 except redis.exceptions.ResponseError as e: # split error message and see if it starts with "Syntax" if str(e).split(" ")[0] == "Syntax": raise ValueError( "Query failed with syntax error. " + "This is likely due to malformation of " + "filter, vector, or query argument" ) from e raise e # Prepare document results docs = [] for result in results.docs: metadata = {} if return_metadata: metadata = {"id": result.id} metadata.update(self._collect_metadata(result)) content_key = self._schema.content_key docs.append( Document(page_content=getattr(result, content_key), metadata=metadata) ) return docs
def _collect_metadata(self, result: "Document") -> Dict[str, Any]: """从Redis中收集元数据。 该方法确保元数据与用户传递给该类的索引模式或该类生成的索引模式之间没有不匹配。 参数: result(Document):从Redis返回的redis.commands.search.Document对象。 返回: Dict[str, Any]:收集的元数据。 """ # new metadata dict as modified by this method meta = {} for key in self._schema.metadata_keys: try: meta[key] = getattr(result, key) except AttributeError: # warning about attribute missing logger.warning( f"Metadata key {key} not found in metadata. " + "Setting to None. \n" + "Metadata fields defined for this instance: " + f"{self._schema.metadata_keys}" ) meta[key] = None return meta def _prepare_query( self, query_embedding: List[float], k: int = 4, filter: Optional[RedisFilterExpression] = None, distance_threshold: Optional[float] = None, with_metadata: bool = True, with_distance: bool = False, ) -> Tuple["Query", Dict[str, Any]]: # Creates Redis query params_dict: Dict[str, Union[str, bytes, float]] = { "vector": _array_to_buffer(query_embedding, self._schema.vector_dtype), } # prepare return fields including score return_fields = [self._schema.content_key] if with_distance: return_fields.append("distance") if with_metadata: return_fields.extend(self._schema.metadata_keys) if distance_threshold: params_dict["distance_threshold"] = distance_threshold return ( self._prepare_range_query( k, filter=filter, return_fields=return_fields ), params_dict, ) return ( self._prepare_vector_query(k, filter=filter, return_fields=return_fields), params_dict, ) def _prepare_range_query( self, k: int, filter: Optional[RedisFilterExpression] = None, return_fields: Optional[List[str]] = None, ) -> "Query": try: from redis.commands.search.query import Query except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e return_fields = return_fields or [] vector_key = self._schema.content_vector_key base_query = f"@{vector_key}:[VECTOR_RANGE $distance_threshold $vector]" if filter: base_query = str(filter) + " " + base_query query_string = base_query + "=>{$yield_distance_as: distance}" return ( Query(query_string) .return_fields(*return_fields) .sort_by("distance") .paging(0, k) .dialect(2) ) def _prepare_vector_query( self, k: int, filter: Optional[RedisFilterExpression] = None, return_fields: Optional[List[str]] = None, ) -> "Query": """准备用于向量搜索的查询。 参数: k:要返回的结果数量。 filter:可选的元数据过滤器。 返回: query:查询对象。 """ try: from redis.commands.search.query import Query except ImportError as e: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) from e return_fields = return_fields or [] query_prefix = "*" if filter: query_prefix = f"{str(filter)}" vector_key = self._schema.content_vector_key base_query = f"({query_prefix})=>[KNN {k} @{vector_key} $vector AS distance]" query = ( Query(base_query) .return_fields(*return_fields) .sort_by("distance") .paging(0, k) .dialect(2) ) return query def _get_schema_with_defaults( self, index_schema: Optional[Union[Dict[str, ListOfDict], str, os.PathLike]] = None, vector_schema: Optional[Dict[str, Union[str, int]]] = None, ) -> "RedisModel": # should only be called after init of Redis (so Import handled) from langchain_community.vectorstores.redis.schema import ( RedisModel, read_schema, ) schema = RedisModel() # read in schema (yaml file or dict) and # pass to the Pydantic validators if index_schema: schema_values = read_schema(index_schema) # type: ignore schema = RedisModel(**schema_values) # ensure user did not exclude the content field # no modifications if content field found schema.add_content_field() # if no content_vector field, add vector field to schema # this makes adding a vector field to the schema optional when # the user just wants additional metadata try: # see if user overrode the content vector schema.content_vector # if user overrode the content vector, check if they # also passed vector schema. This won't be used since # the index schema overrode the content vector if vector_schema: logger.warning( "`vector_schema` is ignored since content_vector is " + "overridden in `index_schema`." ) # user did not override content vector except ValueError: # set default vector schema and update with user provided schema # if the user provided any vector_field = self.DEFAULT_VECTOR_SCHEMA.copy() if vector_schema: vector_field.update(vector_schema) # add the vector field either way schema.add_vector_field(vector_field) return schema def _create_index_if_not_exist(self, dim: int = 1536) -> None: try: from redis.commands.search.indexDefinition import ( # type: ignore IndexDefinition, IndexType, ) except ImportError: raise ImportError( "Could not import redis python package. " "Please install it with `pip install redis`." ) # Set vector dimension # can't obtain beforehand because we don't # know which embedding model is being used. self._schema.content_vector.dims = dim # Check if index exists if not check_index_exists(self.client, self.index_name): # Create Redis Index self.client.ft(self.index_name).create_index( fields=self._schema.get_fields(), definition=IndexDefinition( prefix=[self.key_prefix], index_type=IndexType.HASH ), ) def _calculate_fp_distance(self, distance: str) -> float: """根据向量数据类型计算距离 支持两种数据类型: - FLOAT32 - FLOAT64 如果是FLOAT32,则需要将距离四舍五入到小数点后4位 否则,四舍五入到小数点后7位。 """ if self._schema.content_vector.datatype == "FLOAT32": return round(float(distance), 4) return round(float(distance), 7) def _check_deprecated_kwargs(self, kwargs: Mapping[str, Any]) -> None: """检查已弃用的kwargs。""" deprecated_kwargs = { "redis_host": "redis_url", "redis_port": "redis_url", "redis_password": "redis_url", "content_key": "index_schema", "vector_key": "vector_schema", "distance_metric": "vector_schema", } for key, value in kwargs.items(): if key in deprecated_kwargs: raise ValueError( f"Keyword argument '{key}' is deprecated. " f"Please use '{deprecated_kwargs[key]}' instead." ) def _select_relevance_score_fn(self) -> Callable[[float], float]: if self.relevance_score_fn: return self.relevance_score_fn metric_map = { "COSINE": self._cosine_relevance_score_fn, "IP": self._max_inner_product_relevance_score_fn, "L2": self._euclidean_relevance_score_fn, } try: return metric_map[self._schema.content_vector.distance_metric] except KeyError: return _default_relevance_score
def _generate_field_schema(data: Dict[str, Any]) -> Dict[str, Any]: """根据输入的元数据,在Redis中为搜索索引生成一个模式。 给定一个元数据字典,此函数将每个元数据字段分类为以下三个类别之一: - 文本:字段包含文本数据。 - 数值:字段包含数值数据(整数或浮点数)。 - 标签:字段包含标签列表(字符串)。 参数 data(Dict[str,Any]):一个字典,其中键是元数据字段名称,值是元数据值。 返回: Dict[str,Any]:一个包含三个键“text”、“numeric”和“tag”的字典。 每个键映射到属于该类别的字段列表。 引发: ValueError:如果无法将元数据字段分类为任何三种已知类型之一。 """ result: Dict[str, Any] = { "text": [], "numeric": [], "tag": [], } for key, value in data.items(): # Numeric fields try: int(value) result["numeric"].append({"name": key}) continue except (ValueError, TypeError): pass # None values are not indexed as of now if value is None: continue # if it's a list of strings, we assume it's a tag if isinstance(value, (list, tuple)): if not value or isinstance(value[0], str): result["tag"].append({"name": key}) else: name = type(value[0]).__name__ raise ValueError( f"List/tuple values should contain strings: '{key}': {name}" ) continue # Check if value is string before processing further if isinstance(value, str): result["text"].append({"name": key}) continue # Unable to classify the field value name = type(value).__name__ raise ValueError( "Could not generate Redis index field type mapping " + f"for metadata: '{key}': {name}" ) return result def _prepare_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]: """准备元数据以在Redis中进行索引,通过清理其值。 - 字符串、整数和浮点数的值保持不变。 - None或空值将被替换为空字符串。 - 字符串列表/元组将被连接成一个带逗号分隔符的单个字符串。 参数: metadata(Dict[str, Any]):一个字典,其中键是元数据字段名称,值是元数据值。 返回: Dict[str, Any]:一个经过清理的字典,准备在Redis中进行索引。 引发: ValueError:如果任何元数据值不是已知类型(字符串、整数、浮点数或字符串列表)。 """ def raise_error(key: str, value: Any) -> None: raise ValueError( f"Metadata value for key '{key}' must be a string, int, " + f"float, or list of strings. Got {type(value).__name__}" ) clean_meta: Dict[str, Union[str, float, int]] = {} for key, value in metadata.items(): if value is None: clean_meta[key] = "" continue # No transformation needed if isinstance(value, (str, int, float)): clean_meta[key] = value # if it's a list/tuple of strings, we join it elif isinstance(value, (list, tuple)): if not value or isinstance(value[0], str): clean_meta[key] = REDIS_TAG_SEPARATOR.join(value) else: raise_error(key, value) else: raise_error(key, value) return clean_meta
[docs]class RedisVectorStoreRetriever(VectorStoreRetriever): """用于Redis VectorStore的检索器。""" vectorstore: Redis """Redis 向量存储。""" search_type: str = "similarity" """要执行的搜索类型。可以是 '相似性', '相似性距离阈值', '相似性分数阈值'""" search_kwargs: Dict[str, Any] = { "k": 4, "score_threshold": 0.9, # set to None to avoid distance used in score_threshold search "distance_threshold": None, } """默认搜索参数。""" allowed_search_types = [ "similarity", "similarity_distance_threshold", "similarity_score_threshold", "mmr", ] """允许的搜索类型。""" class Config: """此pydantic对象的配置。""" arbitrary_types_allowed = True def _get_relevant_documents( self, query: str, *, run_manager: CallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = self.vectorstore.similarity_search(query, **self.search_kwargs) elif self.search_type == "similarity_distance_threshold": if self.search_kwargs["distance_threshold"] is None: raise ValueError( "distance_threshold must be provided for " + "similarity_distance_threshold retriever" ) docs = self.vectorstore.similarity_search(query, **self.search_kwargs) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( self.vectorstore.similarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = self.vectorstore.max_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs async def _aget_relevant_documents( self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun ) -> List[Document]: if self.search_type == "similarity": docs = await self.vectorstore.asimilarity_search( query, **self.search_kwargs ) elif self.search_type == "similarity_distance_threshold": if self.search_kwargs["distance_threshold"] is None: raise ValueError( "distance_threshold must be provided for " + "similarity_distance_threshold retriever" ) docs = await self.vectorstore.asimilarity_search( query, **self.search_kwargs ) elif self.search_type == "similarity_score_threshold": docs_and_similarities = ( await self.vectorstore.asimilarity_search_with_relevance_scores( query, **self.search_kwargs ) ) docs = [doc for doc, _ in docs_and_similarities] elif self.search_type == "mmr": docs = await self.vectorstore.amax_marginal_relevance_search( query, **self.search_kwargs ) else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]: """向向量存储库添加文档。""" return self.vectorstore.add_documents(documents, **kwargs)
[docs] async def aadd_documents( self, documents: List[Document], **kwargs: Any ) -> List[str]: """向向量存储库添加文档。""" return await self.vectorstore.aadd_documents(documents, **kwargs)