Source code for langchain_community.vectorstores.elastic_vector_search

from __future__ import annotations

import uuid
import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
)

from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from elasticsearch import Elasticsearch


def _default_text_mapping(dim: int) -> Dict:
    return {
        "properties": {
            "text": {"type": "text"},
            "vector": {"type": "dense_vector", "dims": dim},
        }
    }
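
# Illustrative example: for 3-dimensional embeddings, _default_text_mapping(3)
# produces the index mapping
# {
#     "properties": {
#         "text": {"type": "text"},
#         "vector": {"type": "dense_vector", "dims": 3},
#     }
# }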


def _default_script_query(query_vector: List[float], filter: Optional[dict]) -> Dict:
    if filter:
        ((key, value),) = filter.items()
        filter = {"match": {f"metadata.{key}.keyword": f"{value}"}}
    else:
        filter = {"match_all": {}}
    return {
        "script_score": {
            "query": filter,
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'vector') + 1.0",
                "params": {"query_vector": query_vector},
            },
        }
    }
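
# Illustrative example: a single-key filter such as {"source": "notion"} (a
# hypothetical metadata key/value) is rewritten above into a keyword match on
# "metadata.source.keyword", while no filter falls back to match_all; either way
# the resulting query is wrapped in a script_score clause that ranks hits by
# cosineSimilarity(params.query_vector, 'vector') + 1.0.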


@deprecated(
    "0.0.27",
    alternative="Use ElasticsearchStore class in langchain-elasticsearch package",
    pending=True,
)
class ElasticVectorSearch(VectorStore):
    """ElasticVectorSearch uses the brute force method of searching on vectors.

    Recommended to use ElasticsearchStore instead, which gives you the option
    to use the approximate HNSW algorithm, which performs better on large
    datasets.

    ElasticsearchStore also supports metadata filtering, customising the
    query retriever and much more!

    You can read more on ElasticsearchStore:
    https://python.langchain.com/docs/integrations/vectorstores/elasticsearch

    To connect to an `Elasticsearch` instance that does not require
    login credentials, pass the Elasticsearch URL and index name along with the
    embedding object to the constructor.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticVectorSearch
            from langchain_community.embeddings import OpenAIEmbeddings

            embedding = OpenAIEmbeddings()
            elastic_vector_search = ElasticVectorSearch(
                elasticsearch_url="http://localhost:9200",
                index_name="test_index",
                embedding=embedding
            )

    To connect to an Elasticsearch instance that requires login credentials,
    including Elastic Cloud, use the Elasticsearch URL format
    https://username:password@es_host:9243. For example, to connect to Elastic
    Cloud, create the Elasticsearch URL with the required authentication details
    and pass it to the ElasticVectorSearch constructor as the named parameter
    elasticsearch_url.

    You can obtain your Elastic Cloud URL and login credentials by logging in to
    the Elastic Cloud console at https://cloud.elastic.co, selecting your
    deployment, and navigating to the "Deployments" page.

    To obtain your Elastic Cloud password for the default "elastic" user:

    1. Log in to the Elastic Cloud console at https://cloud.elastic.co
    2. Go to "Security" > "Users"
    3. Locate the "elastic" user and click "Edit"
    4. Click "Reset password"
    5. Follow the prompts to reset the password

    The format for Elastic Cloud URLs is
    https://username:password@cluster_id.region_id.gcp.cloud.es.io:9243.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticVectorSearch
            from langchain_community.embeddings import OpenAIEmbeddings

            embedding = OpenAIEmbeddings()

            elastic_host = "cluster_id.region_id.gcp.cloud.es.io"
            elasticsearch_url = f"https://username:password@{elastic_host}:9243"
            elastic_vector_search = ElasticVectorSearch(
                elasticsearch_url=elasticsearch_url,
                index_name="test_index",
                embedding=embedding
            )

    Args:
        elasticsearch_url (str): The URL for your Elasticsearch instance.
        index_name (str): The name of the Elasticsearch index for the embeddings.
        embedding (Embeddings): An object that provides the ability to embed text.
            It should be an instance of a class that subclasses the Embeddings
            abstract base class, such as OpenAIEmbeddings().

    Raises:
        ValueError: If the elasticsearch python package is not installed.
    """

    def __init__(
        self,
        elasticsearch_url: str,
        index_name: str,
        embedding: Embeddings,
        *,
        ssl_verify: Optional[Dict[str, Any]] = None,
    ):
        """Initialize with necessary components."""
        warnings.warn(
            "ElasticVectorSearch will be removed in a future release. See "
            "Elasticsearch integration docs on how to upgrade."
        )

        try:
            import elasticsearch
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )
        self.embedding = embedding
        self.index_name = index_name
        _ssl_verify = ssl_verify or {}
        try:
            self.client = elasticsearch.Elasticsearch(
                elasticsearch_url,
                **_ssl_verify,
                headers={"user-agent": self.get_user_agent()},
            )
        except ValueError as e:
            raise ValueError(
                f"Your elasticsearch client string is mis-formatted. Got error: {e}"
            )

    @staticmethod
    def get_user_agent() -> str:
        from langchain_community import __version__

        return f"langchain-py-dvs/{__version__}"

    @property
    def embeddings(self) -> Embeddings:
        return self.embedding

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.
            refresh_indices: Whether to refresh the Elasticsearch indices.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        try:
            from elasticsearch.exceptions import NotFoundError
            from elasticsearch.helpers import bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )
        requests = []
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        embeddings = self.embedding.embed_documents(list(texts))
        dim = len(embeddings[0])
        mapping = _default_text_mapping(dim)

        # check to see if the index already exists
        try:
            self.client.indices.get(index=self.index_name)
        except NotFoundError:
            # TODO would be nice to create index before embedding,
            # just to save expensive steps for last
            self.create_index(self.client, self.index_name, mapping)

        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            request = {
                "_op_type": "index",
                "_index": self.index_name,
                "vector": embeddings[i],
                "text": text,
                "metadata": metadata,
                "_id": ids[i],
            }
            requests.append(request)
        bulk(self.client, requests)

        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)
        return ids

    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query, with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query, with their scores.
        """
        embedding = self.embedding.embed_query(query)
        script_query = _default_script_query(embedding, filter)
        response = self.client_search(
            self.client, self.index_name, script_query, size=k
        )
        hits = [hit for hit in response["hits"]["hits"]]
        docs_and_scores = [
            (
                Document(
                    page_content=hit["_source"]["text"],
                    metadata=hit["_source"]["metadata"],
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return docs_and_scores
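
    # Note: `client_search`, referenced above, is not shown in this listing. A
    # minimal sketch of such a helper (an assumption, mirroring the version
    # handling used by `create_index` below) might look like:
    #
    #     def client_search(
    #         self, client: Any, index_name: str, script_query: Dict, size: int
    #     ) -> Any:
    #         version_num = int(client.info()["version"]["number"][0])
    #         if version_num >= 8:
    #             return client.search(index=index_name, query=script_query, size=size)
    #         return client.search(
    #             index=index_name, body={"query": script_query, "size": size}
    #         )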

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        index_name: Optional[str] = None,
        refresh_indices: bool = True,
        **kwargs: Any,
    ) -> ElasticVectorSearch:
        """Construct an ElasticVectorSearch wrapper from raw documents.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Creates a new index for the embeddings in the Elasticsearch instance.
            3. Adds the documents to the newly created Elasticsearch index.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ElasticVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                elastic_vector_search = ElasticVectorSearch.from_texts(
                    texts,
                    embeddings,
                    elasticsearch_url="http://localhost:9200"
                )
        """
        elasticsearch_url = get_from_dict_or_env(
            kwargs, "elasticsearch_url", "ELASTICSEARCH_URL"
        )
        if "elasticsearch_url" in kwargs:
            del kwargs["elasticsearch_url"]
        index_name = index_name or uuid.uuid4().hex
        vectorsearch = cls(elasticsearch_url, index_name, embedding, **kwargs)
        vectorsearch.add_texts(
            texts, metadatas=metadatas, ids=ids, refresh_indices=refresh_indices
        )
        return vectorsearch

    def create_index(self, client: Any, index_name: str, mapping: Dict) -> None:
        version_num = client.info()["version"]["number"][0]
        version_num = int(version_num)
        if version_num >= 8:
            client.indices.create(index=index_name, mappings=mapping)
        else:
            client.indices.create(index=index_name, body={"mappings": mapping})
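
    # Illustrative note: client.info()["version"]["number"] is a version string
    # such as "8.11.0"; `create_index` branches on the major version because
    # Elasticsearch 8+ Python clients accept `mappings=` directly, while older
    # clients expect the mapping wrapped in `body={"mappings": ...}`.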

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")

        # TODO: Check if this can be done in bulk
        for id in ids:
            self.client.delete(index=self.index_name, id=id)
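
# Illustrative usage sketch for ElasticVectorSearch (assumptions: a local
# Elasticsearch at http://localhost:9200 and OpenAIEmbeddings, as in the class
# docstring above; the texts and query are placeholders):
#
#     from langchain_community.embeddings import OpenAIEmbeddings
#     from langchain_community.vectorstores import ElasticVectorSearch
#
#     store = ElasticVectorSearch.from_texts(
#         ["harrison worked at kensho"],
#         OpenAIEmbeddings(),
#         elasticsearch_url="http://localhost:9200",
#         index_name="test_index",
#     )
#     docs_and_scores = store.similarity_search_with_score(
#         "Where did harrison work?", k=1
#     )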


@deprecated(
    "0.0.1",
    alternative="Use ElasticsearchStore class in langchain-elasticsearch package",
    pending=True,
)
class ElasticKnnSearch(VectorStore):
    """[DEPRECATED] `Elasticsearch` with k-nearest neighbor (`k-NN`) vector store.

    Recommended to use ElasticsearchStore instead, which supports metadata
    filtering, customising the query retriever and much more!

    You can read more on ElasticsearchStore:
    https://python.langchain.com/docs/integrations/vectorstores/elasticsearch

    It creates an Elasticsearch index of text data that can be searched using
    k-NN search. The text data is transformed into vector embeddings using the
    provided embedding model, and these embeddings are stored in the
    Elasticsearch index.

    Attributes:
        index_name (str): The name of the Elasticsearch index.
        embedding (Embeddings): The embedding model used to transform text data
            into vector embeddings.
        es_connection (Elasticsearch, optional): An existing Elasticsearch
            connection.
        es_cloud_id (str, optional): The Cloud ID of your Elasticsearch Service
            deployment.
        es_user (str, optional): The username for your Elasticsearch Service
            deployment.
        es_password (str, optional): The password for your Elasticsearch Service
            deployment.
        vector_query_field (str, optional): The name of the field in the
            Elasticsearch index that contains the vector embeddings.
        query_field (str, optional): The name of the field in the Elasticsearch
            index that contains the original text data.

    Usage:
        >>> from embeddings import Embeddings
        >>> embedding = Embeddings.load('glove')
        >>> es_search = ElasticKnnSearch('my_index', embedding)
        >>> es_search.add_texts(['Hello world!', 'Another text'])
        >>> results = es_search.knn_search('Hello')
        [(Document(page_content='Hello world!', metadata={}), 0.9)]
    """

    def __init__(
        self,
        index_name: str,
        embedding: Embeddings,
        es_connection: Optional["Elasticsearch"] = None,
        es_cloud_id: Optional[str] = None,
        es_user: Optional[str] = None,
        es_password: Optional[str] = None,
        vector_query_field: Optional[str] = "vector",
        query_field: Optional[str] = "text",
    ):
        try:
            import elasticsearch
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        warnings.warn(
            "ElasticKnnSearch will be removed in a future release. "
            "Use ElasticsearchStore instead. See Elasticsearch "
            "integration docs on how to upgrade."
        )

        self.embedding = embedding
        self.index_name = index_name
        self.query_field = query_field
        self.vector_query_field = vector_query_field

        # If a pre-existing Elasticsearch connection is provided, use it.
        if es_connection is not None:
            self.client = es_connection
        else:
            # If credentials for a new Elasticsearch connection are provided,
            # create a new connection.
            if es_cloud_id and es_user and es_password:
                self.client = elasticsearch.Elasticsearch(
                    cloud_id=es_cloud_id, basic_auth=(es_user, es_password)
                )
            else:
                raise ValueError(
                    """Either provide a pre-existing Elasticsearch connection, \
or valid credentials for creating a new connection."""
                )

    @staticmethod
    def _default_knn_mapping(
        dims: int, similarity: Optional[str] = "dot_product"
    ) -> Dict:
        return {
            "properties": {
                "text": {"type": "text"},
                "vector": {
                    "type": "dense_vector",
                    "dims": dims,
                    "index": True,
                    "similarity": similarity,
                },
            }
        }

    def _default_knn_query(
        self,
        query_vector: Optional[List[float]] = None,
        query: Optional[str] = None,
        model_id: Optional[str] = None,
        k: Optional[int] = 10,
        num_candidates: Optional[int] = 10,
    ) -> Dict:
        knn: Dict = {
            "field": self.vector_query_field,
            "k": k,
            "num_candidates": num_candidates,
        }

        # Case 1: `query_vector` is provided, but not `model_id` -> use query_vector
        if query_vector and not model_id:
            knn["query_vector"] = query_vector

        # Case 2: `query` and `model_id` are provided -> use query_vector_builder
        elif query and model_id:
            knn["query_vector_builder"] = {
                "text_embedding": {
                    "model_id": model_id,  # use 'model_id' argument
                    "model_text": query,  # use 'query' argument
                }
            }

        else:
            raise ValueError(
                "Either `query_vector` or `model_id` must be provided, but not both."
            )

        return knn
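
    # Illustrative example: with the default vector_query_field, calling
    # _default_knn_query(query_vector=[0.1, 0.2, 0.3], k=5, num_candidates=50)
    # yields
    #     {"field": "vector", "k": 5, "num_candidates": 50,
    #      "query_vector": [0.1, 0.2, 0.3]}
    # whereas passing query="hello" together with a deployed model_id instead
    # fills in "query_vector_builder" so Elasticsearch embeds the text
    # server-side; supplying neither combination raises a ValueError.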

    def similarity_search_with_score(
        self, query: str, k: int = 10, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Pass through to `knn_search`, including scores."""
        return self.knn_search(query=query, k=k, **kwargs)

    def create_knn_index(self, mapping: Dict) -> None:
        """Create a new k-NN index in Elasticsearch.

        Args:
            mapping (Dict): The mapping to use for the new index.

        Returns:
            None
        """
        self.client.indices.create(index=self.index_name, mappings=mapping)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        model_id: Optional[str] = None,
        refresh_indices: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Add a list of texts to the Elasticsearch index.

        Args:
            texts (Iterable[str]): The texts to add to the index.
            metadatas (List[Dict[Any, Any]], optional): A list of metadata
                dictionaries to associate with the texts.
            model_id (str, optional): The ID of the model to use for transforming
                the texts into vectors.
            refresh_indices (bool, optional): Whether to refresh the Elasticsearch
                indices after adding the texts.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            A list of IDs for the added texts.
        """
        # Check if the index exists.
        if not self.client.indices.exists(index=self.index_name):
            dims = kwargs.get("dims")

            if dims is None:
                raise ValueError("ElasticKnnSearch requires 'dims' parameter")

            similarity = kwargs.get("similarity")
            optional_args = {}

            if similarity is not None:
                optional_args["similarity"] = similarity

            mapping = self._default_knn_mapping(dims=dims, **optional_args)
            self.create_knn_index(mapping)

        embeddings = self.embedding.embed_documents(list(texts))

        body: List[Mapping[str, Any]] = []
        for text, vector in zip(texts, embeddings):
            body.extend(
                [
                    {"index": {"_index": self.index_name}},
                    {"text": text, "vector": vector},
                ]
            )

        responses = self.client.bulk(operations=body)

        ids = [
            item["index"]["_id"]
            for item in responses["items"]
            if item["index"]["result"] == "created"
        ]

        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)

        return ids

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> ElasticKnnSearch:
        """Create a new ElasticKnnSearch instance and add a list of texts to the
        Elasticsearch index.

        Args:
            texts (List[str]): The texts to add to the index.
            embedding (Embeddings): The embedding model to use for transforming
                the texts into vectors.
            metadatas (List[Dict[Any, Any]], optional): A list of metadata
                dictionaries to associate with the texts.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            A new ElasticKnnSearch instance.
        """
        index_name = kwargs.get("index_name", str(uuid.uuid4()))
        es_connection = kwargs.get("es_connection")
        es_cloud_id = kwargs.get("es_cloud_id")
        es_user = kwargs.get("es_user")
        es_password = kwargs.get("es_password")
        vector_query_field = kwargs.get("vector_query_field", "vector")
        query_field = kwargs.get("query_field", "text")
        model_id = kwargs.get("model_id")
        dims = kwargs.get("dims")

        if dims is None:
            raise ValueError("ElasticKnnSearch requires 'dims' parameter")

        optional_args = {}

        if vector_query_field is not None:
            optional_args["vector_query_field"] = vector_query_field

        if query_field is not None:
            optional_args["query_field"] = query_field

        knnvectorsearch = cls(
            index_name=index_name,
            embedding=embedding,
            es_connection=es_connection,
            es_cloud_id=es_cloud_id,
            es_user=es_user,
            es_password=es_password,
            **optional_args,
        )
        # Encode the provided texts and add them to the newly created index.
        knnvectorsearch.add_texts(texts, model_id=model_id, dims=dims, **optional_args)

        return knnvectorsearch
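

# Illustrative usage sketch for ElasticKnnSearch (assumptions: an Elastic Cloud
# deployment reachable via es_cloud_id/es_user/es_password, a hypothetical index
# name, and an embedding model producing 1536-dimensional vectors, hence
# dims=1536; credentials are placeholders):
#
#     from langchain_community.embeddings import OpenAIEmbeddings
#
#     knn_store = ElasticKnnSearch.from_texts(
#         ["Hello world!", "Another text"],
#         OpenAIEmbeddings(),
#         index_name="knn_test_index",
#         es_cloud_id="<cloud_id>",
#         es_user="elastic",
#         es_password="<password>",
#         dims=1536,
#     )
#     results = knn_store.similarity_search_with_score("Hello", k=2)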