数据局部性
内容
数据局部性¶
数据移动常常不必要地限制了性能。
这对于分析计算尤其如此。Dask.distributed 尽可能减少数据移动,并在必要时让用户能够控制。本文档描述了当前的调度策略和围绕数据局部性的用户API。
当前政策¶
任务提交¶
在常见的分布式运行情况下,任务会在已经持有依赖数据的工人上执行。如果你有一个任务 f(x)
需要一些数据 x
,那么这个任务很可能会在已经持有 x
的工人上运行。
如果一个任务需要将数据分配给多个工作节点,那么调度器会选择在需要最少数据传输的工作节点上运行该任务。每个数据元素的大小由工作节点使用 sys.getsizeof
函数测量,该函数依赖于通常在大多数相关Python对象上可用的 __sizeof__
协议。
数据分散¶
当用户将数据从本地进程分散到分布式网络时,这些数据会以轮询方式按核心数量进行分组。例如,如果我们有两个工作节点 Alice
和 Bob
,每个节点有两个核心,并且我们将列表 range(10)
分散出去,如下所示:
futures = client.scatter(range(10))
然后 Alice 和 Bob 收到以下数据
Alice:
[0, 1, 4, 5, 8, 9]
鲍勃:
[2, 3, 6, 7]
用户控制¶
复杂的算法可能需要更多的用户控制。
例如,专用硬件(如GPU或数据库连接)的存在可能会限制特定任务的有效工作者的集合。
在这些情况下,使用 submit
、map
或 scatter
函数的 workers=
关键字参数,提供主机名、IP 地址或别名,如下所示:
future = client.submit(func, *args, workers=['Alice'])
Alice:
[0, 1, 4, 5, 8, 9, new_result]
鲍勃:
[2, 3, 6, 7]
所需数据将始终移动到这些工作节点,即使该数据量很大。如果此限制仅是偏好而非严格要求,则添加 allow_other_workers
关键字参数以表明在极端情况下,例如当没有有效工作节点时,可以使用其他工作节点。
future = client.submit(func, *args, workers=['Alice'],
allow_other_workers=True)
此外,scatter
函数支持 broadcast=
关键字参数,以强制所有数据发送给所有工作节点,而不是轮询发送。如果有新的工作节点加入,它们不会自动接收这些数据。
futures = client.scatter([1, 2, 3], broadcast=True) # send data to all workers
Alice:
[1, 2, 3]
鲍勃:
[1, 2, 3]
workers=
的有效参数包括以下内容:
单个IP地址、IP/端口对,或类似以下的主机名:
192.168.1.100, 192.168.1.100:8989, alice, alice:8989
上述内容的列表或集合:
['alice'], ['192.168.1.100', '192.168.1.101:9999']
如果只提供主机名或IP地址,那么该机器上的任何工作节点都将被视为有效。此外,您可以在创建工作节点时为其提供别名。:
$ dask worker scheduler_address:8786 --name worker_1
然后在指定工作者时使用此名称。
client.map(func, sequence, workers='worker_1')
使用 Compute/Persist 指定工作器¶
在 scatter
、submit
和 map
中的 workers=
关键字相当直接,接受一个工作节点主机名、主机:端口对或这些的序列作为有效输入:
client.submit(f, x, workers='127.0.0.1')
client.submit(f, x, workers='127.0.0.1:55852')
client.submit(f, x, workers=['192.168.1.101', '192.168.1.100'])
对于更复杂的计算,例如在使用 dask.dataframe 或 dask.delayed 等 dask 集合时,我们有时希望指定某些计算部分在某些工作节点上运行,而其他部分在其他工作节点上运行。
x = delayed(f)(1)
y = delayed(f)(2)
z = delayed(g)(x, y)
future = client.compute(z, workers={z: '127.0.0.1',
x: '192.168.0.1'})
这里字典的值与之前的形式相同,可以是主机、主机:端口对,或这些的列表。在这种情况下,键要么是dask集合,要么是dask集合的元组。这些集合的所有*最终*键将在指定的机器上运行;依赖项可以在任何地方运行,除非它们也被列在``workers=``中。我们通过一组示例来探讨这一点:
计算 z = f(x, y)
在主机 127.0.0.1
上运行。另外两个计算 x
和 y
可以在任何地方运行。
future = client.compute(z, workers={z: '127.0.0.1'})
z
和 x
的计算必须在 127.0.0.1
上运行
future = client.compute(z, workers={z: '127.0.0.1',
x: '127.0.0.1'})
使用元组来分组集合。这是上述内容的简写。
future = client.compute(z, workers={(x, y): '127.0.0.1'})
请记住,scatter/submit/map
中 workers=
的所有选项在这里也同样适用。
future = client.compute(z, workers={(x, y): ['192.168.1.100', '192.168.1.101:9999']})
将 allow_other_workers=True
设置为使这些限制变为宽松而非硬性要求。
future = client.compute(z, workers={(x, y): '127.0.0.1'},
allow_other_workers=True)
提供一个集合给 allow_other_workers=[...]
来说明只有部分集合的键是宽松的。在下面的例子中,z
必须 在 127.0.0.1
上运行,而 x
应该 在 127.0.0.1
上运行,但如果需要也可以在其他地方运行:
future = client.compute(z, workers={(x, y): '127.0.0.1'},
allow_other_workers=[x])
这可以很好地与 persist
以及任何 dask 集合(任何具有 .__dask_graph__()
方法的对象)一起工作:
df = dd.read_csv('s3://...')
df = client.persist(df, workers={df: ...})
请参阅 效率 页面以了解最佳实践。