reduce() 函数
reduce() 使用一个归约函数 (fn) 聚合每个输入表中的行。
每个表的输出是表的分组键,列对应于减速器记录中的每个字段。 如果减速器记录包含一个与分组键列同名的列, 则分组键列的值会被覆盖,输出的分组键会改变。 然而,如果两个减少的表写入相同的目标分组键,该 函数将返回一个错误。
已删除的列
reduce() 删除任何以下列:
- 不属于输入表的分组键。
- 在
identity记录或归约函数 (fn) 中没有明确映射。
函数类型签名
(<-tables: stream[B], fn: (accumulator: A, r: B) => A, identity: A) => stream[C] where A: Record, B: Record, C: Record
有关更多信息,请参见 Function type signatures。
参数
函数
(必需)
将应用于每行记录的归约函数 (r).
缩减函数接受两个参数:
- r: 表示当前行的记录。
- accumulator: 从reducer函数对上一行的操作返回的记录。
身份
(必需) 定义减少器记录并提供第一行减少器操作的初始值的记录。
可以在异步处理用例中使用多于一次。 身份记录中值的数据类型决定了输出值的数据类型。
表格
输入数据。默认是管道转发数据 (<-).
示例
计算值列的总和
import "sampledata"
sampledata.int()
|> reduce(fn: (r, accumulator) => ({sum: r._value + accumulator.sum}), identity: {sum: 0})
在单个归约器中计算总和和计数
import "sampledata"
sampledata.int()
|> reduce(
fn: (r, accumulator) => ({sum: r._value + accumulator.sum, count: accumulator.count + 1}),
identity: {sum: 0, count: 0},
)
计算所有数值的乘积
import "sampledata"
sampledata.int()
|> reduce(fn: (r, accumulator) => ({prod: r._value * accumulator.prod}), identity: {prod: 1})
计算所有值的平均值
import "sampledata"
sampledata.int()
|> reduce(
fn: (r, accumulator) =>
({
count: accumulator.count + 1,
total: accumulator.total + r._value,
avg: float(v: accumulator.total + r._value) / float(v: accumulator.count + 1),
}),
identity: {count: 0, total: 0, avg: 0.0},
)