优化

通过在调用调度器之前对Dask图进行小的优化,可以在不同的上下文中显著提高性能。

dask.optimization 模块包含多个函数,用于以各种有用方式转换图。在大多数情况下,用户不需要直接与这些函数交互,因为这些转换的专门子集在 Dask 集合(dask.arraydask.bagdask.dataframe)中是自动完成的。然而,使用自定义图或计算的用户可能会发现应用这些方法会带来显著的速度提升。

一般来说,进行图形优化时有两个目标:

  1. 简化计算

  2. 提高并行性

简化计算可以通过在图级别上移除不必要的任务(cull),或在任务级别上用更便宜的操作替换昂贵的操作(RewriteRule)来实现。

通过减少任务间通信可以提高并行性,无论是通过将多个任务融合为一个(fuse),还是通过内联廉价操作(inlineinline_functions)。

下面,我们展示一个示例,通过使用其中一些方法来优化任务图。

示例

假设你有一个用于执行单词计数任务的自定义Dask图:

>>> def print_and_return(string):
...     print(string)
...     return string

>>> def format_str(count, val, nwords):
...     return (f'word list has {count} occurrences of '
...             f'{val}, out of {nwords} words')

>>> dsk = {'words': 'apple orange apple pear orange pear pear',
...        'nwords': (len, (str.split, 'words')),
...        'val1': 'orange',
...        'val2': 'apple',
...        'val3': 'pear',
...        'count1': (str.count, 'words', 'val1'),
...        'count2': (str.count, 'words', 'val2'),
...        'count3': (str.count, 'words', 'val3'),
...        'format1': (format_str, 'count1', 'val1', 'nwords'),
...        'format2': (format_str, 'count2', 'val2', 'nwords'),
...        'format3': (format_str, 'count3', 'val3', 'nwords'),
...        'print1': (print_and_return, 'format1'),
...        'print2': (print_and_return, 'format2'),
...        'print3': (print_and_return, 'format3')}
原始的非优化 Dask 任务图。

在这里,我们统计单词列表中 'orange''apple''pear' 的出现次数,格式化输出结果字符串,打印输出,然后返回输出字符串。

为了执行计算,我们首先使用 cull 函数从图中移除不必要的组件,然后将 Dask 图和所需的输出键传递给调度器的 get 函数:

>>> from dask.threaded import get
>>> from dask.optimization import cull

>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

>>> results = get(dsk1, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

如上所示,调度器只计算了请求的输出('print3' 从未被计算)。这是因为我们调用了 dask.optimization.cull 函数,该函数从图中移除了不必要的任务。

剔除是几乎所有集合默认优化过程的一部分。通常你希望在稍早的时候调用它,以减少后续步骤中的工作量:

>>> from dask.optimization import cull
>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)
优化后剔除任务的Dask任务图。

观察上面的任务图,Dask 图中多次访问了 'val1''val2' 等常量。可以使用 inline 函数将这些常量内联到任务中以提高效率。例如:

>>> from dask.optimization import inline
>>> dsk2 = inline(dsk1, dependencies=dependencies)
>>> results = get(dsk2, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
优化后内联的 Dask 任务图。

现在我们有两组 几乎 线性的任务链。它们之间唯一的联系是词频统计函数。对于这种廉价的操作,序列化的成本可能比实际计算还要大,因此多次进行计算可能比将结果传递给所有节点更快。要执行这种函数内联,可以使用 inline_functions 函数:

>>> from dask.optimization import inline_functions
>>> dsk3 = inline_functions(dsk2, outputs, [len, str.split],
...                         dependencies=dependencies)
>>> results = get(dsk3, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
优化后内联函数的Dask任务图。

现在我们有一组纯粹的线性任务。我们希望调度器在同一个工作节点上运行所有这些任务,以减少工作节点之间的数据序列化。一个选项是使用 fuse 函数将这些线性链合并为一个大的任务:

>>> from dask.optimization import fuse
>>> dsk4, dependencies = fuse(dsk3)
>>> results = get(dsk4, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
优化后融合任务的Dask任务图。

综合起来:

>>> def optimize_and_get(dsk, keys):
...     dsk1, deps = cull(dsk, keys)
...     dsk2 = inline(dsk1, dependencies=deps)
...     dsk3 = inline_functions(dsk2, keys, [len, str.split],
...                             dependencies=deps)
...     dsk4, deps = fuse(dsk3)
...     return get(dsk4, keys)

>>> optimize_and_get(dsk, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

总之,上述操作完成了以下内容:

  1. 使用 cull 移除了不必要的任务,以达到期望的输出。

  2. 使用 inline 的内联常量

  3. 使用 inline_functions 进行内联廉价计算,提高并行性

  4. 将线性任务融合在一起,以确保它们在同一个工作节点上运行,使用 fuse

如前所述,这些优化已经在 Dask 集合中自动执行。不使用自定义图或计算的用户很少需要直接与它们交互。

这些只是 dask.optimization 中提供的一些优化。更多信息,请参见下面的API。

重写规则

对于基于上下文的优化,dask.rewrite 提供了模式匹配和术语重写的功能。这对于用等效但更便宜的计算替换昂贵的计算非常有用。例如,Dask Array 使用重写功能将一系列数组切片操作替换为更高效的单一切片。

重写系统的接口由两个类组成:

  1. RewriteRule(lhs, rhs, vars)

    给定一个左边的部分(lhs),一个右边的部分(rhs),以及一组变量(vars),重写规则声明性地编码了以下操作:

    lhs -> rhs if task matches lhs over variables

  2. RuleSet(*rules)

    一组重写规则。RuleSet 类的设计允许高效的“多对一”模式匹配,这意味着在使用规则集中的多个规则进行重写时,开销最小。

示例

在这里,我们创建了两条重写规则,表示以下数学变换:

  1. a + a -> 2*a

  2. a * a -> a**2

其中 'a' 是一个变量:

>>> from dask.rewrite import RewriteRule, RuleSet
>>> from operator import add, mul, pow

>>> variables = ('a',)

>>> rule1 = RewriteRule((add, 'a', 'a'), (mul, 'a', 2), variables)

>>> rule2 = RewriteRule((mul, 'a', 'a'), (pow, 'a', 2), variables)

>>> rs = RuleSet(rule1, rule2)

RewriteRule 对象以声明的方式描述所需的转换,而 RuleSet 则为应用该转换构建了一个高效的自动机。然后可以使用 rewrite 方法进行重写:

>>> rs.rewrite((add, 5, 5))
(mul, 5, 2)

>>> rs.rewrite((mul, 5, 5))
(pow, 5, 2)

>>> rs.rewrite((mul, (add, 3, 3), (add, 3, 3)))
(pow, (mul, 3, 2), 2)

默认情况下,整个任务会被遍历。如果你只想将转换应用于任务的顶层,可以传入 strategy='top_level' ,如下所示:

# Transforms whole task
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]))
(sum, [(mul, 3, 2), (pow, 3, 2)])

# Only applies to top level, no transform occurs
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]), strategy='top_level')
(sum, [(add, 3, 3), (mul, 3, 3)])

重写系统提供了一个强大的抽象,用于在任务级别上转换计算。同样,对于许多用户来说,直接与这些转换交互将是不必要的。

关键字参数

一些优化接受可选的关键字参数。要从计算调用中将关键字传递给正确的优化,请在关键字前加上优化的名称。例如,要从计算调用中将 keys= 关键字参数传递给 fuse 优化,请使用 fuse_keys= 关键字:

def fuse(dsk, keys=None):
    ...

x.compute(fuse_keys=['x', 'y', 'z'])

自定义优化

Dask 为每种集合类型(数组、包、数据框、延迟)定义了默认的优化策略。然而,不同的应用可能有不同的需求。为了应对这种需求的多样性,您可以构建自己的自定义优化函数,并使用它来替代默认的优化函数。优化函数接收一个任务图和所需键的列表,并返回一个新的任务图:

def my_optimize_function(dsk, keys):
    new_dsk = {...}
    return new_dsk

然后,您可以将此优化类注册到您喜欢的任何集合类型,它将代替默认方案使用:

with dask.config.set(array_optimize=my_optimize_function):
    x, y = dask.compute(x, y)

