Documentation

使用Flux转换数据

从 InfluxDB 查询数据 时,您通常需要以某种方式转换该数据。常见的例子包括聚合数据、降采样数据等。

本指南演示了如何使用 Flux 函数 转换您的数据。它讲解了创建一个 Flux 脚本,该脚本将数据分成时间窗口,计算每个窗口中的 _value 平均值,并将这些平均值作为新表输出。

如果您不熟悉Flux如何构建和操作数据,请参见 Flux数据模型

查询数据

使用在之前的 从 InfluxDB 查询数据 指南中建立的查询,但更新范围以提取过去一个小时的数据:

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")

Flux 函数

Flux 提供了许多执行特定操作、转换和任务的函数。 你还可以在你的 Flux 查询中 创建自定义函数函数的详细信息在 Flux 标准库 文档中进行了详细介绍。

在从InfluxDB查询数据时,常用的一种函数是汇总函数。 汇总函数将表中的一组 _value 进行汇总,并将其转换为一个新值。

此示例使用mean()函数在每个时间窗口内计算值的平均值。

以下示例演示了窗口化和聚合数据所需的步骤,但有一个aggregateWindow() 辅助函数可以为你完成这项工作。了解这个过程中的步骤是很重要的。

窗口你的数据

Flux的 window() 函数 根据时间值对记录进行分区。使用 every 参数定义每个窗口的持续时间。

日历月份和年份

every 支持所有 有效的持续时间单位,包括 日历月份 (1mo)年 (1y)

在这个例子中,窗口数据以五分钟的间隔(5m)进行。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)

随着数据被收集到时间窗口中,每个窗口作为自己的表输出。可视化时,每个表被分配一个唯一的颜色。

Windowed data tables

聚合窗口数据

Flux 聚合函数获取每个表中的 _value,并以某种方式进行聚合。使用 mean() 函数 来计算每个表的 _value 的平均值。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()

由于每个窗口中的行被聚合,它们的输出表只包含一个带有聚合值的单行。窗口化的表仍然是相互独立的,当可视化时,将显示为单个、不相连的点。

Windowed aggregate data

向您的聚合添加时间

由于值被汇总,结果表中没有 _time 列,因为用于汇总的记录都有不同的时间戳。汇总函数无法推断应该使用哪个时间作为汇总值。因此 _time 列被丢弃。

下一个操作中需要一个 _time 列。要添加一个,请使用duplicate() 函数_stop 列复制为每个窗口表的 _time 列。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

未窗口聚合表

使用 window() 函数和 every: inf 参数将所有点收集到一个单一的无限窗口中。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)

一旦解除分组并合并成一个单表,汇总数据点将在您的可视化中显示为连接状态。

Unwindowed aggregate data

辅助函数

这看起来可能是很多编码,仅仅是为了构建一个聚合数据的查询,但是这个过程有助于理解数据在每个函数传递时如何“变形”。

Flux 提供了(并允许您创建)“辅助”函数,这些函数抽象了许多步骤。 本指南中执行的相同操作可以使用 aggregateWindow() 完成。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> aggregateWindow(every: 5m, fn: mean)

恭喜!

您现在已经构造了一个使用 Flux 函数转换数据的 Flux 查询。有许多其他方法可以使用 Flux 的原始函数和您自己的自定义函数来操作数据,但这为基本语法和查询结构提供了良好的介绍。


要更深入地了解窗体和聚合数据及每个转换的示例数据输出,请查看窗口和聚合数据指南。




Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看:

由TSM驱动的InfluxDB Cloud