from __future__ import annotations
import uuid
import warnings
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Tuple,
Union,
)
from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from elasticsearch import Elasticsearch
def _default_text_mapping(dim: int) -> Dict:
return {
"properties": {
"text": {"type": "text"},
"vector": {"type": "dense_vector", "dims": dim},
}
}
def _default_script_query(query_vector: List[float], filter: Optional[dict]) -> Dict:
if filter:
((key, value),) = filter.items()
filter = {"match": {f"metadata.{key}.keyword": f"{value}"}}
else:
filter = {"match_all": {}}
return {
"script_score": {
"query": filter,
"script": {
"source": "cosineSimilarity(params.query_vector, 'vector') + 1.0",
"params": {"query_vector": query_vector},
},
}
}
@deprecated(
    "0.0.27",
    alternative="Use ElasticsearchStore class in langchain-elasticsearch package",
    pending=True,
)
class ElasticVectorSearch(VectorStore):
    """ElasticVectorSearch uses a brute-force vector search approach.

    It is recommended to use ElasticsearchStore instead, which gives you the
    option to use the approximate HNSW algorithm, which performs better on
    large datasets. ElasticsearchStore also supports metadata filtering,
    customising the query retriever and much more!

    You can read more on ElasticsearchStore:
    https://python.langchain.com/docs/integrations/vectorstores/elasticsearch

    To connect to an `Elasticsearch` instance that does not require login
    credentials, pass the Elasticsearch URL and index name along with the
    embedding object to the constructor.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticVectorSearch
            from langchain_community.embeddings import OpenAIEmbeddings

            embedding = OpenAIEmbeddings()
            elastic_vector_search = ElasticVectorSearch(
                elasticsearch_url="http://localhost:9200",
                index_name="test_index",
                embedding=embedding
            )

    To connect to an Elasticsearch instance that requires login credentials,
    including Elastic Cloud, use the Elasticsearch URL format
    https://username:password@es_host:9243. For example, to connect to Elastic
    Cloud, create the Elasticsearch URL with the required authentication
    details and pass it to the ElasticVectorSearch constructor as the named
    parameter ``elasticsearch_url``.

    You can obtain your Elastic Cloud URL and login credentials by logging in
    to the Elastic Cloud console at https://cloud.elastic.co, selecting your
    deployment, and navigating to the "Deployments" page.

    To obtain your Elastic Cloud password for the default "elastic" user:

    1. Log in to the Elastic Cloud console at https://cloud.elastic.co
    2. Go to "Security" > "Users"
    3. Locate the "elastic" user and click "Edit"
    4. Click "Reset password"
    5. Follow the prompts to reset the password

    The format for Elastic Cloud URLs is
    https://username:password@cluster_id.region_id.gcp.cloud.es.io:9243.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import ElasticVectorSearch
            from langchain_community.embeddings import OpenAIEmbeddings

            embedding = OpenAIEmbeddings()

            elastic_host = "cluster_id.region_id.gcp.cloud.es.io"
            elasticsearch_url = f"https://username:password@{elastic_host}:9243"
            elastic_vector_search = ElasticVectorSearch(
                elasticsearch_url=elasticsearch_url,
                index_name="test_index",
                embedding=embedding
            )

    Args:
        elasticsearch_url (str): The URL for the Elasticsearch instance.
        index_name (str): The name of the Elasticsearch index for the
            embeddings.
        embedding (Embeddings): An object that provides the ability to embed
            text. It should be an instance of a class that subclasses the
            Embeddings abstract base class, such as OpenAIEmbeddings().

    Raises:
        ValueError: If the elasticsearch python package is not installed.
    """

    def __init__(
        self,
        elasticsearch_url: str,
        index_name: str,
        embedding: Embeddings,
        *,
        ssl_verify: Optional[Dict[str, Any]] = None,
    ):
        """Initialize with necessary components."""
        # Fixed: the two string fragments previously concatenated to
        # "SeeElasticsearch" (missing space).
        warnings.warn(
            "ElasticVectorSearch will be removed in a future release. See "
            "Elasticsearch integration docs on how to upgrade."
        )
        try:
            import elasticsearch
        except ImportError as e:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            ) from e
        self.embedding = embedding
        self.index_name = index_name
        _ssl_verify = ssl_verify or {}
        try:
            self.client = elasticsearch.Elasticsearch(
                elasticsearch_url,
                **_ssl_verify,
                headers={"user-agent": self.get_user_agent()},
            )
        except ValueError as e:
            raise ValueError(
                f"Your elasticsearch client string is mis-formatted. Got error: {e} "
            ) from e

    @staticmethod
    def get_user_agent() -> str:
        """Return the user-agent header value sent with every client request."""
        from langchain_community import __version__

        return f"langchain-py-dvs/{__version__}"

    @property
    def embeddings(self) -> Embeddings:
        """Access the embedding object used to embed queries and documents."""
        return self.embedding

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        refresh_indices: bool = True,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.
            refresh_indices: Whether to refresh the Elasticsearch index after
                inserting.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        try:
            from elasticsearch.exceptions import NotFoundError
            from elasticsearch.helpers import bulk
        except ImportError as e:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            ) from e
        # Materialize once: `texts` may be a one-shot generator, and it is
        # iterated again below (previously the ID comprehension exhausted it,
        # so nothing was indexed).
        texts = list(texts)
        requests = []
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        embeddings = self.embedding.embed_documents(texts)
        dim = len(embeddings[0])
        mapping = _default_text_mapping(dim)
        # Check to see if the index already exists; create it lazily otherwise.
        try:
            self.client.indices.get(index=self.index_name)
        except NotFoundError:
            # TODO would be nice to create index before embedding,
            # just to save expensive steps for last
            self.create_index(self.client, self.index_name, mapping)
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            request = {
                "_op_type": "index",
                "_index": self.index_name,
                "vector": embeddings[i],
                "text": text,
                "metadata": metadata,
                "_id": ids[i],
            }
            requests.append(request)
        bulk(self.client, requests)
        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)
        return ids

    def similarity_search(
        self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional metadata filter passed through to the query.

        Returns:
            List of Documents most similar to the query.
        """
        docs_and_scores = self.similarity_search_with_score(query, k, filter=filter)
        documents = [d[0] for d in docs_and_scores]
        return documents

    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query, with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional metadata filter passed through to the query.

        Returns:
            List of (Document, score) tuples most similar to the query.
        """
        embedding = self.embedding.embed_query(query)
        script_query = _default_script_query(embedding, filter)
        response = self.client_search(
            self.client, self.index_name, script_query, size=k
        )
        hits = [hit for hit in response["hits"]["hits"]]
        docs_and_scores = [
            (
                Document(
                    page_content=hit["_source"]["text"],
                    metadata=hit["_source"]["metadata"],
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return docs_and_scores

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        index_name: Optional[str] = None,
        refresh_indices: bool = True,
        **kwargs: Any,
    ) -> ElasticVectorSearch:
        """Construct ElasticVectorSearch wrapper from raw documents.

        This is a user-friendly interface that:

        1. Embeds documents.
        2. Creates a new index for the embeddings in the Elasticsearch
           instance.
        3. Adds the documents to the newly created Elasticsearch index.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ElasticVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                elastic_vector_search = ElasticVectorSearch.from_texts(
                    texts,
                    embeddings,
                    elasticsearch_url="http://localhost:9200"
                )
        """
        elasticsearch_url = get_from_dict_or_env(
            kwargs, "elasticsearch_url", "ELASTICSEARCH_URL"
        )
        # Drop the URL from kwargs so it is not passed to __init__ twice.
        if "elasticsearch_url" in kwargs:
            del kwargs["elasticsearch_url"]
        index_name = index_name or uuid.uuid4().hex
        vectorsearch = cls(elasticsearch_url, index_name, embedding, **kwargs)
        vectorsearch.add_texts(
            texts, metadatas=metadatas, ids=ids, refresh_indices=refresh_indices
        )
        return vectorsearch

    def create_index(self, client: Any, index_name: str, mapping: Dict) -> None:
        """Create ``index_name`` on ``client`` with the given ``mapping``.

        Elasticsearch 8+ clients accept the mapping via the ``mappings``
        keyword; older clients expect it nested inside ``body``.
        """
        # Parse the full major version number (previously only the first
        # character of the version string was read, which breaks at "10.x").
        version_num = int(client.info()["version"]["number"].split(".")[0])
        if version_num >= 8:
            client.indices.create(index=index_name, mappings=mapping)
        else:
            client.indices.create(index=index_name, body={"mappings": mapping})

    def client_search(
        self, client: Any, index_name: str, script_query: Dict, size: int
    ) -> Any:
        """Run ``script_query`` against ``index_name``, dispatching on the
        server's major version (8+ uses the ``query`` keyword, older versions
        a ``body`` payload)."""
        version_num = int(client.info()["version"]["number"].split(".")[0])
        if version_num >= 8:
            response = client.search(index=index_name, query=script_query, size=size)
        else:
            response = client.search(
                index=index_name, body={"query": script_query, "size": size}
            )
        return response

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.

        Raises:
            ValueError: If no ids are provided.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")
        # TODO: Check if this can be done in bulk
        for id in ids:
            self.client.delete(index=self.index_name, id=id)
@deprecated(
    "0.0.1",
    alternative="Use ElasticsearchStore class in langchain-elasticsearch package",
    pending=True,
)
class ElasticKnnSearch(VectorStore):
    """[DEPRECATED] `Elasticsearch` with k-nearest neighbor search (`k-NN`)
    vector store.

    Recommended to use ElasticsearchStore instead, which supports metadata
    filtering, customising the query retriever and much more!

    You can read more on ElasticsearchStore:
    https://python.langchain.com/docs/integrations/vectorstores/elasticsearch

    It creates an Elasticsearch index of text data that can be searched using
    k-NN search. The text data is transformed into vector embeddings using a
    provided embedding model, and these embeddings are stored in the
    Elasticsearch index.

    Attributes:
        index_name (str): The name of the Elasticsearch index.
        embedding (Embeddings): The embedding model to use for transforming
            text data into vector embeddings.
        es_connection (Elasticsearch, optional): An existing Elasticsearch
            connection.
        es_cloud_id (str, optional): The Cloud ID of your Elasticsearch
            Service deployment.
        es_user (str, optional): The username for your Elasticsearch Service
            deployment.
        es_password (str, optional): The password for your Elasticsearch
            Service deployment.
        vector_query_field (str, optional): The name of the field in the
            Elasticsearch index that contains the vector embeddings.
        query_field (str, optional): The name of the field in the
            Elasticsearch index that contains the original text data.

    Usage:
        >>> from embeddings import Embeddings
        >>> embedding = Embeddings.load('glove')
        >>> es_search = ElasticKnnSearch('my_index', embedding)
        >>> es_search.add_texts(['Hello world!', 'Another text'])
        >>> results = es_search.knn_search('Hello')
        [(Document(page_content='Hello world!', metadata={}), 0.9)]
    """

    def __init__(
        self,
        index_name: str,
        embedding: Embeddings,
        es_connection: Optional["Elasticsearch"] = None,
        es_cloud_id: Optional[str] = None,
        es_user: Optional[str] = None,
        es_password: Optional[str] = None,
        vector_query_field: Optional[str] = "vector",
        query_field: Optional[str] = "text",
    ):
        try:
            import elasticsearch
        except ImportError as e:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            ) from e
        # Fixed: the first two fragments previously concatenated to
        # "release.Use" (missing space).
        warnings.warn(
            "ElasticKnnSearch will be removed in a future release. "
            "Use ElasticsearchStore instead. See Elasticsearch "
            "integration docs on how to upgrade."
        )
        self.embedding = embedding
        self.index_name = index_name
        self.query_field = query_field
        self.vector_query_field = vector_query_field
        # If a pre-existing Elasticsearch connection is provided, use it.
        if es_connection is not None:
            self.client = es_connection
        else:
            # If credentials for a new Elasticsearch connection are provided,
            # create a new connection.
            if es_cloud_id and es_user and es_password:
                self.client = elasticsearch.Elasticsearch(
                    cloud_id=es_cloud_id, basic_auth=(es_user, es_password)
                )
            else:
                raise ValueError(
                    """Either provide a pre-existing Elasticsearch connection, \
                or valid credentials for creating a new connection."""
                )

    @staticmethod
    def _default_knn_mapping(
        dims: int, similarity: Optional[str] = "dot_product"
    ) -> Dict:
        """Return the default index mapping for k-NN search: a ``text`` field
        plus an indexed ``dense_vector`` field of dimensionality ``dims``."""
        return {
            "properties": {
                "text": {"type": "text"},
                "vector": {
                    "type": "dense_vector",
                    "dims": dims,
                    "index": True,
                    "similarity": similarity,
                },
            }
        }

    def _default_knn_query(
        self,
        query_vector: Optional[List[float]] = None,
        query: Optional[str] = None,
        model_id: Optional[str] = None,
        k: Optional[int] = 10,
        num_candidates: Optional[int] = 10,
    ) -> Dict:
        """Build the ``knn`` clause of an Elasticsearch search request.

        Exactly one of two input modes must be used: a precomputed
        ``query_vector``, or a ``query`` string together with a ``model_id``
        so the server embeds the text itself.

        Raises:
            ValueError: If neither mode (or an ambiguous mix) is supplied.
        """
        knn: Dict = {
            "field": self.vector_query_field,
            "k": k,
            "num_candidates": num_candidates,
        }
        # Case 1: `query_vector` is provided, but not `model_id` -> use query_vector
        if query_vector and not model_id:
            knn["query_vector"] = query_vector
        # Case 2: `query` and `model_id` are provided, -> use query_vector_builder
        elif query and model_id:
            knn["query_vector_builder"] = {
                "text_embedding": {
                    "model_id": model_id,  # use 'model_id' argument
                    "model_text": query,  # use 'query' argument
                }
            }
        else:
            raise ValueError(
                "Either `query_vector` or `model_id` must be provided, but not both."
            )
        return knn

    def similarity_search(
        self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any
    ) -> List[Document]:
        """Pass through to `knn_search`.

        NOTE(review): the ``filter`` argument is accepted for interface
        compatibility but is currently ignored by the underlying k-NN search.
        """
        results = self.knn_search(query=query, k=k, **kwargs)
        return [doc for doc, score in results]

    def similarity_search_with_score(
        self, query: str, k: int = 10, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Pass through to `knn_search`, including scores."""
        return self.knn_search(query=query, k=k, **kwargs)

    def knn_search(
        self,
        query: Optional[str] = None,
        k: Optional[int] = 10,
        query_vector: Optional[List[float]] = None,
        model_id: Optional[str] = None,
        size: Optional[int] = 10,
        source: Optional[bool] = True,
        fields: Optional[
            Union[List[Mapping[str, Any]], Tuple[Mapping[str, Any], ...], None]
        ] = None,
        page_content: Optional[str] = "text",
    ) -> List[Tuple[Document, float]]:
        """Perform a k-NN search on the Elasticsearch index.

        Args:
            query (str, optional): The query text to search for.
            k (int, optional): The number of nearest neighbors to return.
            query_vector (List[float], optional): The query vector to search
                for.
            model_id (str, optional): The ID of the model to use for
                transforming the query text into a vector.
            size (int, optional): The number of search results to return.
            source (bool, optional): Whether to return the source of the
                search results.
            fields (List[Mapping[str, Any]], optional): The fields to return
                in the search results.
            page_content (str, optional): The name of the field that contains
                the page content.

        Returns:
            A list of tuples, where each tuple contains a Document object and
            a score.

        Raises:
            ValueError: If ``source`` is False and ``page_content`` is not
                among ``fields``.
        """
        # When _source is disabled, page content can only come back via
        # `fields`, so it must be requested explicitly.
        if not source and (
            fields is None or not any(page_content in field for field in fields)
        ):
            raise ValueError("If source=False `page_content` field must be in `fields`")
        knn_query_body = self._default_knn_query(
            query_vector=query_vector, query=query, model_id=model_id, k=k
        )
        # Perform the kNN search on the Elasticsearch index and return the results.
        response = self.client.search(
            index=self.index_name,
            knn=knn_query_body,
            size=size,
            source=source,
            fields=fields,
        )
        hits = [hit for hit in response["hits"]["hits"]]
        docs_and_scores = [
            (
                Document(
                    page_content=(
                        hit["_source"][page_content]
                        if source
                        else hit["fields"][page_content][0]
                    ),
                    metadata=hit["fields"] if fields else {},
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return docs_and_scores

    def knn_hybrid_search(
        self,
        query: Optional[str] = None,
        k: Optional[int] = 10,
        query_vector: Optional[List[float]] = None,
        model_id: Optional[str] = None,
        size: Optional[int] = 10,
        source: Optional[bool] = True,
        knn_boost: Optional[float] = 0.9,
        query_boost: Optional[float] = 0.1,
        fields: Optional[
            Union[List[Mapping[str, Any]], Tuple[Mapping[str, Any], ...], None]
        ] = None,
        page_content: Optional[str] = "text",
    ) -> List[Tuple[Document, float]]:
        """Perform a hybrid k-NN and text search on the Elasticsearch index.

        Args:
            query (str, optional): The query text to search for.
            k (int, optional): The number of nearest neighbors to return.
            query_vector (List[float], optional): The query vector to search
                for.
            model_id (str, optional): The ID of the model to use for
                transforming the query text into a vector.
            size (int, optional): The number of search results to return.
            source (bool, optional): Whether to return the source of the
                search results.
            knn_boost (float, optional): The boost value to apply to the k-NN
                search results.
            query_boost (float, optional): The boost value to apply to the
                text search results.
            fields (List[Mapping[str, Any]], optional): The fields to return
                in the search results.
            page_content (str, optional): The name of the field that contains
                the page content.

        Returns:
            A list of tuples, where each tuple contains a Document object and
            a score.

        Raises:
            ValueError: If ``source`` is False and ``page_content`` is not
                among ``fields``.
        """
        # When _source is disabled, page content can only come back via
        # `fields`, so it must be requested explicitly.
        if not source and (
            fields is None or not any(page_content in field for field in fields)
        ):
            raise ValueError("If source=False `page_content` field must be in `fields`")
        knn_query_body = self._default_knn_query(
            query_vector=query_vector, query=query, model_id=model_id, k=k
        )
        # Modify the knn_query_body to add a "boost" parameter
        knn_query_body["boost"] = knn_boost
        # Generate the body of the standard Elasticsearch query
        match_query_body = {
            "match": {self.query_field: {"query": query, "boost": query_boost}}
        }
        # Perform the hybrid search on the Elasticsearch index and return the results.
        response = self.client.search(
            index=self.index_name,
            query=match_query_body,
            knn=knn_query_body,
            fields=fields,
            size=size,
            source=source,
        )
        hits = [hit for hit in response["hits"]["hits"]]
        docs_and_scores = [
            (
                Document(
                    page_content=(
                        hit["_source"][page_content]
                        if source
                        else hit["fields"][page_content][0]
                    ),
                    metadata=hit["fields"] if fields else {},
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return docs_and_scores

    def create_knn_index(self, mapping: Dict) -> None:
        """Create a new k-NN index in Elasticsearch.

        Args:
            mapping (Dict): The mapping to use for the new index.

        Returns:
            None
        """
        self.client.indices.create(index=self.index_name, mappings=mapping)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        model_id: Optional[str] = None,
        refresh_indices: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Add a list of texts to the Elasticsearch index.

        Args:
            texts (Iterable[str]): The texts to add to the index.
            metadatas (List[Dict[Any, Any]], optional): A list of metadata
                dictionaries to associate with the texts.
                NOTE(review): currently accepted but not written to the index.
            model_id (str, optional): The ID of the model to use for
                transforming the texts into vectors.
            refresh_indices (bool, optional): Whether to refresh the
                Elasticsearch index after adding the texts.
            **kwargs: Arbitrary keyword arguments; ``dims`` (required when the
                index does not yet exist) and ``similarity`` are consumed here.

        Returns:
            A list of IDs of the added texts.

        Raises:
            ValueError: If the index must be created and ``dims`` is missing.
        """
        # Check if the index exists; create it lazily on first insert.
        if not self.client.indices.exists(index=self.index_name):
            dims = kwargs.get("dims")
            if dims is None:
                raise ValueError("ElasticKnnSearch requires 'dims' parameter")
            similarity = kwargs.get("similarity")
            optional_args = {}
            if similarity is not None:
                optional_args["similarity"] = similarity
            mapping = self._default_knn_mapping(dims=dims, **optional_args)
            self.create_knn_index(mapping)
        # Materialize once: `texts` may be a one-shot generator, and it is
        # zipped again below (previously embed_documents exhausted it, so the
        # bulk body ended up empty).
        texts = list(texts)
        embeddings = self.embedding.embed_documents(texts)
        body: List[Mapping[str, Any]] = []
        for text, vector in zip(texts, embeddings):
            body.extend(
                [
                    {"index": {"_index": self.index_name}},
                    {"text": text, "vector": vector},
                ]
            )
        responses = self.client.bulk(operations=body)
        ids = [
            item["index"]["_id"]
            for item in responses["items"]
            if item["index"]["result"] == "created"
        ]
        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)
        return ids

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[Dict[Any, Any]]] = None,
        **kwargs: Any,
    ) -> ElasticKnnSearch:
        """Create a new ElasticKnnSearch instance and add a list of texts to
        the Elasticsearch index.

        Args:
            texts (List[str]): The texts to add to the index.
            embedding (Embeddings): The embedding model to use for
                transforming the texts into vectors.
            metadatas (List[Dict[Any, Any]], optional): A list of metadata
                dictionaries to associate with the texts.
            **kwargs: Arbitrary keyword arguments (connection details,
                ``index_name``, ``model_id``, ``dims``, field names).

        Returns:
            A new ElasticKnnSearch instance.

        Raises:
            ValueError: If ``dims`` is not provided.
        """
        index_name = kwargs.get("index_name", str(uuid.uuid4()))
        es_connection = kwargs.get("es_connection")
        es_cloud_id = kwargs.get("es_cloud_id")
        es_user = kwargs.get("es_user")
        es_password = kwargs.get("es_password")
        vector_query_field = kwargs.get("vector_query_field", "vector")
        query_field = kwargs.get("query_field", "text")
        model_id = kwargs.get("model_id")
        dims = kwargs.get("dims")
        if dims is None:
            raise ValueError("ElasticKnnSearch requires 'dims' parameter")
        optional_args = {}
        if vector_query_field is not None:
            optional_args["vector_query_field"] = vector_query_field
        if query_field is not None:
            optional_args["query_field"] = query_field
        knnvectorsearch = cls(
            index_name=index_name,
            embedding=embedding,
            es_connection=es_connection,
            es_cloud_id=es_cloud_id,
            es_user=es_user,
            es_password=es_password,
            **optional_args,
        )
        # Encode the provided texts and add them to the newly created index.
        knnvectorsearch.add_texts(texts, model_id=model_id, dims=dims, **optional_args)
        return knnvectorsearch