Source code for langchain_community.vectorstores.elasticsearch

import logging
import uuid
from abc import ABC, abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    Tuple,
    Union,
)

import numpy as np
from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)


class BaseRetrievalStrategy(ABC):
    """Base class for `Elasticsearch` retrieval strategies."""

    @abstractmethod
    def query(
        self,
        query_vector: Union[List[float], None],
        query: Union[str, None],
        *,
        k: int,
        fetch_k: int,
        vector_query_field: str,
        text_field: str,
        filter: List[dict],
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        """Executed when a search is performed on the store.

        Args:
            query_vector: The query vector,
                          or None if not using vector-based query.
            query: The text query, or None if not using text-based query.
            k: The total number of results to retrieve.
            fetch_k: The number of results to fetch initially.
            vector_query_field: The field containing the vector
                                representations in the index.
            text_field: The field containing the text data in the index.
            filter: List of filter clauses to apply to the query.
            similarity: The similarity strategy to use, or None if not using one.

        Returns:
            Dict: The Elasticsearch query body.
        """

    @abstractmethod
    def index(
        self,
        dims_length: Union[int, None],
        vector_query_field: str,
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        """Executed when the index is created.

        Args:
            dims_length: Numeric length of the embedding vectors,
                         or None if not using vector-based query.
            vector_query_field: The field containing the vector
                                representations in the index.
            similarity: The similarity strategy to use,
                        or None if not using one.

        Returns:
            Dict: The Elasticsearch settings and mappings for the strategy.
        """

    def before_index_setup(
        self, client: "Elasticsearch", text_field: str, vector_query_field: str
    ) -> None:
        """Executed before the index is created. Used for setting up
        any required Elasticsearch resources like a pipeline.

        Args:
            client: The Elasticsearch client.
            text_field: The field containing the text data in the index.
            vector_query_field: The field containing the vector
                                representations in the index.
        """

    def require_inference(self) -> bool:
        """Returns whether or not the strategy requires inference
        to be performed on the text before it is added to the index.

        Returns:
            bool: Whether or not the strategy requires inference
            to be performed on the text before it is added to the index.
        """
        return True


@deprecated(
    "0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ApproxRetrievalStrategy(BaseRetrievalStrategy):
    """Approximate retrieval strategy using the `HNSW` algorithm."""

    def __init__(
        self,
        query_model_id: Optional[str] = None,
        hybrid: Optional[bool] = False,
        rrf: Optional[Union[dict, bool]] = True,
    ):
        self.query_model_id = query_model_id
        self.hybrid = hybrid

        # RRF has two optional parameters: 'rank_constant', 'window_size'
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
        self.rrf = rrf

    def query(
        self,
        query_vector: Union[List[float], None],
        query: Union[str, None],
        k: int,
        fetch_k: int,
        vector_query_field: str,
        text_field: str,
        filter: List[dict],
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        knn = {
            "filter": filter,
            "field": vector_query_field,
            "k": k,
            "num_candidates": fetch_k,
        }

        # Case 1: the embedding is provided via the embedding function
        if query_vector and not self.query_model_id:
            knn["query_vector"] = query_vector

        # Case 2: the model has been deployed to Elasticsearch
        # and can infer the query vector from the query text
        elif query and self.query_model_id:
            knn["query_vector_builder"] = {
                "text_embedding": {
                    "model_id": self.query_model_id,  # use 'model_id' argument
                    "model_text": query,  # use 'query' argument
                }
            }

        else:
            raise ValueError(
                "You must provide an embedding function or a"
                " query_model_id to perform a similarity search."
            )

        # If hybrid, add a text query alongside the knn query.
        # RRF is used to even out the scores from the knn query and the text query.
        # RRF has two optional parameters: {'rank_constant': int, 'window_size': int}
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
        if self.hybrid:
            query_body = {
                "knn": knn,
                "query": {
                    "bool": {
                        "must": [
                            {
                                "match": {
                                    text_field: {
                                        "query": query,
                                    }
                                }
                            }
                        ],
                        "filter": filter,
                    }
                },
            }

            if isinstance(self.rrf, dict):
                query_body["rank"] = {"rrf": self.rrf}
            elif isinstance(self.rrf, bool) and self.rrf is True:
                query_body["rank"] = {"rrf": {}}

            return query_body
        else:
            return {"knn": knn}

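    # Illustrative sketch (not part of the original module): the body this method
    # produces for a hybrid search with the default rrf=True. Field names are the
    # store's defaults; the vector values are made up.
    #
    #     strategy = ApproxRetrievalStrategy(hybrid=True)
    #     body = strategy.query(
    #         query_vector=[0.1, 0.2, 0.3],
    #         query="what is a vector store?",
    #         k=4,
    #         fetch_k=50,
    #         vector_query_field="vector",
    #         text_field="text",
    #         filter=[],
    #         similarity=DistanceStrategy.COSINE,
    #     )
    #     # body["knn"] == {"filter": [], "field": "vector", "k": 4,
    #     #                 "num_candidates": 50, "query_vector": [0.1, 0.2, 0.3]}
    #     # body["rank"] == {"rrf": {}}
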
    def index(
        self,
        dims_length: Union[int, None],
        vector_query_field: str,
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        """Create the mapping for the Elasticsearch index."""

        if similarity is DistanceStrategy.COSINE:
            similarityAlgo = "cosine"
        elif similarity is DistanceStrategy.EUCLIDEAN_DISTANCE:
            similarityAlgo = "l2_norm"
        elif similarity is DistanceStrategy.DOT_PRODUCT:
            similarityAlgo = "dot_product"
        elif similarity is DistanceStrategy.MAX_INNER_PRODUCT:
            similarityAlgo = "max_inner_product"
        else:
            raise ValueError(f"Similarity {similarity} not supported.")

        return {
            "mappings": {
                "properties": {
                    vector_query_field: {
                        "type": "dense_vector",
                        "dims": dims_length,
                        "index": True,
                        "similarity": similarityAlgo,
                    },
                }
            }
        }

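    # Illustrative sketch (not part of the original module): the mapping produced
    # for a 3-dimensional cosine index, using the store's default field name.
    #
    #     ApproxRetrievalStrategy().index(
    #         dims_length=3,
    #         vector_query_field="vector",
    #         similarity=DistanceStrategy.COSINE,
    #     )
    #     # -> {"mappings": {"properties": {"vector": {
    #     #        "type": "dense_vector", "dims": 3,
    #     #        "index": True, "similarity": "cosine"}}}}
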

@deprecated(
    "0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ExactRetrievalStrategy(BaseRetrievalStrategy):
    """Exact retrieval strategy using the `script_score` query."""

    def query(
        self,
        query_vector: Union[List[float], None],
        query: Union[str, None],
        k: int,
        fetch_k: int,
        vector_query_field: str,
        text_field: str,
        filter: Union[List[dict], None],
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        if similarity is DistanceStrategy.COSINE:
            similarityAlgo = (
                f"cosineSimilarity(params.query_vector, '{vector_query_field}') + 1.0"
            )
        elif similarity is DistanceStrategy.EUCLIDEAN_DISTANCE:
            similarityAlgo = (
                f"1 / (1 + l2norm(params.query_vector, '{vector_query_field}'))"
            )
        elif similarity is DistanceStrategy.DOT_PRODUCT:
            similarityAlgo = f"""
            double value = dotProduct(params.query_vector, '{vector_query_field}');
            return sigmoid(1, Math.E, -value);
            """
        else:
            raise ValueError(f"Similarity {similarity} not supported.")

        queryBool: Dict = {"match_all": {}}
        if filter:
            queryBool = {"bool": {"filter": filter}}

        return {
            "query": {
                "script_score": {
                    "query": queryBool,
                    "script": {
                        "source": similarityAlgo,
                        "params": {"query_vector": query_vector},
                    },
                },
            }
        }

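    # Illustrative sketch (not part of the original module): cosine scoring via
    # script_score. cosineSimilarity returns values in [-1, 1], so 1.0 is added
    # to keep Elasticsearch scores non-negative.
    #
    #     ExactRetrievalStrategy().query(
    #         query_vector=[0.1, 0.2, 0.3],
    #         query=None,
    #         k=4,
    #         fetch_k=50,
    #         vector_query_field="vector",
    #         text_field="text",
    #         filter=None,
    #         similarity=DistanceStrategy.COSINE,
    #     )
    #     # -> a script_score query over match_all, with script source
    #     #    "cosineSimilarity(params.query_vector, 'vector') + 1.0"
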
    def index(
        self,
        dims_length: Union[int, None],
        vector_query_field: str,
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        """Create the mapping for the Elasticsearch index."""

        return {
            "mappings": {
                "properties": {
                    vector_query_field: {
                        "type": "dense_vector",
                        "dims": dims_length,
                        "index": False,
                    },
                }
            }
        }


@deprecated(
    "0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class SparseRetrievalStrategy(BaseRetrievalStrategy):
    """Sparse retrieval strategy using the `text_expansion` processor."""

    def __init__(self, model_id: Optional[str] = None):
        self.model_id = model_id or ".elser_model_1"

    def query(
        self,
        query_vector: Union[List[float], None],
        query: Union[str, None],
        k: int,
        fetch_k: int,
        vector_query_field: str,
        text_field: str,
        filter: List[dict],
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        return {
            "query": {
                "bool": {
                    "must": [
                        {
                            "text_expansion": {
                                f"{vector_query_field}.tokens": {
                                    "model_id": self.model_id,
                                    "model_text": query,
                                }
                            }
                        }
                    ],
                    "filter": filter,
                }
            }
        }

    def _get_pipeline_name(self) -> str:
        return f"{self.model_id}_sparse_embedding"

    def before_index_setup(
        self, client: "Elasticsearch", text_field: str, vector_query_field: str
    ) -> None:
        # If model_id is provided, create a pipeline for the model
        if self.model_id:
            client.ingest.put_pipeline(
                id=self._get_pipeline_name(),
                description="Embedding pipeline for langchain vectorstore",
                processors=[
                    {
                        "inference": {
                            "model_id": self.model_id,
                            "target_field": vector_query_field,
                            "field_map": {text_field: "text_field"},
                            "inference_config": {
                                "text_expansion": {"results_field": "tokens"}
                            },
                        }
                    }
                ],
            )

    def index(
        self,
        dims_length: Union[int, None],
        vector_query_field: str,
        similarity: Union[DistanceStrategy, None],
    ) -> Dict:
        return {
            "mappings": {
                "properties": {
                    vector_query_field: {
                        "properties": {"tokens": {"type": "rank_features"}}
                    }
                }
            },
            "settings": {"default_pipeline": self._get_pipeline_name()},
        }

    def require_inference(self) -> bool:
        return False

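# Illustrative sketch (not part of the original module): wiring the sparse
# strategy into a store. Assumes the default ELSER model ".elser_model_1" is
# already deployed in the cluster; no embedding function is needed because
# require_inference() is False and the ingest pipeline embeds server-side.
#
#     store = ElasticsearchStore(
#         index_name="langchain-demo-elser",
#         es_url="http://localhost:9200",
#         strategy=SparseRetrievalStrategy(),
#     )
#     store.add_texts(["hello world"])
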

@deprecated(
    "0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ElasticsearchStore(VectorStore):
    """`Elasticsearch` vector store.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticsearchStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            vectorstore = ElasticsearchStore(
                embedding=OpenAIEmbeddings(),
                index_name="langchain-demo",
                es_url="http://localhost:9200"
            )

    Args:
        index_name: Name of the Elasticsearch index to create.
        es_url: URL of the Elasticsearch instance to connect to.
        cloud_id: Cloud ID of the Elasticsearch instance to connect to.
        es_user: Username to use when connecting to Elasticsearch.
        es_password: Password to use when connecting to Elasticsearch.
        es_api_key: API key to use when connecting to Elasticsearch.
        es_connection: Optional pre-existing Elasticsearch connection.
        vector_query_field: Optional. Name of the field to
                            store the embedding vectors in.
        query_field: Optional. Name of the field to store the texts in.
        strategy: Optional. Retrieval strategy to use when searching the index.
                 Defaults to ApproxRetrievalStrategy. Can be one of
                 ExactRetrievalStrategy, ApproxRetrievalStrategy,
                 or SparseRetrievalStrategy.
        distance_strategy: Optional. Distance strategy to use when
                            searching the index.
                            Defaults to COSINE. Can be one of COSINE,
                            EUCLIDEAN_DISTANCE, MAX_INNER_PRODUCT or DOT_PRODUCT.

    If you want to use a cloud hosted Elasticsearch instance, you can pass in the
    cloud_id argument instead of the es_url argument.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticsearchStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            vectorstore = ElasticsearchStore(
                embedding=OpenAIEmbeddings(),
                index_name="langchain-demo",
                es_cloud_id="<cloud_id>",
                es_user="elastic",
                es_password="<password>"
            )

    You can also connect to an existing Elasticsearch instance by passing in a
    pre-existing Elasticsearch connection via the es_connection argument.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticsearchStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            from elasticsearch import Elasticsearch

            es_connection = Elasticsearch("http://localhost:9200")

            vectorstore = ElasticsearchStore(
                embedding=OpenAIEmbeddings(),
                index_name="langchain-demo",
                es_connection=es_connection
            )

    ElasticsearchStore by default uses the ApproxRetrievalStrategy, which uses
    the HNSW algorithm to perform approximate nearest neighbor search. This is
    the fastest and most memory efficient algorithm.

    If you want to use the brute force / exact strategy for searching vectors,
    you can pass in the ExactRetrievalStrategy to the ElasticsearchStore
    constructor.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticsearchStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            vectorstore = ElasticsearchStore(
                embedding=OpenAIEmbeddings(),
                index_name="langchain-demo",
                es_url="http://localhost:9200",
                strategy=ElasticsearchStore.ExactRetrievalStrategy()
            )

    Both strategies require that you know the similarity metric you want to use
    when creating the index. The default is cosine similarity, but you can also
    use dot product or euclidean distance.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticsearchStore
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            from langchain_community.vectorstores.utils import DistanceStrategy

            vectorstore = ElasticsearchStore(
                "langchain-demo",
                embedding=OpenAIEmbeddings(),
                es_url="http://localhost:9200",
                distance_strategy="DOT_PRODUCT"
            )
    """

    def __init__(
        self,
        index_name: str,
        *,
        embedding: Optional[Embeddings] = None,
        es_connection: Optional["Elasticsearch"] = None,
        es_url: Optional[str] = None,
        es_cloud_id: Optional[str] = None,
        es_user: Optional[str] = None,
        es_api_key: Optional[str] = None,
        es_password: Optional[str] = None,
        vector_query_field: str = "vector",
        query_field: str = "text",
        distance_strategy: Optional[
            Literal[
                DistanceStrategy.COSINE,
                DistanceStrategy.DOT_PRODUCT,
                DistanceStrategy.EUCLIDEAN_DISTANCE,
                DistanceStrategy.MAX_INNER_PRODUCT,
            ]
        ] = None,
        strategy: BaseRetrievalStrategy = ApproxRetrievalStrategy(),
        es_params: Optional[Dict[str, Any]] = None,
    ):
        self.embedding = embedding
        self.index_name = index_name
        self.query_field = query_field
        self.vector_query_field = vector_query_field
        self.distance_strategy = (
            DistanceStrategy.COSINE
            if distance_strategy is None
            else DistanceStrategy[distance_strategy]
        )
        self.strategy = strategy

        if es_connection is not None:
            headers = dict(es_connection._headers)
            headers.update({"user-agent": self.get_user_agent()})
            self.client = es_connection.options(headers=headers)
        elif es_url is not None or es_cloud_id is not None:
            self.client = ElasticsearchStore.connect_to_elasticsearch(
                es_url=es_url,
                username=es_user,
                password=es_password,
                cloud_id=es_cloud_id,
                api_key=es_api_key,
                es_params=es_params,
            )
        else:
            raise ValueError(
                """Either provide a pre-existing Elasticsearch connection, \
or valid credentials for creating a new connection."""
            )

    @staticmethod
    def get_user_agent() -> str:
        from langchain_community import __version__

        return f"langchain-py-vs/{__version__}"

    @staticmethod
    def connect_to_elasticsearch(
        *,
        es_url: Optional[str] = None,
        cloud_id: Optional[str] = None,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        es_params: Optional[Dict[str, Any]] = None,
    ) -> "Elasticsearch":
        try:
            import elasticsearch
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        if es_url and cloud_id:
            raise ValueError(
                "Both es_url and cloud_id are defined. Please provide only one."
            )

        connection_params: Dict[str, Any] = {}

        if es_url:
            connection_params["hosts"] = [es_url]
        elif cloud_id:
            connection_params["cloud_id"] = cloud_id
        else:
            raise ValueError("Please provide either elasticsearch_url or cloud_id.")

        if api_key:
            connection_params["api_key"] = api_key
        elif username and password:
            connection_params["basic_auth"] = (username, password)

        if es_params is not None:
            connection_params.update(es_params)

        es_client = elasticsearch.Elasticsearch(
            **connection_params,
            headers={"user-agent": ElasticsearchStore.get_user_agent()},
        )
        try:
            es_client.info()
        except Exception as e:
            logger.error(f"Error connecting to Elasticsearch: {e}")
            raise e

        return es_client

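    # Illustrative sketch (not part of the original module): the two mutually
    # exclusive ways to reach a cluster. URL and credentials are placeholders.
    #
    #     client = ElasticsearchStore.connect_to_elasticsearch(
    #         es_url="http://localhost:9200",
    #         username="elastic",
    #         password="<password>",
    #     )
    #     # or, for Elastic Cloud (api_key takes precedence over basic auth):
    #     client = ElasticsearchStore.connect_to_elasticsearch(
    #         cloud_id="<cloud_id>",
    #         api_key="<api_key>",
    #     )
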
    @property
    def embeddings(self) -> Optional[Embeddings]:
        return self.embedding

    @staticmethod
    def _identity_fn(score: float) -> float:
        return score

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The 'correct' relevance function may differ depending on a few things,
        including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed; many others
          are not!)
        - embedding dimensionality
        - etc.

        Vectorstores should define their own selection-based
        method of relevance.
        """
        # All scores from Elasticsearch are already normalized similarities:
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html#dense-vector-params
        return self._identity_fn

    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[List[dict]] = None, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return Elasticsearch documents most similar to query, along with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Array of Elasticsearch filter clauses to apply to the query.

        Returns:
            List of Documents most similar to the query and score for each.
        """
        if isinstance(self.strategy, ApproxRetrievalStrategy) and self.strategy.hybrid:
            raise ValueError("scores are currently not supported in hybrid mode")

        return self._search(query=query, k=k, filter=filter, **kwargs)

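    # Illustrative sketch (not part of the original module); `store` is a
    # previously constructed ElasticsearchStore and the filter clause assumes a
    # "source" key in the document metadata.
    #
    #     results = store.similarity_search_with_score(
    #         "what is a vector store?",
    #         k=4,
    #         filter=[{"term": {"metadata.source": "docs"}}],
    #     )
    #     for doc, score in results:
    #         print(score, doc.page_content[:80])
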
    def similarity_search_by_vector_with_relevance_scores(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[List[Dict]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return Elasticsearch documents most similar to the embedding,
        along with scores.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Array of Elasticsearch filter clauses to apply to the query.

        Returns:
            List of Documents most similar to the embedding and score for each.
        """
        if isinstance(self.strategy, ApproxRetrievalStrategy) and self.strategy.hybrid:
            raise ValueError("scores are currently not supported in hybrid mode")

        return self._search(query_vector=embedding, k=k, filter=filter, **kwargs)

    def _search(
        self,
        query: Optional[str] = None,
        k: int = 4,
        query_vector: Union[List[float], None] = None,
        fetch_k: int = 50,
        fields: Optional[List[str]] = None,
        filter: Optional[List[dict]] = None,
        custom_query: Optional[Callable[[Dict, Union[str, None]], Dict]] = None,
        doc_builder: Optional[Callable[[Dict], Document]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return Elasticsearch documents most similar to query, along with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            query_vector: Embedding to look up documents similar to.
            fetch_k: Number of candidates to fetch from each shard.
                     Defaults to 50.
            fields: List of fields to return from Elasticsearch.
                    Defaults to only returning the text field.
            filter: Array of Elasticsearch filter clauses to apply to the query.
            custom_query: Function to modify the Elasticsearch
                          query body before it is sent to Elasticsearch.

        Returns:
            List of Documents most similar to the query and score for each.
        """
        if fields is None:
            fields = []

        if "metadata" not in fields:
            fields.append("metadata")

        if self.query_field not in fields:
            fields.append(self.query_field)

        if self.embedding and query is not None:
            query_vector = self.embedding.embed_query(query)

        query_body = self.strategy.query(
            query_vector=query_vector,
            query=query,
            k=k,
            fetch_k=fetch_k,
            vector_query_field=self.vector_query_field,
            text_field=self.query_field,
            filter=filter or [],
            similarity=self.distance_strategy,
        )

        logger.debug(f"Query body: {query_body}")

        if custom_query is not None:
            query_body = custom_query(query_body, query)
            logger.debug(f"Calling custom_query, Query body now: {query_body}")

        # Perform the kNN search on the Elasticsearch index and return the results.
        response = self.client.search(
            index=self.index_name,
            **query_body,
            size=k,
            source=fields,
        )

        def default_doc_builder(hit: Dict) -> Document:
            return Document(
                page_content=hit["_source"].get(self.query_field, ""),
                metadata=hit["_source"]["metadata"],
            )

        doc_builder = doc_builder or default_doc_builder

        docs_and_scores = []
        for hit in response["hits"]["hits"]:
            for field in fields:
                if field in hit["_source"] and field not in [
                    "metadata",
                    self.query_field,
                ]:
                    if "metadata" not in hit["_source"]:
                        hit["_source"]["metadata"] = {}
                    hit["_source"]["metadata"][field] = hit["_source"][field]

            docs_and_scores.append(
                (
                    doc_builder(hit),
                    hit["_score"],
                )
            )
        return docs_and_scores

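    # Illustrative sketch (not part of the original module): a custom_query hook
    # that rewrites the body before it is sent, here forcing a minimum score.
    # custom_query is forwarded to _search via **kwargs on the public search
    # methods; `store` is a previously constructed ElasticsearchStore.
    #
    #     def my_custom_query(query_body: Dict, query: Union[str, None]) -> Dict:
    #         query_body["min_score"] = 0.5
    #         return query_body
    #
    #     store.similarity_search_with_score("hello", custom_query=my_custom_query)
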
    def delete(
        self,
        ids: Optional[List[str]] = None,
        refresh_indices: Optional[bool] = True,
        **kwargs: Any,
    ) -> Optional[bool]:
        """Delete documents from the Elasticsearch index.

        Args:
            ids: List of ids of documents to delete.
            refresh_indices: Whether to refresh the index
                             after deleting documents. Defaults to True.
        """
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )

        body = []

        if ids is None:
            raise ValueError("ids must be provided.")

        for _id in ids:
            body.append({"_op_type": "delete", "_index": self.index_name, "_id": _id})

        if len(body) > 0:
            try:
                bulk(self.client, body, refresh=refresh_indices, ignore_status=404)
                logger.debug(f"Deleted {len(body)} texts from index")
                return True
            except BulkIndexError as e:
                logger.error(f"Error deleting texts: {e}")
                firstError = e.errors[0].get("index", {}).get("error", {})
                logger.error(f"First error reason: {firstError.get('reason')}")
                raise e
        else:
            logger.debug("No texts to delete from index")
            return False

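    # Illustrative sketch (not part of the original module): deleting by the ids
    # returned from add_texts. ignore_status=404 means already-missing documents
    # do not fail the bulk call; `store` is a previously constructed store.
    #
    #     ids = store.add_texts(["hello world"])
    #     store.delete(ids=ids)
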
    def _create_index_if_not_exists(
        self, index_name: str, dims_length: Optional[int] = None
    ) -> None:
        """Create the Elasticsearch index if it doesn't already exist.

        Args:
            index_name: Name of the Elasticsearch index to create.
            dims_length: Length of the embedding vectors.
        """

        if self.client.indices.exists(index=index_name):
            logger.debug(f"Index {index_name} already exists. Skipping creation.")
        else:
            if dims_length is None and self.strategy.require_inference():
                raise ValueError(
                    "Cannot create index without specifying dims_length "
                    "when the index doesn't already exist. We infer "
                    "dims_length from the first embedding. Check that "
                    "you have provided an embedding function."
                )

            self.strategy.before_index_setup(
                client=self.client,
                text_field=self.query_field,
                vector_query_field=self.vector_query_field,
            )

            indexSettings = self.strategy.index(
                vector_query_field=self.vector_query_field,
                dims_length=dims_length,
                similarity=self.distance_strategy,
            )
            logger.debug(
                f"Creating index {index_name} with mappings {indexSettings['mappings']}"
            )
            self.client.indices.create(index=index_name, **indexSettings)

    def __add(
        self,
        texts: Iterable[str],
        embeddings: Optional[List[List[float]]],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> List[str]:
        try:
            from elasticsearch.helpers import BulkIndexError, bulk
        except ImportError:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            )
        bulk_kwargs = bulk_kwargs or {}
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        requests = []

        if create_index_if_not_exists:
            if embeddings:
                dims_length = len(embeddings[0])
            else:
                dims_length = None

            self._create_index_if_not_exists(
                index_name=self.index_name, dims_length=dims_length
            )

        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}

            request = {
                "_op_type": "index",
                "_index": self.index_name,
                self.query_field: text,
                "metadata": metadata,
                "_id": ids[i],
            }
            if embeddings:
                request[self.vector_query_field] = embeddings[i]

            requests.append(request)

        if len(requests) > 0:
            try:
                success, failed = bulk(
                    self.client,
                    requests,
                    stats_only=True,
                    refresh=refresh_indices,
                    **bulk_kwargs,
                )
                logger.debug(
                    f"Added {success} and failed to add {failed} texts to index"
                )

                logger.debug(f"added texts {ids} to index")
                return ids
            except BulkIndexError as e:
                logger.error(f"Error adding texts: {e}")
                firstError = e.errors[0].get("index", {}).get("error", {})
                logger.error(f"First error reason: {firstError.get('reason')}")
                raise e
        else:
            logger.debug("No texts to add to index")
            return []

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to associate with the texts.
            refresh_indices: Whether to refresh the Elasticsearch indices
                             after adding the texts.
            create_index_if_not_exists: Whether to create the Elasticsearch
                                        index if it doesn't already exist.
            *bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
                - chunk_size: Optional. Number of texts to add to the
                              index at a time. Defaults to 500.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        if self.embedding is not None:
            # If no search_type requires inference, we use the provided
            # embedding function to embed the texts.
            embeddings = self.embedding.embed_documents(list(texts))
        else:
            # the search_type doesn't require inference, so we don't need to
            # embed the texts.
            embeddings = None

        return self.__add(
            texts,
            embeddings,
            metadatas=metadatas,
            ids=ids,
            refresh_indices=refresh_indices,
            create_index_if_not_exists=create_index_if_not_exists,
            bulk_kwargs=bulk_kwargs,
            kwargs=kwargs,
        )

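    # Illustrative sketch (not part of the original module): chunk_size is
    # forwarded to the Elasticsearch bulk helper via bulk_kwargs; `store` is a
    # previously constructed ElasticsearchStore.
    #
    #     ids = store.add_texts(
    #         ["hello world", "hello langchain"],
    #         metadatas=[{"source": "a"}, {"source": "b"}],
    #         bulk_kwargs={"chunk_size": 500},
    #     )
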
    def add_embeddings(
        self,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add the given texts and embeddings to the vectorstore.

        Args:
            text_embeddings: Iterable pairs of string and embedding to
                             add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.
            refresh_indices: Whether to refresh the Elasticsearch indices
                             after adding the texts.
            create_index_if_not_exists: Whether to create the Elasticsearch
                                        index if it doesn't already exist.
            *bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
                - chunk_size: Optional. Number of texts to add to the
                              index at a time. Defaults to 500.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        texts, embeddings = zip(*text_embeddings)
        return self.__add(
            list(texts),
            list(embeddings),
            metadatas=metadatas,
            ids=ids,
            refresh_indices=refresh_indices,
            create_index_if_not_exists=create_index_if_not_exists,
            bulk_kwargs=bulk_kwargs,
            kwargs=kwargs,
        )

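    # Illustrative sketch (not part of the original module): useful when the
    # vectors were computed ahead of time, e.g. in a batch job. The 3-dim
    # vector is made up; `store` is a previously constructed store.
    #
    #     pairs = [("hello world", [0.1, 0.2, 0.3])]
    #     ids = store.add_embeddings(pairs, metadatas=[{"source": "batch"}])
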
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Optional[Embeddings] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None,
        bulk_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> "ElasticsearchStore":
        """Construct ElasticsearchStore wrapper from raw documents.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ElasticsearchStore
                from langchain_community.embeddings.openai import OpenAIEmbeddings

                db = ElasticsearchStore.from_texts(
                    texts,
                    # embeddings optional if using a strategy
                    # that doesn't require inference
                    embeddings,
                    index_name="langchain-demo",
                    es_url="http://localhost:9200"
                )

        Args:
            texts: List of texts to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
            metadatas: Optional list of metadatas associated with the texts.
            index_name: Name of the Elasticsearch index to create.
            es_url: URL of the Elasticsearch instance to connect to.
            cloud_id: Cloud ID of the Elasticsearch instance to connect to.
            es_user: Username to use when connecting to Elasticsearch.
            es_password: Password to use when connecting to Elasticsearch.
            es_api_key: API key to use when connecting to Elasticsearch.
            es_connection: Optional pre-existing Elasticsearch connection.
            vector_query_field: Optional. Name of the field to
                                store the embedding vectors in.
            query_field: Optional. Name of the field to store the texts in.
            distance_strategy: Optional. Name of the distance strategy to use.
                                Defaults to "COSINE". Can be one of "COSINE",
                                "EUCLIDEAN_DISTANCE", "DOT_PRODUCT",
                                "MAX_INNER_PRODUCT".
            bulk_kwargs: Optional. Additional arguments to pass to
                        Elasticsearch bulk.
        """

        elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
            embedding=embedding, **kwargs
        )

        # Encode the provided texts and add them to the newly created index.
        elasticsearchStore.add_texts(texts, metadatas=metadatas, bulk_kwargs=bulk_kwargs)

        return elasticsearchStore

    @staticmethod
    def _create_cls_from_kwargs(
        embedding: Optional[Embeddings] = None, **kwargs: Any
    ) -> "ElasticsearchStore":
        index_name = kwargs.get("index_name")

        if index_name is None:
            raise ValueError("Please provide an index_name.")

        es_connection = kwargs.get("es_connection")
        es_cloud_id = kwargs.get("es_cloud_id")
        es_url = kwargs.get("es_url")
        es_user = kwargs.get("es_user")
        es_password = kwargs.get("es_password")
        es_api_key = kwargs.get("es_api_key")
        vector_query_field = kwargs.get("vector_query_field")
        query_field = kwargs.get("query_field")
        distance_strategy = kwargs.get("distance_strategy")
        strategy = kwargs.get("strategy", ElasticsearchStore.ApproxRetrievalStrategy())

        optional_args = {}

        if vector_query_field is not None:
            optional_args["vector_query_field"] = vector_query_field

        if query_field is not None:
            optional_args["query_field"] = query_field

        return ElasticsearchStore(
            index_name=index_name,
            embedding=embedding,
            es_url=es_url,
            es_connection=es_connection,
            es_cloud_id=es_cloud_id,
            es_user=es_user,
            es_password=es_password,
            es_api_key=es_api_key,
            strategy=strategy,
            distance_strategy=distance_strategy,
            **optional_args,
        )

    @classmethod
    def from_documents(
        cls,
        documents: List[Document],
        embedding: Optional[Embeddings] = None,
        bulk_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> "ElasticsearchStore":
        """Construct ElasticsearchStore wrapper from documents.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ElasticsearchStore
                from langchain_community.embeddings.openai import OpenAIEmbeddings

                db = ElasticsearchStore.from_documents(
                    documents,
                    embeddings,
                    index_name="langchain-demo",
                    es_url="http://localhost:9200"
                )

        Args:
            documents: List of documents to add to the Elasticsearch index.
            embedding: Embedding function to use to embed the texts.
                      Do not provide if using a strategy
                      that doesn't require inference.
            index_name: Name of the Elasticsearch index to create.
            es_url: URL of the Elasticsearch instance to connect to.
            cloud_id: Cloud ID of the Elasticsearch instance to connect to.
            es_user: Username to use when connecting to Elasticsearch.
            es_password: Password to use when connecting to Elasticsearch.
            es_api_key: API key to use when connecting to Elasticsearch.
            es_connection: Optional pre-existing Elasticsearch connection.
            vector_query_field: Optional. Name of the field to
                                store the embedding vectors in.
            query_field: Optional. Name of the field to store the texts in.
            bulk_kwargs: Optional. Additional arguments to pass to
                        Elasticsearch bulk.
        """

        elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
            embedding=embedding, **kwargs
        )
        # Encode the provided documents and add them to the newly created index.
        elasticsearchStore.add_documents(documents, bulk_kwargs=bulk_kwargs)

        return elasticsearchStore

    @staticmethod
    def ExactRetrievalStrategy() -> "ExactRetrievalStrategy":
        """Used to perform brute force / exact
        nearest neighbor search via script_score."""
        return ExactRetrievalStrategy()

    @staticmethod
    def ApproxRetrievalStrategy(
        query_model_id: Optional[str] = None,
        hybrid: Optional[bool] = False,
        rrf: Optional[Union[dict, bool]] = True,
    ) -> "ApproxRetrievalStrategy":
        """Used to perform approximate nearest neighbor search
        using the HNSW algorithm.

        At build index time, this strategy will create a
        dense vector field in the index and store the
        embedding vectors in the index.

        At query time, the text will either be embedded using the
        provided embedding function or the query_model_id
        will be used to embed the text using the model
        deployed to Elasticsearch.

        Do not provide an embedding function if you are
        using query_model_id.

        Args:
            query_model_id: Optional. ID of the model to use to
                            embed the query text within the stack. Requires
                            embedding model to be deployed to Elasticsearch.
            hybrid: Optional. If True, will perform a hybrid search
                    using both the knn query and a text query.
                    Defaults to False.
            rrf: Optional. rrf is Reciprocal Rank Fusion.
                 When `hybrid` is True:
                    if `rrf` is True, then rrf: {} is used;
                    if `rrf` is False, then rrf is omitted;
                    if isinstance(rrf, dict), then the dict values are passed in.
                 A dict can be passed to adjust 'rank_constant' and 'window_size'.
        """
        return ApproxRetrievalStrategy(
            query_model_id=query_model_id, hybrid=hybrid, rrf=rrf
        )

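    # Illustrative sketch (not part of the original module): hybrid search with
    # tuned RRF parameters; the values are examples only.
    #
    #     store = ElasticsearchStore(
    #         index_name="langchain-demo",
    #         embedding=OpenAIEmbeddings(),
    #         es_url="http://localhost:9200",
    #         strategy=ElasticsearchStore.ApproxRetrievalStrategy(
    #             hybrid=True,
    #             rrf={"rank_constant": 60, "window_size": 100},
    #         ),
    #     )
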
    @staticmethod
    def SparseVectorRetrievalStrategy(
        model_id: Optional[str] = None,
    ) -> "SparseRetrievalStrategy":
        """Used to perform sparse vector search via text_expansion.
        Used for when you want to use the ELSER model to perform document search.

        At build index time, this strategy will create a pipeline that
        will embed the text using the ELSER model and store the
        resulting tokens in the index.

        At query time, the text will be embedded using the ELSER
        model and the resulting tokens will be used to
        perform a text_expansion query.

        Args:
            model_id: Optional. Defaults to ".elser_model_1".
                    ID of the model to use to embed the query text
                    within the stack. Requires the embedding model
                    to be deployed to Elasticsearch.
        """
        return SparseRetrievalStrategy(model_id=model_id)