现在您已经了解了从InfluxDB查询数据的基础知识,
让我们超越基本查询,开始处理查询的数据。
“处理”数据可能意味着转换、聚合、下采样或对数据发出警报。
本教程涵盖以下数据处理用例:
大多数数据处理操作都需要手动编辑 Flux 查询。 如果您正在使用 InfluxDB 数据浏览器,请切换到 脚本编辑器 而不是使用 查询构建器。
重新映射或分配数据中的值
使用 map() 函数 来遍历数据中的每一行并更新该行中的值。map() 是 Flux 中最有用的函数之一,将帮助您完成许多需要执行的数据处理操作。
了解更多关于 map() 如何工作的内容
map() 采取一个参数, fn。
fn 采用一个匿名函数,该函数读取每一行作为一个
record,命名为 r。
在 r 记录中,每个键值对代表一个列及其值。
例如:
r = {
_time: 2020-01-01T00:00:00Z,
_measurement: "home",
room: "Kitchen",
_field: "temp",
_value: 21.0,
}
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2020-01-01T00:00:00Z | 家 | 厨房 | 温度 | 21.0 |
fn 函数以任何你需要的方式修改 r 记录,并返回该行的新记录。例如,使用上述记录:
(r) => ({ _time: r._time, _field: "temp_F", _value: (r._value * 1.8) + 32.0})
// Returns: {_time: 2020-01-01T00:00:00Z, _field: "temp_F", _value: 69.8}
| 时间 | 字段 | 值 |
|---|
| 2020-01-01T00:00:00Z | temp_F | 69.8 |
请注意,某些列已从原始行记录中删除。这是因为 fn 函数明确映射了 _time、_field 和 _value 列。要保留现有列并仅更新或添加特定列,请使用 with 运算符扩展您的行记录。例如,使用上面的记录:
(r) => ({r with _value: (r._value * 1.8) + 32.0, degrees: "F"})
// Returns:
// {
// _time: 2020-01-01T00:00:00Z,
// _measurement: "home",
// room: "Kitchen",
// _field: "temp",
// _value: 69.8,
// degrees: "F",
// }
| 时间 | 测量 | 房间 | 字段 | 值 | 度数 |
|---|
| 2020-01-01T00:00:00Z | 家 | 厨房 | 温度 | 69.8 | 华氏 |
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "hum")
|> map(fn: (r) => ({r with _value: r._value / 100.0}))
映射示例
条件性地分配状态
在一个 map() 函数中,你可以使用 条件表达式 (if/then/else) 来有条件地赋值。比如,使用 在“开始写入InfluxDB”中的数据:
查询co字段以返回每个房间的碳氧化物百万分之一(ppm)读数。
使用 map() 来迭代每一行,评估 _value 列中的值,然后有条件地赋予状态:
- 如果一氧化碳少于 10 ppm,则分配状态:ok。
- 否则,分配状态: 警告。
将状态存储在state列中。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 主页 | 厨房 | co | 1 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | co | 3 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | co | 7 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | co | 9 |
| 2022-01-01T18:00:00Z | 家庭 | 厨房 | co | 18 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | co | 22 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | co | 26 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | co | 1 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | co | 1 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | co | 4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | co | 5 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | co | 9 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | co | 14 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | co | 17 |
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | co | 1 | 正常 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | co | 3 | 好 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | co | 7 | 正常 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | co | 9 | 正常 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | co | 18 | 警告 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | co | 22 | 警告 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | co | 26 | 警告 |
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | co | 1 | 好 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | co | 1 | 好 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | co | 4 | 正常 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | co | 5 | 正常 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | co | 9 | 好 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | co | 14 | 警告 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | co | 17 | 警告 |
数据警报
map() 允许你在每行的基础上执行更复杂的操作。使用一个 Flux 块 ({}) 在 fn 函数中,你可以创建作用域变量并在每行的上下文中执行其他函数。例如,你可以向 Slack 发送消息。
例如,使用“开始写入InfluxDB”的数据:
导入slack包。
查询co字段以返回每个房间的碳氧化物百万分之一(ppm)读数。
使用 map() 来迭代每一行,评估 _value 列中的值,然后有条件地赋予状态:
- 如果一氧化碳少于10 ppm,赋值状态:ok。
- 否则,将状态分配为:警告。
将状态存储在state列中。
使用 filter() 仅返回在状态列中包含 警告 的行。
使用 map() 迭代每一行。
在你的 fn 函数中,使用一个 Flux 块 ({}) 来:
- Create a
responseCode variable that uses slack.message()
to send a message to Slack using data from the input row.
slack.message() returns the response code of the Slack API request as an integer. - Use a
return statement to return a new row record.
The new row should extend the input row with a new column, sent, with
a boolean value determined by the responseCode variable.
map() 将每一行消息发送到 Slack,该消息被传递给函数。
import "slack"
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
|> filter(fn: (r) => r.state == "warning")
|> map(
fn: (r) => {
responseCode =
slack.message(
token: "mYSlacK70k3n",
color: "#ff0000",
channel: "#alerts",
text: "Carbon monoxide is at dangerous levels in the ${r.room}: ${r._value} ppm.",
)
return {r with sent: responseCode == 200}
},
)
以下输入表示被警告状态过滤的数据。
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 |
|---|
| 2022-01-01T18:00:00Z | 家 | 厨房 | co | 18 | 警告 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | co | 22 | 警告 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | co | 26 | 警告 |
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 |
|---|
| 2022-01-01T19:00:00Z | 家 | 客厅 | co | 14 | 警告 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | co | 17 | 警告 |
输出包含一个 sent 列,指示消息是否已发送。
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 | 发送 |
|---|
| 2022-01-01T18:00:00Z | 家 | 厨房 | co | 18 | 警告 | true |
| 2022-01-01T19:00:00Z | 家 | 厨房 | co | 22 | 警告 | true |
| 2022-01-01T20:00:00Z | 家 | 厨房 | co | 26 | 警告 | true |
| 时间 | 测量 | 房间 | 字段 | 值 | 状态 | 发送 |
|---|
| 2022-01-01T19:00:00Z | 家 | 客厅 | co | 14 | 警告 | true |
| 2022-01-01T20:00:00Z | 家 | 客厅 | co | 17 | 警告 | true |
根据以上结果,您将在Slack中收到以下消息:
厨房中的一氧化碳浓度达到了危险水平:18 ppm。
厨房中的一氧化碳浓度达到了危险水平:22 ppm。
客厅中的一氧化碳浓度达到了危险水平:14 ppm。
厨房中的一氧化碳浓度达到了危险水平:26 ppm。
客厅中的一氧化碳浓度达到了危险水平:17 ppm。
分组数据
使用group()函数根据特定列值重新分组您的数据,以便进行进一步处理。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> group(columns: ["room", "_field"])
理解数据分组及其重要性很重要,但对于本“入门”教程来说可能太复杂。有关数据如何分组以及为什么重要的更多信息,请参见Flux 数据模型文档。
默认情况下, from() 返回从 InfluxDB 查询的数据,按系列分组
(测量、标签和字段键)。
返回的表流中的每个表代表一个组。
每个表包含按数据分组的列的相同值。
这种分组非常重要,因为您 聚合数据。
组示例
按特定列分组数据
使用“开始写入InfluxDB”的数据:
- 查询
temp 和 hum 字段。 - 使用
group() 仅按 _field 列分组。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group(columns: ["_field"])
以下数据是从最后的 filter() 输出并传递到 group() 的:
组键实例 = [_measurement=home, room=厨房, _field=湿度]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 湿度 | 36.2 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 湿度 | 36.1 |
组密钥实例 = [_measurement=home, room=客厅, _field=湿度]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 湿度 | 36 |
组键实例 = [_measurement=home, room=Kitchen, _field=temp]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
组键实例 = [_measurement=home, room=客厅, _field=temp]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
当按 _field 分组时,所有带有 temp 字段的行将会在一个表中,而所有带有 hum 字段的行将会在另一个表中。_measurement 和 room 列不再影响行的分组方式。
组键实例 = [_field=hum]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 湿度 | 36.2 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 湿度 | 36.1 |
| 2022-01-01T08:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 湿度 | 36 |
组键实例 = [_field=temp]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
解散数据
使用“开始写入InfluxDB”中的数据:
- 查询
temp 和 hum 字段。 - 使用
group() 而不带任何参数来“解组”数据或不按任何列分组。
columns 参数的默认值是一个空数组 ([])。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group()
以下数据是从最后的 filter() 输出并传递到 group() 的:
组键实例 = [_measurement=home, room=厨房, _field=湿度]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 湿度 | 36.2 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 湿度 | 36.1 |
组密钥实例 = [_measurement=home, room=客厅, _field=湿度]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 湿度 | 36 |
组键实例 = [_measurement=home, room=厨房, _field=温度]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
组键实例 = [_measurement=home, room=客厅, _field=temp]
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
当未分组时,数据以单个表格的形式返回。
组键实例 = []
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T08:00:00Z | 家 | 厨房 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 湿度 | 36.2 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 湿度 | 36.1 |
| 2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
| 2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
| 2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T08:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 湿度 | 35.9 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 湿度 | 36 |
| 2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
| 2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
| 2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
聚合或选择特定数据
使用 Flux aggregate 或 selector 函数从 每个 输入表中返回汇总或选定的值。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> mean()
按时间汇总
如果你想查询随着时间变化的聚合值,这是一种
降采样的形式。
聚合函数
聚合函数 删除
不在 分组键 中的列,并为每个输入表返回一行,该行包含该表的聚合值。
聚合示例
计算每个房间的平均温度
使用“开始写入InfluxDB”中的数据:
- 查询
temp 字段。默认情况下,from() 返回按 _measurement,room 和 _field 进行分组的数据,因此每个表代表一个房间。 - 使用
mean() 返回每个房间的平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
| _测量 | 房间 | _字段 | _值 |
|---|
| 主页 | 厨房 | 温度 | 22.814285714285713 |
| _度量 | 房间 | _字段 | _值 |
|---|
| 主页 | 客厅 | 温度 | 22.44285714285714 |
计算所有房间的整体平均温度
使用“开始写入InfluxDB”的数据:
- 查询
temp 字段。 - 使用
group() 将数据 解组 为一个表。默认情况下, from() 返回按 _measurement、room 和 _field 分组的数据。要获得整体平均值,您需要将所有结果组织为一个表。 - 使用
mean() 返回平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> group()
|> mean()
以下输入数据表示未分组的数据,这些数据被传递到 mean()。
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
统计所有字段中每个房间报告的点数
使用“开始写入InfluxDB”的数据:
- 通过简单地按
home度量过滤来查询所有字段。 home度量中的字段是不同类型的。
使用toFloat()将所有字段值转换为浮点数。- 使用
group() 按 room 分组数据。 - 使用
count() 返回每个输入表中的行数。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> toFloat()
|> group(columns: ["room"])
|> count()
输出
分配一个新的汇总时间戳
_time 通常不是分组键的一部分,在使用聚合函数时会被丢弃。要为聚合点分配一个新的时间戳,重复代表查询边界的 _start 或 _stop 列作为新的 _time 列。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
|> duplicate(column: "_stop", as: "_time")
选择器函数
选择器函数 从每个输入表中返回一个或多个列,并保留所有列及其值。
选择器示例
返回每个房间的第一个温度
使用“开始写入InfluxDB”的数据:
- 查询
temp 字段。 - 使用
first() 从每个表中返回第一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> first()
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
返回每个房间的最后温度
使用“开始写入InfluxDB”的数据:
- 查询
temp 字段。 - 使用
last() 从每个表中返回
最后一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> last()
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
返回每个房间的最高温度
使用“开始写入InfluxDB”的数据:
- 查询
temp 字段。 - 使用
max() 返回每个表中 _value 列的最高值所在的行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> max()
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
| 2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
| 2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
| 2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
| 2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
| 2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
| 2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
| 时间 | 测量值 | 房间 | 字段 | 值 |
|---|
| 2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
将数据透视为关系模式
如果来自关系型 SQL 或类似 SQL 的查询语言,如 InfluxQL,Flux 使用的数据模型与您习惯的不同。Flux 返回多个表格,每个表格包含不同的字段。“关系型”模式将每个字段结构化为每一行中的一列。
使用pivot()函数将数据根据时间戳转换为“关系型”模式。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> filter(fn: (r) => r.room == "Kitchen")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
下采样数据
降采样数据是一种在查询时提高性能并优化长期数据存储的策略。简单来说,降采样减少了查询返回的点的数量,而不会丢失数据中的一般趋势。
有关下采样数据的更多信息,请参阅
Downsample data.
最常见的数据降采样方法是按时间间隔或“窗口”。例如,您可能想查询最后一个小时的数据,并返回每五分钟窗口的平均值。
使用 aggregateWindow() 来按指定时间间隔对数据进行下采样:
- 使用
every参数来指定每个窗口的持续时间。 - 使用
fn 参数指定要应用于每个窗口的 aggregate 或 selector 函数。 - (可选) 使用
timeSrc 参数指定要用于为每个窗口创建新的聚合时间戳的列值。默认值是 _stop。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> aggregateWindow(every: 2h, fn: mean)
使用InfluxDB任务自动化处理
InfluxDB 任务 是调度查询,可以执行上述描述的任何数据处理操作。通常,任务随后使用 to() 函数 将处理结果写回 InfluxDB。
有关创建和配置任务的更多信息,请参见
开始使用 InfluxDB 任务.
示例下采样任务
option task = {
name: "Example task"
every: 1d,
}
from(bucket: "get-started-downsampled")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "home")
|> aggregateWindow(every: 2h, fn: mean)