from __future__ import annotations
import logging
import uuid
from typing import (
Any,
Iterable,
List,
Optional,
Tuple,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_env
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
logger = logging.getLogger(__name__)
[docs]class DashVector(VectorStore):
"""`DashVector`向量存储。
要使用,您应该已安装``dashvector`` Python包。
示例:
.. code-block:: python
from langchain_community.vectorstores import DashVector
from langchain_community.embeddings.openai import OpenAIEmbeddings
import dashvector
client = dashvector.Client(api_key="***")
client.create("langchain", dimension=1024)
collection = client.get("langchain")
embeddings = OpenAIEmbeddings()
vectorstore = DashVector(collection, embeddings.embed_query, "text")
"""
[docs] def __init__(
self,
collection: Any,
embedding: Embeddings,
text_field: str,
):
"""使用DashVector集合进行初始化。"""
try:
import dashvector
except ImportError:
raise ImportError(
"Could not import dashvector python package. "
"Please install it with `pip install dashvector`."
)
if not isinstance(collection, dashvector.Collection):
raise ValueError(
f"collection should be an instance of dashvector.Collection, "
f"bug got {type(collection)}"
)
self._collection = collection
self._embedding = embedding
self._text_field = text_field
def _create_partition_if_not_exists(self, partition: str) -> None:
"""在当前集合中创建一个分区。"""
self._collection.create_partition(partition)
def _similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[str] = None,
partition: str = "default",
) -> List[Tuple[Document, float]]:
"""返回与查询向量最相似的文档,以及相应的分数。"""
# query by vector
ret = self._collection.query(
embedding, topk=k, filter=filter, partition=partition
)
if not ret:
raise ValueError(
f"Fail to query docs by vector, error {self._collection.message}"
)
docs = []
for doc in ret:
metadata = doc.fields
text = metadata.pop(self._text_field)
score = doc.score
docs.append((Document(page_content=text, metadata=metadata), score))
return docs
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 25,
partition: str = "default",
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入并添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:与文本相关联的id的可选列表。
batch_size:更新文档的可选批量大小。
partition:集合中的一个分区名称。[可选]。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中的id列表。
"""
self._create_partition_if_not_exists(partition)
ids = ids or [str(uuid.uuid4().hex) for _ in texts]
text_list = list(texts)
for i in range(0, len(text_list), batch_size):
# batch end
end = min(i + batch_size, len(text_list))
batch_texts = text_list[i:end]
batch_ids = ids[i:end]
batch_embeddings = self._embedding.embed_documents(list(batch_texts))
# batch metadatas
if metadatas:
batch_metadatas = metadatas[i:end]
else:
batch_metadatas = [{} for _ in range(i, end)]
for metadata, text in zip(batch_metadatas, batch_texts):
metadata[self._text_field] = text
# batch upsert to collection
docs = list(zip(batch_ids, batch_embeddings, batch_metadatas))
ret = self._collection.upsert(docs, partition=partition)
if not ret:
raise ValueError(
f"Fail to upsert docs to dashvector vector database,"
f"Error: {ret.message}"
)
return ids
[docs] def delete(
self, ids: Optional[List[str]] = None, partition: str = "default", **kwargs: Any
) -> bool:
"""根据向量ID删除。
参数:
ids:要删除的ID列表。
partition:集合中的一个分区名称。[可选]。
返回:
如果删除成功,则为True,
否则为False。
"""
return bool(self._collection.delete(ids, partition=partition))
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
partition: str = "default",
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query:要搜索与之相似文档的文本。
k:要返回的文档数量。默认为4。
filter:满足SQL where子句规范的文档字段过滤条件。
partition:集合中的一个分区名称。[可选]。
返回:
与查询文本最相似的文档列表。
"""
docs_and_scores = self.similarity_search_with_relevance_scores(
query, k, filter, partition
)
return [doc for doc, _ in docs_and_scores]
[docs] def similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
partition: str = "default",
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询文本最相似的文档,以及相关性分数。
Less表示更相似,more表示更不相似。
参数:
query:输入文本
k:要返回的文档数量。默认为4。
filter:满足SQL where子句规范的文档字段过滤条件。
partition:集合中的一个分区名称。[可选]。
返回:
元组列表(doc,similarity_score)
"""
embedding = self._embedding.embed_query(query)
return self._similarity_search_with_score_by_vector(
embedding, k=k, filter=filter, partition=partition
)
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[str] = None,
partition: str = "default",
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 满足SQL where子句规范的文档字段过滤条件。
partition: 集合中的一个分区名称。[可选]。
返回:
与查询向量最相似的文档列表。
"""
docs_and_scores = self._similarity_search_with_score_by_vector(
embedding, k, filter, partition
)
return [doc for doc, _ in docs_and_scores]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
partition: str = "default",
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter: 满足SQL where子句规范的文档字段过滤条件。
partition: 集合中的分区名称。[可选]。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding = self._embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding, k, fetch_k, lambda_mult, filter, partition
)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
partition: str = "default",
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,
其中0对应于最大多样性,1对应于最小多样性。
默认为0.5。
filter: 满足SQL where子句规范的文档字段过滤条件。
partition: 集合中的一个分区名称。[可选]。
返回:
通过最大边际相关性选择的文档列表。
"""
# query by vector
ret = self._collection.query(
embedding,
topk=fetch_k,
filter=filter,
partition=partition,
include_vector=True,
)
if not ret:
raise ValueError(
f"Fail to query docs by vector, error {self._collection.message}"
)
candidate_embeddings = [doc.vector for doc in ret]
mmr_selected = maximal_marginal_relevance(
np.array(embedding), candidate_embeddings, lambda_mult, k
)
metadatas = [ret.output[i].fields for i in mmr_selected]
return [
Document(page_content=metadata.pop(self._text_field), metadata=metadata)
for metadata in metadatas
]
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
dashvector_api_key: Optional[str] = None,
dashvector_endpoint: Optional[str] = None,
collection_name: str = "langchain",
text_field: str = "text",
batch_size: int = 25,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> DashVector:
"""返回从文本和嵌入初始化的DashVector VectorStore。
这是使用dashvector向量存储快速开始的方法。
示例:
.. code-block:: python
from langchain_community.vectorstores import DashVector
from langchain_community.embeddings import OpenAIEmbeddings
import dashvector
embeddings = OpenAIEmbeddings()
dashvector = DashVector.from_documents(
docs,
embeddings,
dashvector_api_key="{DASHVECTOR_API_KEY}"
)
"""
try:
import dashvector
except ImportError:
raise ImportError(
"Could not import dashvector python package. "
"Please install it with `pip install dashvector`."
)
dashvector_api_key = dashvector_api_key or get_from_env(
"dashvector_api_key", "DASHVECTOR_API_KEY"
)
dashvector_endpoint = dashvector_endpoint or get_from_env(
"dashvector_endpoint",
"DASHVECTOR_ENDPOINT",
default="dashvector.cn-hangzhou.aliyuncs.com",
)
dashvector_client = dashvector.Client(
api_key=dashvector_api_key, endpoint=dashvector_endpoint
)
dashvector_client.delete(collection_name)
collection = dashvector_client.get(collection_name)
if not collection:
dim = len(embedding.embed_query(texts[0]))
# create collection if not existed
resp = dashvector_client.create(collection_name, dimension=dim)
if resp:
collection = dashvector_client.get(collection_name)
else:
raise ValueError(
"Fail to create collection. " f"Error: {resp.message}."
)
dashvector_vector_db = cls(collection, embedding, text_field)
dashvector_vector_db.add_texts(texts, metadatas, ids, batch_size)
return dashvector_vector_db