自定义初始化

通常我们希望在启动或关闭调度器或工作进程时运行自定义代码。我们可能会使用 Client.runClient.run_on_scheduler 等函数手动执行此操作,但这容易出错且难以自动化。

为了解决这个问题,Dask 包含了一些机制,用于在 Scheduler、Worker、Nanny 或 Client 的生命周期中运行任意代码。

预加载脚本

dask-schedulerdask-worker 都支持 --preload 选项,该选项允许分别对每个调度器/工作器进行自定义初始化。作为 --preload 值传递的模块或Python文件保证在建立任何连接之前导入。如果找到 dask_setup(service) 函数,则会调用该函数,参数为 SchedulerWorkerNannyClient 实例。当服务停止时,如果存在 dask_teardown(service),则会调用它。

为了支持额外的配置,一个单一的 --preload 模块可以通过暴露 dask_setup 作为一个 Click 命令来注册额外的命令行参数。这个命令将被用来解析提供给 dask-workerdask-scheduler 的额外参数,并且在服务初始化之前被调用。

示例

作为一个例子,考虑以下创建 调度器插件 并将其注册到调度器的文件

# scheduler-setup.py
import click

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, print_count):
      self.print_count = print_count
      super().__init__()

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at:", worker)
        if self.print_count and scheduler is not None:
            print("Total workers:", len(scheduler.workers))

@click.command()
@click.option("--print-count/--no-print-count", default=False)
def dask_setup(scheduler, print_count):
    plugin = MyPlugin(print_count)
    scheduler.add_plugin(plugin)

然后,我们可以在启动调度器时通过引用其文件名(如果它在路径上,则为模块名)来运行此预加载脚本:

dask-scheduler --preload scheduler-setup.py --print-count

类型

预加载可以指定为以下任何一种形式:

  • 一个脚本的路径,例如 /path/to/myfile.py

  • 在路径上的模块名称,例如 my_module.initialize

  • Python 脚本的文本,如 import os; os.environ["A"] = "value"

配置

预加载也可以通过以下配置值进行注册:

distributed:
  scheduler:
    preload:
    - "import os; os.environ['A'] = 'b'"  # use Python text
    - /path/to/myfile.py                  # or a filename
    - my_module                           # or a module name
    preload-argv:
    - []                                  # Pass optional keywords
    - ["--option", "value"]
    - []
  worker:
    preload: []
    preload-argv: []
  nanny:
    preload: []
    preload-argv: []
  client:
    preload: []
    preload-argv: []

备注

由于 dask-worker 命令需要同时接受 Worker 和 Nanny(如果使用 Nanny)的关键字,因此它既有 --preload 也有 --preload-nanny 关键字。所有额外的关键字(如上面的 --print-count)将发送给 Worker 而不是 Nanny。无法在命令行上为 Nanny 预加载脚本指定额外关键字。如果需要,我们建议使用更灵活的配置。

Worker 生命周期插件

你还可以创建一个包含 setupteardowntransition 方法的类,并通过 Client.register_worker_plugin 方法将其注册到调度器中,以便分配给每个工作进程。

Client.register_worker_plugin(plugin[, ...])

为所有当前和未来的工作器注册一个生命周期工作器插件。

Client.register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[源代码]

为所有当前和未来的工作器注册一个生命周期工作器插件。

2023.9.2 版后已移除: 请使用 Client.register_plugin() 代替。

这注册了一个新对象来处理此集群中工作者的设置、任务状态转换和拆卸。插件将在所有当前连接的工作者上实例化自身。它还将在未来连接的任何工作者上运行。

该插件可能包含方法 setupteardowntransitionrelease_key。请参阅 dask.distributed.WorkerPlugin 类或下面的示例以了解接口和文档字符串。它必须可以通过 pickle 或 cloudpickle 模块进行序列化。

如果插件有一个 name 属性,或者使用了 name= 关键字,那么这将控制幂等性。如果已经注册了具有该名称的插件,那么它将被移除并替换为新的插件。

对于插件的替代方案,您可能还希望了解预加载脚本。

参数
插件WorkerPlugin 或 NannyPlugin

要注册的 WorkerPlugin 或 NannyPlugin 实例。

名称str, 可选

插件的名称。使用相同名称注册插件将不会有任何效果。如果插件没有名称属性,则使用随机名称。

保姆bool, 可选

是否将插件注册到工作节点或保姆节点。

参见

distributed.WorkerPlugin
unregister_worker_plugin

示例

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str,
...                    **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

你可以通过 get_worker 函数访问插件

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
...    worker = get_worker()
...    plugin = worker.plugins['my-plugin']
...    return plugin.my_state
>>> future = client.run(f)