Source code for langchain_community.vectorstores.dashvector

from __future__ import annotations

import logging
import uuid
from typing import (
    Any,
    Iterable,
    List,
    Optional,
    Tuple,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_env
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

logger = logging.getLogger(__name__)


class DashVector(VectorStore):
    """`DashVector` vector store.

    To use, you should have the ``dashvector`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DashVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            import dashvector

            client = dashvector.Client(api_key="***")
            client.create("langchain", dimension=1024)
            collection = client.get("langchain")
            embeddings = OpenAIEmbeddings()
            vectorstore = DashVector(collection, embeddings, "text")
    """
    def __init__(
        self,
        collection: Any,
        embedding: Embeddings,
        text_field: str,
    ):
        """Initialize with a DashVector collection."""
        try:
            import dashvector
        except ImportError:
            raise ImportError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            )

        if not isinstance(collection, dashvector.Collection):
            raise ValueError(
                f"collection should be an instance of dashvector.Collection, "
                f"but got {type(collection)}"
            )

        self._collection = collection
        self._embedding = embedding
        self._text_field = text_field
    def _create_partition_if_not_exists(self, partition: str) -> None:
        """Create a partition in the current collection."""
        self._collection.create_partition(partition)

    def _similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query vector, along with scores."""
        # query by vector
        ret = self._collection.query(
            embedding, topk=k, filter=filter, partition=partition
        )
        if not ret:
            raise ValueError(
                f"Fail to query docs by vector, error {self._collection.message}"
            )

        docs = []
        for doc in ret:
            metadata = doc.fields
            text = metadata.pop(self._text_field)
            score = doc.score
            docs.append((Document(page_content=text, metadata=metadata), score))
        return docs
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 25,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add them to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids associated with the texts.
            batch_size: Optional batch size to upsert docs.
            partition: a partition name in the collection. [optional].
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        self._create_partition_if_not_exists(partition)
        # materialize texts first so a generator is not exhausted before embedding
        text_list = list(texts)
        ids = ids or [str(uuid.uuid4().hex) for _ in text_list]
        for i in range(0, len(text_list), batch_size):
            # batch end
            end = min(i + batch_size, len(text_list))

            batch_texts = text_list[i:end]
            batch_ids = ids[i:end]
            batch_embeddings = self._embedding.embed_documents(list(batch_texts))

            # batch metadatas
            if metadatas:
                batch_metadatas = metadatas[i:end]
            else:
                batch_metadatas = [{} for _ in range(i, end)]
            for metadata, text in zip(batch_metadatas, batch_texts):
                metadata[self._text_field] = text

            # batch upsert to collection
            docs = list(zip(batch_ids, batch_embeddings, batch_metadatas))
            ret = self._collection.upsert(docs, partition=partition)
            if not ret:
                raise ValueError(
                    f"Fail to upsert docs to dashvector vector database, "
                    f"Error: {ret.message}"
                )
        return ids
    def delete(
        self,
        ids: Optional[List[str]] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> bool:
        """Delete by vector ID.

        Args:
            ids: List of ids to delete.
            partition: a partition name in the collection. [optional].

        Returns:
            True if deletion is successful, False otherwise.
        """
        return bool(self._collection.delete(ids, partition=partition))
    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query text, along with relevance scores.

        A lower score means more similar; a higher score means less similar.

        Args:
            query: input text.
            k: Number of documents to return. Defaults to 4.
            filter: Doc field filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in the collection. [optional].

        Returns:
            List of tuples of (doc, similarity_score).
        """
        embedding = self._embedding.embed_query(query)
        return self._similarity_search_with_score_by_vector(
            embedding, k=k, filter=filter, partition=partition
        )
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of documents to return. Defaults to 4.
            filter: Doc field filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in the collection. [optional].

        Returns:
            List of documents most similar to the query vector.
        """
        docs_and_scores = self._similarity_search_with_score_by_vector(
            embedding, k, filter, partition
        )
        return [doc for doc, _ in docs_and_scores]
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among the selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of documents to return. Defaults to 4.
            fetch_k: Number of documents to fetch to pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree of
                diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.
            filter: Doc field filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in the collection. [optional].

        Returns:
            List of documents selected by maximal marginal relevance.
        """
        # query by vector
        ret = self._collection.query(
            embedding,
            topk=fetch_k,
            filter=filter,
            partition=partition,
            include_vector=True,
        )
        if not ret:
            raise ValueError(
                f"Fail to query docs by vector, error {self._collection.message}"
            )

        candidate_embeddings = [doc.vector for doc in ret]
        mmr_selected = maximal_marginal_relevance(
            np.array(embedding), candidate_embeddings, lambda_mult, k
        )
        metadatas = [ret.output[i].fields for i in mmr_selected]
        return [
            Document(page_content=metadata.pop(self._text_field), metadata=metadata)
            for metadata in metadatas
        ]
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        dashvector_api_key: Optional[str] = None,
        dashvector_endpoint: Optional[str] = None,
        collection_name: str = "langchain",
        text_field: str = "text",
        batch_size: int = 25,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> DashVector:
        """Return a DashVector VectorStore initialized from texts and embeddings.

        This is the quickest way to get started with the dashvector vector store.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import DashVector
                from langchain_community.embeddings import OpenAIEmbeddings
                import dashvector

                embeddings = OpenAIEmbeddings()
                dashvector = DashVector.from_documents(
                    docs,
                    embeddings,
                    dashvector_api_key="{DASHVECTOR_API_KEY}"
                )
        """
        try:
            import dashvector
        except ImportError:
            raise ImportError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            )

        dashvector_api_key = dashvector_api_key or get_from_env(
            "dashvector_api_key", "DASHVECTOR_API_KEY"
        )
        dashvector_endpoint = dashvector_endpoint or get_from_env(
            "dashvector_endpoint",
            "DASHVECTOR_ENDPOINT",
            default="dashvector.cn-hangzhou.aliyuncs.com",
        )
        dashvector_client = dashvector.Client(
            api_key=dashvector_api_key, endpoint=dashvector_endpoint
        )
        dashvector_client.delete(collection_name)
        collection = dashvector_client.get(collection_name)
        if not collection:
            dim = len(embedding.embed_query(texts[0]))
            # create collection if not existed
            resp = dashvector_client.create(collection_name, dimension=dim)
            if resp:
                collection = dashvector_client.get(collection_name)
            else:
                raise ValueError(
                    f"Fail to create collection. Error: {resp.message}."
                )

        dashvector_vector_db = cls(collection, embedding, text_field)
        dashvector_vector_db.add_texts(texts, metadatas, ids, batch_size)
        return dashvector_vector_db
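

# A minimal end-to-end usage sketch, not part of the library source. It assumes
# valid DashVector credentials in the DASHVECTOR_API_KEY / DASHVECTOR_ENDPOINT
# environment variables and an OpenAI key for the embeddings; the collection
# name "langchain-demo" and the sample texts below are illustrative placeholders.
if __name__ == "__main__":
    from langchain_community.embeddings.openai import OpenAIEmbeddings

    embeddings = OpenAIEmbeddings()

    # Build the store from raw texts; from_texts creates the collection on demand.
    vectorstore = DashVector.from_texts(
        texts=[
            "DashVector is a fully managed vector database service",
            "LangChain integrates many vector stores behind one interface",
        ],
        embedding=embeddings,
        collection_name="langchain-demo",
    )

    # Similarity search with distance-style scores (lower means more similar).
    for doc, score in vectorstore.similarity_search_with_relevance_scores(
        "what is DashVector?", k=2
    ):
        print(score, doc.page_content)

    # Diversified results via maximal marginal relevance on a query embedding.
    query_vector = embeddings.embed_query("vector database integrations")
    mmr_docs = vectorstore.max_marginal_relevance_search_by_vector(
        query_vector, k=2, fetch_k=5
    )
    print([d.page_content for d in mmr_docs])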