插件
内容
插件¶
Dask的插件系统使您能够为某些事件运行自定义Python代码。您可以使用特定于调度器、工作节点或保姆的插件。例如,工作节点插件允许您在工作节点的生命周期中的某些事件(例如工作节点进程启动时)在所有工作节点上运行自定义Python代码。在下面的每个部分中,您将看到如何创建自己的插件或使用Dask提供的内置插件。
调度器插件¶
- class distributed.diagnostics.plugin.SchedulerPlugin[源代码]¶
扩展调度器的接口
插件能够在特定事件发生时运行自定义钩子。每当调度器的相应方法运行时,调度器将运行此插件的方法。这使得用户代码可以在调度器线程内运行,并能与调度器本身同步执行任意操作。
插件通常用于诊断和测量,但它们可以完全访问调度器,原则上可能会影响核心调度。
要实现一个插件:
继承自此类
覆盖其某些方法
使用
Client.register_plugin
注册插件。
idempotent
属性用于控制如果已经存在同名的调度器插件时,是否应该忽略该插件的注册。如果为True
,则忽略该插件,否则替换现有的插件。默认为False
。示例
>>> class Counter(SchedulerPlugin): ... def __init__(self): ... self.counter = 0 ... ... def transition(self, key, start, finish, *args, **kwargs): ... if start == 'processing' and finish == 'memory': ... self.counter += 1 ... ... def restart(self, scheduler): ... self.counter = 0
>>> plugin = Counter() >>> scheduler.add_plugin(plugin)
- add_worker(scheduler: Scheduler, worker: str) None | Awaitable[None] [源代码]¶
当新工作者进入集群时运行
如果此方法是同步的,它将立即同步执行,而不会让
Scheduler.add_worker
让出事件循环。如果它是异步的,它将在所有同步的SchedulerPlugin.add_worker
钩子执行完毕后被等待。警告
无法保证各个
SchedulerPlugin.add_worker
钩子之间的执行顺序,且顺序可能会在没有弃用周期的情况下发生变化。
- remove_worker(scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any) None | Awaitable[None] [源代码]¶
当一个工作节点离开集群时运行
如果此方法是同步的,它将立即同步执行,而不会让
Scheduler.remove_worker
让出事件循环。如果它是异步的,它将在所有同步的SchedulerPlugin.remove_worker
钩子执行后被等待。警告
在各个
SchedulerPlugin.remove_worker
钩子之间的执行顺序没有保证,顺序可能会在没有弃用周期的情况下发生变化。
- transition(key: Key, start: SchedulerTaskStateState, finish: SchedulerTaskStateState, *args: Any, stimulus_id: str, **kwargs: Any) None [源代码]¶
每当任务状态改变时运行
有关过渡机制和可用状态的描述,请参阅 调度器任务状态。
警告
这是一个高级功能,任务状态的转换机制和细节可能会在没有弃用周期的情况下发生变化。
- 参数
- 关键
- 开始
过渡的初始状态。可以是 released、waiting、processing、memory、error 之一。
- 完成
过渡的最终状态。
- stimulus_id
引起过渡的刺激的ID。
- *args, **kwargs
更多选项在转换时传递 这可能包括工作ID、计算时间等。
- update_graph(scheduler: Scheduler, *, client: str, keys: set[Key], tasks: list[Key], annotations: dict[str, dict[Key, Any]], priority: dict[Key, tuple[int | float, ...]], dependencies: dict[Key, set[Key]], **kwargs: Any) None [源代码]¶
当新的图/任务进入调度器时运行
- 参数
- 调度器:
Scheduler 实例。
- 客户端:
唯一的客户端ID。
- 键:
客户端在调用 update_graph 时感兴趣的键。
- 任务:
The
- 注解:
完全解析的注释应用于以下格式的任务:
{ "annotation": { "key": "value, ... }, ... }
- 优先级:
任务根据分配的任务计算优先级。
- 依赖项:
一个将键映射到其依赖项的映射。
- **kwargs:
建议允许插件接受更多参数,以确保未来的兼容性。
RabbitMQ 示例¶
RabbitMQ 是一个分布式消息队列,我们可以用它来发布任务状态转换的更新。通过向 RabbitMQ 发布状态转换,我们允许其他机器处理这些转换,并将调度器的处理保持在最低限度。有关 RabbitMQ 及其如何消费消息的更多信息,请参阅 RabbitMQ 教程。
import json
from distributed.diagnostics.plugin import SchedulerPlugin
import pika
class RabbitMQPlugin(SchedulerPlugin):
def __init__(self):
# Update host to be your RabbitMQ host
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='dask_task_status', durable=True)
def transition(self, key, start, finish, *args, **kwargs):
message = dict(
key=key,
start=start,
finish=finish,
)
self.channel.basic_publish(
exchange='',
routing_key='dask_task_status',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
@click.command()
def dask_setup(scheduler):
plugin = RabbitMQPlugin()
scheduler.add_plugin(plugin)
运行方式:dask scheduler --preload <filename.py>
访问完整任务状态¶
如果你想访问调度器中存储的完整 distributed.scheduler.TaskState
,你可以通过传递并存储对调度器的引用来实现,如下所示:
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def __init__(self, scheduler):
self.scheduler = scheduler
def transition(self, key, start, finish, *args, **kwargs):
# Get full TaskState
ts = self.scheduler.tasks[key]
@click.command()
def dask_setup(scheduler):
plugin = MyPlugin(scheduler)
scheduler.add_plugin(plugin)
内置调度器插件¶
- class distributed.diagnostics.plugin.PipInstall(packages: list[str], pip_options: list[str] | None = None, restart_workers: bool = False)[源代码]¶
一个用于 pip 安装一组包的插件
这接受一组要在所有工作节点上安装的软件包,以及安装时使用的选项。您还可以选择在执行此安装后要求工作节点重新启动自身。
备注
这将增加每个工作进程的启动时间。如果可能,我们建议将库包含在工作环境或镜像中。这主要用于实验和调试。
- 参数
示例
>>> from dask.distributed import PipInstall >>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"]) >>> client.register_plugin(plugin)
使用
TOKEN
环境变量从私有仓库安装包。>>> from dask.distributed import PipInstall >>> plugin = PipInstall(packages=["private_package@git+https://${TOKEN}@github.com/dask/private_package.git]) >>> client.register_plugin(plugin)
- class distributed.diagnostics.plugin.CondaInstall(packages: list[str], conda_options: list[str] | None = None, restart_workers: bool = False)[源代码]¶
一个用于conda安装一组包的插件
这接受一组要在调度器和所有工作节点上安装的软件包,以及安装时使用的选项。
您也可以选择在完成此安装后要求工作人员重新启动。
备注
这将增加集群启动所需的时间。如果可能,我们建议将库包含在集群环境或镜像中。这主要是用于实验和调试。
- 参数
- 包
使用 conda 安装的包列表(可选版本)
- conda_选项
传递给 conda 的额外选项
- 重启工作进程
是否在安装包后重启工作进程。仅在工作者进程附带保姆进程时有效。
示例
>>> from dask.distributed import CondaInstall >>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])
>>> client.register_plugin(plugin)
- class distributed.diagnostics.plugin.InstallPlugin(install_fn: Callable[[], None], restart_workers: bool)[源代码]¶
调度器插件以在集群上安装软件
这接受一个在调度器和所有工作节点上安装软件的函数。你也可以选择在执行此安装后要求工作节点重启。
备注
这将增加每个工作进程的启动时间。如果可能,我们建议将软件包含在工作环境或镜像中。这主要是用于实验和调试。
- 参数
- install_fn
用于安装软件的可调用对象;必须是幂等的。
- 重启工作进程
是否在安装包后重启工作进程。仅在工作进程有附加的保姆进程时有效。
工作插件¶
distributed.diagnostics.plugin.WorkerPlugin
提供了一个用于创建自己的工作插件的基类。此外,Dask 还提供了一些 内置插件。
观看下面的视频,了解如何使用 WorkerPlugin
添加 concurrent.futures.ProcessPoolExecutor
的示例:
- class distributed.diagnostics.plugin.WorkerPlugin[源代码]¶
扩展Worker的接口
一个 worker 插件使自定义代码能够在 Workers 生命周期的不同阶段运行。
插件使自定义代码能够在 Worker 生命周期的每个步骤中运行。每当发生此类事件时,此类上的相应方法将被调用。请注意,用户代码始终在 Worker 的主线程中运行。
要实现一个插件:
继承自此类
覆盖其某些方法
使用
Client.register_plugin
注册插件。
idempotent
属性用于控制如果同名的 worker 插件已经存在时,是否应该忽略该插件的注册。如果为True
,则忽略该插件,否则替换现有的插件。默认为False
。示例
>>> class ErrorLogger(WorkerPlugin): ... def __init__(self, logger): ... self.logger = logger ... ... def setup(self, worker): ... self.worker = worker ... ... def transition(self, key, start, finish, *args, **kwargs): ... if finish == 'error': ... ts = self.worker.tasks[key] ... exc_info = (type(ts.exception), ts.exception, ts.traceback) ... self.logger.error( ... "Error during computation of '%s'.", key, ... exc_info=exc_info ... )
>>> import logging >>> plugin = ErrorLogger(logging) >>> client.register_plugin(plugin)
- setup(worker: Worker) None | Awaitable[None] [源代码]¶
在插件附加到工作进程时运行。这发生在插件注册并附加到现有工作进程时,或者在工作进程在插件注册后创建时。
- transition(key: Key, start: WorkerTaskStateState, finish: WorkerTaskStateState, **kwargs: Any) None [源代码]¶
在整个任务的生命周期中(参见 Worker 状态),Worker 由调度器指示计算某些任务,从而导致每个任务状态的转换。拥有该任务的 Worker 随后会收到此状态转换的通知。
每当任务改变其状态时,将调用此方法。
警告
这是一个高级功能,任务状态的转换机制和细节可能会在没有弃用周期的情况下发生变化。
- 参数
- 关键
- 开始
过渡的初始状态。可以是等待、准备、执行、长时间运行、内存、错误之一。
- 完成
过渡的最终状态。
- kwargs
转换时传递的更多选项
保姆插件¶
- class distributed.diagnostics.plugin.NannyPlugin[源代码]¶
扩展Nanny的接口
一个 worker 插件使得自定义代码可以在 Workers 生命周期的不同阶段运行。一个 nanny 插件做同样的事情,但它可以在 worker 启动之前运行代码,或者在必要时重启 worker。
要实现一个插件:
继承自此类
覆盖其某些方法
使用
Client.register_plugin
注册插件。
idempotent
属性用于控制如果已经存在一个同名的保姆插件时,是否应该忽略该插件的注册。如果为True
,则忽略该插件,否则替换现有的插件。默认为False
。restart
属性用于控制当注册插件时,是否需要重新启动正在运行的Worker
。
内置保姆插件¶
- class distributed.diagnostics.plugin.UploadDirectory(path, restart=False, update_path=False, skip_words=('.git', '.github', '.pytest_cache', 'tests', 'docs'), skip=(<function UploadDirectory.<lambda>>, ))[源代码]¶
一个用于将本地文件上传到工作者的 NannyPlugin。
- 参数
- 路径: str
上传目录的路径
示例
>>> from distributed.diagnostics.plugin import UploadDirectory >>> client.register_plugin(UploadDirectory("/path/to/directory"), nanny=True)