from __future__ import annotations
import logging
import os
import uuid
import warnings
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple, Union
import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore
from packaging import version
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from pinecone import Index
logger = logging.getLogger(__name__)
def _import_pinecone() -> Any:
try:
import pinecone
except ImportError as e:
raise ImportError(
"Could not import pinecone python package. "
"Please install it with `pip install pinecone-client`."
) from e
return pinecone
def _is_pinecone_v3() -> bool:
pinecone = _import_pinecone()
pinecone_client_version = pinecone.__version__
return version.parse(pinecone_client_version) >= version.parse("3.0.0.dev")
[docs]@deprecated(
since="0.0.18", removal="0.3.0", alternative_import="langchain_pinecone.Pinecone"
)
class Pinecone(VectorStore):
"""```python
# Pinecone向量存储。
# 要使用,应安装``pinecone-client`` Python包。
# 此版本的Pinecone已弃用。请改用`langchain_pinecone.Pinecone`。
```"""
[docs] def __init__(
self,
index: Any,
embedding: Union[Embeddings, Callable],
text_key: str,
namespace: Optional[str] = None,
distance_strategy: Optional[DistanceStrategy] = DistanceStrategy.COSINE,
):
"""使用Pinecone客户端进行初始化。"""
pinecone = _import_pinecone()
if not isinstance(embedding, Embeddings):
warnings.warn(
"Passing in `embedding` as a Callable is deprecated. Please pass in an"
" Embeddings object instead."
)
if not isinstance(index, pinecone.Index):
raise ValueError(
f"client should be an instance of pinecone.Index, " f"got {type(index)}"
)
self._index = index
self._embedding = embedding
self._text_key = text_key
self._namespace = namespace
self.distance_strategy = distance_strategy
@property
def embeddings(self) -> Optional[Embeddings]:
"""如果可用,访问查询嵌入对象。"""
if isinstance(self._embedding, Embeddings):
return self._embedding
return None
def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]:
"""嵌入搜索文档。"""
if isinstance(self._embedding, Embeddings):
return self._embedding.embed_documents(list(texts))
return [self._embedding(t) for t in texts]
def _embed_query(self, text: str) -> List[float]:
"""嵌入查询文本。"""
if isinstance(self._embedding, Embeddings):
return self._embedding.embed_query(text)
return self._embedding(text)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
namespace: Optional[str] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储中。
通过对嵌入进行分块并进行upsert优化。
这样做是为了避免内存问题并优化使用基于HTTP的嵌入。
对于OpenAI嵌入,构建pinecone.Index时使用pool_threads>4,
embedding_chunk_size>1000和batch_size~64可获得最佳性能。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:要与文本关联的可选id列表。
namespace:要将文本添加到的可选pinecone命名空间。
batch_size:向向量存储添加文本时要使用的批量大小。
embedding_chunk_size:嵌入文本时要使用的块大小。
返回:
将文本添加到向量存储中的id列表。
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
metadatas = metadatas or [{} for _ in texts]
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
# For loops to avoid memory issues and optimize when using HTTP based embeddings
# The first loop runs the embeddings, it benefits when using OpenAI embeddings
# The second loops runs the pinecone upsert asynchronously.
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
async_res = [
self._index.upsert(
vectors=batch,
namespace=namespace,
async_req=True,
**kwargs,
)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
)
]
[res.get() for res in async_res]
return ids
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的松果文档,以及分数。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要在元数据上过滤的参数字典
namespace:要搜索的命名空间。默认将在''命名空间中搜索。
返回:
返回与查询最相似的文档列表,以及每个文档的分数。
"""
return self.similarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, namespace=namespace
)
[docs] def similarity_search_by_vector_with_score(
self,
embedding: List[float],
*,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
"""返回与嵌入最相似的松果文档,以及相似度分数。"""
if namespace is None:
namespace = self._namespace
docs = []
results = self._index.query(
vector=[embedding],
top_k=k,
include_metadata=True,
namespace=namespace,
filter=filter,
)
for res in results["matches"]:
metadata = res["metadata"]
if self._text_key in metadata:
text = metadata.pop(self._text_key)
score = res["score"]
docs.append((Document(page_content=text, metadata=metadata), score))
else:
logger.warning(
f"Found document with no `{self._text_key}` key. Skipping."
)
return docs
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的松果文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要在元数据上过滤的参数字典
namespace:要搜索的命名空间。默认将在''命名空间中搜索。
返回:
与查询最相似的文档列表,以及每个文档的得分。
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,取决于一些因素,包括:
- 向量存储中使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!)
- 嵌入的维度
- 等等。
"""
if self.distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
else:
raise ValueError(
"Unknown distance strategy, must be cosine, max_inner_product "
"(dot product), or euclidean"
)
@staticmethod
def _cosine_relevance_score_fn(score: float) -> float:
"""Pinecone返回在[-1,1]之间的余弦相似度分数。"""
return (score + 1) / 2
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding:要查找相似文档的嵌入。
k:要返回的文档数量。默认为4。
fetch_k:要获取并传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,
0对应最大多样性,1对应最小多样性。
默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
if namespace is None:
namespace = self._namespace
results = self._index.query(
vector=[embedding],
top_k=fetch_k,
include_values=True,
include_metadata=True,
namespace=namespace,
filter=filter,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item["values"] for item in results["matches"]],
k=k,
lambda_mult=lambda_mult,
)
selected = [results["matches"][i]["metadata"] for i in mmr_selected]
return [
Document(page_content=metadata.pop((self._text_key)), metadata=metadata)
for metadata in selected
]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
embedding = self._embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding, k, fetch_k, lambda_mult, filter, namespace
)
[docs] @classmethod
def get_pinecone_index(
cls,
index_name: Optional[str],
pool_threads: int = 4,
) -> Index:
"""返回一个Pinecone Index实例。
参数:
index_name: 要使用的索引名称。
pool_threads: 用于索引更新的线程数。
返回:
Pinecone Index实例。
"""
pinecone = _import_pinecone()
if _is_pinecone_v3():
pinecone_instance = pinecone.Pinecone(
api_key=os.environ.get("PINECONE_API_KEY"), pool_threads=pool_threads
)
indexes = pinecone_instance.list_indexes()
index_names = [i.name for i in indexes.index_list["indexes"]]
else:
index_names = pinecone.list_indexes()
if index_name in index_names:
index = (
pinecone_instance.Index(index_name)
if _is_pinecone_v3()
else pinecone.Index(index_name, pool_threads=pool_threads)
)
elif len(index_names) == 0:
raise ValueError(
"No active indexes found in your Pinecone project, "
"are you sure you're using the right Pinecone API key and Environment? "
"Please double check your Pinecone dashboard."
)
else:
raise ValueError(
f"Index '{index_name}' not found in your Pinecone project. "
f"Did you mean one of the following indexes: {', '.join(index_names)}"
)
return index
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
text_key: str = "text",
namespace: Optional[str] = None,
index_name: Optional[str] = None,
upsert_kwargs: Optional[dict] = None,
pool_threads: int = 4,
embeddings_chunk_size: int = 1000,
**kwargs: Any,
) -> Pinecone:
"""已弃用:请使用langchain_pinecone.PineconeVectorStore.from_texts代替:
从原始文档构建Pinecone包装器。
这是一个用户友好的接口,可以:
1. 嵌入文档。
2. 将文档添加到提供的Pinecone索引中。
这旨在是一个快速入门的方式。
`pool_threads`会影响upsert操作的速度。
示例:
.. code-block:: python
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
index_name = "my-index"
namespace = "my-namespace"
vectorstore = Pinecone(
index_name=index_name,
embedding=embedding,
namespace=namespace,
)
"""
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
pinecone = cls(pinecone_index, embedding, text_key, namespace, **kwargs)
pinecone.add_texts(
texts,
metadatas=metadatas,
ids=ids,
namespace=namespace,
batch_size=batch_size,
embedding_chunk_size=embeddings_chunk_size,
**(upsert_kwargs or {}),
)
return pinecone
[docs] @classmethod
def from_existing_index(
cls,
index_name: str,
embedding: Embeddings,
text_key: str = "text",
namespace: Optional[str] = None,
pool_threads: int = 4,
) -> Pinecone:
"""从索引名称加载松果向量存储。"""
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
return cls(pinecone_index, embedding, text_key, namespace)
[docs] def delete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
namespace: Optional[str] = None,
filter: Optional[dict] = None,
**kwargs: Any,
) -> None:
"""根据向量ID或筛选条件删除。
参数:
ids:要删除的ID列表。
filter:用于筛选要删除的向量的条件字典。
"""
if namespace is None:
namespace = self._namespace
if delete_all:
self._index.delete(delete_all=True, namespace=namespace, **kwargs)
elif ids is not None:
chunk_size = 1000
for i in range(0, len(ids), chunk_size):
chunk = ids[i : i + chunk_size]
self._index.delete(ids=chunk, namespace=namespace, **kwargs)
elif filter is not None:
self._index.delete(filter=filter, namespace=namespace, **kwargs)
else:
raise ValueError("Either ids, delete_all, or filter must be provided.")
return None