from __future__ import annotations
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
logger = logging.getLogger(__name__)
class Dingo(VectorStore):
"""`Dingo` vector store.
To use, you should have the ``dingodb`` python package installed.
Example:
.. code-block:: python
from langchain_community.vectorstores import Dingo
from langchain_community.embeddings.openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
dingo = Dingo(embeddings, "text")
"""
def __init__(
self,
embedding: Embeddings,
text_key: str,
*,
client: Any = None,
index_name: Optional[str] = None,
dimension: int = 1024,
host: Optional[List[str]] = None,
user: str = "root",
password: str = "123123",
self_id: bool = False,
):
"""使用Dingo客户端进行初始化。"""
try:
import dingodb
except ImportError:
raise ImportError(
"Could not import dingo python package. "
"Please install it with `pip install dingodb."
)
host = host if host is not None else ["172.20.31.10:13000"]
# collection
if client is not None:
dingo_client = client
else:
try:
# connect to dingo db
dingo_client = dingodb.DingoDB(user, password, host)
except ValueError as e:
raise ValueError(f"Dingo failed to connect: {e}")
self._text_key = text_key
self._client = dingo_client
if (
index_name is not None
and index_name not in dingo_client.get_index()
and index_name.upper() not in dingo_client.get_index()
):
if self_id is True:
dingo_client.create_index(
index_name, dimension=dimension, auto_id=False
)
else:
dingo_client.create_index(index_name, dimension=dimension)
self._index_name = index_name
self._embedding = embedding
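# A minimal construction sketch (hypothetical host address; assumes a
# reachable DingoDB deployment, the ``dingodb`` package, and valid OpenAI
# credentials for the embedding model):
#
#     import dingodb
#     from langchain_community.embeddings.openai import OpenAIEmbeddings
#
#     client = dingodb.DingoDB("root", "123123", ["127.0.0.1:13000"])
#     dingo = Dingo(
#         OpenAIEmbeddings(), "text", client=client, index_name="langchain-demo"
#     )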
@property
def embeddings(self) -> Optional[Embeddings]:
return self._embedding
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
text_key: str = "text",
batch_size: int = 500,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关的元数据的可选列表。
ids:与文本关联的可选id列表。
返回:
将文本添加到向量存储中的id列表。
"""
# Embed and create the documents.
# Materialize texts first so a generator is not exhausted by the id
# comprehension below.
texts = list(texts)
ids = ids or [str(uuid.uuid4().int)[:13] for _ in texts]
metadatas_list = []
embeds = self._embedding.embed_documents(texts)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
metadata[self._text_key] = text
metadatas_list.append(metadata)
# upsert to Dingo
for i in range(0, len(texts), batch_size):
j = i + batch_size
add_res = self._client.vector_add(
self._index_name, metadatas_list[i:j], embeds[i:j], ids[i:j]
)
if not add_res:
raise RuntimeError("Dingo vector add failed")
return ids
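# Usage sketch for add_texts (hypothetical texts and metadata; assumes the
# ``dingo`` instance above points at a live index):
#
#     ids = dingo.add_texts(
#         ["DingoDB is a distributed vector database."],
#         metadatas=[{"source": "readme"}],
#     )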
def similarity_search(
self,
query: str,
k: int = 4,
search_params: Optional[dict] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的Dingo文档,以及它们的分数。
参数:
query: 要查找相似文档的文本。
k: 要返回的文档数量。默认为4。
search_params: 要在元数据上过滤的参数字典。
返回:
返回与查询最相似的文档列表,以及每个文档的分数。
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, search_params=search_params, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def similarity_search_with_score(
self,
query: str,
k: int = 4,
search_params: Optional[dict] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的Dingo文档,以及它们的分数。
参数:
query: 要查找相似文档的文本。
k: 要返回的文档数量。默认为4。
search_params: 要在元数据上过滤的参数字典。
返回:
返回与查询最相似的文档列表,以及每个文档的分数。
"""
docs = []
query_obj = self._embedding.embed_query(query)
results = self._client.vector_search(
self._index_name, xq=query_obj, top_k=k, search_params=search_params
)
if not results:
return []
for res in results[0]["vectorWithDistances"]:
score = res["distance"]
score_threshold = kwargs.get("score_threshold")
if score_threshold is not None and score > score_threshold:
continue
metadatas = res["scalarData"]
id = res["id"]
text = metadatas[self._text_key]["fields"][0]["data"]
metadata = {"id": id, "text": text, "score": score}
for meta_key in metadatas.keys():
metadata[meta_key] = metadatas[meta_key]["fields"][0]["data"]
docs.append((Document(page_content=text, metadata=metadata), score))
return docs
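# Usage sketch (hypothetical query; ``score_threshold`` is optional and
# drops hits whose distance exceeds the given value):
#
#     hits = dingo.similarity_search_with_score(
#         "what is DingoDB?", k=4, score_threshold=0.6
#     )
#     for doc, score in hits:
#         print(score, doc.page_content)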
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
search_params: Optional[dict] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding:要查找相似文档的嵌入。
k:要返回的文档数量。默认为4。
fetch_k:要获取并传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,
0对应最大多样性,1对应最小多样性。
默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
# Fetch fetch_k candidates so the MMR re-ranking has a pool larger than k.
results = self._client.vector_search(
self._index_name, [embedding], search_params=search_params, top_k=fetch_k
)
if not results:
return []
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[
item["vector"]["floatValues"]
for item in results[0]["vectorWithDistances"]
],
k=k,
lambda_mult=lambda_mult,
)
selected = []
for i in mmr_selected:
meta_data = {}
for meta_key, meta_val in results[0]["vectorWithDistances"][i][
"scalarData"
].items():
meta_data.update({str(meta_key): meta_val["fields"][0]["data"]})
selected.append(meta_data)
return [
Document(page_content=metadata.pop(self._text_key), metadata=metadata)
for metadata in selected
]
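# Usage sketch (assumes a query embedding produced by the same model that
# populated the index):
#
#     vec = OpenAIEmbeddings().embed_query("vector databases")
#     docs = dingo.max_marginal_relevance_search_by_vector(vec, k=4, fetch_k=20)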
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
search_params: Optional[dict] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
embedding = self._embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding, k, fetch_k, lambda_mult, search_params
)
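# Usage sketch (hypothetical query; fetch_k candidates are retrieved, then
# re-ranked by MMR down to k results):
#
#     diverse_docs = dingo.max_marginal_relevance_search(
#         "vector databases", k=4, fetch_k=20, lambda_mult=0.5
#     )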
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
text_key: str = "text",
index_name: Optional[str] = None,
dimension: int = 1024,
client: Any = None,
host: Optional[List[str]] = None,
user: str = "root",
password: str = "123123",
batch_size: int = 500,
**kwargs: Any,
) -> Dingo:
"""从原始文档构建 Dingo 包装器。
这是一个用户友好的接口,可以:
1. 嵌入文档。
2. 将文档添加到提供的 Dingo 索引中。
这旨在是一个快速入门的方式。
示例:
```python
from langchain_community.vectorstores import Dingo
from langchain_community.embeddings import OpenAIEmbeddings
import dingodb
embeddings = OpenAIEmbeddings()
dingo = Dingo.from_texts(
texts,
embeddings,
index_name="langchain-demo"
)
```
"""
try:
import dingodb
except ImportError:
raise ImportError(
"Could not import dingo python package. "
"Please install it with `pip install dingodb`."
)
host = host if host is not None else ["172.20.31.10:13000"]
if client is not None:
dingo_client = client
else:
try:
# connect to dingo db
dingo_client = dingodb.DingoDB(user, password, host)
except ValueError as e:
raise ValueError(f"Dingo failed to connect: {e}")
if (
index_name is not None
and index_name not in dingo_client.get_index()
and index_name.upper() not in dingo_client.get_index()
):
if kwargs.get("self_id") is True:
dingo_client.create_index(
index_name, dimension=dimension, auto_id=False
)
else:
dingo_client.create_index(index_name, dimension=dimension)
# Embed and create the documents.
# Materialize texts first so a generator is not exhausted by the id
# comprehension below.
texts = list(texts)
ids = ids or [str(uuid.uuid4().int)[:13] for _ in texts]
metadatas_list = []
embeds = embedding.embed_documents(texts)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
metadata[text_key] = text
metadatas_list.append(metadata)
# upsert to Dingo
for i in range(0, len(texts), batch_size):
j = i + batch_size
add_res = dingo_client.vector_add(
index_name, metadatas_list[i:j], embeds[i:j], ids[i:j]
)
if not add_res:
raise RuntimeError("Dingo vector add failed")
return cls(embedding, text_key, client=dingo_client, index_name=index_name)
def delete(
self,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Any:
"""根据向量ID或过滤器进行删除。
参数:
ids:要删除的ID列表。
"""
if ids is None:
raise ValueError("No ids provided to delete.")
return self._client.vector_delete(self._index_name, ids=ids)
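# Usage sketch (hypothetical ids, e.g. values returned by add_texts):
#
#     dingo.delete(ids=["1234567890123"])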