硬件故障的韧性
内容
实时笔记本
你可以在 live session 中运行此笔记本,或查看 Github 上的内容。
硬件故障的韧性¶
场景:我们有一个部分由抢占式资源组成的集群。也就是说,我们必须处理在计算过程中突然关闭的工作节点。虽然这里用 LocalCluster
进行了演示,但 Dask 对抢占式资源的弹性在例如 Dask Kubernetes 或 Dask Jobqueue 中最为有用。
相关文档: http://distributed.dask.org/en/latest/resilience.html#hardware-failures
提高韧性¶
每当一个工作节点关闭时,调度器将增加 所有 分配给该工作节点(不一定是正在计算的)的任务的可疑计数器。每当任务的可疑度超过某个阈值(默认为3)时,该任务将被视为损坏。我们希望在只有少数工作节点的情况下计算许多任务,并且工作节点会随机关闭。因此,我们预计所有任务的可疑度会迅速增长。让我们提高阈值:
[ ]:
import dask
dask.config.set({'distributed.scheduler.allowed-failures': 100});
所有其他导入¶
[ ]:
from dask.distributed import Client, LocalCluster
from dask import bag as db
import os
import random
from time import sleep
一个集群¶
[ ]:
cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)
client = Client(cluster)
client
一个简单的负载¶
我们将把一系列数字乘以二,添加一些睡眠以模拟一些实际工作,然后将整个加倍数字序列通过求和来减少。
[ ]:
def multiply_by_two(x):
sleep(0.02)
return 2 * x
[ ]:
N = 400
x = db.from_sequence(range(N), npartitions=N // 2)
mults = x.map(multiply_by_two)
summed = mults.sum()
突然关闭工人¶
让我们将两个工作进程的ID标记为不可抢占。
[ ]:
all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
non_preemptible_workers = all_current_workers[:2]
[ ]:
def kill_a_worker():
preemptible_workers = [
w.pid for w in cluster.scheduler.workers.values()
if w.pid not in non_preemptible_workers]
if preemptible_workers:
os.kill(random.choice(preemptible_workers), 15)
开始计算并在运行时持续关闭工作节点¶
[ ]:
summed = client.compute(summed)
while not summed.done():
kill_a_worker()
sleep(3.0)
检查结果是否匹配¶
[ ]:
print(f"`sum(range({N}))` on cluster: {summed.result()}\t(should be {N * (N-1)})")