你可以为不同的集合注册单独的优化函数,或者如果你不想对某些类型的集合进行优化,可以注册 None

with dask.config.set(array_optimize=my_optimize_function,
                     dataframe_optimize=None,
                     delayed_optimize=my_other_optimize_function):
    ...

您不需要指定所有集合。集合将默认使用其标准优化方案(这通常是一个不错的选择)。

API

顶级优化

cull(dsk, keys)

返回仅包含计算键所需任务的新 dask。

fuse(dsk[, keys, dependencies, ave_width, ...])

融合形成归约的任务;比 fuse_linear 更高级

inline(dsk[, keys, inline_constants, ...])

返回一个新的 dask,其中给定的键与其值内联。

inline_functions(dsk, output[, ...])

将内联廉价函数合并到更大的操作中

实用函数

functions_of(task)

嵌套任务中包含的函数集

重写规则

RewriteRule(lhs, rhs[, vars])

重写规则。

RuleSet(*rules)

一组重写规则。

定义

dask.optimization.cull(dsk, keys)[源代码]

返回仅包含计算键所需任务的新 dask。

换句话说,从 dask 中移除不必要的任务。keys 可以是一个键或键的列表。

返回
dsk: 筛选后的 dask 图
依赖项:字典映射 {键: [依赖项]}。 加速的有用副作用。

其他优化,特别是 fuse。

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
>>> dsk, dependencies = cull(d, 'out')
>>> dsk                                                     
{'out': (<function add at ...>, 'x', 10), 'x': 1}
>>> dependencies                                            
{'out': ['x'], 'x': []}
dask.optimization.fuse(dsk, keys=None, dependencies=None, ave_width=Default.token, max_width=Default.token, max_height=Default.token, max_depth_new_edges=Default.token, rename_keys=Default.token, fuse_subgraphs=Default.token)[源代码]

融合形成归约的任务;比 fuse_linear 更高级

这种方法通过使任务粒度更粗来换取更快的调度,从而牺牲了并行性机会。它可以在优化过程中替代 fuse_linear

此优化适用于所有归约任务——最多只有一个依赖的任务——因此可以将其视为将“多输入、单输出”的任务组合融合为一个任务。有许多参数可以微调其行为,下面将对其进行描述。ave_width 是自然参数,用于比较并行性与粒度,因此应始终指定。如果需要,将使用 ave_width 确定其他参数的合理值。

参数
dsk: dict

dask 图

键: 列表或集合, 可选

必须保留在返回的 dask 图中的键

依赖项: dict, 可选

{键: [键列表]}. 必须是一个列表以提供每个键的计数。这个可选输入通常来自 cull

ave_width: float (默认值为1)

width = num_nodes / height 的上限,是并行化程度的一个良好度量。dask.config 键:optimization.fuse.ave-width

max_width: int (默认无限)

如果总宽度大于此值,则不要融合。dask.config 键: optimization.fuse.max-width

max_height: int 或 None (默认 None)

不要融合超过这个层级。设置为 None 以动态调整为 1.5 + ave_width * log(ave_width + 1)。dask.config 键:optimization.fuse.max-height

max_depth_new_edges: int 或 None (默认 None)

如果在这么多层级之后添加了新的依赖项,请不要融合。设置为 None 以动态调整为 ave_width * 1.5。dask.config 键: optimization.fuse.max-depth-new-edges

rename_keys: bool 或 func, 可选 (默认 True)

是否使用 default_fused_keys_renamer 重命名融合键。重命名融合键可以使图表更易理解和全面,但这是以额外的处理为代价的。如果为 False,则将使用最顶层的键。对于高级用法,也可以接受一个用于创建新名称的函数。dask.config 键: optimization.fuse.rename-keys

fuse_subgraphsbool 或 None, 可选 (默认 None)

是否将多个任务融合为 SubgraphCallable 对象。设置为 None 以让各个 dask 集合的默认优化器决定。如果没有集合特定的默认值,None 默认为 False。dask.config 键:optimization.fuse.subgraphs

返回
dsk

输出融合键的图

依赖项

融合后依赖关系的字典映射。加速其他下游优化的有用副作用。

dask.optimization.inline(dsk, keys=None, inline_constants=True, dependencies=None)[源代码]

