# Source code for langchain_community.vectorstores.pinecone

from __future__ import annotations

import logging
import os
import uuid
import warnings
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple, Union

import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore
from packaging import version

from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from pinecone import Index

logger = logging.getLogger(__name__)


def _import_pinecone() -> Any:
    try:
        import pinecone
    except ImportError as e:
        raise ImportError(
            "Could not import pinecone python package. "
            "Please install it with `pip install pinecone-client`."
        ) from e
    return pinecone


def _is_pinecone_v3() -> bool:
    """Report whether the installed pinecone client is at least ``3.0.0.dev``."""
    installed = version.parse(_import_pinecone().__version__)
    minimum_v3 = version.parse("3.0.0.dev")
    return installed >= minimum_v3


@deprecated(
    since="0.0.18", removal="0.3.0", alternative_import="langchain_pinecone.Pinecone"
)
class Pinecone(VectorStore):
    """`Pinecone` vector store.

    To use, you should have the ``pinecone-client`` Python package installed.

    This version of Pinecone is deprecated. Please use
    `langchain_pinecone.Pinecone` instead.
    """

    def __init__(
        self,
        index: Any,
        embedding: Union[Embeddings, Callable],
        text_key: str,
        namespace: Optional[str] = None,
        distance_strategy: Optional[DistanceStrategy] = DistanceStrategy.COSINE,
    ):
        """Initialize with a Pinecone index.

        Args:
            index: A ``pinecone.Index`` instance.
            embedding: An ``Embeddings`` object, or (deprecated) a callable
                mapping one text to its embedding vector.
            text_key: Metadata key under which the document text is stored.
            namespace: Default namespace used when a method is called
                without an explicit ``namespace``.
            distance_strategy: Strategy used to pick the relevance score
                function.

        Raises:
            ValueError: If ``index`` is not a ``pinecone.Index`` instance.
        """
        pinecone = _import_pinecone()
        if not isinstance(embedding, Embeddings):
            warnings.warn(
                "Passing in `embedding` as a Callable is deprecated. Please pass in an"
                " Embeddings object instead."
            )
        if not isinstance(index, pinecone.Index):
            raise ValueError(
                f"client should be an instance of pinecone.Index, "
                f"got {type(index)}"
            )
        self._index = index
        self._embedding = embedding
        self._text_key = text_key
        self._namespace = namespace
        self.distance_strategy = distance_strategy

    @property
    def embeddings(self) -> Optional[Embeddings]:
        """Return the embeddings object, or None if a plain callable was given."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding
        return None

    def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]:
        """Embed the documents to be stored."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding.embed_documents(list(texts))
        # Legacy callable: embeds one text at a time.
        return [self._embedding(t) for t in texts]

    def _embed_query(self, text: str) -> List[float]:
        """Embed the query text."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding.embed_query(text)
        return self._embedding(text)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        namespace: Optional[str] = None,
        batch_size: int = 32,
        embedding_chunk_size: int = 1000,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add them to the vector store.

        Upserts are optimized by chunking the embeddings and upserting them.
        This is done to avoid memory issues and to optimize the use of
        HTTP-based embeddings. For OpenAI embeddings, use ``pool_threads>4``
        when constructing the ``pinecone.Index``, ``embedding_chunk_size>1000``
        and ``batch_size~64`` for best performance.

        Args:
            texts: Iterable of strings to add to the vector store.
            metadatas: Optional list of metadata dicts, one per text. The
                dicts are copied, so the caller's objects are not modified.
            ids: Optional list of ids, one per text. Random UUID4 strings are
                generated when omitted or empty.
            namespace: Optional Pinecone namespace to add the texts to.
                Defaults to the namespace configured on this store.
            batch_size: Number of vectors sent per upsert request.
            embedding_chunk_size: Number of texts embedded per chunk.
            **kwargs: Extra keyword arguments forwarded to ``index.upsert``.

        Returns:
            List of ids of the texts added to the vector store.

        Raises:
            ValueError: If ``ids`` or ``metadatas`` is non-empty and its
                length differs from the number of texts.
        """
        if namespace is None:
            namespace = self._namespace

        texts = list(texts)
        # A length mismatch would otherwise make ``zip`` silently drop texts
        # while the returned ids still claimed they had been stored.
        if ids and len(ids) != len(texts):
            raise ValueError(
                f"Number of ids ({len(ids)}) does not match "
                f"number of texts ({len(texts)})."
            )
        if metadatas and len(metadatas) != len(texts):
            raise ValueError(
                f"Number of metadatas ({len(metadatas)}) does not match "
                f"number of texts ({len(texts)})."
            )

        ids = ids or [str(uuid.uuid4()) for _ in texts]
        # Work on copies so that injecting the text key below does not
        # mutate the dicts owned by the caller.
        if metadatas:
            metadatas = [dict(metadata) for metadata in metadatas]
        else:
            metadatas = [{} for _ in texts]
        for metadata, text in zip(metadatas, texts):
            metadata[self._text_key] = text

        # For loops to avoid memory issues and optimize when using HTTP based
        # embeddings. The outer loop runs the embeddings chunk by chunk; the
        # inner comprehension issues the upserts with ``async_req=True``.
        for i in range(0, len(texts), embedding_chunk_size):
            chunk_texts = texts[i : i + embedding_chunk_size]
            chunk_ids = ids[i : i + embedding_chunk_size]
            chunk_metadatas = metadatas[i : i + embedding_chunk_size]
            embeddings = self._embed_documents(chunk_texts)
            async_res = [
                self._index.upsert(
                    vectors=batch,
                    namespace=namespace,
                    async_req=True,
                    **kwargs,
                )
                for batch in batch_iterate(
                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
                )
            ]
            # Collect every pending result before starting the next chunk.
            for res in async_res:
                res.get()

        return ids

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        namespace: Optional[str] = None,
    ) -> List[Tuple[Document, float]]:
        """Return the Pinecone documents most similar to the query, with scores.

        Args:
            query: Text to look up similar documents for.
            k: Number of documents to return. Defaults to 4.
            filter: Dictionary of arguments to filter on metadata.
            namespace: Namespace to search in. Defaults to the namespace
                configured on this store.

        Returns:
            List of (document, score) tuples for the most similar documents.
        """
        return self.similarity_search_by_vector_with_score(
            self._embed_query(query), k=k, filter=filter, namespace=namespace
        )

    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        *,
        k: int = 4,
        filter: Optional[dict] = None,
        namespace: Optional[str] = None,
    ) -> List[Tuple[Document, float]]:
        """Return the Pinecone documents most similar to the embedding, with scores.

        Matches whose metadata lacks the configured text key are skipped and
        a warning is logged for each of them.

        Args:
            embedding: Embedding vector to look up similar documents for.
            k: Number of matches to request. Defaults to 4.
            filter: Dictionary of arguments to filter on metadata.
            namespace: Namespace to search in. Defaults to the namespace
                configured on this store.

        Returns:
            List of (document, score) tuples.
        """
        if namespace is None:
            namespace = self._namespace
        docs = []
        results = self._index.query(
            vector=[embedding],
            top_k=k,
            include_metadata=True,
            namespace=namespace,
            filter=filter,
        )
        for res in results["matches"]:
            metadata = res["metadata"]
            if self._text_key in metadata:
                # The text becomes page_content, so remove it from metadata.
                text = metadata.pop(self._text_key)
                score = res["score"]
                docs.append((Document(page_content=text, metadata=metadata), score))
            else:
                logger.warning(
                    f"Found document with no `{self._text_key}` key. Skipping."
                )
        return docs

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Select the relevance score function for the configured distance strategy.

        The "correct" relevance function may differ depending on a few things,
        including:
        - the distance / similarity metric used by the vector store
        - the scale of the embeddings (OpenAI's are unit normed; many others
          are not!)
        - the embedding dimensionality
        - etc.

        Raises:
            ValueError: If the distance strategy is not cosine,
                max inner product or euclidean.
        """
        if self.distance_strategy == DistanceStrategy.COSINE:
            return self._cosine_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            return self._euclidean_relevance_score_fn
        else:
            raise ValueError(
                "Unknown distance strategy, must be cosine, max_inner_product "
                "(dot product), or euclidean"
            )

    @staticmethod
    def _cosine_relevance_score_fn(score: float) -> float:
        """Map a score from [-1, 1] onto [0, 1].

        Pinecone returns cosine similarity scores between [-1, 1].
        """
        return (score + 1) / 2

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
        namespace: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return documents selected using maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among the selected documents.

        Args:
            embedding: Embedding to look up similar documents for.
            k: Number of documents to return. Defaults to 4.
            fetch_k: Number of documents to fetch and pass to the MMR
                algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree of
                diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.
            filter: Dictionary of arguments to filter on metadata.
            namespace: Namespace to search in. Defaults to the namespace
                configured on this store.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            List of documents selected by maximal marginal relevance.
        """
        if namespace is None:
            namespace = self._namespace
        results = self._index.query(
            vector=[embedding],
            top_k=fetch_k,
            include_values=True,
            include_metadata=True,
            namespace=namespace,
            filter=filter,
        )
        mmr_selected = maximal_marginal_relevance(
            np.array([embedding], dtype=np.float32),
            [item["values"] for item in results["matches"]],
            k=k,
            lambda_mult=lambda_mult,
        )
        selected = [results["matches"][i]["metadata"] for i in mmr_selected]
        return [
            Document(page_content=metadata.pop((self._text_key)), metadata=metadata)
            for metadata in selected
        ]

    @classmethod
    def get_pinecone_index(
        cls,
        index_name: Optional[str],
        pool_threads: int = 4,
    ) -> Index:
        """Return a Pinecone Index instance.

        For pinecone client v3 and newer, a client is built with the API key
        read from the ``PINECONE_API_KEY`` environment variable; for older
        clients the module-level API is used.

        Args:
            index_name: Name of the index to use.
            pool_threads: Number of threads to use for index upsert.

        Returns:
            Pinecone Index instance.

        Raises:
            ValueError: If no indexes exist, or ``index_name`` is not among
                the existing indexes.
        """
        pinecone = _import_pinecone()
        # Check the client version once and reuse the answer below.
        is_v3 = _is_pinecone_v3()

        if is_v3:
            pinecone_instance = pinecone.Pinecone(
                api_key=os.environ.get("PINECONE_API_KEY"), pool_threads=pool_threads
            )
            indexes = pinecone_instance.list_indexes()
            index_names = [i.name for i in indexes.index_list["indexes"]]
        else:
            pinecone_instance = None
            index_names = pinecone.list_indexes()

        if index_name in index_names:
            if is_v3:
                return pinecone_instance.Index(index_name)
            return pinecone.Index(index_name, pool_threads=pool_threads)

        if not index_names:
            raise ValueError(
                "No active indexes found in your Pinecone project, "
                "are you sure you're using the right Pinecone API key and Environment? "
                "Please double check your Pinecone dashboard."
            )
        raise ValueError(
            f"Index '{index_name}' not found in your Pinecone project. "
            f"Did you mean one of the following indexes: {', '.join(index_names)}"
        )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 32,
        text_key: str = "text",
        namespace: Optional[str] = None,
        index_name: Optional[str] = None,
        upsert_kwargs: Optional[dict] = None,
        pool_threads: int = 4,
        embeddings_chunk_size: int = 1000,
        **kwargs: Any,
    ) -> Pinecone:
        """Construct a Pinecone wrapper from raw documents.

        DEPRECATED: use ``langchain_pinecone.PineconeVectorStore.from_texts``
        instead.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Adds the documents to a provided Pinecone index.

        This is intended to be a quick way to get started.

        The ``pool_threads`` setting affects the speed of the upsert
        operations.

        Args:
            texts: Texts to embed and store.
            embedding: Embeddings object used to embed the texts.
            metadatas: Optional list of metadata dicts, one per text.
            ids: Optional list of ids, one per text.
            batch_size: Number of vectors sent per upsert request.
            text_key: Metadata key under which the text is stored.
            namespace: Optional Pinecone namespace.
            index_name: Name of the existing Pinecone index to use.
            upsert_kwargs: Extra keyword arguments forwarded to the upsert
                call via ``add_texts``.
            pool_threads: Number of threads to use for index upsert.
            embeddings_chunk_size: Number of texts embedded per chunk.
            **kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            The populated vector store.

        Example:
            .. code-block:: python

                from langchain_pinecone import PineconeVectorStore
                from langchain_openai import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                index_name = "my-index"
                namespace = "my-namespace"
                vectorstore = PineconeVectorStore.from_texts(
                    texts,
                    embeddings,
                    index_name=index_name,
                    namespace=namespace,
                )
        """
        pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
        vectorstore = cls(pinecone_index, embedding, text_key, namespace, **kwargs)

        vectorstore.add_texts(
            texts,
            metadatas=metadatas,
            ids=ids,
            namespace=namespace,
            batch_size=batch_size,
            embedding_chunk_size=embeddings_chunk_size,
            **(upsert_kwargs or {}),
        )
        return vectorstore

    @classmethod
    def from_existing_index(
        cls,
        index_name: str,
        embedding: Embeddings,
        text_key: str = "text",
        namespace: Optional[str] = None,
        pool_threads: int = 4,
    ) -> Pinecone:
        """Load a Pinecone vector store from an index name."""
        pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
        return cls(pinecone_index, embedding, text_key, namespace)

    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> None:
        """Delete by vector IDs or by filter.

        Exactly one mode is applied, checked in this order: ``delete_all``,
        then ``ids``, then ``filter``.

        Args:
            ids: List of ids to delete. They are deleted in chunks of 1000.
            delete_all: If truthy, delete every vector in the namespace.
            namespace: Namespace to delete from. Defaults to the namespace
                configured on this store.
            filter: Dictionary of conditions selecting the vectors to delete.
            **kwargs: Extra keyword arguments forwarded to ``index.delete``.

        Raises:
            ValueError: If none of ``ids``, ``delete_all`` or ``filter`` is
                provided.
        """
        if namespace is None:
            namespace = self._namespace

        if delete_all:
            self._index.delete(delete_all=True, namespace=namespace, **kwargs)
        elif ids is not None:
            chunk_size = 1000
            for i in range(0, len(ids), chunk_size):
                chunk = ids[i : i + chunk_size]
                self._index.delete(ids=chunk, namespace=namespace, **kwargs)
        elif filter is not None:
            self._index.delete(filter=filter, namespace=namespace, **kwargs)
        else:
            raise ValueError("Either ids, delete_all, or filter must be provided.")