自定义图表

有时你可能想要进行并行计算,但你的应用程序并不适合像 Dask Array 或 Dask Bag 这样的工具。在这些情况下,你可以直接与 Dask 调度器交互。这些调度器作为独立模块运行良好。

这种分离为复杂情况提供了一个释放阀,并允许高级项目即使在那些项目有内部计算表示的情况下,也能有额外的并行执行机会。随着Dask调度器的改进或扩展到分布式内存,为使用Dask调度器编写的代码也将随之进步。

示例

数据管道的 Dask 图

如在 动机规范 部分所讨论的,调度器接收一个任务图(这是一个函数元组的字典)和一个从该图中期望的键的列表。

以下是一个为传统清洗和分析管道构建图形的模拟示例:

def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.threaded import get
get(dsk, 'store')  # executes in parallel

自定义 Dask 图中的关键字参数

有时,您可能希望在自定义 Dask 图中向函数传递关键字参数。您可以使用 dask.utils.apply() 函数来实现这一点,如下所示:

from dask.utils import apply

task = (apply, func, args, kwargs)  # equivalent to func(*args, **kwargs)

dsk = {'task-name': task,
        ...
       }

在上面的例子中:

  • args 应该是一个元组(例如:(arg_1, arg_2, arg_3)),并且

  • kwargs 应该是一个字典(例如:{"kwarg_1": value, "kwarg_2": value}