Documentation

开始处理数据

现在您已经了解了从InfluxDB查询数据的基础知识, 让我们超越基本查询,开始处理查询的数据。 “处理”数据可能意味着转换、聚合、下采样或对数据发出警报。 本教程涵盖以下数据处理用例:

大多数数据处理操作都需要手动编辑 Flux 查询。 如果您正在使用 InfluxDB 数据浏览器,请切换到 脚本编辑器 而不是使用 查询构建器。

重新映射或分配数据中的值

使用 map() 函数 来遍历数据中的每一行并更新该行中的值。map() 是 Flux 中最有用的函数之一,将帮助您完成许多需要执行的数据处理操作。

了解更多关于 map() 如何工作的内容

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "hum")
    |> map(fn: (r) => ({r with _value: r._value / 100.0}))

映射示例

执行数学运算

条件性地分配状态

数据警报

分组数据

使用group()函数根据特定列值重新分组您的数据,以便进行进一步处理。

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> group(columns: ["room", "_field"])

理解数据分组及其重要性很重要,但对于本“入门”教程来说可能太复杂。有关数据如何分组以及为什么重要的更多信息,请参见Flux 数据模型文档。

默认情况下, from() 返回从 InfluxDB 查询的数据,按系列分组 (测量、标签和字段键)。 返回的表流中的每个表代表一个组。 每个表包含按数据分组的列的相同值。 这种分组非常重要,因为您 聚合数据

组示例

按特定列分组数据

解散数据

聚合或选择特定数据

使用 Flux aggregateselector 函数从 每个 输入表中返回汇总或选定的值。

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> mean()

按时间汇总

如果你想查询随着时间变化的聚合值,这是一种 降采样的形式。

聚合函数

聚合函数 删除 不在 分组键 中的列,并为每个输入表返回一行,该行包含该表的聚合值。

聚合示例

计算每个房间的平均温度

计算所有房间的整体平均温度

统计所有字段中每个房间报告的点数

分配一个新的汇总时间戳

_time 通常不是分组键的一部分,在使用聚合函数时会被丢弃。要为聚合点分配一个新的时间戳,重复代表查询边界的 _start_stop 列作为新的 _time 列。

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

选择器函数

选择器函数 从每个输入表中返回一个或多个列,并保留所有列及其值。

选择器示例

返回每个房间的第一个温度

返回每个房间的最后温度

返回每个房间的最高温度

将数据透视为关系模式

如果来自关系型 SQL 或类似 SQL 的查询语言,如 InfluxQL,Flux 使用的数据模型与您习惯的不同。Flux 返回多个表格,每个表格包含不同的字段。“关系型”模式将每个字段结构化为每一行中的一列。

使用pivot()函数将数据根据时间戳转换为“关系型”模式。

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> filter(fn: (r) => r.room == "Kitchen")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

查看输入和透视输出

下采样数据

降采样数据是一种在查询时提高性能并优化长期数据存储的策略。简单来说,降采样减少了查询返回的点的数量,而不会丢失数据中的一般趋势。

有关下采样数据的更多信息,请参阅 Downsample data.

最常见的数据降采样方法是按时间间隔或“窗口”。例如,您可能想查询最后一个小时的数据,并返回每五分钟窗口的平均值。

使用 aggregateWindow() 来按指定时间间隔对数据进行下采样:

  • 使用every参数来指定每个窗口的持续时间。
  • 使用 fn 参数指定要应用于每个窗口的 aggregateselector 函数。
  • (可选) 使用 timeSrc 参数指定要用于为每个窗口创建新的聚合时间戳的列值。默认值是 _stop
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> aggregateWindow(every: 2h, fn: mean)

查看输入和下采样输出

使用InfluxDB任务自动化处理

InfluxDB 任务 是调度查询,可以执行上述描述的任何数据处理操作。通常,任务随后使用 to() 函数 将处理结果写回 InfluxDB。

有关创建和配置任务的更多信息,请参见 开始使用 InfluxDB 任务.

示例下采样任务

option task = {
    name: "Example task"
    every: 1d,
}

from(bucket: "get-started-downsampled")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "home")
    |> aggregateWindow(every: 2h, fn: mean)


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: