优先处理工作
优先处理工作¶
当工作量超过工人数量时,Dask 必须决定哪些任务优先于其他任务。Dask 可以自动确定这些优先级以优化性能,或者用户可以根据自己的需求手动指定优先级。
Dask 使用以下优先级,按顺序:
用户优先级:用户定义的优先级通过
priority=
关键字参数提供给compute()
、persist()
、submit()
或map()
等函数。优先级较高的任务会在优先级较低的任务之前运行,默认优先级为零。future = client.submit(func, *args, priority=10) # high priority task future = client.submit(func, *args, priority=-10) # low priority task df = df.persist(priority=10) # high priority computation
优先级也可以使用 dask 注释机制来指定:
with dask.annotate(priority=10): future = client.submit(func, *args) # high priority task with dask.annotate(priority=-10): future = client.submit(func, *args) # low priority task with dask.annotate(priority=10): df = df.persist() # high priority computation
先入先出按时间顺序:Dask 优先处理较早提交的计算。由于用户可以异步提交计算,因此可能会有多个不同的计算同时在工作者上运行。通常,Dask 优先处理那些较早提交的任务组。
作为一种细微差别,在接近的时间窗口内提交的任务通常被认为是同时提交的。
x = x.persist() # submitted first and so has higher priority # wait a while x = x.persist() # submitted second and so has lower priority
在这种情况下,“一段时间”取决于计算的类型。在批量处理中经常使用的操作,如
compute
和persist
,认为在同一六十秒内提交的任何两次计算具有相同的优先级。在实时处理中经常使用的操作,如submit
或map
,如果在彼此的100毫秒内提交,则被视为相同的优先级。这种行为可以通过fifo_timeout=
关键字来控制:x = x.persist() # wait one minute x = x.persist(fifo_timeout='10 minutes') # has the same priority a = client.submit(func, *args) # wait no time at all b = client.submit(func, *args, fifo_timeout='0ms') # is lower priority
图结构:在任何给定的计算(计算或持久化调用)中,Dask 以最小化计算内存占用的方式对任务进行排序。这在 任务排序文档 中有更深入的讨论。
如果多个任务的优先级完全相同,那么任务到达工作者的顺序(后进先出)将用于确定任务运行的顺序。