优化 Flux 查询
优化您的Flux查询,以减少其内存和计算(CPU)需求。
使用推送进行查询
下推 是将数据操作推送到底层数据源的函数或函数组合,而不是在内存中操作数据。以下推查询开始可以提高查询性能。一旦运行了非下推函数,Flux 就会将数据拉入内存并在内存中运行所有后续操作。
下推函数和函数组合
当查询 InfluxDB 2.7 或 InfluxDB Cloud 数据源时,大多数下推操作是被支持的。如下表所示,在 InfluxDB 2.7 中有一些下推操作不被支持。
| 函数 | InfluxDB 2.7 | InfluxDB Cloud |
|---|---|---|
| count() | ||
| duplicate() | ||
| filter() | ||
| fill() | ||
| first() | ||
| last() | ||
| max() | ||
| mean() | ||
| min() | ||
| range() | ||
| rename() | ||
| sum() | ||
| window() | ||
| 函数组合 | ||
| group() |> count() | ||
| group() |> first() | ||
| group() |> last() | ||
| group() |> max() | ||
| group() |> min() | ||
| group() |> sum() | ||
| sort() |> limit() | ||
| window() |> count() | ||
| window() |> first() | ||
| window() |> last() | ||
| window() |> max() | ||
| window() |> min() | ||
| window() |> sum() |
* filter() 仅当所有参数值都是静态时才会下推。 查看 Avoid processing filters inline。
在查询开始时使用下推函数和函数组合。 一旦运行非下推函数,Flux 就会将数据加载到内存中,并在那里运行所有后续操作。
正在使用的下推函数
from(bucket: "example-bucket")
|> range(start: -1h) //
|> filter(fn: (r) => r.sensor == "abc123") //
|> group(columns: ["_field", "host"]) // Pushed to the data source
|> aggregateWindow(every: 5m, fn: max) //
|> filter(fn: (r) => r._value >= 90.0) //
|> top(n: 10) // Run in memory
避免内联处理过滤器
避免在线使用数学运算或字符串操作来定义数据过滤器。
在线处理过滤值会阻止 filter() 将其操作推送到基础数据源,因此
前一个函数返回的数据加载到内存中。
这通常会导致显著的性能损失。
例如,以下查询使用 仪表板变量 和字符串连接来定义一个过滤区域。因为 filter() 在行内使用字符串连接,所以它无法将其操作推送到底层数据源,并将从 range() 返回的所有数据加载到内存中。
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.region == v.provider + v.region)
要动态设置筛选器并保持filter()函数的下推能力,请使用变量在filter()之外定义筛选值:
region = v.provider + v.region
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.region == region)
避免短时间窗口
基于时间间隔的窗口处理(分组数据)通常用于聚合和降采样数据。通过避免短窗口持续时间来提高性能。更多的窗口需要更多的计算能力来评估每一行应该分配到哪个窗口。合理的窗口持续时间取决于查询的总时间范围。
谨慎使用“heavy”函数
以下函数比其他函数使用更多的内存或CPU。在使用它们之前,请考虑它们在数据处理中的必要性:
我们正在不断优化Flux,这个列表可能并不代表它的当前状态。
尽可能使用 set() 而不是 map()
set(),
experimental.set(),
和 map
可以分别设置数据中列的值,但set函数在性能上优于map()。
使用以下指南来确定使用哪一个:
- 如果将列值设置为预定义的静态值,请使用
set()或experimental.set()。 - 如果使用 现有行数据 动态设置列值,请使用
map()。
将列值设置为静态值
以下查询功能上是相同的,但使用 set() 的性能优于使用 map()。
data
|> map(fn: (r) => ({ r with foo: "bar" }))
// Recommended
data
|> set(key: "foo", value: "bar")
使用现有行数据动态设置列值
data
|> map(fn: (r) => ({ r with foo: r.bar }))
余额时间范围和数据精度
为了确保查询性能,平衡时间范围和数据的精度。
例如,如果您查询每秒存储的数据并请求六个月的数据,
结果将包括每个系列大约1550万个点。根据在filter()(cardinality)之后返回的系列数量,这可能迅速变成数十亿个点。
Flux必须将这些点存储在内存中以生成响应。使用pushdowns来优化内存中存储的点的数量。
要查询长时间段的数据,请创建一个任务以 汇总数据,然后查询汇总后的数据。
使用Flux分析器测量查询性能
使用Flux Profiler package来测量查询性能并将性能指标附加到查询输出中。以下Flux分析器可用:
- query: 提供有关整个Flux脚本执行的统计信息。
- 操作符: 提供关于查询中每个操作的统计信息。
导入 profiler 包并使用 profile.enabledProfilers 选项启用分析器。
import "profiler"
option profiler.enabledProfilers = ["query", "operator"]
// Query to profile
有关Flux分析器的更多信息,请参阅 Flux Profiler package。