Bases: BaseReader
简单的Mongo读取器。
将每个Mongo文档连接成LlamaIndex使用的文档。
Args:
# host (str): Mongo主机。
# port (int): Mongo端口。
Source code in llama_index/readers/mongodb/base.py
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 | class SimpleMongoReader(BaseReader):
"""简单的Mongo读取器。
# 将每个Mongo文档连接成LlamaIndex使用的文档。
# Args:
# host (str): Mongo主机。
# port (int): Mongo端口。"""
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
uri: Optional[str] = None,
) -> None:
"""使用参数进行初始化。"""
try:
from pymongo import MongoClient
except ImportError as err:
raise ImportError(
"`pymongo` package not found, please run `pip install pymongo`"
) from err
client: MongoClient
if uri:
client = MongoClient(uri)
elif host and port:
client = MongoClient(host, port)
else:
raise ValueError("Either `host` and `port` or `uri` must be provided.")
self.client = client
def _flatten(self, texts: List[Union[str, List[str]]]) -> List[str]:
result = []
for text in texts:
result += text if isinstance(text, list) else [text]
return result
def lazy_load_data(
self,
db_name: str,
collection_name: str,
field_names: List[str] = ["text"],
separator: str = "",
query_dict: Optional[Dict] = None,
max_docs: int = 0,
metadata_names: Optional[List[str]] = None,
) -> Iterable[Document]:
"""从输入目录加载数据。
Args:
db_name (str): 数据库的名称。
collection_name (str): 集合的名称。
field_names(List[str]): 要连接的字段的名称。
默认为 ["text"]
separator (str): 字段之间要使用的分隔符。
默认为 ""
query_dict (Optional[Dict]): 用于过滤文档的查询。更多信息请阅读[官方文档](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/#std-label-method-find-query)
默认为 None
max_docs (int): 要加载的最大文档数。
默认为 0(无限制)
metadata_names (Optional[List[str]]): 要添加到文档的元数据属性中的字段名称。
默认为 None
Returns:
List[Document]: 文档的列表。
"""
db = self.client[db_name]
cursor = db[collection_name].find(
filter=query_dict or {},
limit=max_docs,
projection={name: 1 for name in field_names + (metadata_names or [])},
)
for item in cursor:
try:
texts = [item[name] for name in field_names]
except KeyError as err:
raise ValueError(
f"{err.args[0]} field not found in Mongo document."
) from err
texts = self._flatten(texts)
text = separator.join(texts)
if metadata_names is None:
yield Document(text=text)
else:
try:
metadata = {name: item.get(name) for name in metadata_names}
except KeyError as err:
raise ValueError(
f"{err.args[0]} field not found in Mongo document."
) from err
yield Document(text=text, metadata=metadata)
|
lazy_load_data
lazy_load_data(
db_name: str,
collection_name: str,
field_names: List[str] = ["text"],
separator: str = "",
query_dict: Optional[Dict] = None,
max_docs: int = 0,
metadata_names: Optional[List[str]] = None,
) -> Iterable[Document]
从输入目录加载数据。
Parameters:
Name |
Type |
Description |
Default |
db_name |
str
|
|
required
|
collection_name |
str
|
|
required
|
field_names(List[str]) |
|
|
required
|
separator |
str
|
|
''
|
query_dict |
Optional[Dict]
|
用于过滤文档的查询。更多信息请阅读官方文档
默认为 None
|
None
|
max_docs |
int
|
|
0
|
metadata_names |
Optional[List[str]]
|
要添加到文档的元数据属性中的字段名称。
默认为 None
|
None
|
Returns:
Source code in llama_index/readers/mongodb/base.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 | def lazy_load_data(
self,
db_name: str,
collection_name: str,
field_names: List[str] = ["text"],
separator: str = "",
query_dict: Optional[Dict] = None,
max_docs: int = 0,
metadata_names: Optional[List[str]] = None,
) -> Iterable[Document]:
"""从输入目录加载数据。
Args:
db_name (str): 数据库的名称。
collection_name (str): 集合的名称。
field_names(List[str]): 要连接的字段的名称。
默认为 ["text"]
separator (str): 字段之间要使用的分隔符。
默认为 ""
query_dict (Optional[Dict]): 用于过滤文档的查询。更多信息请阅读[官方文档](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/#std-label-method-find-query)
默认为 None
max_docs (int): 要加载的最大文档数。
默认为 0(无限制)
metadata_names (Optional[List[str]]): 要添加到文档的元数据属性中的字段名称。
默认为 None
Returns:
List[Document]: 文档的列表。
"""
db = self.client[db_name]
cursor = db[collection_name].find(
filter=query_dict or {},
limit=max_docs,
projection={name: 1 for name in field_names + (metadata_names or [])},
)
for item in cursor:
try:
texts = [item[name] for name in field_names]
except KeyError as err:
raise ValueError(
f"{err.args[0]} field not found in Mongo document."
) from err
texts = self._flatten(texts)
text = separator.join(texts)
if metadata_names is None:
yield Document(text=text)
else:
try:
metadata = {name: item.get(name) for name in metadata_names}
except KeyError as err:
raise ValueError(
f"{err.args[0]} field not found in Mongo document."
) from err
yield Document(text=text, metadata=metadata)
|