star

提示

本页面仅包含st.connections.SnowflakeConnection类。要深入了解在Streamlit应用程序中创建和管理数据连接,请参阅Connect Streamlit to SnowflakeConnecting to data

使用Snowflake Connector for Python连接到Snowflake。

使用st.connection("snowflake")st.connection("", type="snowflake")初始化此连接对象。SnowflakeConnection的连接参数可以使用secrets.toml和/或 **kwargs指定。连接参数传递给 snowflake.connector.connect()

当应用程序在Snowflake中的Streamlit中运行时, st.connection("snowflake") 会自动使用应用程序所有者的角色进行连接,无需进一步配置。在这种情况下,**kwargs 将被忽略。使用 secrets.toml**kwargs 来配置本地开发的连接。

SnowflakeConnection 包含多个便捷方法。例如,您可以直接使用 .query() 执行 SQL 查询,或使用 .raw_connection 访问底层的 Snowflake Connector 对象。

提示

snowflake-snowpark-python 必须在您的环境中安装才能使用此连接。您可以 安装Snowflake扩展以及Streamlit:

>>> pip install streamlit[snowflake]

重要

账户标识符必须采用-的形式,其中是您的Snowflake组织的名称,是您组织内账户的唯一名称。这是用破折号分隔的,不像在SQL查询中使用时用点分隔。有关更多信息,请参阅账户标识符

类描述[source]

st.connections.SnowflakeConnection(connection_name, **kwargs)

方法

cursor()

从此连接创建一个新的游标对象。

query(sql, *, ttl=None, show_spinner="Running `snowflake.query(...)`.", params=None, **kwargs)

运行一个只读的SQL查询。

reset()

重置此连接,以便下次使用时重新初始化。

从此连接创建一个新的Snowpark会话。

write_pandas(df, table_name, database=None, schema=None, chunk_size=None, **kwargs)

pandas.DataFrame写入Snowflake数据库中的表。

属性

从Python的Snowflake连接器访问底层连接对象。

示例

示例 1:使用 Streamlit 密钥进行配置

您可以使用Streamlit的Secrets management来配置您的Snowflake连接。例如,如果您的账户启用了MFA,您可以使用key-pair authentication进行连接。

.streamlit/secrets.toml:

[connections.snowflake]
account = "xxx-xxx"
user = "xxx"
private_key_file = "/xxx/xxx/xxx.p8"
role = "xxx"
warehouse = "xxx"
database = "xxx"
schema = "xxx"

您的应用程序代码:

import streamlit as st
conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")

示例2:使用关键字参数和外部认证进行配置

您可以使用关键字参数(带或不带secrets.toml)配置您的Snowflake连接。例如,如果您的Snowflake账户支持SSO,您可以使用browser-based SSO设置一个快速的本地连接用于开发。

import streamlit as st
conn = st.connection(
    "snowflake", account="xxx-xxx", user="xxx", authenticator="externalbrowser"
)
df = conn.query("SELECT * FROM my_table")

示例3:使用Snowflake的连接配置文件的命名连接

Snowflake的Python连接器支持连接配置文件,它与Streamlit的SnowflakeConnection很好地集成在一起。如果你已经配置了一个或多个连接,你只需要传递要使用的连接名称即可。

~/.snowflake/connections.toml:

[my_connection]
account = "xxx-xxx"
user = "xxx"
password = "xxx"
warehouse = "xxx"
database = "xxx"
schema = "xxx"

您的应用程序代码:

import streamlit as st
conn = st.connection("my_connection", type="snowflake")
df = conn.query("SELECT * FROM my_table")

示例4:使用Streamlit秘密和Snowflake的连接配置文件的命名连接

如果您有一个名为my_connection的Snowflake配置文件,如示例3所示,您可以通过secrets.toml传递连接名称。

.streamlit/secrets.toml:

[connections.snowflake]
connection_name = "my_connection"

您的应用程序代码:

import streamlit as st
conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")

示例 5:使用环境变量的默认连接

如果您有一个名为my_connection的Snowflake配置文件,如示例3所示,您可以设置一个环境变量来将其声明为默认的Snowflake连接。

SNOWFLAKE_DEFAULT_CONNECTION_NAME = "my_connection"

您的应用程序代码:

import streamlit as st
conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")

示例6:Snowflake连接配置文件中的默认连接

如果您有一个定义了default连接的Snowflake配置文件,Streamlit将自动使用它,如果没有声明其他连接。

~/.snowflake/connections.toml:

[default]
account = "xxx-xxx"
user = "xxx"
password = "xxx"
warehouse = "xxx"
database = "xxx"
schema = "xxx"

您的应用程序代码:

import streamlit as st
conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")

从此连接创建一个新的游标对象。

Snowflake Connector 游标实现了 Python 数据库 API v2.0 规范(PEP-249)。有关更多信息,请参阅 Snowflake Connector for Python 文档

函数签名[来源]

SnowflakeConnection.cursor()

返回

(snowflake.connector.cursor.SnowflakeCursor)

用于连接的游标对象。

示例

以下示例使用游标将多行插入表中。qmark 参数样式被指定为可选关键字参数。或者,参数样式可以在您的连接配置文件中声明。有关更多信息,请参阅Snowflake Connector for Python 文档

import streamlit as st

