高层图

Dask 图表,如数组、包和数据框等集合生成的高级结构,对于可视化和高级优化非常有用。这些集合生成的任务图表将这种结构明确编码为 HighLevelGraph 对象。本文档详细描述了如何更详细地处理这些对象。

动机与示例

在一般情况下,Dask 调度器期望任意任务图,其中每个节点是一个单独的 Python 函数调用,每条边是两个函数调用之间的依赖关系。这些通常存储在扁平字典中。以下是一些简单的 Dask DataFrame 代码及其可能生成的任务图:

import dask.dataframe as dd

df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

任务图是一个字典,存储了计算最终结果所需的每个Pandas级别函数调用。我们可以看到,如果我们将与每个高级Dask DataFrame操作相关的任务分开,这个字典有一些结构:

{
 # From the dask.dataframe.read_csv call
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

 # From the df + 100 call
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),

 # From the df[df.name == 'Alice'] call
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

通过理解这种高层结构,我们能够更容易地理解我们的任务图(这对于每层有数千个任务的大型数据集尤为重要)以及如何执行高层优化。例如,在上面的例子中,我们可能希望自动重写我们的代码,以便在加100之前过滤我们的数据集。

# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']

# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100

Dask 的高级图表通过在具有层间依赖关系的层中存储我们的任务图,帮助我们显式地编码这种结构:

>>> import dask.dataframe as dd

>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']

>>> graph = df.__dask_graph__()
>>> graph.layers
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}

 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

>>> graph.dependencies
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}

虽然 DataFrame 指向它直接依赖的输出层:

>>> df.__dask_layers__()
{'filter'}

HighLevelGraphs

The HighLevelGraph object is a Mapping object composed of other sub-Mappings, along with a high-level dependency mapping between them:

class HighLevelGraph(Mapping):
    layers: Dict[str, Mapping]
    dependencies: Dict[str, Set[str]]

你可以通过向构造函数提供两者来显式构造一个 HighLevelGraph:

layers = {
   'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

   'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
           ('add', 1): (operator.add, ('read-csv', 1), 100),
           ('add', 2): (operator.add, ('read-csv', 2), 100),
           ('add', 3): (operator.add, ('read-csv', 3), 100)},

   'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
              ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
              ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
              ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

dependencies = {'read-csv': set(),
                'add': {'read-csv'},
                'filter': {'add'}}

graph = HighLevelGraph(layers, dependencies)

这个对象满足 Mapping 接口,因此作为一个普通的 Python 字典操作,它是底层层的语义合并:

>>> len(graph)
12
>>> graph[('read-csv', 0)]
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),

API

class dask.highlevelgraph.HighLevelGraph(layers: collections.abc.Mapping[str, collections.abc.Mapping[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any]], dependencies: collections.abc.Mapping[str, set[str]], key_dependencies: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] | None = None)[源代码]

由依赖子图层组成的任务图

这个对象编码了一个Dask任务图,该图由依赖的子图层组成,这种情况通常在使用Dask数组、包或数据帧等高级集合构建任务图时出现。

通常,每个高级数组、包或数据帧操作都会获取输入集合的任务图,合并它们,然后为新操作添加一个或多个新的任务层。这些层通常至少有与集合中的分区或块一样多的任务。HighLevelGraph 对象将每个操作的子图分别存储在子图中,并存储它们之间的依赖结构。

参数
图层Mapping[str, Mapping]

子图层,由唯一名称键控

依赖项Mapping[str, set[str]]

每个图层所依赖的图层集合

关键依赖dict[键, 集合], 可选

将(某些)高层图中的键映射到它们的依赖项。如果缺少某个键,其依赖项将按需计算。

参见

HighLevelGraph.from_collections

通常由开发人员用来创建新的 HighLevelGraphs

示例

以下是一个理想化的示例,展示了 HighLevelGraph 的内部状态

>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')  
>>> df = df + 100  
>>> df = df[df.name == 'Alice']  
>>> graph = df.__dask_graph__()  
>>> graph.layers  
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}
 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies  
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}
cull(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) dask.highlevelgraph.HighLevelGraph[源代码]

返回仅包含计算键所需任务的新 HighLevelGraph。

换句话说,从 dask 中移除不必要的任务。

参数

键的可迭代对象或键的嵌套列表,例如 __dask_keys__() 的输出

返回
hlg: HighLevelGraph

筛选的高层图

cull_layers(layers: collections.abc.Iterable[str]) dask.highlevelgraph.HighLevelGraph[源代码]

返回一个新的 HighLevelGraph,仅包含给定的层及其依赖项。在内部,层不会被修改。

这是 HighLevelGraph.cull() 的一个变体,速度更快,并且在两个筛选后的图表合并时,不会在两个具有相同名称和不同内容的层之间产生冲突。

返回
hlg: HighLevelGraph

筛选的高层图

classmethod from_collections(name: str, layer: collections.abc.Mapping[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], dependencies: collections.abc.Sequence[dask.typing.DaskCollection] = ()) dask.highlevelgraph.HighLevelGraph[源代码]

从新层和一组集合构建一个 HighLevelGraph

这在常见情况下构建了一个 HighLevelGraph,其中我们有一个新的层和一组我们希望依赖的旧集合。

这会提取集合中的 __dask_layers__() 方法(如果存在),并将它们添加到此新层的依赖项中。它还将所有依赖集合的所有层合并到此图的新层中。

参数
名称str

新层的名称

映射

图层本身

依赖项Dask 集合列表

其他 dask 集合(如数组或数据框)的列表,它们本身具有图结构

示例

在典型用法中,我们创建一个新的任务层,然后将该层及其所有依赖集合传递给此方法。

>>> def add(self, other):
...     name = 'add-' + tokenize(self, other)
...     layer = {(name, i): (add, input_key, other)
...              for i, input_key in enumerate(self.__dask_keys__())}
...     graph = HighLevelGraph.from_collections(name, layer, dependencies=[self])
...     return new_collection(name, graph)
get(k[, d]) D[k] if k in D, else d.  d defaults to None.
get_all_dependencies() dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]][源代码]

获取所有键的依赖项

在大多数情况下,这将实现所有层,这使得它成为一个昂贵的操作。

返回
map: 映射

一个映射,将每个键映射到其依赖项

get_all_external_keys() set[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]][源代码]

获取所有层的所有输出键

在大多数情况下,这 _不会_ 实现任何层,这使得它成为一个相对廉价的操作。

返回
键:集合

所有外部键的集合

items() a set-like object providing a view on D's items[源代码]
keys() collections.abc.KeysView[源代码]

获取所有层的所有键。

这在许多情况下会实现层级,这使得它成为一个相对昂贵的操作。参见 get_all_external_keys() 以获取更快的替代方案。

to_dict() dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any][源代码]

高效转换为普通字典。此方法比 dict(self) 更快。

values() an object providing a view on D's values[源代码]