dask.bag.Bag.foldby
dask.bag.Bag.foldby¶
- Bag.foldby(key, binop, initial=_NoDefault.no_default, combine=None, combine_initial=_NoDefault.no_default, split_every=None)[源代码]¶
组合的归约和分组。
Foldby 提供了一个结合 groupby 和 reduce 的功能,用于高效的并行分割-应用-合并任务。
计算
>>> b.foldby(key, binop, init)
等同于以下内容:
>>> def reduction(group): ... return reduce(binop, group, init)
>>> b.groupby(key).map(lambda (k, v): (k, reduction(v)))
但它使用最少的通信,因此速度*快得多*。
>>> import dask.bag as db >>> b = db.from_sequence(range(10)) >>> iseven = lambda x: x % 2 == 0 >>> add = lambda x, y: x + y >>> dict(b.foldby(iseven, add)) {True: 20, False: 25}
关键功能
键函数决定了如何对你的包中的元素进行分组。在常见情况下,如果你的包包含字典,那么键函数通常会提取其中一个元素。
>>> def key(x): ... return x['name']
这种情况非常常见,因此它被特殊处理,如果你提供了一个不是可调用函数的键,那么 dask.bag 会自动将其转换为一个函数。以下是等价的:
>>> b.foldby(lambda x: x['name'], ...) >>> b.foldby('name', ...)
二元操作
构建正确的二元运算符来执行分析查询可能会有点棘手。
foldby
方法接受两个二元运算符,binop
和combine
。二元运算符的两个输入和输出必须具有相同的类型。Binop 接受一个运行中的总数和一个新元素,并生成一个新的总数:
>>> def binop(total, x): ... return total + x['amount']
Combine 接受两个总数并将它们合并:
>>> def combine(total1, total2): ... return total1 + total2
每个二元运算符都可以有一个默认的初始值用于总计,在看到任何其他值之前。对于上述的加法二元运算符,这通常是
0
或你运算的单位元素。split_every
在进行归约时,将分组划分为此大小的组。默认为8。
>>> b.foldby('name', binop, 0, combine, 0)
参见
toolz.reduceby
pyspark.combineByKey
示例
我们可以计算一些
(键, 值)
对的最大值,这些对按键
分组。(你可能最好将Bag
转换为dask.dataframe
并使用其 groupby)。>>> import random >>> import dask.bag as db
>>> tokens = list('abcdefg') >>> values = range(10000) >>> a = [(random.choice(tokens), random.choice(values)) ... for _ in range(100)] >>> a[:2] [('g', 676), ('a', 871)]
>>> a = db.from_sequence(a)
>>> def binop(t, x): ... return max((t, x), key=lambda x: x[1])
>>> a.foldby(lambda x: x[0], binop).compute() [('g', ('g', 984)), ('a', ('a', 871)), ('b', ('b', 999)), ('c', ('c', 765)), ('f', ('f', 955)), ('e', ('e', 991)), ('d', ('d', 854))]