优先处理工作

优先处理工作

当工作量超过工人数量时,Dask 必须决定哪些任务优先于其他任务。Dask 可以自动确定这些优先级以优化性能,或者用户可以根据自己的需求手动指定优先级。

Dask 使用以下优先级,按顺序:

  1. 用户优先级:用户定义的优先级通过 priority= 关键字参数提供给 compute()persist()submit()map() 等函数。优先级较高的任务会在优先级较低的任务之前运行,默认优先级为零。

    future = client.submit(func, *args, priority=10)  # high priority task
    future = client.submit(func, *args, priority=-10)  # low priority task
    
    df = df.persist(priority=10)  # high priority computation
    

    优先级也可以使用 dask 注释机制来指定:

    with dask.annotate(priority=10):
        future = client.submit(func, *args)  # high priority task
    with dask.annotate(priority=-10):
        future = client.submit(func, *args)  # low priority task
    
    with dask.annotate(priority=10):
        df = df.persist()  # high priority computation
    
  2. 先入先出按时间顺序:Dask 优先处理较早提交的计算。由于用户可以异步提交计算,因此可能会有多个不同的计算同时在工作者上运行。通常,Dask 优先处理那些较早提交的任务组。

    作为一种细微差别,在接近的时间窗口内提交的任务通常被认为是同时提交的。

    x = x.persist()  # submitted first and so has higher priority
    # wait a while
    x = x.persist()  # submitted second and so has lower priority
    

    在这种情况下,“一段时间”取决于计算的类型。在批量处理中经常使用的操作,如 computepersist,认为在同一六十秒内提交的任何两次计算具有相同的优先级。在实时处理中经常使用的操作,如 submitmap,如果在彼此的100毫秒内提交,则被视为相同的优先级。这种行为可以通过 fifo_timeout= 关键字来控制:

    x = x.persist()
    # wait one minute
    x = x.persist(fifo_timeout='10 minutes')  # has the same priority
    
    a = client.submit(func, *args)
    # wait no time at all
    b = client.submit(func, *args, fifo_timeout='0ms')  # is lower priority
    
  3. 图结构:在任何给定的计算(计算或持久化调用)中,Dask 以最小化计算内存占用的方式对任务进行排序。这在 任务排序文档 中有更深入的讨论。

如果多个任务的优先级完全相同,那么任务到达工作者的顺序(后进先出)将用于确定任务运行的顺序。