插件

Dask的插件系统使您能够为某些事件运行自定义Python代码。您可以使用特定于调度器、工作节点或保姆的插件。例如,工作节点插件允许您在工作节点的生命周期中的某些事件(例如工作节点进程启动时)在所有工作节点上运行自定义Python代码。在下面的每个部分中,您将看到如何创建自己的插件或使用Dask提供的内置插件。

调度器插件

class distributed.diagnostics.plugin.SchedulerPlugin[源代码]

扩展调度器的接口

插件能够在特定事件发生时运行自定义钩子。每当调度器的相应方法运行时,调度器将运行此插件的方法。这使得用户代码可以在调度器线程内运行,并能与调度器本身同步执行任意操作。

插件通常用于诊断和测量,但它们可以完全访问调度器,原则上可能会影响核心调度。

要实现一个插件:

  1. 继承自此类

  2. 覆盖其某些方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制如果已经存在同名的调度器插件时,是否应该忽略该插件的注册。如果为 True,则忽略该插件,否则替换现有的插件。默认为 False

示例

>>> class Counter(SchedulerPlugin):
...     def __init__(self):
...         self.counter = 0
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if start == 'processing' and finish == 'memory':
...             self.counter += 1
...
...     def restart(self, scheduler):
...         self.counter = 0
>>> plugin = Counter()
>>> scheduler.add_plugin(plugin)  
add_client(scheduler: Scheduler, client: str) None[源代码]

当新客户端连接时运行

add_worker(scheduler: Scheduler, worker: str) None | Awaitable[None][源代码]

当新工作者进入集群时运行

如果此方法是同步的,它将立即同步执行,而不会让 Scheduler.add_worker 让出事件循环。如果它是异步的,它将在所有同步的 SchedulerPlugin.add_worker 钩子执行完毕后被等待。

警告

无法保证各个 SchedulerPlugin.add_worker 钩子之间的执行顺序,且顺序可能会在没有弃用周期的情况下发生变化。

async before_close() None[源代码]

在任何调度器关闭逻辑之前运行

async close() None[源代码]

调度器关闭时运行

这会在调度器关闭过程的开始时运行,但在工作进程被要求优雅地关闭之后。

log_event(topic: str, msg: Any) None[源代码]

在记录事件时运行

remove_client(scheduler: Scheduler, client: str) None[源代码]

客户端断开连接时运行

remove_worker(scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any) None | Awaitable[None][源代码]

当一个工作节点离开集群时运行

如果此方法是同步的,它将立即同步执行,而不会让 Scheduler.remove_worker 让出事件循环。如果它是异步的,它将在所有同步的 SchedulerPlugin.remove_worker 钩子执行后被等待。

警告

在各个 SchedulerPlugin.remove_worker 钩子之间的执行顺序没有保证,顺序可能会在没有弃用周期的情况下发生变化。

restart(scheduler: Scheduler) None[源代码]

在调度器重新启动时运行

async start(scheduler: Scheduler) None[源代码]

在调度器启动时运行

这会在调度器启动过程的末尾运行

transition(key: Key, start: SchedulerTaskStateState, finish: SchedulerTaskStateState, *args: Any, stimulus_id: str, **kwargs: Any) None[源代码]

每当任务状态改变时运行

有关过渡机制和可用状态的描述,请参阅 调度器任务状态

警告

这是一个高级功能,任务状态的转换机制和细节可能会在没有弃用周期的情况下发生变化。

参数
关键
开始

过渡的初始状态。可以是 released、waiting、processing、memory、error 之一。

完成

过渡的最终状态。

stimulus_id

引起过渡的刺激的ID。

*args, **kwargs

更多选项在转换时传递 这可能包括工作ID、计算时间等。

update_graph(scheduler: Scheduler, *, client: str, keys: set[Key], tasks: list[Key], annotations: dict[str, dict[Key, Any]], priority: dict[Key, tuple[int | float, ...]], dependencies: dict[Key, set[Key]], **kwargs: Any) None[源代码]

当新的图/任务进入调度器时运行

参数
调度器:

Scheduler 实例。

客户端:

唯一的客户端ID。

键:

客户端在调用 update_graph 时感兴趣的键。

任务:

The

注解:

完全解析的注释应用于以下格式的任务:

{
    "annotation": {
        "key": "value,
        ...
    },
    ...
}
优先级:

任务根据分配的任务计算优先级。

依赖项:

一个将键映射到其依赖项的映射。

