处理不断演变的工作流程
内容
实时笔记本
您可以在 live session 中运行此笔记本,或查看 Github 上的内容。
处理不断演变的工作流程¶
对于某些工作流程,我们一开始并不知道计算的范围。我们需要进行一些计算,以便弄清楚我们需要进行的其余计算。随着我们做更多的工作,计算会增长和演变。
例如,考虑一个需要读取许多文件的情况,然后根据这些文件的内容触发额外的工作。您希望并行读取文件,然后在每个文件中暴露更多的并行性。
本示例通过三种方式使用 Dask Futures 来处理这种情况。
使用
as_completed
使用
async/await
从任务启动任务
但首先,让我们按顺序运行我们的代码。
0: 顺序代码¶
[ ]:
filenames = ["file.{}.txt".format(i) for i in range(10)]
filenames[:3]
[ ]:
import random, time
def parse_file(fn: str) -> list:
""" Returns a list work items of unknown length """
time.sleep(random.random())
return [random.random() for _ in range(random.randint(1, 10))]
def process_item(x: float):
""" Process each work item """
time.sleep(random.random() / 4)
return x + 1
[ ]:
%%time
# This takes around 10-20s
results = []
for fn in filenames:
L = parse_file(fn)
for x in L:
out = process_item(x)
results.append(out)
启动 Dask 客户端¶
为了管理动态工作负载,我们需要一个 Dask 客户端。
[ ]:
from dask.distributed import Client
client = Client(processes=False, n_workers=1, threads_per_worker=6)
client
1: 使用 as_completed¶
as_completed 迭代器让我们可以在未来对象完成时处理它们。然后我们可以动态提交更多数据。
我们为每个文件名提交一个任务
我们还计算返回的每个列表的长度
随着这些长度返回,我们提交一个新任务来获取该列表中的每一项。我们以更高的优先级执行此操作,以便在收集新数据之前处理现有数据。
我们等待所有返回的结果
[ ]:
%%time
from dask.distributed import as_completed
import operator
lists = client.map(parse_file, filenames, pure=False)
lengths = client.map(len, lists)
mapping = dict(zip(lengths, lists))
futures = []
for future in as_completed(lengths):
n = future.result()
L = mapping[future]
for i in range(n):
new = client.submit(operator.getitem, L, i, priority=1)
new = client.submit(process_item, new, priority=1)
futures.append(new)
client.gather(futures)
2: 使用 async/await 处理本地单文件处理¶
我们也可以在我们的本地进程中处理并发。这需要你理解 async/await 语法,但通常更强大,并且可以说比上面的 as_completed
方法更简单。
[ ]:
import asyncio
async def f(fn):
""" Handle the lifecycle of a single file """
future = client.submit(parse_file, fn, pure=False)
length_future = client.submit(len, future)
length = await length_future
futures = [client.submit(operator.getitem, future, i, priority=10)
for i in range(length)]
futures = client.map(process_item, futures, priority=10)
return futures
async def run_all(filenames):
list_of_list_of_futures = await asyncio.gather(*[f(fn) for fn in filenames])
futures = sum(list_of_list_of_futures, [])
return await client.gather(futures)
我们现在需要在客户端运行的同一个事件循环中运行这个函数。如果我们已经异步启动了客户端,那么我们可以这样做:
client = await Client(asynchronous=True)
await run_all(filenames)
然而,因为我们启动客户端时没有使用 asynchronous=True
标志,事件循环实际上是在一个单独的线程中运行,所以我们需要请求客户端为我们运行这个。
[ ]:
client.sync(run_all, filenames)
3: 从任务提交任务¶
我们也可以提交本身提交更多任务的任务。请参阅 这里的文档。
[ ]:
%%time
from dask.distributed import get_client, secede, rejoin
def f(fn):
L = parse_file(fn)
client = get_client()
futures = client.map(process_item, L, priority=10)
secede()
results = client.gather(futures)
rejoin()
return results
futures = client.map(f, filenames, pure=False)
results = client.gather(futures)