创建自定义聚合函数
要聚合您的数据,请使用 Flux
聚合函数
或使用
reduce()函数创建自定义聚合函数。
聚合函数特性
聚合函数具有相同的基本特征:
- 它们对单个输入表进行操作,将所有记录转换为单个记录。
- 输出表与输入表具有相同的 group key。
reduce() 如何工作
这个 reduce() 函数一次对一行进行操作,使用在 fn 参数 中定义的函数。fn 函数使用以下参数指定的两个 records 将键映射到特定值:
| 参数 | 描述 |
|---|---|
r | 表示行或记录的记录。 |
accumulator | 一个记录,包含每行聚合计算中使用的值。 |
函数 reduce() 的 identity 参数 定义了初始 accumulator 记录。
示例 reduce() 函数
以下示例 reduce() 函数生成输入表中所有值的总和和积。
|> reduce(
fn: (r, accumulator) => ({
sum: r._value + accumulator.sum,
product: r._value * accumulator.product
}),
identity: {sum: 0.0, product: 1.0},
)
为了说明这个函数是如何工作的,取这个简化的表作为例子:
| 时间 | 值 |
|---|---|
| 2019-04-23T16:10:49Z | 1.6 |
| 2019-04-23T16:10:59Z | 2.3 |
| 2019-04-23T16:11:09Z | 0.7 |
| 2019-04-23T16:11:19Z | 1.2 |
| 2019-04-23T16:11:29Z | 3.8 |
输入记录
fn 函数使用第一行中的数据来定义 r 记录。它使用 identity 参数来定义 accumulator 记录。
r = { _time: 2019-04-23T16:10:49.00Z, _value: 1.6 }
accumulator = { sum : 0.0, product : 1.0 }
键位映射
然后它使用 r 和 accumulator 记录来填充键映射中的值:
// sum: r._value + accumulator.sum
sum: 1.6 + 0.0
// product: r._value * accumulator.product
product: 1.6 * 1.0
输出记录
这将生成一个包含以下键值对的输出记录:
{ sum: 1.6, product: 1.6 }
该函数然后使用这个 输出记录 处理下一行作为 accumulator。
因为 reduce() 在处理下一行时将输出记录作为 accumulator,所以在 fn 函数中映射的键必须与 identity 和 accumulator 记录中的键匹配。
处理下一行
// Input records for the second row
r = { _time: 2019-04-23T16:10:59.00Z, _value: 2.3 }
accumulator = { sum : 1.6, product : 1.6 }
// Key mappings for the second row
sum: 2.3 + 1.6
product: 2.3 * 1.6
// Output record of the second row
{ sum: 3.9, product: 3.68 }
然后,它使用新的输出记录作为下一个行的 accumulator。这个循环一直持续到表中的所有行都被处理。
最终输出记录和表格
在处理完表中的所有记录后,reduce() 使用最终输出记录创建一个转换后的表,该表包含一行和每个映射键的列。
最终输出记录
{ sum: 9.6, product: 11.74656 }
输出表
| 总和 | 乘积 |
|---|---|
| 9.6 | 11.74656 |
_time 列发生了什么?
该 reduce() 函数仅保留的列是:
- 是输入表的 group key 的一部分。
- 在
fn函数中显式映射。
它会丢弃所有其他列。因为 _time 不是组键的一部分,并且在 fn 函数中没有映射,所以它不会被包括在输出表中。
自定义聚合函数示例
要创建自定义聚合函数,请使用创建自定义函数中概述的原则以及reduce()函数对每个输入表中的行进行聚合。
创建自定义平均函数
这个例子说明了如何创建一个函数来计算表中值的平均值。
这仅用于演示目的。
内置的 mean() 函数
做同样的事情,并且性能更佳。
average = (tables=<-, outputField="average") => tables
|> reduce(
// Define the initial accumulator record
identity: {count: 0.0, sum: 0.0, avg: 0.0},
fn: (r, accumulator) => ({
// Increment the counter on each reduce loop
count: accumulator.count + 1.0,
// Add the _value to the existing sum
sum: accumulator.sum + r._value,
// Divide the existing sum by the existing count for a new average
avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
}),
)
// Drop the sum and the count columns since they are no longer needed
|> drop(columns: ["sum", "count"])
// Set the _field column of the output table to to the value
// provided in the outputField parameter
|> set(key: "_field", value: outputField)
// Rename avg column to _value
|> rename(columns: {avg: "_value"})
average = (tables=<-, outputField="average") => tables
|> reduce(
identity: {count: 0.0, sum: 0.0, avg: 0.0},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
sum: accumulator.sum + r._value,
avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
}),
)
|> drop(columns: ["sum", "count"])
|> set(key: "_field", value: outputField)
|> rename(columns: {avg: "_value"})
汇总多个列
内置聚合函数仅作用于一列。使用 reduce() 创建一个自定义聚合函数以聚合多个列。
以下函数期望输入表具有 c1_value 和 c2_value 列,并为每个生成平均值。
multiAvg = (tables=<-) => tables
|> reduce(
identity: {
count: 1.0,
c1_sum: 0.0,
c1_avg: 0.0,
c2_sum: 0.0,
c2_avg: 0.0,
},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
c1_sum: accumulator.c1_sum + r.c1_value,
c1_avg: accumulator.c1_sum / accumulator.count,
c2_sum: accumulator.c2_sum + r.c2_value,
c2_avg: accumulator.c2_sum / accumulator.count,
}),
)
汇总毛利和净利润
使用 reduce() 创建一个汇总总利润和净利润的函数。 这个例子期望输入表中有 profit 和 expenses 列。
profitSummary = (tables=<-) => tables
|> reduce(
identity: {gross: 0.0, net: 0.0},
fn: (r, accumulator) => ({
gross: accumulator.gross + r.profit,
net: accumulator.net + r.profit - r.expenses
}
)
)