日志记录
内容
日志记录¶
在整个Dask集群中,有几种方式记录状态和其他活动。
任务过渡日志¶
调度器跟踪每个任务的所有 状态转换 。这提供了任务如何通过其计算过程的洞察,并且在调试时特别有价值。要检索给定任务的转换日志,请将任务的键传递给 Scheduler.story()
方法。
>>> f = client.submit(inc, 123)
>>> f
<Future: finished, type: builtins.int, key: inc-aad7bbea25dc61c8e53d929c7ec50bed>
>>> s.story(f.key)
[('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'released', 'waiting', {'inc-aad7bbea25dc61c8e53d929c7ec50bed': 'processing'}, 1605143345.7283862),
('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'waiting', 'processing', {}, 1605143345.7284858),
('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'processing', 'memory', {}, 1605143345.731495)]
结构化日志¶
调度器、工作节点和客户端都支持将结构化事件记录到一个集中式的账本中,该账本按主题索引。默认情况下,Dask 会记录一些管理事件到此系统(例如工作节点加入和离开集群时),但可以使用 Scheduler.log_event()
、Worker.log_event()
或 Client.log_event()
方法记录自定义事件。
例如,下面我们使用工作者的 log_event
方法将开始和结束时间记录到 "runtimes"
主题中:
>>> def myfunc(x):
... start = time()
... ...
... stop = time()
... dask.distributed.get_worker().log_event("runtimes", {"start": start, "stop": stop})
>>> futures = client.map(myfunc, range(10))
>>> client.get_events("runtimes")
((1605207481.77175, {'start': 1605207481.769397, 'stop': 1605207481.769397}),
(1605207481.772021, {'start': 1605207481.770036, 'stop': 1605207481.770037}),
...
)
可以使用 Client.get_events()
方法获取给定主题的事件。在上面的示例中,我们使用 client.get_events("runtimes")
获取了记录的开始和停止时间。请注意,Client.get_events
为每个记录的事件返回一个元组,其中包含记录的消息以及事件被记录时的时间戳。
当与调度器和工作器插件结合时,结构化事件系统可以生成丰富的日志/诊断系统。