dask.bag.Bag.foldby

dask.bag.Bag.foldby

Bag.foldby(key, binop, initial=_NoDefault.no_default, combine=None, combine_initial=_NoDefault.no_default, split_every=None)[源代码]

组合的归约和分组。

Foldby 提供了一个结合 groupby 和 reduce 的功能,用于高效的并行分割-应用-合并任务。

计算

>>> b.foldby(key, binop, init)                        

等同于以下内容:

>>> def reduction(group):                               
...     return reduce(binop, group, init)               
>>> b.groupby(key).map(lambda (k, v): (k, reduction(v)))

但它使用最少的通信,因此速度*快得多*。

>>> import dask.bag as db
>>> b = db.from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}

关键功能

键函数决定了如何对你的包中的元素进行分组。在常见情况下,如果你的包包含字典,那么键函数通常会提取其中一个元素。

>>> def key(x):
...     return x['name']

这种情况非常常见,因此它被特殊处理,如果你提供了一个不是可调用函数的键,那么 dask.bag 会自动将其转换为一个函数。以下是等价的:

>>> b.foldby(lambda x: x['name'], ...)  
>>> b.foldby('name', ...)  

二元操作

构建正确的二元运算符来执行分析查询可能会有点棘手。foldby 方法接受两个二元运算符,binopcombine。二元运算符的两个输入和输出必须具有相同的类型。

Binop 接受一个运行中的总数和一个新元素,并生成一个新的总数:

>>> def binop(total, x):
...     return total + x['amount']

Combine 接受两个总数并将它们合并:

>>> def combine(total1, total2):
...     return total1 + total2

每个二元运算符都可以有一个默认的初始值用于总计,在看到任何其他值之前。对于上述的加法二元运算符,这通常是 0 或你运算的单位元素。

split_every

在进行归约时,将分组划分为此大小的组。默认为8。

>>> b.foldby('name', binop, 0, combine, 0)  

参见

toolz.reduceby
pyspark.combineByKey

示例

我们可以计算一些 (键, 值) 对的最大值,这些对按 分组。(你可能最好将 Bag 转换为 dask.dataframe 并使用其 groupby)。

>>> import random
>>> import dask.bag as db
>>> tokens = list('abcdefg')
>>> values = range(10000)
>>> a = [(random.choice(tokens), random.choice(values))
...       for _ in range(100)]
>>> a[:2]  
[('g', 676), ('a', 871)]
>>> a = db.from_sequence(a)
>>> def binop(t, x):
...     return max((t, x), key=lambda x: x[1])
>>> a.foldby(lambda x: x[0], binop).compute()  
[('g', ('g', 984)),
 ('a', ('a', 871)),
 ('b', ('b', 999)),
 ('c', ('c', 765)),
 ('f', ('f', 955)),
 ('e', ('e', 991)),
 ('d', ('d', 854))]