Source code for langchain_community.vectorstores.upstash

from __future__ import annotations

import logging
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import (
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from upstash_vector import AsyncIndex, Index
    from upstash_vector.types import InfoResult

logger = logging.getLogger(__name__)


[docs]class UpstashVectorStore(VectorStore): """Upstash Vector向量存储 要使用,必须安装``upstash-vector`` python包。 还需要一个Upstash Vector索引。首先创建一个新的Upstash Vector索引,复制`index_url`和`index_token`变量。然后通过构造函数传递它们,或者设置环境变量`UPSTASH_VECTOR_REST_URL`和`UPSTASH_VECTOR_REST_TOKEN`。 示例: .. code-block:: python from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import UpstashVectorStore embeddings = OpenAIEmbeddings(model="text-embedding-3-large") vectorstore = UpstashVectorStore( embedding=embeddings, index_url="...", index_token="..." ) # 或者 import os os.environ["UPSTASH_VECTOR_REST_URL"] = "..." os.environ["UPSTASH_VECTOR_REST_TOKEN"] = "..." vectorstore = UpstashVectorStore( embedding=embeddings ) """
[docs] def __init__( self, text_key: str = "text", index: Optional[Index] = None, async_index: Optional[AsyncIndex] = None, index_url: Optional[str] = None, index_token: Optional[str] = None, embedding: Optional[Union[Embeddings, bool]] = None, ): """UpstashVectorStore的构造函数。 如果未提供index或index_url和index_token,则构造函数将尝试使用环境变量`UPSTASH_VECTOR_REST_URL`和`UPSTASH_VECTOR_REST_TOKEN`来创建索引。 参数: text_key:用于在元数据中存储文本的键。 index:UpstashVector索引对象。 async_index:UpstashVector AsyncIndex对象,仅在需要异步函数时提供。 index_url:UpstashVector索引的URL。 index_token:UpstashVector索引的令牌。 embedding:嵌入对象或布尔值。当为false时,不应用嵌入。如果为true,则使用Upstash嵌入。当使用Upstash嵌入时,文本将直接发送到Upstash,并在那里应用嵌入,而不是在Langchain中进行嵌入。 示例: .. code-block:: python from langchain_community.vectorstores.upstash import UpstashVectorStore from langchain_community.embeddings.openai import OpenAIEmbeddings embeddings = OpenAIEmbeddings() vectorstore = UpstashVectorStore( embedding=embeddings, index_url="...", index_token="..." ) # 使用现有索引 from upstash_vector import Index index = Index(url="...", token="...") vectorstore = UpstashVectorStore( embedding=embeddings, index=index ) """ try: from upstash_vector import AsyncIndex, Index except ImportError: raise ImportError( "Could not import upstash_vector python package. " "Please install it with `pip install upstash_vector`." ) if index: if not isinstance(index, Index): raise ValueError( "Passed index object should be an " "instance of upstash_vector.Index, " f"got {type(index)}" ) self._index = index logger.info("Using the index passed as parameter") if async_index: if not isinstance(async_index, AsyncIndex): raise ValueError( "Passed index object should be an " "instance of upstash_vector.AsyncIndex, " f"got {type(async_index)}" ) self._async_index = async_index logger.info("Using the async index passed as parameter") if index_url and index_token: self._index = Index(url=index_url, token=index_token) self._async_index = AsyncIndex(url=index_url, token=index_token) logger.info("Created index from the index_url and index_token parameters") elif not index and not async_index: self._index = Index.from_env() self._async_index = AsyncIndex.from_env() logger.info("Created index using environment variables") self._embeddings = embedding self._text_key = text_key
@property def embeddings(self) -> Optional[Union[Embeddings, bool]]: # type: ignore """如果可用,访问查询嵌入对象。""" return self._embeddings def _embed_documents( self, texts: Iterable[str] ) -> Union[List[List[float]], List[str]]: """使用嵌入对象嵌入字符串""" if not self._embeddings: raise ValueError( "No embeddings object provided. " "Pass an embeddings object to the constructor." ) if isinstance(self._embeddings, Embeddings): return self._embeddings.embed_documents(list(texts)) # using self._embeddings is True, Upstash embeddings will be used. # returning list of text as List[str] return list(texts) def _embed_query(self, text: str) -> Union[List[float], str]: """使用嵌入对象嵌入查询文本。""" if not self._embeddings: raise ValueError( "No embeddings object provided. " "Pass an embeddings object to the constructor." ) if isinstance(self._embeddings, Embeddings): return self._embeddings.embed_query(text) # using self._embeddings is True, Upstash embeddings will be used. # returning query as it is return text
[docs] def add_documents( self, documents: List[Document], ids: Optional[List[str]] = None, batch_size: int = 32, embedding_chunk_size: int = 1000, **kwargs: Any, ) -> List[str]: """为文档获取嵌入并将其添加到向量存储中。 文档以大小为`embedding_chunk_size`的批次发送到嵌入对象。 然后以大小为`batch_size`的批次将嵌入数据插入到向量存储中。 参数: documents: 要添加到向量存储中的文档的可迭代对象。 batch_size: 在插入嵌入数据时要使用的批处理大小。 Upstash支持每个请求最多1000个向量。 embedding_batch_size: 在嵌入文本时要使用的块大小。 返回: 将文本添加到向量存储中的ID列表。 """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return self.add_texts( texts, metadatas=metadatas, batch_size=batch_size, ids=ids, embedding_chunk_size=embedding_chunk_size, **kwargs, )
[docs] async def aadd_documents( self, documents: Iterable[Document], ids: Optional[List[str]] = None, batch_size: int = 32, embedding_chunk_size: int = 1000, **kwargs: Any, ) -> List[str]: """为文档获取嵌入并将其添加到向量存储中。 文档以大小为`embedding_chunk_size`的批次发送到嵌入对象。 然后以大小为`batch_size`的批次将嵌入数据插入到向量存储中。 参数: documents: 要添加到向量存储中的文档的可迭代对象。 batch_size: 在插入嵌入数据时要使用的批处理大小。 Upstash支持每个请求最多1000个向量。 embedding_batch_size: 在嵌入文本时要使用的块大小。 返回: 将文本添加到向量存储中的ID列表。 """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return await self.aadd_texts( texts, metadatas=metadatas, ids=ids, batch_size=batch_size, embedding_chunk_size=embedding_chunk_size, **kwargs, )
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, batch_size: int = 32, embedding_chunk_size: int = 1000, **kwargs: Any, ) -> List[str]: """获取文本的嵌入并将其添加到向量存储中。 文本以大小为`embedding_chunk_size`的批次发送到嵌入对象中。 然后以大小为`batch_size`的批次将嵌入数据更新到向量存储中。 参数: texts: 要添加到向量存储中的字符串的可迭代对象。 metadatas: 与文本相关的元数据的可选列表。 ids: 与文本关联的可选id列表。 batch_size: 更新嵌入数据时要使用的批次大小。 Upstash支持每个请求最多1000个向量。 embedding_batch_size: 嵌入文本时要使用的块大小。 返回: 将文本添加到向量存储中的id列表。 """ texts = list(texts) ids = ids or [str(uuid.uuid4()) for _ in texts] # Copy metadatas to avoid modifying the original documents if metadatas: metadatas = [m.copy() for m in metadatas] else: metadatas = [{} for _ in texts] # Add text to metadata for metadata, text in zip(metadatas, texts): metadata[self._text_key] = text for i in range(0, len(texts), embedding_chunk_size): chunk_texts = texts[i : i + embedding_chunk_size] chunk_ids = ids[i : i + embedding_chunk_size] chunk_metadatas = metadatas[i : i + embedding_chunk_size] embeddings = self._embed_documents(chunk_texts) for batch in batch_iterate( batch_size, zip(chunk_ids, embeddings, chunk_metadatas) ): self._index.upsert(vectors=batch, **kwargs) return ids
[docs] async def aadd_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, batch_size: int = 32, embedding_chunk_size: int = 1000, **kwargs: Any, ) -> List[str]: """获取文本的嵌入并将其添加到向量存储中。 文本以大小为`embedding_chunk_size`的批次发送到嵌入对象中。 然后以大小为`batch_size`的批次将嵌入数据更新到向量存储中。 参数: texts: 要添加到向量存储中的字符串的可迭代对象。 metadatas: 与文本相关的元数据的可选列表。 ids: 与文本关联的可选id列表。 batch_size: 更新嵌入数据时要使用的批次大小。 Upstash支持每个请求最多1000个向量。 embedding_batch_size: 嵌入文本时要使用的块大小。 返回: 将文本添加到向量存储中的id列表。 """ texts = list(texts) ids = ids or [str(uuid.uuid4()) for _ in texts] # Copy metadatas to avoid modifying the original documents if metadatas: metadatas = [m.copy() for m in metadatas] else: metadatas = [{} for _ in texts] # Add text to metadata for metadata, text in zip(metadatas, texts): metadata[self._text_key] = text for i in range(0, len(texts), embedding_chunk_size): chunk_texts = texts[i : i + embedding_chunk_size] chunk_ids = ids[i : i + embedding_chunk_size] chunk_metadatas = metadatas[i : i + embedding_chunk_size] embeddings = self._embed_documents(chunk_texts) for batch in batch_iterate( batch_size, zip(chunk_ids, embeddings, chunk_metadatas) ): await self._async_index.upsert(vectors=batch, **kwargs) return ids
[docs] def similarity_search_with_score( self, query: str, k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """检索与查询最相似的文本,并将结果转换为`Document`对象。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:可选的元数据过滤器,以str格式表示。 返回: 与查询最相似的文档列表,以及每个文档的得分。 """ return self.similarity_search_by_vector_with_score( self._embed_query(query), k=k, filter=filter, **kwargs )
[docs] async def asimilarity_search_with_score( self, query: str, k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """检索与查询最相似的文本,并将结果转换为`Document`对象。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:可选的元数据过滤器,以str格式表示。 返回: 与查询最相似的文档列表,以及每个文档的得分。 """ return await self.asimilarity_search_by_vector_with_score( self._embed_query(query), k=k, filter=filter, **kwargs )
def _process_results(self, results: List) -> List[Tuple[Document, float]]: docs = [] for res in results: metadata = res.metadata if metadata and self._text_key in metadata: text = metadata.pop(self._text_key) doc = Document(page_content=text, metadata=metadata) docs.append((doc, res.score)) else: logger.warning( f"Found document with no `{self._text_key}` key. Skipping." ) return docs
[docs] def similarity_search_by_vector_with_score( self, embedding: Union[List[float], str], k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与给定嵌入最接近的文本""" filter = filter or "" if isinstance(embedding, str): results = self._index.query( data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs ) else: results = self._index.query( vector=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs, ) return self._process_results(results)
[docs] async def asimilarity_search_by_vector_with_score( self, embedding: Union[List[float], str], k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与给定嵌入最接近的文本""" filter = filter or "" if isinstance(embedding, str): results = await self._async_index.query( data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs ) else: results = await self._async_index.query( vector=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs, ) return self._process_results(results)
[docs] def similarity_search_by_vector( self, embedding: Union[List[float], str], k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回与给定嵌入最接近的文档。 参数: embedding:要查找类似文档的嵌入。 k:要返回的文档数量。默认为4。 filter:可选的元数据过滤器,以str格式表示。 返回: 与查询最相似的文档列表。 """ docs_and_scores = self.similarity_search_by_vector_with_score( embedding, k=k, filter=filter, **kwargs ) return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search_by_vector( self, embedding: Union[List[float], str], k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回与给定嵌入最接近的文档。 参数: embedding:要查找类似文档的嵌入。 k:要返回的文档数量。默认为4。 filter:可选的元数据过滤器,以str格式表示。 返回: 与查询最相似的文档列表。 """ docs_and_scores = await self.asimilarity_search_by_vector_with_score( embedding, k=k, filter=filter, **kwargs ) return [doc for doc, _ in docs_and_scores]
def _similarity_search_with_relevance_scores( self, query: str, k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ 由于Upstash始终返回相关性分数,因此使用默认实现。 """ return self.similarity_search_with_score(query, k=k, filter=filter, **kwargs) async def _asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, filter: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ 由于Upstash始终返回相关性分数,因此使用默认实现。 """ return await self.asimilarity_search_with_score( query, k=k, filter=filter, **kwargs )
[docs] def max_marginal_relevance_search_by_vector( self, embedding: Union[List[float], str], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 查找与之相似文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取的文档数量,以传递给MMR算法。 lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,0表示最大多样性,1表示最小多样性。默认为0.5。 filter: str格式的可选元数据过滤器 返回: 通过最大边际相关性选择的文档列表。 """ assert isinstance(self.embeddings, Embeddings) if isinstance(embedding, str): results = self._index.query( data=embedding, top_k=fetch_k, include_vectors=True, include_metadata=True, filter=filter or "", **kwargs, ) else: results = self._index.query( vector=embedding, top_k=fetch_k, include_vectors=True, include_metadata=True, filter=filter or "", **kwargs, ) mmr_selected = maximal_marginal_relevance( np.array([embedding], dtype=np.float32), [item.vector for item in results], k=k, lambda_mult=lambda_mult, ) selected = [results[i].metadata for i in mmr_selected] return [ Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore for metadata in selected ]
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: Union[List[float], str], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 查找与之相似文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取的文档数量,以传递给MMR算法。 lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,0表示最大多样性,1表示最小多样性。默认为0.5。 filter: str格式的可选元数据过滤器 返回: 通过最大边际相关性选择的文档列表。 """ assert isinstance(self.embeddings, Embeddings) if isinstance(embedding, str): results = await self._async_index.query( data=embedding, top_k=fetch_k, include_vectors=True, include_metadata=True, filter=filter or "", **kwargs, ) else: results = await self._async_index.query( vector=embedding, top_k=fetch_k, include_vectors=True, include_metadata=True, filter=filter or "", **kwargs, ) mmr_selected = maximal_marginal_relevance( np.array([embedding], dtype=np.float32), [item.vector for item in results], k=k, lambda_mult=lambda_mult, ) selected = [results[i].metadata for i in mmr_selected] return [ Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore for metadata in selected ]
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, embedding_chunk_size: int = 1000, batch_size: int = 32, text_key: str = "text", index: Optional[Index] = None, async_index: Optional[AsyncIndex] = None, index_url: Optional[str] = None, index_token: Optional[str] = None, **kwargs: Any, ) -> UpstashVectorStore: """从文本列表创建一个新的UpstashVectorStore。 示例: .. code-block:: python from langchain_community.vectorstores.upstash import UpstashVectorStore from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() vector_store = UpstashVectorStore.from_texts( texts, embeddings, ) """ vector_store = cls( embedding=embedding, text_key=text_key, index=index, async_index=async_index, index_url=index_url, index_token=index_token, **kwargs, ) vector_store.add_texts( texts, metadatas=metadatas, ids=ids, batch_size=batch_size, embedding_chunk_size=embedding_chunk_size, ) return vector_store
[docs] @classmethod async def afrom_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, embedding_chunk_size: int = 1000, batch_size: int = 32, text_key: str = "text", index: Optional[Index] = None, async_index: Optional[AsyncIndex] = None, index_url: Optional[str] = None, index_token: Optional[str] = None, **kwargs: Any, ) -> UpstashVectorStore: """从文本列表创建一个新的UpstashVectorStore。 示例: .. code-block:: python from langchain_community.vectorstores.upstash import UpstashVectorStore from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() vector_store = UpstashVectorStore.from_texts( texts, embeddings, ) """ vector_store = cls( embedding=embedding, text_key=text_key, index=index, async_index=async_index, index_url=index_url, index_token=index_token, **kwargs, ) await vector_store.aadd_texts( texts, metadatas=metadatas, ids=ids, batch_size=batch_size, embedding_chunk_size=embedding_chunk_size, ) return vector_store
[docs] def delete( self, ids: Optional[List[str]] = None, delete_all: Optional[bool] = None, batch_size: Optional[int] = 1000, **kwargs: Any, ) -> None: """根据向量ID删除 参数: ids:要删除的ID列表。 delete_all:删除索引中的所有向量。 batch_size:删除嵌入时要使用的批量大小。 Upstash每个请求最多支持1000个删除。 """ if delete_all: self._index.reset() elif ids is not None: for batch in batch_iterate(batch_size, ids): self._index.delete(ids=batch) else: raise ValueError("Either ids or delete_all should be provided") return None
[docs] async def adelete( self, ids: Optional[List[str]] = None, delete_all: Optional[bool] = None, batch_size: Optional[int] = 1000, **kwargs: Any, ) -> None: """根据向量ID删除 参数: ids:要删除的ID列表。 delete_all:删除索引中的所有向量。 batch_size:删除嵌入时要使用的批量大小。 Upstash每个请求最多支持1000个删除。 """ if delete_all: await self._async_index.reset() elif ids is not None: for batch in batch_iterate(batch_size, ids): await self._async_index.delete(ids=batch) else: raise ValueError("Either ids or delete_all should be provided") return None
[docs] def info(self) -> InfoResult: """获取有关索引的统计信息。 返回: - 向量的总数 - 等待被索引的向量总数 - 磁盘上索引的总大小(以字节为单位) - 索引的维度计数 - 为索引选择的相似性函数 """ return self._index.info()
[docs] async def ainfo(self) -> InfoResult: """获取有关索引的统计信息。 返回: - 向量的总数 - 等待被索引的向量总数 - 磁盘上索引的总大小(以字节为单位) - 索引的维度计数 - 为索引选择的相似性函数 """ return await self._async_index.info()