10分钟入门Dask

这是面向新用户的 Dask 简要概述。文档的其他部分包含了更多信息。

Dask 概述。Dask 由三部分组成:集合、任务图和调度器。

高级集合用于生成任务图,这些任务图可以由调度器在单台机器或集群上执行。

我们通常如下导入 Dask:

>>> import numpy as np
>>> import pandas as pd

>>> import dask.dataframe as dd
>>> import dask.array as da
>>> import dask.bag as db

根据你所处理的数据类型,你可能不需要所有这些。

创建一个 Dask 对象

你可以通过提供现有数据并可选地包含关于块应如何结构化的信息,从头开始创建一个 Dask 对象。

参见 数据框

>>> index = pd.date_range("2021-09-01", periods=2400, freq="1h")
... df = pd.DataFrame({"a": np.arange(2400), "b": list("abcaddbe" * 300)}, index=index)
... ddf = dd.from_pandas(df, npartitions=10)
... ddf
Dask DataFrame Structure:
                        a       b
npartitions=10
2021-09-01 00:00:00  int64  object
2021-09-11 00:00:00    ...     ...
...                    ...     ...
2021-11-30 00:00:00    ...     ...
2021-12-09 23:00:00    ...     ...
Dask Name: from_pandas, 10 tasks

现在我们有一个包含2列和2400行的Dask DataFrame,它由10个分区组成,每个分区有240行。每个分区代表数据的一部分。

以下是DataFrame的一些关键属性:

>>> # check the index values covered by each partition
... ddf.divisions
(Timestamp('2021-09-01 00:00:00', freq='H'),
Timestamp('2021-09-11 00:00:00', freq='H'),
Timestamp('2021-09-21 00:00:00', freq='H'),
Timestamp('2021-10-01 00:00:00', freq='H'),
Timestamp('2021-10-11 00:00:00', freq='H'),
Timestamp('2021-10-21 00:00:00', freq='H'),
Timestamp('2021-10-31 00:00:00', freq='H'),
Timestamp('2021-11-10 00:00:00', freq='H'),
Timestamp('2021-11-20 00:00:00', freq='H'),
Timestamp('2021-11-30 00:00:00', freq='H'),
Timestamp('2021-12-09 23:00:00', freq='H'))

>>> # access a particular partition
... ddf.partitions[1]
Dask DataFrame Structure:
                  a       b
npartitions=1
2021-09-11     int64  object
2021-09-21       ...     ...
Dask Name: blocks, 11 tasks

参见 数组

import numpy as np
import dask.array as da

data = np.arange(100_000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a
Array Chunk
Bytes 781.25 kiB 78.12 kiB
Shape (200, 500) (100, 100)
Dask graph 10 chunks in 1 graph layer
Data type int64 numpy.ndarray
500 200

现在我们有一个形状为 (200, 500) 的二维数组,它由10个块组成,每个块的形状为 (100, 100)。每个块代表数据的一部分。

以下是Dask数组的一些关键属性:

# inspect the chunks
a.chunks
((100, 100), (100, 100, 100, 100, 100))
# access a particular block of data
a.blocks[1, 3]
Array Chunk
Bytes 78.12 kiB 78.12 kiB
Shape (100, 100) (100, 100)
Dask graph 1 chunks in 2 graph layers
Data type int64 numpy.ndarray
100 100

参见

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)
... b
dask.bag<from_sequence, npartitions=2>

现在我们有一个包含8个项目的序列,由2个分区组成,每个分区有4个项目。每个分区代表数据的一部分。

索引

索引 Dask 集合感觉就像切片 NumPy 数组或 pandas DataFrame。

>>> ddf.b
Dask Series Structure:
npartitions=10
2021-09-01 00:00:00    object
2021-09-11 00:00:00       ...
                        ...
2021-11-30 00:00:00       ...
2021-12-09 23:00:00       ...
Name: b, dtype: object
Dask Name: getitem, 20 tasks

>>> ddf["2021-10-01": "2021-10-09 5:00"]
Dask DataFrame Structure:
                                 a       b
npartitions=1
2021-10-01 00:00:00.000000000  int64  object
2021-10-09 05:00:59.999999999    ...     ...
Dask Name: loc, 11 tasks
a[:50, 200]
Array Chunk
Bytes 400 B 400 B
Shape (50,) (50,)
Dask graph 1 chunks in 2 graph layers
Data type int64 numpy.ndarray
50 1

Bag 是一个允许重复的无序集合。所以它像一个列表,但不保证元素之间的顺序。由于 Bag 是无序的,因此无法对其进行索引。

计算

Dask 是惰性求值的。计算结果在你请求之前不会被计算。相反,会生成一个用于计算的 Dask 任务图。

任何时候你有一个 Dask 对象并且你想获取结果,调用 compute:

>>> ddf["2021-10-01": "2021-10-09 5:00"].compute()
                     a  b
2021-10-01 00:00:00  720  a
2021-10-01 01:00:00  721  b
2021-10-01 02:00:00  722  c
2021-10-01 03:00:00  723  a
2021-10-01 04:00:00  724  d
...                  ... ..
2021-10-09 01:00:00  913  b
2021-10-09 02:00:00  914  c
2021-10-09 03:00:00  915  a
2021-10-09 04:00:00  916  d
2021-10-09 05:00:00  917  d

[198 rows x 2 columns]
>>> a[:50, 200].compute()
array([  200,   700,  1200,  1700,  2200,  2700,  3200,  3700,  4200,
      4700,  5200,  5700,  6200,  6700,  7200,  7700,  8200,  8700,
      9200,  9700, 10200, 10700, 11200, 11700, 12200, 12700, 13200,
      13700, 14200, 14700, 15200, 15700, 16200, 16700, 17200, 17700,
      18200, 18700, 19200, 19700, 20200, 20700, 21200, 21700, 22200,
      22700, 23200, 23700, 24200, 24700])
>>> b.compute()
[1, 2, 3, 4, 5, 6, 2, 1]

方法

Dask 集合匹配现有的 numpy 和 pandas 方法,因此它们应该感觉很熟悉。调用该方法来设置任务图,然后调用 compute 来获取结果。

>>> ddf.a.mean()
dd.Scalar<series-..., dtype=float64>

>>> ddf.a.mean().compute()
1199.5

>>> ddf.b.unique()
Dask Series Structure:
npartitions=1
   object
      ...
Name: b, dtype: object
Dask Name: unique-agg, 33 tasks

>>> ddf.b.unique().compute()
0    a
1    b
2    c
3    d
4    e
Name: b, dtype: object

方法可以像在 pandas 中一样链接在一起

>>> result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100
... result
Dask Series Structure:
npartitions=1
2021-10-01 00:00:00.000000000    int64
2021-10-09 05:00:59.999999999      ...
Name: a, dtype: int64
Dask Name: sub, 16 tasks

>>> result.compute()
2021-10-01 00:00:00       620
2021-10-01 01:00:00      1341
2021-10-01 02:00:00      2063
2021-10-01 03:00:00      2786
2021-10-01 04:00:00      3510
                        ...
2021-10-09 01:00:00    158301
2021-10-09 02:00:00    159215
2021-10-09 03:00:00    160130
2021-10-09 04:00:00    161046
2021-10-09 05:00:00    161963
Freq: H, Name: a, Length: 198, dtype: int64
>>> a.mean()
dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>

>>> a.mean().compute()
49999.5

>>> np.sin(a)
dask.array<sin, shape=(200, 500), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>

>>> np.sin(a).compute()
array([[ 0.        ,  0.84147098,  0.90929743, ...,  0.58781939,
         0.99834363,  0.49099533],
      [-0.46777181, -0.9964717 , -0.60902011, ..., -0.89796748,
      -0.85547315, -0.02646075],
      [ 0.82687954,  0.9199906 ,  0.16726654, ...,  0.99951642,
         0.51387502, -0.4442207 ],
      ...,
      [-0.99720859, -0.47596473,  0.48287891, ..., -0.76284376,
         0.13191447,  0.90539115],
      [ 0.84645538,  0.00929244, -0.83641393, ...,  0.37178568,
      -0.5802765 , -0.99883514],
      [-0.49906936,  0.45953849,  0.99564877, ...,  0.10563876,
         0.89383946,  0.86024828]])

>>> a.T
dask.array<transpose, shape=(500, 200), dtype=int64, chunksize=(100, 100), chunktype=numpy.ndarray>

>>> a.T.compute()
array([[    0,   500,  1000, ..., 98500, 99000, 99500],
      [    1,   501,  1001, ..., 98501, 99001, 99501],
      [    2,   502,  1002, ..., 98502, 99002, 99502],
      ...,
      [  497,   997,  1497, ..., 98997, 99497, 99997],
      [  498,   998,  1498, ..., 98998, 99498, 99998],
      [  499,   999,  1499, ..., 98999, 99499, 99999]])

方法可以像在 NumPy 中一样链式调用

>>> b = a.max(axis=1)[::-1] + 10
... b
dask.array<add, shape=(200,), dtype=int64, chunksize=(100,), chunktype=numpy.ndarray>

>>> b[:10].compute()
array([100009,  99509,  99009,  98509,  98009,  97509,  97009,  96509,
      96009,  95509])

Dask Bag 在通用 Python 对象的集合上实现了 mapfilterfoldgroupby 等操作。

>>> b.filter(lambda x: x % 2)
dask.bag<filter-lambda, npartitions=2>

>>> b.filter(lambda x: x % 2).compute()
[1, 3, 5, 1]

>>> b.distinct()
dask.bag<distinct-aggregate, npartitions=1>

>>> b.distinct().compute()
[1, 2, 3, 4, 5, 6]

