提示
本页面仅包含st.connections.SnowflakeConnection类。要深入了解在Streamlit应用程序中创建和管理数据连接,请参阅Connect Streamlit to Snowflake和Connecting to data。
使用Snowflake Connector for Python连接到Snowflake。
使用st.connection("snowflake")或
st.connection("
当应用程序在Snowflake中的Streamlit中运行时, st.connection("snowflake") 会自动使用应用程序所有者的角色进行连接,无需进一步配置。在这种情况下,**kwargs 将被忽略。使用 secrets.toml 和 **kwargs 来配置本地开发的连接。
SnowflakeConnection 包含多个便捷方法。例如,您可以直接使用 .query() 执行 SQL 查询,或使用 .raw_connection 访问底层的 Snowflake Connector 对象。
提示
snowflake-snowpark-python 必须在您的环境中安装才能使用此连接。您可以 安装Snowflake扩展以及Streamlit:
>>> pip install streamlit[snowflake]
重要
账户标识符必须采用
| 类描述[source] | |
|---|---|
st.connections.SnowflakeConnection(connection_name, **kwargs) | |
| 方法 | |
cursor() | 从此连接创建一个新的游标对象。 |
query(sql, *, ttl=None, show_spinner="Running `snowflake.query(...)`.", params=None, **kwargs) | 运行一个只读的SQL查询。 |
reset() | 重置此连接,以便下次使用时重新初始化。 |
session() | 从此连接创建一个新的Snowpark会话。 |
write_pandas(df, table_name, database=None, schema=None, chunk_size=None, **kwargs) | 将pandas.DataFrame写入Snowflake数据库中的表。 |
| 属性 | |
从Python的Snowflake连接器访问底层连接对象。 | |
示例
示例 1:使用 Streamlit 密钥进行配置
您可以使用Streamlit的Secrets management来配置您的Snowflake连接。例如,如果您的账户启用了MFA,您可以使用key-pair authentication进行连接。
.streamlit/secrets.toml:
[connections.snowflake] account = "xxx-xxx" user = "xxx" private_key_file = "/xxx/xxx/xxx.p8" role = "xxx" warehouse = "xxx" database = "xxx" schema = "xxx"您的应用程序代码:
import streamlit as st conn = st.connection("snowflake") df = conn.query("SELECT * FROM my_table")示例2:使用关键字参数和外部认证进行配置
您可以使用关键字参数(带或不带secrets.toml)配置您的Snowflake连接。例如,如果您的Snowflake账户支持SSO,您可以使用browser-based SSO设置一个快速的本地连接用于开发。
import streamlit as st conn = st.connection( "snowflake", account="xxx-xxx", user="xxx", authenticator="externalbrowser" ) df = conn.query("SELECT * FROM my_table")示例3:使用Snowflake的连接配置文件的命名连接
Snowflake的Python连接器支持连接配置文件,它与Streamlit的SnowflakeConnection很好地集成在一起。如果你已经配置了一个或多个连接,你只需要传递要使用的连接名称即可。
~/.snowflake/connections.toml:
[my_connection] account = "xxx-xxx" user = "xxx" password = "xxx" warehouse = "xxx" database = "xxx" schema = "xxx"您的应用程序代码:
import streamlit as st conn = st.connection("my_connection", type="snowflake") df = conn.query("SELECT * FROM my_table")示例4:使用Streamlit秘密和Snowflake的连接配置文件的命名连接
如果您有一个名为my_connection的Snowflake配置文件,如示例3所示,您可以通过secrets.toml传递连接名称。
.streamlit/secrets.toml:
[connections.snowflake] connection_name = "my_connection"您的应用程序代码:
import streamlit as st conn = st.connection("snowflake") df = conn.query("SELECT * FROM my_table")示例 5:使用环境变量的默认连接
如果您有一个名为my_connection的Snowflake配置文件,如示例3所示,您可以设置一个环境变量来将其声明为默认的Snowflake连接。
SNOWFLAKE_DEFAULT_CONNECTION_NAME = "my_connection"您的应用程序代码:
import streamlit as st conn = st.connection("snowflake") df = conn.query("SELECT * FROM my_table")示例6:Snowflake连接配置文件中的默认连接
如果您有一个定义了default连接的Snowflake配置文件,Streamlit将自动使用它,如果没有声明其他连接。
~/.snowflake/connections.toml:
[default] account = "xxx-xxx" user = "xxx" password = "xxx" warehouse = "xxx" database = "xxx" schema = "xxx"您的应用程序代码:
import streamlit as st conn = st.connection("snowflake") df = conn.query("SELECT * FROM my_table")
从此连接创建一个新的游标对象。
Snowflake Connector 游标实现了 Python 数据库 API v2.0 规范(PEP-249)。有关更多信息,请参阅 Snowflake Connector for Python 文档。
| 函数签名[来源] | |
|---|---|
SnowflakeConnection.cursor() | |
| 返回 | |
(snowflake.connector.cursor.SnowflakeCursor) | 用于连接的游标对象。 |
示例
以下示例使用游标将多行插入表中。qmark 参数样式被指定为可选关键字参数。或者,参数样式可以在您的连接配置文件中声明。有关更多信息,请参阅Snowflake Connector for Python 文档。
import streamlit as st conn = st.connection("snowflake", "paramstyle"="qmark") rows_to_insert = [("Mary", "dog"), ("John", "cat"), ("Robert", "bird")] conn.cursor().executemany( "INSERT INTO mytable (name, pet) VALUES (?, ?)", rows_to_insert )
运行一个只读的SQL查询。
此方法实现了查询结果缓存和简单的错误处理/重试。缓存行为与使用@st.cache_data时相同。
注意
未指定ttl的查询将无限期缓存。
| 函数签名[source] | |
|---|---|
SnowflakeConnection.query(sql, *, ttl=None, show_spinner="正在运行 `snowflake.query(...)`。", params=None, **kwargs) | |
| 参数 | |
sql (str) | 要执行的只读SQL查询。 |
ttl (float, int, timedelta or None) | 缓存中保留结果的最大秒数。如果这是None(默认值),则缓存的结果不会随时间过期。 |
show_spinner (boolean or string) | 是否启用加载动画。当执行缓存的查询时,不会显示加载动画,因为结果立即可用。 当执行新查询时,默认显示一个加载动画,并显示消息“正在运行 snowflake.query(...)。” 如果这是 False,则在执行查询时不会显示加载动画。如果这是一个字符串,该字符串将用作加载动画的消息。 |
params (list, tuple, dict or None) | 传递给Python的Snowflake连接器的参数列表 Cursor.execute() 方法。此连接器支持使用qmark绑定将数据绑定到SQL语句。有关更多信息和 示例,请参阅Snowflake Connector for Python documentation。 默认值为None。 |
| 返回 | |
(pandas.DataFrame) | 查询结果,格式化为pandas DataFrame。 |
示例
import streamlit as st
conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")
st.dataframe(df)
从Python的Snowflake连接器访问底层连接对象。
有关如何使用Snowflake Connector for Python的信息,请参阅Snowflake Connector for Python文档。
| 函数签名[source] | |
|---|---|
SnowflakeConnection.raw_connection | |
| 返回 | |
(snowflake.connector.connection.SnowflakeConnection) | 连接对象。 |
示例
以下示例使用游标提交异步查询,保存查询ID,然后通过连接定期检查查询状态,最后检索结果。
import streamlit as st import time conn = st.connection("snowflake") cur = conn.cursor() cur.execute_async("SELECT * FROM my_table") query_id = cur.sfqid while True: status = conn.raw_connection.get_query_status(query_id) if conn.raw_connection.is_still_running(status): time.sleep(1) else: break cur.get_results_from_sfqid(query_id) df = cur.fetchall()
重置此连接,以便下次使用时重新初始化。
当连接变得陈旧、认证令牌过期或在类似情况下,重新初始化可能会修复断开的连接时,此方法可能很有用。请注意,某些连接方法可能已经在它们的错误处理代码中使用了reset()。
| 函数签名[source] | |
|---|---|
SnowflakeConnection.reset() | |
| 返回 | |
(None) | 无描述 |
示例
import streamlit as st
conn = st.connection("my_conn")
# Reset the connection before using it if it isn't healthy
# Note: is_healthy() isn't a real method and is just shown for example here.
if not conn.is_healthy():
conn.reset()
# Do stuff with conn...
从这个连接创建一个新的Snowpark会话。
有关如何使用Snowpark会话的信息,请参阅 Snowpark开发者指南 和Snowpark API参考。
| 函数签名[source] | |
|---|---|
SnowflakeConnection.session() | |
| 返回 | |
(snowflake.snowpark.Session) | 此连接的新 Snowpark 会话。 |
示例
以下示例创建了一个新的 Snowpark 会话,并使用它来运行查询。
import streamlit as st conn = st.connection("snowflake") session = conn.session() df = session.sql("SELECT * FROM my_table").collect()
将pandas.DataFrame写入Snowflake数据库中的表。
这个便捷方法是围绕snowflake.connector.pandas_tools.write_pandas()的一个薄封装,使用底层连接。自动传递conn参数。有关更多信息和其他关键字参数,请参阅Snowflake Connector for Python文档。
| 函数签名[source] | |
|---|---|
SnowflakeConnection.write_pandas(df, 表名, 数据库=无, 模式=无, 块大小=无, **kwargs) | |
| 参数 | |
df (pandas.DataFrame) | 包含要复制到表中的数据的pandas.DataFrame对象。 |
table_name (str) | 数据应复制到的表的名称。 |
database (str) | 包含表的数据库名称。默认情况下,函数会写入当前会话中正在使用的数据库。 注意 如果指定此参数,则还必须指定schema参数。 |
schema (str) | 包含表的模式名称。默认情况下,函数将写入当前会话中使用的模式中的表。 |
chunk_size (int) | 每次插入的元素数量。默认情况下,函数会一次性插入所有元素。 |
**kwargs (Any) | 用于 snowflake.connector.pandas_tools.write_pandas()的额外关键字参数。 |
| 返回 | |
(tuple[bool, int, int]) | 包含三个值的元组:
|
示例
以下示例使用会话中当前使用的数据库和模式,并将数据复制到名为“my_table”的表中。
import streamlit as st import pandas as pd df = pd.DataFrame( {"Name": ["Mary", "John", "Robert"], "Pet": ["dog", "cat", "bird"]} ) conn = st.connection("snowflake") conn.write_pandas(df, "my_table")
还有问题吗?
我们的 论坛 充满了有用的信息和Streamlit专家。