from __future__ import annotations
import logging
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils.iter import batch_iterate
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import (
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from upstash_vector import AsyncIndex, Index
from upstash_vector.types import InfoResult
logger = logging.getLogger(__name__)
[docs]class UpstashVectorStore(VectorStore):
"""Upstash Vector向量存储
要使用,必须安装``upstash-vector`` python包。
还需要一个Upstash Vector索引。首先创建一个新的Upstash Vector索引,复制`index_url`和`index_token`变量。然后通过构造函数传递它们,或者设置环境变量`UPSTASH_VECTOR_REST_URL`和`UPSTASH_VECTOR_REST_TOKEN`。
示例:
.. code-block:: python
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import UpstashVectorStore
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = UpstashVectorStore(
embedding=embeddings,
index_url="...",
index_token="..."
)
# 或者
import os
os.environ["UPSTASH_VECTOR_REST_URL"] = "..."
os.environ["UPSTASH_VECTOR_REST_TOKEN"] = "..."
vectorstore = UpstashVectorStore(
embedding=embeddings
)
"""
[docs] def __init__(
self,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
embedding: Optional[Union[Embeddings, bool]] = None,
):
"""UpstashVectorStore的构造函数。
如果未提供index或index_url和index_token,则构造函数将尝试使用环境变量`UPSTASH_VECTOR_REST_URL`和`UPSTASH_VECTOR_REST_TOKEN`来创建索引。
参数:
text_key:用于在元数据中存储文本的键。
index:UpstashVector索引对象。
async_index:UpstashVector AsyncIndex对象,仅在需要异步函数时提供。
index_url:UpstashVector索引的URL。
index_token:UpstashVector索引的令牌。
embedding:嵌入对象或布尔值。当为false时,不应用嵌入。如果为true,则使用Upstash嵌入。当使用Upstash嵌入时,文本将直接发送到Upstash,并在那里应用嵌入,而不是在Langchain中进行嵌入。
示例:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = UpstashVectorStore(
embedding=embeddings,
index_url="...",
index_token="..."
)
# 使用现有索引
from upstash_vector import Index
index = Index(url="...", token="...")
vectorstore = UpstashVectorStore(
embedding=embeddings,
index=index
)
"""
try:
from upstash_vector import AsyncIndex, Index
except ImportError:
raise ImportError(
"Could not import upstash_vector python package. "
"Please install it with `pip install upstash_vector`."
)
if index:
if not isinstance(index, Index):
raise ValueError(
"Passed index object should be an "
"instance of upstash_vector.Index, "
f"got {type(index)}"
)
self._index = index
logger.info("Using the index passed as parameter")
if async_index:
if not isinstance(async_index, AsyncIndex):
raise ValueError(
"Passed index object should be an "
"instance of upstash_vector.AsyncIndex, "
f"got {type(async_index)}"
)
self._async_index = async_index
logger.info("Using the async index passed as parameter")
if index_url and index_token:
self._index = Index(url=index_url, token=index_token)
self._async_index = AsyncIndex(url=index_url, token=index_token)
logger.info("Created index from the index_url and index_token parameters")
elif not index and not async_index:
self._index = Index.from_env()
self._async_index = AsyncIndex.from_env()
logger.info("Created index using environment variables")
self._embeddings = embedding
self._text_key = text_key
@property
def embeddings(self) -> Optional[Union[Embeddings, bool]]: # type: ignore
"""如果可用,访问查询嵌入对象。"""
return self._embeddings
def _embed_documents(
self, texts: Iterable[str]
) -> Union[List[List[float]], List[str]]:
"""使用嵌入对象嵌入字符串"""
if not self._embeddings:
raise ValueError(
"No embeddings object provided. "
"Pass an embeddings object to the constructor."
)
if isinstance(self._embeddings, Embeddings):
return self._embeddings.embed_documents(list(texts))
# using self._embeddings is True, Upstash embeddings will be used.
# returning list of text as List[str]
return list(texts)
def _embed_query(self, text: str) -> Union[List[float], str]:
"""使用嵌入对象嵌入查询文本。"""
if not self._embeddings:
raise ValueError(
"No embeddings object provided. "
"Pass an embeddings object to the constructor."
)
if isinstance(self._embeddings, Embeddings):
return self._embeddings.embed_query(text)
# using self._embeddings is True, Upstash embeddings will be used.
# returning query as it is
return text
[docs] def add_documents(
self,
documents: List[Document],
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""为文档获取嵌入并将其添加到向量存储中。
文档以大小为`embedding_chunk_size`的批次发送到嵌入对象。
然后以大小为`batch_size`的批次将嵌入数据插入到向量存储中。
参数:
documents: 要添加到向量存储中的文档的可迭代对象。
batch_size: 在插入嵌入数据时要使用的批处理大小。
Upstash支持每个请求最多1000个向量。
embedding_batch_size: 在嵌入文本时要使用的块大小。
返回:
将文本添加到向量存储中的ID列表。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(
texts,
metadatas=metadatas,
batch_size=batch_size,
ids=ids,
embedding_chunk_size=embedding_chunk_size,
**kwargs,
)
[docs] async def aadd_documents(
self,
documents: Iterable[Document],
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""为文档获取嵌入并将其添加到向量存储中。
文档以大小为`embedding_chunk_size`的批次发送到嵌入对象。
然后以大小为`batch_size`的批次将嵌入数据插入到向量存储中。
参数:
documents: 要添加到向量存储中的文档的可迭代对象。
batch_size: 在插入嵌入数据时要使用的批处理大小。
Upstash支持每个请求最多1000个向量。
embedding_batch_size: 在嵌入文本时要使用的块大小。
返回:
将文本添加到向量存储中的ID列表。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return await self.aadd_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
**kwargs,
)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""获取文本的嵌入并将其添加到向量存储中。
文本以大小为`embedding_chunk_size`的批次发送到嵌入对象中。
然后以大小为`batch_size`的批次将嵌入数据更新到向量存储中。
参数:
texts: 要添加到向量存储中的字符串的可迭代对象。
metadatas: 与文本相关的元数据的可选列表。
ids: 与文本关联的可选id列表。
batch_size: 更新嵌入数据时要使用的批次大小。
Upstash支持每个请求最多1000个向量。
embedding_batch_size: 嵌入文本时要使用的块大小。
返回:
将文本添加到向量存储中的id列表。
"""
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
# Copy metadatas to avoid modifying the original documents
if metadatas:
metadatas = [m.copy() for m in metadatas]
else:
metadatas = [{} for _ in texts]
# Add text to metadata
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
self._index.upsert(vectors=batch, **kwargs)
return ids
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""获取文本的嵌入并将其添加到向量存储中。
文本以大小为`embedding_chunk_size`的批次发送到嵌入对象中。
然后以大小为`batch_size`的批次将嵌入数据更新到向量存储中。
参数:
texts: 要添加到向量存储中的字符串的可迭代对象。
metadatas: 与文本相关的元数据的可选列表。
ids: 与文本关联的可选id列表。
batch_size: 更新嵌入数据时要使用的批次大小。
Upstash支持每个请求最多1000个向量。
embedding_batch_size: 嵌入文本时要使用的块大小。
返回:
将文本添加到向量存储中的id列表。
"""
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
# Copy metadatas to avoid modifying the original documents
if metadatas:
metadatas = [m.copy() for m in metadatas]
else:
metadatas = [{} for _ in texts]
# Add text to metadata
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
await self._async_index.upsert(vectors=batch, **kwargs)
return ids
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""检索与查询最相似的文本,并将结果转换为`Document`对象。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以str格式表示。
返回:
与查询最相似的文档列表,以及每个文档的得分。
"""
return self.similarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, **kwargs
)
[docs] async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""检索与查询最相似的文本,并将结果转换为`Document`对象。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以str格式表示。
返回:
与查询最相似的文档列表,以及每个文档的得分。
"""
return await self.asimilarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, **kwargs
)
def _process_results(self, results: List) -> List[Tuple[Document, float]]:
docs = []
for res in results:
metadata = res.metadata
if metadata and self._text_key in metadata:
text = metadata.pop(self._text_key)
doc = Document(page_content=text, metadata=metadata)
docs.append((doc, res.score))
else:
logger.warning(
f"Found document with no `{self._text_key}` key. Skipping."
)
return docs
[docs] def similarity_search_by_vector_with_score(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与给定嵌入最接近的文本"""
filter = filter or ""
if isinstance(embedding, str):
results = self._index.query(
data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
)
else:
results = self._index.query(
vector=embedding,
top_k=k,
include_metadata=True,
filter=filter,
**kwargs,
)
return self._process_results(results)
[docs] async def asimilarity_search_by_vector_with_score(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与给定嵌入最接近的文本"""
filter = filter or ""
if isinstance(embedding, str):
results = await self._async_index.query(
data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
)
else:
results = await self._async_index.query(
vector=embedding,
top_k=k,
include_metadata=True,
filter=filter,
**kwargs,
)
return self._process_results(results)
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以字符串格式表示。
返回:
与查询最相似的文档列表,以及每个文档的分数。
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以str格式表示。
返回:
返回与查询最相似的文档列表。
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] def similarity_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与给定嵌入最接近的文档。
参数:
embedding:要查找类似文档的嵌入。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以str格式表示。
返回:
与查询最相似的文档列表。
"""
docs_and_scores = self.similarity_search_by_vector_with_score(
embedding, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与给定嵌入最接近的文档。
参数:
embedding:要查找类似文档的嵌入。
k:要返回的文档数量。默认为4。
filter:可选的元数据过滤器,以str格式表示。
返回:
与查询最相似的文档列表。
"""
docs_and_scores = await self.asimilarity_search_by_vector_with_score(
embedding, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
由于Upstash始终返回相关性分数,因此使用默认实现。
"""
return self.similarity_search_with_score(query, k=k, filter=filter, **kwargs)
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
由于Upstash始终返回相关性分数,因此使用默认实现。
"""
return await self.asimilarity_search_with_score(
query, k=k, filter=filter, **kwargs
)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取的文档数量,以传递给MMR算法。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,0表示最大多样性,1表示最小多样性。默认为0.5。
filter: str格式的可选元数据过滤器
返回:
通过最大边际相关性选择的文档列表。
"""
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = self._index.query(
data=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
**kwargs,
)
else:
results = self._index.query(
vector=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
**kwargs,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item.vector for item in results],
k=k,
lambda_mult=lambda_mult,
)
selected = [results[i].metadata for i in mmr_selected]
return [
Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore
for metadata in selected
]
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: Union[List[float], str],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取的文档数量,以传递给MMR算法。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,0表示最大多样性,1表示最小多样性。默认为0.5。
filter: str格式的可选元数据过滤器
返回:
通过最大边际相关性选择的文档列表。
"""
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = await self._async_index.query(
data=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
**kwargs,
)
else:
results = await self._async_index.query(
vector=embedding,
top_k=fetch_k,
include_vectors=True,
include_metadata=True,
filter=filter or "",
**kwargs,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[item.vector for item in results],
k=k,
lambda_mult=lambda_mult,
)
selected = [results[i].metadata for i in mmr_selected]
return [
Document(page_content=metadata.pop((self._text_key)), metadata=metadata) # type: ignore
for metadata in selected
]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter:可选的元数据过滤器,以str格式。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding = self._embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter:可选的元数据过滤器,以str格式。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding = self._embed_query(query)
return await self.amax_marginal_relevance_search_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
embedding_chunk_size: int = 1000,
batch_size: int = 32,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
**kwargs: Any,
) -> UpstashVectorStore:
"""从文本列表创建一个新的UpstashVectorStore。
示例:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector_store = UpstashVectorStore.from_texts(
texts,
embeddings,
)
"""
vector_store = cls(
embedding=embedding,
text_key=text_key,
index=index,
async_index=async_index,
index_url=index_url,
index_token=index_token,
**kwargs,
)
vector_store.add_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
)
return vector_store
[docs] @classmethod
async def afrom_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
embedding_chunk_size: int = 1000,
batch_size: int = 32,
text_key: str = "text",
index: Optional[Index] = None,
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
**kwargs: Any,
) -> UpstashVectorStore:
"""从文本列表创建一个新的UpstashVectorStore。
示例:
.. code-block:: python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector_store = UpstashVectorStore.from_texts(
texts,
embeddings,
)
"""
vector_store = cls(
embedding=embedding,
text_key=text_key,
index=index,
async_index=async_index,
index_url=index_url,
index_token=index_token,
**kwargs,
)
await vector_store.aadd_texts(
texts,
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
)
return vector_store
[docs] def delete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
**kwargs: Any,
) -> None:
"""根据向量ID删除
参数:
ids:要删除的ID列表。
delete_all:删除索引中的所有向量。
batch_size:删除嵌入时要使用的批量大小。
Upstash每个请求最多支持1000个删除。
"""
if delete_all:
self._index.reset()
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
self._index.delete(ids=batch)
else:
raise ValueError("Either ids or delete_all should be provided")
return None
[docs] async def adelete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
**kwargs: Any,
) -> None:
"""根据向量ID删除
参数:
ids:要删除的ID列表。
delete_all:删除索引中的所有向量。
batch_size:删除嵌入时要使用的批量大小。
Upstash每个请求最多支持1000个删除。
"""
if delete_all:
await self._async_index.reset()
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
await self._async_index.delete(ids=batch)
else:
raise ValueError("Either ids or delete_all should be provided")
return None
[docs] def info(self) -> InfoResult:
"""获取有关索引的统计信息。
返回:
- 向量的总数
- 等待被索引的向量总数
- 磁盘上索引的总大小(以字节为单位)
- 索引的维度计数
- 为索引选择的相似性函数
"""
return self._index.info()
[docs] async def ainfo(self) -> InfoResult:
"""获取有关索引的统计信息。
返回:
- 向量的总数
- 等待被索引的向量总数
- 磁盘上索引的总大小(以字节为单位)
- 索引的维度计数
- 为索引选择的相似性函数
"""
return await self._async_index.info()