异步/等待与非阻塞执行
内容
实时笔记本
你可以在 live session 中运行此笔记本,或查看 on Github。
异步/等待与非阻塞执行¶
Dask 原生集成使用 Tornado 或 Asyncio 框架的并发应用程序,并且可以利用 Python 的 async
和 await
关键字。
这个示例展示了一个如何在异步模式下启动 Dask 客户端的小示例。
asynchronous=True
参数¶
Dask 的 LocalCluster 和 Client 对象可以在异步等待模式下运行,如果你传递 asynchronous=True
参数。
[ ]:
from dask.distributed import Client
client = await Client(asynchronous=True)
[ ]:
def inc(x: int) -> int:
return x + 1
future = client.submit(inc, 10)
future
[ ]:
await future
集合¶
请注意,在异步模式下,像 .compute()
方法这样的阻塞操作是不合适的。相反,你需要使用 Client.compute
方法。
[ ]:
import dask
df = dask.datasets.timeseries()
df
[ ]:
df = df.persist() # persist is non-blocking, so it's ok
[ ]:
total = df[['x', 'y']].sum() # lazy computations are also ok
[ ]:
# total.compute() # but compute is bad, because compute blocks until done
[ ]:
future = client.compute(total)
future
[ ]:
await future
在脚本中¶
在 Jupyter 中运行 async/await 代码有些不寻常。Jupyter 已经有一个事件循环在运行,因此可以直接在其中使用 async/await 语法。在普通的 Python 脚本中则不会这样。以下是一个示例脚本,它应该在普通的 Python 解释器中或作为脚本运行。
import asyncio
from dask.distributed import Client
def inc(x: int) -> int:
return x + 1
async def f():
async with Client(asynchronous=True) as client:
future = client.submit(inc, 10)
result = await future
print(result)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(f())