异步操作
内容
异步操作¶
Dask 可以完全异步运行,因此可以与其他高度并发的应用程序互操作。在内部,Dask 构建在 Tornado 协程之上,但也具有与 asyncio 的兼容层(见下文)。
基本操作¶
在启动客户端时,提供 asynchronous=True
关键字以告知 Dask 你打算在异步上下文中使用此客户端,例如在定义为 async/await
语法的函数中。
async def f():
client = await Client(asynchronous=True)
以前会阻塞的操作现在提供了 Tornado 协程,您可以在其上 await
。
仅提交任务的快速函数仍然保持快速,不需要等待。这包括所有向集群提交任务的函数,如 submit
、map
、compute
和 persist
。
future = client.submit(lambda x: x + 1, 10)
你可以直接等待未来
result = await future
>>> print(result)
11
或者你可以使用常规的客户端方法。任何等待直到从调度器接收到信息的操作现在都应该被 await
。
result = await client.gather(future)
如果你想在一个同步的 Client``(即没有使用 ``asynchronous=True
关键字创建的)中使用异步函数,那么你可以在每次方法调用时应用 asynchronous=True
关键字,并使用 Client.sync
函数来运行异步函数:
from dask.distributed import Client
client = Client() # normal blocking client
async def f():
future = client.submit(lambda x: x + 1, 10)
result = await client.gather(future, asynchronous=True)
return result
client.sync(f)
async with Client(asynchronous=True) as client:
arr = da.random.random((1000, 1000), chunks=(1000, 100))
await client.compute(arr.mean())
示例¶
这个独立的示例启动一个异步客户端,提交一个简单的任务,等待结果,然后关闭客户端。你可以看到 Asyncio 和 Tornado 的实现。
使用 Tornado 或 Asyncio 的 Python 3¶
from dask.distributed import Client
async def f():
client = await Client(asynchronous=True)
future = client.submit(lambda x: x + 1, 10)
result = await future
await client.close()
return result
# Either use Tornado
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)
# Or use asyncio
import asyncio
asyncio.get_event_loop().run_until_complete(f())
用例¶
历史上,这已被用于几种应用程序中:
要将 Dask 集成到其他异步服务(如 Web 后端)中,提供一个类似于 Celery 的计算引擎,同时仍然保持高度的并发性,并且不会不必要地阻塞。
对于那些状态变化或更新非常迅速的计算,例如在某些高级机器学习工作负载中常见的情况。
为了开发 Dask 分布式基础设施的内部结构,这些结构完全是用这种方式编写的。
对于高级应用程序中的复杂控制和数据结构。