conn = st.connection("snowflake", "paramstyle"="qmark")
rows_to_insert = [("Mary", "dog"), ("John", "cat"), ("Robert", "bird")]
conn.cursor().executemany(
    "INSERT INTO mytable (name, pet) VALUES (?, ?)", rows_to_insert
)

运行一个只读的SQL查询。

此方法实现了查询结果缓存和简单的错误处理/重试。缓存行为与使用@st.cache_data时相同。

注意

未指定ttl的查询将无限期缓存。

函数签名[source]

SnowflakeConnection.query(sql, *, ttl=None, show_spinner="正在运行 `snowflake.query(...)`。", params=None, **kwargs)

参数

sql (str)

要执行的只读SQL查询。

ttl (float, int, timedelta or None)

缓存中保留结果的最大秒数。如果这是None(默认值),则缓存的结果不会随时间过期。

show_spinner (boolean or string)

是否启用加载动画。当执行缓存的查询时,不会显示加载动画,因为结果立即可用。 当执行新查询时,默认显示一个加载动画,并显示消息“正在运行 snowflake.query(...)。”

如果这是 False,则在执行查询时不会显示加载动画。如果这是一个字符串,该字符串将用作加载动画的消息。

params (list, tuple, dict or None)

传递给Python的Snowflake连接器的参数列表 Cursor.execute() 方法。此连接器支持使用qmark绑定将数据绑定到SQL语句。有关更多信息和 示例,请参阅Snowflake Connector for Python documentation。 默认值为None

返回

(pandas.DataFrame)

查询结果,格式化为pandas DataFrame。

示例

import streamlit as st

conn = st.connection("snowflake")
df = conn.query("SELECT * FROM my_table")
st.dataframe(df)

从Python的Snowflake连接器访问底层连接对象。

有关如何使用Snowflake Connector for Python的信息,请参阅Snowflake Connector for Python文档

函数签名[source]

SnowflakeConnection.raw_connection

返回

(snowflake.connector.connection.SnowflakeConnection)

连接对象。

示例

以下示例使用游标提交异步查询,保存查询ID,然后通过连接定期检查查询状态,最后检索结果。

import streamlit as st
import time

conn = st.connection("snowflake")
cur = conn.cursor()
cur.execute_async("SELECT * FROM my_table")
query_id = cur.sfqid
while True:
    status = conn.raw_connection.get_query_status(query_id)
    if conn.raw_connection.is_still_running(status):
        time.sleep(1)
    else:
        break
cur.get_results_from_sfqid(query_id)
df = cur.fetchall()

重置此连接,以便下次使用时重新初始化。

当连接变得陈旧、认证令牌过期或在类似情况下,重新初始化可能会修复断开的连接时,此方法可能很有用。请注意,某些连接方法可能已经在它们的错误处理代码中使用了reset()

函数签名[source]

SnowflakeConnection.reset()

返回

(None)

无描述

示例

import streamlit as st

conn = st.connection("my_conn")

# Reset the connection before using it if it isn't healthy
# Note: is_healthy() isn't a real method and is just shown for example here.
if not conn.is_healthy():
    conn.reset()

# Do stuff with conn...

从这个连接创建一个新的Snowpark会话。

有关如何使用Snowpark会话的信息,请参阅 Snowpark开发者指南Snowpark API参考

函数签名[source]

SnowflakeConnection.session()

返回

(snowflake.snowpark.Session)

此连接的新 Snowpark 会话。

示例

以下示例创建了一个新的 Snowpark 会话,并使用它来运行查询。

import streamlit as st

conn = st.connection("snowflake")
session = conn.session()
df = session.sql("SELECT * FROM my_table").collect()

pandas.DataFrame写入Snowflake数据库中的表。

这个便捷方法是围绕snowflake.connector.pandas_tools.write_pandas()的一个薄封装,使用底层连接。自动传递conn参数。有关更多信息和其他关键字参数,请参阅Snowflake Connector for Python文档

函数签名[source]

SnowflakeConnection.write_pandas(df, 表名, 数据库=无, 模式=无, 块大小=无, **kwargs)

参数

df (pandas.DataFrame)

包含要复制到表中的数据的pandas.DataFrame对象。

table_name (str)

数据应复制到的表的名称。

database (str)

包含表的数据库名称。默认情况下,函数会写入当前会话中正在使用的数据库。

注意

如果指定此参数,则还必须指定schema参数。

schema (str)

包含表的模式名称。默认情况下,函数将写入当前会话中使用的模式中的表。

chunk_size (int)

每次插入的元素数量。默认情况下,函数会一次性插入所有元素。

**kwargs (Any)

用于 snowflake.connector.pandas_tools.write_pandas()的额外关键字参数。

返回

(tuple[bool, int, int])

包含三个值的元组:

  1. 一个布尔值,如果写入成功则为True
  2. 一个整数,表示复制的数据块数量。
  3. 一个整数,表示插入的行数。

示例

以下示例使用会话中当前使用的数据库和模式,并将数据复制到名为“my_table”的表中。

import streamlit as st
import pandas as pd

df = pd.DataFrame(
    {"Name": ["Mary", "John", "Robert"], "Pet": ["dog", "cat", "bird"]}
)
conn = st.connection("snowflake")
conn.write_pandas(df, "my_table")
forum

还有问题吗?

我们的 论坛 充满了有用的信息和Streamlit专家。