洗牌性能
内容
洗牌性能¶
像 groupby
、join
和 set_index
这样的操作由于 Dask DataFrame 的并行、大于内存和分布式特性,其性能考虑与普通的 Pandas 有所不同。
简单案例¶
首先,常见的 groupby 操作,如 df.groupby(columns).reduction()
对于已知的归约操作,如 mean, sum, std, var, count, nunique
,即使分区没有明确地按已知划分进行划分,它们也都非常快速和高效。这是常见的情况。
此外,如果已知分组,那么在分组列包含索引时,对组应用任意函数是高效的。
当将 Dask DataFrame 与 Pandas DataFrame 连接,或沿着索引连接两个 Dask DataFrame 时,连接操作也非常快。在这些常见情况下操作时,无需特别考虑。
所以,如果你正在进行常见的分组和连接操作,那么你可以停止阅读这个。一切都会很好地扩展。幸运的是,大多数情况下都是如此:
>>> ddf.groupby(columns).known_reduction() # Fast and common case
>>> ddf.groupby(columns_with_index).apply(user_fn) # Fast and common case
>>> ddf.join(pandas_df, on=column) # Fast and common case
>>> lhs.join(rhs) # Fast and common case
>>> lhs.merge(rhs, on=columns_with_index) # Fast and common case
困难案例¶
在某些情况下,例如当对组应用任意函数(在不按已知分区对索引进行分组时)、在沿非索引列连接时,或者当显式地将未排序的列设置为索引时,我们可能需要触发整个数据集的洗牌:
>>> ddf.groupby(columns_no_index).apply(user_fn) # Requires shuffle
>>> lhs.join(rhs, on=columns_no_index) # Requires shuffle
>>> ddf.set_index(column) # Requires shuffle
当我们需要根据新的索引重新排序数据时,洗牌是必要的。例如,如果我们有按时间组织的银行记录,而现在我们想按用户ID组织它们,那么我们需要移动大量数据。在Pandas中,所有这些数据都适合内存,所以这个操作很容易。现在我们不假设所有数据都适合内存,我们必须更加小心。
通过限制自己只处理上述简单情况,可以避免对数据进行重新排序。
洗牌方法¶
目前有两种策略来打乱数据,取决于你是在单机上还是在分布式集群上:磁盘打乱和网络打乱。
网络上的洗牌¶
在分布式集群上操作时,Dask 工作节点可能无法访问共享硬盘。在这种情况下,我们通过根据数据最终位置将输入分区拆分成多个部分,并在网络中移动这些部分来实现数据洗牌。
选择方法¶
Dask 默认使用磁盘洗牌,但如果默认调度器设置为使用 dask.distributed.Client
,例如用户将 Client 设置为默认时,Dask 将切换到分布式洗牌算法。
client = Client('scheduler:8786', set_as_default=True)
或者,如果你倾向于避免默认设置,你可以通过 dataframe.shuffle.method
配置选项来配置全局洗牌方法。这可以在全局范围内完成:
dask.config.set({"dataframe.shuffle.method": "p2p"})
ddf.groupby(...).apply(...)
或作为上下文管理器:
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
ddf.groupby(...).apply(...)
此外,set_index
还接受一个 shuffle_method
关键字参数,可用于选择基于磁盘或基于任务的洗牌方式:
ddf.set_index(column, shuffle_method='disk')
ddf.set_index(column, shuffle_method='tasks')
ddf.set_index(column, shuffle_method='p2p')
聚合¶
Dask 支持 Pandas 的 aggregate
语法,以在同一组上运行多个归约操作。常见的归约操作,如 max
、sum
、list
和 mean
,都直接得到支持:
>>> ddf.groupby(columns).aggregate(['sum', 'mean', 'max', 'min', list])
Dask 也支持用户定义的归约操作。为了确保性能,归约操作必须以三个独立步骤的形式来定义。chunk
步骤独立应用于每个分区,并在分区内部减少数据。aggregate
步骤合并分区内的结果。可选的 finalize
步骤合并从 aggregate
步骤返回的结果,并应返回一个最终的列。为了让 Dask 识别归约操作,它必须作为 dask.dataframe.Aggregation
的实例传递。
例如,sum
可以实现为:
custom_sum = dd.Aggregation('custom_sum', lambda s: s.sum(), lambda s0: s0.sum())
ddf.groupby('g').agg(custom_sum)
名称参数应与现有归约不同,以避免数据损坏。每个函数的参数是预分组的系列对象,类似于 df.groupby('g')['value']
。
许多归约只能通过多个临时变量来实现。为了实现这些归约,步骤应返回元组并期望多个参数。一个均值函数可以实现为:
custom_mean = dd.Aggregation(
'custom_mean',
lambda s: (s.count(), s.sum()),
lambda count, sum: (count.sum(), sum.sum()),
lambda count, sum: sum / count,
)
ddf.groupby('g').agg(custom_mean)
例如,让我们计算一个DataFrame的组内范围(最大值 - 最小值)。
>>> df = pd.DataFrame({
... 'a': ['a', 'b', 'a', 'a', 'b'],
... 'b': [0, 1, 0, 2, 5],
... })
>>> ddf = dd.from_pandas(df, 2)
我们定义了用于找到每个块的最大值和最小值的基本构建块,然后是所有块的最大值和最小值。最后,我们通过最大值和最小值的Series之间的差异来完成。
>>> def chunk(grouped):
... return grouped.max(), grouped.min()
>>> def agg(chunk_maxes, chunk_mins):
... return chunk_maxes.max(), chunk_mins.min()
>>> def finalize(maxima, minima):
... return maxima - minima
最后,我们创建并使用聚合
>>> extent = dd.Aggregation('extent', chunk, agg, finalize=finalize)
>>> ddf.groupby('a').agg(extent).compute()
b
a
a 2
b 4
要应用 dask.dataframe.groupby.SeriesGroupBy.nunique
到多个列,你可以使用:
>>> df['c'] = [1, 2, 1, 1, 2]
>>> ddf = dd.from_pandas(df, 2)
>>> nunique = dd.Aggregation(
... name="nunique",
... chunk=lambda s: s.apply(lambda x: list(set(x))),
... agg=lambda s0: s0.obj.groupby(level=list(range(s0.obj.index.nlevels))).sum(),
... finalize=lambda s1: s1.apply(lambda final: len(set(final))),
... )
>>> ddf.groupby('a').agg({'b':nunique, 'c':nunique})
要访问 NumPy 函数,请使用 apply
和 lambda 函数,例如 .apply(lambda r: np.sum(r))
。以下是一个平方和聚合的示例:
>>> dd.Aggregation(name="sum_of_squares", chunk=lambda s: s.apply(lambda r: np.sum(np.power(r, 2))), agg=lambda s: s.sum())