调试
内容
调试¶
调试并行程序是困难的。像日志记录和使用 pdb
与回溯交互这样的正常调试工具在异常发生在远程机器、不同进程或线程时,会停止正常工作。
Dask 提供了多种机制来简化这个过程。根据你的情况,这些方法中的一些可能比其他方法更合适。
这些方法从轻量级或简单的解决方案到更复杂的解决方案依次排列。
打印¶
调试最基本的方法之一是简单地打印值并检查它们。然而,当使用 Python 的内置 print()
函数与 Dask 一起使用时,这些打印通常发生在远程机器上,而不是用户的 Python 会话中,这通常不是开发者在调试时想要的经验。
因此,Dask 提供了一个 dask.distributed.print
函数,它的作用与 Python 的内置 print()
函数相同,但还会将打印的输出转发到客户端的 Python 会话中。这使得分布式调试感觉更像本地调试。
异常¶
当计算中的任务失败时,理解出错的标准方法是查看异常和回溯。通常人们通过 pdb
模块、IPython 的 %debug
或 %pdb
魔法,或者直接查看回溯并调查代码中异常发生的位置来完成这一操作。
通常,当计算在单独的线程或不同的机器上执行时,这些方法会失效。为了解决这个问题,Dask 提供了一些机制来重新创建正常的 Python 调试体验。
检查异常和回溯¶
默认情况下,Dask 已经在异常和回溯发生的地方复制它们,并在本地重新引发该异常。如果你的任务在远程因 ZeroDivisionError
失败,那么你会在交互会话中得到一个 ZeroDivisionError
。同样,你会看到这个错误发生的完整回溯,就像在普通 Python 中一样,这可以帮助你识别代码中的问题所在。
然而,你不能在这些回溯中使用 pdb
模块或 %debug
IPython 魔法来查看失败时变量的值。你只能通过视觉检查。此外,回溯的顶部可能充满了 Dask 特有的函数,这些与你的问题无关,因此你可以安全地忽略这些。
单机和分布式调度器都执行此操作。
使用单线程调度器¶
Dask 自带一个简单的单线程调度器。这不会提供任何并行性能改进,但会在本地线程中忠实地运行您的 Dask 计算,允许您使用诸如 pdb
、%debug
IPython 魔法、像 cProfile
模块这样的分析工具以及 snakeviz 等正常工具。这使得您可以在 Dask 计算中使用所有正常的 Python 调试技巧,只要您不需要并行性。
单线程调度器可以使用,例如,通过在计算调用中设置 scheduler='single-threaded'
:
>>> x.compute(scheduler='single-threaded')
有关配置调度器的更多方法,请参阅 调度器配置文档。
这仅适用于单机调度器。它不适用于 dask.distributed
,除非你熟悉使用 Tornado API(查看 测试基础设施 文档,其中实现了这一点)。此外,由于这是在单机上操作的,它假设你的计算可以在单机上运行而不会超出内存限制。如果可能的话,在较小版本的问题上使用这种方法可能是明智的。
本地重新运行失败的任务¶
如果远程任务失败,我们可以收集函数及其所有输入,将它们带到本地线程,然后重新运行该函数,希望在本地触发相同的异常,以便可以使用正常的调试工具。
使用单机调度器时,使用 rerun_exceptions_locally=True
关键字:
>>> x.compute(rerun_exceptions_locally=True)
在使用分布式调度器时,对包含 Futures
的任何对象使用 recreate_error_locally
方法:
>>> x.compute()
ZeroDivisionError(...)
>>> %pdb
>>> future = client.compute(x)
>>> client.recreate_error_locally(future)
手动移除失败的期货¶
有时只有部分计算失败,例如,如果CSV数据集的某些行在某种程度上是错误的。当使用分布式调度器运行时,如果你切换到处理Futures,你可以移除产生错误结果的数据块:
>>> import dask.dataframe as dd
>>> df = ... # create dataframe
>>> df = df.persist() # start computing on the cluster
>>> from distributed.client import futures_of
>>> futures = futures_of(df) # get futures behind dataframe
>>> futures
[<Future: status: finished, type: pd.DataFrame, key: load-1>
<Future: status: finished, type: pd.DataFrame, key: load-2>
<Future: status: error, key: load-3>
<Future: status: pending, key: load-4>
<Future: status: error, key: load-5>]
>>> # wait until computation is done
>>> while any(f.status == 'pending' for f in futures):
... sleep(0.1)
>>> # pick out only the successful futures and reconstruct the dataframe
>>> good_futures = [f for f in futures if f.status == 'finished']
>>> df = dd.from_delayed(good_futures, meta=df._meta)
这是一个有点hack的做法,但在初次探索杂乱数据时通常很实用。如果你使用的是concurrent.futures API(map, submit, gather),那么这种方法更加自然。
检查调度状态¶
并非所有错误都以异常的形式出现。例如,在分布式系统中,工作节点可能会意外死亡,由于工作节点间通信或调度器开销,计算可能会异常缓慢,或者出现其他多种问题。获取关于正在发生的事情的反馈有助于识别故障和一般性能瓶颈。
对于单机调度器,请参阅 本地诊断 文档。本节的其余部分将假设您正在使用 分布式调度器 ,在这些情况下这些问题更常见。
Web 诊断¶
首先,分布式调度器有许多 诊断工具 ,显示了数十种记录的指标,如CPU、内存、网络和磁盘使用情况,先前任务的历史记录,任务分配给工作者的分配情况,工作者内存压力,工作窃取,打开文件句柄限制等。许多 问题可以通过检查这些页面正确诊断。默认情况下,这些工具可以在 http://scheduler:8787/
访问,其中 scheduler
应替换为调度器的地址。更多信息请参见 诊断性能文档 。
日志¶
调度器、工作者和客户端都使用 Python 的标准日志模块 来记录日志。默认情况下,这些日志会输出到标准错误。当 Dask 由集群作业调度器(SGE/SLURM/YARN/Mesos/Marathon/Kubernetes/任何)启动时,该系统会跟踪这些日志,并提供一个接口帮助你访问它们。如果你自己启动 Dask,除非你 将 stderr 重定向到一个文件,否则它们可能会直接输出到屏幕上。
你可以在 配置 中控制日志记录的详细程度,例如,~/.config/dask/*.yaml
文件。当前的默认设置如下所示:
logging:
distributed: info
distributed.client: warning
bokeh: error
像 distributed.client
、distributed.scheduler
、distributed.nanny
、distributed.worker
等特定组件的日志可以分别独立配置。例如,你可以添加一行 distributed.worker: debug
来获取来自工作节点的 非常 详细的输出。
此外,您可以显式地为记录器分配处理程序。以下示例将文件(“output.log”)和控制台输出分配给调度器和工作器。有关此处特定术语的含义,请参阅 python logging 文档。
logging:
version: 1
handlers:
file:
class: logging.handlers.RotatingFileHandler
filename: output.log
level: INFO
console:
class: logging.StreamHandler
level: INFO
loggers:
distributed.worker:
level: INFO
handlers:
- file
- console
distributed.scheduler:
level: INFO
handlers:
- file
- console
LocalCluster¶
如果你正在使用单机上的分布式调度器,你可能正在通过命令行界面手动设置工作节点,或者你可能正在使用 LocalCluster ,这就是当你只调用 Client()
时运行的内容:
>>> from dask.distributed import Client, LocalCluster
>>> client = Client() # This is actually the following two commands
>>> cluster = LocalCluster()
>>> client = Client(cluster.scheduler.address)
LocalCluster 很有用,因为调度器和工作器与你在同一个进程中,所以你可以在它们运行时轻松检查它们的 `状态 <http://www.aidoczh.com/dask-distributed/scheduling-state.html>`_(它们在单独的线程中运行):
>>> cluster.scheduler.processing
{'worker-one:59858': {'inc-123', 'add-443'},
'worker-two:48248': {'inc-456'}}
你也可以为工作者这样做 如果 你运行它们时不使用保姆进程:
>>> cluster = LocalCluster(nanny=False)
>>> client = Client(cluster)
如果你想使用 Dask 分布式 API 并且仍然想直接在 workers 中调查发生了什么,这会非常有帮助。信息不像在网页诊断中那样为你提炼,但你拥有完全的低级访问权限。