工作窃取
内容
工作窃取¶
某些任务倾向于在特定的工作者上运行。这可能是因为该工作者持有任务的数据依赖关系,或者是因为用户表达了任务在特定地点运行的松散愿望。偶尔这会导致一些非常忙碌的工作者和几个闲置的工作者。在这种情况下,闲置的工作者可能会选择从忙碌的工作者那里窃取工作,即使窃取工作需要昂贵的数据移动。
这是一个性能优化,并非正确性所必需。工作窃取在许多临时情况下提供了鲁棒性,但当我们窃取了错误的任务并降低了性能时,也可能适得其反。
偷窃的标准¶
计算与通信比率¶
当任务的计算时间远长于任务依赖的通信时间时,窃取是有利可图的。
坏例子
如果计算速度很快,我们不希望窃取那些需要将大量依赖数据从受害者传输到窃贼的任务。最终,我们在通信上花费的时间远超过只需稍等片刻并放弃并行性。
[data] = client.scatter([np.arange(1000000000)])
x = client.submit(np.sum, data)
好的例子
我们确实想要窃取只需要移动依赖数据片段的任务,特别是在计算时间昂贵的情况下(这里为100秒。)
[data] = client.scatter([100])
x = client.submit(sleep, data)
幸运的是,我们通常知道依赖项的字节数(通过在工作者上调用 sys.getsizeof
报告)和先前看到的函数的运行时成本,这些成本作为指数加权移动平均值维护。
饱和的工作负担¶
即使计算时间与通信时间比率不佳,盗窃也可能是有利可图的。这种情况发生在饱和的工作者有非常长的任务积压,并且有大量闲置工作者时。我们判断是否可以盗窃一个任务,如果饱和工作者最后要运行的任务在被盗窃后能更快完成,或者如果它留在原始/受害者工作者上。
可窃取任务的积压时间越长,以及活跃工作者的数量越少,我们越愿意进行窃取。这与计算与通信成本的比率相平衡。
复制流行数据¶
如果盗窃导致高度需求的数据在更多的工作者上被复制,从长远来看也是好的。
从富人那里偷窃¶
我们更倾向于从任务特别繁重的工人那里窃取任务,而不是从只有少量多余任务的工人那里。
限制¶
如果一个任务被特别限制在特定的工人上运行(例如当需要特殊硬件时),那么我们不会窃取。
选择要窃取的任务¶
我们维护一个可窃取任务的集合列表,按计算与通信时间比率排序并分入不同的箱子。第一个箱子包含所有计算与通信比率大于或等于8的任务(认为足够高以至于总是可以窃取),下一个箱子的比率为4,再下一个箱子的比率为2,等等,一直到比率为1/256的箱子,我们永远不会窃取这些任务。
此数据结构提供了一个大致有序的视图,展示了所有可窃取的任务,我们可以在常数时间内添加和移除任务,而不是像堆这样的传统数据结构那样需要 log(n)
的时间。
在任何阶段,当我们向工作者提交任务时,我们会检查是否同时存在空闲和饱和的工作者,如果是,我们会快速遍历这个集合列表,首先从最佳的桶中选择任务,然后逐步向下到不太理想的可窃取任务的桶。我们停止的情况是:没有更多的可窃取任务,没有更多的空闲工作者,或者在当前积压情况下,待窃取任务的质量不够高。
这种方法速度快,优化了窃取任务的计算与通信成本比率(最多可达两倍),并且倾向于从积压任务最多的工作者那里窃取任务,这是由于随机选择自然倾向于从最大的群体中抽取。
事务性工作窃取¶
为了避免重复执行同一任务,Dask 实现了事务性工作窃取。当调度器识别到一个应移动的任务时,它首先向繁忙的工作者发送请求。工作者检查其当前的任务状态,并向调度器发送响应:
如果任务尚未运行,那么工作节点将取消该任务,并通知调度器可以将任务重新路由到其他地方。
如果任务已经在运行或已完成,那么工作者会告知调度器不应在其他地方复制该任务。
这避免了重复工作,并且在处理更复杂的任务时也避免了副作用的重复。然而,在工人死亡或网络连接中断的情况下,同一任务的并发或重复执行 仍然是可能的。