**kwargs:

建议允许插件接受更多参数,以确保未来的兼容性。

valid_workers_downscaling(scheduler: Scheduler, workers: list[scheduler_module.WorkerState]) list[scheduler_module.WorkerState][源代码]

确定可以从集群中移除哪些工作节点

当调度器即将通过移除工作节点来缩减集群规模时,会调用此方法。该方法应返回一组可以从集群中移除的工作节点状态。

参数
工人列表

待移除的工作者状态列表。

stimulus_idstr

导致降级的刺激的ID。

返回
列表

可以从集群中移除的工作者状态列表。

RabbitMQ 示例

RabbitMQ 是一个分布式消息队列,我们可以用它来发布任务状态转换的更新。通过向 RabbitMQ 发布状态转换,我们允许其他机器处理这些转换,并将调度器的处理保持在最低限度。有关 RabbitMQ 及其如何消费消息的更多信息,请参阅 RabbitMQ 教程

import json
from distributed.diagnostics.plugin import SchedulerPlugin
import pika

class RabbitMQPlugin(SchedulerPlugin):
    def __init__(self):
        # Update host to be your RabbitMQ host
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='dask_task_status', durable=True)

    def transition(self, key, start, finish, *args, **kwargs):
        message = dict(
            key=key,
            start=start,
            finish=finish,
        )
        self.channel.basic_publish(
            exchange='',
            routing_key='dask_task_status',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))

@click.command()
def dask_setup(scheduler):
    plugin = RabbitMQPlugin()
    scheduler.add_plugin(plugin)

运行方式:dask scheduler --preload <filename.py>

访问完整任务状态

如果你想访问调度器中存储的完整 distributed.scheduler.TaskState,你可以通过传递并存储对调度器的引用来实现,如下所示:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
         self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
         # Get full TaskState
         ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

内置调度器插件

class distributed.diagnostics.plugin.PipInstall(packages: list[str], pip_options: list[str] | None = None, restart_workers: bool = False)[源代码]

一个用于 pip 安装一组包的插件

这接受一组要在所有工作节点上安装的软件包,以及安装时使用的选项。您还可以选择在执行此安装后要求工作节点重新启动自身。

备注

这将增加每个工作进程的启动时间。如果可能,我们建议将库包含在工作环境或镜像中。这主要用于实验和调试。

参数

使用 pip 安装的包列表。包应遵循 需求文件 定义的结构。包也可以包含 环境变量

pip_options

传递给 pip 的额外选项

重启工作进程

是否在安装包后重新启动工作进程;仅在工作进程有附加的保姆进程时有效。

示例

>>> from dask.distributed import PipInstall
>>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
>>> client.register_plugin(plugin)

使用 TOKEN 环境变量从私有仓库安装包。

>>> from dask.distributed import PipInstall
>>> plugin = PipInstall(packages=["private_package@git+https://${TOKEN}@github.com/dask/private_package.git])
>>> client.register_plugin(plugin)
class distributed.diagnostics.plugin.CondaInstall(packages: list[str], conda_options: list[str] | None = None, restart_workers: bool = False)[源代码]

一个用于conda安装一组包的插件

这接受一组要在调度器和所有工作节点上安装的软件包,以及安装时使用的选项。

您也可以选择在完成此安装后要求工作人员重新启动。

备注

这将增加集群启动所需的时间。如果可能,我们建议将库包含在集群环境或镜像中。这主要是用于实验和调试。

参数

使用 conda 安装的包列表(可选版本)

conda_选项

传递给 conda 的额外选项

重启工作进程

是否在安装包后重启工作进程。仅在工作者进程附带保姆进程时有效。

示例

>>> from dask.distributed import CondaInstall
>>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])
>>> client.register_plugin(plugin)
class distributed.diagnostics.plugin.InstallPlugin(install_fn: Callable[[], None], restart_workers: bool)[源代码]

调度器插件以在集群上安装软件

这接受一个在调度器和所有工作节点上安装软件的函数。你也可以选择在执行此安装后要求工作节点重启。

备注

这将增加每个工作进程的启动时间。如果可能,我们建议将软件包含在工作环境或镜像中。这主要是用于实验和调试。

参数
install_fn

用于安装软件的可调用对象;必须是幂等的。

重启工作进程

是否在安装包后重启工作进程。仅在工作进程有附加的保姆进程时有效。

class distributed.diagnostics.plugin.SchedulerUploadFile(filepath: str, load: bool = True)[源代码]

