Start and stop a streaming fragment
Streamlit 允许你将函数转换为片段,这些片段可以独立于完整脚本重新运行。此外,你可以告诉 Streamlit 以设定的时间间隔重新运行一个片段。这对于流式数据或监控过程非常有用。你可能希望用户启动和停止这种实时流。为此,可以通过编程方式为你的片段设置run_every参数。
Applied concepts
- 使用片段来流式传输实时数据。
- 启动和停止片段自动重新运行。
Prerequisites
-
您的Python环境中必须安装以下内容:
streamlit>=1.37.0 -
你应该有一个干净的工作目录,名为
your-repository。 -
你应该对片段有一个基本的理解。
Summary
在这个例子中,你将构建一个应用程序,该应用程序在折线图中流式传输两个数据系列。你的应用程序将在会话首次加载时收集最近的数据,并静态显示折线图。侧边栏中的两个按钮将允许用户启动和停止数据流,以实时更新图表。你将使用一个片段来管理实时更新的频率和范围。
以下是您将要构建的内容:
import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def get_recent_data(last_timestamp):
"""Generate and return data from last timestamp to now, at most 60 seconds."""
now = datetime.now()
if now - last_timestamp > timedelta(seconds=60):
last_timestamp = now - timedelta(seconds=60)
sample_time = timedelta(seconds=0.5) # time between data points
next_timestamp = last_timestamp + sample_time
timestamps = np.arange(next_timestamp, now, sample_time)
sample_values = np.random.randn(len(timestamps), 2)
data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
return data
if "data" not in st.session_state:
st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))
if "stream" not in st.session_state:
st.session_state.stream = False
def toggle_streaming():
st.session_state.stream = not st.session_state.stream
st.title("Data feed")
st.sidebar.slider(
"Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every"
)
st.sidebar.button(
"Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming
)
st.sidebar.button(
"Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming
)
if st.session_state.stream is True:
run_every = st.session_state.run_every
else:
run_every = None
@st.fragment(run_every=run_every)
def show_latest_data():
last_timestamp = st.session_state.data.index[-1]
st.session_state.data = pd.concat(
[st.session_state.data, get_recent_data(last_timestamp)]
)
st.session_state.data = st.session_state.data[-100:]
st.line_chart(st.session_state.data)
show_latest_data()
Build the example
Initialize your app
-
在
your_repository中,创建一个名为app.py的文件。 -
在终端中,将目录更改为
your_repository并启动您的应用程序。streamlit run app.py由于您仍然需要添加代码,您的应用程序将是空白的。
-
在
app.py中,编写以下内容:import streamlit as st import pandas as pd import numpy as np from datetime import datetime, timedelta你将按以下方式使用这些库:
- 你将在
pandas.DataFrame中处理两个数据系列。 - 你将使用
numpy生成随机数据。 - 数据将具有
datetime.datetime索引值。
- 你将在
-
保存您的
app.py文件并查看正在运行的应用程序。 -
点击“始终重新运行”或在运行的应用中按下“A”键。
当您保存对
app.py的更改时,您的运行预览将自动更新。您的预览仍将是空白的。返回您的代码。
Build a function to generate random, recent data
首先,您将定义一个函数来随机生成两个时间序列的一些数据,您将称之为“A”和“B”。如果您只想复制该函数,可以跳过此部分。
def get_recent_data(last_timestamp):
"""Generate and return data from last timestamp to now, at most 60 seconds."""
now = datetime.now()
if now - last_timestamp > timedelta(seconds=60):
last_timestamp = now - timedelta(seconds=60)
sample_time = timedelta(seconds=0.5) # time between data points
next_timestamp = last_timestamp + sample_time
timestamps = np.arange(next_timestamp, now, sample_time)
sample_values = np.random.randn(len(timestamps), 2)
data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
return data
-
开始你的函数定义。
def get_recent_data(last_timestamp): """生成并返回从上次时间戳到现在的数据,最多60秒。"""你将把最近数据点的时间戳传递给你的数据生成函数。你的函数将使用这个时间戳来仅返回新数据。
-
获取当前时间,如果上次时间戳超过60秒,则调整上次时间戳。
now = datetime.now() if now - last_timestamp > timedelta(seconds=60): last_timestamp = now - timedelta(seconds=60)通过更新上次时间戳,您将确保函数永远不会返回超过60秒的数据。
-
声明一个新变量
sample_time,用于定义数据点之间的时间间隔。计算第一个新数据点的时间戳。sample_time = timedelta(seconds=0.5) # 数据点之间的时间间隔 next_timestamp = last_timestamp + sample_time -
创建一个
datetime.datetime索引并生成两个相同长度的数据系列。timestamps = np.arange(next_timestamp, now, sample_time) sample_values = np.random.randn(len(timestamps), 2) -
将数据系列与索引合并到
pandas.DataFrame中并返回数据。data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"]) return data -
可选:通过调用并显示数据来测试您的函数。
data = get_recent_data(datetime.now() - timedelta(seconds=60)) data保存您的
app.py文件以查看预览。完成后删除这两行。
Initialize Session State values for your app
由于你将动态更改@st.fragment()的run_every参数,你需要在定义片段函数之前初始化相关的变量和会话状态值。你的片段函数还将读取和更新会话状态中的值,因此你现在可以定义这些值,以使片段函数更易于理解。
-
在会话中首次加载应用程序时初始化您的数据。
if "data" not in st.session_state: st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))在用户开始流式传输数据之前,您的应用程序将在静态折线图中显示此初始数据。
-
在会话状态中初始化
"stream"以开启和关闭流式传输。默认设置为关闭 (False)。if "stream" not in st.session_state: st.session_state.stream = False -
创建一个回调函数来切换
"stream"在True和False之间。def toggle_streaming(): st.session_state.stream = not st.session_state.stream -
为您的应用程序添加一个标题。
st.title("Data feed") -
在侧边栏添加一个滑块,用于设置流式传输时检查数据的频率。
st.sidebar.slider( "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every" ) -
在侧边栏添加按钮以开启和关闭流媒体。
st.sidebar.button( "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming ) st.sidebar.button( "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming )这两个函数使用相同的回调函数来切换Session State中的
"stream"。使用当前值"stream"来禁用其中一个按钮。这确保了按钮始终与当前状态一致;"Start streaming"仅在流媒体关闭时可点击,而"Stop streaming"仅在流媒体开启时可点击。按钮还通过高亮显示可用的操作为用户提供状态信息。 -
创建并设置一个新变量
run_every,它将决定片段函数是否会自动重新运行(以及运行速度)。if st.session_state.stream is True: run_every = st.session_state.run_every else: run_every = None
Build a fragment function to stream data
为了让用户能够开启和关闭数据流,你必须在@st.fragment()装饰器中设置run_every参数。
@st.fragment(run_every=run_every)
def show_latest_data():
last_timestamp = st.session_state.data.index[-1]
st.session_state.data = pd.concat(
[st.session_state.data, get_recent_data(last_timestamp)]
)
st.session_state.data = st.session_state.data[-100:]
st.line_chart(st.session_state.data)
-
使用一个
@st.fragment装饰器并开始你的函数定义。@st.fragment(run_every=run_every) def show_latest_data():使用上面声明的
run_every变量来设置同名的参数。 -
检索会话状态中最后一个数据点的时间戳。
last_timestamp = st.session_state.data.index[-1] -
更新会话状态中的数据并修剪以仅保留最后100个时间戳。
st.session_state.data = pd.concat( [st.session_state.data, get_recent_data(last_timestamp)] ) st.session_state.data = st.session_state.data[-100:] -
在线图中显示数据。
st.line_chart(st.session_state.data)您的片段函数定义已完成。
Call and test out your fragment function
-
在代码的底部调用你的函数。
show_latest_data() -
通过点击“开始流式传输”来测试您的应用程序。尝试调整更新的频率。
Next steps
尝试调整数据生成的频率或保留在会话状态中的数据量。在get_recent_data中,尝试使用小部件设置sample_time。
尝试使用 st.plotly_chart 或 st.altair_chart 为您的图表添加标签。
还有问题吗?
我们的 论坛 充满了有用的信息和Streamlit专家。