Documentation

使用pandas分析数据

使用 pandas,Python 数据分析库,来处理、分析和可视化存储在 InfluxDB 集群数据库中的数据。

pandas 是一个开源的,BSD 许可的库,为 Python 编程语言提供高性能、易于使用的数据结构和数据分析工具。

安装先决条件

本指南中的示例假设使用Python虚拟环境和InfluxDB 3 influxdb3-python Python客户端库。有关更多信息,请参阅如何开始使用Python查询InfluxDB

安装 influxdb3-python 也会安装 pyarrow 库,该库为 Apache Arrow 提供 Python 绑定。

安装 pandas

要使用pandas,您需要安装并导入pandas库。

在您的终端中,使用 pip 在您的活动 Python 虚拟环境 中安装 pandas

pip install pandas

使用 PyArrow 将查询结果转换为 pandas

以下步骤使用Python、influxdb3-pythonpyarrow来查询InfluxDB并将Arrow数据流发送到pandas DataFrame

  1. 在你的编辑器中,将以下代码复制并粘贴到一个新文件中——例如, pandas-example.py:

    # pandas-example.py
    
    from influxdb_client_3 import InfluxDBClient3
    import pandas
    
    # Instantiate an InfluxDB client configured for a database
    client = InfluxDBClient3(
      "https://cluster-host.com",
      database="
    DATABASE_NAME
    "
    ,
    token="
    DATABASE_TOKEN
    "
    )
    # Execute the query to retrieve all record batches in the stream # formatted as a PyArrow Table. table = client.query( '''SELECT * FROM home WHERE time >= now() - INTERVAL '90 days' ORDER BY time''' ) client.close() # Convert the PyArrow Table to a pandas DataFrame. dataframe = table.to_pandas() print(dataframe)
  2. 替换以下配置值:

    • DATABASE_NAME: 要查询的数据库的名称
    • DATABASE_TOKEN: 一个 数据库令牌 具有对指定数据库的 读取 权限
  3. 在你的终端中,使用Python解释器运行该文件:

    python pandas-example.py
    

示例调用以下方法:

查看示例结果

接下来,使用pandas分析数据

使用pandas分析数据

查看数据信息和统计

下面的示例展示如何使用 pandas DataFrame 方法来转换和总结存储在 InfluxDB 集群中的数据。

# pandas-example.py

from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDB client configured for a database
client = InfluxDBClient3(
  "https://cluster-host.com",
  database="
DATABASE_NAME
"
,
token="
DATABASE_TOKEN
"
)
# Execute the query to retrieve all record batches in the stream # formatted as a PyArrow Table. table = client.query( '''SELECT * FROM home WHERE time >= now() - INTERVAL '90 days' ORDER BY time''' ) client.close() # Convert the PyArrow Table to a pandas DataFrame. dataframe = table.to_pandas() # Print information about the results DataFrame, # including the index dtype and columns, non-null values, and memory usage. dataframe.info() # Calculate descriptive statistics that summarize the distribution of the results. print(dataframe.describe()) # Extract a DataFrame column. print(dataframe['temp']) # Print the DataFrame in Markdown format. print(dataframe.to_markdown())

替换以下配置值:

  • DATABASE_NAME: 要查询的InfluxDB 数据库的名称
  • DATABASE_TOKEN: 一个 数据库令牌 具有指定数据库的读取权限

降采样时间序列

pandas库提供了丰富的功能用于处理时间序列数据。

pandas.DataFrame.resample()方法对数据进行下采样和上采样,以时间为基础进行分组–例如:

# pandas-example.py

...

# Use the `time` column to generate a DatetimeIndex for the DataFrame
dataframe = dataframe.set_index('time')

# Print information about the index
print(dataframe.index)

# Downsample data into 1-hour groups based on the DatetimeIndex
resample = dataframe.resample("1H")

# Print a summary that shows the start time and average temp for each group
print(resample['temp'].mean())

查看示例结果

有关更多详细信息和示例,请参阅 pandas documentation



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: