Documentation

Flux 查询基础

大多数 Flux 查询遵循相同的基本结构。 熟悉使用 Flux 查询数据时的基本概念和步骤。

基本查询结构

大多数基本的 Flux 查询包括以下步骤:

from(bucket: "example-bucket")              // ── Source
    |> range(start: -1d)                    // ── Filter on time
    |> filter(fn: (r) => r._field == "foo") // ── Filter on column values
    |> group(columns: ["sensorID"])         // ── Shape
    |> mean()                               // ── Process

来源

Flux 输入函数 从数据源检索数据。 所有输入函数返回一个 表的流

Flux支持多种数据源,包括时间序列数据库(例如InfluxDBPrometheus),关系数据库(例如MySQLPostgreSQL),CSV,等等。

过滤器

过滤函数遍历并评估每一输入行,以查看它是否符合指定条件。符合条件的行将包含在函数输出中。不符合指定条件的行将被丢弃。

Flux 提供以下主要过滤函数:

  • range(): 基于时间过滤数据。
  • filter(): 根据列值过滤数据。 filter() 使用在 fn 参数中定义的 谓词函数 来评估输入行。 每一行作为 记录 r 传递给谓词函数,其中包含行中每列的键值对。

还有其他过滤函数可用。
有关更多信息,请参见 函数类型和类别 – 过滤器

形状数据

许多查询需要修改数据的结构,以便为处理做好准备。常见的数据整理任务包括 按列值或按时间重新分组数据 或将列值透视为行。

重塑数据的函数包括以下内容:

  • group(): 修改组键
  • window(): 修改 _start_stop 行的值以按时间分组数据
  • pivot(): 将列值透视为行
  • drop(): 删除特定列
  • keep(): 保留特定列并丢弃所有其他列

过程

处理数据可以采取多种形式,包括以下类型的操作:

  • 聚合数据: 将输入表所有行聚合为一行。有关信息,请参见 函数类型和类别 - 聚合
  • 选择特定数据点: 从每个输入表中返回特定行。 例如,返回第一行或最后一行,值最高或最低的行等等。有关信息,请参见 函数类型和类别 - 选择器
  • 重写行: 使用 map() 重写每个输入行。 使用数学运算转换值,处理字符串,动态添加新列等等。
  • 发送通知: 评估数据并使用 Flux 通知端点函数将通知发送到外部服务。有关信息,请参见 函数类型和类别 - 通知端点

aggregateWindow 辅助函数

aggregateWindow() 是一个辅助函数 可以 塑造和处理数据。 该函数按时间窗口和分组数据,然后对重组的表应用 聚合选择器 函数。


编写基本查询

使用 InfluxDB 示例数据 来编写一个基本的 Flux 查询,该查询查询数据,按时间和列值过滤数据,然后应用一个 聚合

使用InfluxDB 数据浏览器Flux REPL来构建并执行以下基本查询。

  1. 导入influxdata/influxdb/sample 并使用sample.data() 函数 加载airSensor 示例数据集。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
    

    sample.data() 返回的数据就像是从 InfluxDB 查询的。 要实际从 InfluxDB 查询数据,请用 from() 函数替换 sample.data()

  2. 将返回的数据传递到range()以按时间过滤数据。
    返回过去一小时的数据。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
    
  3. 使用 filter() 根据列值过滤行。 在这个示例中,仅返回包含 co 字段值的行。 字段名称存储在 _field 列中。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
    
  4. 使用 mean() 来计算每个输入表中的平均值。 因为 InfluxDB 按 系列 对数据进行分组, mean() 为每个唯一的 sensor_id 返回一个包含一行的表, 该行在 _value 列中包含平均值。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
        |> mean()
    
  5. 使用 group()表格重组 为一个单一的表格:

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
        |> mean()
        |> group()
    

这个基本查询的结果应该与以下内容类似:

_start_stop 列已被省略。

字段测量传感器ID
co空气传感器TLM01000.42338714381053716
co空气传感器TLM01010.4223251339463061
co空气传感器TLM01020.8543452859060252
co空气传感器TLM01030.2782783780205422
co空气传感器TLM02004.612143110484339
co空气传感器TLM02010.297474366047375
co空气传感器TLM02020.3336370208486757
co空气传感器TLM02030.4948166816959906


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: