import logging
import uuid
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Tuple,
Union,
)
import numpy as np
from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from elasticsearch import Elasticsearch
logger = logging.getLogger(__name__)
class BaseRetrievalStrategy(ABC):
"""Base class for `Elasticsearch` retrieval strategies."""
@abstractmethod
def query(
self,
query_vector: Union[List[float], None],
query: Union[str, None],
*,
k: int,
fetch_k: int,
vector_query_field: str,
text_field: str,
filter: List[dict],
similarity: Union[DistanceStrategy, None],
) -> Dict:
"""当在商店上执行搜索时执行。
参数:
query_vector: 查询向量,
如果不使用基于向量的查询,则为None。
query: 文本查询,如果不使用基于文本的查询,则为None。
k: 要检索的结果总数。
fetch_k: 最初要获取的结果数。
vector_query_field: 索引中包含向量表示的字段。
text_field: 索引中包含文本数据的字段。
filter: 要应用于查询的过滤器子句列表。
similarity: 要使用的相似性策略,如果不使用则为None。
返回:
字典:Elasticsearch查询体。
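Example (illustrative only; the exact body depends on the concrete strategy,
e.g. an approximate kNN strategy returns a body roughly like this):
.. code-block:: python
{
    "knn": {
        "filter": filter,
        "field": vector_query_field,
        "query_vector": query_vector,
        "k": k,
        "num_candidates": fetch_k,
    }
}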
"""
@abstractmethod
def index(
self,
dims_length: Union[int, None],
vector_query_field: str,
similarity: Union[DistanceStrategy, None],
) -> Dict:
"""当索引被创建时执行。
参数:
dims_length: 嵌入向量的数值长度,
如果不使用基于向量的查询则为None。
vector_query_field: 包含向量表示的字段
在索引中。
similarity: 要使用的相似性策略,
如果不使用则为None。
返回:
字典:策略的Elasticsearch设置和映射。
"""
def before_index_setup(
self, client: "Elasticsearch", text_field: str, vector_query_field: str
) -> None:
"""在索引创建之前执行。用于设置任何必需的Elasticsearch资源,如管道。
参数:
client:Elasticsearch客户端。
text_field:索引中包含文本数据的字段。
vector_query_field:索引中包含向量表示的字段。
"""
def require_inference(self) -> bool:
"""返回策略是否需要在将文本添加到索引之前执行推理。
返回:
布尔值:策略是否需要在将文本添加到索引之前执行推理。
"""
return True
@deprecated(
"0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ApproxRetrievalStrategy(BaseRetrievalStrategy):
"""使用`HNSW`算法进行近似检索策略。"""
def __init__(
self,
query_model_id: Optional[str] = None,
hybrid: Optional[bool] = False,
rrf: Optional[Union[dict, bool]] = True,
):
self.query_model_id = query_model_id
self.hybrid = hybrid
# RRF has two optional parameters
# 'rank_constant', 'window_size'
# https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
self.rrf = rrf
def query(
self,
query_vector: Union[List[float], None],
query: Union[str, None],
k: int,
fetch_k: int,
vector_query_field: str,
text_field: str,
filter: List[dict],
similarity: Union[DistanceStrategy, None],
) -> Dict:
knn = {
"filter": filter,
"field": vector_query_field,
"k": k,
"num_candidates": fetch_k,
}
# Case 1: Embedding provided via the embedding function
if query_vector and not self.query_model_id:
knn["query_vector"] = query_vector
# Case 2: Used when model has been deployed to
# Elasticsearch and can infer the query vector from the query text
elif query and self.query_model_id:
knn["query_vector_builder"] = {
"text_embedding": {
"model_id": self.query_model_id, # use 'model_id' argument
"model_text": query, # use 'query' argument
}
}
else:
raise ValueError(
"You must provide an embedding function or a"
" query_model_id to perform a similarity search."
)
# If hybrid, add a query to the knn query
# RRF is used to even the score from the knn query and text query
# RRF has two optional parameters: {'rank_constant':int, 'window_size':int}
# https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
if self.hybrid:
query_body = {
"knn": knn,
"query": {
"bool": {
"must": [
{
"match": {
text_field: {
"query": query,
}
}
}
],
"filter": filter,
}
},
}
if isinstance(self.rrf, dict):
query_body["rank"] = {"rrf": self.rrf}
elif isinstance(self.rrf, bool) and self.rrf is True:
query_body["rank"] = {"rrf": {}}
return query_body
else:
return {"knn": knn}
def index(
self,
dims_length: Union[int, None],
vector_query_field: str,
similarity: Union[DistanceStrategy, None],
) -> Dict:
"""为Elasticsearch索引创建映射。"""
if similarity is DistanceStrategy.COSINE:
similarityAlgo = "cosine"
elif similarity is DistanceStrategy.EUCLIDEAN_DISTANCE:
similarityAlgo = "l2_norm"
elif similarity is DistanceStrategy.DOT_PRODUCT:
similarityAlgo = "dot_product"
elif similarity is DistanceStrategy.MAX_INNER_PRODUCT:
similarityAlgo = "max_inner_product"
else:
raise ValueError(f"Similarity {similarity} not supported.")
return {
"mappings": {
"properties": {
vector_query_field: {
"type": "dense_vector",
"dims": dims_length,
"index": True,
"similarity": similarityAlgo,
},
}
}
}
@deprecated(
"0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ExactRetrievalStrategy(BaseRetrievalStrategy):
"""使用`script_score`查询的精确检索策略。"""
def query(
self,
query_vector: Union[List[float], None],
query: Union[str, None],
k: int,
fetch_k: int,
vector_query_field: str,
text_field: str,
filter: Union[List[dict], None],
similarity: Union[DistanceStrategy, None],
) -> Dict:
if similarity is DistanceStrategy.COSINE:
similarityAlgo = (
f"cosineSimilarity(params.query_vector, '{vector_query_field}') + 1.0"
)
elif similarity is DistanceStrategy.EUCLIDEAN_DISTANCE:
similarityAlgo = (
f"1 / (1 + l2norm(params.query_vector, '{vector_query_field}'))"
)
elif similarity is DistanceStrategy.DOT_PRODUCT:
similarityAlgo = f"""
double value = dotProduct(params.query_vector, '{vector_query_field}');
return sigmoid(1, Math.E, -value);
"""
else:
raise ValueError(f"Similarity {similarity} not supported.")
queryBool: Dict = {"match_all": {}}
if filter:
queryBool = {"bool": {"filter": filter}}
return {
"query": {
"script_score": {
"query": queryBool,
"script": {
"source": similarityAlgo,
"params": {"query_vector": query_vector},
},
},
}
}
def index(
self,
dims_length: Union[int, None],
vector_query_field: str,
similarity: Union[DistanceStrategy, None],
) -> Dict:
"""为Elasticsearch索引创建映射。"""
return {
"mappings": {
"properties": {
vector_query_field: {
"type": "dense_vector",
"dims": dims_length,
"index": False,
},
}
}
}
@deprecated(
"0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class SparseRetrievalStrategy(BaseRetrievalStrategy):
"""使用`text_expansion`处理器的稀疏检索策略。"""
def __init__(self, model_id: Optional[str] = None):
self.model_id = model_id or ".elser_model_1"
def query(
self,
query_vector: Union[List[float], None],
query: Union[str, None],
k: int,
fetch_k: int,
vector_query_field: str,
text_field: str,
filter: List[dict],
similarity: Union[DistanceStrategy, None],
) -> Dict:
return {
"query": {
"bool": {
"must": [
{
"text_expansion": {
f"{vector_query_field}.tokens": {
"model_id": self.model_id,
"model_text": query,
}
}
}
],
"filter": filter,
}
}
}
def _get_pipeline_name(self) -> str:
return f"{self.model_id}_sparse_embedding"
def before_index_setup(
self, client: "Elasticsearch", text_field: str, vector_query_field: str
) -> None:
# If model_id is provided, create a pipeline for the model
if self.model_id:
client.ingest.put_pipeline(
id=self._get_pipeline_name(),
description="Embedding pipeline for langchain vectorstore",
processors=[
{
"inference": {
"model_id": self.model_id,
"target_field": vector_query_field,
"field_map": {text_field: "text_field"},
"inference_config": {
"text_expansion": {"results_field": "tokens"}
},
}
}
],
)
def index(
self,
dims_length: Union[int, None],
vector_query_field: str,
similarity: Union[DistanceStrategy, None],
) -> Dict:
return {
"mappings": {
"properties": {
vector_query_field: {
"properties": {"tokens": {"type": "rank_features"}}
}
}
},
"settings": {"default_pipeline": self._get_pipeline_name()},
}
def require_inference(self) -> bool:
return False
@deprecated(
"0.0.27", alternative="Use class in langchain-elasticsearch package", pending=True
)
class ElasticsearchStore(VectorStore):
"""`Elasticsearch`向量存储。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
vectorstore = ElasticsearchStore(
embedding=OpenAIEmbeddings(),
index_name="langchain-demo",
es_url="http://localhost:9200"
)
参数:
index_name: 要创建的Elasticsearch索引的名称。
es_url: 要连接的Elasticsearch实例的URL。
cloud_id: 要连接的Elasticsearch实例的云ID。
es_user: 连接到Elasticsearch时要使用的用户名。
es_password: 连接到Elasticsearch时要使用的密码。
es_api_key: 连接到Elasticsearch时要使用的API密钥。
es_connection: 可选的现有Elasticsearch连接。
vector_query_field: 可选。用于存储嵌入向量的字段名称。
query_field: 可选。用于存储文本的字段名称。
strategy: 可选。在搜索索引时要使用的检索策略。
默认为ApproxRetrievalStrategy。可以是ExactRetrievalStrategy、ApproxRetrievalStrategy
或SparseRetrievalStrategy之一。
distance_strategy: 可选。在搜索索引时要使用的距离策略。
默认为COSINE。可以是COSINE、EUCLIDEAN_DISTANCE、MAX_INNER_PRODUCT或DOT_PRODUCT之一。
如果要使用云托管的Elasticsearch实例,可以传入cloud_id参数而不是es_url参数。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
vectorstore = ElasticsearchStore(
embedding=OpenAIEmbeddings(),
index_name="langchain-demo",
es_cloud_id="<cloud_id>"
es_user="elastic",
es_password="<password>"
)
您还可以通过es_connection参数传入预先存在的Elasticsearch连接来连接到现有的Elasticsearch实例。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
from elasticsearch import Elasticsearch
es_connection = Elasticsearch("http://localhost:9200")
vectorstore = ElasticsearchStore(
embedding=OpenAIEmbeddings(),
index_name="langchain-demo",
es_connection=es_connection
)
ElasticsearchStore默认使用ApproxRetrievalStrategy,该策略使用HNSW算法执行近似最近邻搜索。这是最快速和最节省内存的算法。
如果要使用暴力/精确策略来搜索向量,可以将ExactRetrievalStrategy传递给ElasticsearchStore构造函数。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
vectorstore = ElasticsearchStore(
embedding=OpenAIEmbeddings(),
index_name="langchain-demo",
es_url="http://localhost:9200",
strategy=ElasticsearchStore.ExactRetrievalStrategy()
)
这两种策略都要求在创建索引时知道要使用的相似度度量。默认为余弦相似度,但也可以使用点积或欧氏距离。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
from langchain_community.vectorstores.utils import DistanceStrategy
vectorstore = ElasticsearchStore(
"langchain-demo",
embedding=OpenAIEmbeddings(),
es_url="http://localhost:9200",
distance_strategy="DOT_PRODUCT"
)"""
def __init__(
self,
index_name: str,
*,
embedding: Optional[Embeddings] = None,
es_connection: Optional["Elasticsearch"] = None,
es_url: Optional[str] = None,
es_cloud_id: Optional[str] = None,
es_user: Optional[str] = None,
es_api_key: Optional[str] = None,
es_password: Optional[str] = None,
vector_query_field: str = "vector",
query_field: str = "text",
distance_strategy: Optional[
Literal[
DistanceStrategy.COSINE,
DistanceStrategy.DOT_PRODUCT,
DistanceStrategy.EUCLIDEAN_DISTANCE,
DistanceStrategy.MAX_INNER_PRODUCT,
]
] = None,
strategy: BaseRetrievalStrategy = ApproxRetrievalStrategy(),
es_params: Optional[Dict[str, Any]] = None,
):
self.embedding = embedding
self.index_name = index_name
self.query_field = query_field
self.vector_query_field = vector_query_field
self.distance_strategy = (
DistanceStrategy.COSINE
if distance_strategy is None
else DistanceStrategy[distance_strategy]
)
self.strategy = strategy
if es_connection is not None:
headers = dict(es_connection._headers)
headers.update({"user-agent": self.get_user_agent()})
self.client = es_connection.options(headers=headers)
elif es_url is not None or es_cloud_id is not None:
self.client = ElasticsearchStore.connect_to_elasticsearch(
es_url=es_url,
username=es_user,
password=es_password,
cloud_id=es_cloud_id,
api_key=es_api_key,
es_params=es_params,
)
else:
raise ValueError(
"""Either provide a pre-existing Elasticsearch connection, \
or valid credentials for creating a new connection."""
)
@staticmethod
def get_user_agent() -> str:
from langchain_community import __version__
return f"langchain-py-vs/{__version__}"
@staticmethod
def connect_to_elasticsearch(
*,
es_url: Optional[str] = None,
cloud_id: Optional[str] = None,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
es_params: Optional[Dict[str, Any]] = None,
) -> "Elasticsearch":
try:
import elasticsearch
except ImportError:
raise ImportError(
"Could not import elasticsearch python package. "
"Please install it with `pip install elasticsearch`."
)
if es_url and cloud_id:
raise ValueError(
"Both es_url and cloud_id are defined. Please provide only one."
)
connection_params: Dict[str, Any] = {}
if es_url:
connection_params["hosts"] = [es_url]
elif cloud_id:
connection_params["cloud_id"] = cloud_id
else:
raise ValueError("Please provide either elasticsearch_url or cloud_id.")
if api_key:
connection_params["api_key"] = api_key
elif username and password:
connection_params["basic_auth"] = (username, password)
if es_params is not None:
connection_params.update(es_params)
es_client = elasticsearch.Elasticsearch(
**connection_params,
headers={"user-agent": ElasticsearchStore.get_user_agent()},
)
try:
es_client.info()
except Exception as e:
logger.error(f"Error connecting to Elasticsearch: {e}")
raise e
return es_client
@property
def embeddings(self) -> Optional[Embeddings]:
return self.embedding
def similarity_search(
self,
query: str,
k: int = 4,
fetch_k: int = 50,
filter: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的Elasticsearch文档。
参数:
query: 要查找与之相似的文档的文本。
k: 要返回的文档数量。默认为4。
fetch_k (int): 要获取的文档数量,以传递给knn num_candidates。
filter: 要应用于查询的Elasticsearch过滤器子句数组。
返回:
与查询最相似的文档列表,
按相似度降序排列。
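Example (a minimal usage sketch; assumes `store` is an existing ElasticsearchStore built with an embedding function, and that the filter clause shape matches your index mapping):
.. code-block:: python
docs = store.similarity_search(
    "What did the president say about Ketanji Brown Jackson?",
    k=4,
    filter=[{"term": {"metadata.source.keyword": "state_of_the_union"}}],
)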
"""
results = self._search(
query=query, k=k, fetch_k=fetch_k, filter=filter, **kwargs
)
return [doc for doc, _ in results]
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
fields: Optional[List[str]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query (str): 要查找类似文档的文本。
k (int): 要返回的文档数量。默认为4。
fetch_k (int): 要获取以传递给MMR算法的文档数量。
lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度,
0对应最大多样性,1对应最小多样性。默认为0.5。
fields: 从elasticsearch源获取的其他字段。这些字段将添加到文档元数据中。
返回:
List[Document]: 通过最大边际相关性选择的文档列表。
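Example (a minimal sketch; assumes `store` was created with an embedding function):
.. code-block:: python
docs = store.max_marginal_relevance_search(
    "renewable energy policy",
    k=4,
    fetch_k=20,
    lambda_mult=0.5,
)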
"""
if self.embedding is None:
raise ValueError("You must provide an embedding function to perform MMR")
remove_vector_query_field_from_metadata = True
if fields is None:
fields = [self.vector_query_field]
elif self.vector_query_field not in fields:
fields.append(self.vector_query_field)
else:
remove_vector_query_field_from_metadata = False
# Embed the query
query_embedding = self.embedding.embed_query(query)
# Fetch the initial documents
got_docs = self._search(
query_vector=query_embedding, k=fetch_k, fields=fields, **kwargs
)
# Get the embeddings for the fetched documents
got_embeddings = [doc.metadata[self.vector_query_field] for doc, _ in got_docs]
# Select documents using maximal marginal relevance
selected_indices = maximal_marginal_relevance(
np.array(query_embedding), got_embeddings, lambda_mult=lambda_mult, k=k
)
selected_docs = [got_docs[i][0] for i in selected_indices]
if remove_vector_query_field_from_metadata:
for doc in selected_docs:
del doc.metadata[self.vector_query_field]
return selected_docs
@staticmethod
def _identity_fn(score: float) -> float:
return score
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,具体取决于一些因素,包括:
- VectorStore使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的,而许多其他嵌入则不是!)
- 嵌入的维度
- 等等。
VectorStore应该根据自己定义基于相关性的选择方法。
"""
# All scores from Elasticsearch are already normalized similarities:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html#dense-vector-params
return self._identity_fn
def similarity_search_with_score(
self, query: str, k: int = 4, filter: Optional[List[dict]] = None, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的Elasticsearch文档,以及分数。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用于查询的Elasticsearch过滤子句数组。
返回:
返回与查询最相似的文档列表,以及每个文档的分数。
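Example (a minimal sketch; assumes an existing `store`; not supported in hybrid mode):
.. code-block:: python
docs_and_scores = store.similarity_search_with_score("my query", k=4)
for doc, score in docs_and_scores:
    print(score, doc.page_content[:80])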
"""
if isinstance(self.strategy, ApproxRetrievalStrategy) and self.strategy.hybrid:
raise ValueError("scores are currently not supported in hybrid mode")
return self._search(query=query, k=k, filter=filter, **kwargs)
def similarity_search_by_vector_with_relevance_scores(
self,
embedding: List[float],
k: int = 4,
filter: Optional[List[Dict]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的Elasticsearch文档,以及分数。
参数:
embedding: 要查找与之相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用于查询的Elasticsearch过滤子句数组。
返回:
与嵌入最相似的文档列表,以及每个文档的分数。
"""
if isinstance(self.strategy, ApproxRetrievalStrategy) and self.strategy.hybrid:
raise ValueError("scores are currently not supported in hybrid mode")
return self._search(query_vector=embedding, k=k, filter=filter, **kwargs)
def _search(
self,
query: Optional[str] = None,
k: int = 4,
query_vector: Union[List[float], None] = None,
fetch_k: int = 50,
fields: Optional[List[str]] = None,
filter: Optional[List[dict]] = None,
custom_query: Optional[Callable[[Dict, Union[str, None]], Dict]] = None,
doc_builder: Optional[Callable[[Dict], Document]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的Elasticsearch文档,以及分数。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
query_vector:要查找类似文档的嵌入。
fetch_k:从每个分片中获取的候选数。
默认为50。
fields:要从Elasticsearch返回的字段列表。
默认仅返回文本字段。
filter:要应用于查询的Elasticsearch过滤器子句数组。
custom_query:在将其发送到Elasticsearch之前修改Elasticsearch查询体的函数。
返回:
与查询最相似的文档列表,以及每个文档的分数。
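Example (a sketch of a custom_query hook; the function name and the added parameter are only illustrative — the hook receives the generated body and the raw query string and must return the, possibly modified, body):
.. code-block:: python
def my_custom_query(query_body: dict, query: Optional[str]) -> dict:
    # e.g. drop low-scoring hits before they are returned
    query_body["min_score"] = 0.5
    return query_body
store._search(query="my query", custom_query=my_custom_query)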
"""
if fields is None:
fields = []
if "metadata" not in fields:
fields.append("metadata")
if self.query_field not in fields:
fields.append(self.query_field)
if self.embedding and query is not None:
query_vector = self.embedding.embed_query(query)
query_body = self.strategy.query(
query_vector=query_vector,
query=query,
k=k,
fetch_k=fetch_k,
vector_query_field=self.vector_query_field,
text_field=self.query_field,
filter=filter or [],
similarity=self.distance_strategy,
)
logger.debug(f"Query body: {query_body}")
if custom_query is not None:
query_body = custom_query(query_body, query)
logger.debug(f"Calling custom_query, Query body now: {query_body}")
# Perform the kNN search on the Elasticsearch index and return the results.
response = self.client.search(
index=self.index_name,
**query_body,
size=k,
source=fields,
)
def default_doc_builder(hit: Dict) -> Document:
return Document(
page_content=hit["_source"].get(self.query_field, ""),
metadata=hit["_source"]["metadata"],
)
doc_builder = doc_builder or default_doc_builder
docs_and_scores = []
for hit in response["hits"]["hits"]:
for field in fields:
if field in hit["_source"] and field not in [
"metadata",
self.query_field,
]:
if "metadata" not in hit["_source"]:
hit["_source"]["metadata"] = {}
hit["_source"]["metadata"][field] = hit["_source"][field]
docs_and_scores.append(
(
doc_builder(hit),
hit["_score"],
)
)
return docs_and_scores
def delete(
self,
ids: Optional[List[str]] = None,
refresh_indices: Optional[bool] = True,
**kwargs: Any,
) -> Optional[bool]:
"""从Elasticsearch索引中删除文档。
参数:
ids:要删除的文档的id列表。
refresh_indices:在删除文档后是否刷新索引。默认为True。
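Example (a minimal sketch; the ids are those returned by add_texts / add_documents):
.. code-block:: python
ids = store.add_texts(["hello", "world"])
store.delete(ids=ids)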
"""
try:
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
raise ImportError(
"Could not import elasticsearch python package. "
"Please install it with `pip install elasticsearch`."
)
body = []
if ids is None:
raise ValueError("ids must be provided.")
for _id in ids:
body.append({"_op_type": "delete", "_index": self.index_name, "_id": _id})
if len(body) > 0:
try:
bulk(self.client, body, refresh=refresh_indices, ignore_status=404)
logger.debug(f"Deleted {len(body)} texts from index")
return True
except BulkIndexError as e:
logger.error(f"Error deleting texts: {e}")
firstError = e.errors[0].get("index", {}).get("error", {})
logger.error(f"First error reason: {firstError.get('reason')}")
raise e
else:
logger.debug("No texts to delete from index")
return False
def _create_index_if_not_exists(
self, index_name: str, dims_length: Optional[int] = None
) -> None:
"""如果Elasticsearch索引不存在,则创建它。
参数:
index_name:要创建的Elasticsearch索引的名称。
dims_length:嵌入向量的长度。
"""
if self.client.indices.exists(index=index_name):
logger.debug(f"Index {index_name} already exists. Skipping creation.")
else:
if dims_length is None and self.strategy.require_inference():
raise ValueError(
"Cannot create index without specifying dims_length "
"when the index doesn't already exist. We infer "
"dims_length from the first embedding. Check that "
"you have provided an embedding function."
)
self.strategy.before_index_setup(
client=self.client,
text_field=self.query_field,
vector_query_field=self.vector_query_field,
)
indexSettings = self.strategy.index(
vector_query_field=self.vector_query_field,
dims_length=dims_length,
similarity=self.distance_strategy,
)
logger.debug(
f"Creating index {index_name} with mappings {indexSettings['mappings']}"
)
self.client.indices.create(index=index_name, **indexSettings)
def __add(
self,
texts: Iterable[str],
embeddings: Optional[List[List[float]]],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
try:
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
raise ImportError(
"Could not import elasticsearch python package. "
"Please install it with `pip install elasticsearch`."
)
bulk_kwargs = bulk_kwargs or {}
ids = ids or [str(uuid.uuid4()) for _ in texts]
requests = []
if create_index_if_not_exists:
if embeddings:
dims_length = len(embeddings[0])
else:
dims_length = None
self._create_index_if_not_exists(
index_name=self.index_name, dims_length=dims_length
)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
request = {
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
"metadata": metadata,
"_id": ids[i],
}
if embeddings:
request[self.vector_query_field] = embeddings[i]
requests.append(request)
if len(requests) > 0:
try:
success, failed = bulk(
self.client,
requests,
stats_only=True,
refresh=refresh_indices,
**bulk_kwargs,
)
logger.debug(
f"Added {success} and failed to add {failed} texts to index"
)
logger.debug(f"added texts {ids} to index")
return ids
except BulkIndexError as e:
logger.error(f"Error adding texts: {e}")
firstError = e.errors[0].get("index", {}).get("error", {})
logger.error(f"First error reason: {firstError.get('reason')}")
raise e
else:
logger.debug("No texts to add to index")
return []
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入并添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:与文本关联的可选id列表。
refresh_indices:在添加文本后是否刷新Elasticsearch索引。
create_index_if_not_exists:如果索引不存在是否创建Elasticsearch索引。
*bulk_kwargs:传递给Elasticsearch批量操作的其他参数。
- chunk_size:可选。一次添加到索引的文本数量。默认为500。
返回:
将文本添加到向量存储中的id列表。
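Example (a minimal sketch; assumes an existing `store` with an embedding function):
.. code-block:: python
ids = store.add_texts(
    ["foo", "bar"],
    metadatas=[{"source": "a"}, {"source": "b"}],
    bulk_kwargs={"chunk_size": 50},
)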
"""
if self.embedding is not None:
# If no search_type requires inference, we use the provided
# embedding function to embed the texts.
embeddings = self.embedding.embed_documents(list(texts))
else:
# the search_type doesn't require inference, so we don't need to
# embed the texts.
embeddings = None
return self.__add(
texts,
embeddings,
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)
def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""将给定的文本和嵌入添加到向量存储中。
参数:
text_embeddings:要添加到向量存储中的字符串和嵌入的可迭代对。
metadatas:与文本相关的元数据的可选列表。
ids:唯一ID的可选列表。
refresh_indices:在添加文本后是否刷新Elasticsearch索引。
create_index_if_not_exists:如果索引尚不存在,是否创建Elasticsearch索引。
*bulk_kwargs:传递给Elasticsearch批量操作的其他参数。
- chunk_size:可选。一次添加到索引的文本数量。默认为500。
返回:
将文本添加到向量存储中的ID列表。
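Example (a minimal sketch; `texts` and `embedding` are assumed to exist elsewhere, with embeddings computed outside the store):
.. code-block:: python
pairs = list(zip(texts, embedding.embed_documents(texts)))
ids = store.add_embeddings(pairs, metadatas=[{"source": "a"}] * len(texts))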
"""
texts, embeddings = zip(*text_embeddings)
return self.__add(
list(texts),
list(embeddings),
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[Dict[str, Any]]] = None,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> "ElasticsearchStore":
"""从原始文档构建ElasticsearchStore包装器。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
db = ElasticsearchStore.from_texts(
texts,
// 如果使用不需要推理的策略,则嵌入是可选的
embeddings,
index_name="langchain-demo",
es_url="http://localhost:9200"
)
参数:
texts:要添加到Elasticsearch索引的文本列表。
embedding:用于嵌入文本的嵌入函数。
metadatas:与文本相关联的元数据的可选列表。
index_name:要创建的Elasticsearch索引的名称。
es_url:要连接的Elasticsearch实例的URL。
cloud_id:要连接的Elasticsearch实例的云ID。
es_user:连接到Elasticsearch时要使用的用户名。
es_password:连接到Elasticsearch时要使用的密码。
es_api_key:连接到Elasticsearch时要使用的API密钥。
es_connection:可选的现有Elasticsearch连接。
vector_query_field:可选。存储嵌入向量的字段名称。
query_field:可选。存储文本的字段名称。
distance_strategy:可选。要使用的距离策略的名称。默认为"COSINE"。可以是"COSINE"、"EUCLIDEAN_DISTANCE"、"DOT_PRODUCT"、"MAX_INNER_PRODUCT"之一。
bulk_kwargs:可选。传递给Elasticsearch批量操作的附加参数。
"""
elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
embedding=embedding, **kwargs
)
# Encode the provided texts and add them to the newly created index.
elasticsearchStore.add_texts(
texts, metadatas=metadatas, bulk_kwargs=bulk_kwargs
)
return elasticsearchStore
@staticmethod
def _create_cls_from_kwargs(
embedding: Optional[Embeddings] = None, **kwargs: Any
) -> "ElasticsearchStore":
index_name = kwargs.get("index_name")
if index_name is None:
raise ValueError("Please provide an index_name.")
es_connection = kwargs.get("es_connection")
es_cloud_id = kwargs.get("es_cloud_id")
es_url = kwargs.get("es_url")
es_user = kwargs.get("es_user")
es_password = kwargs.get("es_password")
es_api_key = kwargs.get("es_api_key")
vector_query_field = kwargs.get("vector_query_field")
query_field = kwargs.get("query_field")
distance_strategy = kwargs.get("distance_strategy")
strategy = kwargs.get("strategy", ElasticsearchStore.ApproxRetrievalStrategy())
optional_args = {}
if vector_query_field is not None:
optional_args["vector_query_field"] = vector_query_field
if query_field is not None:
optional_args["query_field"] = query_field
return ElasticsearchStore(
index_name=index_name,
embedding=embedding,
es_url=es_url,
es_connection=es_connection,
es_cloud_id=es_cloud_id,
es_user=es_user,
es_password=es_password,
es_api_key=es_api_key,
strategy=strategy,
distance_strategy=distance_strategy,
**optional_args,
)
@classmethod
def from_documents(
cls,
documents: List[Document],
embedding: Optional[Embeddings] = None,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> "ElasticsearchStore":
"""从文档构建ElasticsearchStore包装器。
示例:
.. code-block:: python
from langchain_community.vectorstores import ElasticsearchStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
db = ElasticsearchStore.from_documents(
texts,
embeddings,
index_name="langchain-demo",
es_url="http://localhost:9200"
)
参数:
texts: 要添加到Elasticsearch索引的文本列表。
embedding: 用于嵌入文本的嵌入函数。
如果使用不需要推理的策略,则不要提供。
metadatas: 与文本相关的元数据的可选列表。
index_name: 要创建的Elasticsearch索引的名称。
es_url: 要连接的Elasticsearch实例的URL。
cloud_id: 要连接的Elasticsearch实例的Cloud ID。
es_user: 连接到Elasticsearch时要使用的用户名。
es_password: 连接到Elasticsearch时要使用的密码。
es_api_key: 连接到Elasticsearch时要使用的API密钥。
es_connection: 可选的现有Elasticsearch连接。
vector_query_field: 可选。用于存储嵌入向量的字段名称。
query_field: 可选。用于存储文本的字段名称。
bulk_kwargs: 可选。传递给Elasticsearch批量操作的其他参数。
"""
elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs(
embedding=embedding, **kwargs
)
# Encode the provided texts and add them to the newly created index.
elasticsearchStore.add_documents(documents, bulk_kwargs=bulk_kwargs)
return elasticsearchStore
@staticmethod
def ExactRetrievalStrategy() -> "ExactRetrievalStrategy":
"""用于通过脚本评分执行暴力/精确最近邻搜索。
"""
return ExactRetrievalStrategy()
@staticmethod
def ApproxRetrievalStrategy(
query_model_id: Optional[str] = None,
hybrid: Optional[bool] = False,
rrf: Optional[Union[dict, bool]] = True,
) -> "ApproxRetrievalStrategy":
"""用于使用HNSW算法执行近似最近邻搜索。
在构建索引时,此策略将在索引中创建一个密集向量字段,并将嵌入向量存储在索引中。
在查询时,文本将使用提供的嵌入函数进行嵌入,或者将使用query_model_id来使用部署到Elasticsearch的模型对文本进行嵌入。
如果使用query_model_id,则不要提供嵌入函数。
参数:
query_model_id:可选。用于在堆栈内嵌入查询文本的模型的ID。需要将嵌入模型部署到Elasticsearch。
hybrid:可选。如果为True,则将使用knn查询和文本查询执行混合搜索。默认为False。
rrf:可选。rrf是Reciprocal Rank Fusion。当`hybrid`为True时,
并且`rrf`为True时,然后rrf: {}。
并且`rrf`为False时,然后省略rrf。
并且isinstance(rrf, dict)为True时,然后传递字典值。
可以传递rrf以调整'rank_constant'和'window_size'。
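Example (a sketch of hybrid retrieval with RRF tuning; assumes a local cluster and an embedding function; the rank_constant / window_size values are illustrative):
.. code-block:: python
store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_url="http://localhost:9200",
    strategy=ElasticsearchStore.ApproxRetrievalStrategy(
        hybrid=True,
        rrf={"rank_constant": 60, "window_size": 100},
    ),
)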
"""
return ApproxRetrievalStrategy(
query_model_id=query_model_id, hybrid=hybrid, rrf=rrf
)
@staticmethod
def SparseVectorRetrievalStrategy(
model_id: Optional[str] = None,
) -> "SparseRetrievalStrategy":
"""用于通过文本扩展执行稀疏向量搜索。
用于希望使用ELSER模型执行文档搜索时。
在构建索引时,此策略将创建一个流水线,将使用ELSER模型嵌入文本并将生成的标记存储在索引中。
在查询时,文本将使用ELSER模型嵌入,生成的标记将用于执行文本扩展查询。
参数:
model_id:可选。默认为".elser_model_1"。
要用于嵌入查询文本的模型的ID
在Elasticsearch中部署嵌入模型。
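Example (a sketch; requires the ELSER model to be deployed in the Elasticsearch cluster):
.. code-block:: python
store = ElasticsearchStore(
    index_name="langchain-demo",
    es_url="http://localhost:9200",
    strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(),
)
store.add_texts(["hello world"])  # no embedding function needed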
"""
return SparseRetrievalStrategy(model_id=model_id)