Start and stop a streaming fragment

Streamlit 允许你将函数转换为片段,这些片段可以独立于完整脚本重新运行。此外,你可以告诉 Streamlit 以设定的时间间隔重新运行一个片段。这对于流式数据或监控过程非常有用。你可能希望用户启动和停止这种实时流。为此,可以通过编程方式为你的片段设置run_every参数。

  • 使用片段来流式传输实时数据。
  • 启动和停止片段自动重新运行。
  • 您的Python环境中必须安装以下内容:

    streamlit>=1.37.0
  • 你应该有一个干净的工作目录,名为 your-repository

  • 你应该对片段有一个基本的理解。

在这个例子中,你将构建一个应用程序,该应用程序在折线图中流式传输两个数据系列。你的应用程序将在会话首次加载时收集最近的数据,并静态显示折线图。侧边栏中的两个按钮将允许用户启动和停止数据流,以实时更新图表。你将使用一个片段来管理实时更新的频率和范围。

以下是您将要构建的内容:

Complete codeexpand_more
import streamlit as st import pandas as pd import numpy as np from datetime import datetime, timedelta def get_recent_data(last_timestamp): """Generate and return data from last timestamp to now, at most 60 seconds.""" now = datetime.now() if now - last_timestamp > timedelta(seconds=60): last_timestamp = now - timedelta(seconds=60) sample_time = timedelta(seconds=0.5) # time between data points next_timestamp = last_timestamp + sample_time timestamps = np.arange(next_timestamp, now, sample_time) sample_values = np.random.randn(len(timestamps), 2) data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"]) return data if "data" not in st.session_state: st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60)) if "stream" not in st.session_state: st.session_state.stream = False def toggle_streaming(): st.session_state.stream = not st.session_state.stream st.title("Data feed") st.sidebar.slider( "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every" ) st.sidebar.button( "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming ) st.sidebar.button( "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming ) if st.session_state.stream is True: run_every = st.session_state.run_every else: run_every = None @st.fragment(run_every=run_every) def show_latest_data(): last_timestamp = st.session_state.data.index[-1] st.session_state.data = pd.concat( [st.session_state.data, get_recent_data(last_timestamp)] ) st.session_state.data = st.session_state.data[-100:] st.line_chart(st.session_state.data) show_latest_data()
  1. your_repository 中,创建一个名为 app.py 的文件。

  2. 在终端中,将目录更改为 your_repository 并启动您的应用程序。

    streamlit run app.py

    由于您仍然需要添加代码,您的应用程序将是空白的。

  3. app.py 中,编写以下内容:

    import streamlit as st import pandas as pd import numpy as np from datetime import datetime, timedelta

    你将按以下方式使用这些库:

    • 你将在 pandas.DataFrame 中处理两个数据系列。
    • 你将使用 numpy 生成随机数据。
    • 数据将具有 datetime.datetime 索引值。
  4. 保存您的app.py文件并查看正在运行的应用程序。

  5. 点击“始终重新运行”或在运行的应用中按下“A”键。

    当您保存对app.py的更改时,您的运行预览将自动更新。您的预览仍将是空白的。返回您的代码。

首先,您将定义一个函数来随机生成两个时间序列的一些数据,您将称之为“A”和“B”。如果您只想复制该函数,可以跳过此部分。

