Skip to content

Mongodb

SimpleMongoReader #

Bases: BaseReader

简单的Mongo读取器。

将每个Mongo文档连接成LlamaIndex使用的文档。#

Args:#

# host (str): Mongo主机。
# port (int): Mongo端口。
Source code in llama_index/readers/mongodb/base.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class SimpleMongoReader(BaseReader):
    """简单的Mongo读取器。

    # 将每个Mongo文档连接成LlamaIndex使用的文档。

    # Args:
        # host (str): Mongo主机。
        # port (int): Mongo端口。"""

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        uri: Optional[str] = None,
    ) -> None:
        """使用参数进行初始化。"""
        try:
            from pymongo import MongoClient
        except ImportError as err:
            raise ImportError(
                "`pymongo` package not found, please run `pip install pymongo`"
            ) from err

        client: MongoClient
        if uri:
            client = MongoClient(uri)
        elif host and port:
            client = MongoClient(host, port)
        else:
            raise ValueError("Either `host` and `port` or `uri` must be provided.")

        self.client = client

    def _flatten(self, texts: List[Union[str, List[str]]]) -> List[str]:
        result = []
        for text in texts:
            result += text if isinstance(text, list) else [text]
        return result

    def lazy_load_data(
        self,
        db_name: str,
        collection_name: str,
        field_names: List[str] = ["text"],
        separator: str = "",
        query_dict: Optional[Dict] = None,
        max_docs: int = 0,
        metadata_names: Optional[List[str]] = None,
    ) -> Iterable[Document]:
        """从输入目录加载数据。

Args:
    db_name (str): 数据库的名称。
    collection_name (str): 集合的名称。
    field_names(List[str]): 要连接的字段的名称。
        默认为 ["text"]
    separator (str): 字段之间要使用的分隔符。
        默认为 ""
    query_dict (Optional[Dict]): 用于过滤文档的查询。更多信息请阅读[官方文档](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/#std-label-method-find-query)
        默认为 None
    max_docs (int): 要加载的最大文档数。
        默认为 0(无限制)
    metadata_names (Optional[List[str]]): 要添加到文档的元数据属性中的字段名称。
        默认为 None

Returns:
    List[Document]: 文档的列表。
"""
        db = self.client[db_name]
        cursor = db[collection_name].find(
            filter=query_dict or {},
            limit=max_docs,
            projection={name: 1 for name in field_names + (metadata_names or [])},
        )

        for item in cursor:
            try:
                texts = [item[name] for name in field_names]
            except KeyError as err:
                raise ValueError(
                    f"{err.args[0]} field not found in Mongo document."
                ) from err

            texts = self._flatten(texts)
            text = separator.join(texts)

            if metadata_names is None:
                yield Document(text=text)
            else:
                try:
                    metadata = {name: item.get(name) for name in metadata_names}
                except KeyError as err:
                    raise ValueError(
                        f"{err.args[0]} field not found in Mongo document."
                    ) from err
                yield Document(text=text, metadata=metadata)

lazy_load_data #

lazy_load_data(
    db_name: str,
    collection_name: str,
    field_names: List[str] = ["text"],
    separator: str = "",
    query_dict: Optional[Dict] = None,
    max_docs: int = 0,
    metadata_names: Optional[List[str]] = None,
) -> Iterable[Document]

从输入目录加载数据。

Parameters:

Name Type Description Default
db_name str

数据库的名称。

required
collection_name str

集合的名称。

required
field_names(List[str])

要连接的字段的名称。 默认为 ["text"]

required
separator str

字段之间要使用的分隔符。 默认为 ""

''
query_dict Optional[Dict]

用于过滤文档的查询。更多信息请阅读官方文档 默认为 None

None
max_docs int

要加载的最大文档数。 默认为 0(无限制)

0
metadata_names Optional[List[str]]

要添加到文档的元数据属性中的字段名称。 默认为 None

None

Returns:

Type Description
Iterable[Document]

List[Document]: 文档的列表。

Source code in llama_index/readers/mongodb/base.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
    def lazy_load_data(
        self,
        db_name: str,
        collection_name: str,
        field_names: List[str] = ["text"],
        separator: str = "",
        query_dict: Optional[Dict] = None,
        max_docs: int = 0,
        metadata_names: Optional[List[str]] = None,
    ) -> Iterable[Document]:
        """从输入目录加载数据。

Args:
    db_name (str): 数据库的名称。
    collection_name (str): 集合的名称。
    field_names(List[str]): 要连接的字段的名称。
        默认为 ["text"]
    separator (str): 字段之间要使用的分隔符。
        默认为 ""
    query_dict (Optional[Dict]): 用于过滤文档的查询。更多信息请阅读[官方文档](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/#std-label-method-find-query)
        默认为 None
    max_docs (int): 要加载的最大文档数。
        默认为 0(无限制)
    metadata_names (Optional[List[str]]): 要添加到文档的元数据属性中的字段名称。
        默认为 None

Returns:
    List[Document]: 文档的列表。
"""
        db = self.client[db_name]
        cursor = db[collection_name].find(
            filter=query_dict or {},
            limit=max_docs,
            projection={name: 1 for name in field_names + (metadata_names or [])},
        )

        for item in cursor:
            try:
                texts = [item[name] for name in field_names]
            except KeyError as err:
                raise ValueError(
                    f"{err.args[0]} field not found in Mongo document."
                ) from err

            texts = self._flatten(texts)
            text = separator.join(texts)

            if metadata_names is None:
                yield Document(text=text)
            else:
                try:
                    metadata = {name: item.get(name) for name in metadata_names}
                except KeyError as err:
                    raise ValueError(
                        f"{err.args[0]} field not found in Mongo document."
                    ) from err
                yield Document(text=text, metadata=metadata)