开发指南
内容
开发指南¶
此仓库是 Dask 项目的一部分。 一般开发指南包括在哪里寻求帮助、仓库布局、测试实践以及文档和风格标准,可在主文档中的 Dask 开发者指南 中找到。
安装¶
使用 git 克隆此仓库:
git clone git@github.com:dask/distributed.git cd distributed
安装 anaconda 或 miniconda(取决于操作系统)
conda env create --file continuous_integration/environment-3.11.yaml conda activate dask-distributed python -m pip install -e .
要保持分支与上游源代码同步:
cd distributed
git remote add upstream git@github.com:dask/distributed.git
git remote -v
git fetch -a upstream
git checkout main
git pull upstream main
git push origin main
Tornado¶
Dask.distributed 是一个 Tornado TCP 应用程序。Tornado 为我们提供了基于套接字的通信层,以及用于编写异步协程的语法,类似于 asyncio。你可以在不了解太多 Tornado 的情况下对这个库中的策略进行适度更改,但是中等程度的更改可能需要你理解 Tornado IOLoops、协程以及一些关于非阻塞通信的知识。Tornado API 文档非常出色,我们建议你阅读以下资源:
此外,如果你想在较低层次上与工作者和调度器之间的通信进行交互,那么你应该理解这里可用的 Tornado TCPServer
和 IOStream
。
Dask.distributed 在 Tornado 周围封装了一些逻辑。更多信息请参见 基础。
编写测试¶
测试分布式系统通常非常困难,因为当出现问题时,很难检查所有组件的状态。幸运的是,Tornado 中的非阻塞异步模型允许我们在单个线程中运行调度器、多个工作者和多个客户端。这为我们提供了可预测的性能、干净的关闭,以及在执行过程中进入代码任何点的能力。同时,有时我们希望所有内容在不同的进程中运行,以模拟更真实的设置。
测试套件包含三种测试
@gen_cluster
: 完全异步测试,其中所有组件在主线程的同一事件循环中运行。这些测试适用于测试复杂逻辑并直接检查系统状态。它们也更容易调试,并且在关闭时引发的问题最少。def test_foo(client)
: 使用从主进程分叉的多个进程进行测试。这些测试适用于同步(普通用户)API的测试,以及在触发硬故障以进行弹性测试时使用。popen
: 调用命令行启动系统的测试。这些测试很少,主要用于测试命令行接口。
如果你对 Tornado 接口感到满意,那么你会最开心地使用 @gen_cluster
风格的测试,例如:
# tests/test_submit.py
from distributed.utils_test import gen_cluster, inc
from distributed import Client, Future, Scheduler, Worker
@gen_cluster(client=True)
async def test_submit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
assert isinstance(a, Worker)
assert isinstance(b, Worker)
future = c.submit(inc, 1)
assert isinstance(future, Future)
assert future.key in c.futures
# result = future.result() # This synchronous API call would block
result = await future
assert result == 2
assert future.key in s.tasks
assert future.key in a.data or future.key in b.data
@gen_cluster
装饰器为你设置调度器、客户端和工作线程,并在测试后清理它们。它还允许你直接检查集群中每个元素的状态。然而,你不能使用正常的同步API(这样做会导致测试永远等待),而是需要使用协程API,其中所有阻塞函数都以一个下划线(_
)为前缀,并使用 await
等待。注意,在这些测试中使用阻塞接口是一个常见的错误。
如果你想测试正常的同步API,你可以使用 client
pytest fixture 风格的测试,它会在不同的分叉进程中为你设置调度器和工作节点:
from distributed.utils_test import client
def test_submit(client):
future = client.submit(inc, 10)
assert future.result() == 11
此外,如果你想访问调度器和工作进程,你也可以添加 s, a, b
固定装置。
from distributed.utils_test import client
def test_submit(client, s, a, b):
future = client.submit(inc, 10)
assert future.result() == 11 # use the synchronous/blocking API here
a['proc'].terminate() # kill one of the workers
result = future.result() # test that future remains valid
assert result == 2
在这种测试风格中,您无法访问调度器或工作器。变量 s, a, b
现在是包含 multiprocessing.Process
对象和端口整数的字典。然而,您现在可以使用正常的同步API(在这种测试风格中永远不要使用 await
),并且可以通过终止它们来轻松关闭进程。
通常,对于大多数面向用户的功能,你会发现这两种测试。@gen_cluster
测试特定的逻辑,而 client
pytest 夹具测试基本的接口和弹性。
除非绝对必要,例如需要测试命令行接口,否则应避免使用 popen
风格的测试。
代码格式化¶
Dask.distributed 使用多个代码检查工具(flake8、black、isort、pyupgrade、mypy),这些工具由 CI 强制执行。开发者在提交 PR 之前,应通过单个命令 pre-commit run --all-files
在本地运行这些工具。这确保了所有开发者的检查工具版本和选项保持一致。
可选地,您可能希望设置 pre-commit hooks 以便在您进行 git 提交时自动运行。这可以通过运行以下命令来完成:
pre-commit install
从分布式仓库的根目录开始。现在每次提交更改时,代码检查器都会运行。你可以使用 git commit --no-verify
或简短版本 git commit -n
跳过这些检查。