方法可以链式调用。

>>> c = db.zip(b, b.map(lambda x: x * 10))
... c
dask.bag<zip, npartitions=2>

>>> c.compute()
[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (2, 20), (1, 10)]

可视化任务图

到目前为止,我们已经设置了一些计算并调用了 compute。除了触发计算外,我们还可以检查任务图以了解发生了什么。

>>> result.dask
HighLevelGraph with 7 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f129df7a9d0>
1. from_pandas-0b850a81e4dfe2d272df4dc718065116
2. loc-fb7ada1e5ba8f343678fdc54a36e9b3e
3. getitem-55d10498f88fc709e600e2c6054a0625
4. series-cumsum-map-131dc242aeba09a82fea94e5442f3da9
5. series-cumsum-take-last-9ebf1cce482a441d819d8199eac0f721
6. series-cumsum-d51d7003e20bd5d2f767cd554bdd5299
7. sub-fed3e4af52ad0bd9c3cc3bf800544f57

>>> result.visualize()
Dask 数据帧计算的任务图。任务图显示了在应用累积和 "cumsum" 操作之前,通过 "loc" 和 "getitem" 操作选择数据帧值的一小部分,最后从结果中减去一个值。
>>> b.dask
HighLevelGraph with 6 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fd33a4aa400>
1. array-ef3148ecc2e8957c6abe629e08306680
2. amax-b9b637c165d9bf139f7b93458cd68ec3
3. amax-partial-aaf8028d4a4785f579b8d03ffc1ec615
4. amax-aggregate-07b2f92aee59691afaf1680569ee4a63
5. getitem-f9e225a2fd32b3d2f5681070d2c3d767
6. add-f54f3a929c7efca76a23d6c42cdbbe84

>>> b.visualize()
Dask 数组计算的任务图。任务图显示了在 Dask 数组的每个块上进行多次 "amax" 操作,然后聚合以沿第一个数组轴找到 "amax",接着通过 "getitem" 切片操作反转数组值的顺序,最后进行 "add" 操作以获得最终结果。
>>> c.dask
HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f96d0814fd0>
1. from_sequence-cca2a33ba6e12645a0c9bc0fd3fe6c88
2. lambda-93a7a982c4231fea874e07f71b4bcd7d
3. zip-474300792cc4f502f1c1f632d50e0272

>>> c.visualize()
Dask 任务图用于 Dask 包的计算。任务图展示了一个“lambda”操作,然后对 Dask 包的分区应用了“zip”操作。包的分区之间不需要通信,这是一个非常适合并行计算的例子。

低级接口

在并行化现有代码库或构建自定义算法时,经常会遇到可以并行化的代码,但这些代码不仅仅是大数据框或数组。

Dask 延迟 允许你将单个函数调用包装成一个延迟构造的任务图:

import dask

@dask.delayed
def inc(x):
   return x + 1

@dask.delayed
def add(x, y):
   return x + y

a = inc(1)       # no work has happened yet
b = inc(2)       # no work has happened yet
c = add(a, b)    # no work has happened yet

c = c.compute()  # This triggers all of the above computations

与目前描述的接口不同,Future 是急切的。一旦提交函数,计算就会立即开始(参见 未来)。

from dask.distributed import Client

client = Client()

def inc(x):
   return x + 1

def add(x, y):
   return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

c = c.result()                # block until work finishes, then gather result

备注

期货只能用于分布式集群。更多信息请参见下文。

调度

在生成了任务图之后,调度器的工作就是执行它(参见 调度)。

默认情况下,对于大多数 Dask API,当你在 Dask 对象上调用 compute 时,Dask 会使用你计算机上的线程池(即线程调度器)来并行运行计算。这适用于 Dask 数组Dask DataFrameDask Delayed。例外的是 Dask Bag,它默认使用多进程调度器。

如果你想获得更多控制,请使用分布式调度器。尽管名字中有“分布式”,分布式调度器在单机和多机上都表现良好。可以把它看作是“高级调度器”。

这是如何设置一个仅使用您自己计算机的集群。

>>> from dask.distributed import Client
...
... client = Client()
... client
<Client: 'tcp://127.0.0.1:41703' processes=4 threads=12, memory=31.08 GiB>

这是如何连接到一个已经在运行的集群。

>>> from dask.distributed import Client
...
... client = Client("<url-of-scheduler>")
... client
<Client: 'tcp://127.0.0.1:41703' processes=4 threads=12, memory=31.08 GiB>

设置远程集群有多种方式。更多信息请参考 如何部署dask集群

一旦你创建了一个客户端,任何计算将在它所指向的集群上运行。

诊断

在使用分布式集群时,Dask 提供了一个诊断仪表盘,您可以在其中查看任务的处理情况。

>>> client.dashboard_link
'http://127.0.0.1:8787/status'

要了解更多关于这些图表的信息,请查看 仪表盘诊断