Source code for langchain_community.vectorstores.dingo

from __future__ import annotations

import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

logger = logging.getLogger(__name__)


class Dingo(VectorStore):
    """`Dingo` vector store.

    To use, you should have the ``dingodb`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import Dingo
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            dingo = Dingo(embeddings, "text")
    """

    def __init__(
        self,
        embedding: Embeddings,
        text_key: str,
        *,
        client: Any = None,
        index_name: Optional[str] = None,
        dimension: int = 1024,
        host: Optional[List[str]] = None,
        user: str = "root",
        password: str = "123123",
        self_id: bool = False,
    ):
        """Initialize with a Dingo client."""
        try:
            import dingodb
        except ImportError:
            raise ImportError(
                "Could not import dingo python package. "
                "Please install it with `pip install dingodb`."
            )

        host = host if host is not None else ["172.20.31.10:13000"]

        # collection
        if client is not None:
            dingo_client = client
        else:
            try:
                # connect to dingo db
                dingo_client = dingodb.DingoDB(user, password, host)
            except ValueError as e:
                raise ValueError(f"Dingo failed to connect: {e}")

        self._text_key = text_key
        self._client = dingo_client

        if (
            index_name is not None
            and index_name not in dingo_client.get_index()
            and index_name.upper() not in dingo_client.get_index()
        ):
            if self_id is True:
                dingo_client.create_index(
                    index_name, dimension=dimension, auto_id=False
                )
            else:
                dingo_client.create_index(index_name, dimension=dimension)

        self._index_name = index_name
        self._embedding = embedding
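
    # Construction sketch (not part of the module): the host address, index name,
    # and dimension below are illustrative assumptions, and OpenAIEmbeddings is
    # only one possible Embeddings implementation.
    #
    #     from langchain_community.embeddings.openai import OpenAIEmbeddings
    #
    #     embeddings = OpenAIEmbeddings()
    #     store = Dingo(
    #         embeddings,
    #         "text",
    #         index_name="langchain-demo",
    #         host=["127.0.0.1:13000"],
    #         dimension=1536,  # must match the embedding model's output size
    #     )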

    @property
    def embeddings(self) -> Optional[Embeddings]:
        return self._embedding

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        text_key: str = "text",
        batch_size: int = 500,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add them to the vector store.

        Args:
            texts: Iterable of strings to add to the vector store.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to associate with the texts.

        Returns:
            List of ids from adding the texts into the vector store.
        """
        # Embed and create the documents
        texts = list(texts)  # materialize once so the iterable can be reused
        ids = ids or [str(uuid.uuid4().int)[:13] for _ in texts]
        metadatas_list = []
        embeds = self._embedding.embed_documents(texts)
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            metadata[self._text_key] = text
            metadatas_list.append(metadata)

        # upsert to Dingo
        for i in range(0, len(texts), batch_size):
            j = i + batch_size
            add_res = self._client.vector_add(
                self._index_name, metadatas_list[i:j], embeds[i:j], ids[i:j]
            )
            if not add_res:
                raise Exception("vector add failed")

        return ids
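
    # Call sketch (assumes `store` is an initialized Dingo instance; the texts and
    # metadata values are illustrative):
    #
    #     ids = store.add_texts(
    #         ["DingoDB is a distributed vector database.", "LangChain wraps it."],
    #         metadatas=[{"source": "docs"}, {"source": "notes"}],
    #     )
    #     # one 13-digit id is generated per text unless `ids` is supplied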

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        search_params: Optional[dict] = None,
        timeout: Optional[int] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return Dingo documents most similar to the query, along with their scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of documents to return. Defaults to 4.
            search_params: Dictionary of parameters to filter on metadata.

        Returns:
            List of documents most similar to the query and the score for each.
        """
        docs = []
        query_obj = self._embedding.embed_query(query)
        results = self._client.vector_search(
            self._index_name, xq=query_obj, top_k=k, search_params=search_params
        )

        if not results:
            return []

        for res in results[0]["vectorWithDistances"]:
            score = res["distance"]
            if (
                "score_threshold" in kwargs
                and kwargs.get("score_threshold") is not None
            ):
                if score > kwargs.get("score_threshold"):
                    continue
            metadatas = res["scalarData"]
            id = res["id"]
            text = metadatas[self._text_key]["fields"][0]["data"]
            metadata = {"id": id, "text": text, "score": score}
            for meta_key in metadatas.keys():
                metadata[meta_key] = metadatas[meta_key]["fields"][0]["data"]
            docs.append((Document(page_content=text, metadata=metadata), score))

        return docs
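
    # Call sketch (assumes `store` as above). `score_threshold` is read from
    # **kwargs, and hits whose distance exceeds the threshold are skipped:
    #
    #     scored = store.similarity_search_with_score(
    #         "what is DingoDB?", k=3, score_threshold=0.5
    #     )
    #     for doc, score in scored:
    #         print(score, doc.page_content)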

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        search_params: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among the selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of documents to return. Defaults to 4.
            fetch_k: Number of documents to fetch and pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree of
                diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.

        Returns:
            List of documents selected by maximal marginal relevance.
        """
        # fetch fetch_k candidates so MMR can select k diverse results from them
        results = self._client.vector_search(
            self._index_name,
            [embedding],
            search_params=search_params,
            top_k=fetch_k,
        )

        mmr_selected = maximal_marginal_relevance(
            np.array([embedding], dtype=np.float32),
            [
                item["vector"]["floatValues"]
                for item in results[0]["vectorWithDistances"]
            ],
            k=k,
            lambda_mult=lambda_mult,
        )

        selected = []
        for i in mmr_selected:
            meta_data = {}
            scalar_data = results[0]["vectorWithDistances"][i]["scalarData"]
            for key, value in scalar_data.items():
                meta_data[str(key)] = value["fields"][0]["data"]
            selected.append(meta_data)

        return [
            Document(page_content=metadata.pop(self._text_key), metadata=metadata)
            for metadata in selected
        ]
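
    # Call sketch (assumes `store` and `embeddings` as above). The query string is
    # embedded first because this method takes a raw vector:
    #
    #     query_vector = embeddings.embed_query("distributed vector database")
    #     docs = store.max_marginal_relevance_search_by_vector(
    #         query_vector, k=4, fetch_k=20, lambda_mult=0.5
    #     )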

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        text_key: str = "text",
        index_name: Optional[str] = None,
        dimension: int = 1024,
        client: Any = None,
        host: List[str] = ["172.20.31.10:13000"],
        user: str = "root",
        password: str = "123123",
        batch_size: int = 500,
        **kwargs: Any,
    ) -> Dingo:
        """Construct a Dingo wrapper from raw documents.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Adds the documents to the provided Dingo index.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import Dingo
                from langchain_community.embeddings import OpenAIEmbeddings
                import dingodb

                embeddings = OpenAIEmbeddings()
                dingo = Dingo.from_texts(
                    texts,
                    embeddings,
                    index_name="langchain-demo"
                )
        """
        try:
            import dingodb
        except ImportError:
            raise ImportError(
                "Could not import dingo python package. "
                "Please install it with `pip install dingodb`."
            )

        if client is not None:
            dingo_client = client
        else:
            try:
                # connect to dingo db
                dingo_client = dingodb.DingoDB(user, password, host)
            except ValueError as e:
                raise ValueError(f"Dingo failed to connect: {e}")

        if kwargs is not None and kwargs.get("self_id") is True:
            if (
                index_name is not None
                and index_name not in dingo_client.get_index()
                and index_name.upper() not in dingo_client.get_index()
            ):
                dingo_client.create_index(
                    index_name, dimension=dimension, auto_id=False
                )
        else:
            if (
                index_name is not None
                and index_name not in dingo_client.get_index()
                and index_name.upper() not in dingo_client.get_index()
            ):
                dingo_client.create_index(index_name, dimension=dimension)

        # Embed and create the documents
        texts = list(texts)
        ids = ids or [str(uuid.uuid4().int)[:13] for _ in texts]
        metadatas_list = []
        embeds = embedding.embed_documents(texts)
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            metadata[text_key] = text
            metadatas_list.append(metadata)

        # upsert to Dingo
        for i in range(0, len(texts), batch_size):
            j = i + batch_size
            add_res = dingo_client.vector_add(
                index_name, metadatas_list[i:j], embeds[i:j], ids[i:j]
            )
            if not add_res:
                raise Exception("vector add failed")

        return cls(embedding, text_key, client=dingo_client, index_name=index_name)
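
    # Variant sketch showing metadata, explicit ids, and `self_id` (all values
    # below are illustrative; `self_id=True` creates the index with auto_id
    # disabled so the supplied ids are used):
    #
    #     store = Dingo.from_texts(
    #         texts,
    #         OpenAIEmbeddings(),
    #         metadatas=[{"source": "web"} for _ in texts],
    #         ids=[str(n) for n in range(1, len(texts) + 1)],
    #         index_name="langchain-demo",
    #         host=["127.0.0.1:13000"],
    #         self_id=True,
    #     )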

    def delete(
        self,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Any:
        """Delete by vector IDs or filter.

        Args:
            ids: List of ids to delete.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")

        return self._client.vector_delete(self._index_name, ids=ids)
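

# A minimal end-to-end usage sketch. Everything below is illustrative: the host
# address, index name, and texts are placeholders, and OpenAIEmbeddings needs an
# OPENAI_API_KEY in the environment. The guard keeps module import side-effect
# free.
if __name__ == "__main__":
    from langchain_community.embeddings.openai import OpenAIEmbeddings

    embeddings = OpenAIEmbeddings()
    store = Dingo(
        embeddings,
        "text",
        index_name="langchain-demo",
        host=["127.0.0.1:13000"],  # placeholder address
        dimension=1536,  # OpenAI text-embedding-ada-002 output size
    )
    ids = store.add_texts(["hello dingo", "hello langchain"])
    print(store.similarity_search_with_score("hello", k=2))
    store.delete(ids=ids)  # remove the vectors that were just added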