优化
内容
优化¶
通过在调用调度器之前对Dask图进行小的优化,可以在不同的上下文中显著提高性能。
dask.optimization
模块包含多个函数,用于以各种有用方式转换图。在大多数情况下,用户不需要直接与这些函数交互,因为这些转换的专门子集在 Dask 集合(dask.array
、dask.bag
和 dask.dataframe
)中是自动完成的。然而,使用自定义图或计算的用户可能会发现应用这些方法会带来显著的速度提升。
一般来说,进行图形优化时有两个目标:
简化计算
提高并行性
简化计算可以通过在图级别上移除不必要的任务(cull
),或在任务级别上用更便宜的操作替换昂贵的操作(RewriteRule
)来实现。
通过减少任务间通信可以提高并行性,无论是通过将多个任务融合为一个(fuse
),还是通过内联廉价操作(inline
,inline_functions
)。
下面,我们展示一个示例,通过使用其中一些方法来优化任务图。
示例¶
假设你有一个用于执行单词计数任务的自定义Dask图:
>>> def print_and_return(string):
... print(string)
... return string
>>> def format_str(count, val, nwords):
... return (f'word list has {count} occurrences of '
... f'{val}, out of {nwords} words')
>>> dsk = {'words': 'apple orange apple pear orange pear pear',
... 'nwords': (len, (str.split, 'words')),
... 'val1': 'orange',
... 'val2': 'apple',
... 'val3': 'pear',
... 'count1': (str.count, 'words', 'val1'),
... 'count2': (str.count, 'words', 'val2'),
... 'count3': (str.count, 'words', 'val3'),
... 'format1': (format_str, 'count1', 'val1', 'nwords'),
... 'format2': (format_str, 'count2', 'val2', 'nwords'),
... 'format3': (format_str, 'count3', 'val3', 'nwords'),
... 'print1': (print_and_return, 'format1'),
... 'print2': (print_and_return, 'format2'),
... 'print3': (print_and_return, 'format3')}
在这里,我们统计单词列表中 'orange'
、'apple'
和 'pear'
的出现次数,格式化输出结果字符串,打印输出,然后返回输出字符串。
为了执行计算,我们首先使用 cull
函数从图中移除不必要的组件,然后将 Dask 图和所需的输出键传递给调度器的 get
函数:
>>> from dask.threaded import get
>>> from dask.optimization import cull
>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs) # remove unnecessary tasks from the graph
>>> results = get(dsk1, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
如上所示,调度器只计算了请求的输出('print3'
从未被计算)。这是因为我们调用了 dask.optimization.cull
函数,该函数从图中移除了不必要的任务。
剔除是几乎所有集合默认优化过程的一部分。通常你希望在稍早的时候调用它,以减少后续步骤中的工作量:
>>> from dask.optimization import cull
>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)
观察上面的任务图,Dask 图中多次访问了 'val1'
或 'val2'
等常量。可以使用 inline
函数将这些常量内联到任务中以提高效率。例如:
>>> from dask.optimization import inline
>>> dsk2 = inline(dsk1, dependencies=dependencies)
>>> results = get(dsk2, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
现在我们有两组 几乎 线性的任务链。它们之间唯一的联系是词频统计函数。对于这种廉价的操作,序列化的成本可能比实际计算还要大,因此多次进行计算可能比将结果传递给所有节点更快。要执行这种函数内联,可以使用 inline_functions
函数:
>>> from dask.optimization import inline_functions
>>> dsk3 = inline_functions(dsk2, outputs, [len, str.split],
... dependencies=dependencies)
>>> results = get(dsk3, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
现在我们有一组纯粹的线性任务。我们希望调度器在同一个工作节点上运行所有这些任务,以减少工作节点之间的数据序列化。一个选项是使用 fuse
函数将这些线性链合并为一个大的任务:
>>> from dask.optimization import fuse
>>> dsk4, dependencies = fuse(dsk3)
>>> results = get(dsk4, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
综合起来:
>>> def optimize_and_get(dsk, keys):
... dsk1, deps = cull(dsk, keys)
... dsk2 = inline(dsk1, dependencies=deps)
... dsk3 = inline_functions(dsk2, keys, [len, str.split],
... dependencies=deps)
... dsk4, deps = fuse(dsk3)
... return get(dsk4, keys)
>>> optimize_and_get(dsk, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
总之,上述操作完成了以下内容:
使用
cull
移除了不必要的任务,以达到期望的输出。使用
inline
的内联常量使用
inline_functions
进行内联廉价计算,提高并行性将线性任务融合在一起,以确保它们在同一个工作节点上运行,使用
fuse
如前所述,这些优化已经在 Dask 集合中自动执行。不使用自定义图或计算的用户很少需要直接与它们交互。
这些只是 dask.optimization
中提供的一些优化。更多信息,请参见下面的API。
重写规则¶
对于基于上下文的优化,dask.rewrite
提供了模式匹配和术语重写的功能。这对于用等效但更便宜的计算替换昂贵的计算非常有用。例如,Dask Array 使用重写功能将一系列数组切片操作替换为更高效的单一切片。
重写系统的接口由两个类组成:
RewriteRule(lhs, rhs, vars)
给定一个左边的部分(
lhs
),一个右边的部分(rhs
),以及一组变量(vars
),重写规则声明性地编码了以下操作:lhs -> rhs if task matches lhs over variables
RuleSet(*rules)
一组重写规则。
RuleSet
类的设计允许高效的“多对一”模式匹配,这意味着在使用规则集中的多个规则进行重写时,开销最小。
示例¶
在这里,我们创建了两条重写规则,表示以下数学变换:
a + a -> 2*a
a * a -> a**2
其中 'a'
是一个变量:
>>> from dask.rewrite import RewriteRule, RuleSet
>>> from operator import add, mul, pow
>>> variables = ('a',)
>>> rule1 = RewriteRule((add, 'a', 'a'), (mul, 'a', 2), variables)
>>> rule2 = RewriteRule((mul, 'a', 'a'), (pow, 'a', 2), variables)
>>> rs = RuleSet(rule1, rule2)
RewriteRule
对象以声明的方式描述所需的转换,而 RuleSet
则为应用该转换构建了一个高效的自动机。然后可以使用 rewrite
方法进行重写:
>>> rs.rewrite((add, 5, 5))
(mul, 5, 2)
>>> rs.rewrite((mul, 5, 5))
(pow, 5, 2)
>>> rs.rewrite((mul, (add, 3, 3), (add, 3, 3)))
(pow, (mul, 3, 2), 2)
默认情况下,整个任务会被遍历。如果你只想将转换应用于任务的顶层,可以传入 strategy='top_level'
,如下所示:
# Transforms whole task
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]))
(sum, [(mul, 3, 2), (pow, 3, 2)])
# Only applies to top level, no transform occurs
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]), strategy='top_level')
(sum, [(add, 3, 3), (mul, 3, 3)])
重写系统提供了一个强大的抽象,用于在任务级别上转换计算。同样,对于许多用户来说,直接与这些转换交互将是不必要的。
关键字参数¶
一些优化接受可选的关键字参数。要从计算调用中将关键字传递给正确的优化,请在关键字前加上优化的名称。例如,要从计算调用中将 keys=
关键字参数传递给 fuse
优化,请使用 fuse_keys=
关键字:
def fuse(dsk, keys=None):
...
x.compute(fuse_keys=['x', 'y', 'z'])
自定义优化¶
Dask 为每种集合类型(数组、包、数据框、延迟)定义了默认的优化策略。然而,不同的应用可能有不同的需求。为了应对这种需求的多样性,您可以构建自己的自定义优化函数,并使用它来替代默认的优化函数。优化函数接收一个任务图和所需键的列表,并返回一个新的任务图:
def my_optimize_function(dsk, keys):
new_dsk = {...}
return new_dsk
然后,您可以将此优化类注册到您喜欢的任何集合类型,它将代替默认方案使用:
with dask.config.set(array_optimize=my_optimize_function):
x, y = dask.compute(x, y)
你可以为不同的集合注册单独的优化函数,或者如果你不想对某些类型的集合进行优化,可以注册 None
:
with dask.config.set(array_optimize=my_optimize_function,
dataframe_optimize=None,
delayed_optimize=my_other_optimize_function):
...
您不需要指定所有集合。集合将默认使用其标准优化方案(这通常是一个不错的选择)。
API¶
顶级优化
|
返回仅包含计算键所需任务的新 dask。 |
|
融合形成归约的任务;比 |
|
返回一个新的 dask,其中给定的键与其值内联。 |
|
将内联廉价函数合并到更大的操作中 |
实用函数
|
嵌套任务中包含的函数集 |
重写规则
|
重写规则。 |
|
一组重写规则。 |
定义¶
- dask.optimization.cull(dsk, keys)[源代码]¶
返回仅包含计算键所需任务的新 dask。
换句话说,从 dask 中移除不必要的任务。
keys
可以是一个键或键的列表。- 返回
- dsk: 筛选后的 dask 图
- 依赖项:字典映射 {键: [依赖项]}。 加速的有用副作用。
其他优化,特别是 fuse。
示例
>>> def inc(x): ... return x + 1
>>> def add(x, y): ... return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)} >>> dsk, dependencies = cull(d, 'out') >>> dsk {'out': (<function add at ...>, 'x', 10), 'x': 1} >>> dependencies {'out': ['x'], 'x': []}
- dask.optimization.fuse(dsk, keys=None, dependencies=None, ave_width=Default.token, max_width=Default.token, max_height=Default.token, max_depth_new_edges=Default.token, rename_keys=Default.token, fuse_subgraphs=Default.token)[源代码]¶
融合形成归约的任务;比
fuse_linear
更高级这种方法通过使任务粒度更粗来换取更快的调度,从而牺牲了并行性机会。它可以在优化过程中替代
fuse_linear
。此优化适用于所有归约任务——最多只有一个依赖的任务——因此可以将其视为将“多输入、单输出”的任务组合融合为一个任务。有许多参数可以微调其行为,下面将对其进行描述。
ave_width
是自然参数,用于比较并行性与粒度,因此应始终指定。如果需要,将使用ave_width
确定其他参数的合理值。- 参数
- dsk: dict
dask 图
- 键: 列表或集合, 可选
必须保留在返回的 dask 图中的键
- 依赖项: dict, 可选
{键: [键列表]}. 必须是一个列表以提供每个键的计数。这个可选输入通常来自
cull
- ave_width: float (默认值为1)
width = num_nodes / height
的上限,是并行化程度的一个良好度量。dask.config 键:optimization.fuse.ave-width
- max_width: int (默认无限)
如果总宽度大于此值,则不要融合。dask.config 键:
optimization.fuse.max-width
- max_height: int 或 None (默认 None)
不要融合超过这个层级。设置为 None 以动态调整为
1.5 + ave_width * log(ave_width + 1)
。dask.config 键:optimization.fuse.max-height
- max_depth_new_edges: int 或 None (默认 None)
如果在这么多层级之后添加了新的依赖项,请不要融合。设置为 None 以动态调整为 ave_width * 1.5。dask.config 键:
optimization.fuse.max-depth-new-edges
- rename_keys: bool 或 func, 可选 (默认 True)
是否使用
default_fused_keys_renamer
重命名融合键。重命名融合键可以使图表更易理解和全面,但这是以额外的处理为代价的。如果为 False,则将使用最顶层的键。对于高级用法,也可以接受一个用于创建新名称的函数。dask.config 键:optimization.fuse.rename-keys
- fuse_subgraphsbool 或 None, 可选 (默认 None)
是否将多个任务融合为
SubgraphCallable
对象。设置为 None 以让各个 dask 集合的默认优化器决定。如果没有集合特定的默认值,None 默认为 False。dask.config 键:optimization.fuse.subgraphs
- 返回
- dsk
输出融合键的图
- 依赖项
融合后依赖关系的字典映射。加速其他下游优化的有用副作用。
- dask.optimization.inline(dsk, keys=None, inline_constants=True, dependencies=None)[源代码]¶
返回一个新的 dask,其中给定的键与其值内联。
如果
inline_constants
关键字为 True,则内联所有常量。请注意,常量键将保留在图中,要删除它们,请在inline
之后执行cull
。示例
>>> def inc(x): ... return x + 1
>>> def add(x, y): ... return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')} >>> inline(d) {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}
>>> inline(d, keys='y') {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}
>>> inline(d, keys='y', inline_constants=False) {'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
- dask.optimization.inline_functions(dsk, output, fast_functions=None, inline_constants=False, dependencies=None)[源代码]¶
将内联廉价函数合并到更大的操作中
示例
>>> inc = lambda x: x + 1 >>> add = lambda x, y: x + y >>> double = lambda x: x * 2 >>> dsk = {'out': (add, 'i', 'd'), ... 'i': (inc, 'x'), ... 'd': (double, 'y'), ... 'x': 1, 'y': 1} >>> inline_functions(dsk, [], [inc]) {'out': (add, (inc, 'x'), 'd'), 'd': (double, 'y'), 'x': 1, 'y': 1}
保护输出键。在下面的例子中,
i
没有被内联,因为它被标记为输出键。>>> inline_functions(dsk, ['i', 'out'], [inc, double]) {'out': (add, 'i', (double, 'y')), 'i': (inc, 'x'), 'x': 1, 'y': 1}
- dask.optimization.functions_of(task)[源代码]¶
嵌套任务中包含的函数集
示例
>>> inc = lambda x: x + 1 >>> add = lambda x, y: x + y >>> mul = lambda x, y: x * y >>> task = (add, (mul, 1, 2), (inc, 3)) >>> functions_of(task) set([add, mul, inc])
- dask.rewrite.RewriteRule(lhs, rhs, vars=())[源代码]¶
重写规则。
表达 lhs -> rhs,对于变量 vars。
- 参数
- lhs任务
重写规则的左侧。
- rhs任务或功能
重写规则的右侧。如果它是一个任务,rhs 中的变量将被主题中与 lhs 中变量匹配的项替换。如果它是一个函数,该函数将以这样的匹配字典作为参数被调用。
- vars: 元组, 可选
在左侧找到的变量的元组。变量可以表示为任何可哈希的对象;一个好的惯例是使用字符串。如果没有变量,可以省略此项。
示例
这里有一个 RewriteRule 用于替换所有嵌套调用 list 的情况,使得 (list, (list, ‘x’)) 被替换为 (list, ‘x’),其中 ‘x’ 是一个变量。
>>> import dask.rewrite as dr >>> lhs = (list, (list, 'x')) >>> rhs = (list, 'x') >>> variables = ('x',) >>> rule = dr.RewriteRule(lhs, rhs, variables)
这里有一个更复杂的规则,它使用了可调用的右手边。一个可调用的 rhs 接收一个字典,该字典将变量映射到它们的匹配值。如果 ‘x’ 本身是一个列表,此规则会将所有 (list, ‘x’) 的出现替换为 ‘x’。
>>> lhs = (list, 'x') >>> def repl_list(sd): ... x = sd['x'] ... if isinstance(x, list): ... return x ... else: ... return (list, x) >>> rule = dr.RewriteRule(lhs, repl_list, variables)
- dask.rewrite.RuleSet(*rules)[源代码]¶
一组重写规则。
形成一个结构,用于在一组重写规则上进行快速重写。这允许对术语进行句法匹配,以同时匹配许多模式。
示例
>>> import dask.rewrite as dr >>> def f(*args): pass >>> def g(*args): pass >>> def h(*args): pass >>> from operator import add
>>> rs = dr.RuleSet( ... dr.RewriteRule((add, 'x', 0), 'x', ('x',)), ... dr.RewriteRule((f, (g, 'x'), 'y'), ... (h, 'x', 'y'), ... ('x', 'y')))
>>> rs.rewrite((add, 2, 0)) 2
>>> rs.rewrite((f, (g, 'a', 3))) (<function h at ...>, 'a', 3)
>>> dsk = {'a': (add, 2, 0), ... 'b': (f, (g, 'a', 3))}
>>> from toolz import valmap >>> valmap(rs.rewrite, dsk) {'a': 2, 'b': (<function h at ...>, 'a', 3)}
- 属性
- 规则列表
RuleSet 中包含的 RewriteRule 列表。