Documentation

创建自定义聚合函数

要聚合您的数据,请使用 Flux 聚合函数 或使用 reduce()函数创建自定义聚合函数。

聚合函数特性

聚合函数具有相同的基本特征:

  • 它们对单个输入表进行操作,将所有记录转换为单个记录。
  • 输出表与输入表具有相同的 group key

reduce() 如何工作

这个 reduce() 函数一次对一行进行操作,使用在 fn 参数 中定义的函数。fn 函数使用以下参数指定的两个 records 将键映射到特定值:

参数描述
r表示行或记录的记录。
accumulator一个记录,包含每行聚合计算中使用的值。

函数 reduce()identity 参数 定义了初始 accumulator 记录。

示例 reduce() 函数

以下示例 reduce() 函数生成输入表中所有值的总和和积。

|> reduce(
    fn: (r, accumulator) => ({
        sum: r._value + accumulator.sum,
        product: r._value * accumulator.product
    }),
    identity: {sum: 0.0, product: 1.0},
)

为了说明这个函数是如何工作的,取这个简化的表作为例子:

时间
2019-04-23T16:10:49Z1.6
2019-04-23T16:10:59Z2.3
2019-04-23T16:11:09Z0.7
2019-04-23T16:11:19Z1.2
2019-04-23T16:11:29Z3.8
输入记录

fn 函数使用第一行中的数据来定义 r 记录。它使用 identity 参数来定义 accumulator 记录。

r           = { _time: 2019-04-23T16:10:49.00Z, _value: 1.6 }
accumulator = { sum  : 0.0, product : 1.0 }
键位映射

然后它使用 raccumulator 记录来填充键映射中的值:

// sum: r._value + accumulator.sum
sum: 1.6 + 0.0

// product: r._value * accumulator.product
product: 1.6 * 1.0
输出记录

这将生成一个包含以下键值对的输出记录:

{ sum: 1.6, product: 1.6 }

该函数然后使用这个 输出记录 处理下一行作为 accumulator

因为 reduce() 在处理下一行时将输出记录作为 accumulator,所以在 fn 函数中映射的键必须与 identityaccumulator 记录中的键匹配。

处理下一行
// Input records for the second row
r           = { _time: 2019-04-23T16:10:59.00Z, _value: 2.3 }
accumulator = { sum  : 1.6, product : 1.6 }

// Key mappings for the second row
sum: 2.3 + 1.6
product: 2.3 * 1.6

// Output record of the second row
{ sum: 3.9, product: 3.68 }

然后,它使用新的输出记录作为下一个行的 accumulator。这个循环一直持续到表中的所有行都被处理。

最终输出记录和表格

在处理完表中的所有记录后,reduce() 使用最终输出记录创建一个转换后的表,该表包含一行和每个映射键的列。

最终输出记录
{ sum: 9.6, product: 11.74656 }
输出表
总和乘积
9.611.74656

_time 列发生了什么?

reduce() 函数仅保留的列是:

  1. 是输入表的 group key 的一部分。
  2. fn函数中显式映射。

它会丢弃所有其他列。因为 _time 不是组键的一部分,并且在 fn 函数中没有映射,所以它不会被包括在输出表中。

自定义聚合函数示例

要创建自定义聚合函数,请使用创建自定义函数中概述的原则以及reduce()函数对每个输入表中的行进行聚合。

创建自定义平均函数

这个例子说明了如何创建一个函数来计算表中值的平均值。 这仅用于演示目的。 内置的 mean() 函数 做同样的事情,并且性能更佳。

average = (tables=<-, outputField="average") => tables
    |> reduce(
        // Define the initial accumulator record
        identity: {count: 0.0, sum: 0.0, avg: 0.0},
        fn: (r, accumulator) => ({
            // Increment the counter on each reduce loop
            count: accumulator.count + 1.0,
            // Add the _value to the existing sum
            sum: accumulator.sum + r._value,
            // Divide the existing sum by the existing count for a new average
            avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    // Drop the sum and the count columns since they are no longer needed
    |> drop(columns: ["sum", "count"])
    // Set the _field column of the output table to to the value
    // provided in the outputField parameter
    |> set(key: "_field", value: outputField)
    // Rename avg column to _value
    |> rename(columns: {avg: "_value"})
average = (tables=<-, outputField="average") => tables
    |> reduce(
        identity: {count: 0.0, sum: 0.0, avg: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            sum: accumulator.sum + r._value,
            avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    |> drop(columns: ["sum", "count"])
    |> set(key: "_field", value: outputField)
    |> rename(columns: {avg: "_value"})

汇总多个列

内置聚合函数仅作用于一列。使用 reduce() 创建一个自定义聚合函数以聚合多个列。

以下函数期望输入表具有 c1_valuec2_value 列,并为每个生成平均值。

multiAvg = (tables=<-) => tables
    |> reduce(
        identity: {
            count: 1.0,
            c1_sum: 0.0,
            c1_avg: 0.0,
            c2_sum: 0.0,
            c2_avg: 0.0,
        },
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            c1_sum: accumulator.c1_sum + r.c1_value,
            c1_avg: accumulator.c1_sum / accumulator.count,
            c2_sum: accumulator.c2_sum + r.c2_value,
            c2_avg: accumulator.c2_sum / accumulator.count,
        }),
    )

汇总毛利和净利润

使用 reduce() 创建一个汇总总利润和净利润的函数。 这个例子期望输入表中有 profitexpenses 列。

profitSummary = (tables=<-) => tables
    |> reduce(
        identity: {gross: 0.0, net: 0.0},
        fn: (r, accumulator) => ({
            gross: accumulator.gross + r.profit,
            net: accumulator.net + r.profit - r.expenses
            }
        )
    )


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: