Skip to content

Database

DatabaseReader #

Bases: BaseReader

简单的数据库读取器。

将每一行连接成LlamaIndex使用的文档。

Parameters:

Name Type Description Default
sql_database Optional[SQLDatabase]

要使用的SQL数据库,包括要指定的表名。 有关更多详细信息,请参见:ref:Ref-Struct-Store

None
engine Optional[Engine]

数据库连接的SQLAlchemy Engine对象。

None
uri Optional[str]

数据库连接的uri。

None
scheme Optional[str]

数据库连接的scheme。

None
host Optional[str]

数据库连接的主机。

None
port Optional[int]

数据库连接的端口。

None
user Optional[str]

数据库连接的用户。

None
password Optional[str]

数据库连接的密码。

None
dbname Optional[str]

数据库连接的dbname。

None

Returns:

Name Type Description
DatabaseReader

一个DatabaseReader对象。

Source code in llama_index/readers/database/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class DatabaseReader(BaseReader):
    """简单的数据库读取器。

将每一行连接成LlamaIndex使用的文档。

Args:
    sql_database (Optional[SQLDatabase]): 要使用的SQL数据库,包括要指定的表名。
        有关更多详细信息,请参见:ref:`Ref-Struct-Store`。



    engine (Optional[Engine]): 数据库连接的SQLAlchemy Engine对象。



    uri (Optional[str]): 数据库连接的uri。



    scheme (Optional[str]): 数据库连接的scheme。
    host (Optional[str]): 数据库连接的主机。
    port (Optional[int]): 数据库连接的端口。
    user (Optional[str]): 数据库连接的用户。
    password (Optional[str]): 数据库连接的密码。
    dbname (Optional[str]): 数据库连接的dbname。

Returns:
    DatabaseReader: 一个DatabaseReader对象。"""

    def __init__(
        self,
        sql_database: Optional[SQLDatabase] = None,
        engine: Optional[Engine] = None,
        uri: Optional[str] = None,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        dbname: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """使用参数进行初始化。"""
        if sql_database:
            self.sql_database = sql_database
        elif engine:
            self.sql_database = SQLDatabase(engine, *args, **kwargs)
        elif uri:
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        elif scheme and host and port and user and password and dbname:
            uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}"
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **kwargs)
        else:
            raise ValueError(
                "You must provide either a SQLDatabase, "
                "a SQL Alchemy Engine, a valid connection URI, or a valid "
                "set of credentials."
            )

    def load_data(self, query: str) -> List[Document]:
        """查询并从数据库中加载数据,返回一个文档列表。

Args:
    query (str): 用于过滤表和行的查询参数。

Returns:
    List[Document]: 一个Document对象的列表。
"""
        documents = []
        with self.sql_database.engine.connect() as connection:
            if query is None:
                raise ValueError("A query parameter is necessary to filter the data")
            else:
                result = connection.execute(text(query))

            for item in result.fetchall():
                # fetch each item
                doc_str = ", ".join(
                    [f"{col}: {entry}" for col, entry in zip(result.keys(), item)]
                )
                documents.append(Document(text=doc_str))
        return documents

load_data #

load_data(query: str) -> List[Document]

查询并从数据库中加载数据,返回一个文档列表。

Parameters:

Name Type Description Default
query str

用于过滤表和行的查询参数。

required

Returns:

Type Description
List[Document]

List[Document]: 一个Document对象的列表。

Source code in llama_index/readers/database/base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
    def load_data(self, query: str) -> List[Document]:
        """查询并从数据库中加载数据,返回一个文档列表。

Args:
    query (str): 用于过滤表和行的查询参数。

Returns:
    List[Document]: 一个Document对象的列表。
"""
        documents = []
        with self.sql_database.engine.connect() as connection:
            if query is None:
                raise ValueError("A query parameter is necessary to filter the data")
            else:
                result = connection.execute(text(query))

            for item in result.fetchall():
                # fetch each item
                doc_str = ", ".join(
                    [f"{col}: {entry}" for col, entry in zip(result.keys(), item)]
                )
                documents.append(Document(text=doc_str))
        return documents