Bases: BasePydanticReader
从Elasticsearch/Opensearch索引中读取文档。
然后可以在下游的Llama索引数据结构中使用这些文档。
Parameters:
Name |
Type |
Description |
Default |
endpoint |
str
|
|
required
|
index |
str
|
|
required
|
httpx_client_args |
dict
|
可选的额外参数,用于传递给httpx.Client
|
None
|
Source code in llama_index/readers/elasticsearch/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 | class ElasticsearchReader(BasePydanticReader):
"""从Elasticsearch/Opensearch索引中读取文档。
然后可以在下游的Llama索引数据结构中使用这些文档。
Args:
endpoint (str): 集群的URL(http/https)
index (str): 索引的名称(必填)
httpx_client_args (dict): 可选的额外参数,用于传递给`httpx.Client`"""
is_remote: bool = True
endpoint: str
index: str
httpx_client_args: Optional[dict] = None
_client: Any = PrivateAttr()
def __init__(
self, endpoint: str, index: str, httpx_client_args: Optional[dict] = None
):
"""使用参数进行初始化。"""
import_err_msg = """
`httpx` package not found. Install via `pip install httpx`
"""
try:
import httpx
except ImportError:
raise ImportError(import_err_msg)
self._client = httpx.Client(base_url=endpoint, **(httpx_client_args or {}))
super().__init__(
endpoint=endpoint, index=index, httpx_client_args=httpx_client_args
)
@classmethod
def class_name(cls) -> str:
return "ElasticsearchReader"
def load_data(
self,
field: str,
query: Optional[dict] = None,
embedding_field: Optional[str] = None,
) -> List[Document]:
"""从Elasticsearch索引中读取数据。
Args:
field(str):要从文档中检索文本的字段
query(Optional[dict]):Elasticsearch JSON查询DSL对象。
例如:
{"query": {"match": {"message": {"query": "this is a test"}}}}
embedding_field(Optional[str]):如果在该索引中存储了嵌入,则可以使用此字段
设置返回的文档列表上的嵌入字段。
Returns:
List[Document]:文档列表。
"""
res = self._client.post(f"{self.index}/_search", json=query).json()
documents = []
for hit in res["hits"]["hits"]:
doc_id = hit["_id"]
value = hit["_source"][field]
embedding = hit["_source"].get(embedding_field or "", None)
documents.append(
Document(
id_=doc_id, text=value, metadata=hit["_source"], embedding=embedding
)
)
return documents
|
load_data
load_data(
field: str,
query: Optional[dict] = None,
embedding_field: Optional[str] = None,
) -> List[Document]
从Elasticsearch索引中读取数据。
Returns:
Source code in llama_index/readers/elasticsearch/base.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 | def load_data(
self,
field: str,
query: Optional[dict] = None,
embedding_field: Optional[str] = None,
) -> List[Document]:
"""从Elasticsearch索引中读取数据。
Args:
field(str):要从文档中检索文本的字段
query(Optional[dict]):Elasticsearch JSON查询DSL对象。
例如:
{"query": {"match": {"message": {"query": "this is a test"}}}}
embedding_field(Optional[str]):如果在该索引中存储了嵌入,则可以使用此字段
设置返回的文档列表上的嵌入字段。
Returns:
List[Document]:文档列表。
"""
res = self._client.post(f"{self.index}/_search", json=query).json()
documents = []
for hit in res["hits"]["hits"]:
doc_id = hit["_id"]
value = hit["_source"][field]
embedding = hit["_source"].get(embedding_field or "", None)
documents.append(
Document(
id_=doc_id, text=value, metadata=hit["_source"], embedding=embedding
)
)
return documents
|