Flux 查询基础
大多数 Flux 查询遵循相同的基本结构。 熟悉使用 Flux 查询数据时的基本概念和步骤。
基本查询结构
大多数基本的 Flux 查询包括以下步骤:
from(bucket: "example-bucket") // ── Source
|> range(start: -1d) // ── Filter on time
|> filter(fn: (r) => r._field == "foo") // ── Filter on column values
|> group(columns: ["sensorID"]) // ── Shape
|> mean() // ── Process
来源
Flux 输入函数 从数据源检索数据。 所有输入函数返回一个 表的流。
Flux支持多种数据源,包括时间序列数据库(例如InfluxDB和Prometheus),关系数据库(例如MySQL和PostgreSQL),CSV,等等。
- 有关支持的数据源的更多信息,请参见 Query data sources。
- 有关输入函数的列表,请参见 函数类型和类别 - 输入。
过滤器
过滤函数遍历并评估每一输入行,以查看它是否符合指定条件。符合条件的行将包含在函数输出中。不符合指定条件的行将被丢弃。
Flux 提供以下主要过滤函数:
range(): 基于时间过滤数据。filter(): 根据列值过滤数据。filter()使用在fn参数中定义的 谓词函数 来评估输入行。 每一行作为 记录r传递给谓词函数,其中包含行中每列的键值对。
还有其他过滤函数可用。
有关更多信息,请参见 函数类型和类别 – 过滤器。
形状数据
许多查询需要修改数据的结构,以便为处理做好准备。常见的数据整理任务包括 按列值或按时间重新分组数据 或将列值透视为行。
重塑数据的函数包括以下内容:
group(): 修改组键window(): 修改_start和_stop行的值以按时间分组数据pivot(): 将列值透视为行drop(): 删除特定列keep(): 保留特定列并丢弃所有其他列
过程
处理数据可以采取多种形式,包括以下类型的操作:
- 聚合数据: 将输入表所有行聚合为一行。有关信息,请参见 函数类型和类别 - 聚合。
- 选择特定数据点: 从每个输入表中返回特定行。 例如,返回第一行或最后一行,值最高或最低的行等等。有关信息,请参见 函数类型和类别 - 选择器。
- 重写行: 使用
map()重写每个输入行。 使用数学运算转换值,处理字符串,动态添加新列等等。 - 发送通知: 评估数据并使用 Flux 通知端点函数将通知发送到外部服务。有关信息,请参见 函数类型和类别 - 通知端点。
aggregateWindow 辅助函数
aggregateWindow() 是一个辅助函数
可以 塑造和处理数据。
该函数按时间窗口和分组数据,然后对重组的表应用 聚合
或 选择器 函数。
编写基本查询
使用 InfluxDB 示例数据 来编写一个基本的 Flux 查询,该查询查询数据,按时间和列值过滤数据,然后应用一个 聚合。
使用InfluxDB 数据浏览器或Flux REPL来构建并执行以下基本查询。
导入
influxdata/influxdb/sample包 并使用sample.data()函数 加载airSensor示例数据集。import "influxdata/influxdb/sample" sample.data(set: "airSensor")sample.data()返回的数据就像是从 InfluxDB 查询的。 要实际从 InfluxDB 查询数据,请用from()函数替换sample.data()。将返回的数据传递到
range()以按时间过滤数据。
返回过去一小时的数据。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h)使用
filter()根据列值过滤行。 在这个示例中,仅返回包含co字段值的行。 字段名称存储在_field列中。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co")使用
mean()来计算每个输入表中的平均值。 因为 InfluxDB 按 系列 对数据进行分组,mean()为每个唯一的sensor_id返回一个包含一行的表, 该行在_value列中包含平均值。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co") |> mean()import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co") |> mean() |> group()
这个基本查询的结果应该与以下内容类似:
_start 和 _stop 列已被省略。
| 字段 | 测量 | 传感器ID | 值 |
|---|---|---|---|
| co | 空气传感器 | TLM0100 | 0.42338714381053716 |
| co | 空气传感器 | TLM0101 | 0.4223251339463061 |
| co | 空气传感器 | TLM0102 | 0.8543452859060252 |
| co | 空气传感器 | TLM0103 | 0.2782783780205422 |
| co | 空气传感器 | TLM0200 | 4.612143110484339 |
| co | 空气传感器 | TLM0201 | 0.297474366047375 |
| co | 空气传感器 | TLM0202 | 0.3336370208486757 |
| co | 空气传感器 | TLM0203 | 0.4948166816959906 |