数据局部性

数据移动常常不必要地限制了性能。

这对于分析计算尤其如此。Dask.distributed 尽可能减少数据移动,并在必要时让用户能够控制。本文档描述了当前的调度策略和围绕数据局部性的用户API。

当前政策

任务提交

在常见的分布式运行情况下,任务会在已经持有依赖数据的工人上执行。如果你有一个任务 f(x) 需要一些数据 x,那么这个任务很可能会在已经持有 x 的工人上运行。

如果一个任务需要将数据分配给多个工作节点,那么调度器会选择在需要最少数据传输的工作节点上运行该任务。每个数据元素的大小由工作节点使用 sys.getsizeof 函数测量,该函数依赖于通常在大多数相关Python对象上可用的 __sizeof__ 协议。

数据分散

当用户将数据从本地进程分散到分布式网络时,这些数据会以轮询方式按核心数量进行分组。例如,如果我们有两个工作节点 AliceBob,每个节点有两个核心,并且我们将列表 range(10) 分散出去,如下所示:

futures = client.scatter(range(10))

然后 Alice 和 Bob 收到以下数据

  • Alice: [0, 1, 4, 5, 8, 9]

  • 鲍勃: [2, 3, 6, 7]

用户控制

复杂的算法可能需要更多的用户控制。

例如,专用硬件(如GPU或数据库连接)的存在可能会限制特定任务的有效工作者的集合。

在这些情况下,使用 submitmapscatter 函数的 workers= 关键字参数,提供主机名、IP 地址或别名,如下所示:

future = client.submit(func, *args, workers=['Alice'])
  • Alice: [0, 1, 4, 5, 8, 9, new_result]

  • 鲍勃: [2, 3, 6, 7]

所需数据将始终移动到这些工作节点,即使该数据量很大。如果此限制仅是偏好而非严格要求,则添加 allow_other_workers 关键字参数以表明在极端情况下,例如当没有有效工作节点时,可以使用其他工作节点。

future = client.submit(func, *args, workers=['Alice'],
                       allow_other_workers=True)

此外,scatter 函数支持 broadcast= 关键字参数,以强制所有数据发送给所有工作节点,而不是轮询发送。如果有新的工作节点加入,它们不会自动接收这些数据。

futures = client.scatter([1, 2, 3], broadcast=True)  # send data to all workers
  • Alice: [1, 2, 3]

  • 鲍勃: [1, 2, 3]

workers= 的有效参数包括以下内容:

  • 单个IP地址、IP/端口对,或类似以下的主机名:

    192.168.1.100, 192.168.1.100:8989, alice, alice:8989
    
  • 上述内容的列表或集合:

    ['alice'], ['192.168.1.100', '192.168.1.101:9999']
    

如果只提供主机名或IP地址,那么该机器上的任何工作节点都将被视为有效。此外,您可以在创建工作节点时为其提供别名。:

$ dask worker scheduler_address:8786 --name worker_1

然后在指定工作者时使用此名称。

client.map(func, sequence, workers='worker_1')

使用 Compute/Persist 指定工作器

scattersubmitmap 中的 workers= 关键字相当直接,接受一个工作节点主机名、主机:端口对或这些的序列作为有效输入:

client.submit(f, x, workers='127.0.0.1')
client.submit(f, x, workers='127.0.0.1:55852')
client.submit(f, x, workers=['192.168.1.101', '192.168.1.100'])

对于更复杂的计算,例如在使用 dask.dataframe 或 dask.delayed 等 dask 集合时,我们有时希望指定某些计算部分在某些工作节点上运行,而其他部分在其他工作节点上运行。

x = delayed(f)(1)
y = delayed(f)(2)
z = delayed(g)(x, y)

future = client.compute(z, workers={z: '127.0.0.1',
                                    x: '192.168.0.1'})

这里字典的值与之前的形式相同,可以是主机、主机:端口对,或这些的列表。在这种情况下,键要么是dask集合,要么是dask集合的元组。这些集合的所有*最终*键将在指定的机器上运行;依赖项可以在任何地方运行,除非它们也被列在``workers=``中。我们通过一组示例来探讨这一点:

计算 z = f(x, y) 在主机 127.0.0.1 上运行。另外两个计算 xy 可以在任何地方运行。

future = client.compute(z, workers={z: '127.0.0.1'})

zx 的计算必须在 127.0.0.1 上运行

future = client.compute(z, workers={z: '127.0.0.1',
                                    x: '127.0.0.1'})

使用元组来分组集合。这是上述内容的简写。

future = client.compute(z, workers={(x, y): '127.0.0.1'})

请记住,scatter/submit/mapworkers= 的所有选项在这里也同样适用。

future = client.compute(z, workers={(x, y): ['192.168.1.100', '192.168.1.101:9999']})

allow_other_workers=True 设置为使这些限制变为宽松而非硬性要求。

future = client.compute(z, workers={(x, y): '127.0.0.1'},
                        allow_other_workers=True)

提供一个集合给 allow_other_workers=[...] 来说明只有部分集合的键是宽松的。在下面的例子中,z 必须127.0.0.1 上运行,而 x 应该127.0.0.1 上运行,但如果需要也可以在其他地方运行:

future = client.compute(z, workers={(x, y): '127.0.0.1'},
                        allow_other_workers=[x])

这可以很好地与 persist 以及任何 dask 集合(任何具有 .__dask_graph__() 方法的对象)一起工作:

df = dd.read_csv('s3://...')
df = client.persist(df, workers={df: ...})

请参阅 效率 页面以了解最佳实践。