用户界面

Dask 支持多种用户接口:

这些用户界面中的每一个都使用了相同的底层并行计算机制,因此具有相同的扩展性、诊断、恢复能力等,但每个界面提供了不同的并行算法和编程风格。

本文档帮助您决定哪个用户界面最适合您的需求,并提供一些适用于所有界面的通用信息。上述链接的页面提供了关于每个界面更深入的信息。

高级集合

许多开始使用 Dask 的人明确地寻找 NumPy、Pandas 或 Scikit-Learn 的可扩展版本。对于这些情况,Dask 中的起点通常相当明确。如果你想使用可扩展的 NumPy 数组,那么从 Dask 数组开始;如果你想使用可扩展的 Pandas DataFrame,那么从 Dask DataFrame 开始,以此类推。

这些高级接口在标准接口的基础上进行了轻微的改动。这些接口会自动对大型数据集进行并行处理,适用于原始项目API中的大部分功能。

# Arrays
import dask.array as da
rng = da.random.default_rng()
x = rng.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                chunks=(1000, 1000))  # break into chunks of size 1000x1000

y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks

s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
          .map(lambda d: d['balance'])
          .sum())

重要的是要记住,尽管API可能相似,但仍存在一些差异。此外,由于并行编程的优缺点,某些算法的性能可能与其内存中的对应算法不同。在使用Dask时,仍然需要一些思考和关注。

低级接口

在并行化现有代码库或构建自定义算法时,经常会遇到可以并行化但不仅仅是大数据框或数组的代码。考虑下面的for循环代码:

results = []
for a in A:
    for b in B:
        if a < b:
            c = f(a, b)
        else:
            c = g(a, b)
        results.append(c)

这段代码中存在潜在的并行性(对 fg 的多次调用可以并行执行),但不清楚如何将其重写为一个大型数组或DataFrame,以便使用更高层次的API。即使你能将其重写为这些范式之一,也不清楚这是否是一个好主意。在转换过程中,很可能会丢失大部分意义,而且对于更复杂的系统,这个过程会变得更加困难。

相反,Dask 的低级 API 允许你在现有的 for 循环上下文中一次编写一个函数调用的并行代码。一个常见的解决方案是使用 Dask 延迟 将单个函数调用包装成一个惰性构建的任务图:

import dask

lazy_results = []
for a in A:
    for b in B:
        if a < b:
            c = dask.delayed(f)(a, b)  # add lazy task
        else:
            c = dask.delayed(g)(a, b)  # add lazy task
        lazy_results.append(c)

results = dask.compute(*lazy_results)  # compute all in parallel

结合高层次和低层次接口

通常会将高层次和低层次的接口结合使用。例如,你可能会使用 Dask 数组/包/数据帧来加载数据并进行初步预处理,然后切换到 Dask 延迟执行以实现特定于你领域的自定义算法,然后再切换回 Dask 数组/数据帧来清理和存储结果。理解这两组用户接口,以及如何在这些接口之间切换,可以是一种高效的组合。

# Convert to a list of delayed Pandas dataframes
delayed_values = df.to_delayed()

# Manipulate delayed values arbitrarily as you like

# Convert many delayed Pandas DataFrames back to a single Dask DataFrame
df = dd.from_delayed(delayed_values)

懒惰与计算

大多数 Dask 用户界面都是 惰性 的,这意味着它们不会在您使用 compute 方法明确请求结果之前进行评估:

# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)

# Trigger computation by explicitly calling the compute method
y = y.compute()

如果你有多个想要同时计算的结果,使用 dask.compute 函数。这可以共享中间结果,从而更高效:

# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())

注意 compute() 函数返回内存中的结果。它将 Dask DataFrame 转换为 Pandas DataFrame,Dask 数组转换为 NumPy 数组,Dask 包转换为列表。你应该只在结果能够舒适地适应内存时调用 compute。如果你的结果不适合内存,那么你可能需要考虑将其写入磁盘。

# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')

持久化到分布式内存

警告

persist 会将完整的数据集存储在内存中。这有一个缺点,即可用内存必须实际超过数据集的大小。仅在交互式迭代同一数据集多次时使用 persist,并尽可能避免在生产用例中使用它。

