Documentation

优化 Flux 查询

优化您的Flux查询,以减少其内存和计算(CPU)需求。

使用推送进行查询

下推 是将数据操作推送到底层数据源的函数或函数组合,而不是在内存中操作数据。以下推查询开始可以提高查询性能。一旦运行了非下推函数,Flux 就会将数据拉入内存并在内存中运行所有后续操作。

下推函数和函数组合

当查询 InfluxDB 2.7 或 InfluxDB Cloud 数据源时,大多数下推操作是被支持的。如下表所示,在 InfluxDB 2.7 中有一些下推操作不被支持。

函数InfluxDB 2.7InfluxDB Cloud
count()
duplicate()
filter() *
fill()
first()
last()
max()
mean()
min()
range()
rename()
sum()
window()
函数组合
group() |> count()
group() |> first()
group() |> last()
group() |> max()
group() |> min()
group() |> sum()
sort() |> limit()
window() |> count()
window() |> first()
window() |> last()
window() |> max()
window() |> min()
window() |> sum()

* filter() 仅当所有参数值都是静态时才会下推。 查看 Avoid processing filters inline

在查询开始时使用下推函数和函数组合。 一旦运行非下推函数,Flux 就会将数据加载到内存中,并在那里运行所有后续操作。

正在使用的下推函数
from(bucket: "example-bucket")
    |> range(start: -1h)                       //
    |> filter(fn: (r) => r.sensor == "abc123") //
    |> group(columns: ["_field", "host"])      // Pushed to the data source
    |> aggregateWindow(every: 5m, fn: max)     //
    |> filter(fn: (r) => r._value >= 90.0)     //

    |> top(n: 10)                              // Run in memory

避免内联处理过滤器

避免在线使用数学运算或字符串操作来定义数据过滤器。 在线处理过滤值会阻止 filter() 将其操作推送到基础数据源,因此 前一个函数返回的数据加载到内存中。 这通常会导致显著的性能损失。

例如,以下查询使用 仪表板变量 和字符串连接来定义一个过滤区域。因为 filter() 在行内使用字符串连接,所以它无法将其操作推送到底层数据源,并将从 range() 返回的所有数据加载到内存中。

from(bucket: "example-bucket")
    |> range(start: -1h)                      
    |> filter(fn: (r) => r.region == v.provider + v.region)

要动态设置筛选器并保持filter()函数的下推能力,请使用变量在filter()之外定义筛选值:

region = v.provider + v.region

from(bucket: "example-bucket")
    |> range(start: -1h)                      
    |> filter(fn: (r) => r.region == region)

避免短时间窗口

基于时间间隔的窗口处理(分组数据)通常用于聚合和降采样数据。通过避免短窗口持续时间来提高性能。更多的窗口需要更多的计算能力来评估每一行应该分配到哪个窗口。合理的窗口持续时间取决于查询的总时间范围。

谨慎使用“heavy”函数

以下函数比其他函数使用更多的内存或CPU。在使用它们之前,请考虑它们在数据处理中的必要性:

我们正在不断优化Flux,这个列表可能并不代表它的当前状态。

尽可能使用 set() 而不是 map()

set(), experimental.set(), 和 map 可以分别设置数据中列的值,但set函数在性能上优于map()

使用以下指南来确定使用哪一个:

  • 如果将列值设置为预定义的静态值,请使用 set()experimental.set()
  • 如果使用 现有行数据 动态设置列值,请使用 map()

将列值设置为静态值

以下查询功能上是相同的,但使用 set() 的性能优于使用 map()

data
    |> map(fn: (r) => ({ r with foo: "bar" }))

// Recommended
data
    |> set(key: "foo", value: "bar")

使用现有行数据动态设置列值

data
    |> map(fn: (r) => ({ r with foo: r.bar }))

余额时间范围和数据精度

为了确保查询性能,平衡时间范围和数据的精度。 例如,如果您查询每秒存储的数据并请求六个月的数据, 结果将包括每个系列大约1550万个点。根据在filter()(cardinality)之后返回的系列数量,这可能迅速变成数十亿个点。 Flux必须将这些点存储在内存中以生成响应。使用pushdowns来优化内存中存储的点的数量。

要查询长时间段的数据,请创建一个任务以 汇总数据,然后查询汇总后的数据。

使用Flux分析器测量查询性能

使用Flux Profiler package来测量查询性能并将性能指标附加到查询输出中。以下Flux分析器可用:

  • query: 提供有关整个Flux脚本执行的统计信息。
  • 操作符: 提供关于查询中每个操作的统计信息。

导入 profiler 包并使用 profile.enabledProfilers 选项启用分析器。

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

// Query to profile

有关Flux分析器的更多信息,请参阅 Flux Profiler package



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: