# Source code for langchain_community.retrievers.elastic_search_bm25
"""Elasticsearch-backed retriever that ranks documents with BM25."""
from __future__ import annotations
import uuid
from typing import Any, Iterable, List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
class ElasticSearchBM25Retriever(BaseRetriever):
    """Elasticsearch retriever that ranks documents with BM25.

    To connect to an Elasticsearch instance that requires login credentials,
    including Elastic Cloud, use the Elasticsearch URL format
    https://username:password@es_host:9243. For example, to connect to
    Elastic Cloud, build the Elasticsearch URL with the required
    authentication details and pass it as the ``elasticsearch_url`` argument
    of :meth:`create`.

    You can obtain your Elastic Cloud URL and login credentials by logging in
    to the Elastic Cloud console at https://cloud.elastic.co, selecting your
    deployment, and navigating to the "Deployments" page.

    To obtain the Elastic Cloud password for the default "elastic" user:

    1. Log in to the Elastic Cloud console at https://cloud.elastic.co
    2. Go to "Security" > "Users"
    3. Locate the "elastic" user and click "Edit"
    4. Click "Reset password"
    5. Follow the prompts to reset the password

    The format for Elastic Cloud URLs is
    https://username:password@cluster_id.region_id.gcp.cloud.es.io:9243.
    """

    client: Any
    """Elasticsearch client."""
    index_name: str
    """Name of the Elasticsearch index to use."""

    @classmethod
    def create(
        cls, elasticsearch_url: str, index_name: str, k1: float = 2.0, b: float = 0.75
    ) -> ElasticSearchBM25Retriever:
        """Create a BM25-configured index and return a retriever bound to it.

        A new Elasticsearch client is built from ``elasticsearch_url`` and a
        new index named ``index_name`` is created whose ``content`` field is
        scored with a custom BM25 similarity.

        Args:
            elasticsearch_url: URL of the Elasticsearch instance to connect to.
            index_name: Name of the index to create in Elasticsearch.
            k1: BM25 parameter k1.
            b: BM25 parameter b.

        Returns:
            A retriever holding the new client and the index name.

        Raises:
            ImportError: If the ``elasticsearch`` package is not installed.
        """
        # Imported lazily so the module can be loaded without elasticsearch;
        # same guard and message as in ``add_texts``.
        try:
            from elasticsearch import Elasticsearch
        except ImportError as exc:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            ) from exc

        # Create an Elasticsearch client instance
        es = Elasticsearch(elasticsearch_url)

        # Define the index settings and mappings
        settings = {
            "analysis": {"analyzer": {"default": {"type": "standard"}}},
            "similarity": {
                "custom_bm25": {
                    "type": "BM25",
                    "k1": k1,
                    "b": b,
                }
            },
        }
        mappings = {
            "properties": {
                "content": {
                    "type": "text",
                    "similarity": "custom_bm25",  # Use the custom BM25 similarity
                }
            }
        }

        # Create the index with the specified settings and mappings
        es.indices.create(index=index_name, mappings=mappings, settings=settings)
        return cls(client=es, index_name=index_name)

    def add_texts(
        self,
        texts: Iterable[str],
        refresh_indices: bool = True,
    ) -> List[str]:
        """Index the given texts so the retriever can search them.

        Each text is stored under the ``content`` field of ``self.index_name``
        with a freshly generated UUID4 string as its document id. All
        documents are sent in a single bulk request.

        Args:
            texts: Iterable of strings to add to the retriever.
            refresh_indices: Whether to refresh the Elasticsearch index after
                the bulk request.

        Returns:
            The ids assigned to the texts, in input order.

        Raises:
            ImportError: If the ``elasticsearch`` package is not installed.
        """
        try:
            from elasticsearch.helpers import bulk
        except ImportError as exc:
            raise ImportError(
                "Could not import elasticsearch python package. "
                "Please install it with `pip install elasticsearch`."
            ) from exc

        ids: List[str] = []
        actions = []
        # A plain loop (not two comprehensions) because ``texts`` may be a
        # one-shot iterator that can only be consumed once.
        for text in texts:
            doc_id = str(uuid.uuid4())
            ids.append(doc_id)
            actions.append(
                {
                    "_op_type": "index",
                    "_index": self.index_name,
                    "content": text,
                    "_id": doc_id,
                }
            )
        bulk(self.client, actions)

        if refresh_indices:
            self.client.indices.refresh(index=self.index_name)
        return ids

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Run a ``match`` query on the ``content`` field and wrap the hits.

        Args:
            query: Text to match against the ``content`` field.
            run_manager: Callback manager required by the retriever
                interface; not used by this implementation.

        Returns:
            One ``Document`` per hit, in the order the search returned them,
            with ``page_content`` taken from the hit's ``content`` source
            field. No result size is requested, so the number of hits is
            whatever the server returns by default.
        """
        query_dict = {"query": {"match": {"content": query}}}
        res = self.client.search(index=self.index_name, body=query_dict)
        return [
            Document(page_content=hit["_source"]["content"])
            for hit in res["hits"]["hits"]
        ]