实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

处理不断演变的工作流程

对于某些工作流程,我们一开始并不知道计算的范围。我们需要进行一些计算,以便弄清楚我们需要进行的其余计算。随着我们做更多的工作,计算会增长和演变。

例如,考虑一个需要读取许多文件的情况,然后根据这些文件的内容触发额外的工作。您希望并行读取文件,然后在每个文件中暴露更多的并行性。

本示例通过三种方式使用 Dask Futures 来处理这种情况。

  1. 使用 as_completed

  2. 使用 async/await

  3. 从任务启动任务

但首先,让我们按顺序运行我们的代码。

0: 顺序代码

[ ]:
filenames = ["file.{}.txt".format(i) for i in range(10)]

filenames[:3]
[ ]:
import random, time


def parse_file(fn: str) -> list:
    """ Returns a list work items of unknown length """
    time.sleep(random.random())
    return [random.random() for _ in range(random.randint(1, 10))]

def process_item(x: float):
    """ Process each work item """
    time.sleep(random.random() / 4)
    return x + 1
[ ]:
%%time

# This takes around 10-20s

results = []

for fn in filenames:
    L = parse_file(fn)
    for x in L:
        out = process_item(x)
        results.append(out)

启动 Dask 客户端

为了管理动态工作负载,我们需要一个 Dask 客户端。

[ ]:
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=6)
client

1: 使用 as_completed

as_completed 迭代器让我们可以在未来对象完成时处理它们。然后我们可以动态提交更多数据。

  • 我们为每个文件名提交一个任务

  • 我们还计算返回的每个列表的长度

  • 随着这些长度返回,我们提交一个新任务来获取该列表中的每一项。我们以更高的优先级执行此操作,以便在收集新数据之前处理现有数据。

  • 我们等待所有返回的结果

[ ]:
%%time

from dask.distributed import as_completed
import operator

lists = client.map(parse_file, filenames, pure=False)
lengths = client.map(len, lists)

mapping = dict(zip(lengths, lists))

futures = []

for future in as_completed(lengths):
    n = future.result()
    L = mapping[future]
    for i in range(n):
        new = client.submit(operator.getitem, L, i, priority=1)
        new = client.submit(process_item, new, priority=1)
        futures.append(new)

client.gather(futures)

2: 使用 async/await 处理本地单文件处理

我们也可以在我们的本地进程中处理并发。这需要你理解 async/await 语法,但通常更强大,并且可以说比上面的 as_completed 方法更简单。

[ ]:
import asyncio

async def f(fn):
    """ Handle the lifecycle of a single file """
    future = client.submit(parse_file, fn, pure=False)
    length_future = client.submit(len, future)
    length = await length_future

    futures = [client.submit(operator.getitem, future, i, priority=10)
               for i in range(length)]
    futures = client.map(process_item, futures, priority=10)
    return futures

async def run_all(filenames):
    list_of_list_of_futures = await asyncio.gather(*[f(fn) for fn in filenames])
    futures = sum(list_of_list_of_futures, [])
    return await client.gather(futures)

我们现在需要在客户端运行的同一个事件循环中运行这个函数。如果我们已经异步启动了客户端,那么我们可以这样做:

client = await Client(asynchronous=True)

await run_all(filenames)

然而,因为我们启动客户端时没有使用 asynchronous=True 标志,事件循环实际上是在一个单独的线程中运行,所以我们需要请求客户端为我们运行这个。

[ ]:
client.sync(run_all, filenames)

3: 从任务提交任务

我们也可以提交本身提交更多任务的任务。请参阅 这里的文档

[ ]:
%%time

from dask.distributed import get_client, secede, rejoin

def f(fn):
    L = parse_file(fn)
    client = get_client()

    futures = client.map(process_item, L, priority=10)
    secede()
    results = client.gather(futures)
    rejoin()
    return results

futures = client.map(f, filenames, pure=False)
results = client.gather(futures)