工作插件

distributed.diagnostics.plugin.WorkerPlugin 提供了一个用于创建自己的工作插件的基类。此外,Dask 还提供了一些 内置插件

观看下面的视频,了解如何使用 WorkerPlugin 添加 concurrent.futures.ProcessPoolExecutor 的示例:

class distributed.diagnostics.plugin.WorkerPlugin[源代码]

扩展Worker的接口

一个 worker 插件使自定义代码能够在 Workers 生命周期的不同阶段运行。

插件使自定义代码能够在 Worker 生命周期的每个步骤中运行。每当发生此类事件时,此类上的相应方法将被调用。请注意,用户代码始终在 Worker 的主线程中运行。

要实现一个插件:

  1. 继承自此类

  2. 覆盖其某些方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制如果同名的 worker 插件已经存在时,是否应该忽略该插件的注册。如果为 True,则忽略该插件,否则替换现有的插件。默认为 False

示例

>>> class ErrorLogger(WorkerPlugin):
...     def __init__(self, logger):
...         self.logger = logger
...
...     def setup(self, worker):
...         self.worker = worker
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if finish == 'error':
...             ts = self.worker.tasks[key]
...             exc_info = (type(ts.exception), ts.exception, ts.traceback)
...             self.logger.error(
...                 "Error during computation of '%s'.", key,
...                 exc_info=exc_info
...             )
>>> import logging
>>> plugin = ErrorLogger(logging)
>>> client.register_plugin(plugin)  
setup(worker: Worker) None | Awaitable[None][源代码]

在插件附加到工作进程时运行。这发生在插件注册并附加到现有工作进程时,或者在工作进程在插件注册后创建时。

teardown(worker: Worker) None | Awaitable[None][源代码]

当插件所附加的工作者关闭时,或当插件被移除时运行。

transition(key: Key, start: WorkerTaskStateState, finish: WorkerTaskStateState, **kwargs: Any) None[源代码]

在整个任务的生命周期中(参见 Worker 状态),Worker 由调度器指示计算某些任务,从而导致每个任务状态的转换。拥有该任务的 Worker 随后会收到此状态转换的通知。

每当任务改变其状态时,将调用此方法。

警告

这是一个高级功能,任务状态的转换机制和细节可能会在没有弃用周期的情况下发生变化。

参数
关键
开始

过渡的初始状态。可以是等待、准备、执行、长时间运行、内存、错误之一。

完成

过渡的最终状态。

kwargs

转换时传递的更多选项

内置工作插件

class distributed.diagnostics.plugin.UploadFile(filepath: str, load: bool = True)[源代码]

一个用于将本地文件上传到工作者的 WorkerPlugin。

参数
filepath: str

要上传的文件路径(.py、egg 或 zip)

示例

>>> from distributed.diagnostics.plugin import UploadFile
>>> client.register_plugin(UploadFile("/path/to/file.py"))  

保姆插件

class distributed.diagnostics.plugin.NannyPlugin[源代码]

扩展Nanny的接口

一个 worker 插件使得自定义代码可以在 Workers 生命周期的不同阶段运行。一个 nanny 插件做同样的事情,但它可以在 worker 启动之前运行代码,或者在必要时重启 worker。

要实现一个插件:

  1. 继承自此类

  2. 覆盖其某些方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制如果已经存在一个同名的保姆插件时,是否应该忽略该插件的注册。如果为 True,则忽略该插件,否则替换现有的插件。默认为 False

restart 属性用于控制当注册插件时,是否需要重新启动正在运行的 Worker

setup(nanny)[源代码]

在插件附加到保姆时运行。这发生在插件注册并附加到现有的保姆时,或者在插件注册后创建保姆时。

teardown(nanny)[源代码]

当附加该插件的保姆关闭时运行

内置保姆插件

class distributed.diagnostics.plugin.Environ(environ: dict | None = None)[源代码]
class distributed.diagnostics.plugin.UploadDirectory(path, restart=False, update_path=False, skip_words=('.git', '.github', '.pytest_cache', 'tests', 'docs'), skip=(<function UploadDirectory.<lambda>>, ))[源代码]

一个用于将本地文件上传到工作者的 NannyPlugin。

参数
路径: str

上传目录的路径

示例

>>> from distributed.diagnostics.plugin import UploadDirectory
>>> client.register_plugin(UploadDirectory("/path/to/directory"), nanny=True)