异步/等待与非阻塞执行

实时笔记本

你可以在 live session Binder 中运行此笔记本,或查看 on Github

异步/等待与非阻塞执行

Dask 原生集成使用 TornadoAsyncio 框架的并发应用程序,并且可以利用 Python 的 asyncawait 关键字。

这个示例展示了一个如何在异步模式下启动 Dask 客户端的小示例。

asynchronous=True 参数

Dask 的 LocalCluster 和 Client 对象可以在异步等待模式下运行,如果你传递 asynchronous=True 参数。

[ ]:
from dask.distributed import Client
client = await Client(asynchronous=True)
[ ]:
def inc(x: int) -> int:
    return x + 1

future = client.submit(inc, 10)
future
[ ]:
await future

集合

请注意,在异步模式下,像 .compute() 方法这样的阻塞操作是不合适的。相反,你需要使用 Client.compute 方法。

[ ]:
import dask
df = dask.datasets.timeseries()
df
[ ]:
df = df.persist()             # persist is non-blocking, so it's ok
[ ]:
total = df[['x', 'y']].sum()  # lazy computations are also ok
[ ]:
# total.compute()             # but compute is bad, because compute blocks until done
[ ]:
future = client.compute(total)
future
[ ]:
await future

在脚本中

在 Jupyter 中运行 async/await 代码有些不寻常。Jupyter 已经有一个事件循环在运行,因此可以直接在其中使用 async/await 语法。在普通的 Python 脚本中则不会这样。以下是一个示例脚本,它应该在普通的 Python 解释器中或作为脚本运行。

import asyncio
from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def f():
    async with Client(asynchronous=True) as client:
        future = client.submit(inc, 10)
        result = await future
        print(result)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(f())