优化器

备注

自2024.03.0版本起,Dask DataFrame 支持查询规划。

优化步骤

Dask DataFrame 在执行计算之前会运行几个优化。这些计算旨在提高查询的效率。

优化包括以下步骤(此列表并不完整):

  • 投影下推:

    在每个步骤中只选择所需的列。这减少了需要从存储中读取的数据量,同时也减少了处理过程中的数据量。列在查询的最早阶段被删除。

  • 过滤器下推:

    尽可能将过滤器下推,潜在地推到IO步骤中。过滤器在查询的最早阶段执行。

  • 分区修剪:

    分区选择尽可能地向下推,可能会进入IO步骤。

  • 避免洗牌:

    Dask DataFrame 将尽可能避免在工作线程之间洗牌数据。如果列布局已经知道,即如果DataFrame之前已经在同一列上洗牌过,则可以实现这一点。例如,在合并操作之后执行 df.groupby(...).apply(...) 不会再次洗牌数据,如果groupby发生在合并列上。

    _images/avoiding-shuffles.svg

    同样地,在相同的连接列上执行两次连续的连接/合并操作将避免再次打乱数据。优化器识别到DataFrame的分区已经符合预期,因此将操作简化为一次打乱和一次简单的合并操作。

  • 自动调整分区大小:

    IO 层会根据从数据集中选择的列子集自动调整分区数量。非常小的分区会对调度器和像洗牌这样的昂贵操作产生负面影响。这通过自动调整分区数量来解决。

    _images/automatic-repartitioning.svg

    选择两个列,它们共同占用每个200 MB文件中的40 MB。优化器将分区数量减少了5倍。

探索优化查询

Dask 将在执行计算之前调用 df.optimize()。此方法应用上述步骤并返回一个表示优化查询的新 Dask DataFrame。

通过调用 df.pprint() 可以获得优化查询的初步表示。这将把查询计划以人类可读的格式打印到命令行/控制台。这种方法的优点是不需要任何额外的依赖。

pdf = pd.DataFrame({"a": [1, 2, 3] * 5, "b": [1, 2, 3] * 5})
df = dd.from_pandas(pdf, npartitions=2)
df = df.replace(1, 5)[["a"]]

df.optimize().pprint()

Replace: to_replace=1 value=5
    FromPandas: frame='<pandas>' npartitions=2 columns=['a'] pyarrow_strings_enabled=True

通过调用 df.explain() 可以获得更高级且更易读的表示。此方法需要安装 graphviz 包。该方法将返回一个表示查询计划的图,并从中创建一个图像。

df.explain()
优化查询

我们可以在两种表示中看到,FromPandas 消耗了列投影,只选择了列 a

explain() 方法对于更复杂的查询来说,理解起来明显更容易。