从任务启动任务

有时从其他任务启动任务会很方便。例如,你可能直到获得某些初始计算的结果后,才知道要运行哪些计算。

激励示例

我们想要下载一段数据并将其转换为列表。然后我们希望为该列表的每个元素提交一个任务。在我们获取数据之前,我们不知道列表会有多长。

因此,我们发送我们的原始 download_and_convert_to_list 函数,该函数在我们的一个工作机器上下载数据并将其转换为列表:

future = client.submit(download_and_convert_to_list, uri)

但现在我们需要为这些数据的各个部分提交新任务。我们有三种选择。

  1. 将数据收集回本地进程,然后从本地进程提交新任务

  2. 只收集足够的数据信息返回到本地进程,并从本地进程提交作业

  3. 提交一个任务到集群,该任务将直接从该工作节点提交其他任务

本地收集数据

如果数据量不大,我们可以将其带回到客户端,在我们的本地机器上执行必要的逻辑:

>>> data = future.result()                  # gather data to local process
>>> data                                    # data is a list
[...]

>>> futures = e.map(process_element, data)  # submit new tasks on data
>>> analysis = e.submit(aggregate, futures) # submit final aggregation task

这很简单,如果 data 很小,那么这可能是最简单的,因此是正确的选择。然而,如果 data 很大,那么我们必须选择另一个选项。

从客户端提交任务

我们可以在远程数据上运行小函数,以确定足够的信息来提交正确类型的任务。在下面的示例中,我们在远程计算 datalen 函数,然后将数据分解为其各个元素。

>>> n = client.submit(len, data)            # compute number of elements
>>> n = n.result()                          # gather n (small) locally

>>> from operator import getitem
>>> elements = [client.submit(getitem, data, i) for i in range(n)]  # split data

>>> futures = client.map(process_element, elements)
>>> analysis = client.submit(aggregate, futures)

我们在远程计算长度,收集回这个非常小的结果,然后使用它来提交更多任务以分解数据并在集群上处理。这更复杂,因为我们必须在集群和本地进程之间来回几次,但移动的数据非常小,因此这只会给我们的总处理时间增加几毫秒。

扩展示例

计算斐波那契数涉及一个递归函数。当函数运行时,它会使用已计算的值调用自身。我们将在整个文档中使用这个作为示例,来说明从任务中提交任务的不同技术。

def fib(n):
    if n < 2:
        return n
    a = fib(n - 1)
    b = fib(n - 2)
    return a + b

print(fib(10))  # prints "55"

我们将使用这个示例来展示不同的接口。

从工作者提交任务

注意:此接口是新的且实验性的。它可能在未来的版本中未经警告而更改。

我们可以从其他任务中提交任务。这使我们能够在工作节点上做出决策。

要从工作者提交新任务,该工作者必须首先创建一个新的客户端对象,该对象连接到调度器。有三种选择:

  1. dask.delayeddask.compute

  2. get_clientsecederejoin

  3. worker_client

dask.delayed

Dask 延迟行为如常:它将函数提交到图表中,优化带宽/计算量,并收集结果。更多详情,请参见 dask.delayed

from distributed import Client
from dask import delayed, compute


@delayed
def fib(n):
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib(n - 1)  # these calls are delayed
    b = fib(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

if __name__ == "__main__":
    # these features require the dask.distributed scheduler
    client = Client()

    result = fib(10).compute()
    print(result)  # prints "55"

在工作者上获取客户端

The get_client function provides a normal Client object that gives full access to the dask cluster, including the ability to submit, scatter, and gather results.

from distributed import Client, get_client, secede, rejoin

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

然而,如果太多任务同时请求作业,这可能会导致调度器死锁。每个任务不会通知调度器它们正在等待结果,并且可以自由计算其他任务。如果每个调度槽都在运行一个任务并且它们都请求更多任务,这可能会导致集群死锁。

为了避免这种死锁问题,我们可以使用 secederejoin。这些函数将分别从集群中移除并重新加入当前任务。

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    secede()
    a, b = client.gather([a_future, b_future])
    rejoin()
    return a + b

与上下文管理器连接

函数 worker_client 执行与 get_client 相同的任务,但实现为上下文管理器。使用 worker_client 作为上下文管理器可以确保工作线程上的正确清理。

from dask.distributed import Client, worker_client


def fib(n):
    if n < 2:
        return n
    with worker_client() as client:
        a_future = client.submit(fib, n - 1)
        b_future = client.submit(fib, n - 2)
        a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

调用 worker_client 的任务被保守地假设为 长时间运行 。它们可能需要很长时间,等待其他任务完成,收集结果等。为了避免它们占用处理槽,每当任务调用 worker_client 时,以下操作就会发生。

  1. 在工作线程上运行此函数的线程 退出 线程池并自行运行。这允许线程池用新线程填充该槽位,并继续处理其他任务,而不将此长时间运行的任务计入其正常配额。

  2. Worker 向调度器发送一条消息,暂时将其允许的任务数量增加一个。这同样使得调度器可以分配更多的任务给这个Worker,而不将这个长时间运行的任务计入其中。

建立与调度器的连接需要几毫秒的时间,因此使用此功能的计算持续时间至少应比这长几倍是明智的。