韧性

软件会失败,硬件会失败,网络连接会失败,用户代码会失败。本文档描述了 dask.distributed 在这些失败和其他已知错误面前的响应方式。

用户代码失败

当一个函数引发错误时,该错误会被保留并在请求时传输给客户端。任何尝试获取该结果 或任何依赖结果 的行为都会引发该异常。

>>> def div(a, b):
...     return a / b

>>> x = client.submit(div, 1, 0)
>>> x.result()
ZeroDivisionError: division by zero

>>> y = client.submit(add, x, 10)
>>> y.result()  # same error as above
ZeroDivisionError: division by zero

这不会以任何方式影响调度器或工作者的顺畅运行。

关闭的网络连接

如果与远程工作者的连接意外关闭,并且本地进程适当地引发 IOError,那么调度器将把所有待处理的计算重新路由到其他工作者。

如果丢失的工人是唯一持有未来计算所需的关键结果的工人,那么这些结果将由幸存的工人重新计算。调度器维护了每个结果如何产生的完整历史,因此能够在其他工人上重现相同的计算。

这有一些失败的情况。

  1. 如果结果依赖于不纯的函数,那么你可能会得到不同的(尽管仍然是完全准确的)结果

  2. 如果工作进程由于一个错误的函数而失败,例如一个导致段错误的函数,那么这个错误的函数将在其他工作进程上被重复调用。在它杀死一定数量的工作进程后(默认为三个),该函数将被标记为“错误”。

  3. 通过调用 scatter() 直接发送给工作者的数据(而不是通过其他 Dask 函数从 Dask 任务图中创建的数据)不会保存在调度器中,因为这些数据通常非常大,因此这些数据的丢失是不可恢复的。您可能希望使用适当的复制因子对数据调用 replicate(),以确保数据长期存在,或者将数据备份到某些弹性存储中,如文件系统。

硬件故障

目前尚不清楚在何种情况下本地进程会知道远程工作线程已关闭连接。如果套接字未正常关闭,系统将等待一个超时,大约三秒钟,然后标记工作线程为失败并恢复平稳操作。

调度器故障

包含调度器的进程可能会死亡。目前没有持久化机制来记录和恢复调度器的状态。

工作者和客户端在调度器重新上线后都会重新连接,但正在进行的计算记录将会丢失。

重启和保姆进程

客户端提供了一种机制来重启集群中的所有工作节点。如果在实验过程中,你发现工作节点处于一种不便的状态,导致它们无响应,这将非常方便。Client.restart 方法会终止所有工作节点,清除所有调度器状态,然后重新启动所有工作节点,从而得到一个干净的集群。这需要保姆进程(默认情况下会启动)。