Complete function to randomly generate sales dataexpand_more
def get_recent_data(last_timestamp): """Generate and return data from last timestamp to now, at most 60 seconds.""" now = datetime.now() if now - last_timestamp > timedelta(seconds=60): last_timestamp = now - timedelta(seconds=60) sample_time = timedelta(seconds=0.5) # time between data points next_timestamp = last_timestamp + sample_time timestamps = np.arange(next_timestamp, now, sample_time) sample_values = np.random.randn(len(timestamps), 2) data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"]) return data
  1. 开始你的函数定义。

    def get_recent_data(last_timestamp): """生成并返回从上次时间戳到现在的数据,最多60秒。"""

    你将把最近数据点的时间戳传递给你的数据生成函数。你的函数将使用这个时间戳来仅返回新数据。

  2. 获取当前时间,如果上次时间戳超过60秒,则调整上次时间戳。

    now = datetime.now() if now - last_timestamp > timedelta(seconds=60): last_timestamp = now - timedelta(seconds=60)

    通过更新上次时间戳,您将确保函数永远不会返回超过60秒的数据。

  3. 声明一个新变量 sample_time,用于定义数据点之间的时间间隔。计算第一个新数据点的时间戳。

    sample_time = timedelta(seconds=0.5) # 数据点之间的时间间隔 next_timestamp = last_timestamp + sample_time
  4. 创建一个datetime.datetime索引并生成两个相同长度的数据系列。

    timestamps = np.arange(next_timestamp, now, sample_time) sample_values = np.random.randn(len(timestamps), 2)
  5. 将数据系列与索引合并到pandas.DataFrame中并返回数据。

    data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"]) return data
  6. 可选:通过调用并显示数据来测试您的函数。

    data = get_recent_data(datetime.now() - timedelta(seconds=60)) data

    保存您的app.py文件以查看预览。完成后删除这两行。

由于你将动态更改@st.fragment()run_every参数,你需要在定义片段函数之前初始化相关的变量和会话状态值。你的片段函数还将读取和更新会话状态中的值,因此你现在可以定义这些值,以使片段函数更易于理解。

  1. 在会话中首次加载应用程序时初始化您的数据。

    if "data" not in st.session_state: st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))

    在用户开始流式传输数据之前,您的应用程序将在静态折线图中显示此初始数据。

  2. 在会话状态中初始化 "stream" 以开启和关闭流式传输。默认设置为关闭 (False)。

    if "stream" not in st.session_state: st.session_state.stream = False
  3. 创建一个回调函数来切换"stream"TrueFalse之间。

    def toggle_streaming(): st.session_state.stream = not st.session_state.stream
  4. 为您的应用程序添加一个标题。

    st.title("Data feed")
  5. 在侧边栏添加一个滑块,用于设置流式传输时检查数据的频率。

    st.sidebar.slider( "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every" )
  6. 在侧边栏添加按钮以开启和关闭流媒体。

    st.sidebar.button( "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming ) st.sidebar.button( "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming )

    这两个函数使用相同的回调函数来切换Session State中的"stream"。使用当前值"stream"来禁用其中一个按钮。这确保了按钮始终与当前状态一致;"Start streaming"仅在流媒体关闭时可点击,而"Stop streaming"仅在流媒体开启时可点击。按钮还通过高亮显示可用的操作为用户提供状态信息。

  7. 创建并设置一个新变量run_every,它将决定片段函数是否会自动重新运行(以及运行速度)。

    if st.session_state.stream is True: run_every = st.session_state.run_every else: run_every = None

为了让用户能够开启和关闭数据流,你必须在@st.fragment()装饰器中设置run_every参数。

Complete function to show and stream dataexpand_more
@st.fragment(run_every=run_every) def show_latest_data(): last_timestamp = st.session_state.data.index[-1] st.session_state.data = pd.concat( [st.session_state.data, get_recent_data(last_timestamp)] ) st.session_state.data = st.session_state.data[-100:] st.line_chart(st.session_state.data)
  1. 使用一个@st.fragment装饰器并开始你的函数定义。

    @st.fragment(run_every=run_every) def show_latest_data():

    使用上面声明的run_every变量来设置同名的参数。

  2. 检索会话状态中最后一个数据点的时间戳。

    last_timestamp = st.session_state.data.index[-1]
  3. 更新会话状态中的数据并修剪以仅保留最后100个时间戳。

    st.session_state.data = pd.concat( [st.session_state.data, get_recent_data(last_timestamp)] ) st.session_state.data = st.session_state.data[-100:]
  4. 在线图中显示数据。

    st.line_chart(st.session_state.data)

    您的片段函数定义已完成。

  1. 在代码的底部调用你的函数。

    show_latest_data()
  2. 通过点击“开始流式传输”来测试您的应用程序。尝试调整更新的频率。

尝试调整数据生成的频率或保留在会话状态中的数据量。在get_recent_data中,尝试使用小部件设置sample_time

尝试使用 st.plotly_chartst.altair_chart 为您的图表添加标签。

forum

还有问题吗?

我们的 论坛 充满了有用的信息和Streamlit专家。