或者,如果你在一个集群上,那么你可能想要触发一个计算并将结果存储在分布式内存中。在这种情况下,你不希望调用 compute,因为它会创建一个单独的 Pandas、NumPy 或列表结果。相反,你希望调用 persist,它返回一个新的 Dask 对象,该对象指向正在计算或已经计算的结果,这些结果分布在你的集群的内存中。

# Compute returns an in-memory non-Dask object
y = y.compute()

# Persist returns an in-memory Dask object that uses distributed storage if available
y = y.persist()

这在数据加载和预处理步骤之后常见,但在快速迭代、探索或复杂算法之前。例如,我们可能会读取大量数据,筛选到一个更易管理的子集,然后将数据持久化到内存中,以便我们可以快速迭代。

import dask.dataframe as dd
df = dd.read_parquet('...')
df = df[df.name == 'Alice']  # select important subset of data
df = df.persist()  # trigger computation in the background

# These are all relatively fast now that the relevant data is in memory
df.groupby(df.id).balance.sum().compute()   # explore data quickly
df.groupby(df.id).balance.mean().compute()  # explore data quickly
df.id.nunique()                             # explore data quickly

懒惰 vs 立即

如上所述,大多数 Dask 工作负载是惰性的,也就是说,它们不会开始任何工作,直到你通过调用 compute() 显式触发它们。然而,有时你确实希望尽快提交工作,随着时间的推移跟踪它,根据部分结果提交新工作或取消工作等。这在跟踪或响应实时事件、处理流数据或构建复杂且自适应的算法时非常有用。

在这些情况下,人们通常会转向 futures 接口,这是一个类似于 Dask delayed 的低级接口,但它会立即执行而不是延迟执行。

以下是使用 Dask delayed 和 Dask futures 的相同示例,以说明两者之间的区别。

延迟: 惰性

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

a = inc(1)       # no work has happened yet
b = inc(2)       # no work has happened yet
c = add(a, b)    # no work has happened yet

c = c.compute()  # This triggers all of the above computations

未来:立即

from dask.distributed import Client
client = Client()

def inc(x):
    return x + 1

def add(x, y):
    return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

c = c.result()                # block until work finishes, then gather result

你也可以使用 persist 函数来触发高级集合的工作。这将在使用分布式调度器时在后台进行工作。

结合接口

有几种已确立的方法来组合上述接口:

  1. 高级接口(数组、包、数据框)有一个 to_delayed 方法,可以将它们转换为 Dask 延迟对象的序列(或网格)。

    delayeds = df.to_delayed()
    
  2. 高级接口(数组、包、数据框)有一个 from_delayed 方法,可以将延迟对象 未来对象转换。

    df = dd.from_delayed(delayeds)
    df = dd.from_delayed(futures)
    
  3. Client.compute 方法将 Delayed 对象转换为 Futures

    futures = client.compute(delayeds)
    
  4. dask.distributed.futures_of 函数从持久化集合中收集未来对象

    from dask.distributed import futures_of
    
    df = df.persist()  # start computation in the background
    futures = futures_of(df)
    
  5. Dask.delayed 对象将 Futures 转换为延迟对象

    delayed_value = dask.delayed(future)
    

上述方法应该足以将任何接口转换为任何其他接口。我们经常看到一些效果不佳的反模式:

  1. 在高层次对象(如Dask数组或DataFrame)上调用低层次API(延迟或未来)。这会将这些对象降级为它们的NumPy或Pandas等价物,这可能不是期望的。通常人们寻找的是像 dask.array.map_blocksdask.dataframe.map_partitions 这样的API。

  2. 在 Future 对象上调用 compute()。通常人们想要的是 .result() 方法。

  3. 在高级 Dask 对象上调用 NumPy/Pandas 函数,或在 NumPy/Pandas 对象上调用高级 Dask 函数

结论

大多数使用 Dask 的人一开始只使用上述接口之一,但最终学会了如何同时使用几个接口。这帮助他们利用高级接口中的复杂算法,同时也能通过低级接口解决棘手的问题。

更多信息,请参阅以下特定用户界面的文档: