日志记录

在整个Dask集群中,有几种方式记录状态和其他活动。

日志

调度器、工作者和客户端都使用Python的标准日志模块记录各种管理事件。日志级别和日志处理程序都是可定制的。更多信息请参见 调试文档

任务过渡日志

调度器跟踪每个任务的所有 状态转换 。这提供了任务如何通过其计算过程的洞察,并且在调试时特别有价值。要检索给定任务的转换日志,请将任务的键传递给 Scheduler.story() 方法。

>>> f = client.submit(inc, 123)
>>> f
<Future: finished, type: builtins.int, key: inc-aad7bbea25dc61c8e53d929c7ec50bed>
>>> s.story(f.key)
[('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'released', 'waiting', {'inc-aad7bbea25dc61c8e53d929c7ec50bed': 'processing'}, 1605143345.7283862),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'waiting', 'processing', {}, 1605143345.7284858),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'processing', 'memory', {}, 1605143345.731495)]

结构化日志

调度器、工作节点和客户端都支持将结构化事件记录到一个集中式的账本中,该账本按主题索引。默认情况下,Dask 会记录一些管理事件到此系统(例如工作节点加入和离开集群时),但可以使用 Scheduler.log_event()Worker.log_event()Client.log_event() 方法记录自定义事件。

例如,下面我们使用工作者的 log_event 方法将开始和结束时间记录到 "runtimes" 主题中:

>>> def myfunc(x):
...     start = time()
...     ...
...     stop = time()
...     dask.distributed.get_worker().log_event("runtimes", {"start": start, "stop": stop})
>>> futures = client.map(myfunc, range(10))
>>> client.get_events("runtimes")
((1605207481.77175, {'start': 1605207481.769397, 'stop': 1605207481.769397}),
 (1605207481.772021, {'start': 1605207481.770036, 'stop': 1605207481.770037}),
 ...
)

可以使用 Client.get_events() 方法获取给定主题的事件。在上面的示例中,我们使用 client.get_events("runtimes") 获取了记录的开始和停止时间。请注意,Client.get_events 为每个记录的事件返回一个元组,其中包含记录的消息以及事件被记录时的时间戳。

当与调度器和工作器插件结合时,结构化事件系统可以生成丰富的日志/诊断系统。