高层图
内容
高层图¶
Dask 图表,如数组、包和数据框等集合生成的高级结构,对于可视化和高级优化非常有用。这些集合生成的任务图表将这种结构明确编码为 HighLevelGraph
对象。本文档详细描述了如何更详细地处理这些对象。
动机与示例¶
在一般情况下,Dask 调度器期望任意任务图,其中每个节点是一个单独的 Python 函数调用,每条边是两个函数调用之间的依赖关系。这些通常存储在扁平字典中。以下是一些简单的 Dask DataFrame 代码及其可能生成的任务图:
import dask.dataframe as dd
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100),
('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}
任务图是一个字典,存储了计算最终结果所需的每个Pandas级别函数调用。我们可以看到,如果我们将与每个高级Dask DataFrame操作相关的任务分开,这个字典有一些结构:
{
# From the dask.dataframe.read_csv call
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
# From the df + 100 call
('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100),
# From the df[df.name == 'Alice'] call
('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}
通过理解这种高层结构,我们能够更容易地理解我们的任务图(这对于每层有数千个任务的大型数据集尤为重要)以及如何执行高层优化。例如,在上面的例子中,我们可能希望自动重写我们的代码,以便在加100之前过滤我们的数据集。
# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100
Dask 的高级图表通过在具有层间依赖关系的层中存储我们的任务图,帮助我们显式地编码这种结构:
>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']
>>> graph = df.__dask_graph__()
>>> graph.layers
{
'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100)}
'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies
{
'read-csv': set(),
'add': {'read-csv'},
'filter': {'add'}
}
虽然 DataFrame 指向它直接依赖的输出层:
>>> df.__dask_layers__()
{'filter'}
HighLevelGraphs¶
The HighLevelGraph
object is a Mapping
object composed of other
sub-Mappings
, along with a high-level dependency mapping between them:
class HighLevelGraph(Mapping):
layers: Dict[str, Mapping]
dependencies: Dict[str, Set[str]]
你可以通过向构造函数提供两者来显式构造一个 HighLevelGraph:
layers = {
'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100)},
'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
dependencies = {'read-csv': set(),
'add': {'read-csv'},
'filter': {'add'}}
graph = HighLevelGraph(layers, dependencies)
这个对象满足 Mapping
接口,因此作为一个普通的 Python 字典操作,它是底层层的语义合并:
>>> len(graph)
12
>>> graph[('read-csv', 0)]
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
API¶
- class dask.highlevelgraph.HighLevelGraph(layers: collections.abc.Mapping[str, collections.abc.Mapping[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any]], dependencies: collections.abc.Mapping[str, set[str]], key_dependencies: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] | None = None)[源代码]¶
由依赖子图层组成的任务图
这个对象编码了一个Dask任务图,该图由依赖的子图层组成,这种情况通常在使用Dask数组、包或数据帧等高级集合构建任务图时出现。
通常,每个高级数组、包或数据帧操作都会获取输入集合的任务图,合并它们,然后为新操作添加一个或多个新的任务层。这些层通常至少有与集合中的分区或块一样多的任务。HighLevelGraph 对象将每个操作的子图分别存储在子图中,并存储它们之间的依赖结构。
- 参数
- 图层Mapping[str, Mapping]
子图层,由唯一名称键控
- 依赖项Mapping[str, set[str]]
每个图层所依赖的图层集合
- 关键依赖dict[键, 集合], 可选
将(某些)高层图中的键映射到它们的依赖项。如果缺少某个键,其依赖项将按需计算。
参见
HighLevelGraph.from_collections
通常由开发人员用来创建新的 HighLevelGraphs
示例
以下是一个理想化的示例,展示了 HighLevelGraph 的内部状态
>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv') >>> df = df + 100 >>> df = df[df.name == 'Alice']
>>> graph = df.__dask_graph__() >>> graph.layers { 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'), ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'), ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'), ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')}, 'add': {('add', 0): (operator.add, ('read-csv', 0), 100), ('add', 1): (operator.add, ('read-csv', 1), 100), ('add', 2): (operator.add, ('read-csv', 2), 100), ('add', 3): (operator.add, ('read-csv', 3), 100)} 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)), ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)), ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)), ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))} }
>>> graph.dependencies { 'read-csv': set(), 'add': {'read-csv'}, 'filter': {'add'} }
- cull(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) dask.highlevelgraph.HighLevelGraph [源代码]¶
返回仅包含计算键所需任务的新 HighLevelGraph。
换句话说,从 dask 中移除不必要的任务。
- 参数
- 键
键的可迭代对象或键的嵌套列表,例如
__dask_keys__()
的输出
- 返回
- hlg: HighLevelGraph
筛选的高层图
- cull_layers(layers: collections.abc.Iterable[str]) dask.highlevelgraph.HighLevelGraph [源代码]¶
返回一个新的 HighLevelGraph,仅包含给定的层及其依赖项。在内部,层不会被修改。
这是
HighLevelGraph.cull()
的一个变体,速度更快,并且在两个筛选后的图表合并时,不会在两个具有相同名称和不同内容的层之间产生冲突。- 返回
- hlg: HighLevelGraph
筛选的高层图
- classmethod from_collections(name: str, layer: collections.abc.Mapping[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], dependencies: collections.abc.Sequence[dask.typing.DaskCollection] = ()) dask.highlevelgraph.HighLevelGraph [源代码]¶
从新层和一组集合构建一个 HighLevelGraph
这在常见情况下构建了一个 HighLevelGraph,其中我们有一个新的层和一组我们希望依赖的旧集合。
这会提取集合中的
__dask_layers__()
方法(如果存在),并将它们添加到此新层的依赖项中。它还将所有依赖集合的所有层合并到此图的新层中。- 参数
- 名称str
新层的名称
- 层映射
图层本身
- 依赖项Dask 集合列表
其他 dask 集合(如数组或数据框)的列表,它们本身具有图结构
示例
在典型用法中,我们创建一个新的任务层,然后将该层及其所有依赖集合传递给此方法。
>>> def add(self, other): ... name = 'add-' + tokenize(self, other) ... layer = {(name, i): (add, input_key, other) ... for i, input_key in enumerate(self.__dask_keys__())} ... graph = HighLevelGraph.from_collections(name, layer, dependencies=[self]) ... return new_collection(name, graph)
- get(k[, d]) D[k] if k in D, else d. d defaults to None. ¶
- get_all_dependencies() dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] [源代码]¶
获取所有键的依赖项
在大多数情况下,这将实现所有层,这使得它成为一个昂贵的操作。
- 返回
- map: 映射
一个映射,将每个键映射到其依赖项
- get_all_external_keys() set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] [源代码]¶
获取所有层的所有输出键
在大多数情况下,这 _不会_ 实现任何层,这使得它成为一个相对廉价的操作。
- 返回
- 键:集合
所有外部键的集合
- keys() collections.abc.KeysView [源代码]¶
获取所有层的所有键。
这在许多情况下会实现层级,这使得它成为一个相对昂贵的操作。参见
get_all_external_keys()
以获取更快的替代方案。