Bases: BaseReader
初始化SnowflakeReader的新实例。
该类使用SQLAlchemy建立与Snowflake的连接,执行查询并将每一行连接成LlamaIndex使用的Document。
属性:
engine(可选[Engine]):数据库连接的SQLAlchemy Engine对象。
或
account(可选[str]):Snowflake账户标识符。
user(可选[str]):Snowflake账户用户名。
password(可选[str]):Snowflake账户的密码。
database(可选[str]):Snowflake数据库名称。
schema(可选[str]):Snowflake模式名称。
warehouse(可选[str]):Snowflake仓库名称。
proxy(可选[str]):连接的代理设置。
Source code in llama_index/readers/snowflake/base.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 | class SnowflakeReader(BaseReader):
"""初始化SnowflakeReader的新实例。
该类使用SQLAlchemy建立与Snowflake的连接,执行查询并将每一行连接成LlamaIndex使用的Document。
属性:
engine(可选[Engine]):数据库连接的SQLAlchemy Engine对象。
或
account(可选[str]):Snowflake账户标识符。
user(可选[str]):Snowflake账户用户名。
password(可选[str]):Snowflake账户的密码。
database(可选[str]):Snowflake数据库名称。
schema(可选[str]):Snowflake模式名称。
warehouse(可选[str]):Snowflake仓库名称。
proxy(可选[str]):连接的代理设置。"""
def __init__(
self,
account: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
schema: Optional[str] = None,
warehouse: Optional[str] = None,
role: Optional[str] = None,
proxy: Optional[str] = None,
engine: Optional[Engine] = None,
) -> None:
"""使用可选的连接详细信息、代理配置或直接使用引擎初始化SnowflakeReader。
Args:
account (Optional[str]): Snowflake账户标识符。
user (Optional[str]): Snowflake账户用户名。
password (Optional[str]): Snowflake账户密码。
database (Optional[str]): Snowflake数据库名称。
schema (Optional[str]): Snowflake模式名称。
warehouse (Optional[str]): Snowflake仓库名称。
role (Optional[str]): Snowflake角色名称。
proxy (Optional[str]): 连接的代理设置。
engine (Optional[Engine]): 已存在的SQLAlchemy引擎。
"""
from snowflake.sqlalchemy import URL
if engine is None:
connect_args = {}
if proxy:
connect_args["proxy"] = proxy
# Create an SQLAlchemy engine for Snowflake
self.engine = create_engine(
URL(
account=account or "",
user=user or "",
password=password or "",
database=database or "",
schema=schema or "",
warehouse=warehouse or "",
role=role or "",
),
connect_args=connect_args,
)
else:
self.engine = engine
# Create a sessionmaker bound to the engine
self.Session = sessionmaker(bind=self.engine)
def execute_query(self, query_string: str) -> List[Any]:
"""执行一个SQL查询并返回获取的结果。
Args:
query_string(str):要执行的SQL查询。
Returns:
List[Any]:来自查询的获取结果。
"""
# Create a session and execute the query
session = self.Session()
try:
result = session.execute(text(query_string))
return result.fetchall()
finally:
# Ensure the session is closed after query execution
session.close()
def load_data(self, query: str) -> List[Document]:
"""查询并从数据库中加载数据,返回一个文档列表。
Args:
query (str): 用于过滤表和行的查询参数。
Returns:
List[Document]: 一个Document对象的列表。
"""
documents = []
if query is None:
raise ValueError("A query parameter is necessary to filter the data")
try:
result = self.execute_query(query)
for item in result:
# fetch each item
doc_str = ", ".join([str(entry) for entry in item])
documents.append(Document(text=doc_str))
return documents
except Exception as e:
logger.error(
f"An error occurred while loading the data: {e}", exc_info=True
)
|
execute_query
execute_query(query_string: str) -> List[Any]
执行一个SQL查询并返回获取的结果。
Returns:
Type |
Description |
List[Any]
|
|
Source code in llama_index/readers/snowflake/base.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 | def execute_query(self, query_string: str) -> List[Any]:
"""执行一个SQL查询并返回获取的结果。
Args:
query_string(str):要执行的SQL查询。
Returns:
List[Any]:来自查询的获取结果。
"""
# Create a session and execute the query
session = self.Session()
try:
result = session.execute(text(query_string))
return result.fetchall()
finally:
# Ensure the session is closed after query execution
session.close()
|
load_data
查询并从数据库中加载数据,返回一个文档列表。
Parameters:
Name |
Type |
Description |
Default |
query |
str
|
|
required
|
Returns:
Type |
Description |
List[Document]
|
List[Document]: 一个Document对象的列表。
|
Source code in llama_index/readers/snowflake/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 | def load_data(self, query: str) -> List[Document]:
"""查询并从数据库中加载数据,返回一个文档列表。
Args:
query (str): 用于过滤表和行的查询参数。
Returns:
List[Document]: 一个Document对象的列表。
"""
documents = []
if query is None:
raise ValueError("A query parameter is necessary to filter the data")
try:
result = self.execute_query(query)
for item in result:
# fetch each item
doc_str = ", ".join([str(entry) for entry in item])
documents.append(Document(text=doc_str))
return documents
except Exception as e:
logger.error(
f"An error occurred while loading the data: {e}", exc_info=True
)
|