返回一个新的 dask,其中给定的键与其值内联。

如果 inline_constants 关键字为 True,则内联所有常量。请注意,常量键将保留在图中,要删除它们,请在 inline 之后执行 cull

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
>>> inline(d)       
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}
>>> inline(d, keys='y') 
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}
>>> inline(d, keys='y', inline_constants=False) 
{'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
dask.optimization.inline_functions(dsk, output, fast_functions=None, inline_constants=False, dependencies=None)[源代码]

将内联廉价函数合并到更大的操作中

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> double = lambda x: x * 2
>>> dsk = {'out': (add, 'i', 'd'),  
...        'i': (inc, 'x'),
...        'd': (double, 'y'),
...        'x': 1, 'y': 1}
>>> inline_functions(dsk, [], [inc])  
{'out': (add, (inc, 'x'), 'd'),
 'd': (double, 'y'),
 'x': 1, 'y': 1}

保护输出键。在下面的例子中,i 没有被内联,因为它被标记为输出键。

>>> inline_functions(dsk, ['i', 'out'], [inc, double])  
{'out': (add, 'i', (double, 'y')),
 'i': (inc, 'x'),
 'x': 1, 'y': 1}
dask.optimization.functions_of(task)[源代码]

嵌套任务中包含的函数集

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> mul = lambda x, y: x * y
>>> task = (add, (mul, 1, 2), (inc, 3))  
>>> functions_of(task)  
set([add, mul, inc])
dask.rewrite.RewriteRule(lhs, rhs, vars=())[源代码]

重写规则。

表达 lhs -> rhs,对于变量 vars

参数
lhs任务

重写规则的左侧。

rhs任务或功能

重写规则的右侧。如果它是一个任务,rhs 中的变量将被主题中与 lhs 中变量匹配的项替换。如果它是一个函数,该函数将以这样的匹配字典作为参数被调用。

vars: 元组, 可选

在左侧找到的变量的元组。变量可以表示为任何可哈希的对象;一个好的惯例是使用字符串。如果没有变量,可以省略此项。

示例

这里有一个 RewriteRule 用于替换所有嵌套调用 list 的情况,使得 (list, (list, ‘x’)) 被替换为 (list, ‘x’),其中 ‘x’ 是一个变量。

>>> import dask.rewrite as dr
>>> lhs = (list, (list, 'x'))
>>> rhs = (list, 'x')
>>> variables = ('x',)
>>> rule = dr.RewriteRule(lhs, rhs, variables)

这里有一个更复杂的规则,它使用了可调用的右手边。一个可调用的 rhs 接收一个字典,该字典将变量映射到它们的匹配值。如果 ‘x’ 本身是一个列表,此规则会将所有 (list, ‘x’) 的出现替换为 ‘x’

>>> lhs = (list, 'x')
>>> def repl_list(sd):
...     x = sd['x']
...     if isinstance(x, list):
...         return x
...     else:
...         return (list, x)
>>> rule = dr.RewriteRule(lhs, repl_list, variables)
dask.rewrite.RuleSet(*rules)[源代码]

一组重写规则。

形成一个结构,用于在一组重写规则上进行快速重写。这允许对术语进行句法匹配,以同时匹配许多模式。

示例

>>> import dask.rewrite as dr
>>> def f(*args): pass
>>> def g(*args): pass
>>> def h(*args): pass
>>> from operator import add
>>> rs = dr.RuleSet(
...         dr.RewriteRule((add, 'x', 0), 'x', ('x',)),
...         dr.RewriteRule((f, (g, 'x'), 'y'),
...                        (h, 'x', 'y'),
...                        ('x', 'y')))
>>> rs.rewrite((add, 2, 0))
2
>>> rs.rewrite((f, (g, 'a', 3)))    
(<function h at ...>, 'a', 3)
>>> dsk = {'a': (add, 2, 0),
...        'b': (f, (g, 'a', 3))}
>>> from toolz import valmap
>>> valmap(rs.rewrite, dsk)         
{'a': 2, 'b': (<function h at ...>, 'a', 3)}
属性
规则列表

RuleSet 中包含的 RewriteRule 列表。