异步操作

Dask 可以完全异步运行,因此可以与其他高度并发的应用程序互操作。在内部,Dask 构建在 Tornado 协程之上,但也具有与 asyncio 的兼容层(见下文)。

基本操作

在启动客户端时,提供 asynchronous=True 关键字以告知 Dask 你打算在异步上下文中使用此客户端,例如在定义为 async/await 语法的函数中。

async def f():
    client = await Client(asynchronous=True)

以前会阻塞的操作现在提供了 Tornado 协程,您可以在其上 await

仅提交任务的快速函数仍然保持快速,不需要等待。这包括所有向集群提交任务的函数,如 submitmapcomputepersist

future = client.submit(lambda x: x + 1, 10)

你可以直接等待未来

result = await future

>>> print(result)
11

或者你可以使用常规的客户端方法。任何等待直到从调度器接收到信息的操作现在都应该被 await

result = await client.gather(future)

如果你想在一个同步的 Client``(即没有使用 ``asynchronous=True 关键字创建的)中使用异步函数,那么你可以在每次方法调用时应用 asynchronous=True 关键字,并使用 Client.sync 函数来运行异步函数:

from dask.distributed import Client

client = Client()  # normal blocking client

async def f():
    future = client.submit(lambda x: x + 1, 10)
    result = await client.gather(future, asynchronous=True)
    return result

client.sync(f)
async with Client(asynchronous=True) as client:
    arr = da.random.random((1000, 1000), chunks=(1000, 100))
    await client.compute(arr.mean())

示例

这个独立的示例启动一个异步客户端,提交一个简单的任务,等待结果,然后关闭客户端。你可以看到 Asyncio 和 Tornado 的实现。

使用 Tornado 或 Asyncio 的 Python 3

from dask.distributed import Client

async def f():
    client = await Client(asynchronous=True)
    future = client.submit(lambda x: x + 1, 10)
    result = await future
    await client.close()
    return result

# Either use Tornado
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)

# Or use asyncio
import asyncio
asyncio.get_event_loop().run_until_complete(f())

用例

历史上,这已被用于几种应用程序中:

  1. 要将 Dask 集成到其他异步服务(如 Web 后端)中,提供一个类似于 Celery 的计算引擎,同时仍然保持高度的并发性,并且不会不必要地阻塞。

  2. 对于那些状态变化或更新非常迅速的计算,例如在某些高级机器学习工作负载中常见的情况。

  3. 为了开发 Dask 分布式基础设施的内部结构,这些结构完全是用这种方式编写的。

  4. 对于高级应用程序中的复杂控制和数据结构。